diff --git a/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt b/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt index aa62b40..4c7c75c 100644 --- a/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt +++ b/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt @@ -22,6 +22,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.transform @@ -78,26 +80,27 @@ class Multicaster( ) } - fun create(): Flow { + val flow = flow { val channel = Channel>(Channel.UNLIMITED) - return channel.consumeAsFlow() - .onStart { - channelManager.send( - ChannelManager.Message.AddChannel( - channel + val subFlow = channel.consumeAsFlow() + .onStart { + channelManager.send( + ChannelManager.Message.AddChannel( + channel + ) ) - ) - } - .transform { - emit(it.value) - it.delivered.complete(Unit) - }.onCompletion { - channelManager.send( - ChannelManager.Message.RemoveChannel( - channel + } + .transform { + emit(it.value) + it.delivered.complete(Unit) + }.onCompletion { + channelManager.send( + ChannelManager.Message.RemoveChannel( + channel + ) ) - ) - } + } + emitAll(subFlow) } suspend fun close() { diff --git a/multicast/src/test/kotlin/com/dropbox/flow/multicast/InfiniteMulticastTest.kt b/multicast/src/test/kotlin/com/dropbox/flow/multicast/InfiniteMulticastTest.kt index 6a4ed97..6e77bdd 100644 --- a/multicast/src/test/kotlin/com/dropbox/flow/multicast/InfiniteMulticastTest.kt +++ b/multicast/src/test/kotlin/com/dropbox/flow/multicast/InfiniteMulticastTest.kt @@ -67,19 +67,19 @@ class InfiniteMulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(100) }.take(6).toList() } val c2 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(200) }.take(6).toList() } // ensure first flow finishes delay(10_000) // add another - val c3 = activeFlow.create().take(3).toList() + val c3 = activeFlow.flow.take(3).toList() assertThat(c1.await()) .isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1")) assertThat(c2.await()) @@ -100,19 +100,19 @@ class InfiniteMulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(100) }.take(6).toList() } val c2 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(200) }.take(6).toList() } // ensure first flow finishes delay(10_000) // add another - val c3 = activeFlow.create().take(1).toList() + val c3 = activeFlow.flow.take(1).toList() assertThat(c1.await()) .isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1")) assertThat(c2.await()) @@ -133,19 +133,19 @@ class InfiniteMulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(100) }.take(4).toList() } val c2 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(200) }.take(5).toList() } // ensure first flow finishes delay(10_000) // add another - val c3 = activeFlow.create().take(3).toList() + val c3 = activeFlow.flow.take(3).toList() assertThat(c1.await()) .isEqualTo(listOf("a0", "b0", "c0", "a1")) assertThat(c2.await()) @@ -167,19 +167,19 @@ class InfiniteMulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(100) }.take(4).toList() } val c2 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(200) }.take(5).toList() } // ensure first flow finishes delay(10_000) // add another - val c3 = activeFlow.create().take(1).toList() + val c3 = activeFlow.flow.take(1).toList() assertThat(c1.await()) .isEqualTo(listOf("a0", "b0", "c0", "a1")) assertThat(c2.await()) diff --git a/multicast/src/test/kotlin/com/dropbox/flow/multicast/MulticastTest.kt b/multicast/src/test/kotlin/com/dropbox/flow/multicast/MulticastTest.kt index effddbd..4c5ce99 100644 --- a/multicast/src/test/kotlin/com/dropbox/flow/multicast/MulticastTest.kt +++ b/multicast/src/test/kotlin/com/dropbox/flow/multicast/MulticastTest.kt @@ -62,9 +62,9 @@ class MulticastTest { else -> throw AssertionError("should not create more") } } - assertThat(activeFlow.create().toList()) + assertThat(activeFlow.flow.toList()) .isEqualTo(listOf("a", "b", "c")) - assertThat(activeFlow.create().toList()) + assertThat(activeFlow.flow.toList()) .isEqualTo(listOf("d", "e", "f")) } @@ -77,12 +77,12 @@ class MulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(100) }.toList() } val c2 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { delay(200) }.toList() } @@ -100,10 +100,10 @@ class MulticastTest { } } val c1 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } val c2 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } assertThat(c1.await()).isEqualTo(listOf("a", "b", "c")) assertThat(c2.await()).isEqualTo(listOf("a", "b", "c")) @@ -117,10 +117,10 @@ class MulticastTest { } } val c1 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } val c2 = async { - activeFlow.create().also { + activeFlow.flow.also { delay(110) }.toList() } @@ -144,16 +144,16 @@ class MulticastTest { } } val c1 = async { - activeFlow.create().onEach { + activeFlow.flow.onEach { }.toList() } val c2 = async { - activeFlow.create().also { + activeFlow.flow.also { delay(3) }.toList() } val c3 = async { - activeFlow.create().also { + activeFlow.flow.also { delay(20) }.toList() } @@ -177,7 +177,7 @@ class MulticastTest { } val receivedValue = CompletableDeferred() val receivedError = CompletableDeferred() - activeFlow.create() + activeFlow.flow .onEach { check(receivedValue.isActive) { "already received value" @@ -210,13 +210,13 @@ class MulticastTest { } } launch { - activeFlow.create().catch {}.toList() + activeFlow.flow.catch {}.toList() } // wait until the above collector registers and receives first value dispatchedFirstValue.await() val receivedValue = CompletableDeferred() val receivedError = CompletableDeferred() - activeFlow.create() + activeFlow.flow .onStart { registeredSecondCollector.complete(Unit) } @@ -253,12 +253,12 @@ class MulticastTest { } } val firstCollector = async { - activeFlow.create().onEach { delay(5) }.take(2).toList() + activeFlow.flow.onEach { delay(5) }.take(2).toList() } delay(11) // miss first two values val secondCollector = async { // this will come in a new channel - activeFlow.create().take(2).toList() + activeFlow.flow.take(2).toList() } assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1")) assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2")) @@ -290,19 +290,19 @@ class MulticastTest { onEach = {} ) val c1 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } delay(4) // c2 misses first value val c2 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } delay(50) // c3 misses first 4 values val c3 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } delay(100) // c4 misses all values val c4 = async { - activeFlow.create().toList() + activeFlow.flow.toList() } assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e")) @@ -311,6 +311,20 @@ class MulticastTest { assertThat(createdCount).isEqualTo(1) } + @Test + fun multipleCollections() = testScope.runBlockingTest { + val activeFlow = Multicaster( + scope = testScope, + bufferSize = 0, + source = { + flowOf(1, 2, 3) + }, + onEach = {} + ) + assertThat(activeFlow.flow.toList()).isEqualTo(listOf(1, 2, 3)) + assertThat(activeFlow.flow.toList()).isEqualTo(listOf(1, 2, 3)) + } + class MyCustomException(val x: String) : RuntimeException("hello") { override fun toString() = "custom$x" } diff --git a/store/src/main/java/com/dropbox/android/external/store4/impl/FetcherController.kt b/store/src/main/java/com/dropbox/android/external/store4/impl/FetcherController.kt index 9df149e..c64c98b 100644 --- a/store/src/main/java/com/dropbox/android/external/store4/impl/FetcherController.kt +++ b/store/src/main/java/com/dropbox/android/external/store4/impl/FetcherController.kt @@ -90,7 +90,7 @@ internal class FetcherController( return flow { val fetcher = fetchers.acquire(key) try { - emitAll(fetcher.create()) + emitAll(fetcher.flow) } finally { fetchers.release(key, fetcher) }