Fix multiple collections on Multicast (#28)

* Fix multiple collections on Multicast

Multicast implementation had a bug where the returned flow could
not be collected multiple times as it was using the same channel
it created when  was called.

This PR changes it to create per collection to avoid this issue.

I've also replaced  function with a  field as there
is no reason to keep creating a new one, it can be just a flow

Test: MultiplexTest#multipleCollections
Fixes: #26

* remove create function
This commit is contained in:
Yigit Boyar 2019-12-13 10:08:35 -08:00 committed by Mike Nakhimovich
parent fa10e84623
commit 2b70a2f721
4 changed files with 67 additions and 50 deletions

View file

@ -22,6 +22,8 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
@ -78,26 +80,27 @@ class Multicaster<T>(
)
}
fun create(): Flow<T> {
val flow = flow<T> {
val channel = Channel<ChannelManager.Message.DispatchValue<T>>(Channel.UNLIMITED)
return channel.consumeAsFlow()
.onStart {
channelManager.send(
ChannelManager.Message.AddChannel(
channel
val subFlow = channel.consumeAsFlow()
.onStart {
channelManager.send(
ChannelManager.Message.AddChannel(
channel
)
)
)
}
.transform {
emit(it.value)
it.delivered.complete(Unit)
}.onCompletion {
channelManager.send(
ChannelManager.Message.RemoveChannel(
channel
}
.transform {
emit(it.value)
it.delivered.complete(Unit)
}.onCompletion {
channelManager.send(
ChannelManager.Message.RemoveChannel(
channel
)
)
)
}
}
emitAll(subFlow)
}
suspend fun close() {

View file

@ -67,19 +67,19 @@ class InfiniteMulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(100)
}.take(6).toList()
}
val c2 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(200)
}.take(6).toList()
}
// ensure first flow finishes
delay(10_000)
// add another
val c3 = activeFlow.create().take(3).toList()
val c3 = activeFlow.flow.take(3).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c2.await())
@ -100,19 +100,19 @@ class InfiniteMulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(100)
}.take(6).toList()
}
val c2 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(200)
}.take(6).toList()
}
// ensure first flow finishes
delay(10_000)
// add another
val c3 = activeFlow.create().take(1).toList()
val c3 = activeFlow.flow.take(1).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c2.await())
@ -133,19 +133,19 @@ class InfiniteMulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(100)
}.take(4).toList()
}
val c2 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(200)
}.take(5).toList()
}
// ensure first flow finishes
delay(10_000)
// add another
val c3 = activeFlow.create().take(3).toList()
val c3 = activeFlow.flow.take(3).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1"))
assertThat(c2.await())
@ -167,19 +167,19 @@ class InfiniteMulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(100)
}.take(4).toList()
}
val c2 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(200)
}.take(5).toList()
}
// ensure first flow finishes
delay(10_000)
// add another
val c3 = activeFlow.create().take(1).toList()
val c3 = activeFlow.flow.take(1).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1"))
assertThat(c2.await())

View file

@ -62,9 +62,9 @@ class MulticastTest {
else -> throw AssertionError("should not create more")
}
}
assertThat(activeFlow.create().toList())
assertThat(activeFlow.flow.toList())
.isEqualTo(listOf("a", "b", "c"))
assertThat(activeFlow.create().toList())
assertThat(activeFlow.flow.toList())
.isEqualTo(listOf("d", "e", "f"))
}
@ -77,12 +77,12 @@ class MulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(100)
}.toList()
}
val c2 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
delay(200)
}.toList()
}
@ -100,10 +100,10 @@ class MulticastTest {
}
}
val c1 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
val c2 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c"))
@ -117,10 +117,10 @@ class MulticastTest {
}
}
val c1 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
val c2 = async {
activeFlow.create().also {
activeFlow.flow.also {
delay(110)
}.toList()
}
@ -144,16 +144,16 @@ class MulticastTest {
}
}
val c1 = async {
activeFlow.create().onEach {
activeFlow.flow.onEach {
}.toList()
}
val c2 = async {
activeFlow.create().also {
activeFlow.flow.also {
delay(3)
}.toList()
}
val c3 = async {
activeFlow.create().also {
activeFlow.flow.also {
delay(20)
}.toList()
}
@ -177,7 +177,7 @@ class MulticastTest {
}
val receivedValue = CompletableDeferred<String>()
val receivedError = CompletableDeferred<Throwable>()
activeFlow.create()
activeFlow.flow
.onEach {
check(receivedValue.isActive) {
"already received value"
@ -210,13 +210,13 @@ class MulticastTest {
}
}
launch {
activeFlow.create().catch {}.toList()
activeFlow.flow.catch {}.toList()
}
// wait until the above collector registers and receives first value
dispatchedFirstValue.await()
val receivedValue = CompletableDeferred<String>()
val receivedError = CompletableDeferred<Throwable>()
activeFlow.create()
activeFlow.flow
.onStart {
registeredSecondCollector.complete(Unit)
}
@ -253,12 +253,12 @@ class MulticastTest {
}
}
val firstCollector = async {
activeFlow.create().onEach { delay(5) }.take(2).toList()
activeFlow.flow.onEach { delay(5) }.take(2).toList()
}
delay(11) // miss first two values
val secondCollector = async {
// this will come in a new channel
activeFlow.create().take(2).toList()
activeFlow.flow.take(2).toList()
}
assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1"))
assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2"))
@ -290,19 +290,19 @@ class MulticastTest {
onEach = {}
)
val c1 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
delay(4) // c2 misses first value
val c2 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
delay(50) // c3 misses first 4 values
val c3 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
delay(100) // c4 misses all values
val c4 = async {
activeFlow.create().toList()
activeFlow.flow.toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
@ -311,6 +311,20 @@ class MulticastTest {
assertThat(createdCount).isEqualTo(1)
}
@Test
fun multipleCollections() = testScope.runBlockingTest {
val activeFlow = Multicaster(
scope = testScope,
bufferSize = 0,
source = {
flowOf(1, 2, 3)
},
onEach = {}
)
assertThat(activeFlow.flow.toList()).isEqualTo(listOf(1, 2, 3))
assertThat(activeFlow.flow.toList()).isEqualTo(listOf(1, 2, 3))
}
class MyCustomException(val x: String) : RuntimeException("hello") {
override fun toString() = "custom$x"
}

View file

@ -90,7 +90,7 @@ internal class FetcherController<Key, Input, Output>(
return flow {
val fetcher = fetchers.acquire(key)
try {
emitAll(fetcher.create())
emitAll(fetcher.flow)
} finally {
fetchers.release(key, fetcher)
}