Internal Store 3rd attempt (#35)

* WIP publish test

* add actor based implementation, seems the most promising

* add notes into channel manager as well

* use an unlimited channel on the receiver to avoid launching a coroutine just to send

* carry over remaining subscribers into a new flow

* dispatch errors from upstream to all downstreams

* carry over all leftovers at once to avoid starting the producer before they are all added to the list

* handle swapping channel managers in the consumer

This CL fixes an issue where we wouldn't unsubscribe from the right channel if
the downstream is moved between channel managers because it never received an
event after registering.

I've also cleaned up dispatchError to close the channel with the error instead of
passing it down as if it were a value and rethrowing it; a minimal sketch of this
pattern follows the commit metadata below.

* allow live buffering

This adds live buffering to the actor-based publish: values are buffered only
while the upstream is still running.

* remove logging

* move into src

* code cleanup, more comments

* first shot at new internal store, tests pass, code ugly

* tmp builder for real internal store, starting pipeline tests

* more wip in fixing pipeline store tests, a lot to clean up

* all tests pass

* code cleanup

* move multiplexer inside store4

* lots of cleanup of unnecessary code

* release barriers that are not used

* don't use the pipeline persister.
Also fixed barrier cleanup code in the source of truth with barrier

* revert the simple-persister-as-flowable change;
cleanup for ClearStoreMemoryTest

* close multiplexers in fetcher controller when not used

* code style fixes
Yigit Boyar 2019-11-03 07:41:32 -08:00 committed by Mike Nakhimovich
parent e9d6a91d83
commit ba22d0c66c
33 changed files with 2209 additions and 361 deletions
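
The dispatchError cleanup mentioned in the commit message boils down to closing the downstream channel with the error instead of forwarding it as a value. A minimal sketch of that pattern (not part of the commit; the class name is illustrative, the real code is ChannelEntry.dispatchError further down in this diff), assuming a downstream backed by a kotlinx.coroutines Channel:

import kotlinx.coroutines.channels.Channel

// Illustrative downstream entry. Closing the channel with a cause makes a collector of
// channel.consumeAsFlow() fail with that error, so there is no need to wrap the error
// as a regular value and rethrow it downstream.
class ErrorClosingEntry<T>(private val channel: Channel<T>) {
    fun dispatchError(error: Throwable) {
        channel.close(error)
    }
}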

.gitignore

@ -40,6 +40,9 @@ captures/
# Intellij
*.iml
.idea/
.classpath
.project
.settings
# Keystore files
*.jks


@ -21,7 +21,7 @@ buildscript {
]
dependencies {
classpath 'com.android.tools.build:gradle:3.6.0-alpha11'
classpath 'com.android.tools.build:gradle:3.6.0-beta01'
classpath 'com.getkeepsafe.dexcount:dexcount-gradle-plugin:0.5.6'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$rootProject.ext.versions.kotlin"
classpath 'org.jetbrains.dokka:dokka-gradle-plugin:0.9.17'
@ -32,7 +32,6 @@ allprojects {
buildscript {}
repositories {
maven { url 'https://oss.sonatype.org/content/repositories/snapshots/'}
mavenCentral()
jcenter()
}


@ -54,7 +54,7 @@ ext.versions = [
supportTestRunner : '0.4.1',
espresso : '2.2.1',
compileTesting : '0.8',
coroutines : '1.3.0',
coroutines : '1.3.2',
]
ext.libraries = [


@ -44,7 +44,8 @@ fun <Key, Output> PipelineStore<Key, Output>.open(): Store<Output, Key> {
override fun stream(key: Key): Flow<Output> = self.stream(
StoreRequest.skipMemory(
key = key,
refresh = true)
refresh = true
)
).transform {
it.throwIfError()
it.dataOrNull()?.let {


@ -74,10 +74,11 @@ sealed class StoreResponse<T>(
else -> null
}
@Suppress("UNCHECKED_CAST")
internal fun <R> swapType(): StoreResponse<R> = when (this) {
is Error -> Error(error, origin)
is Loading -> Loading(origin)
else -> throw IllegalStateException("cannot swap type for $this")
is Data-> Data(value = value as R, origin = origin)
}
}


@ -0,0 +1,52 @@
package com.nytimes.android.external.store4
import com.nytimes.android.external.store4.multiplex.Multiplexer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
/**
* This class maintains one and only one fetcher for a given [Key].
*/
@FlowPreview
@ExperimentalCoroutinesApi
internal class FetcherController<Key, Input, Output>(
private val scope: CoroutineScope,
private val realFetcher: (Key) -> Flow<Input>,
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, Input, Output>?
) {
private val fetchers = RefCountedResource(
create = { key: Key ->
Multiplexer(
scope = scope,
bufferSize = 0,
source = {
realFetcher(key)
},
onEach = {
sourceOfTruth?.write(key, it)
}
)
},
onRelease = { key: Key, multiplexer: Multiplexer<Input> ->
multiplexer.close()
}
)
fun getFetcher(key: Key): Flow<Input> {
return flow {
val fetcher = fetchers.acquire(key)
try {
emitAll(fetcher.create())
} finally {
fetchers.release(key, fetcher)
}
}
}
// visible for testing
internal suspend fun fetcherSize() = fetchers.size()
}


@ -0,0 +1,35 @@
package com.nytimes.android.external.store4
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
/**
* Merge implementation tells downstream what the source is and also uses a rendezvous channel
*/
@ExperimentalCoroutinesApi
internal fun <T, R> Flow<T>.merge(other: Flow<R>): Flow<Either<T, R>> {
return channelFlow<Either<T, R>> {
launch {
this@merge.collect {
send(Either.Left(it))
}
}
launch {
other.collect {
send(Either.Right(it))
}
}
}.buffer(Channel.RENDEZVOUS)
}
internal sealed class Either<T, R> {
data class Left<T, R>(val value: T) : Either<T, R>()
data class Right<T, R>(val value: R) : Either<T, R>()
}
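
A small usage sketch of the merge helper above (not part of the commit), assuming it is called from the same package since the function is internal; the Either wrapper lets the collector tell which upstream produced each element:

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val letters = flowOf("a", "b")
    val numbers = flowOf(1, 2)
    letters.merge(numbers).collect { either ->
        when (either) {
            is Either.Left -> println("letter: ${either.value}")  // came from `letters`
            is Either.Right -> println("number: ${either.value}") // came from `numbers`
        }
    }
}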


@ -0,0 +1,283 @@
package com.nytimes.android.external.store4
import com.com.nytimes.suspendCache.StoreCache
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
import com.nytimes.android.external.store3.base.impl.StoreDefaults
import com.nytimes.android.external.store3.pipeline.CacheType
import com.nytimes.android.external.store3.pipeline.PipelineStore
import com.nytimes.android.external.store3.pipeline.ResponseOrigin
import com.nytimes.android.external.store3.pipeline.StoreRequest
import com.nytimes.android.external.store3.pipeline.StoreResponse
import com.nytimes.android.external.store3.pipeline.open
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.flow.withIndex
@ExperimentalCoroutinesApi
@FlowPreview
internal class RealInternalCoroutineStore<Key, Input, Output>(
private val scope: CoroutineScope,
private val fetcher: (Key) -> Flow<Input>,
sourceOfTruth: SourceOfTruth<Key, Input, Output>? = null,
private val memoryPolicy: MemoryPolicy?
) : PipelineStore<Key, Output> {
/**
* This source of truth is either a real database or an in memory source of truth created by
* the builder.
* Whatever is given, we always put a [SourceOfTruthWithBarrier] in front of it so that while
* we write the value from fetcher into the disk, we can block reads to avoid sending new data
* as if it came from the server (the [StoreResponse.origin] field).
*/
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, Input, Output>? =
sourceOfTruth?.let {
SourceOfTruthWithBarrier(it)
}
private val memCache = memoryPolicy?.let {
StoreCache.fromRequest<Key, Output?, StoreRequest<Key>>(
loader = {
TODO(
"""
This should've never been called. We don't need this anymore, should remove
loader after we clean old Store ?
""".trimIndent()
)
},
memoryPolicy = memoryPolicy
)
}
/**
* Fetcher controller maintains 1 and only 1 `Multiplexer` for a given key to ensure network
* requests are shared.
*/
private val fetcherController = FetcherController(
scope = scope,
realFetcher = fetcher,
sourceOfTruth = this.sourceOfTruth
)
override fun stream(request: StoreRequest<Key>): Flow<StoreResponse<Output>> {
return if (sourceOfTruth == null) {
createNetworkFlow(
request = request,
networkLock = null
)
} else {
diskNetworkCombined(request)
}.onEach {
// whenever a value is dispatched, save it to the memory cache
if (it.origin != ResponseOrigin.Cache) {
it.dataOrNull()?.let { data ->
memCache?.put(request.key, data)
}
}
}.onStart {
// if there is anything cached, dispatch it first if requested
if (!request.shouldSkipCache(CacheType.MEMORY)) {
memCache?.getIfPresent(request.key)?.let { cached ->
emit(StoreResponse.Data(value = cached, origin = ResponseOrigin.Cache))
}
}
}
}
override suspend fun clear(key: Key) {
memCache?.invalidate(key)
sourceOfTruth?.delete(key)
}
/**
* We want to stream from disk but also want to refresh if requested or necessary.
*
* How it works:
* There are two flows:
* Fetcher: The flow we get for the fetching
* Disk: The flow we get from the [SourceOfTruth].
* Both flows are controlled by a lock for each so that we can start the right one based on
* the request status or values we receive.
*
* Value is always returned from [SourceOfTruth] while the errors are dispatched from both the
* `Fetcher` and [SourceOfTruth].
*
* There are two initialization paths:
*
* 1) Request wants to skip disk cache:
* In this case, we first start the fetcher flow. When fetcher flow provides something besides
* an error, we enable the disk flow.
*
* 2) Request does not want to skip disk cache:
* In this case, we first start the disk flow. If disk flow returns `null` or
* [StoreRequest.refresh] is set to `true`, we enable the fetcher flow.
* This ensures we first get the value from disk and then load from server if necessary.
*/
private fun diskNetworkCombined(
request: StoreRequest<Key>
): Flow<StoreResponse<Output>> {
val diskLock = CompletableDeferred<Unit>()
val networkLock = CompletableDeferred<Unit>()
val networkFlow = createNetworkFlow(request, networkLock)
if (!request.shouldSkipCache(CacheType.DISK)) {
diskLock.complete(Unit)
}
val diskFlow = sourceOfTruth!!.reader(request.key, diskLock)
// we use a merge implementation that gives the source of the flow so that we can decide
// based on that.
return networkFlow.merge(diskFlow.withIndex())
.transform {
// left is Fetcher while right is source of truth
if (it is Either.Left) {
if (it.value !is StoreResponse.Data<*>) {
emit(it.value.swapType())
}
// network sent something
if (it.value is StoreResponse.Data<*>) {
// unlocking disk only if network sent data so that fresh data request never
// receives disk data by mistake
diskLock.complete(Unit)
}
} else if (it is Either.Right) {
// right, that is data from disk
val (index, diskData) = it.value
if (diskData.value != null) {
emit(
StoreResponse.Data(
value = diskData.value,
origin = diskData.origin
) as StoreResponse<Output>
)
}
// if this is the first disk value and it is null, we should enable fetcher
// TODO should we ignore the index and always enable?
if (index == 0 && (diskData.value == null || request.refresh)) {
networkLock.complete(Unit)
}
}
}
}
private fun createNetworkFlow(
request: StoreRequest<Key>,
networkLock: CompletableDeferred<Unit>?
): Flow<StoreResponse<Output>> {
return fetcherController
.getFetcher(request.key)
.map {
StoreResponse.Data(
value = it,
origin = ResponseOrigin.Fetcher
) as StoreResponse<Input>
}.catch {
emit(
StoreResponse.Error(
error = it,
origin = ResponseOrigin.Fetcher
)
)
}.onStart {
if (!request.shouldSkipCache(CacheType.DISK)) {
// wait until network gives us the go
networkLock?.await()
}
emit(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
)
)
}.map {
it.swapType<Output>()
}
}
fun asLegacyStore() = open()
// TODO this builder w/ 3 type args is really ugly, think more about it...
companion object {
fun <Key, Input, Output> beginWithNonFlowingFetcher(
fetcher: suspend (key: Key) -> Input
) = Builder<Key, Input, Output> { key: Key ->
flow {
emit(fetcher(key))
}
}
fun <Key, Input, Output> beginWithFlowingFetcher(
fetcher: (key: Key) -> Flow<Input>
) = Builder<Key, Input, Output>(fetcher)
}
class Builder<Key, Input, Output>(
private val fetcher: (key: Key) -> Flow<Input>
) {
private var scope: CoroutineScope? = null
private var sourceOfTruth: SourceOfTruth<Key, Input, Output>? = null
private var cachePolicy: MemoryPolicy? = StoreDefaults.memoryPolicy
fun scope(scope: CoroutineScope): Builder<Key, Input, Output> {
this.scope = scope
return this
}
fun nonFlowingPersister(
reader: suspend (Key) -> Output?,
writer: suspend (Key, Input) -> Unit,
delete: (suspend (Key) -> Unit)? = null
): Builder<Key, Input, Output> {
sourceOfTruth = PersistentNonFlowingSourceOfTruth(
realReader = reader,
realWriter = writer,
realDelete = delete
)
return this
}
fun persister(
reader: (Key) -> Flow<Output?>,
writer: suspend (Key, Input) -> Unit,
delete: (suspend (Key) -> Unit)? = null
): Builder<Key, Input, Output> {
sourceOfTruth = PersistentSourceOfTruth(
realReader = reader,
realWriter = writer,
realDelete = delete
)
return this
}
fun sourceOfTruth(
sourceOfTruth: SourceOfTruth<Key, Input, Output>
): Builder<Key, Input, Output> {
this.sourceOfTruth = sourceOfTruth
return this
}
fun cachePolicy(memoryPolicy: MemoryPolicy?): Builder<Key, Input, Output> {
cachePolicy = memoryPolicy
return this
}
fun disableCache(): Builder<Key, Input, Output> {
cachePolicy = null
return this
}
fun build(): RealInternalCoroutineStore<Key, Input, Output> {
@Suppress("UNCHECKED_CAST")
return RealInternalCoroutineStore(
scope = scope ?: GlobalScope,
sourceOfTruth = sourceOfTruth,
fetcher = fetcher,
memoryPolicy = cachePolicy
)
}
}
}
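
A usage sketch for the builder above (not part of the commit), assuming module-internal access since the class is internal; the key/value types and the in-memory map standing in for disk are illustrative:

import com.nytimes.android.external.store3.pipeline.StoreRequest
import com.nytimes.android.external.store4.RealInternalCoroutineStore
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.collect

@FlowPreview
@ExperimentalCoroutinesApi
suspend fun builderExample(scope: CoroutineScope) {
    val fakeDisk = mutableMapOf<String, String>() // stands in for a real disk cache
    val store = RealInternalCoroutineStore
        .beginWithNonFlowingFetcher<String, String, String> { key ->
            "value-for-$key" // pretend network call
        }
        .scope(scope)
        .nonFlowingPersister(
            reader = { key -> fakeDisk[key] },
            writer = { key, value -> fakeDisk[key] = value }
        )
        .build()
    // Emits Loading(Fetcher) first, then data responses from the persister / fetcher as they arrive.
    store.stream(StoreRequest.cached("3", refresh = true)).collect { response ->
        println("${response.origin}: ${response.dataOrNull()}")
    }
}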


@ -0,0 +1,45 @@
package com.nytimes.android.external.store4
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Simple holder that can ref-count items by a given key.
*/
internal class RefCountedResource<Key, T>(
private val create: suspend (Key) -> T,
private val onRelease : (suspend (Key, T) -> Unit)? = null
) {
private val items = mutableMapOf<Key, Item>()
private val lock = Mutex()
suspend fun acquire(key: Key) : T = lock.withLock {
items.getOrPut(key) {
Item(create(key))
}.also {
it.refCount ++
}.value
}
suspend fun release(key: Key, value : T) = lock.withLock {
val existing = items[key]
check(existing != null && existing.value === value) {
"inconsistent release, seems like $value was leaked or never acquired"
}
existing.refCount --
if (existing.refCount < 1) {
items.remove(key)
onRelease?.invoke(key, value)
}
}
// used in tests
suspend fun size() = lock.withLock {
items.size
}
private inner class Item(
val value: T,
var refCount: Int = 0
)
}
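
A short usage sketch of the ref-counting behaviour above (not part of the commit), assuming module-internal access; the key and value type are illustrative:

import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val lists = RefCountedResource<String, MutableList<Int>>(
        create = { mutableListOf() },
        onRelease = { key, list -> println("releasing $key (${list.size} items)") }
    )
    val first = lists.acquire("a")  // refCount = 1, the list is created
    val second = lists.acquire("a") // refCount = 2, the same instance is returned
    check(first === second)
    lists.release("a", first)       // refCount = 1, still kept
    lists.release("a", second)      // refCount = 0, removed and onRelease runs
    check(lists.size() == 0)
}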


@ -0,0 +1,90 @@
package com.nytimes.android.external.store4
import com.nytimes.android.external.store3.base.Clearable
import com.nytimes.android.external.store3.base.Persister
import com.nytimes.android.external.store3.pipeline.ResponseOrigin
import com.nytimes.android.external.store3.util.KeyParser
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
/**
* Source of truth takes care of making any source (no matter if it has flowing reads or not) into
* a common flowing API. Used w/ a [SourceOfTruthWithBarrier] in front of it in the
* [RealInternalCoroutineStore] implementation to avoid dispatching values to downstream while
* a write is in progress.
*/
internal interface SourceOfTruth<Key, Input, Output> {
val defaultOrigin: ResponseOrigin
fun reader(key: Key): Flow<Output?>
suspend fun write(key: Key, value: Input)
suspend fun delete(key: Key)
// for testing
suspend fun getSize(): Int
companion object {
fun <Key, Output> fromLegacy(
persister: Persister<Output, Key>,
// parser that runs after get from db
postParser: KeyParser<Key, Output, Output>? = null
): SourceOfTruth<Key, Output, Output> {
return PersistentSourceOfTruth(
realReader = { key ->
flow {
if (postParser == null) {
emit(persister.read(key))
} else {
persister.read(key)?.let {
val postParsed = postParser.apply(key, it)
emit(postParsed)
} ?: emit(null)
}
}
},
realWriter = { key, value ->
persister.write(key, value)
},
realDelete = { key ->
(persister as? Clearable<Key>)?.clear(key)
}
)
}
}
}
internal class PersistentSourceOfTruth<Key, Input, Output>(
private val realReader: (Key) -> Flow<Output?>,
private val realWriter: suspend (Key, Input) -> Unit,
private val realDelete: (suspend (Key) -> Unit)? = null
) : SourceOfTruth<Key, Input, Output> {
override val defaultOrigin = ResponseOrigin.Persister
override fun reader(key: Key): Flow<Output?> = realReader(key)
override suspend fun write(key: Key, value: Input) = realWriter(key, value)
override suspend fun delete(key: Key) {
realDelete?.invoke(key)
}
// for testing
override suspend fun getSize(): Int {
throw UnsupportedOperationException("not supported for persistent")
}
}
internal class PersistentNonFlowingSourceOfTruth<Key, Input, Output>(
private val realReader: suspend (Key) -> Output?,
private val realWriter: suspend (Key, Input) -> Unit,
private val realDelete: (suspend (Key) -> Unit)? = null
) : SourceOfTruth<Key, Input, Output> {
override val defaultOrigin = ResponseOrigin.Persister
override fun reader(key: Key): Flow<Output?> = flow {
emit(realReader(key))
}
override suspend fun write(key: Key, value: Input) = realWriter(key, value)
override suspend fun delete(key: Key) {
realDelete?.invoke(key)
}
// for testing
override suspend fun getSize(): Int {
throw UnsupportedOperationException("not supported for persistent")
}
}
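
A small sketch (not part of the commit) showing PersistentNonFlowingSourceOfTruth wrapping a plain suspending reader/writer pair, assuming module-internal access and a map standing in for a real database:

import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val db = mutableMapOf<Int, String>() // illustrative "database"
    val sourceOfTruth = PersistentNonFlowingSourceOfTruth<Int, String, String>(
        realReader = { key -> db[key] },
        realWriter = { key, value -> db[key] = value },
        realDelete = { key -> db.remove(key) }
    )
    sourceOfTruth.write(1, "hello")
    // reader() wraps the suspending read into a single-emission Flow
    println(sourceOfTruth.reader(1).first()) // hello
    sourceOfTruth.delete(1)
    println(sourceOfTruth.reader(1).first()) // null
}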


@ -0,0 +1,122 @@
package com.nytimes.android.external.store4
import com.nytimes.android.external.store3.pipeline.ResponseOrigin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import java.util.concurrent.atomic.AtomicLong
/**
* Wraps a [SourceOfTruth] and blocks reads while a write is in progress.
*/
@FlowPreview
@ExperimentalCoroutinesApi
internal class SourceOfTruthWithBarrier<Key, Input, Output>(
private val delegate: SourceOfTruth<Key, Input, Output>
) {
/**
* Each key has a barrier so that we can block reads while writing.
*/
private val barriers = RefCountedResource<Key, ConflatedBroadcastChannel<BarrierMsg>>(
create = { key ->
ConflatedBroadcastChannel(BarrierMsg.Open.INITIAL)
}
)
/**
* Each message gets dispatched with a version. This ensures we won't accidentally turn on the
* reader flow for a new reader that happens to have arrived while a write is in progress since
* that write should be considered as a disk read for that flow, not fetcher.
*/
private val versionCounter = AtomicLong(0)
fun reader(key: Key, lock: CompletableDeferred<Unit>): Flow<DataWithOrigin<Output>> {
return flow {
val barrier = barriers.acquire(key)
val readerVersion: Long = versionCounter.incrementAndGet()
try {
lock.await()
emitAll(barrier.asFlow()
.flatMapLatest {
val messageArrivedAfterMe = readerVersion < it.version
when (it) {
is BarrierMsg.Open -> delegate.reader(key).mapIndexed { index, output ->
if (index == 0 && messageArrivedAfterMe) {
DataWithOrigin<Output>(
origin = ResponseOrigin.Fetcher,
value = output
)
} else {
DataWithOrigin<Output>(
origin = delegate.defaultOrigin,
value = output
)
}
}
is BarrierMsg.Blocked -> {
flowOf()
}
}
})
} finally {
// we are using a finally here instead of onCompletion as there might be a
// possibility where flow gets cancelled right before `emitAll`.
barriers.release(key, barrier)
}
}
}
suspend fun write(key: Key, value: Input) {
val barrier = barriers.acquire(key)
try {
barrier.send(BarrierMsg.Blocked(versionCounter.incrementAndGet()))
delegate.write(key, value)
barrier.send(BarrierMsg.Open(versionCounter.incrementAndGet()))
} finally {
barriers.release(key, barrier)
}
}
suspend fun delete(key: Key) {
delegate.delete(key)
}
private sealed class BarrierMsg(
val version: Long
) {
class Blocked(version: Long) : BarrierMsg(version)
class Open(version: Long) : BarrierMsg(version) {
companion object {
val INITIAL = Open(INITIAL_VERSION)
}
}
}
// visible for testing
internal suspend fun barrierCount() = barriers.size()
companion object {
private const val INITIAL_VERSION = -1L
}
}
@ExperimentalCoroutinesApi
private inline fun <T, R> Flow<T>.mapIndexed(crossinline block: (Int, T) -> R) = flow {
this@mapIndexed.collectIndexed { index, value ->
emit(block(index, value))
}
}
internal data class DataWithOrigin<T>(
val origin: ResponseOrigin,
val value: T?
)


@ -0,0 +1,321 @@
package com.nytimes.android.external.store4.multiplex
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import java.util.ArrayDeque
import java.util.Collections
/**
* This actor helps track active channels and is able to dispatch values to each of them
* in parallel. As soon as one of them receives the value, the ack in the dispatch message is
* completed so that the sender can continue for the next item.
*/
@ExperimentalCoroutinesApi
class ChannelManager<T>(
/**
* The scope in which ChannelManager actor runs
*/
scope: CoroutineScope,
/**
* The buffer size that is used while the upstream is active
*/
bufferSize: Int,
private val onEach: suspend (T) -> Unit,
/**
* Called when the channel manager is active (e.g. it has downstream collectors and needs a
* producer)
*/
private val onActive: (ChannelManager<T>) -> SharedFlowProducer<T>
) : StoreRealActor<ChannelManager.Message<T>>(scope) {
private val buffer = Buffer<T>(bufferSize)
/**
* The current producer
*/
private var producer: SharedFlowProducer<T>? = null
/**
* Tracks whether we've ever dispatched value or error from the current producer.
* Reset when producer finishes.
*/
private var dispatchedValue: Boolean = false
/**
* List of downstream collectors.
*/
private val channels = mutableListOf<ChannelEntry<T>>()
override suspend fun handle(msg: Message<T>) {
when (msg) {
is Message.AddLeftovers -> doAddLefovers(msg.leftovers)
is Message.AddChannel -> doAdd(msg)
is Message.RemoveChannel -> doRemove(msg.channel)
is Message.DispatchValue -> doDispatchValue(msg)
is Message.DispatchError -> doDispatchError(msg)
is Message.UpstreamFinished -> doHandleUpstreamClose(msg.producer)
}
}
/**
* We are closing. Do a cleanup on existing channels where we'll close them and also decide
* on the list of leftovers.
*/
fun doHandleUpstreamClose(producer: SharedFlowProducer<T>?) {
if (this.producer !== producer) {
return
}
val leftovers = mutableListOf<ChannelEntry<T>>()
channels.forEach {
when {
it.receivedValue -> it.close()
dispatchedValue ->
// we dispatched a value but this channel didn't receive so put it into
// leftovers
leftovers.add(it)
else -> // upstream didn't dispatch
it.close()
}
}
channels.clear() // empty references
channels.addAll(leftovers)
this.producer = null
if (leftovers.isNotEmpty()) {
activateIfNecessary(true)
}
}
override fun onClosed() {
producer?.cancel()
}
/**
* Dispatch value to all downstream collectors.
*/
private suspend fun doDispatchValue(msg: Message.DispatchValue<T>) {
onEach(msg.value)
buffer.add(msg)
dispatchedValue = true
channels.forEach {
it.dispatchValue(msg)
}
}
/**
* Dispatch an upstream error to downstream collectors.
*/
private fun doDispatchError(msg: Message.DispatchError<T>) {
// dispatching error is as good as dispatching value
dispatchedValue = true
channels.forEach {
it.dispatchError(msg.error)
}
}
/**
* Remove a downstream collector.
*/
private suspend fun doRemove(channel: Channel<Message.DispatchValue<T>>) {
val index = channels.indexOfFirst {
it.hasChannel(channel)
}
if (index >= 0) {
channels.removeAt(index)
if (channels.isEmpty()) {
producer?.cancelAndJoin()
}
}
}
/**
* We've received some leftovers from the previous [ChannelManager]. Add them to our list.
*/
private suspend fun doAddLefovers(leftovers: List<ChannelEntry<T>>) {
val wasEmpty = channels.isEmpty()
leftovers.forEach { channelEntry ->
addEntry(
entry = channelEntry.copy(_receivedValue = false)
)
}
activateIfNecessary(wasEmpty)
}
/**
* Add a new downstream collector
*/
private suspend fun doAdd(msg: Message.AddChannel<T>) {
val wasEmpty = channels.isEmpty()
addEntry(
entry = ChannelEntry(
channel = msg.channel
)
)
activateIfNecessary(wasEmpty)
}
private fun activateIfNecessary(wasEmpty: Boolean) {
if (producer == null && wasEmpty) {
producer = onActive(this)
dispatchedValue = false
producer!!.start()
}
}
/**
* Internally add the new downstream collector to our list, send it anything buffered.
*/
private suspend fun addEntry(entry: ChannelEntry<T>) {
val new = channels.none {
it.hasChannel(entry)
}
check(new) {
"$entry is already in the list."
}
check(!entry.receivedValue) {
"$entry already received a value"
}
channels.add(entry)
if (buffer.items.isNotEmpty()) {
// if there is anything in the buffer, send it
buffer.items.forEach {
entry.dispatchValue(it)
}
}
}
/**
* Holder for each downstream collector
*/
internal data class ChannelEntry<T>(
/**
* The channel used by the collector
*/
private val channel: Channel<Message.DispatchValue<T>>,
/**
* Tracking whether we've ever dispatched a value or an error to downstream
*/
private var _receivedValue: Boolean = false
) {
val receivedValue
get() = _receivedValue
suspend fun dispatchValue(value: Message.DispatchValue<T>) {
_receivedValue = true
channel.send(value)
}
fun dispatchError(error: Throwable) {
_receivedValue = true
channel.close(error)
}
fun close() {
channel.close()
}
fun hasChannel(channel: Channel<Message.DispatchValue<T>>) = this.channel === channel
fun hasChannel(entry: ChannelEntry<T>) = this.channel === entry.channel
}
/**
* Messages accepted by the [ChannelManager].
*/
sealed class Message<T> {
/**
* Add a new channel, that means a new downstream subscriber
*/
class AddChannel<T>(
val channel: Channel<DispatchValue<T>>
) : Message<T>()
/**
* Add multiple channels. Happens when we are carrying over leftovers from a previous
* manager
*/
internal class AddLeftovers<T>(val leftovers: List<ChannelEntry<T>>) :
Message<T>()
/**
* Remove a downstream subscriber, that means it completed
*/
class RemoveChannel<T>(val channel: Channel<DispatchValue<T>>) : Message<T>()
/**
* Upstream dispatched a new value, send it to all downstream items
*/
class DispatchValue<T>(
/**
* The value dispatched by the upstream
*/
val value: T,
/**
* Ack that is completed once a receiver processes the value. The upstream producer awaits
* this before asking for a new value from upstream
*/
val delivered: CompletableDeferred<Unit>
) : Message<T>()
/**
* Upstream dispatched an error, send it to all downstream items
*/
class DispatchError<T>(
/**
* The error sent by the upstream
*/
val error: Throwable
) : Message<T>()
class UpstreamFinished<T>(
/**
* SharedFlowProducer finished emitting
*/
val producer: SharedFlowProducer<T>
) : Message<T>()
}
/**
* Buffer implementation for any late arrivals.
*/
private interface Buffer<T> {
fun add(item: Message.DispatchValue<T>)
val items: Collection<Message.DispatchValue<T>>
}
/**
* Default implementation of buffer which does not buffer anything.
*/
private class NoBuffer<T> : Buffer<T> {
override val items: Collection<Message.DispatchValue<T>>
get() = Collections.emptyList()
override fun add(item: Message.DispatchValue<T>) {
// ignore
}
}
/**
* Create a new buffer instance based on the provided limit.
*/
private fun <T> Buffer(limit: Int): Buffer<T> = if (limit > 0) {
BufferImpl(limit)
} else {
NoBuffer()
}
/**
* A real buffer implementation that has a FIFO queue.
*/
private class BufferImpl<T>(private val limit: Int) :
Buffer<T> {
override val items = ArrayDeque<Message.DispatchValue<T>>(limit.coerceAtMost(10))
override fun add(item: Message.DispatchValue<T>) {
while (items.size >= limit) {
items.pollFirst()
}
items.offerLast(item)
}
}
}


@ -0,0 +1,81 @@
package com.nytimes.android.external.store4.multiplex
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
/**
* Like a publish, shares one upstream value with multiple downstream receivers.
* It has one store-specific behavior: the upstream flow is suspended until at least one downstream
* flow emits the value, to ensure we don't abuse the upstream flow if the downstream cannot keep up.
*/
@FlowPreview
@ExperimentalCoroutinesApi
internal class Multiplexer<T>(
/**
* The [CoroutineScope] to use for upstream subscription
*/
private val scope: CoroutineScope,
/**
* The buffer size that is used only if the upstream has not completed yet.
* Defaults to 0.
*/
bufferSize: Int = 0,
/**
* Source function to create a new flow when necessary.
*/
// TODO does this have to be a method or just a flow ? Will decide when actual implementation
// happens
private val source: () -> Flow<T>,
/**
* Called when upstream dispatches a value.
*/
private val onEach: suspend (T) -> Unit
) {
private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
ChannelManager<T>(
scope = scope,
bufferSize = bufferSize,
onActive = {
SharedFlowProducer(
scope = scope,
src = source(),
channelManager = it
)
},
onEach = onEach
)
}
fun create(): Flow<T> {
val channel = Channel<ChannelManager.Message.DispatchValue<T>>(Channel.UNLIMITED)
return channel.consumeAsFlow()
.onStart {
channelManager.send(
ChannelManager.Message.AddChannel(
channel
))
}
.transform {
emit(it.value)
it.delivered.complete(Unit)
}.onCompletion {
channelManager.send(
ChannelManager.Message.RemoveChannel(
channel
)
)
}
}
fun close() {
channelManager.close()
}
}
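
A usage sketch (not part of the commit) showing two downstream collectors sharing a single upstream collection through the Multiplexer, assuming module-internal access; the values are illustrative, and the initial delay only gives both collectors time to register before the first emission since the buffer size is 0:

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@FlowPreview
@ExperimentalCoroutinesApi
fun main() = runBlocking {
    var upstreamStarts = 0
    coroutineScope {
        val multiplexer = Multiplexer(
            scope = this,
            bufferSize = 0,
            source = {
                upstreamStarts++
                flow {
                    delay(10) // let both collectors register before the first emission
                    emit("a")
                    delay(10)
                    emit("b")
                }
            },
            onEach = { /* e.g. write the value to a source of truth */ }
        )
        // Two downstream collectors share the same upstream emissions.
        val first = async { multiplexer.create().take(2).toList() }
        val second = async { multiplexer.create().take(2).toList() }
        println(first.await())  // [a, b]
        println(second.await()) // [a, b]
        multiplexer.close()
    }
    println("upstream was started $upstreamStarts time(s)") // 1
}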


@ -0,0 +1,88 @@
package com.nytimes.android.external.store4.multiplex
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchError
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.UpstreamFinished
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import java.nio.channels.ClosedChannelException
/**
* A flow collector that works with a [ChannelManager] to collect values from an upstream flow
* and dispatch to the [ChannelManager] which then dispatches to downstream collectors.
*
* They work in sync such that this producer always expects an ack from the [ChannelManager] after
* sending an event.
*
* Cancellation of the collection might be triggered by both this producer (e.g. upstream completes)
* or the [ChannelManager] (e.g. all active collectors complete).
*/
@ExperimentalCoroutinesApi
class SharedFlowProducer<T>(
private val scope: CoroutineScope,
private val src: Flow<T>,
private val channelManager: ChannelManager<T>
) {
private lateinit var collectionJob: Job
/**
* Starts the collection of the upstream flow.
*/
fun start() {
scope.launch {
try {
// launch again to track the collection job
collectionJob = scope.launch {
try {
src.catch {
channelManager.send(
DispatchError(
it
)
)
}.collect {
val ack = CompletableDeferred<Unit>()
channelManager.send(
DispatchValue(
it,
ack
)
)
// suspend until at least 1 receives the new value
ack.await()
}
} catch (closed: ClosedSendChannelException) {
// ignore. if consumers are gone, it might close itself.
}
}
// wait until collection ends, either due to an error or ordered by the channel
// manager
collectionJob.join()
} finally {
// cleanup the channel manager so that downstreams can be closed if they are not
// closed already and leftovers can be moved to a new producer if necessary.
try {
channelManager.send(UpstreamFinished(this@SharedFlowProducer))
} catch (closed : ClosedSendChannelException) {
// it might close before us, it's fine.
}
}
}
}
suspend fun cancelAndJoin() {
collectionJob.cancelAndJoin()
}
fun cancel() {
collectionJob.cancel()
}
}


@ -0,0 +1,46 @@
package com.nytimes.android.external.store4.multiplex
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
/**
* Simple actor implementation abstracting away Coroutine.actor since it is deprecated.
* It also enforces a 0 capacity buffer.
*/
@Suppress("EXPERIMENTAL_API_USAGE")
@ExperimentalCoroutinesApi
abstract class StoreRealActor<T>(
scope: CoroutineScope
) {
val inboundChannel: SendChannel<T>
init {
inboundChannel = scope.actor(
capacity = 0
) {
channel.invokeOnClose {
onClosed()
}
for (msg in channel) {
handle(msg)
}
}
}
open fun onClosed() {
}
abstract suspend fun handle(msg: T)
suspend fun send(msg: T) {
inboundChannel.send(msg)
}
fun close() {
inboundChannel.close()
}
}
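
A minimal subclass sketch of the actor base class above (not part of the commit); the message type and the printing behaviour are illustrative:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Illustrative actor that just prints each message it handles, one at a time.
@ExperimentalCoroutinesApi
class PrintingActor(scope: CoroutineScope) : StoreRealActor<String>(scope) {
    override suspend fun handle(msg: String) {
        println("handling $msg")
    }

    override fun onClosed() {
        println("actor closed")
    }
}

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    coroutineScope {
        val actor = PrintingActor(this)
        actor.send("hello") // suspends until the actor takes the message (capacity is 0)
        actor.send("world")
        actor.close()       // triggers onClosed and lets the actor coroutine finish
    }
}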


@ -12,12 +12,12 @@ import org.junit.runners.Parameterized
@ExperimentalCoroutinesApi
@RunWith(Parameterized::class)
class ClearStoreMemoryTest(
storeType : TestStoreType
storeType: TestStoreType
) {
private val testScope = TestCoroutineScope()
private var networkCalls = 0
private val store = TestStoreBuilder.from<BarCode, Int> {
networkCalls ++
private val store = TestStoreBuilder.from<BarCode, Int>(testScope) {
networkCalls++
}.build(storeType)
@Test


@ -22,6 +22,7 @@ class ClearStoreTest(
private val networkCalls = AtomicInteger(0)
private val store = TestStoreBuilder.from(
scope = testScope,
fetcher = {
networkCalls.incrementAndGet()
},


@ -2,18 +2,24 @@ package com.nytimes.android.external.store3
import com.nytimes.android.external.store3.base.impl.BarCode
import junit.framework.Assert.fail
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import kotlin.coroutines.EmptyCoroutineContext
@RunWith(Parameterized::class)
class DontCacheErrorsTest(
storeType: TestStoreType
) {
private val testScope = TestCoroutineScope()
private var shouldThrow: Boolean = false
private val store = TestStoreBuilder.from<BarCode, Int> {
// TODO move to test coroutine scope
private val store = TestStoreBuilder.from<BarCode, Int>(testScope) {
if (shouldThrow) {
throw RuntimeException()
} else {
@ -22,7 +28,7 @@ class DontCacheErrorsTest(
}.build(storeType)
@Test
fun testStoreDoesntCacheErrors() = runBlocking<Unit> {
fun testStoreDoesntCacheErrors() = testScope.runBlockingTest {
val barcode = BarCode("bar", "code")
shouldThrow = true


@ -15,6 +15,7 @@ class KeyParserTest(
private val testScope = TestCoroutineScope()
private val store = TestStoreBuilder.from(
scope = testScope,
fetcher = {
NETWORK
},


@ -14,10 +14,11 @@ import org.junit.runners.Parameterized
class NoNetworkTest(
storeType: TestStoreType
) {
private val store: Store<out Any, BarCode> = TestStoreBuilder.from<BarCode, Any> {
private val testScope = TestCoroutineScope()
private val store: Store<out Any, BarCode> = TestStoreBuilder.from<BarCode, Any>(testScope) {
throw EXCEPTION
}.build(storeType)
private val testScope = TestCoroutineScope()
@Test
fun testNoNetwork() = testScope.runBlockingTest {


@ -30,6 +30,7 @@ class ParsingFetcherTest(
@Test
fun testPersistFetcher() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
fetchParser = parser,
persister = persister


@ -17,12 +17,16 @@ import kotlin.random.Random
@RunWith(Parameterized::class)
class SequentialTes(
storeType: TestStoreType) {
storeType: TestStoreType
) {
private val testScope = TestCoroutineScope()
var networkCalls = 0
private val store = TestStoreBuilder.from<BarCode, Int>(cached = true) {
networkCalls ++
private val store = TestStoreBuilder.from<BarCode, Int>(
scope = testScope,
cached = true
) {
networkCalls++
}.build(storeType)
@Test


@ -41,6 +41,7 @@ class StoreTest(
@Test
fun testSimple() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
@ -66,6 +67,7 @@ class StoreTest(
@Test
fun testDoubleTap() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
@ -97,6 +99,7 @@ class StoreTest(
fun testSubclass() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
@ -122,6 +125,7 @@ class StoreTest(
fun testNoopAndDefault() = testScope.runBlockingTest {
val persister = spy(NoopPersister.create<String, BarCode>())
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister,
cached = true
@ -163,6 +167,7 @@ class StoreTest(
@Test
fun testFreshUsesOnlyNetwork() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister,
persisterStalePolicy = StalePolicy.NETWORK_BEFORE_STALE


@ -29,6 +29,7 @@ class StoreThrowOnNoItems(
@Test
fun testShouldThrowOnFetcherEmitsNoSuckElementException() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher
).build(storeType)
@ -36,8 +37,8 @@ class StoreThrowOnNoItems(
.thenThrow(NoSuchElementException())
try {
simpleStore.get(barCode)
fail("exception not thrown when no items emitted from fetcher")
val unexpected = simpleStore.get(barCode)
fail("exception not thrown when no items emitted from fetcher $unexpected")
} catch (e: NoSuchElementException) {
assertThat(e).isInstanceOf(NoSuchElementException::class.java)
}


@ -31,6 +31,7 @@ class StoreWithParserTest(
@Test
fun testSimple() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.fromPostParser(
scope = testScope,
fetcher = fetcher,
persister = persister,
postParser = parser
@ -58,6 +59,7 @@ class StoreWithParserTest(
@Test
fun testSubclass() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.fromPostParser(
scope = testScope,
fetcher = fetcher,
persister = persister,
postParser = parser


@ -24,12 +24,14 @@ class StreamOneKeyTest(
val persister: Persister<String, BarCode> = mock()
private val barCode = BarCode("key", "value")
private val barCode2 = BarCode("key2", "value2")
private val testScope = TestCoroutineScope()
private val store = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
private val testScope = TestCoroutineScope()
@Before
fun setUp() = runBlockingTest {
@ -41,7 +43,7 @@ class StreamOneKeyTest(
.let {
// the backport stream method of Pipeline to Store does not skip disk so we
// make sure disk returns empty value first
if (storeType == TestStoreType.Pipeline) {
if (storeType != TestStoreType.Store) {
it.thenReturn(null)
} else {
it
@ -58,7 +60,7 @@ class StreamOneKeyTest(
@Suppress("UsePropertyAccessSyntax") // for assert isTrue() isFalse()
@Test
fun testStream() = runBlockingTest {
fun testStream() = testScope.runBlockingTest {
val streamSubscription = store.stream(barCode)
.openChannelSubscription()
try {


@ -35,6 +35,7 @@ class StreamTest(
private val barCode = BarCode("key", "value")
private val store = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
@ -54,7 +55,7 @@ class StreamTest(
@Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse()
@Test
fun testStream() = testScope.runBlockingTest {
if (storeType == TestStoreType.Pipeline) {
if (storeType != TestStoreType.Store) {
throw AssumptionViolatedException("Pipeline store does not support stream() no arg")
}
val streamSubscription = store.stream().openChannelSubscription()
@ -70,7 +71,7 @@ class StreamTest(
@Suppress("UsePropertyAccessSyntax")// for isTrue() / isFalse()
@Test
fun testStreamEmitsOnlyFreshData() = testScope.runBlockingTest {
if (storeType == TestStoreType.Pipeline) {
if (storeType != TestStoreType.Store) {
throw AssumptionViolatedException("Pipeline store does not support stream() no arg")
}
store.get(barCode)


@ -16,22 +16,29 @@ import com.nytimes.android.external.store3.pipeline.withCache
import com.nytimes.android.external.store3.pipeline.withKeyConverter
import com.nytimes.android.external.store3.pipeline.withNonFlowPersister
import com.nytimes.android.external.store3.util.KeyParser
import com.nytimes.android.external.store4.RealInternalCoroutineStore
import com.nytimes.android.external.store4.SourceOfTruth
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.flow
data class TestStoreBuilder<Key, Output>(
private val buildStore: () -> Store<Output, Key>,
private val buildPipelineStore: () -> Store<out Output, Key>
private val buildPipelineStore: () -> Store<out Output, Key>,
private val builRealInternalCoroutineStore: () -> Store<Output, Key>
) {
fun build(storeType: TestStoreType): Store<out Output, Key> = when (storeType) {
TestStoreType.Store -> buildStore()
TestStoreType.Pipeline -> buildPipelineStore()
TestStoreType.CoroutineInternal -> builRealInternalCoroutineStore()
}
companion object {
fun <Key, Output> from(
scope: CoroutineScope,
inflight: Boolean = true,
fetcher: suspend (Key) -> Output
): TestStoreBuilder<Key, Output> = from(
scope = scope,
inflight = inflight,
persister = null,
fetchParser = null,
@ -39,10 +46,12 @@ data class TestStoreBuilder<Key, Output>(
)
fun <Key, Output> from(
scope: CoroutineScope,
inflight: Boolean = true,
fetcher: suspend (Key) -> Output,
fetchParser : KeyParser<Key, Output, Output>
fetchParser: KeyParser<Key, Output, Output>
): TestStoreBuilder<Key, Output> = from(
scope = scope,
inflight = inflight,
persister = null,
fetchParser = fetchParser,
@ -50,11 +59,13 @@ data class TestStoreBuilder<Key, Output>(
)
fun <Key, Output> from(
scope: CoroutineScope,
inflight: Boolean = true,
fetcher: Fetcher<Output, Key>,
fetchParser : Parser<Output, Output>,
fetchParser: Parser<Output, Output>,
persister: Persister<Output, Key>
): TestStoreBuilder<Key, Output> = from(
scope = scope,
inflight = inflight,
persister = persister,
fetchParser = object : KeyParser<Key, Output, Output> {
@ -66,11 +77,13 @@ data class TestStoreBuilder<Key, Output>(
)
fun <Key, Output> fromPostParser(
scope: CoroutineScope,
inflight: Boolean = true,
fetcher: Fetcher<Output, Key>,
postParser : Parser<Output, Output>,
postParser: Parser<Output, Output>,
persister: Persister<Output, Key>
): TestStoreBuilder<Key, Output> = from(
scope = scope,
inflight = inflight,
persister = persister,
postParser = object : KeyParser<Key, Output, Output> {
@ -83,14 +96,16 @@ data class TestStoreBuilder<Key, Output>(
@Suppress("UNCHECKED_CAST")
fun <Key, Output> from(
scope: CoroutineScope,
inflight: Boolean = true,
cached : Boolean = false,
cached: Boolean = false,
cacheMemoryPolicy: MemoryPolicy? = null,
persister: Persister<Output, Key>? = null,
persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED,
fetchParser: KeyParser<Key, Output, Output>? = null,
fetcher: suspend (Key) -> Output
): TestStoreBuilder<Key, Output> = from(
scope = scope,
inflight = inflight,
cached = cached,
cacheMemoryPolicy = cacheMemoryPolicy,
@ -101,10 +116,12 @@ data class TestStoreBuilder<Key, Output>(
override suspend fun fetch(key: Key): Output = fetcher(key)
}
)
@Suppress("UNCHECKED_CAST")
fun <Key, Output> from(
scope: CoroutineScope,
inflight: Boolean = true,
cached : Boolean = false,
cached: Boolean = false,
cacheMemoryPolicy: MemoryPolicy? = null,
persister: Persister<Output, Key>? = null,
persisterStalePolicy: StalePolicy = StalePolicy.UNSPECIFIED,
@ -195,6 +212,35 @@ data class TestStoreBuilder<Key, Output>(
}
}
}.open()
},
builRealInternalCoroutineStore = {
RealInternalCoroutineStore
.beginWithFlowingFetcher<Key, Output, Output> { key: Key ->
flow {
val value = fetcher.fetch(key = key)
if (fetchParser != null) {
emit(fetchParser.apply(key, value))
} else {
emit(value)
}
}
}
.scope(scope)
.let {
if (persister == null) {
it
} else {
it.sourceOfTruth(SourceOfTruth.fromLegacy(persister, postParser))
}
}
.let {
if (cached) {
it
} else {
it.disableCache()
}
}
.build().asLegacyStore()
}
)
}
@ -202,13 +248,14 @@ data class TestStoreBuilder<Key, Output>(
// wraps a regular fun to suspend, couldn't figure out how to create suspend fun variables :/
private class SuspendWrapper<P0, R>(
val f : (P0) -> R
val f: (P0) -> R
) {
suspend fun apply(input : P0) : R = f(input)
suspend fun apply(input: P0): R = f(input)
}
}
enum class TestStoreType {
Store,
Pipeline
Pipeline,
CoroutineInternal
}


@ -1,10 +1,12 @@
package com.nytimes.android.external.store3.pipeline
import com.nytimes.android.external.store3.TestStoreType
import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Cache
import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Fetcher
import com.nytimes.android.external.store3.pipeline.ResponseOrigin.Persister
import com.nytimes.android.external.store3.pipeline.StoreResponse.Data
import com.nytimes.android.external.store3.pipeline.StoreResponse.Loading
import com.nytimes.android.external.store4.RealInternalCoroutineStore
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
@ -14,17 +16,18 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.Parameterized
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class PipelineStoreTest {
@RunWith(Parameterized::class)
class PipelineStoreTest(
private val storeType: TestStoreType
) {
private val testScope = TestCoroutineScope()
@Test
@ -33,10 +36,12 @@ class PipelineStoreTest {
3 to "three-1",
3 to "three-2"
)
val pipeline = beginNonFlowingPipeline(fetcher::fetch)
.withCache()
val pipeline = build<Int, String, String>(
nonFlowingFetcher = fetcher::fetch,
enableCache = true
)
pipeline.stream(StoreRequest.cached(3, refresh = false))
.assertCompleteStream(
.assertItems(
Loading(
origin = Fetcher
), Data(
@ -45,7 +50,7 @@ class PipelineStoreTest {
)
)
pipeline.stream(StoreRequest.cached(3, refresh = false))
.assertCompleteStream(
.assertItems(
Data(
value = "three-1",
origin = Cache
@ -71,18 +76,18 @@ class PipelineStoreTest {
}
@Test
fun getAndFresh_withPersister() = runBlocking<Unit> {
fun getAndFresh_withPersister() = testScope.runBlockingTest {
val fetcher = FakeFetcher(
3 to "three-1",
3 to "three-2"
)
val persister = InMemoryPersister<Int, String>()
val pipeline = beginNonFlowingPipeline(fetcher::fetch)
.withNonFlowPersister(
reader = persister::read,
writer = persister::write
val pipeline = build(
nonFlowingFetcher = fetcher::fetch,
persisterReader = persister::read,
persisterWriter = persister::write,
enableCache = true
)
.withCache()
pipeline.stream(StoreRequest.cached(3, refresh = false))
.assertItems(
Loading(
@ -127,12 +132,12 @@ class PipelineStoreTest {
)
val persister = InMemoryPersister<Int, String>()
val pipeline = beginNonFlowingPipeline(fetcher::fetch)
.withNonFlowPersister(
reader = persister::read,
writer = persister::write
val pipeline = build(
nonFlowingFetcher = fetcher::fetch,
persisterReader = persister::read,
persisterWriter = persister::write,
enableCache = true
)
.withCache()
pipeline.stream(StoreRequest.cached(3, refresh = true))
.assertItems(
@ -171,11 +176,13 @@ class PipelineStoreTest {
3 to "three-1",
3 to "three-2"
)
val pipeline = beginNonFlowingPipeline(fetcher::fetch)
.withCache()
val pipeline = build<Int, String, String>(
nonFlowingFetcher = fetcher::fetch,
enableCache = true
)
pipeline.stream(StoreRequest.cached(3, refresh = true))
.assertCompleteStream(
.assertItems(
Loading(
origin = Fetcher
),
@ -186,7 +193,7 @@ class PipelineStoreTest {
)
pipeline.stream(StoreRequest.cached(3, refresh = true))
.assertCompleteStream(
.assertItems(
Data(
value = "three-1",
origin = Cache
@ -207,11 +214,13 @@ class PipelineStoreTest {
3 to "three-1",
3 to "three-2"
)
val pipeline = beginNonFlowingPipeline(fetcher::fetch)
.withCache()
val pipeline = build<Int, String, String>(
nonFlowingFetcher = fetcher::fetch,
enableCache = true
)
pipeline.stream(StoreRequest.skipMemory(3, refresh = false))
.assertCompleteStream(
.assertItems(
Loading(
origin = Fetcher
),
@ -222,7 +231,7 @@ class PipelineStoreTest {
)
pipeline.stream(StoreRequest.skipMemory(3, refresh = false))
.assertCompleteStream(
.assertItems(
Loading(
origin = Fetcher
),
@ -241,11 +250,13 @@ class PipelineStoreTest {
)
val persister = InMemoryPersister<Int, String>()
val pipeline = beginPipeline(fetcher::createFlow)
.withNonFlowPersister(
reader = persister::read,
writer = persister::write
val pipeline = build(
flowingFetcher = fetcher::createFlow,
persisterReader = persister::read,
persisterWriter = persister::write,
enableCache = false
)
pipeline.stream(StoreRequest.fresh(3))
.assertItems(
Loading(
@ -260,8 +271,8 @@ class PipelineStoreTest {
origin = Fetcher
)
)
pipeline.stream(StoreRequest.cached(3, refresh = true)).assertItems(
pipeline.stream(StoreRequest.cached(3, refresh = true))
.assertItems(
Data(
value = "three-2",
origin = Persister
@ -283,14 +294,17 @@ class PipelineStoreTest {
@Test
fun diskChangeWhileNetworkIsFlowing_simple() = testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asObservable()
val pipeline = beginPipeline<Int, String> {
val pipeline = build(
flowingFetcher = {
flow {
// never emit
}
}.withPersister(
reader = persister::flowReader,
writer = persister::flowWriter
},
flowingPersisterReader = persister::flowReader,
persisterWriter = persister::flowWriter,
enableCache = false
)
launch {
delay(10)
persister.flowWriter(3, "local-1")
@ -311,16 +325,18 @@ class PipelineStoreTest {
@Test
fun diskChangeWhileNetworkIsFlowing_overwrite() = testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asObservable()
val pipeline = beginPipeline<Int, String> {
val pipeline = build(
flowingFetcher = {
flow {
delay(10)
emit("three-1")
delay(10)
emit("three-2")
}
}.withPersister(
reader = persister::flowReader,
writer = persister::flowWriter
},
flowingPersisterReader = persister::flowReader,
persisterWriter = persister::flowWriter,
enableCache = false
)
launch {
delay(5)
@ -357,11 +373,13 @@ class PipelineStoreTest {
fun errorTest() = testScope.runBlockingTest {
val exception = IllegalArgumentException("wow")
val persister = InMemoryPersister<Int, String>().asObservable()
val pipeline = beginNonFlowingPipeline<Int, String> { key: Int ->
val pipeline = build(
nonFlowingFetcher = {
throw exception
}.withPersister(
reader = persister::flowReader,
writer = persister::flowWriter
},
flowingPersisterReader = persister::flowReader,
persisterWriter = persister::flowWriter,
enableCache = false
)
launch {
delay(10)
@ -420,8 +438,11 @@ class PipelineStoreTest {
responses.filter {
it.first == key
}.forEach {
emit(it.second)
// we delay here to avoid collapsing fetcher values, otherwise, there is a
// possibility that consumer won't be fast enough to get both values before new
// value overrides the previous one.
delay(1)
emit(it.second)
}
}
}
@ -441,7 +462,7 @@ class PipelineStoreTest {
}
}
private class InMemoryPersister<Key, Output> {
class InMemoryPersister<Key, Output> {
private val data = mutableMapOf<Key, Output>()
@Suppress("RedundantSuspendModifier")// for function reference
@ -468,12 +489,93 @@ class PipelineStoreTest {
.isEqualTo(expected.toList())
}
/**
* Takes all elements from the stream and asserts them.
* Use this if test does not have an infinite flow (e.g. no persister or no infinite fetcher)
*/
private suspend fun <T> Flow<T>.assertCompleteStream(vararg expected: T) {
assertThat(this.toList())
.isEqualTo(expected.toList())
private fun <Key, Input, Output> build(
nonFlowingFetcher: (suspend (Key) -> Input)? = null,
flowingFetcher: ((Key) -> Flow<Input>)? = null,
persisterReader: (suspend (Key) -> Output?)? = null,
flowingPersisterReader: ((Key) -> Flow<Output?>)? = null,
persisterWriter: (suspend (Key, Input) -> Unit)? = null,
persisterDelete: (suspend (Key) -> Unit)? = null,
enableCache: Boolean
): PipelineStore<Key, Output> {
check(nonFlowingFetcher != null || flowingFetcher != null) {
"need to provide a fetcher"
}
check(nonFlowingFetcher == null || flowingFetcher == null) {
"need 1 fetcher"
}
check(persisterReader == null || flowingPersisterReader == null) {
"need 0 or 1 persister"
}
if (storeType == TestStoreType.Pipeline) {
return if (nonFlowingFetcher != null) {
beginNonFlowingPipeline(nonFlowingFetcher)
} else {
beginPipeline(flowingFetcher!!)
}.let {
when {
flowingPersisterReader != null -> it.withPersister(
reader = flowingPersisterReader,
writer = persisterWriter!!,
delete = persisterDelete
)
persisterReader != null -> it.withNonFlowPersister(
reader = persisterReader,
writer = persisterWriter!!,
delete = persisterDelete
)
else -> it as PipelineStore<Key, Output>
}
}.let {
if (enableCache) {
it.withCache()
} else {
it
}
}
} else if (storeType == TestStoreType.CoroutineInternal) {
return if (nonFlowingFetcher != null) {
RealInternalCoroutineStore.beginWithNonFlowingFetcher<Key, Input, Output>(
nonFlowingFetcher
)
} else {
RealInternalCoroutineStore.beginWithFlowingFetcher<Key, Input, Output>(
flowingFetcher!!
)
}.let {
when {
flowingPersisterReader != null -> it.persister(
reader = flowingPersisterReader,
writer = persisterWriter!!,
delete = persisterDelete
)
persisterReader != null -> it.nonFlowingPersister(
reader = persisterReader,
writer = persisterWriter!!,
delete = persisterDelete
)
else -> it
}
}.let {
if (enableCache) {
it
} else {
it.disableCache()
}
}.scope(testScope)
.build()
} else {
throw UnsupportedOperationException("cannot test $storeType")
}
}
companion object {
@JvmStatic
@Parameterized.Parameters(name = "{0}")
fun params() = listOf(
TestStoreType.Pipeline,
TestStoreType.CoroutineInternal
)
}
}


@ -0,0 +1,78 @@
package com.nytimes.android.external.store4
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
@ExperimentalCoroutinesApi
@FlowPreview
@RunWith(JUnit4::class)
class FetcherControllerTest {
private val testScope = TestCoroutineScope()
@Test
fun simple() = testScope.runBlockingTest {
val fetcherController = FetcherController<Int, Int, Int>(
scope = testScope,
realFetcher = { key: Int ->
flow {
emit(key * key)
}
},
sourceOfTruth = null
)
val fetcher = fetcherController.getFetcher(3)
assertThat(fetcherController.fetcherSize()).isEqualTo(0)
val received = fetcher.onEach {
assertThat(fetcherController.fetcherSize()).isEqualTo(1)
}.first()
assertThat(received).isEqualTo(9)
assertThat(fetcherController.fetcherSize()).isEqualTo(0)
}
@Test
fun concurrent() = testScope.runBlockingTest {
var createdCnt = 0
val fetcherController = FetcherController<Int, Int, Int>(
scope = testScope,
realFetcher = { key: Int ->
createdCnt ++
flow {
// make sure it takes time, otherwise, we may not share
delay(1)
emit(key * key)
}
},
sourceOfTruth = null
)
val fetcherCount = 20
fun createFetcher() = async {
fetcherController.getFetcher(3)
.onEach {
assertThat(fetcherController.fetcherSize()).isEqualTo(1)
}.first()
}
val fetchers = (0 until fetcherCount).map {
createFetcher()
}
fetchers.forEach {
assertThat(it.await()).isEqualTo(9)
}
assertThat(fetcherController.fetcherSize()).isEqualTo(0)
assertThat(createdCnt).isEqualTo(1)
}
}


@ -0,0 +1,66 @@
package com.nytimes.android.external.store4
import com.nytimes.android.external.store3.pipeline.PipelineStoreTest
import com.nytimes.android.external.store3.pipeline.ResponseOrigin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
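/**
 * Tests for the source of truth wrapper that pairs each value with its
 * response origin: values already persisted are reported with the delegate's
 * default origin, values written after the reader subscribes are attributed to
 * the fetcher, and all barriers are released once collection finishes.
 */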
@FlowPreview
@ExperimentalCoroutinesApi
class SourceOfTruthWithBarrierTest {
private val testScope = TestCoroutineScope()
private val persister = PipelineStoreTest.InMemoryPersister<Int, String>()
private val delegate: SourceOfTruth<Int, String, String> =
PersistentSourceOfTruth(
realReader = { key ->
flow {
emit(persister.read(key))
}
},
realWriter = persister::write,
realDelete = null
)
private val source = SourceOfTruthWithBarrier(
delegate = delegate
)
@Test
fun simple() = testScope.runBlockingTest {
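// the reader first emits the current persisted value (null) with the
// delegate's default origin, then surfaces the subsequent write as Fetcher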
val collector = async {
source.reader(1, CompletableDeferred(Unit)).take(2).toList()
}
source.write(1, "a")
assertThat(collector.await()).isEqualTo(
listOf(
DataWithOrigin(delegate.defaultOrigin, null),
DataWithOrigin(ResponseOrigin.Fetcher, "a")
)
)
assertThat(source.barrierCount()).isEqualTo(0)
}
@Test
fun preAndPostWrites() = testScope.runBlockingTest {
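// a value written before the reader subscribes is emitted with the delegate's
// default origin, while a write after subscription is attributed to the fetcher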
source.write(1, "a")
val collector = async {
source.reader(1, CompletableDeferred(Unit)).take(2).toList()
}
source.write(1, "b")
assertThat(collector.await()).isEqualTo(
listOf(
DataWithOrigin(delegate.defaultOrigin, "a"),
DataWithOrigin(ResponseOrigin.Fetcher, "b")
)
)
assertThat(source.barrierCount()).isEqualTo(0)
}
}

View file

@ -0,0 +1,63 @@
package com.nytimes.android.external.store4.multiplex
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.AddChannel
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.DispatchValue
import com.nytimes.android.external.store4.multiplex.ChannelManager.Message.RemoveChannel
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
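/**
 * Tests for ChannelManager message handling: a downstream channel is added and
 * removed via messages, and dispatched values are delivered to it in order
 * along with their acknowledgement deferreds.
 */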
@FlowPreview
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class ChannelManagerTest {
private val scope = TestCoroutineScope()
private val manager = ChannelManager<String>(
scope,
0,
onEach = {}
) {
SharedFlowProducer(
scope,
src = flow {
suspendCancellableCoroutine<String> {
// never end
}
},
channelManager = it
)
}
@Test
fun simple() = scope.runBlockingTest {
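// register a downstream channel, dispatch two values, and verify they are
// received in order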
val collection = async {
val channel = Channel<DispatchValue<String>>(Channel.UNLIMITED)
try {
manager.send(AddChannel(channel))
channel.consumeAsFlow().take(2).toList()
.map { it.value }
} finally {
manager.send(RemoveChannel(channel))
}
}
val ack1 = CompletableDeferred<Unit>()
manager.send(DispatchValue("a", ack1))
val ack2 = CompletableDeferred<Unit>()
manager.send(DispatchValue("b", ack2))
assertThat(collection.await()).isEqualTo(listOf("a", "b"))
}
}

View file

@ -0,0 +1,300 @@
package com.nytimes.android.external.store4.multiplex
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
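/**
 * Tests for the Multiplexer, which shares a single upstream flow between
 * multiple downstream collectors: overlapping collectors share emissions, late
 * arrivals either join the live upstream, trigger a new upstream run, or
 * replay from the buffer when buffering is enabled, and upstream errors are
 * delivered to every active collector.
 */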
@FlowPreview
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class MultiplexTest {
private val testScope = TestCoroutineScope()
private fun <T> createMultiplexer(f: () -> Flow<T>): Multiplexer<T> {
return Multiplexer(testScope, 0, f, {})
}
@Test
fun serial_notShared() = testScope.runBlockingTest {
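// collections that do not overlap are not shared: each one starts a fresh
// upstream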
var createCnt = 0
val activeFlow = createMultiplexer {
createCnt++
when (createCnt) {
1 -> flowOf("a", "b", "c")
2 -> flowOf("d", "e", "f")
else -> throw AssertionError("should not create more")
}
}
assertThat(activeFlow.create().toList())
.isEqualTo(listOf("a", "b", "c"))
assertThat(activeFlow.create().toList())
.isEqualTo(listOf("d", "e", "f"))
}
@Test
fun slowFastCollector() = testScope.runBlockingTest {
val activeFlow = createMultiplexer {
flowOf("a", "b", "c").onStart {
// make sure both collectors register in time so that neither drops a value
delay(100)
}
}
val c1 = async {
activeFlow.create().onEach {
delay(100)
}.toList()
}
val c2 = async {
activeFlow.create().onEach {
delay(200)
}.toList()
}
assertThat(c1.await())
.isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await())
.isEqualTo(listOf("a", "b", "c"))
}
@Test
fun slowDispatcher() = testScope.runBlockingTest {
val activeFlow = createMultiplexer {
flowOf("a", "b", "c").onEach {
delay(100)
}
}
val c1 = async {
activeFlow.create().toList()
}
val c2 = async {
activeFlow.create().toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c"))
}
@Test
fun lateToTheParty_arrivesAfterUpstreamClosed() = testScope.runBlockingTest {
val activeFlow = createMultiplexer {
flowOf("a", "b", "c").onStart {
delay(100)
}
}
val c1 = async {
activeFlow.create().toList()
}
val c2 = async {
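// subscribes after the first upstream has already completed, so a new
// upstream run is expected to serve this collector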
activeFlow.create().also {
delay(110)
}.toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c"))
}
@Test
fun lateToTheParty_arrivesBeforeUpstreamClosed() = testScope.runBlockingTest {
var generationCounter = 0
val activeFlow = createMultiplexer {
flow {
val gen = generationCounter++
check(gen < 2) {
"created one too many"
}
emit("a_$gen")
delay(5)
emit("b_$gen")
delay(100)
}
}
val c1 = async {
activeFlow.create().toList()
}
val c2 = async {
activeFlow.create().also {
delay(3)
}.toList()
}
val c3 = async {
activeFlow.create().also {
delay(20)
}.toList()
}
val lists = listOf(c1, c2, c3).map {
it.await()
}
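// c1 sees everything from generation 0; c2 joins while generation 0 is still
// live but after a_0 was dispatched (nothing is buffered), so it only sees
// b_0; c3 arrives after both generation-0 values were dispatched, which
// triggers a second upstream run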
assertThat(lists[0]).isEqualTo(listOf("a_0", "b_0"))
assertThat(lists[1]).isEqualTo(listOf("b_0"))
assertThat(lists[2]).isEqualTo(listOf("a_1", "b_1"))
}
@Test
fun upstreamError() = testScope.runBlockingTest {
val exception =
MyCustomException("hey")
val activeFlow = createMultiplexer {
flow {
emit("a")
throw exception
}
}
val receivedValue = CompletableDeferred<String>()
val receivedError = CompletableDeferred<Throwable>()
activeFlow.create()
.onEach {
check(receivedValue.isActive) {
"already received value"
}
receivedValue.complete(it)
}.catch {
check(receivedError.isActive) {
"already received error"
}
receivedError.complete(it)
}.toList()
assertThat(receivedValue.await()).isEqualTo("a")
val error = receivedError.await()
assertThat(error).isEqualTo(exception)
}
@Test
fun upstreamError_secondJustGetsError() = testScope.runBlockingTest {
val exception =
MyCustomException("hey")
val dispatchedFirstValue = CompletableDeferred<Unit>()
val registeredSecondCollector = CompletableDeferred<Unit>()
val activeFlow = createMultiplexer {
flow {
emit("a")
dispatchedFirstValue.complete(Unit)
registeredSecondCollector.await()
yield() // yield so the second collector can register before the error is thrown
throw exception
}
}
launch {
activeFlow.create().catch {
}.toList()
}
// wait until the above collector registers and receives first value
dispatchedFirstValue.await()
val receivedValue = CompletableDeferred<String>()
val receivedError = CompletableDeferred<Throwable>()
activeFlow.create()
.onStart {
registeredSecondCollector.complete(Unit)
}
.onEach {
receivedValue.complete(it)
}.catch {
check(receivedError.isActive) {
"already received error"
}
receivedError.complete(it)
}.toList()
val error = receivedError.await()
assertThat(error).isEqualTo(exception)
// sanity check: the second collector never receives a value
assertThat(receivedValue.isActive).isTrue()
}
@Test
fun lateArrival_unregistersFromTheCorrectManager() = testScope.runBlockingTest {
var createdCount = 0
var didntFinish = false
val activeFlow = createMultiplexer {
flow {
check(createdCount < 2) {
"created 1 too many"
}
val index = ++createdCount
emit("a_$index")
emit("b_$index")
delay(100)
if (index == 2) {
didntFinish = true
}
}
}
val firstCollector = async {
activeFlow.create().onEach { delay(5) }.take(2).toList()
}
delay(11) // miss first two values
val secondCollector = async {
// this will come in a new channel
activeFlow.create().take(2).toList()
}
assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1"))
assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2"))
assertThat(createdCount).isEqualTo(2)
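// once the second collector finishes there are no subscribers left, so the
// second upstream should be cancelled before it reaches its end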
delay(200)
assertThat(didntFinish).isEqualTo(false)
}
@Test
fun lateArrival_buffered() = testScope.runBlockingTest {
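// with bufferSize = 2, collectors that subscribe while the upstream is still
// running replay the last two emissions before receiving live values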
var createdCount = 0
val activeFlow = Multiplexer(
scope = testScope,
bufferSize = 2,
source = {
createdCount++
flow {
emit("a")
delay(5)
emit("b")
emit("c")
emit("d")
delay(100)
emit("e")
// don't finish, so the buffer behavior can be observed
delay(2000)
}
},
onEach = {}
)
val c1 = async {
activeFlow.create().toList()
}
delay(4) // c2 misses the first value
val c2 = async {
activeFlow.create().toList()
}
delay(50) // c3 misses first 4 values
val c3 = async {
activeFlow.create().toList()
}
delay(100) // c4 misses all values
val c4 = async {
activeFlow.create().toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
assertThat(c3.await()).isEqualTo(listOf("c", "d", "e"))
assertThat(c4.await()).isEqualTo(listOf("d", "e"))
assertThat(createdCount).isEqualTo(1)
}
class MyCustomException(val x: String) : RuntimeException("hello") {
override fun toString() = "custom$x"
}
}