From ba22d0c66c64050907fc7d9f72c237eed2c878b4 Mon Sep 17 00:00:00 2001 From: Yigit Boyar Date: Sun, 3 Nov 2019 07:41:32 -0800 Subject: [PATCH] Internal Store 3rd attempt (#35) * WIP publish test * add actor based implementation, seems the most promising * add notes into channel manager as well * use unlimited channel on the receiver to avoid launching to send * carry over remaining subscribers into a new flow * dispatch errors from upstream to all downstreams * carry over all leftovers at once to avoid starting producer before all is added to the list * handle swapping channel managers in the consumer This CL fixes an issue where we wouldn't unsubscribe from the right channel if the downstream is moved between channel managers due to not receiving any event after registering. I've also cleaned up dispatchError to close the channel with error instead of passing it down as if it is value and throwing again * allow live buffering this adds a live buffering functionality to actor publish where it only buffers if the upstream is still running * remove logging * move into src * code cleanup, more comments * first shot at new internal store, tests pass, code ugly * tmp builder for real internal store, starting pipeline tests * more wip in fixing pipeline store tests, a lot to cleanup * all tests pass * code cleanup * move multiplexer inside store4 * lots of cleanup of unnecessary code * release barriers that are not used * don't use pipeline persister. also fixed a barrier cleanup code in source of truth with barrier * revert simple persister as flowable change cleanup for clearstorememorytest * close multiplexers in fetcher controller when not used * code style fixes --- .gitignore | 3 + build.gradle | 3 +- buildsystem/dependencies.gradle | 2 +- .../external/store3/pipeline/PipelineStore.kt | 3 +- .../external/store3/pipeline/StoreResponse.kt | 3 +- .../external/store4/FetcherController.kt | 52 +++ .../android/external/store4/FlowMerge.kt | 35 ++ .../store4/RealInternalCoroutineStore.kt | 283 +++++++++++++++ .../external/store4/RefCountedResource.kt | 45 +++ .../android/external/store4/SourceOfTruth.kt | 90 +++++ .../store4/SourceOfTruthWithBarrier.kt | 122 +++++++ .../store4/multiplex/ChannelManager.kt | 321 +++++++++++++++++ .../external/store4/multiplex/Multiplexer.kt | 81 +++++ .../store4/multiplex/SharedFlowProducer.kt | 88 +++++ .../store4/multiplex/StoreRealActor.kt | 46 +++ .../external/store3/ClearStoreMemoryTest.kt | 6 +- .../android/external/store3/ClearStoreTest.kt | 35 +- .../external/store3/DontCacheErrorsTest1.kt | 12 +- .../android/external/store3/KeyParserTest.kt | 17 +- .../android/external/store3/NoNetworkTest.kt | 5 +- .../external/store3/ParsingFetcherTest.kt | 17 +- .../android/external/store3/SequentialTest.kt | 14 +- .../android/external/store3/StoreTest.kt | 73 ++-- .../external/store3/StoreThrowOnNoItems.kt | 11 +- .../external/store3/StoreWithParserTest.kt | 32 +- .../external/store3/StreamOneKeyTest.kt | 48 +-- .../android/external/store3/StreamTest.kt | 19 +- .../external/store3/TestStoreBuilder.kt | 329 ++++++++++-------- .../store3/pipeline/PipelineStoreTest.kt | 268 +++++++++----- .../external/store4/FetcherControllerTest.kt | 78 +++++ .../store4/SourceOfTruthWithBarrierTest.kt | 66 ++++ .../store4/multiplex/ChannelManagerTest.kt | 63 ++++ .../store4/multiplex/MultiplexTest.kt | 300 ++++++++++++++++ 33 files changed, 2209 insertions(+), 361 deletions(-) create mode 100644 store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt create mode 100644 store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt create mode 100644 store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt create mode 100644 store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt create mode 100644 store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt create mode 100644 store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt diff --git a/.gitignore b/.gitignore index 7d11a5a..c8408e9 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,9 @@ captures/ # Intellij *.iml .idea/ +.classpath +.project +.settings # Keystore files *.jks diff --git a/build.gradle b/build.gradle index 675fbd1..edb8e08 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ buildscript { ] dependencies { - classpath 'com.android.tools.build:gradle:3.6.0-alpha11' + classpath 'com.android.tools.build:gradle:3.6.0-beta01' classpath 'com.getkeepsafe.dexcount:dexcount-gradle-plugin:0.5.6' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$rootProject.ext.versions.kotlin" classpath 'org.jetbrains.dokka:dokka-gradle-plugin:0.9.17' @@ -32,7 +32,6 @@ allprojects { buildscript {} repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots/'} mavenCentral() jcenter() } diff --git a/buildsystem/dependencies.gradle b/buildsystem/dependencies.gradle index f8df3e9..b308301 100644 --- a/buildsystem/dependencies.gradle +++ b/buildsystem/dependencies.gradle @@ -54,7 +54,7 @@ ext.versions = [ supportTestRunner : '0.4.1', espresso : '2.2.1', compileTesting : '0.8', - coroutines : '1.3.0', + coroutines : '1.3.2', ] ext.libraries = [ diff --git a/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt b/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt index edfbce2..ba3949d 100644 --- a/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt +++ b/store/src/main/java/com/nytimes/android/external/store3/pipeline/PipelineStore.kt @@ -44,7 +44,8 @@ fun PipelineStore.open(): Store { override fun stream(key: Key): Flow = self.stream( StoreRequest.skipMemory( key = key, - refresh = true) + refresh = true + ) ).transform { it.throwIfError() it.dataOrNull()?.let { diff --git a/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt b/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt index bc339b7..296a869 100644 --- a/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt +++ b/store/src/main/java/com/nytimes/android/external/store3/pipeline/StoreResponse.kt @@ -74,10 +74,11 @@ sealed class StoreResponse( else -> null } + @Suppress("UNCHECKED_CAST") internal fun swapType(): StoreResponse = when (this) { is Error -> Error(error, origin) is Loading -> Loading(origin) - else -> throw IllegalStateException("cannot swap type for $this") + is Data-> Data(value = value as R, origin = origin) } } diff --git a/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt b/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt new file mode 100644 index 0000000..33f830a --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/FetcherController.kt @@ -0,0 +1,52 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store4.multiplex.Multiplexer +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow + +/** + * This class maintains one and only 1 fetcher for a given [Key]. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class FetcherController( + private val scope: CoroutineScope, + private val realFetcher: (Key) -> Flow, + private val sourceOfTruth: SourceOfTruthWithBarrier? +) { + private val fetchers = RefCountedResource( + create = { key: Key -> + Multiplexer( + scope = scope, + bufferSize = 0, + source = { + realFetcher(key) + }, + onEach = { + sourceOfTruth?.write(key, it) + } + ) + }, + onRelease = { key: Key, multiplexer: Multiplexer -> + multiplexer.close() + } + ) + + fun getFetcher(key: Key): Flow { + return flow { + val fetcher = fetchers.acquire(key) + try { + emitAll(fetcher.create()) + } finally { + fetchers.release(key, fetcher) + } + } + } + + // visible for testing + internal suspend fun fetcherSize() = fetchers.size() +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt b/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt new file mode 100644 index 0000000..50fe623 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/FlowMerge.kt @@ -0,0 +1,35 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch + +/** + * Merge implementation tells downstream what the source is and also uses a rendezvous channel + */ +@ExperimentalCoroutinesApi +internal fun Flow.merge(other: Flow): Flow> { + return channelFlow> { + launch { + this@merge.collect { + send(Either.Left(it)) + + } + } + launch { + other.collect { + send(Either.Right(it)) + } + } + }.buffer(Channel.RENDEZVOUS) +} + +internal sealed class Either { + data class Left(val value: T) : Either() + + data class Right(val value: R) : Either() +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt b/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt new file mode 100644 index 0000000..56612f4 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/RealInternalCoroutineStore.kt @@ -0,0 +1,283 @@ +package com.nytimes.android.external.store4 + +import com.com.nytimes.suspendCache.StoreCache +import com.nytimes.android.external.store3.base.impl.MemoryPolicy +import com.nytimes.android.external.store3.base.impl.StoreDefaults +import com.nytimes.android.external.store3.pipeline.CacheType +import com.nytimes.android.external.store3.pipeline.PipelineStore +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import com.nytimes.android.external.store3.pipeline.StoreRequest +import com.nytimes.android.external.store3.pipeline.StoreResponse +import com.nytimes.android.external.store3.pipeline.open +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.transform +import kotlinx.coroutines.flow.withIndex + +@ExperimentalCoroutinesApi +@FlowPreview +internal class RealInternalCoroutineStore( + private val scope: CoroutineScope, + private val fetcher: (Key) -> Flow, + sourceOfTruth: SourceOfTruth? = null, + private val memoryPolicy: MemoryPolicy? +) : PipelineStore { + /** + * This source of truth is either a real database or an in memory source of truth created by + * the builder. + * Whatever is given, we always put a [SourceOfTruthWithBarrier] in front of it so that while + * we write the value from fetcher into the disk, we can block reads to avoid sending new data + * as if it came from the server (the [StoreResponse.origin] field). + */ + private val sourceOfTruth: SourceOfTruthWithBarrier? = + sourceOfTruth?.let { + SourceOfTruthWithBarrier(it) + } + private val memCache = memoryPolicy?.let { + StoreCache.fromRequest>( + loader = { + TODO( + """ + This should've never been called. We don't need this anymore, should remove + loader after we clean old Store ? + """.trimIndent() + ) + }, + memoryPolicy = memoryPolicy + ) + } + /** + * Fetcher controller maintains 1 and only 1 `Multiplexer` for a given key to ensure network + * requests are shared. + */ + private val fetcherController = FetcherController( + scope = scope, + realFetcher = fetcher, + sourceOfTruth = this.sourceOfTruth + ) + + override fun stream(request: StoreRequest): Flow> { + return if (sourceOfTruth == null) { + createNetworkFlow( + request = request, + networkLock = null + ) + } else { + diskNetworkCombined(request) + }.onEach { + // whenever a value is dispatched, save it to the memory cache + if (it.origin != ResponseOrigin.Cache) { + it.dataOrNull()?.let { data -> + memCache?.put(request.key, data) + } + } + }.onStart { + // if there is anything cached, dispatch it first if requested + if (!request.shouldSkipCache(CacheType.MEMORY)) { + memCache?.getIfPresent(request.key)?.let { cached -> + emit(StoreResponse.Data(value = cached, origin = ResponseOrigin.Cache)) + } + } + } + } + + override suspend fun clear(key: Key) { + memCache?.invalidate(key) + sourceOfTruth?.delete(key) + } + + /** + * We want to stream from disk but also want to refresh. If requested or necessary. + * + * How it works: + * There are two flows: + * Fetcher: The flow we get for the fetching + * Disk: The flow we get from the [SourceOfTruth]. + * Both flows are controlled by a lock for each so that we can start the right one based on + * the request status or values we receive. + * + * Value is always returned from [SourceOfTruth] while the errors are dispatched from both the + * `Fetcher` and [SourceOfTruth]. + * + * There are two initialization paths: + * + * 1) Request wants to skip disk cache: + * In this case, we first start the fetcher flow. When fetcher flow provides something besides + * an error, we enable the disk flow. + * + * 2) Request does not want to skip disk cache: + * In this case, we first start the disk flow. If disk flow returns `null` or + * [StoreRequest.refresh] is set to `true`, we enable the fetcher flow. + * This ensures we first get the value from disk and then load from server if necessary. + */ + private fun diskNetworkCombined( + request: StoreRequest + ): Flow> { + val diskLock = CompletableDeferred() + val networkLock = CompletableDeferred() + val networkFlow = createNetworkFlow(request, networkLock) + if (!request.shouldSkipCache(CacheType.DISK)) { + diskLock.complete(Unit) + } + val diskFlow = sourceOfTruth!!.reader(request.key, diskLock) + // we use a merge implementation that gives the source of the flow so that we can decide + // based on that. + return networkFlow.merge(diskFlow.withIndex()) + .transform { + // left is Fetcher while right is source of truth + if (it is Either.Left) { + if (it.value !is StoreResponse.Data<*>) { + emit(it.value.swapType()) + } + // network sent something + if (it.value is StoreResponse.Data<*>) { + // unlocking disk only if network sent data so that fresh data request never + // receives disk data by mistake + diskLock.complete(Unit) + } + } else if (it is Either.Right) { + // right, that is data from disk + val (index, diskData) = it.value + if (diskData.value != null) { + emit( + StoreResponse.Data( + value = diskData.value, + origin = diskData.origin + ) as StoreResponse + ) + } + + // if this is the first disk value and it is null, we should enable fetcher + // TODO should we ignore the index and always enable? + if (index == 0 && (diskData.value == null || request.refresh)) { + networkLock.complete(Unit) + } + } + } + } + + private fun createNetworkFlow( + request: StoreRequest, + networkLock: CompletableDeferred? + ): Flow> { + return fetcherController + .getFetcher(request.key) + .map { + StoreResponse.Data( + value = it, + origin = ResponseOrigin.Fetcher + ) as StoreResponse + }.catch { + emit( + StoreResponse.Error( + error = it, + origin = ResponseOrigin.Fetcher + ) + ) + }.onStart { + if (!request.shouldSkipCache(CacheType.DISK)) { + // wait until network gives us the go + networkLock?.await() + } + emit( + StoreResponse.Loading( + origin = ResponseOrigin.Fetcher + ) + ) + }.map { + it.swapType() + } + } + + fun asLegacyStore() = open() + + // TODO this builder w/ 3 type args is really ugly, think more about it... + companion object { + fun beginWithNonFlowingFetcher( + fetcher: suspend (key: Key) -> Input + ) = Builder { key: Key -> + flow { + emit(fetcher(key)) + } + } + + fun beginWithFlowingFetcher( + fetcher: (key: Key) -> Flow + ) = Builder(fetcher) + } + + class Builder( + private val fetcher: (key: Key) -> Flow + ) { + private var scope: CoroutineScope? = null + private var sourceOfTruth: SourceOfTruth? = null + private var cachePolicy: MemoryPolicy? = StoreDefaults.memoryPolicy + + fun scope(scope: CoroutineScope): Builder { + this.scope = scope + return this + } + + fun nonFlowingPersister( + reader: suspend (Key) -> Output?, + writer: suspend (Key, Input) -> Unit, + delete: (suspend (Key) -> Unit)? = null + ): Builder { + sourceOfTruth = PersistentNonFlowingSourceOfTruth( + realReader = reader, + realWriter = writer, + realDelete = delete + ) + return this + } + + fun persister( + reader: (Key) -> Flow, + writer: suspend (Key, Input) -> Unit, + delete: (suspend (Key) -> Unit)? = null + ): Builder { + sourceOfTruth = PersistentSourceOfTruth( + realReader = reader, + realWriter = writer, + realDelete = delete + ) + return this + } + + fun sourceOfTruth( + sourceOfTruth: SourceOfTruth + ): Builder { + this.sourceOfTruth = sourceOfTruth + return this + } + + fun cachePolicy(memoryPolicy: MemoryPolicy?): Builder { + cachePolicy = memoryPolicy + return this + } + + fun disableCache(): Builder { + cachePolicy = null + return this + } + + fun build(): RealInternalCoroutineStore { + @Suppress("UNCHECKED_CAST") + return RealInternalCoroutineStore( + scope = scope ?: GlobalScope, + sourceOfTruth = sourceOfTruth, + fetcher = fetcher, + memoryPolicy = cachePolicy + ) + } + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt b/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt new file mode 100644 index 0000000..80d74e9 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/RefCountedResource.kt @@ -0,0 +1,45 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * Simple holder that can ref-count items by a given key. + */ +internal class RefCountedResource( + private val create: suspend (Key) -> T, + private val onRelease : (suspend (Key, T) -> Unit)? = null +) { + private val items = mutableMapOf() + private val lock = Mutex() + + suspend fun acquire(key: Key) : T = lock.withLock { + items.getOrPut(key) { + Item(create(key)) + }.also { + it.refCount ++ + }.value + } + + suspend fun release(key: Key, value : T) = lock.withLock { + val existing = items[key] + check(existing != null && existing.value === value) { + "inconsistent release, seems like $value was leaked or never acquired" + } + existing.refCount -- + if (existing.refCount < 1) { + items.remove(key) + onRelease?.invoke(key, value) + } + } + + // used in tests + suspend fun size() = lock.withLock { + items.size + } + + private inner class Item( + val value: T, + var refCount: Int = 0 + ) +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt new file mode 100644 index 0000000..d02ba24 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruth.kt @@ -0,0 +1,90 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.base.Clearable +import com.nytimes.android.external.store3.base.Persister +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import com.nytimes.android.external.store3.util.KeyParser +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow + +/** + * Source of truth takes care of making any source (no matter if it has flowing reads or not) into + * a common flowing API. Used w/ a [SourceOfTruthWithBarrier] in front of it in the + * [RealInternalCoroutineStore] implementation to avoid dispatching values to downstream while + * a write is in progress. + */ +internal interface SourceOfTruth { + val defaultOrigin: ResponseOrigin + fun reader(key: Key): Flow + suspend fun write(key: Key, value: Input) + suspend fun delete(key: Key) + // for testing + suspend fun getSize(): Int + + companion object { + fun fromLegacy( + persister: Persister, + // parser that runs after get from db + postParser: KeyParser? = null + ): SourceOfTruth { + return PersistentSourceOfTruth( + realReader = { key -> + flow { + if (postParser == null) { + emit(persister.read(key)) + } else { + persister.read(key)?.let { + val postParsed = postParser.apply(key, it) + emit(postParsed) + } ?: emit(null) + } + } + }, + realWriter = { key, value -> + persister.write(key, value) + }, + realDelete = { key -> + (persister as? Clearable)?.clear(key) + } + ) + } + } +} + +internal class PersistentSourceOfTruth( + private val realReader: (Key) -> Flow, + private val realWriter: suspend (Key, Input) -> Unit, + private val realDelete: (suspend (Key) -> Unit)? = null +) : SourceOfTruth { + override val defaultOrigin = ResponseOrigin.Persister + override fun reader(key: Key): Flow = realReader(key) + override suspend fun write(key: Key, value: Input) = realWriter(key, value) + override suspend fun delete(key: Key) { + realDelete?.invoke(key) + } + + // for testing + override suspend fun getSize(): Int { + throw UnsupportedOperationException("not supported for persistent") + } +} + +internal class PersistentNonFlowingSourceOfTruth( + private val realReader: suspend (Key) -> Output?, + private val realWriter: suspend (Key, Input) -> Unit, + private val realDelete: (suspend (Key) -> Unit)? = null +) : SourceOfTruth { + override val defaultOrigin = ResponseOrigin.Persister + override fun reader(key: Key): Flow = flow { + emit(realReader(key)) + } + override suspend fun write(key: Key, value: Input) = realWriter(key, value) + override suspend fun delete(key: Key) { + realDelete?.invoke(key) + } + + // for testing + override suspend fun getSize(): Int { + throw UnsupportedOperationException("not supported for persistent") + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt new file mode 100644 index 0000000..c378836 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrier.kt @@ -0,0 +1,122 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collectIndexed +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import java.util.concurrent.atomic.AtomicLong + +/** + * Wraps a [SourceOfTruth] and blocks reads while a write is in progress. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class SourceOfTruthWithBarrier( + private val delegate: SourceOfTruth +) { + /** + * Each key has a barrier so that we can block reads while writing. + */ + private val barriers = RefCountedResource>( + create = { key -> + ConflatedBroadcastChannel(BarrierMsg.Open.INITIAL) + } + ) + /** + * Each message gets dispatched with a version. This ensures we won't accidentally turn on the + * reader flow for a new reader that happens to have arrived while a write is in progress since + * that write should be considered as a disk read for that flow, not fetcher. + */ + private val versionCounter = AtomicLong(0) + + + fun reader(key: Key, lock: CompletableDeferred): Flow> { + return flow { + val barrier = barriers.acquire(key) + val readerVersion: Long = versionCounter.incrementAndGet() + try { + lock.await() + emitAll(barrier.asFlow() + .flatMapLatest { + val messageArrivedAfterMe = readerVersion < it.version + when (it) { + is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output -> + if (index == 0 && messageArrivedAfterMe) { + DataWithOrigin( + origin = ResponseOrigin.Fetcher, + value = output + ) + } else { + DataWithOrigin( + origin = delegate.defaultOrigin, + value = output + ) + } + } + is BarrierMsg.Blocked -> { + flowOf() + } + } + }) + } finally { + // we are using a finally here instead of onCompletion as there might be a + // possibility where flow gets cancelled right before `emitAll`. + barriers.release(key, barrier) + } + + } + } + + suspend fun write(key: Key, value: Input) { + val barrier = barriers.acquire(key) + try { + barrier.send(BarrierMsg.Blocked(versionCounter.incrementAndGet())) + delegate.write(key, value) + barrier.send(BarrierMsg.Open(versionCounter.incrementAndGet())) + } finally { + barriers.release(key, barrier) + } + } + + suspend fun delete(key: Key) { + delegate.delete(key) + } + + private sealed class BarrierMsg( + val version: Long + ) { + class Blocked(version: Long) : BarrierMsg(version) + class Open(version: Long) : BarrierMsg(version) { + companion object { + val INITIAL = Open(INITIAL_VERSION) + } + } + } + + // visible for testing + internal suspend fun barrierCount() = barriers.size() + + companion object { + private const val INITIAL_VERSION = -1L + } +} + +@ExperimentalCoroutinesApi +private inline fun Flow.mapIndexed(crossinline block: (Int, T) -> R) = flow { + this@mapIndexed.collectIndexed { index, value -> + emit(block(index, value)) + } +} + +internal data class DataWithOrigin( + val origin: ResponseOrigin, + val value: T? +) diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt new file mode 100644 index 0000000..8b43ea6 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/ChannelManager.kt @@ -0,0 +1,321 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import java.util.ArrayDeque +import java.util.Collections + +/** + * This actor helps tracking active channels and is able to dispatch values to each of them + * in parallel. As soon as one of them receives the value, the ack in the dispatch message is + * completed so that the sender can continue for the next item. + */ +@ExperimentalCoroutinesApi +class ChannelManager( + /** + * The scope in which ChannelManager actor runs + */ + scope: CoroutineScope, + /** + * The buffer size that is used while the upstream is active + */ + bufferSize: Int, + private val onEach: suspend (T) -> Unit, + /** + * Called when the channel manager is active (e.g. it has downstream collectors and needs a + * producer) + */ + private val onActive: (ChannelManager) -> SharedFlowProducer +) : StoreRealActor>(scope) { + private val buffer = Buffer(bufferSize) + /** + * The current producer + */ + private var producer: SharedFlowProducer? = null + + /** + * Tracks whether we've ever dispatched value or error from the current producer. + * Reset when producer finishes. + */ + private var dispatchedValue: Boolean = false + + /** + * List of downstream collectors. + */ + private val channels = mutableListOf>() + + override suspend fun handle(msg: Message) { + when (msg) { + is Message.AddLeftovers -> doAddLefovers(msg.leftovers) + is Message.AddChannel -> doAdd(msg) + is Message.RemoveChannel -> doRemove(msg.channel) + is Message.DispatchValue -> doDispatchValue(msg) + is Message.DispatchError -> doDispatchError(msg) + is Message.UpstreamFinished -> doHandleUpstreamClose(msg.producer) + } + } + + /** + * We are closing. Do a cleanup on existing channels where we'll close them and also decide + * on the list of leftovers. + */ + fun doHandleUpstreamClose(producer: SharedFlowProducer?) { + if (this.producer !== producer) { + return + } + val leftovers = mutableListOf>() + channels.forEach { + when { + it.receivedValue -> it.close() + dispatchedValue -> + // we dispatched a value but this channel didn't receive so put it into + // leftovers + leftovers.add(it) + else -> // upstream didn't dispatch + it.close() + } + } + channels.clear() // empty references + channels.addAll(leftovers) + this.producer = null + if (leftovers.isNotEmpty()) { + activateIfNecessary(true) + } + } + + override fun onClosed() { + producer?.cancel() + } + + /** + * Dispatch value to all downstream collectors. + */ + private suspend fun doDispatchValue(msg: Message.DispatchValue) { + onEach(msg.value) + buffer.add(msg) + dispatchedValue = true + channels.forEach { + it.dispatchValue(msg) + } + } + + /** + * Dispatch an upstream error to downstream collectors. + */ + private fun doDispatchError(msg: Message.DispatchError) { + // dispatching error is as good as dispatching value + dispatchedValue = true + channels.forEach { + it.dispatchError(msg.error) + } + } + + /** + * Remove a downstream collector. + */ + private suspend fun doRemove(channel: Channel>) { + val index = channels.indexOfFirst { + it.hasChannel(channel) + } + if (index >= 0) { + channels.removeAt(index) + if (channels.isEmpty()) { + producer?.cancelAndJoin() + } + } + } + + /** + * We've received some leftovers from the previous [ChannelManager]. Add them to our list. + */ + private suspend fun doAddLefovers(leftovers: List>) { + val wasEmpty = channels.isEmpty() + leftovers.forEach { channelEntry -> + addEntry( + entry = channelEntry.copy(_receivedValue = false) + ) + } + activateIfNecessary(wasEmpty) + } + + /** + * Add a new downstream collector + */ + private suspend fun doAdd(msg: Message.AddChannel) { + val wasEmpty = channels.isEmpty() + + addEntry( + entry = ChannelEntry( + channel = msg.channel + ) + ) + activateIfNecessary(wasEmpty) + } + + private fun activateIfNecessary(wasEmpty: Boolean) { + if (producer == null && wasEmpty) { + producer = onActive(this) + dispatchedValue = false + producer!!.start() + } + } + + /** + * Internally add the new downstream collector to our list, send it anything buffered. + */ + private suspend fun addEntry(entry: ChannelEntry) { + val new = channels.none { + it.hasChannel(entry) + } + check(new) { + "$entry is already in the list." + } + check(!entry.receivedValue) { + "$entry already received a value" + } + channels.add(entry) + if (buffer.items.isNotEmpty()) { + // if there is anything in the buffer, send it + buffer.items.forEach { + entry.dispatchValue(it) + } + } + } + + /** + * Holder for each downstream collector + */ + internal data class ChannelEntry( + /** + * The channel used by the collector + */ + private val channel: Channel>, + /** + * Tracking whether we've ever dispatched a value or an error to downstream + */ + private var _receivedValue: Boolean = false + ) { + val receivedValue + get() = _receivedValue + + suspend fun dispatchValue(value: Message.DispatchValue) { + _receivedValue = true + channel.send(value) + } + + fun dispatchError(error: Throwable) { + _receivedValue = true + channel.close(error) + } + + fun close() { + channel.close() + } + + fun hasChannel(channel: Channel>) = this.channel === channel + + fun hasChannel(entry: ChannelEntry) = this.channel === entry.channel + } + + /** + * Messages accepted by the [ChannelManager]. + */ + sealed class Message { + /** + * Add a new channel, that means a new downstream subscriber + */ + class AddChannel( + val channel: Channel> + ) : Message() + + /** + * Add multiple channels. Happens when we are carrying over leftovers from a previous + * manager + */ + internal class AddLeftovers(val leftovers: List>) : + Message() + + /** + * Remove a downstream subscriber, that means it completed + */ + class RemoveChannel(val channel: Channel>) : Message() + + /** + * Upstream dispatched a new value, send it to all downstream items + */ + class DispatchValue( + /** + * The value dispatched by the upstream + */ + val value: T, + /** + * Ack that is completed by all receiver. Upstream producer will await this before asking + * for a new value from upstream + */ + val delivered: CompletableDeferred + ) : Message() + + /** + * Upstream dispatched an error, send it to all downstream items + */ + class DispatchError( + /** + * The error sent by the upstream + */ + val error: Throwable + ) : Message() + + class UpstreamFinished( + /** + * SharedFlowProducer finished emitting + */ + val producer: SharedFlowProducer + ) : Message() + } + + /** + * Buffer implementation for any late arrivals. + */ + private interface Buffer { + fun add(item: Message.DispatchValue) + val items: Collection> + } + + /** + * Default implementation of buffer which does not buffer anything. + */ + private class NoBuffer : Buffer { + override val items: Collection> + get() = Collections.emptyList() + + + override fun add(item: Message.DispatchValue) { + // ignore + } + } + + /** + * Create a new buffer insteance based on the provided limit. + */ + private fun Buffer(limit: Int): Buffer = if (limit > 0) { + BufferImpl(limit) + } else { + NoBuffer() + } + + /** + * A real buffer implementation that has a FIFO queue. + */ + private class BufferImpl(private val limit: Int) : + Buffer { + override val items = ArrayDeque>(limit.coerceAtMost(10)) + override fun add(item: Message.DispatchValue) { + while (items.size >= limit) { + items.pollFirst() + } + items.offerLast(item) + } + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt new file mode 100644 index 0000000..3df0ad3 --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/Multiplexer.kt @@ -0,0 +1,81 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.transform + +/** + * Like a publish, shares 1 upstream value with multiple downstream receiver. + * It has one store specific behavior where upstream flow is suspended until at least 1 downstream + * flow emits the value to ensure we don't abuse the upstream flow of downstream cannot keep up. + */ +@FlowPreview +@ExperimentalCoroutinesApi +internal class Multiplexer( + /** + * The [CoroutineScope] to use for upstream subscription + */ + private val scope: CoroutineScope, + /** + * The buffer size that is used only if the upstream has not complete yet. + * Defaults to 0. + */ + bufferSize: Int = 0, + /** + * Source function to create a new flow when necessary. + */ + // TODO does this have to be a method or just a flow ? Will decide when actual implementation + // happens + private val source: () -> Flow, + /** + * Called when upstream dispatches a value. + */ + private val onEach: suspend (T) -> Unit +) { + + private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) { + ChannelManager( + scope = scope, + bufferSize = bufferSize, + onActive = { + SharedFlowProducer( + scope = scope, + src = source(), + channelManager = it + ) + }, + onEach = onEach + ) + } + + fun create(): Flow { + val channel = Channel>(Channel.UNLIMITED) + return channel.consumeAsFlow() + .onStart { + channelManager.send( + ChannelManager.Message.AddChannel( + channel + )) + } + .transform { + emit(it.value) + it.delivered.complete(Unit) + }.onCompletion { + channelManager.send( + ChannelManager.Message.RemoveChannel( + channel + ) + ) + } + } + + fun close() { + channelManager.close() + } +} \ No newline at end of file diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt new file mode 100644 index 0000000..9b3c18e --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/SharedFlowProducer.kt @@ -0,0 +1,88 @@ +package com.nytimes.android.external.store4.multiplex + +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchError +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.UpstreamFinished +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.ClosedSendChannelException +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import java.nio.channels.ClosedChannelException + +/** + * A flow collector that works with a [ChannelManager] to collect values from an upstream flow + * and dispatch to the [ChannelManager] which then dispatches to downstream collectors. + * + * They work in sync such that this producer always expects an ack from the [ChannelManager] after + * sending an event. + * + * Cancellation of the collection might be triggered by both this producer (e.g. upstream completes) + * or the [ChannelManager] (e.g. all active collectors complete). + */ +@ExperimentalCoroutinesApi +class SharedFlowProducer( + private val scope: CoroutineScope, + private val src: Flow, + private val channelManager: ChannelManager +) { + private lateinit var collectionJob: Job + + /** + * Starts the collection of the upstream flow. + */ + fun start() { + scope.launch { + try { + // launch again to track the collection job + collectionJob = scope.launch { + try { + src.catch { + channelManager.send( + DispatchError( + it + ) + ) + }.collect { + val ack = CompletableDeferred() + channelManager.send( + DispatchValue( + it, + ack + ) + ) + // suspend until at least 1 receives the new value + ack.await() + } + } catch (closed: ClosedSendChannelException) { + // ignore. if consumers are gone, it might close itself. + } + } + // wait until collection ends, either due to an error or ordered by the channel + // manager + collectionJob.join() + } finally { + // cleanup the channel manager so that downstreams can be closed if they are not + // closed already and leftovers can be moved to a new producer if necessary. + try { + channelManager.send(UpstreamFinished(this@SharedFlowProducer)) + } catch (closed : ClosedSendChannelException) { + // it might close before us, its fine. + } + } + } + } + + suspend fun cancelAndJoin() { + collectionJob.cancelAndJoin() + } + + fun cancel() { + collectionJob.cancel() + } +} diff --git a/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt b/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt new file mode 100644 index 0000000..2f176da --- /dev/null +++ b/store/src/main/java/com/nytimes/android/external/store4/multiplex/StoreRealActor.kt @@ -0,0 +1,46 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.actor + +/** + * Simple actor implementation abstracting away Coroutine.actor since it is deprecated. + * It also enforces a 0 capacity buffer. + */ +@Suppress("EXPERIMENTAL_API_USAGE") +@ExperimentalCoroutinesApi +abstract class StoreRealActor( + scope: CoroutineScope +) { + val inboundChannel: SendChannel + + init { + inboundChannel = scope.actor( + capacity = 0 + ) { + channel.invokeOnClose { + onClosed() + } + for (msg in channel) { + handle(msg) + } + } + } + + open fun onClosed() { + + } + + abstract suspend fun handle(msg: T) + + suspend fun send(msg: T) { + inboundChannel.send(msg) + } + + fun close() { + inboundChannel.close() + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt index 6d90ffc..26d338c 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreMemoryTest.kt @@ -12,12 +12,12 @@ import org.junit.runners.Parameterized @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class ClearStoreMemoryTest( - storeType : TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private var networkCalls = 0 - private val store = TestStoreBuilder.from { - networkCalls ++ + private val store = TestStoreBuilder.from(testScope) { + networkCalls++ }.build(storeType) @Test diff --git a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt index 089411e..72fd74c 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ClearStoreTest.kt @@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger @RunWith(Parameterized::class) class ClearStoreTest( - storeType: TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() @@ -22,10 +22,11 @@ class ClearStoreTest( private val networkCalls = AtomicInteger(0) private val store = TestStoreBuilder.from( - fetcher = { - networkCalls.incrementAndGet() - }, - persister = persister + scope = testScope, + fetcher = { + networkCalls.incrementAndGet() + }, + persister = persister ).build(storeType) @Test @@ -34,10 +35,10 @@ class ClearStoreTest( val barcode = BarCode("type", "key") whenever(persister.read(barcode)) - .thenReturn(null) //read from disk on get - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk on get + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode, 1)).thenReturn(true) whenever(persister.write(barcode, 2)).thenReturn(true) @@ -57,18 +58,18 @@ class ClearStoreTest( val barcode2 = BarCode("type2", "key2") whenever(persister.read(barcode1)) - .thenReturn(null) //read from disk - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing disk cache - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing disk cache + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode1, 1)).thenReturn(true) whenever(persister.write(barcode1, 2)).thenReturn(true) whenever(persister.read(barcode2)) - .thenReturn(null) //read from disk - .thenReturn(1) //read from disk after fetching from network - .thenReturn(null) //read from disk after clearing disk cache - .thenReturn(1) //read from disk after making additional network call + .thenReturn(null) //read from disk + .thenReturn(1) //read from disk after fetching from network + .thenReturn(null) //read from disk after clearing disk cache + .thenReturn(1) //read from disk after making additional network call whenever(persister.write(barcode2, 1)).thenReturn(true) whenever(persister.write(barcode2, 2)).thenReturn(true) diff --git a/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt b/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt index b43d8a6..6286f5b 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/DontCacheErrorsTest1.kt @@ -2,18 +2,24 @@ package com.nytimes.android.external.store3 import com.nytimes.android.external.store3.base.impl.BarCode import junit.framework.Assert.fail +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized +import kotlin.coroutines.EmptyCoroutineContext @RunWith(Parameterized::class) class DontCacheErrorsTest( storeType: TestStoreType ) { - + private val testScope = TestCoroutineScope() private var shouldThrow: Boolean = false - private val store = TestStoreBuilder.from { + // TODO move to test coroutine scope + private val store = TestStoreBuilder.from(testScope) { if (shouldThrow) { throw RuntimeException() } else { @@ -22,7 +28,7 @@ class DontCacheErrorsTest( }.build(storeType) @Test - fun testStoreDoesntCacheErrors() = runBlocking { + fun testStoreDoesntCacheErrors() = testScope.runBlockingTest { val barcode = BarCode("bar", "code") shouldThrow = true diff --git a/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt b/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt index 227103c..7b6aa15 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/KeyParserTest.kt @@ -10,19 +10,20 @@ import org.junit.runners.Parameterized @RunWith(Parameterized::class) class KeyParserTest( - storeType: TestStoreType + storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val store = TestStoreBuilder.from( - fetcher = { - NETWORK - }, - fetchParser = object : KeyParser { - override suspend fun apply(key: Int, raw: String): String { - return raw + key - } + scope = testScope, + fetcher = { + NETWORK + }, + fetchParser = object : KeyParser { + override suspend fun apply(key: Int, raw: String): String { + return raw + key } + } ).build(storeType) @Test diff --git a/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt b/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt index 97514ce..664bd64 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/NoNetworkTest.kt @@ -14,10 +14,11 @@ import org.junit.runners.Parameterized class NoNetworkTest( storeType: TestStoreType ) { - private val store: Store = TestStoreBuilder.from { + private val testScope = TestCoroutineScope() + private val store: Store = TestStoreBuilder.from(testScope) { throw EXCEPTION }.build(storeType) - private val testScope = TestCoroutineScope() + @Test fun testNoNetwork() = testScope.runBlockingTest { diff --git a/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt b/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt index d7b7698..66a58bc 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/ParsingFetcherTest.kt @@ -19,7 +19,7 @@ import org.mockito.Mockito.verify @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class ParsingFetcherTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -30,22 +30,23 @@ class ParsingFetcherTest( @Test fun testPersistFetcher() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - fetchParser = parser, - persister = persister + scope = testScope, + fetcher = fetcher, + fetchParser = parser, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(RAW_DATA) + .thenReturn(RAW_DATA) whenever(parser.apply(RAW_DATA)) - .thenReturn(PARSED) + .thenReturn(PARSED) whenever(persister.read(barCode)) - .thenReturn(PARSED) + .thenReturn(PARSED) whenever(persister.write(barCode, PARSED)) - .thenReturn(true) + .thenReturn(true) val value = simpleStore.fresh(barCode) diff --git a/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt b/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt index 25ab863..eee96b3 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/SequentialTest.kt @@ -17,12 +17,16 @@ import kotlin.random.Random @RunWith(Parameterized::class) class SequentialTes( - storeType: TestStoreType) { + storeType: TestStoreType +) { private val testScope = TestCoroutineScope() var networkCalls = 0 - private val store = TestStoreBuilder.from(cached = true) { - networkCalls ++ + private val store = TestStoreBuilder.from( + scope = testScope, + cached = true + ) { + networkCalls++ }.build(storeType) @Test @@ -58,8 +62,8 @@ class SequentialTes( val store: Store = Store.from { it + Random.nextInt() } with { parser { it.toString() } - .persister(persister) - .cache() + .persister(persister) + .cache() } val v1 = store.get(4) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt index dd5410b..72c20c2 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreTest.kt @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val counter = AtomicInteger(0) @@ -41,19 +41,20 @@ class StoreTest( @Test fun testSimple() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) var value = simpleStore.get(barCode) @@ -66,24 +67,25 @@ class StoreTest( @Test fun testDoubleTap() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenAnswer { - if (counter.incrementAndGet() == 1) { - NETWORK - } else { - throw RuntimeException("Yo Dawg your inflight is broken") - } + .thenAnswer { + if (counter.incrementAndGet() == 1) { + NETWORK + } else { + throw RuntimeException("Yo Dawg your inflight is broken") } + } whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) val deferred = async { simpleStore.get(barCode) } @@ -97,21 +99,22 @@ class StoreTest( fun testSubclass() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) simpleStore.clear(barCode) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)).thenReturn(true) - var value = simpleStore.get(barCode) + var value = simpleStore.get(barCode) assertThat(value).isEqualTo(DISK) value = simpleStore.get(barCode) assertThat(value).isEqualTo(DISK) @@ -122,13 +125,14 @@ class StoreTest( fun testNoopAndDefault() = testScope.runBlockingTest { val persister = spy(NoopPersister.create()) val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister, - cached = true + scope = testScope, + fetcher = fetcher, + persister = persister, + cached = true ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) var value = simpleStore.get(barCode) verify(fetcher, times(1)).fetch(barCode) @@ -148,9 +152,9 @@ class StoreTest( @Test fun testEquivalence() = testScope.runBlockingTest { val cache = CacheBuilder.newBuilder() - .maximumSize(1) - .expireAfterAccess(java.lang.Long.MAX_VALUE, TimeUnit.SECONDS) - .build() + .maximumSize(1) + .expireAfterAccess(java.lang.Long.MAX_VALUE, TimeUnit.SECONDS) + .build() cache.put(barCode, MEMORY) var value = cache.getIfPresent(barCode) @@ -163,9 +167,10 @@ class StoreTest( @Test fun testFreshUsesOnlyNetwork() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister, - persisterStalePolicy = StalePolicy.NETWORK_BEFORE_STALE + scope = testScope, + fetcher = fetcher, + persister = persister, + persisterStalePolicy = StalePolicy.NETWORK_BEFORE_STALE ).build(storeType) whenever(fetcher.fetch(barCode)) doThrow RuntimeException(ERROR) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt index 1d71402..5b04b93 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreThrowOnNoItems.kt @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreThrowOnNoItems( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val counter = AtomicInteger(0) @@ -29,15 +29,16 @@ class StoreThrowOnNoItems( @Test fun testShouldThrowOnFetcherEmitsNoSuckElementException() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.from( - fetcher = fetcher + scope = testScope, + fetcher = fetcher ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenThrow(NoSuchElementException()) + .thenThrow(NoSuchElementException()) try { - simpleStore.get(barCode) - fail("exception not thrown when no items emitted from fetcher") + val unexpected = simpleStore.get(barCode) + fail("exception not thrown when no items emitted from fetcher $unexpected") } catch (e: NoSuchElementException) { assertThat(e).isInstanceOf(NoSuchElementException::class.java) } diff --git a/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt index 07a298e..8f47f39 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StoreWithParserTest.kt @@ -19,7 +19,7 @@ import org.mockito.Mockito.verify @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StoreWithParserTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -31,20 +31,21 @@ class StoreWithParserTest( @Test fun testSimple() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.fromPostParser( - fetcher = fetcher, - persister = persister, - postParser = parser + scope = testScope, + fetcher = fetcher, + persister = persister, + postParser = parser ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) whenever(parser.apply(DISK)).thenReturn(barCode.key) @@ -58,19 +59,20 @@ class StoreWithParserTest( @Test fun testSubclass() = testScope.runBlockingTest { val simpleStore = TestStoreBuilder.fromPostParser( - fetcher = fetcher, - persister = persister, - postParser = parser + scope = testScope, + fetcher = fetcher, + persister = persister, + postParser = parser ).build(storeType) whenever(fetcher.fetch(barCode)) - .thenReturn(NETWORK) + .thenReturn(NETWORK) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(DISK) + .thenReturn(null) + .thenReturn(DISK) whenever(persister.write(barCode, NETWORK)) - .thenReturn(true) + .thenReturn(true) whenever(parser.apply(DISK)).thenReturn(barCode.key) diff --git a/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt index 271b04d..0384dac 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StreamOneKeyTest.kt @@ -17,50 +17,52 @@ import org.junit.runners.Parameterized @ExperimentalCoroutinesApi @RunWith(Parameterized::class) class StreamOneKeyTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { val fetcher: Fetcher = mock() val persister: Persister = mock() private val barCode = BarCode("key", "value") private val barCode2 = BarCode("key2", "value2") + private val testScope = TestCoroutineScope() private val store = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) - private val testScope = TestCoroutineScope() + @Before fun setUp() = runBlockingTest { whenever(fetcher.fetch(barCode)) - .thenReturn(TEST_ITEM) - .thenReturn(TEST_ITEM2) + .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM2) whenever(persister.read(barCode)) - .let { - // the backport stream method of Pipeline to Store does not skip disk so we - // make sure disk returns empty value first - if (storeType == TestStoreType.Pipeline) { - it.thenReturn(null) - } else { - it - } + .let { + // the backport stream method of Pipeline to Store does not skip disk so we + // make sure disk returns empty value first + if (storeType != TestStoreType.Store) { + it.thenReturn(null) + } else { + it } - .thenReturn(TEST_ITEM) - .thenReturn(TEST_ITEM2) + } + .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM2) whenever(persister.write(barCode, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) whenever(persister.write(barCode, TEST_ITEM2)) - .thenReturn(true) + .thenReturn(true) } @Suppress("UsePropertyAccessSyntax") // for assert isTrue() isFalse() @Test - fun testStream() = runBlockingTest { + fun testStream() = testScope.runBlockingTest { val streamSubscription = store.stream(barCode) - .openChannelSubscription() + .openChannelSubscription() try { if (storeType == TestStoreType.Store) { //stream doesn't invoke get anymore so when we call it the channel is empty @@ -82,11 +84,11 @@ class StreamOneKeyTest( assertThat(streamSubscription.poll()).isEqualTo(TEST_ITEM) //get for another barcode should not trigger a stream for barcode1 whenever(fetcher.fetch(barCode2)) - .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM) whenever(persister.read(barCode2)) - .thenReturn(TEST_ITEM) + .thenReturn(TEST_ITEM) whenever(persister.write(barCode2, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) store.get(barCode2) assertThat(streamSubscription.isEmpty).isTrue() diff --git a/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt b/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt index e662bbe..2ad3b3a 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/StreamTest.kt @@ -26,7 +26,7 @@ import org.junit.runners.Parameterized @ObsoleteCoroutinesApi @RunWith(Parameterized::class) class StreamTest( - private val storeType: TestStoreType + private val storeType: TestStoreType ) { private val testScope = TestCoroutineScope() private val fetcher: Fetcher = mock() @@ -35,8 +35,9 @@ class StreamTest( private val barCode = BarCode("key", "value") private val store = TestStoreBuilder.from( - fetcher = fetcher, - persister = persister + scope = testScope, + fetcher = fetcher, + persister = persister ).build(storeType) @Before @@ -44,17 +45,17 @@ class StreamTest( whenever(fetcher.fetch(barCode)).thenReturn(TEST_ITEM) whenever(persister.read(barCode)) - .thenReturn(null) - .thenReturn(TEST_ITEM) + .thenReturn(null) + .thenReturn(TEST_ITEM) whenever(persister.write(barCode, TEST_ITEM)) - .thenReturn(true) + .thenReturn(true) } @Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse() @Test fun testStream() = testScope.runBlockingTest { - if (storeType == TestStoreType.Pipeline) { + if (storeType != TestStoreType.Store) { throw AssumptionViolatedException("Pipeline store does not support stream() no arg") } val streamSubscription = store.stream().openChannelSubscription() @@ -70,7 +71,7 @@ class StreamTest( @Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse() @Test fun testStreamEmitsOnlyFreshData() = testScope.runBlockingTest { - if (storeType == TestStoreType.Pipeline) { + if (storeType != TestStoreType.Store) { throw AssumptionViolatedException("Pipeline store does not support stream() no arg") } store.get(barCode) @@ -93,4 +94,4 @@ class StreamTest( @ExperimentalCoroutinesApi fun Flow.openChannelSubscription() = - broadcastIn(GlobalScope + Dispatchers.Unconfined).openSubscription() \ No newline at end of file + broadcastIn(GlobalScope + Dispatchers.Unconfined).openSubscription() \ No newline at end of file diff --git a/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt b/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt index 797d08c..e5adf84 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/TestStoreBuilder.kt @@ -16,199 +16,246 @@ import com.nytimes.android.external.store3.pipeline.withCache import com.nytimes.android.external.store3.pipeline.withKeyConverter import com.nytimes.android.external.store3.pipeline.withNonFlowPersister import com.nytimes.android.external.store3.util.KeyParser +import com.nytimes.android.external.store4.RealInternalCoroutineStore +import com.nytimes.android.external.store4.SourceOfTruth +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.flow data class TestStoreBuilder( - private val buildStore: () -> Store, - private val buildPipelineStore: () -> Store + private val buildStore: () -> Store, + private val buildPipelineStore: () -> Store, + private val builRealInternalCoroutineStore: () -> Store ) { fun build(storeType: TestStoreType): Store = when (storeType) { TestStoreType.Store -> buildStore() TestStoreType.Pipeline -> buildPipelineStore() + TestStoreType.CoroutineInternal -> builRealInternalCoroutineStore() } companion object { fun from( - inflight: Boolean = true, - fetcher: suspend (Key) -> Output + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: suspend (Key) -> Output ): TestStoreBuilder = from( - inflight = inflight, - persister = null, - fetchParser = null, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = null, + fetchParser = null, + fetcher = fetcher ) fun from( - inflight: Boolean = true, - fetcher: suspend (Key) -> Output, - fetchParser : KeyParser + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: suspend (Key) -> Output, + fetchParser: KeyParser ): TestStoreBuilder = from( - inflight = inflight, - persister = null, - fetchParser = fetchParser, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = null, + fetchParser = fetchParser, + fetcher = fetcher ) fun from( - inflight: Boolean = true, - fetcher: Fetcher, - fetchParser : Parser, - persister: Persister + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: Fetcher, + fetchParser: Parser, + persister: Persister ): TestStoreBuilder = from( - inflight = inflight, - persister = persister, - fetchParser = object : KeyParser { - override suspend fun apply(key: Key, raw: Output): Output { - return fetchParser.apply(raw) - } - }, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = persister, + fetchParser = object : KeyParser { + override suspend fun apply(key: Key, raw: Output): Output { + return fetchParser.apply(raw) + } + }, + fetcher = fetcher ) fun fromPostParser( - inflight: Boolean = true, - fetcher: Fetcher, - postParser : Parser, - persister: Persister + scope: CoroutineScope, + inflight: Boolean = true, + fetcher: Fetcher, + postParser: Parser, + persister: Persister ): TestStoreBuilder = from( - inflight = inflight, - persister = persister, - postParser = object : KeyParser { - override suspend fun apply(key: Key, raw: Output): Output { - return postParser.apply(raw) - } - }, - fetcher = fetcher + scope = scope, + inflight = inflight, + persister = persister, + postParser = object : KeyParser { + override suspend fun apply(key: Key, raw: Output): Output { + return postParser.apply(raw) + } + }, + fetcher = fetcher ) @Suppress("UNCHECKED_CAST") fun from( - inflight: Boolean = true, - cached : Boolean = false, - cacheMemoryPolicy: MemoryPolicy? = null, - persister: Persister? = null, - persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, - fetchParser: KeyParser? = null, - fetcher: suspend (Key) -> Output + scope: CoroutineScope, + inflight: Boolean = true, + cached: Boolean = false, + cacheMemoryPolicy: MemoryPolicy? = null, + persister: Persister? = null, + persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, + fetchParser: KeyParser? = null, + fetcher: suspend (Key) -> Output ): TestStoreBuilder = from( - inflight = inflight, - cached = cached, - cacheMemoryPolicy = cacheMemoryPolicy, - persister = persister, - persisterStalePolicy = persisterStalePolicy, - fetchParser = fetchParser, - fetcher = object : Fetcher { - override suspend fun fetch(key: Key): Output = fetcher(key) - } + scope = scope, + inflight = inflight, + cached = cached, + cacheMemoryPolicy = cacheMemoryPolicy, + persister = persister, + persisterStalePolicy = persisterStalePolicy, + fetchParser = fetchParser, + fetcher = object : Fetcher { + override suspend fun fetch(key: Key): Output = fetcher(key) + } ) + @Suppress("UNCHECKED_CAST") fun from( - inflight: Boolean = true, - cached : Boolean = false, - cacheMemoryPolicy: MemoryPolicy? = null, - persister: Persister? = null, - persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, - // parser that runs after fetch - fetchParser: KeyParser? = null, - // parser that runs after get from db - postParser: KeyParser? = null, - fetcher: Fetcher + scope: CoroutineScope, + inflight: Boolean = true, + cached: Boolean = false, + cacheMemoryPolicy: MemoryPolicy? = null, + persister: Persister? = null, + persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED, + // parser that runs after fetch + fetchParser: KeyParser? = null, + // parser that runs after get from db + postParser: KeyParser? = null, + fetcher: Fetcher ): TestStoreBuilder { return TestStoreBuilder( - buildStore = { - Store.from( - inflight = inflight, - f = fetcher - ).let { - if (fetchParser == null) { - it - } else { - it.parser(fetchParser) + buildStore = { + Store.from( + inflight = inflight, + f = fetcher + ).let { + if (fetchParser == null) { + it + } else { + it.parser(fetchParser) + } + }.let { + if (persister == null) { + it + } else { + it.persister(persister, persisterStalePolicy) + } + }.let { + if (postParser == null) { + it + } else { + it.parser(postParser) + } + }.let { + if (cached) { + it.cache(cacheMemoryPolicy) + } else { + it + } + }.open() + }, + buildPipelineStore = { + beginPipeline( + fetcher = { + flow { + emit(fetcher.fetch(it)) } - }.let { + } + ).let { + if (fetchParser == null) { + it + } else { + it.withKeyConverter { key, oldOutput -> + fetchParser.apply(key, oldOutput) + } + } + }.let { + if (persister == null) { + it + } else { + it.withNonFlowPersister( + reader = { + persister.read(it) + }, + writer = { key, value -> + persister.write(key, value) + }, + delete = if (persister is Clearable<*>) { + SuspendWrapper( + (persister as Clearable)::clear + )::apply + } else { + null + } + ) + } + }.let { + if (cached) { + it.withCache(cacheMemoryPolicy) + } else { + it + } + }.let { + if (postParser == null) { + it + } else { + it.withKeyConverter { key, oldOutput -> + postParser.apply(key, oldOutput) + } + } + }.open() + }, + builRealInternalCoroutineStore = { + RealInternalCoroutineStore + .beginWithFlowingFetcher { key: Key -> + flow { + val value = fetcher.fetch(key = key) + if (fetchParser != null) { + emit(fetchParser.apply(key, value)) + } else { + emit(value) + } + } + } + .scope(scope) + .let { if (persister == null) { it } else { - it.persister(persister, persisterStalePolicy) + it.sourceOfTruth(SourceOfTruth.fromLegacy(persister, postParser)) } - }.let { - if (postParser == null) { - it - } else { - it.parser(postParser) - } - }.let { + } + .let { if (cached) { - it.cache(cacheMemoryPolicy) - } else { - it - } - }.open() - }, - buildPipelineStore = { - beginPipeline( - fetcher = { - flow { - emit(fetcher.fetch(it)) - } - } - ).let { - if (fetchParser == null) { it } else { - it.withKeyConverter { key, oldOutput -> - fetchParser.apply(key, oldOutput) - } + it.disableCache() } - }.let { - if (persister == null) { - it - } else { - it.withNonFlowPersister( - reader = { - persister.read(it) - }, - writer = { key, value -> - persister.write(key, value) - }, - delete = if (persister is Clearable<*>) { - SuspendWrapper( - (persister as Clearable)::clear - )::apply - } else { - null - } - ) - } - }.let { - if (cached) { - it.withCache(cacheMemoryPolicy) - } else { - it - } - }.let { - if (postParser == null) { - it - } else { - it.withKeyConverter { key, oldOutput -> - postParser.apply(key, oldOutput) - } - } - }.open() - } + } + .build().asLegacyStore() + } ) } } // wraps a regular fun to suspend, couldn't figure out how to create suspend fun variables :/ private class SuspendWrapper( - val f : (P0) -> R + val f: (P0) -> R ) { - suspend fun apply(input : P0) : R = f(input) + suspend fun apply(input: P0): R = f(input) } } enum class TestStoreType { Store, - Pipeline -} \ No newline at end of file + Pipeline, + CoroutineInternal +} diff --git a/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt b/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt index 82fac52..79a99c3 100644 --- a/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt +++ b/store/src/test/java/com/nytimes/android/external/store3/pipeline/PipelineStoreTest.kt @@ -1,10 +1,12 @@ package com.nytimes.android.external.store3.pipeline +import com.nytimes.android.external.store3.TestStoreType import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Cache import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Fetcher import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Persister import com.nytimes.android.external.store3.pipeline.StoreResponse.Data import com.nytimes.android.external.store3.pipeline.StoreResponse.Loading +import com.nytimes.android.external.store4.RealInternalCoroutineStore import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -14,17 +16,18 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.test.runBlockingTest import org.assertj.core.api.Assertions.assertThat import org.junit.Test import org.junit.runner.RunWith -import org.junit.runners.JUnit4 +import org.junit.runners.Parameterized @ExperimentalCoroutinesApi -@RunWith(JUnit4::class) -class PipelineStoreTest { +@RunWith(Parameterized::class) +class PipelineStoreTest( + private val storeType: TestStoreType +) { private val testScope = TestCoroutineScope() @Test @@ -33,10 +36,12 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), Data( @@ -45,7 +50,7 @@ class PipelineStoreTest { ) ) pipeline.stream(StoreRequest.cached(3, refresh = false)) - .assertCompleteStream( + .assertItems( Data( value = "three-1", origin = Cache @@ -71,18 +76,18 @@ class PipelineStoreTest { } @Test - fun getAndFresh_withPersister() = runBlocking { + fun getAndFresh_withPersister() = testScope.runBlockingTest { val fetcher = FakeFetcher( 3 to "three-1", 3 to "three-2" ) val persister = InMemoryPersister() - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = false)) .assertItems( Loading( @@ -127,12 +132,12 @@ class PipelineStoreTest { ) val persister = InMemoryPersister() - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = true)) .assertItems( @@ -171,11 +176,13 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.cached(3, refresh = true)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -186,7 +193,7 @@ class PipelineStoreTest { ) pipeline.stream(StoreRequest.cached(3, refresh = true)) - .assertCompleteStream( + .assertItems( Data( value = "three-1", origin = Cache @@ -207,11 +214,13 @@ class PipelineStoreTest { 3 to "three-1", 3 to "three-2" ) - val pipeline = beginNonFlowingPipeline(fetcher::fetch) - .withCache() + val pipeline = build( + nonFlowingFetcher = fetcher::fetch, + enableCache = true + ) pipeline.stream(StoreRequest.skipMemory(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -222,7 +231,7 @@ class PipelineStoreTest { ) pipeline.stream(StoreRequest.skipMemory(3, refresh = false)) - .assertCompleteStream( + .assertItems( Loading( origin = Fetcher ), @@ -241,11 +250,13 @@ class PipelineStoreTest { ) val persister = InMemoryPersister() - val pipeline = beginPipeline(fetcher::createFlow) - .withNonFlowPersister( - reader = persister::read, - writer = persister::write - ) + val pipeline = build( + flowingFetcher = fetcher::createFlow, + persisterReader = persister::read, + persisterWriter = persister::write, + enableCache = false + ) + pipeline.stream(StoreRequest.fresh(3)) .assertItems( Loading( @@ -260,37 +271,40 @@ class PipelineStoreTest { origin = Fetcher ) ) - - pipeline.stream(StoreRequest.cached(3, refresh = true)).assertItems( - Data( - value = "three-2", - origin = Persister - ), - Loading( - origin = Fetcher - ), - Data( - value = "three-1", - origin = Fetcher - ), - Data( - value = "three-2", - origin = Fetcher + pipeline.stream(StoreRequest.cached(3, refresh = true)) + .assertItems( + Data( + value = "three-2", + origin = Persister + ), + Loading( + origin = Fetcher + ), + Data( + value = "three-1", + origin = Fetcher + ), + Data( + value = "three-2", + origin = Fetcher + ) ) - ) } @Test fun diskChangeWhileNetworkIsFlowing_simple() = testScope.runBlockingTest { val persister = InMemoryPersister().asObservable() - val pipeline = beginPipeline { - flow { - // never emit - } - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + flowingFetcher = { + flow { + + } + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) + launch { delay(10) persister.flowWriter(3, "local-1") @@ -311,16 +325,18 @@ class PipelineStoreTest { @Test fun diskChangeWhileNetworkIsFlowing_overwrite() = testScope.runBlockingTest { val persister = InMemoryPersister().asObservable() - val pipeline = beginPipeline { - flow { - delay(10) - emit("three-1") - delay(10) - emit("three-2") - } - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + flowingFetcher = { + flow { + delay(10) + emit("three-1") + delay(10) + emit("three-2") + } + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) launch { delay(5) @@ -357,11 +373,13 @@ class PipelineStoreTest { fun errorTest() = testScope.runBlockingTest { val exception = IllegalArgumentException("wow") val persister = InMemoryPersister().asObservable() - val pipeline = beginNonFlowingPipeline { key: Int -> - throw exception - }.withPersister( - reader = persister::flowReader, - writer = persister::flowWriter + val pipeline = build( + nonFlowingFetcher = { + throw exception + }, + flowingPersisterReader = persister::flowReader, + persisterWriter = persister::flowWriter, + enableCache = false ) launch { delay(10) @@ -420,8 +438,11 @@ class PipelineStoreTest { responses.filter { it.first == key }.forEach { - emit(it.second) + // we delay here to avoid collapsing fetcher values, otherwise, there is a + // possibility that consumer won't be fast enough to get both values before new + // value overrides the previous one. delay(1) + emit(it.second) } } } @@ -441,7 +462,7 @@ class PipelineStoreTest { } } - private class InMemoryPersister { + class InMemoryPersister { private val data = mutableMapOf() @Suppress("RedundantSuspendModifier")// for function reference @@ -468,12 +489,93 @@ class PipelineStoreTest { .isEqualTo(expected.toList()) } - /** - * Takes all elements from the stream and asserts them. - * Use this if test does not have an infinite flow (e.g. no persister or no infinite fetcher) - */ - private suspend fun Flow.assertCompleteStream(vararg expected: T) { - assertThat(this.toList()) - .isEqualTo(expected.toList()) + private fun build( + nonFlowingFetcher: (suspend (Key) -> Input)? = null, + flowingFetcher: ((Key) -> Flow)? = null, + persisterReader: (suspend (Key) -> Output?)? = null, + flowingPersisterReader: ((Key) -> Flow)? = null, + persisterWriter: (suspend (Key, Input) -> Unit)? = null, + persisterDelete: (suspend (Key) -> Unit)? = null, + enableCache: Boolean + ): PipelineStore { + check(nonFlowingFetcher != null || flowingFetcher != null) { + "need to provide a fetcher" + } + check(nonFlowingFetcher == null || flowingFetcher == null) { + "need 1 fetcher" + } + check(persisterReader == null || flowingPersisterReader == null) { + "need 0 or 1 persister" + } + if (storeType == TestStoreType.Pipeline) { + return if (nonFlowingFetcher != null) { + beginNonFlowingPipeline(nonFlowingFetcher) + } else { + beginPipeline(flowingFetcher!!) + }.let { + when { + flowingPersisterReader != null -> it.withPersister( + reader = flowingPersisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + persisterReader != null -> it.withNonFlowPersister( + reader = persisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + else -> it as PipelineStore + } + }.let { + if (enableCache) { + it.withCache() + } else { + it + } + } + } else if (storeType == TestStoreType.CoroutineInternal) { + return if (nonFlowingFetcher != null) { + RealInternalCoroutineStore.beginWithNonFlowingFetcher( + nonFlowingFetcher + ) + } else { + RealInternalCoroutineStore.beginWithFlowingFetcher( + flowingFetcher!! + ) + }.let { + when { + flowingPersisterReader != null -> it.persister( + reader = flowingPersisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + persisterReader != null -> it.nonFlowingPersister( + reader = persisterReader, + writer = persisterWriter!!, + delete = persisterDelete + ) + else -> it + } + }.let { + if (enableCache) { + it + } else { + it.disableCache() + } + }.scope(testScope) + .build() + } else { + throw UnsupportedOperationException("cannot test $storeType") + } } -} \ No newline at end of file + + + companion object { + @JvmStatic + @Parameterized.Parameters(name = "{0}") + fun params() = listOf( + TestStoreType.Pipeline, + TestStoreType.CoroutineInternal + ) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt b/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt new file mode 100644 index 0000000..1e938cb --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/FetcherControllerTest.kt @@ -0,0 +1,78 @@ +package com.nytimes.android.external.store4 + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +@ExperimentalCoroutinesApi +@FlowPreview +@RunWith(JUnit4::class) +class FetcherControllerTest { + private val testScope = TestCoroutineScope() + @Test + fun simple() = testScope.runBlockingTest { + val fetcherController = FetcherController( + scope = testScope, + realFetcher = { key: Int -> + flow { + emit(key * key) + } + }, + sourceOfTruth = null + ) + val fetcher = fetcherController.getFetcher(3) + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + val received = fetcher.onEach { + assertThat(fetcherController.fetcherSize()).isEqualTo(1) + }.first() + assertThat(received).isEqualTo(9) + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + } + + @Test + fun concurrent() = testScope.runBlockingTest { + var createdCnt = 0 + val fetcherController = FetcherController( + scope = testScope, + realFetcher = { key: Int -> + createdCnt ++ + flow { + // make sure it takes time, otherwise, we may not share + delay(1) + emit(key * key) + } + }, + sourceOfTruth = null + ) + val fetcherCount = 20 + fun createFetcher() = async { + fetcherController.getFetcher(3) + .onEach { + assertThat(fetcherController.fetcherSize()).isEqualTo(1) + }.first() + } + val fetchers = (0 until fetcherCount).map { + createFetcher() + } + fetchers.forEach { + assertThat(it.await()).isEqualTo(9) + } + assertThat(fetcherController.fetcherSize()).isEqualTo(0) + assertThat(createdCnt).isEqualTo(1) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt b/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt new file mode 100644 index 0000000..d46e408 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/SourceOfTruthWithBarrierTest.kt @@ -0,0 +1,66 @@ +package com.nytimes.android.external.store4 + +import com.nytimes.android.external.store3.pipeline.PipelineStoreTest +import com.nytimes.android.external.store3.pipeline.ResponseOrigin +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +@FlowPreview +@ExperimentalCoroutinesApi +class SourceOfTruthWithBarrierTest { + private val testScope = TestCoroutineScope() + private val persister = PipelineStoreTest.InMemoryPersister() + private val delegate: SourceOfTruth = + PersistentSourceOfTruth( + realReader = { key -> + flow { + emit(persister.read(key)) + } + }, + realWriter = persister::write, + realDelete = null + ) + private val source = SourceOfTruthWithBarrier( + delegate = delegate + ) + + @Test + fun simple() = testScope.runBlockingTest { + val collector = async { + source.reader(1, CompletableDeferred(Unit)).take(2).toList() + } + source.write(1, "a") + assertThat(collector.await()).isEqualTo( + listOf( + DataWithOrigin(delegate.defaultOrigin, null), + DataWithOrigin(ResponseOrigin.Fetcher, "a") + ) + ) + assertThat(source.barrierCount()).isEqualTo(0) + } + + @Test + fun preAndPostWrites() = testScope.runBlockingTest { + source.write(1, "a") + val collector = async { + source.reader(1, CompletableDeferred(Unit)).take(2).toList() + } + source.write(1, "b") + assertThat(collector.await()).isEqualTo( + listOf( + DataWithOrigin(delegate.defaultOrigin, "a"), + DataWithOrigin(ResponseOrigin.Fetcher, "b") + ) + ) + assertThat(source.barrierCount()).isEqualTo(0) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt b/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt new file mode 100644 index 0000000..1e801f0 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/multiplex/ChannelManagerTest.kt @@ -0,0 +1,63 @@ +package com.nytimes.android.external.store4.multiplex + +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.AddChannel +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue +import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.RemoveChannel +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 + +@FlowPreview +@ExperimentalCoroutinesApi +@RunWith(JUnit4::class) +class ChannelManagerTest { + private val scope = TestCoroutineScope() + private val manager = ChannelManager( + scope, + 0, + onEach = {} + ) { + SharedFlowProducer( + scope, src = + flow { + suspendCancellableCoroutine { + // never end + } + }, + channelManager = it + ) + } + + @Test + fun simple() = scope.runBlockingTest { + val collection = async { + val channel = Channel>(Channel.UNLIMITED) + try { + manager.send(AddChannel(channel)) + channel.consumeAsFlow().take(2).toList() + .map { it.value } + } finally { + manager.send(RemoveChannel(channel)) + } + } + val ack1 = CompletableDeferred() + manager.send(DispatchValue("a", ack1)) + + val ack2 = CompletableDeferred() + manager.send(DispatchValue("b", ack2)) + assertThat(collection.await()).isEqualTo(listOf("a", "b")) + } +} diff --git a/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt b/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt new file mode 100644 index 0000000..1d92492 --- /dev/null +++ b/store/src/test/java/com/nytimes/android/external/store4/multiplex/MultiplexTest.kt @@ -0,0 +1,300 @@ +package com.nytimes.android.external.store4.multiplex + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest +import kotlinx.coroutines.yield +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 + +@FlowPreview +@ExperimentalCoroutinesApi +@RunWith(JUnit4::class) +class MultiplexTest { + private val testScope = TestCoroutineScope() + + private fun createMultiplexer(f: () -> Flow): Multiplexer { + return Multiplexer(testScope, 0, f, {}) + } + + @Test + fun serialial_notShared() = testScope.runBlockingTest { + var createCnt = 0 + val activeFlow = createMultiplexer { + createCnt++ + when (createCnt) { + 1 -> flowOf("a", "b", "c") + 2 -> flowOf("d", "e", "f") + else -> throw AssertionError("should not create more") + } + } + assertThat(activeFlow.create().toList()) + .isEqualTo(listOf("a", "b", "c")) + assertThat(activeFlow.create().toList()) + .isEqualTo(listOf("d", "e", "f")) + } + + @Test + fun slowFastCollector() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onStart { + // make sure both registers on time so that no one drops a value + delay(100) + } + } + val c1 = async { + activeFlow.create().onEach { + delay(100) + }.toList() + } + val c2 = async { + activeFlow.create().onEach { + delay(200) + }.toList() + } + assertThat(c1.await()) + .isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()) + .isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun slowDispatcher() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onEach { + delay(100) + } + } + val c1 = async { + activeFlow.create().toList() + } + val c2 = async { + activeFlow.create().toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun lateToTheParty_arrivesAfterUpstreamClosed() = testScope.runBlockingTest { + val activeFlow = createMultiplexer { + flowOf("a", "b", "c").onStart { + delay(100) + } + } + val c1 = async { + activeFlow.create().toList() + } + val c2 = async { + activeFlow.create().also { + delay(110) + }.toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c")) + } + + @Test + fun lateToTheParty_arrivesBeforeUpstreamClosed() = testScope.runBlockingTest { + var generationCounter = 0 + val activeFlow = createMultiplexer { + flow { + val gen = generationCounter++ + check(gen < 2) { + "created one too many" + } + emit("a_$gen") + delay(5) + emit("b_$gen") + delay(100) + } + } + val c1 = async { + activeFlow.create().onEach { + }.toList() + } + val c2 = async { + activeFlow.create().also { + delay(3) + }.toList() + } + val c3 = async { + activeFlow.create().also { + delay(20) + }.toList() + } + val lists = listOf(c1, c2, c3).map { + it.await() + } + assertThat(lists[0]).isEqualTo(listOf("a_0", "b_0")) + assertThat(lists[1]).isEqualTo(listOf("b_0")) + assertThat(lists[2]).isEqualTo(listOf("a_1", "b_1")) + } + + @Test + fun upstreamError() = testScope.runBlockingTest { + val exception = + MyCustomException("hey") + val activeFlow = createMultiplexer { + flow { + emit("a") + throw exception + } + } + val receivedValue = CompletableDeferred() + val receivedError = CompletableDeferred() + activeFlow.create() + .onEach { + check(receivedValue.isActive) { + "already received value" + } + receivedValue.complete(it) + }.catch { + check(receivedError.isActive) { + "already received error" + } + receivedError.complete(it) + }.toList() + assertThat(receivedValue.await()).isEqualTo("a") + val error = receivedError.await() + assertThat(error).isEqualTo(exception) + } + + @Test + fun upstreamError_secondJustGetsError() = testScope.runBlockingTest { + val exception = + MyCustomException("hey") + val dispatchedFirstValue = CompletableDeferred() + val registeredSecondCollector = CompletableDeferred() + val activeFlow = createMultiplexer { + flow { + emit("a") + dispatchedFirstValue.complete(Unit) + registeredSecondCollector.await() + yield() //yield to allow second collector to register + throw exception + } + } + launch { + activeFlow.create().catch { + + }.toList() + } + // wait until the above collector registers and receives first value + dispatchedFirstValue.await() + val receivedValue = CompletableDeferred() + val receivedError = CompletableDeferred() + activeFlow.create() + .onStart { + registeredSecondCollector.complete(Unit) + } + .onEach { + receivedValue.complete(it) + }.catch { + check(receivedError.isActive) { + "already received error" + } + receivedError.complete(it) + }.toList() + val error = receivedError.await() + assertThat(error).isEqualTo(exception) + // test sanity, second collector never receives a value + assertThat(receivedValue.isActive).isTrue() + } + + @Test + fun lateArrival_unregistersFromTheCorrectManager() = testScope.runBlockingTest { + var createdCount = 0 + var didntFinish = false + val activeFlow = createMultiplexer { + flow { + check(createdCount < 2) { + "created 1 too many" + } + val index = ++createdCount + emit("a_$index") + emit("b_$index") + delay(100) + if (index == 2) { + didntFinish = true + } + } + } + val firstCollector = async { + activeFlow.create().onEach { delay(5) }.take(2).toList() + } + delay(11) // miss first two values + val secondCollector = async { + // this will come in a new channel + activeFlow.create().take(2).toList() + } + assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1")) + assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2")) + assertThat(createdCount).isEqualTo(2) + delay(200) + assertThat(didntFinish).isEqualTo(false) + } + + @Test + fun lateArrival_buffered() = testScope.runBlockingTest { + var createdCount = 0 + val activeFlow = Multiplexer( + scope = testScope, + bufferSize = 2, + source = { + createdCount++ + flow { + emit("a") + delay(5) + emit("b") + emit("c") + emit("d") + delay(100) + emit("e") + // dont finish to see the buffer behavior + delay(2000) + } + }, + onEach = {} + ) + val c1 = async { + activeFlow.create().toList() + } + delay(4)// c2 misses first value + val c2 = async { + activeFlow.create().toList() + } + delay(50) // c3 misses first 4 values + val c3 = async { + activeFlow.create().toList() + } + delay(100) // c4 misses all values + val c4 = async { + activeFlow.create().toList() + } + assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) + assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) + assertThat(c3.await()).isEqualTo(listOf("c", "d", "e")) + assertThat(c4.await()).isEqualTo(listOf("d", "e")) + assertThat(createdCount).isEqualTo(1) + } + + class MyCustomException(val x: String) : RuntimeException("hello") { + override fun toString() = "custom$x" + } +}