Merge pull request #1 from friendlyrobotnyc/feature/flow

stream methods using Flows instead of Channels
This commit is contained in:
Brian Plummer 2019-05-27 11:04:45 -04:00 committed by GitHub
commit 98cd388e7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 50 deletions

View file

@ -7,7 +7,7 @@ import android.widget.TextView
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
import com.nytimes.android.external.store3.base.impl.StoreBuilder
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.collect
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
@ -49,17 +49,17 @@ class StreamActivity : AppCompatActivity(), CoroutineScope {
}
launch {
store.stream(1).consumeEach {
store.stream(1).collect {
findViewById<TextView>(R.id.stream_1).text = "Stream 1 $it"
}
}
launch {
store.stream(2).consumeEach {
store.stream(2).collect {
findViewById<TextView>(R.id.stream_2).text = "Stream 2 $it"
}
}
launch {
store.stream().consumeEach {
store.stream().collect {
findViewById<TextView>(R.id.stream).text = "Stream $it"
}
}

View file

@ -8,9 +8,7 @@ import com.nytimes.android.external.store3.util.KeyParser
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.filter
import kotlinx.coroutines.channels.map
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentMap
/**
@ -176,18 +174,18 @@ internal class RealInternalStore<Raw, Parsed, Key>(
}
//STREAM NO longer calls get
override fun stream(key: Key): ReceiveChannel<Parsed> =
streamSubscription().filter { it.first == key }.map { (_, value) -> value }
override fun stream(key: Key): Flow<Parsed> =
streamSubscription().filter { it.first == key }.map { (_, value) -> value }
override fun stream(): ReceiveChannel<Parsed> {
override fun stream(): Flow<Parsed> {
return streamSubscription().map { (_, value) -> value }
}
private fun streamSubscription() =
subject.openSubscription()
//ignore first element so only new elements are returned
.apply { poll() }
.map { it!! }
subject.asFlow()
//ignore first element so only new elements are returned
.drop(1)
.map { it!! }
@Deprecated("")
override fun clearMemory() {

View file

@ -8,7 +8,7 @@ import com.nytimes.android.external.store3.util.KeyParser
import com.nytimes.android.external.store3.util.NoKeyParser
import com.nytimes.android.external.store3.util.NoopParserFunc
import com.nytimes.android.external.store3.util.NoopPersister
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
open class RealStore<Parsed, Key> : Store<Parsed, Key> {
@ -81,16 +81,16 @@ open class RealStore<Parsed, Key> : Store<Parsed, Key> {
*
* @return data from fresh and store it in memory and persister
*/
suspend override fun fresh(key: Key): Parsed {
override suspend fun fresh(key: Key): Parsed {
return internalStore.fresh(key)
}
override fun stream(): ReceiveChannel<Parsed> {
override fun stream(): Flow<Parsed> {
return internalStore.stream()
}
override fun stream(key: Key): ReceiveChannel<Parsed> {
override fun stream(key: Key): Flow<Parsed> {
return internalStore.stream(key)
}

View file

@ -1,7 +1,7 @@
package com.nytimes.android.external.store3.base.impl
import com.nytimes.android.external.store.util.Result
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
/**
* a [StoreBuilder]
@ -54,7 +54,7 @@ interface Store<T, V> {
* with operators that expect an OnComplete event
*/
// TODO should this method return a Pair<K, T> ?
fun stream(): ReceiveChannel<T>
fun stream(): Flow<T>
/**
* Similar to [Store.get() ][Store.get]
@ -63,7 +63,7 @@ interface Store<T, V> {
* Errors will be dropped
*
*/
fun stream(key: V): ReceiveChannel<T>
fun stream(key: V): Flow<T>
/**
* Clear the memory cache of all entries

View file

@ -43,30 +43,32 @@ class StreamOneKeyTest {
@Test
fun testStream() = runBlocking<Unit> {
val streamObservable = store.stream(barCode)
//stream doesn't invoke get anymore so when we call it the channel is empty
assertThat(streamObservable.isEmpty).isTrue()
val streamSubscription = store.stream(barCode).openChannelSubscription()
try {//stream doesn't invoke get anymore so when we call it the channel is empty
assertThat(streamSubscription.isEmpty).isTrue()
store.clear()
//fresh should notify subscribers again
store.fresh(barCode)
assertThat(streamObservable.poll()).isEqualTo(TEST_ITEM)
store.clear()
//fresh should notify subscribers again
store.fresh(barCode)
assertThat(streamSubscription.poll()).isEqualTo(TEST_ITEM)
// assertThat(streamObservable.poll()).isEqualTo(TEST_ITEM2)
//get for another barcode should not trigger a stream for barcode1
whenever(fetcher.fetch(barCode2))
.thenReturn(TEST_ITEM)
whenever(persister.read(barCode2))
.thenReturn(TEST_ITEM)
whenever(persister.write(barCode2, TEST_ITEM))
.thenReturn(true)
store.get(barCode2)
assertThat(streamObservable.isEmpty).isTrue()
//get for another barcode should not trigger a stream for barcode1
whenever(fetcher.fetch(barCode2))
.thenReturn(TEST_ITEM)
whenever(persister.read(barCode2))
.thenReturn(TEST_ITEM)
whenever(persister.write(barCode2, TEST_ITEM))
.thenReturn(true)
store.get(barCode2)
assertThat(streamSubscription.isEmpty).isTrue()
} finally {
streamSubscription.cancel()
}
}
companion object {
private val TEST_ITEM = "test"
private val TEST_ITEM2 = "test2"
private const val TEST_ITEM = "test"
private const val TEST_ITEM2 = "test2"
}
}

View file

@ -6,6 +6,11 @@ import com.nytimes.android.external.store3.base.Fetcher
import com.nytimes.android.external.store3.base.Persister
import com.nytimes.android.external.store3.base.impl.BarCode
import com.nytimes.android.external.store3.base.impl.StoreBuilder
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.broadcastIn
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
@ -13,8 +18,8 @@ import org.junit.Test
class StreamTest {
val fetcher: Fetcher<String, BarCode> = mock()
val persister: Persister<String, BarCode> = mock()
private val fetcher: Fetcher<String, BarCode> = mock()
private val persister: Persister<String, BarCode> = mock()
private val barCode = BarCode("key", "value")
@ -37,21 +42,31 @@ class StreamTest {
@Test
fun testStream() = runBlocking<Unit> {
val streamObservable = store.stream()
assertThat(streamObservable.isEmpty).isTrue()
store.get(barCode)
assertThat(streamObservable.isEmpty).isFalse()
val streamSubscription = store.stream().openChannelSubscription()
try {
assertThat(streamSubscription.isEmpty).isTrue()
store.get(barCode)
assertThat(streamSubscription.isEmpty).isFalse()
} finally {
streamSubscription.cancel()
}
}
@Test
fun testStreamEmitsOnlyFreshData() = runBlocking<Unit> {
store.get(barCode)
val streamObservable = store.stream()
assertThat(streamObservable.isEmpty).isTrue()
val streamSubscription = store.stream().openChannelSubscription()
try {
assertThat(streamSubscription.isEmpty).isTrue()
} finally {
streamSubscription.cancel()
}
}
companion object {
private val TEST_ITEM = "test"
private const val TEST_ITEM = "test"
}
}
fun <T> Flow<T>.openChannelSubscription() =
broadcastIn(GlobalScope + Dispatchers.Unconfined).openSubscription()