diff --git a/.gitignore b/.gitignore index 7d11a5a..c8408e9 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,9 @@ captures/ # Intellij *.iml .idea/ +.classpath +.project +.settings # Keystore files *.jks diff --git a/build.gradle b/build.gradle index 675fbd1..edb8e08 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ buildscript { ] dependencies { - classpath 'com.android.tools.build:gradle:3.6.0-alpha11' + classpath 'com.android.tools.build:gradle:3.6.0-beta01' classpath 'com.getkeepsafe.dexcount:dexcount-gradle-plugin:0.5.6' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$rootProject.ext.versions.kotlin" classpath 'org.jetbrains.dokka:dokka-gradle-plugin:0.9.17' @@ -32,7 +32,6 @@ allprojects { buildscript {} repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots/'} mavenCentral() jcenter() } diff --git a/buildsystem/dependencies.gradle b/buildsystem/dependencies.gradle index f8df3e9..b308301 100644 --- a/buildsystem/dependencies.gradle +++ b/buildsystem/dependencies.gradle @@ -54,7 +54,7 @@ ext.versions = [ supportTestRunner : '0.4.1', espresso : '2.2.1', compileTesting : '0.8', - coroutines : '1.3.0', + coroutines : '1.3.2', ] ext.libraries = [ diff --git a/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt b/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt index edfbce2..ba3949d 100644 --- a/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt +++ b/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt @@ -44,7 +44,8 @@ fun PipelineStore.open(): Store { override fun stream(key: Key): Flow = self.stream( StoreRequest.skipMemory( key = key, - refresh = true) + refresh = true + ) ).transform { it.throwIfError() it.dataOrNull()?.let { diff --git a/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt b/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt index bc339b7..296a869 100644 --- a/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt +++ b/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt @@ -74,10 +74,11 @@ sealed class StoreResponse( else -> null } + @Suppress("UNCHECKED_CAST") internal fun swapType(): StoreResponse = when (this) { is Error -> Error(error, origin) is Loading -> Loading(origin) - else -> throw IllegalStateException("cannot swap type for $this") + is Data-> Data(value = value as R, origin = origin) } } diff --git a/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt b/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt new file mode 100644 index 0000000..33f830a --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt @@ -0,0 +1,52 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store4.multiplex.Multiplexer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow + +/** + * This class maintains one and only 1 fetcher for a given [Key]. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class FetcherController( + private val scope: CoroutineScope, + private val realFetcher: (Key) -> Flow, + private val sourceOfTruth: SourceOfTruthWithBarrier? +) { + private val fetchers = RefCountedResource( + create = { key: Key -> + Multiplexer( + scope = scope, + bufferSize = 0, + source = { + realFetcher(key) + }, + onEach = { + sourceOfTruth?.write(key, it) + } + ) + }, + onRelease = { key: Key, multiplexer: Multiplexer -> + multiplexer.close() + } + ) + + fun getFetcher(key: Key): Flow { + return flow { + val fetcher = fetchers.acquire(key) + try { + emitAll(fetcher.create()) + } finally { + fetchers.release(key, fetcher) + } + } + } + + // visible for testing + internal suspend fun fetcherSize() = fetchers.size() +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt b/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt new file mode 100644 index 0000000..50fe623 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt @@ -0,0 +1,35 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch + +/** + * Merge implementation tells downstream what the source is and also uses a rendezvous channel + */ +@ExperimentalCoroutinesApi +internal fun Flow.merge(other: Flow): Flow> { + return channelFlow> { + launch { + this@merge.collect { + send(Either.Left(it)) + + } + } + launch { + other.collect { + send(Either.Right(it)) + } + } + }.buffer(Channel.RENDEZVOUS) +} + +internal sealed class Either { + data class Left(val value: T) : Either() + + data class Right(val value: R) : Either() +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt b/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt new file mode 100644 index 0000000..56612f4 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt @@ -0,0 +1,283 @@ +package com.nytimes.android.external.store4 + +import com.com.nytimes.suspendCache.StoreCache +import com.nytimes.android.external.store3.base.impl.MemoryPolicy +import com.nytimes.android.external.store3.base.impl.StoreDefaults +import com.nytimes.android.external.store3.pipeline.CacheType +import com.nytimes.android.external.store3.pipeline.PipelineStore +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import com.nytimes.android.external.store3.pipeline.StoreRequest +import com.nytimes.android.external.store3.pipeline.StoreResponse +import com.nytimes.android.external.store3.pipeline.open +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.transform +import kotlinx.coroutines.flow.withIndex + +@ExperimentalCoroutinesApi +@FlowPreview +internal class RealInternalCoroutineStore( + private val scope: CoroutineScope, + private val fetcher: (Key) -> Flow, + sourceOfTruth: SourceOfTruth? = null, + private val memoryPolicy: MemoryPolicy? +) : PipelineStore { + /** + * This source of truth is either a real database or an in memory source of truth created by + * the builder. + * Whatever is given, we always put a [SourceOfTruthWithBarrier] in front of it so that while + * we write the value from fetcher into the disk, we can block reads to avoid sending new data + * as if it came from the server (the [StoreResponse.origin] field). + */ + private val sourceOfTruth: SourceOfTruthWithBarrier? = + sourceOfTruth?.let { + SourceOfTruthWithBarrier(it) + } + private val memCache = memoryPolicy?.let { + StoreCache.fromRequest>( + loader = { + TODO( + """ + This should've never been called. We don't need this anymore, should remove + loader after we clean old Store ? + """.trimIndent() + ) + }, + memoryPolicy = memoryPolicy + ) + } + /** + * Fetcher controller maintains 1 and only 1 `Multiplexer` for a given key to ensure network + * requests are shared. + */ + private val fetcherController = FetcherController( + scope = scope, + realFetcher = fetcher, + sourceOfTruth = this.sourceOfTruth + ) + + override fun stream(request: StoreRequest): Flow> { + return if (sourceOfTruth == null) { + createNetworkFlow( + request = request, + networkLock = null + ) + } else { + diskNetworkCombined(request) + }.onEach { + // whenever a value is dispatched, save it to the memory cache + if (it.origin != ResponseOrigin.Cache) { + it.dataOrNull()?.let { data -> + memCache?.put(request.key, data) + } + } + }.onStart { + // if there is anything cached, dispatch it first if requested + if (!request.shouldSkipCache(CacheType.MEMORY)) { + memCache?.getIfPresent(request.key)?.let { cached -> + emit(StoreResponse.Data(value = cached, origin = ResponseOrigin.Cache)) + } + } + } + } + + override suspend fun clear(key: Key) { + memCache?.invalidate(key) + sourceOfTruth?.delete(key) + } + + /** + * We want to stream from disk but also want to refresh. If requested or necessary. + * + * How it works: + * There are two flows: + * Fetcher: The flow we get for the fetching + * Disk: The flow we get from the [SourceOfTruth]. + * Both flows are controlled by a lock for each so that we can start the right one based on + * the request status or values we receive. + * + * Value is always returned from [SourceOfTruth] while the errors are dispatched from both the + * `Fetcher` and [SourceOfTruth]. + * + * There are two initialization paths: + * + * 1) Request wants to skip disk cache: + * In this case, we first start the fetcher flow. When fetcher flow provides something besides + * an error, we enable the disk flow. + * + * 2) Request does not want to skip disk cache: + * In this case, we first start the disk flow. If disk flow returns `null` or + * [StoreRequest.refresh] is set to `true`, we enable the fetcher flow. + * This ensures we first get the value from disk and then load from server if necessary. + */ + private fun diskNetworkCombined( + request: StoreRequest + ): Flow> { + val diskLock = CompletableDeferred() + val networkLock = CompletableDeferred() + val networkFlow = createNetworkFlow(request, networkLock) + if (!request.shouldSkipCache(CacheType.DISK)) { + diskLock.complete(Unit) + } + val diskFlow = sourceOfTruth!!.reader(request.key, diskLock) + // we use a merge implementation that gives the source of the flow so that we can decide + // based on that. + return networkFlow.merge(diskFlow.withIndex()) + .transform { + // left is Fetcher while right is source of truth + if (it is Either.Left) { + if (it.value !is StoreResponse.Data<*>) { + emit(it.value.swapType()) + } + // network sent something + if (it.value is StoreResponse.Data<*>) { + // unlocking disk only if network sent data so that fresh data request never + // receives disk data by mistake + diskLock.complete(Unit) + } + } else if (it is Either.Right) { + // right, that is data from disk + val (index, diskData) = it.value + if (diskData.value != null) { + emit( + StoreResponse.Data( + value = diskData.value, + origin = diskData.origin + ) as StoreResponse + ) + } + + // if this is the first disk value and it is null, we should enable fetcher + // TODO should we ignore the index and always enable? + if (index == 0 && (diskData.value == null || request.refresh)) { + networkLock.complete(Unit) + } + } + } + } + + private fun createNetworkFlow( + request: StoreRequest, + networkLock: CompletableDeferred? + ): Flow> { + return fetcherController + .getFetcher(request.key) + .map { + StoreResponse.Data( + value = it, + origin = ResponseOrigin.Fetcher + ) as StoreResponse + }.catch { + emit( + StoreResponse.Error( + error = it, + origin = ResponseOrigin.Fetcher + ) + ) + }.onStart { + if (!request.shouldSkipCache(CacheType.DISK)) { + // wait until network gives us the go + networkLock?.await() + } + emit( + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ) + ) + }.map { + it.swapType() + } + } + + fun asLegacyStore() = open() + + // TODO this builder w/ 3 type args is really ugly, think more about it... + companion object { + fun beginWithNonFlowingFetcher( + fetcher: suspend (key: Key) -> Input + ) = Builder { key: Key -> + flow { + emit(fetcher(key)) + } + } + + fun beginWithFlowingFetcher( + fetcher: (key: Key) -> Flow + ) = Builder(fetcher) + } + + class Builder( + private val fetcher: (key: Key) -> Flow + ) { + private var scope: CoroutineScope? = null + private var sourceOfTruth: SourceOfTruth? = null + private var cachePolicy: MemoryPolicy? = StoreDefaults.memoryPolicy + + fun scope(scope: CoroutineScope): Builder { + this.scope = scope + return this + } + + fun nonFlowingPersister( + reader: suspend (Key) -> Output?, + writer: suspend (Key, Input) -> Unit, + delete: (suspend (Key) -> Unit)? = null + ): Builder { + sourceOfTruth = PersistentNonFlowingSourceOfTruth( + realReader = reader, + realWriter = writer, + realDelete = delete + ) + return this + } + + fun persister( + reader: (Key) -> Flow, + writer: suspend (Key, Input) -> Unit, + delete: (suspend (Key) -> Unit)? = null + ): Builder { + sourceOfTruth = PersistentSourceOfTruth( + realReader = reader, + realWriter = writer, + realDelete = delete + ) + return this + } + + fun sourceOfTruth( + sourceOfTruth: SourceOfTruth + ): Builder { + this.sourceOfTruth = sourceOfTruth + return this + } + + fun cachePolicy(memoryPolicy: MemoryPolicy?): Builder { + cachePolicy = memoryPolicy + return this + } + + fun disableCache(): Builder { + cachePolicy = null + return this + } + + fun build(): RealInternalCoroutineStore { + @Suppress("UNCHECKED_CAST") + return RealInternalCoroutineStore( + scope = scope ?: GlobalScope, + sourceOfTruth = sourceOfTruth, + fetcher = fetcher, + memoryPolicy = cachePolicy + ) + } + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt b/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt new file mode 100644 index 0000000..80d74e9 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt @@ -0,0 +1,45 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * Simple holder that can ref-count items by a given key. + */ +internal class RefCountedResource( + private val create: suspend (Key) -> T, + private val onRelease : (suspend (Key, T) -> Unit)? = null +) { + private val items = mutableMapOf() + private val lock = Mutex() + + suspend fun acquire(key: Key) : T = lock.withLock { + items.getOrPut(key) { + Item(create(key)) + }.also { + it.refCount ++ + }.value + } + + suspend fun release(key: Key, value : T) = lock.withLock { + val existing = items[key] + check(existing != null && existing.value === value) { + "inconsistent release, seems like $value was leaked or never acquired" + } + existing.refCount -- + if (existing.refCount < 1) { + items.remove(key) + onRelease?.invoke(key, value) + } + } + + // used in tests + suspend fun size() = lock.withLock { + items.size + } + + private inner class Item( + val value: T, + var refCount: Int = 0 + ) +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt new file mode 100644 index 0000000..d02ba24 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt @@ -0,0 +1,90 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.base.Clearable +import com.nytimes.android.external.store3.base.Persister +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import com.nytimes.android.external.store3.util.KeyParser +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow + +/** + * Source of truth takes care of making any source (no matter if it has flowing reads or not) into + * a common flowing API. Used w/ a [SourceOfTruthWithBarrier] in front of it in the + * [RealInternalCoroutineStore] implementation to avoid dispatching values to downstream while + * a write is in progress. + */ +internal interface SourceOfTruth { + val defaultOrigin: ResponseOrigin + fun reader(key: Key): Flow + suspend fun write(key: Key, value: Input) + suspend fun delete(key: Key) + // for testing + suspend fun getSize(): Int + + companion object { + fun fromLegacy( + persister: Persister, + // parser that runs after get from db + postParser: KeyParser? = null + ): SourceOfTruth { + return PersistentSourceOfTruth( + realReader = { key -> + flow { + if (postParser == null) { + emit(persister.read(key)) + } else { + persister.read(key)?.let { + val postParsed = postParser.apply(key, it) + emit(postParsed) + } ?: emit(null) + } + } + }, + realWriter = { key, value -> + persister.write(key, value) + }, + realDelete = { key -> + (persister as? Clearable)?.clear(key) + } + ) + } + } +} + +internal class PersistentSourceOfTruth( + private val realReader: (Key) -> Flow, + private val realWriter: suspend (Key, Input) -> Unit, + private val realDelete: (suspend (Key) -> Unit)? = null +) : SourceOfTruth { + override val defaultOrigin = ResponseOrigin.Persister + override fun reader(key: Key): Flow = realReader(key) + override suspend fun write(key: Key, value: Input) = realWriter(key, value) + override suspend fun delete(key: Key) { + realDelete?.invoke(key) + } + + // for testing + override suspend fun getSize(): Int { + throw UnsupportedOperationException("not supported for persistent") + } +} + +internal class PersistentNonFlowingSourceOfTruth( + private val realReader: suspend (Key) -> Output?, + private val realWriter: suspend (Key, Input) -> Unit, + private val realDelete: (suspend (Key) -> Unit)? = null +) : SourceOfTruth { + override val defaultOrigin = ResponseOrigin.Persister + override fun reader(key: Key): Flow = flow { + emit(realReader(key)) + } + override suspend fun write(key: Key, value: Input) = realWriter(key, value) + override suspend fun delete(key: Key) { + realDelete?.invoke(key) + } + + // for testing + override suspend fun getSize(): Int { + throw UnsupportedOperationException("not supported for persistent") + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt new file mode 100644 index 0000000..c378836 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt @@ -0,0 +1,122 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collectIndexed +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import java.util.concurrent.atomic.AtomicLong + +/** + * Wraps a [SourceOfTruth] and blocks reads while a write is in progress. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class SourceOfTruthWithBarrier( + private val delegate: SourceOfTruth +) { + /** + * Each key has a barrier so that we can block reads while writing. + */ + private val barriers = RefCountedResource>( + create = { key -> + ConflatedBroadcastChannel(BarrierMsg.Open.INITIAL) + } + ) + /** + * Each message gets dispatched with a version. This ensures we won't accidentally turn on the + * reader flow for a new reader that happens to have arrived while a write is in progress since + * that write should be considered as a disk read for that flow, not fetcher. + */ + private val versionCounter = AtomicLong(0) + + + fun reader(key: Key, lock: CompletableDeferred): Flow> { + return flow { + val barrier = barriers.acquire(key) + val readerVersion: Long = versionCounter.incrementAndGet() + try { + lock.await() + emitAll(barrier.asFlow() + .flatMapLatest { + val messageArrivedAfterMe = readerVersion < it.version + when (it) { + is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output -> + if (index == 0 && messageArrivedAfterMe) { + DataWithOrigin( + origin = ResponseOrigin.Fetcher, + value = output + ) + } else { + DataWithOrigin( + origin = delegate.defaultOrigin, + value = output + ) + } + } + is BarrierMsg.Blocked -> { + flowOf() + } + } + }) + } finally { + // we are using a finally here instead of onCompletion as there might be a + // possibility where flow gets cancelled right before `emitAll`. + barriers.release(key, barrier) + } + + } + } + + suspend fun write(key: Key, value: Input) { + val barrier = barriers.acquire(key) + try { + barrier.send(BarrierMsg.Blocked(versionCounter.incrementAndGet())) + delegate.write(key, value) + barrier.send(BarrierMsg.Open(versionCounter.incrementAndGet())) + } finally { + barriers.release(key, barrier) + } + } + + suspend fun delete(key: Key) { + delegate.delete(key) + } + + private sealed class BarrierMsg( + val version: Long + ) { + class Blocked(version: Long) : BarrierMsg(version) + class Open(version: Long) : BarrierMsg(version) { + companion object { + val INITIAL = Open(INITIAL_VERSION) + } + } + } + + // visible for testing + internal suspend fun barrierCount() = barriers.size() + + companion object { + private const val INITIAL_VERSION = -1L + } +} + +@ExperimentalCoroutinesApi +private inline fun Flow.mapIndexed(crossinline block: (Int, T) -> R) = flow { + this@mapIndexed.collectIndexed { index, value -> + emit(block(index, value)) + } +} + +internal data class DataWithOrigin( + val origin: ResponseOrigin, + val value: T? +) diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt new file mode 100644 index 0000000..8b43ea6 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt @@ -0,0 +1,321 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import java.util.ArrayDeque +import java.util.Collections + +/** + * This actor helps tracking active channels and is able to dispatch values to each of them + * in parallel. As soon as one of them receives the value, the ack in the dispatch message is + * completed so that the sender can continue for the next item. + */ +@ExperimentalCoroutinesApi +class ChannelManager( + /** + * The scope in which ChannelManager actor runs + */ + scope: CoroutineScope, + /** + * The buffer size that is used while the upstream is active + */ + bufferSize: Int, + private val onEach: suspend (T) -> Unit, + /** + * Called when the channel manager is active (e.g. it has downstream collectors and needs a + * producer) + */ + private val onActive: (ChannelManager) -> SharedFlowProducer +) : StoreRealActor>(scope) { + private val buffer = Buffer(bufferSize) + /** + * The current producer + */ + private var producer: SharedFlowProducer? = null + + /** + * Tracks whether we've ever dispatched value or error from the current producer. + * Reset when producer finishes. + */ + private var dispatchedValue: Boolean = false + + /** + * List of downstream collectors. + */ + private val channels = mutableListOf>() + + override suspend fun handle(msg: Message) { + when (msg) { + is Message.AddLeftovers -> doAddLefovers(msg.leftovers) + is Message.AddChannel -> doAdd(msg) + is Message.RemoveChannel -> doRemove(msg.channel) + is Message.DispatchValue -> doDispatchValue(msg) + is Message.DispatchError -> doDispatchError(msg) + is Message.UpstreamFinished -> doHandleUpstreamClose(msg.producer) + } + } + + /** + * We are closing. Do a cleanup on existing channels where we'll close them and also decide + * on the list of leftovers. + */ + fun doHandleUpstreamClose(producer: SharedFlowProducer?) { + if (this.producer !== producer) { + return + } + val leftovers = mutableListOf>() + channels.forEach { + when { + it.receivedValue -> it.close() + dispatchedValue -> + // we dispatched a value but this channel didn't receive so put it into + // leftovers + leftovers.add(it) + else -> // upstream didn't dispatch + it.close() + } + } + channels.clear() // empty references + channels.addAll(leftovers) + this.producer = null + if (leftovers.isNotEmpty()) { + activateIfNecessary(true) + } + } + + override fun onClosed() { + producer?.cancel() + } + + /** + * Dispatch value to all downstream collectors. + */ + private suspend fun doDispatchValue(msg: Message.DispatchValue) { + onEach(msg.value) + buffer.add(msg) + dispatchedValue = true + channels.forEach { + it.dispatchValue(msg) + } + } + + /** + * Dispatch an upstream error to downstream collectors. + */ + private fun doDispatchError(msg: Message.DispatchError) { + // dispatching error is as good as dispatching value + dispatchedValue = true + channels.forEach { + it.dispatchError(msg.error) + } + } + + /** + * Remove a downstream collector. + */ + private suspend fun doRemove(channel: Channel>) { + val index = channels.indexOfFirst { + it.hasChannel(channel) + } + if (index >= 0) { + channels.removeAt(index) + if (channels.isEmpty()) { + producer?.cancelAndJoin() + } + } + } + + /** + * We've received some leftovers from the previous [ChannelManager]. Add them to our list. + */ + private suspend fun doAddLefovers(leftovers: List>) { + val wasEmpty = channels.isEmpty() + leftovers.forEach { channelEntry -> + addEntry( + entry = channelEntry.copy(_receivedValue = false) + ) + } + activateIfNecessary(wasEmpty) + } + + /** + * Add a new downstream collector + */ + private suspend fun doAdd(msg: Message.AddChannel) { + val wasEmpty = channels.isEmpty() + + addEntry( + entry = ChannelEntry( + channel = msg.channel + ) + ) + activateIfNecessary(wasEmpty) + } + + private fun activateIfNecessary(wasEmpty: Boolean) { + if (producer == null && wasEmpty) { + producer = onActive(this) + dispatchedValue = false + producer!!.start() + } + } + + /** + * Internally add the new downstream collector to our list, send it anything buffered. + */ + private suspend fun addEntry(entry: ChannelEntry) { + val new = channels.none { + it.hasChannel(entry) + } + check(new) { + "$entry is already in the list." + } + check(!entry.receivedValue) { + "$entry already received a value" + } + channels.add(entry) + if (buffer.items.isNotEmpty()) { + // if there is anything in the buffer, send it + buffer.items.forEach { + entry.dispatchValue(it) + } + } + } + + /** + * Holder for each downstream collector + */ + internal data class ChannelEntry( + /** + * The channel used by the collector + */ + private val channel: Channel>, + /** + * Tracking whether we've ever dispatched a value or an error to downstream + */ + private var _receivedValue: Boolean = false + ) { + val receivedValue + get() = _receivedValue + + suspend fun dispatchValue(value: Message.DispatchValue) { + _receivedValue = true + channel.send(value) + } + + fun dispatchError(error: Throwable) { + _receivedValue = true + channel.close(error) + } + + fun close() { + channel.close() + } + + fun hasChannel(channel: Channel>) = this.channel === channel + + fun hasChannel(entry: ChannelEntry) = this.channel === entry.channel + } + + /** + * Messages accepted by the [ChannelManager]. + */ + sealed class Message { + /** + * Add a new channel, that means a new downstream subscriber + */ + class AddChannel( + val channel: Channel> + ) : Message() + + /** + * Add multiple channels. Happens when we are carrying over leftovers from a previous + * manager + */ + internal class AddLeftovers(val leftovers: List>) : + Message() + + /** + * Remove a downstream subscriber, that means it completed + */ + class RemoveChannel(val channel: Channel>) : Message() + + /** + * Upstream dispatched a new value, send it to all downstream items + */ + class DispatchValue( + /** + * The value dispatched by the upstream + */ + val value: T, + /** + * Ack that is completed by all receiver. Upstream producer will await this before asking + * for a new value from upstream + */ + val delivered: CompletableDeferred + ) : Message() + + /** + * Upstream dispatched an error, send it to all downstream items + */ + class DispatchError( + /** + * The error sent by the upstream + */ + val error: Throwable + ) : Message() + + class UpstreamFinished( + /** + * SharedFlowProducer finished emitting + */ + val producer: SharedFlowProducer + ) : Message() + } + + /** + * Buffer implementation for any late arrivals. + */ + private interface Buffer { + fun add(item: Message.DispatchValue) + val items: Collection> + } + + /** + * Default implementation of buffer which does not buffer anything. + */ + private class NoBuffer : Buffer { + override val items: Collection> + get() = Collections.emptyList() + + + override fun add(item: Message.DispatchValue) { + // ignore + } + } + + /** + * Create a new buffer insteance based on the provided limit. + */ + private fun Buffer(limit: Int): Buffer = if (limit > 0) { + BufferImpl(limit) + } else { + NoBuffer() + } + + /** + * A real buffer implementation that has a FIFO queue. + */ + private class BufferImpl(private val limit: Int) : + Buffer { + override val items = ArrayDeque>(limit.coerceAtMost(10)) + override fun add(item: Message.DispatchValue) { + while (items.size >= limit) { + items.pollFirst() + } + items.offerLast(item) + } + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt new file mode 100644 index 0000000..3df0ad3 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt @@ -0,0 +1,81 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.transform + +/** + * Like a publish, shares 1 upstream value with multiple downstream receiver. + * It has one store specific behavior where upstream flow is suspended until at least 1 downstream + * flow emits the value to ensure we don't abuse the upstream flow of downstream cannot keep up. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class Multiplexer( + /** + * The [CoroutineScope] to use for upstream subscription + */ + private val scope: CoroutineScope, + /** + * The buffer size that is used only if the upstream has not complete yet. + * Defaults to 0. + */ + bufferSize: Int = 0, + /** + * Source function to create a new flow when necessary. + */ + // TODO does this have to be a method or just a flow ? Will decide when actual implementation + // happens + private val source: () -> Flow, + /** + * Called when upstream dispatches a value. + */ + private val onEach: suspend (T) -> Unit +) { + + private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) { + ChannelManager( + scope = scope, + bufferSize = bufferSize, + onActive = { + SharedFlowProducer( + scope = scope, + src = source(), + channelManager = it + ) + }, + onEach = onEach + ) + } + + fun create(): Flow { + val channel = Channel>(Channel.UNLIMITED) + return channel.consumeAsFlow() + .onStart { + channelManager.send( + ChannelManager.Message.AddChannel( + channel + )) + } + .transform { + emit(it.value) + it.delivered.complete(Unit) + }.onCompletion { + channelManager.send( + ChannelManager.Message.RemoveChannel( + channel + ) + ) + } + } + + fun close() { + channelManager.close() + } +} \ No newline at end of file diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt new file mode 100644 index 0000000..9b3c18e --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt @@ -0,0 +1,88 @@ +package com.nytimes.android.external.store4.multiplex + +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchError +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.UpstreamFinished +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import java.nio.channels.ClosedChannelException + +/** + * A flow collector that works with a [ChannelManager] to collect values from an upstream flow + * and dispatch to the [ChannelManager] which then dispatches to downstream collectors. + * + * They work in sync such that this producer always expects an ack from the [ChannelManager] after + * sending an event. + * + * Cancellation of the collection might be triggered by both this producer (e.g. upstream completes) + * or the [ChannelManager] (e.g. all active collectors complete). + */ +@ExperimentalCoroutinesApi +class SharedFlowProducer( + private val scope: CoroutineScope, + private val src: Flow, + private val channelManager: ChannelManager +) { + private lateinit var collectionJob: Job + + /** + * Starts the collection of the upstream flow. + */ + fun start() { + scope.launch { + try { + // launch again to track the collection job + collectionJob = scope.launch { + try { + src.catch { + channelManager.send( + DispatchError( + it + ) + ) + }.collect { + val ack = CompletableDeferred() + channelManager.send( + DispatchValue( + it, + ack + ) + ) + // suspend until at least 1 receives the new value + ack.await() + } + } catch (closed: ClosedSendChannelException) { + // ignore. if consumers are gone, it might close itself. + } + } + // wait until collection ends, either due to an error or ordered by the channel + // manager + collectionJob.join() + } finally { + // cleanup the channel manager so that downstreams can be closed if they are not + // closed already and leftovers can be moved to a new producer if necessary. + try { + channelManager.send(UpstreamFinished(this@SharedFlowProducer)) + } catch (closed : ClosedSendChannelException) { + // it might close before us, its fine. + } + } + } + } + + suspend fun cancelAndJoin() { + collectionJob.cancelAndJoin() + } + + fun cancel() { + collectionJob.cancel() + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt new file mode 100644 index 0000000..2f176da --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt @@ -0,0 +1,46 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.actor + +/** + * Simple actor implementation abstracting away Coroutine.actor since it is deprecated. + * It also enforces a 0 capacity buffer. + */ +@Suppress("EXPERIMENTAL_API_USAGE") +@ExperimentalCoroutinesApi +abstract class StoreRealActor( + scope: CoroutineScope +) { + val inboundChannel: SendChannel + + init { + inboundChannel = scope.actor( + capacity = 0 + ) { + channel.invokeOnClose { + onClosed() + } + for (msg in channel) { + handle(msg) + } + } + } + + open fun onClosed() { + + } + + abstract suspend fun handle(msg: T) + + suspend fun send(msg: T) { + inboundChannel.send(msg) + } + + fun close() { + inboundChannel.close() + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt index 6d90ffc..26d338c 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt @@ -12,12 +12,12 @@ import org.junit.runners.Parameterized @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class ClearStoreMemoryTest( - storeType : TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private var networkCalls = 0 - private val store = TestStoreBuilder.from { - networkCalls ++ + private val store = TestStoreBuilder.from(testScope) { + networkCalls++ }.build(storeType) @Test diff --git a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt index 089411e..72fd74c 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt @@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger @RunWith(Parameterized::class) class ClearStoreTest( - storeType: TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() @@ -22,10 +22,11 @@ class ClearStoreTest( private val networkCalls = AtomicInteger(0) private val store = TestStoreBuilder.from( - fetcher = { - networkCalls.incrementAndGet() - }, - persister = persister + scope = testScope, + fetcher = { + networkCalls.incrementAndGet() + }, + persister = persister ).build(storeType) @Test @@ -34,10 +35,10 @@ class ClearStoreTest( val barcode = BarCode("type", "key") whenever(persister.read(barcode)) - .thenReturn(null) //read from disk on get - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk on get + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode, 1)).thenReturn(true) whenever(persister.write(barcode, 2)).thenReturn(true) @@ -57,18 +58,18 @@ class ClearStoreTest( val barcode2 = BarCode("type2", "key2") whenever(persister.read(barcode1)) - .thenReturn(null) //read from disk - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing disk cache - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing disk cache + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode1, 1)).thenReturn(true) whenever(persister.write(barcode1, 2)).thenReturn(true) whenever(persister.read(barcode2)) - .thenReturn(null) //read from disk - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing disk cache - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing disk cache + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode2, 1)).thenReturn(true) whenever(persister.write(barcode2, 2)).thenReturn(true) diff --git a/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt b/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt index b43d8a6..6286f5b 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt @@ -2,18 +2,24 @@ package com.nytimes.android.external.store3 import com.nytimes.android.external.store3.base.impl.BarCode import junit.framework.Assert.fail +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized +import kotlin.coroutines.EmptyCoroutineContext @RunWith(Parameterized::class) class DontCacheErrorsTest( storeType: TestStoreType ) { - + private val testScope = TestCoroutineScope() private var shouldThrow: Boolean = false - private val store = TestStoreBuilder.from { + // TODO move to test coroutine scope + private val store = TestStoreBuilder.from(testScope) { if (shouldThrow) { throw RuntimeException() } else { @@ -22,7 +28,7 @@ class DontCacheErrorsTest( }.build(storeType) @Test - fun testStoreDoesntCacheErrors() = runBlocking { + fun testStoreDoesntCacheErrors() = testScope.runBlockingTest { val barcode = BarCode("bar", "code") shouldThrow = true diff --git a/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt b/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt index 227103c..7b6aa15 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt @@ -10,19 +10,20 @@ import org.junit.runners.Parameterized @RunWith(Parameterized::class) class KeyParserTest( - storeType: TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val store = TestStoreBuilder.from( - fetcher = { - NETWORK - }, - fetchParser = object : KeyParser { - override suspend fun apply(key: Int, raw: String): String { - return raw + key - } + scope = testScope, + fetcher = { + NETWORK + }, + fetchParser = object : KeyParser { + override suspend fun apply(key: Int, raw: String): String { + return raw + key } + } ).build(storeType) @Test diff --git a/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt b/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt index 97514ce..664bd64 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt @@ -14,10 +14,11 @@ import org.junit.runners.Parameterized class NoNetworkTest( storeType: TestStoreType ) { - private val store: Store = TestStoreBuilder.from { + private val testScope = TestCoroutineScope() + private val store: Store = TestStoreBuilder.from(testScope) { throw EXCEPTION }.build(storeType) - private val testScope = TestCoroutineScope() + @Test fun testNoNetwork() = testScope.runBlockingTest { diff --git a/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt index d7b7698..66a58bc 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt @@ -19,7 +19,7 @@ import org.mockito.Mockito.verify @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class ParsingFetcherTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -30,22 +30,23 @@ class ParsingFetcherTest( @Test fun testPersistFetcher() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - fetchParser = parser, - persister = persister + scope = testScope, + fetcher = fetcher, + fetchParser = parser, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(RAW_DATA) + .thenReturn(RAW_DATA) whenever(parser.apply(RAW_DATA)) - .thenReturn(PARSED) + .thenReturn(PARSED) whenever(persister.read(barCode)) - .thenReturn(PARSED) + .thenReturn(PARSED) whenever(persister.write(barCode, PARSED)) - .thenReturn(true) + .thenReturn(true) val value = simpleStore.fresh(barCode) diff --git a/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt b/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt index 25ab863..eee96b3 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt @@ -17,12 +17,16 @@ import kotlin.random.Random @RunWith(Parameterized::class) class SequentialTes( - storeType: TestStoreType) { + storeType: TestStoreType +) { private val testScope = TestCoroutineScope() var networkCalls = 0 - private val store = TestStoreBuilder.from(cached = true) { - networkCalls ++ + private val store = TestStoreBuilder.from( + scope = testScope, + cached = true + ) { + networkCalls++ }.build(storeType) @Test @@ -58,8 +62,8 @@ class SequentialTes( val store: Store = Store.from { it + Random.nextInt() } with { parser { it.toString() } - .persister(persister) - .cache() + .persister(persister) + .cache() } val v1 = store.get(4) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt index dd5410b..72c20c2 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val counter = AtomicInteger(0) @@ -41,19 +41,20 @@ class StoreTest( @Test fun testSimple() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) var value = simpleStore.get(barCode) @@ -66,24 +67,25 @@ class StoreTest( @Test fun testDoubleTap() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenAnswer { - if (counter.incrementAndGet() == 1) { - NETWORK - } else { - throw RuntimeException("Yo Dawg your inflight is broken") - } + .thenAnswer { + if (counter.incrementAndGet() == 1) { + NETWORK + } else { + throw RuntimeException("Yo Dawg your inflight is broken") } + } whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) val deferred = async { simpleStore.get(barCode) } @@ -97,21 +99,22 @@ class StoreTest( fun testSubclass() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) simpleStore.clear(barCode) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)).thenReturn(true) - var value = simpleStore.get(barCode) + var value = simpleStore.get(barCode) assertThat(value).isEqualTo(DISK) value = simpleStore.get(barCode) assertThat(value).isEqualTo(DISK) @@ -122,13 +125,14 @@ class StoreTest( fun testNoopAndDefault() = testScope.runBlockingTest { val persister = spy(NoopPersister.create()) val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister, - cached = true + scope = testScope, + fetcher = fetcher, + persister = persister, + cached = true ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) var value = simpleStore.get(barCode) verify(fetcher, times(1)).fetch(barCode) @@ -148,9 +152,9 @@ class StoreTest( @Test fun testEquivalence() = testScope.runBlockingTest { val cache = CacheBuilder.newBuilder() - .maximumSize(1) - .expireAfterAccess(java.lang.Long.MAX_VALUE, TimeUnit.SECONDS) - .build() + .maximumSize(1) + .expireAfterAccess(java.lang.Long.MAX_VALUE, TimeUnit.SECONDS) + .build() cache.put(barCode, MEMORY) var value = cache.getIfPresent(barCode) @@ -163,9 +167,10 @@ class StoreTest( @Test fun testFreshUsesOnlyNetwork() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister, - persisterStalePolicy = StalePolicy.NETWORK_BEFORE_STALE + scope = testScope, + fetcher = fetcher, + persister = persister, + persisterStalePolicy = StalePolicy.NETWORK_BEFORE_STALE ).build(storeType) whenever(fetcher.fetch(barCode)) doThrow RuntimeException(ERROR) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt index 1d71402..5b04b93 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreThrowOnNoItems( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val counter = AtomicInteger(0) @@ -29,15 +29,16 @@ class StoreThrowOnNoItems( @Test fun testShouldThrowOnFetcherEmitsNoSuckElementException() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher + scope = testScope, + fetcher = fetcher ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenThrow(NoSuchElementException()) + .thenThrow(NoSuchElementException()) try { - simpleStore.get(barCode) - fail("exception not thrown when no items emitted from fetcher") + val unexpected = simpleStore.get(barCode) + fail("exception not thrown when no items emitted from fetcher $unexpected") } catch (e: NoSuchElementException) { assertThat(e).isInstanceOf(NoSuchElementException::class.java) } diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt index 07a298e..8f47f39 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt @@ -19,7 +19,7 @@ import org.mockito.Mockito.verify @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreWithParserTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -31,20 +31,21 @@ class StoreWithParserTest( @Test fun testSimple() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.fromPostParser( - fetcher = fetcher, - persister = persister, - postParser = parser + scope = testScope, + fetcher = fetcher, + persister = persister, + postParser = parser ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) whenever(parser.apply(DISK)).thenReturn(barCode.key) @@ -58,19 +59,20 @@ class StoreWithParserTest( @Test fun testSubclass() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.fromPostParser( - fetcher = fetcher, - persister = persister, - postParser = parser + scope = testScope, + fetcher = fetcher, + persister = persister, + postParser = parser ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) whenever(parser.apply(DISK)).thenReturn(barCode.key) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt index 271b04d..0384dac 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt @@ -17,50 +17,52 @@ import org.junit.runners.Parameterized @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StreamOneKeyTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { val fetcher: Fetcher = mock() val persister: Persister = mock() private val barCode = BarCode("key", "value") private val barCode2 = BarCode("key2", "value2") + private val testScope = TestCoroutineScope() private val store = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) - private val testScope = TestCoroutineScope() + @Before fun setUp() = runBlockingTest { whenever(fetcher.fetch(barCode)) - .thenReturn(TEST_ITEM) - .thenReturn(TEST_ITEM2) + .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM2) whenever(persister.read(barCode)) - .let { - // the backport stream method of Pipeline to Store does not skip disk so we - // make sure disk returns empty value first - if (storeType == TestStoreType.Pipeline) { - it.thenReturn(null) - } else { - it - } + .let { + // the backport stream method of Pipeline to Store does not skip disk so we + // make sure disk returns empty value first + if (storeType != TestStoreType.Store) { + it.thenReturn(null) + } else { + it } - .thenReturn(TEST_ITEM) - .thenReturn(TEST_ITEM2) + } + .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM2) whenever(persister.write(barCode, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) whenever(persister.write(barCode, TEST_ITEM2)) - .thenReturn(true) + .thenReturn(true) } @Suppress("UsePropertyAccessSyntax") // for assert isTrue() isFalse() @Test - fun testStream() = runBlockingTest { + fun testStream() = testScope.runBlockingTest { val streamSubscription = store.stream(barCode) - .openChannelSubscription() + .openChannelSubscription() try { if (storeType == TestStoreType.Store) { //stream doesn't invoke get anymore so when we call it the channel is empty @@ -82,11 +84,11 @@ class StreamOneKeyTest( assertThat(streamSubscription.poll()).isEqualTo(TEST_ITEM) //get for another barcode should not trigger a stream for barcode1 whenever(fetcher.fetch(barCode2)) - .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM) whenever(persister.read(barCode2)) - .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM) whenever(persister.write(barCode2, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) store.get(barCode2) assertThat(streamSubscription.isEmpty).isTrue() diff --git a/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt index e662bbe..2ad3b3a 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt @@ -26,7 +26,7 @@ import org.junit.runners.Parameterized @ObsoleteCoroutinesApi @RunWith(Parameterized::class) class StreamTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -35,8 +35,9 @@ class StreamTest( private val barCode = BarCode("key", "value") private val store = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) @Before @@ -44,17 +45,17 @@ class StreamTest( whenever(fetcher.fetch(barCode)).thenReturn(TEST_ITEM) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(TEST_ITEM) + .thenReturn(null) + .thenReturn(TEST_ITEM) whenever(persister.write(barCode, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) } @Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse() @Test fun testStream() = testScope.runBlockingTest { - if (storeType == TestStoreType.Pipeline) { + if (storeType != TestStoreType.Store) { throw AssumptionViolatedException("Pipeline store does not support stream() no arg") } val streamSubscription = store.stream().openChannelSubscription() @@ -70,7 +71,7 @@ class StreamTest( @Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse() @Test fun testStreamEmitsOnlyFreshData() = testScope.runBlockingTest { - if (storeType == TestStoreType.Pipeline) { + if (storeType != TestStoreType.Store) { throw AssumptionViolatedException("Pipeline store does not support stream() no arg") } store.get(barCode) @@ -93,4 +94,4 @@ class StreamTest( @ExperimentalCoroutinesApi fun Flow.openChannelSubscription() = - broadcastIn(GlobalScope + Dispatchers.Unconfined).openSubscription() \ No newline at end of file + broadcastIn(GlobalScope + Dispatchers.Unconfined).openSubscription() \ No newline at end of file diff --git a/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt b/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt index 797d08c..e5adf84 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt @@ -16,199 +16,246 @@ import com.nytimes.android.external.store3.pipeline.withCache import com.nytimes.android.external.store3.pipeline.withKeyConverter import com.nytimes.android.external.store3.pipeline.withNonFlowPersister import com.nytimes.android.external.store3.util.KeyParser +import com.nytimes.android.external.store4.RealInternalCoroutineStore +import com.nytimes.android.external.store4.SourceOfTruth +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.flow data class TestStoreBuilder( - private val buildStore: () -> Store, - private val buildPipelineStore: () -> Store + private val buildStore: () -> Store, + private val buildPipelineStore: () -> Store, + private val builRealInternalCoroutineStore: () -> Store ) { fun build(storeType: TestStoreType): Store = when (storeType) { TestStoreType.Store -> buildStore() TestStoreType.Pipeline -> buildPipelineStore() + TestStoreType.CoroutineInternal -> builRealInternalCoroutineStore() } companion object { fun from( - inflight: Boolean = true, - fetcher: suspend (Key) -> Output + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: suspend (Key) -> Output ): TestStoreBuilder = from( - inflight = inflight, - persister = null, - fetchParser = null, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = null, + fetchParser = null, + fetcher = fetcher ) fun from( - inflight: Boolean = true, - fetcher: suspend (Key) -> Output, - fetchParser : KeyParser + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: suspend (Key) -> Output, + fetchParser: KeyParser ): TestStoreBuilder = from( - inflight = inflight, - persister = null, - fetchParser = fetchParser, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = null, + fetchParser = fetchParser, + fetcher = fetcher ) fun from( - inflight: Boolean = true, - fetcher: Fetcher, - fetchParser : Parser, - persister: Persister + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: Fetcher, + fetchParser: Parser, + persister: Persister ): TestStoreBuilder = from( - inflight = inflight, - persister = persister, - fetchParser = object : KeyParser { - override suspend fun apply(key: Key, raw: Output): Output { - return fetchParser.apply(raw) - } - }, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = persister, + fetchParser = object : KeyParser { + override suspend fun apply(key: Key, raw: Output): Output { + return fetchParser.apply(raw) + } + }, + fetcher = fetcher ) fun fromPostParser( - inflight: Boolean = true, - fetcher: Fetcher, - postParser : Parser, - persister: Persister + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: Fetcher, + postParser: Parser, + persister: Persister ): TestStoreBuilder = from( - inflight = inflight, - persister = persister, - postParser = object : KeyParser { - override suspend fun apply(key: Key, raw: Output): Output { - return postParser.apply(raw) - } - }, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = persister, + postParser = object : KeyParser { + override suspend fun apply(key: Key, raw: Output): Output { + return postParser.apply(raw) + } + }, + fetcher = fetcher ) @Suppress("UNCHECKED_CAST") fun from( - inflight: Boolean = true, - cached : Boolean = false, - cacheMemoryPolicy: MemoryPolicy? = null, - persister: Persister? = null, - persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, - fetchParser: KeyParser? = null, - fetcher: suspend (Key) -> Output + scope: CoroutineScope, + inflight: Boolean = true, + cached: Boolean = false, + cacheMemoryPolicy: MemoryPolicy? = null, + persister: Persister? = null, + persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, + fetchParser: KeyParser? = null, + fetcher: suspend (Key) -> Output ): TestStoreBuilder = from( - inflight = inflight, - cached = cached, - cacheMemoryPolicy = cacheMemoryPolicy, - persister = persister, - persisterStalePolicy = persisterStalePolicy, - fetchParser = fetchParser, - fetcher = object : Fetcher { - override suspend fun fetch(key: Key): Output = fetcher(key) - } + scope = scope, + inflight = inflight, + cached = cached, + cacheMemoryPolicy = cacheMemoryPolicy, + persister = persister, + persisterStalePolicy = persisterStalePolicy, + fetchParser = fetchParser, + fetcher = object : Fetcher { + override suspend fun fetch(key: Key): Output = fetcher(key) + } ) + @Suppress("UNCHECKED_CAST") fun from( - inflight: Boolean = true, - cached : Boolean = false, - cacheMemoryPolicy: MemoryPolicy? = null, - persister: Persister? = null, - persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, - // parser that runs after fetch - fetchParser: KeyParser? = null, - // parser that runs after get from db - postParser: KeyParser? = null, - fetcher: Fetcher + scope: CoroutineScope, + inflight: Boolean = true, + cached: Boolean = false, + cacheMemoryPolicy: MemoryPolicy? = null, + persister: Persister? = null, + persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, + // parser that runs after fetch + fetchParser: KeyParser? = null, + // parser that runs after get from db + postParser: KeyParser? = null, + fetcher: Fetcher ): TestStoreBuilder { return TestStoreBuilder( - buildStore = { - Store.from( - inflight = inflight, - f = fetcher - ).let { - if (fetchParser == null) { - it - } else { - it.parser(fetchParser) + buildStore = { + Store.from( + inflight = inflight, + f = fetcher + ).let { + if (fetchParser == null) { + it + } else { + it.parser(fetchParser) + } + }.let { + if (persister == null) { + it + } else { + it.persister(persister, persisterStalePolicy) + } + }.let { + if (postParser == null) { + it + } else { + it.parser(postParser) + } + }.let { + if (cached) { + it.cache(cacheMemoryPolicy) + } else { + it + } + }.open() + }, + buildPipelineStore = { + beginPipeline( + fetcher = { + flow { + emit(fetcher.fetch(it)) } - }.let { + } + ).let { + if (fetchParser == null) { + it + } else { + it.withKeyConverter { key, oldOutput -> + fetchParser.apply(key, oldOutput) + } + } + }.let { + if (persister == null) { + it + } else { + it.withNonFlowPersister( + reader = { + persister.read(it) + }, + writer = { key, value -> + persister.write(key, value) + }, + delete = if (persister is Clearable<*>) { + SuspendWrapper( + (persister as Clearable)::clear + )::apply + } else { + null + } + ) + } + }.let { + if (cached) { + it.withCache(cacheMemoryPolicy) + } else { + it + } + }.let { + if (postParser == null) { + it + } else { + it.withKeyConverter { key, oldOutput -> + postParser.apply(key, oldOutput) + } + } + }.open() + }, + builRealInternalCoroutineStore = { + RealInternalCoroutineStore + .beginWithFlowingFetcher { key: Key -> + flow { + val value = fetcher.fetch(key = key) + if (fetchParser != null) { + emit(fetchParser.apply(key, value)) + } else { + emit(value) + } + } + } + .scope(scope) + .let { if (persister == null) { it } else { - it.persister(persister, persisterStalePolicy) + it.sourceOfTruth(SourceOfTruth.fromLegacy(persister, postParser)) } - }.let { - if (postParser == null) { - it - } else { - it.parser(postParser) - } - }.let { + } + .let { if (cached) { - it.cache(cacheMemoryPolicy) - } else { - it - } - }.open() - }, - buildPipelineStore = { - beginPipeline( - fetcher = { - flow { - emit(fetcher.fetch(it)) - } - } - ).let { - if (fetchParser == null) { it } else { - it.withKeyConverter { key, oldOutput -> - fetchParser.apply(key, oldOutput) - } + it.disableCache() } - }.let { - if (persister == null) { - it - } else { - it.withNonFlowPersister( - reader = { - persister.read(it) - }, - writer = { key, value -> - persister.write(key, value) - }, - delete = if (persister is Clearable<*>) { - SuspendWrapper( - (persister as Clearable)::clear - )::apply - } else { - null - } - ) - } - }.let { - if (cached) { - it.withCache(cacheMemoryPolicy) - } else { - it - } - }.let { - if (postParser == null) { - it - } else { - it.withKeyConverter { key, oldOutput -> - postParser.apply(key, oldOutput) - } - } - }.open() - } + } + .build().asLegacyStore() + } ) } } // wraps a regular fun to suspend, couldn't figure out how to create suspend fun variables :/ private class SuspendWrapper( - val f : (P0) -> R + val f: (P0) -> R ) { - suspend fun apply(input : P0) : R = f(input) + suspend fun apply(input: P0): R = f(input) } } enum class TestStoreType { Store, - Pipeline -} \ No newline at end of file + Pipeline, + CoroutineInternal +} diff --git a/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt index 82fac52..79a99c3 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt @@ -1,10 +1,12 @@ package com.nytimes.android.external.store3.pipeline +import com.nytimes.android.external.store3.TestStoreType import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Cache import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Fetcher import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Persister import com.nytimes.android.external.store3.pipeline.StoreResponse.Data import com.nytimes.android.external.store3.pipeline.StoreResponse.Loading +import com.nytimes.android.external.store4.RealInternalCoroutineStore import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -14,17 +16,18 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat import org.junit.Test import org.junit.runner.RunWith -import org.junit.runners.JUnit4 +import org.junit.runners.Parameterized @ExperimentalCoroutinesApi -@RunWith(JUnit4::class) -class PipelineStoreTest { +@RunWith(Parameterized::class) +class PipelineStoreTest( + private val storeType: TestStoreType +) { private val testScope = TestCoroutineScope() @Test @@ -33,10 +36,12 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), Data( @@ -45,7 +50,7 @@ class PipelineStoreTest { ) ) pipeline.stream(StoreRequest.cached(3, refresh = false)) - .assertCompleteStream( + .assertItems( Data( value = "three-1", origin = Cache @@ -71,18 +76,18 @@ class PipelineStoreTest { } @Test - fun getAndFresh_withPersister() = runBlocking { + fun getAndFresh_withPersister() = testScope.runBlockingTest { val fetcher = FakeFetcher( 3 to "three-1", 3 to "three-2" ) val persister = InMemoryPersister() - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = false)) .assertItems( Loading( @@ -127,12 +132,12 @@ class PipelineStoreTest { ) val persister = InMemoryPersister() - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = true)) .assertItems( @@ -171,11 +176,13 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = true)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -186,7 +193,7 @@ class PipelineStoreTest { ) pipeline.stream(StoreRequest.cached(3, refresh = true)) - .assertCompleteStream( + .assertItems( Data( value = "three-1", origin = Cache @@ -207,11 +214,13 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.skipMemory(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -222,7 +231,7 @@ class PipelineStoreTest { ) pipeline.stream(StoreRequest.skipMemory(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -241,11 +250,13 @@ class PipelineStoreTest { ) val persister = InMemoryPersister() - val pipeline = beginPipeline(fetcher::createFlow) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) + val pipeline = build( + flowingFetcher = fetcher::createFlow, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = false + ) + pipeline.stream(StoreRequest.fresh(3)) .assertItems( Loading( @@ -260,37 +271,40 @@ class PipelineStoreTest { origin = Fetcher ) ) - - pipeline.stream(StoreRequest.cached(3, refresh = true)).assertItems( - Data( - value = "three-2", - origin = Persister - ), - Loading( - origin = Fetcher - ), - Data( - value = "three-1", - origin = Fetcher - ), - Data( - value = "three-2", - origin = Fetcher + pipeline.stream(StoreRequest.cached(3, refresh = true)) + .assertItems( + Data( + value = "three-2", + origin = Persister + ), + Loading( + origin = Fetcher + ), + Data( + value = "three-1", + origin = Fetcher + ), + Data( + value = "three-2", + origin = Fetcher + ) ) - ) } @Test fun diskChangeWhileNetworkIsFlowing_simple() = testScope.runBlockingTest { val persister = InMemoryPersister().asObservable() - val pipeline = beginPipeline { - flow { - // never emit - } - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + flowingFetcher = { + flow { + + } + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) + launch { delay(10) persister.flowWriter(3, "local-1") @@ -311,16 +325,18 @@ class PipelineStoreTest { @Test fun diskChangeWhileNetworkIsFlowing_overwrite() = testScope.runBlockingTest { val persister = InMemoryPersister().asObservable() - val pipeline = beginPipeline { - flow { - delay(10) - emit("three-1") - delay(10) - emit("three-2") - } - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + flowingFetcher = { + flow { + delay(10) + emit("three-1") + delay(10) + emit("three-2") + } + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) launch { delay(5) @@ -357,11 +373,13 @@ class PipelineStoreTest { fun errorTest() = testScope.runBlockingTest { val exception = IllegalArgumentException("wow") val persister = InMemoryPersister().asObservable() - val pipeline = beginNonFlowingPipeline { key: Int -> - throw exception - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + nonFlowingFetcher = { + throw exception + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) launch { delay(10) @@ -420,8 +438,11 @@ class PipelineStoreTest { responses.filter { it.first == key }.forEach { - emit(it.second) + // we delay here to avoid collapsing fetcher values, otherwise, there is a + // possibility that consumer won't be fast enough to get both values before new + // value overrides the previous one. delay(1) + emit(it.second) } } } @@ -441,7 +462,7 @@ class PipelineStoreTest { } } - private class InMemoryPersister { + class InMemoryPersister { private val data = mutableMapOf() @Suppress("RedundantSuspendModifier")// for function reference @@ -468,12 +489,93 @@ class PipelineStoreTest { .isEqualTo(expected.toList()) } - /** - * Takes all elements from the stream and asserts them. - * Use this if test does not have an infinite flow (e.g. no persister or no infinite fetcher) - */ - private suspend fun Flow.assertCompleteStream(vararg expected: T) { - assertThat(this.toList()) - .isEqualTo(expected.toList()) + private fun build( + nonFlowingFetcher: (suspend (Key) -> Input)? = null, + flowingFetcher: ((Key) -> Flow)? = null, + persisterReader: (suspend (Key) -> Output?)? = null, + flowingPersisterReader: ((Key) -> Flow)? = null, + persisterWriter: (suspend (Key, Input) -> Unit)? = null, + persisterDelete: (suspend (Key) -> Unit)? = null, + enableCache: Boolean + ): PipelineStore { + check(nonFlowingFetcher != null || flowingFetcher != null) { + "need to provide a fetcher" + } + check(nonFlowingFetcher == null || flowingFetcher == null) { + "need 1 fetcher" + } + check(persisterReader == null || flowingPersisterReader == null) { + "need 0 or 1 persister" + } + if (storeType == TestStoreType.Pipeline) { + return if (nonFlowingFetcher != null) { + beginNonFlowingPipeline(nonFlowingFetcher) + } else { + beginPipeline(flowingFetcher!!) + }.let { + when { + flowingPersisterReader != null -> it.withPersister( + reader = flowingPersisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + persisterReader != null -> it.withNonFlowPersister( + reader = persisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + else -> it as PipelineStore + } + }.let { + if (enableCache) { + it.withCache() + } else { + it + } + } + } else if (storeType == TestStoreType.CoroutineInternal) { + return if (nonFlowingFetcher != null) { + RealInternalCoroutineStore.beginWithNonFlowingFetcher( + nonFlowingFetcher + ) + } else { + RealInternalCoroutineStore.beginWithFlowingFetcher( + flowingFetcher!! + ) + }.let { + when { + flowingPersisterReader != null -> it.persister( + reader = flowingPersisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + persisterReader != null -> it.nonFlowingPersister( + reader = persisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + else -> it + } + }.let { + if (enableCache) { + it + } else { + it.disableCache() + } + }.scope(testScope) + .build() + } else { + throw UnsupportedOperationException("cannot test $storeType") + } } -} \ No newline at end of file + + + companion object { + @JvmStatic + @Parameterized.Parameters(name = "{0}") + fun params() = listOf( + TestStoreType.Pipeline, + TestStoreType.CoroutineInternal + ) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt b/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt new file mode 100644 index 0000000..1e938cb --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt @@ -0,0 +1,78 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +@ExperimentalCoroutinesApi +@FlowPreview +@RunWith(JUnit4::class) +class FetcherControllerTest { + private val testScope = TestCoroutineScope() + @Test + fun simple() = testScope.runBlockingTest { + val fetcherController = FetcherController( + scope = testScope, + realFetcher = { key: Int -> + flow { + emit(key * key) + } + }, + sourceOfTruth = null + ) + val fetcher = fetcherController.getFetcher(3) + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + val received = fetcher.onEach { + assertThat(fetcherController.fetcherSize()).isEqualTo(1) + }.first() + assertThat(received).isEqualTo(9) + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + } + + @Test + fun concurrent() = testScope.runBlockingTest { + var createdCnt = 0 + val fetcherController = FetcherController( + scope = testScope, + realFetcher = { key: Int -> + createdCnt ++ + flow { + // make sure it takes time, otherwise, we may not share + delay(1) + emit(key * key) + } + }, + sourceOfTruth = null + ) + val fetcherCount = 20 + fun createFetcher() = async { + fetcherController.getFetcher(3) + .onEach { + assertThat(fetcherController.fetcherSize()).isEqualTo(1) + }.first() + } + val fetchers = (0 until fetcherCount).map { + createFetcher() + } + fetchers.forEach { + assertThat(it.await()).isEqualTo(9) + } + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + assertThat(createdCnt).isEqualTo(1) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt b/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt new file mode 100644 index 0000000..d46e408 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt @@ -0,0 +1,66 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.pipeline.PipelineStoreTest +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +@FlowPreview +@ExperimentalCoroutinesApi +class SourceOfTruthWithBarrierTest { + private val testScope = TestCoroutineScope() + private val persister = PipelineStoreTest.InMemoryPersister() + private val delegate: SourceOfTruth = + PersistentSourceOfTruth( + realReader = { key -> + flow { + emit(persister.read(key)) + } + }, + realWriter = persister::write, + realDelete = null + ) + private val source = SourceOfTruthWithBarrier( + delegate = delegate + ) + + @Test + fun simple() = testScope.runBlockingTest { + val collector = async { + source.reader(1, CompletableDeferred(Unit)).take(2).toList() + } + source.write(1, "a") + assertThat(collector.await()).isEqualTo( + listOf( + DataWithOrigin(delegate.defaultOrigin, null), + DataWithOrigin(ResponseOrigin.Fetcher, "a") + ) + ) + assertThat(source.barrierCount()).isEqualTo(0) + } + + @Test + fun preAndPostWrites() = testScope.runBlockingTest { + source.write(1, "a") + val collector = async { + source.reader(1, CompletableDeferred(Unit)).take(2).toList() + } + source.write(1, "b") + assertThat(collector.await()).isEqualTo( + listOf( + DataWithOrigin(delegate.defaultOrigin, "a"), + DataWithOrigin(ResponseOrigin.Fetcher, "b") + ) + ) + assertThat(source.barrierCount()).isEqualTo(0) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt b/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt new file mode 100644 index 0000000..1e801f0 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt @@ -0,0 +1,63 @@ +package com.nytimes.android.external.store4.multiplex + +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.AddChannel +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.RemoveChannel +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 + +@FlowPreview +@ExperimentalCoroutinesApi +@RunWith(JUnit4::class) +class ChannelManagerTest { + private val scope = TestCoroutineScope() + private val manager = ChannelManager( + scope, + 0, + onEach = {} + ) { + SharedFlowProducer( + scope, src = + flow { + suspendCancellableCoroutine { + // never end + } + }, + channelManager = it + ) + } + + @Test + fun simple() = scope.runBlockingTest { + val collection = async { + val channel = Channel>(Channel.UNLIMITED) + try { + manager.send(AddChannel(channel)) + channel.consumeAsFlow().take(2).toList() + .map { it.value } + } finally { + manager.send(RemoveChannel(channel)) + } + } + val ack1 = CompletableDeferred() + manager.send(DispatchValue("a", ack1)) + + val ack2 = CompletableDeferred() + manager.send(DispatchValue("b", ack2)) + assertThat(collection.await()).isEqualTo(listOf("a", "b")) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt b/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt new file mode 100644 index 0000000..1d92492 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt @@ -0,0 +1,300 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 + +@FlowPreview +@ExperimentalCoroutinesApi +@RunWith(JUnit4::class) +class MultiplexTest { + private val testScope = TestCoroutineScope() + + private fun createMultiplexer(f: () -> Flow): Multiplexer { + return Multiplexer(testScope, 0, f, {}) + } + + @Test + fun serialial_notShared() = testScope.runBlockingTest { + var createCnt = 0 + val activeFlow = createMultiplexer { + createCnt++ + when (createCnt) { + 1 -> flowOf("a", "b", "c") + 2 -> flowOf("d", "e", "f") + else -> throw AssertionError("should not create more") + } + } + assertThat(activeFlow.create().toList()) + .isEqualTo(listOf("a", "b", "c")) + assertThat(activeFlow.create().toList()) + .isEqualTo(listOf("d", "e", "f")) + } + + @Test + fun slowFastCollector() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onStart { + // make sure both registers on time so that no one drops a value + delay(100) + } + } + val c1 = async { + activeFlow.create().onEach { + delay(100) + }.toList() + } + val c2 = async { + activeFlow.create().onEach { + delay(200) + }.toList() + } + assertThat(c1.await()) + .isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()) + .isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun slowDispatcher() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onEach { + delay(100) + } + } + val c1 = async { + activeFlow.create().toList() + } + val c2 = async { + activeFlow.create().toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun lateToTheParty_arrivesAfterUpstreamClosed() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onStart { + delay(100) + } + } + val c1 = async { + activeFlow.create().toList() + } + val c2 = async { + activeFlow.create().also { + delay(110) + }.toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun lateToTheParty_arrivesBeforeUpstreamClosed() = testScope.runBlockingTest { + var generationCounter = 0 + val activeFlow = createMultiplexer { + flow { + val gen = generationCounter++ + check(gen < 2) { + "created one too many" + } + emit("a_$gen") + delay(5) + emit("b_$gen") + delay(100) + } + } + val c1 = async { + activeFlow.create().onEach { + }.toList() + } + val c2 = async { + activeFlow.create().also { + delay(3) + }.toList() + } + val c3 = async { + activeFlow.create().also { + delay(20) + }.toList() + } + val lists = listOf(c1, c2, c3).map { + it.await() + } + assertThat(lists[0]).isEqualTo(listOf("a_0", "b_0")) + assertThat(lists[1]).isEqualTo(listOf("b_0")) + assertThat(lists[2]).isEqualTo(listOf("a_1", "b_1")) + } + + @Test + fun upstreamError() = testScope.runBlockingTest { + val exception = + MyCustomException("hey") + val activeFlow = createMultiplexer { + flow { + emit("a") + throw exception + } + } + val receivedValue = CompletableDeferred() + val receivedError = CompletableDeferred() + activeFlow.create() + .onEach { + check(receivedValue.isActive) { + "already received value" + } + receivedValue.complete(it) + }.catch { + check(receivedError.isActive) { + "already received error" + } + receivedError.complete(it) + }.toList() + assertThat(receivedValue.await()).isEqualTo("a") + val error = receivedError.await() + assertThat(error).isEqualTo(exception) + } + + @Test + fun upstreamError_secondJustGetsError() = testScope.runBlockingTest { + val exception = + MyCustomException("hey") + val dispatchedFirstValue = CompletableDeferred() + val registeredSecondCollector = CompletableDeferred() + val activeFlow = createMultiplexer { + flow { + emit("a") + dispatchedFirstValue.complete(Unit) + registeredSecondCollector.await() + yield() //yield to allow second collector to register + throw exception + } + } + launch { + activeFlow.create().catch { + + }.toList() + } + // wait until the above collector registers and receives first value + dispatchedFirstValue.await() + val receivedValue = CompletableDeferred() + val receivedError = CompletableDeferred() + activeFlow.create() + .onStart { + registeredSecondCollector.complete(Unit) + } + .onEach { + receivedValue.complete(it) + }.catch { + check(receivedError.isActive) { + "already received error" + } + receivedError.complete(it) + }.toList() + val error = receivedError.await() + assertThat(error).isEqualTo(exception) + // test sanity, second collector never receives a value + assertThat(receivedValue.isActive).isTrue() + } + + @Test + fun lateArrival_unregistersFromTheCorrectManager() = testScope.runBlockingTest { + var createdCount = 0 + var didntFinish = false + val activeFlow = createMultiplexer { + flow { + check(createdCount < 2) { + "created 1 too many" + } + val index = ++createdCount + emit("a_$index") + emit("b_$index") + delay(100) + if (index == 2) { + didntFinish = true + } + } + } + val firstCollector = async { + activeFlow.create().onEach { delay(5) }.take(2).toList() + } + delay(11) // miss first two values + val secondCollector = async { + // this will come in a new channel + activeFlow.create().take(2).toList() + } + assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1")) + assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2")) + assertThat(createdCount).isEqualTo(2) + delay(200) + assertThat(didntFinish).isEqualTo(false) + } + + @Test + fun lateArrival_buffered() = testScope.runBlockingTest { + var createdCount = 0 + val activeFlow = Multiplexer( + scope = testScope, + bufferSize = 2, + source = { + createdCount++ + flow { + emit("a") + delay(5) + emit("b") + emit("c") + emit("d") + delay(100) + emit("e") + // dont finish to see the buffer behavior + delay(2000) + } + }, + onEach = {} + ) + val c1 = async { + activeFlow.create().toList() + } + delay(4)// c2 misses first value + val c2 = async { + activeFlow.create().toList() + } + delay(50) // c3 misses first 4 values + val c3 = async { + activeFlow.create().toList() + } + delay(100) // c4 misses all values + val c4 = async { + activeFlow.create().toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) + assertThat(c3.await()).isEqualTo(listOf("c", "d", "e")) + assertThat(c4.await()).isEqualTo(listOf("d", "e")) + assertThat(createdCount).isEqualTo(1) + } + + class MyCustomException(val x: String) : RuntimeException("hello") { + override fun toString() = "custom$x" + } +}