Fix side collect (#17)

This PR fixes a bug in side-collect where it was not cancelling the side
collection when main flow stops.
It still does not handle the case where side collect fails but that can only be fixed when we have error handling.

Needed to move the method to a non-private place to be able to write tests :/
This commit is contained in:
Yigit Boyar 2019-08-02 05:27:03 -07:00 committed by GitHub
parent 645cbc1ef5
commit 550eb18f48
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 21 deletions

View file

@ -1,8 +1,9 @@
package com.nytimes.android.external.store3.pipeline
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
private object NotReceived
@ -24,4 +25,23 @@ internal suspend fun <T> Flow<T>.singleOrNull(): T? {
@Suppress("UNCHECKED_CAST")
value as? T
}
}
@FlowPreview
internal fun <T, R> Flow<T>.sideCollect(
other: Flow<R>,
otherCollect: suspend (R) -> Unit
) = flow {
coroutineScope {
val sideJob = launch {
other.collect {
otherCollect(it)
}
}
this@sideCollect.collect {
emit(it)
}
// when main flow ends, cancel the side channel.
sideJob.cancelAndJoin()
}
}

View file

@ -1,12 +1,10 @@
package com.nytimes.android.external.store3.pipeline
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.switchMap
import kotlinx.coroutines.launch
@FlowPreview
class PipelinePersister<Key, Input, Output>(
@ -80,21 +78,3 @@ private fun <T1, T2> Flow<T1>.castNonNull(): Flow<T2> {
}
}
}
@FlowPreview
private fun <T, R> Flow<T>.sideCollect(
other: Flow<R>,
otherCollect: suspend (R) -> Unit
) = flow {
coroutineScope {
launch {
other.collect {
otherCollect(it)
}
}
this@sideCollect.collect {
emit(it)
}
}
}

View file

@ -0,0 +1,84 @@
package com.nytimes.android.external.store3.pipeline
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
@ExperimentalCoroutinesApi
@FlowPreview
@RunWith(JUnit4::class)
class FlowExtTest {
private val testScope = TestCoroutineScope()
@Test
fun sideCollect_instant() = testScope.runBlockingTest {
val main = flowOf(1, 2, 3)
val side = flowOf("a", "b", "c")
val merged = mutableListOf<Any>()
main.sideCollect(side) {
merged.add(it)
}.collect {
merged.add(it)
}
assertThat(merged).isEqualTo(listOf("a", "b", "c", 1, 2, 3))
}
@Test
fun sideCollect_sideDelayed() = testScope.runBlockingTest {
val main = flowOf(1, 2, 3)
val side = flowOf("a", "b", "c").delayFlow(10)
val merged = mutableListOf<Any>()
main.sideCollect(side) {
merged.add(it)
}.collect {
merged.add(it)
}
assertThat(merged).isEqualTo(listOf(1, 2, 3))
}
@Test
fun sideCollect_srcDelayed() = testScope.runBlockingTest {
val main = flowOf(1, 2, 3).delayEach(10)
val side = flowOf("a", "b", "c")
val merged = mutableListOf<Any>()
main.sideCollect(side) {
merged.add(it)
}.collect {
merged.add(it)
}
assertThat(merged).isEqualTo(listOf("a", "b", "c", 1, 2, 3))
}
@Test
fun sideCollect_interleaved() = testScope.runBlockingTest {
val main = flow {
emit(1)
delay(10)
emit(2)
delay(20)
emit(3)
}
val side = flow {
delay(1)
emit("a")
delay(6)
emit("b")
delay(2000)
emit("c") //
}
val merged = mutableListOf<Any>()
main.sideCollect(side) {
merged.add(it)
}.collect {
merged.add(it)
}
assertThat(merged).isEqualTo(listOf(1, "a", "b", 2, 3))
}
}