Migrate Multicast to Kotlin Test (#146)

This commit is contained in:
Claus Holst 2020-04-07 17:48:04 +02:00 committed by GitHub
parent 59967d12fd
commit f809dc688e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 121 additions and 140 deletions

View file

@ -70,5 +70,7 @@ ext.libraries = [
coroutinesRx : "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:$versions.coroutines",
coroutinesReactive : "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.coroutines",
coroutinesAndroid : "org.jetbrains.kotlinx:kotlinx-coroutines-android:$versions.coroutines",
coroutinesTest : "org.jetbrains.kotlinx:kotlinx-coroutines-test:$versions.coroutines"
coroutinesTest : "org.jetbrains.kotlinx:kotlinx-coroutines-test:$versions.coroutines",
kotlinTest : "org.jetbrains.kotlin:kotlin-test:$versions.kotlin",
kotlinTestJunit : "org.jetbrains.kotlin:kotlin-test-junit:$versions.kotlin",
]

View file

@ -6,9 +6,9 @@ plugins {
dependencies {
implementation libraries.kotlinStdLib
implementation libraries.coroutinesCore
testImplementation libraries.junit
testImplementation libraries.coroutinesTest
testImplementation libraries.truth
testImplementation libraries.kotlinTest
testImplementation libraries.kotlinTestJunit
}
group = GROUP
version = VERSION_NAME

View file

@ -16,7 +16,6 @@
package com.dropbox.flow.multicast
import com.dropbox.flow.multicast.ChannelManager.Message.Dispatch
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.async
@ -32,15 +31,14 @@ import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Assert.fail
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.test.fail
import kotlin.test.Test
@FlowPreview
@ExperimentalStdlibApi
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class ChannelManagerTest {
private val scope = TestCoroutineScope()
private val upstream: Channel<String> = Channel(Channel.UNLIMITED)
@ -67,7 +65,7 @@ class ChannelManagerTest {
upstream.send("a")
upstream.send("b")
upstream.close()
assertThat(collection.await()).isEqualTo(listOf("a", "b"))
assertEquals(listOf("a", "b"), collection.await())
}
@Test(expected = TestException::class)
@ -106,8 +104,8 @@ class ChannelManagerTest {
// give the upstream a chance to finish and check that downstream finished.
// does not await on downstream to avoid the test hanging in case of a bug.
delay(100)
assertThat(collection.isCompleted).isTrue()
assertThat(collection.getCompleted()).isEmpty()
assertTrue(collection.isCompleted)
assertTrue(collection.getCompleted().isEmpty())
}
@Test
@ -141,8 +139,8 @@ class ChannelManagerTest {
upstream.send("b")
upstream.close()
assertThat(collection1.await()).isEqualTo(listOf("a", "b"))
assertThat(collection2.await()).isEqualTo(listOf("a", "b"))
assertEquals(listOf("a", "b"), collection1.await())
assertEquals(listOf("a", "b"), collection2.await())
}
@Test
@ -152,7 +150,7 @@ class ChannelManagerTest {
`consume two non-overlapping downstreams and count upstream creations`(
keepUpstreamAlive = false
)
assertThat(upstreamCreateCount).isEqualTo(2)
assertEquals(2, upstreamCreateCount)
}
@Test
@ -162,7 +160,7 @@ class ChannelManagerTest {
`consume two non-overlapping downstreams and count upstream creations`(
keepUpstreamAlive = true
)
assertThat(upstreamCreateCount).isEqualTo(1)
assertEquals(1, upstreamCreateCount)
}
private suspend fun `consume two non-overlapping downstreams and count upstream creations`(
@ -202,8 +200,8 @@ class ChannelManagerTest {
// get value and make sure first downstream is closed
upstreamChannel.send("a")
assertThat(s1.await()).isEqualTo("a")
assertThat(downstream1.isClosedForReceive)
assertEquals("a", s1.await())
assertTrue(downstream1.isClosedForReceive)
val downstream2 =
Channel<Dispatch.Value<String>>(Channel.UNLIMITED)
@ -219,7 +217,7 @@ class ChannelManagerTest {
// get the final value
upstreamChannel.send("b")
upstreamChannel.close()
assertThat(s2.await()).isEqualTo(listOf("a", "b")) // buffer=1 so 'a' should be sent as well
assertEquals(listOf("a", "b"), s2.await()) // buffer=1 so 'a' should be sent as well
return@coroutineScope upstreamCreateCount
}
@ -246,7 +244,7 @@ class ChannelManagerTest {
upstream.send("c")
upstream.close()
assertThat(s2.await()).isEqualTo(listOf("b", "c"))
assertEquals(listOf("b", "c"), s2.await())
}
@Test
@ -267,7 +265,7 @@ class ChannelManagerTest {
it.value
}
}
assertThat(pending.await()).isEqualTo("b")
assertEquals("b", pending.await())
val downstream3 = Channel<Dispatch.Value<String>>(Channel.UNLIMITED)
manager.addDownstream(downstream3)
@ -279,7 +277,7 @@ class ChannelManagerTest {
}
upstream.close()
assertThat(collection.await()).isEqualTo(listOf("b"))
assertEquals(listOf("b"), collection.await())
}
@Test
@ -304,7 +302,7 @@ class ChannelManagerTest {
// get the final value
upstream.send("c")
upstream.close()
assertThat(s2.await()).isEqualTo(listOf("b", "c"))
assertEquals(listOf("b", "c"), s2.await())
}
@Test
@ -330,7 +328,7 @@ class ChannelManagerTest {
// get the final value
upstream.send("c")
upstream.close()
assertThat(collection.await()).isEqualTo(listOf("a", "b", "c"))
assertEquals(listOf("a", "b", "c"), collection.await())
}
private suspend fun newManagerInKeepAliveModeWithPendingFetch(
@ -364,8 +362,8 @@ class ChannelManagerTest {
// get value and make sure first downstream is closed
upstream.send(firstValue)
assertThat(value.await()).isEqualTo(firstValue)
assertThat(downstream.isClosedForReceive)
assertEquals(firstValue, value.await())
assertTrue(downstream.isClosedForReceive)
// emit with no downstreams
upstream.send(pendingValue)
@ -390,11 +388,11 @@ class ChannelManagerTest {
upstream.send("a")
}
manager.close()
assertThat(downstream1.isClosedForSend).isTrue()
assertThat(downstream2.isClosedForSend).isTrue()
assertTrue(downstream1.isClosedForSend)
assertTrue(downstream2.isClosedForSend)
// it can be open for receive if and only if we've already sent a value
assertThat(downstream1.isClosedForReceive).isEqualTo(!dispatchValue)
assertThat(downstream2.isClosedForReceive).isEqualTo(!dispatchValue)
assertEquals(!dispatchValue, downstream1.isClosedForReceive)
assertEquals(!dispatchValue, downstream2.isClosedForReceive)
}
@Test
@ -404,7 +402,7 @@ class ChannelManagerTest {
manager.addDownstream(downstream)
manager.close()
manager.close()
assertThat(downstream.isClosedForSend).isTrue()
assertTrue(downstream.isClosedForSend)
}
}

View file

@ -28,10 +28,8 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import com.google.common.truth.Truth.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import kotlin.test.assertEquals
/**
* Multicaster tests where downstream is not closed even when upstream is closed.
@ -41,7 +39,6 @@ import org.junit.runners.JUnit4
@FlowPreview
@ExperimentalStdlibApi
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class InfiniteMulticastTest {
private val testScope = TestCoroutineScope()
private val dispatchLog = mutableListOf<String>()
@ -83,13 +80,10 @@ class InfiniteMulticastTest {
delay(10_000)
// add another
val c3 = activeFlow.newDownstream().take(3).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c2.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c3)
.isEqualTo(listOf("a1", "b1", "c1"))
assertThat(createdCount).isEqualTo(2)
assertEquals(listOf("a0", "b0", "c0", "a1", "b1", "c1"), c1.await())
assertEquals(listOf("a0", "b0", "c0", "a1", "b1", "c1"), c2.await())
assertEquals(listOf("a1", "b1", "c1"), c3)
assertEquals(2, createdCount)
}
@Test
@ -118,13 +112,10 @@ class InfiniteMulticastTest {
delay(10_000)
// add another
val c3 = activeFlow.newDownstream().take(1).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c2.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1", "c1"))
assertThat(c3)
.isEqualTo(listOf("a1"))
assertThat(createdCount).isEqualTo(2)
assertEquals(listOf("a0", "b0", "c0", "a1", "b1", "c1"), c1.await())
assertEquals(listOf("a0", "b0", "c0", "a1", "b1", "c1"), c2.await())
assertEquals(listOf("a1"), c3)
assertEquals(2, createdCount)
}
@Test
@ -153,13 +144,10 @@ class InfiniteMulticastTest {
delay(10_000)
// add another
val c3 = activeFlow.newDownstream().take(3).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1"))
assertThat(c2.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1"))
assertThat(c3)
.isEqualTo(listOf("a1", "b1", "c1"))
assertThat(createdCount).isEqualTo(2)
assertEquals(listOf("a0", "b0", "c0", "a1"), c1.await())
assertEquals(listOf("a0", "b0", "c0", "a1", "b1"), c2.await())
assertEquals(listOf("a1", "b1", "c1"), c3)
assertEquals(2, createdCount)
}
@Test
@ -189,16 +177,11 @@ class InfiniteMulticastTest {
delay(10_000)
// add another
val c3 = activeFlow.newDownstream().take(1).toList()
assertThat(c1.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1"))
assertThat(c2.await())
.isEqualTo(listOf("a0", "b0", "c0", "a1", "b1"))
assertThat(c3)
.isEqualTo(listOf("a1"))
assertThat(createdCount).isEqualTo(2)
assertEquals(listOf("a0", "b0", "c0", "a1"), c1.await())
assertEquals(listOf("a0", "b0", "c0", "a1", "b1"), c2.await())
assertEquals(listOf("a1"), c3)
assertEquals(2, createdCount)
// make sure we didn't keep upsteam too long
assertThat(dispatchLog).containsExactly(
"a0", "b0", "c0", "a1", "b1"
)
assertEquals(listOf("a0", "b0", "c0", "a1", "b1"), dispatchLog)
}
}

View file

@ -15,7 +15,6 @@
*/
package com.dropbox.flow.multicast
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
@ -39,14 +38,15 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.yield
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.test.assertFailsWith
@FlowPreview
@ExperimentalStdlibApi
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class MulticastTest {
private val testScope = TestCoroutineScope()
@ -77,10 +77,8 @@ class MulticastTest {
}
}
)
assertThat(activeFlow.newDownstream().toList())
.isEqualTo(listOf("a", "b", "c"))
assertThat(activeFlow.newDownstream().toList())
.isEqualTo(listOf("d", "e", "f"))
assertEquals(listOf("a", "b", "c"), activeFlow.newDownstream().toList())
assertEquals(listOf("d", "e", "f"), activeFlow.newDownstream().toList())
}
@Test
@ -101,10 +99,8 @@ class MulticastTest {
delay(200)
}.toList()
}
assertThat(c1.await())
.isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await())
.isEqualTo(listOf("a", "b", "c"))
assertEquals(listOf("a", "b", "c"), c1.await())
assertEquals(listOf("a", "b", "c"), c2.await())
}
@Test
@ -120,8 +116,8 @@ class MulticastTest {
val c2 = async {
activeFlow.newDownstream().toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c"))
assertEquals(listOf("a", "b", "c"), c1.await())
assertEquals(listOf("a", "b", "c"), c2.await())
}
@Test
@ -139,8 +135,8 @@ class MulticastTest {
delay(110)
}.toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c"))
assertEquals(listOf("a", "b", "c"), c1.await())
assertEquals(listOf("a", "b", "c"), c2.await())
}
@Test
@ -175,9 +171,9 @@ class MulticastTest {
val lists = listOf(c1, c2, c3).map {
it.await()
}
assertThat(lists[0]).isEqualTo(listOf("a_0", "b_0"))
assertThat(lists[1]).isEqualTo(listOf("b_0"))
assertThat(lists[2]).isEqualTo(listOf("a_1", "b_1"))
assertEquals(listOf("a_0", "b_0"), lists[0])
assertEquals(listOf("b_0"), lists[1])
assertEquals(listOf("a_1", "b_1"), lists[2])
}
@Test
@ -204,9 +200,9 @@ class MulticastTest {
}
receivedError.complete(it)
}.toList()
assertThat(receivedValue.await()).isEqualTo("a")
assertEquals("a", receivedValue.await())
val error = receivedError.await()
assertThat(error).isEqualTo(exception)
assertEquals(exception, error)
}
@Test
@ -244,9 +240,9 @@ class MulticastTest {
receivedError.complete(it)
}.toList()
val error = receivedError.await()
assertThat(error).isEqualTo(exception)
assertEquals(exception, error)
// test sanity, second collector never receives a value
assertThat(receivedValue.isActive).isTrue()
assertTrue(receivedValue.isActive)
}
@Test
@ -275,11 +271,11 @@ class MulticastTest {
// this will come in a new channel
activeFlow.newDownstream().take(2).toList()
}
assertThat(firstCollector.await()).isEqualTo(listOf("a_1", "b_1"))
assertThat(secondCollector.await()).isEqualTo(listOf("a_2", "b_2"))
assertThat(collectedCount).isEqualTo(2)
assertEquals(listOf("a_1", "b_1"), firstCollector.await())
assertEquals(listOf("a_2", "b_2"), secondCollector.await())
assertEquals(2, collectedCount)
delay(200)
assertThat(didntFinish).isEqualTo(false)
assertFalse(didntFinish)
}
@Test
@ -317,11 +313,11 @@ class MulticastTest {
val c4 = async {
activeFlow.newDownstream().toList()
}
assertThat(c1.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
assertThat(c2.await()).isEqualTo(listOf("a", "b", "c", "d", "e"))
assertThat(c3.await()).isEqualTo(listOf("c", "d", "e"))
assertThat(c4.await()).isEqualTo(listOf("d", "e"))
assertThat(collectedCount).isEqualTo(1)
assertEquals(listOf("a", "b", "c", "d", "e"), c1.await())
assertEquals(listOf("a", "b", "c", "d", "e"), c2.await())
assertEquals(listOf("c", "d", "e"), c3.await())
assertEquals(listOf("d", "e"), c4.await())
assertEquals(1, collectedCount)
}
@Test
@ -332,8 +328,8 @@ class MulticastTest {
source = flowOf(1, 2, 3),
onEach = {}
)
assertThat(activeFlow.newDownstream().toList()).isEqualTo(listOf(1, 2, 3))
assertThat(activeFlow.newDownstream().toList()).isEqualTo(listOf(1, 2, 3))
assertEquals(listOf(1, 2, 3), activeFlow.newDownstream().toList())
assertEquals(listOf(1, 2, 3), activeFlow.newDownstream().toList())
}
@Test
@ -355,8 +351,8 @@ class MulticastTest {
activeFlow.newDownstream().toList()
}
testScope.runCurrent()
assertThat(c2.isActive).isFalse()
assertThat(c2.await()).isEqualTo(listOf("b_0", "c_0"))
assertFalse(c2.isActive)
assertEquals(listOf("b_0", "c_0"), c2.await())
unlockC1.complete(Unit)
}
@ -379,8 +375,8 @@ class MulticastTest {
activeFlow.newDownstream().toList()
}
testScope.runCurrent()
assertThat(c2.isActive).isFalse()
assertThat(c2.await()).isEqualTo(listOf("a_0", "b_0", "c_0"))
assertFalse(c2.isActive)
assertEquals(listOf("a_0", "b_0", "c_0"), c2.await())
unlockC1.complete(Unit)
}
@ -402,8 +398,8 @@ class MulticastTest {
activeFlow.newDownstream().toList()
}
testScope.runCurrent()
assertThat(c2.isActive).isFalse()
assertThat(c2.await()).isEqualTo(listOf("a_1"))
assertFalse(c2.isActive)
assertEquals(listOf("a_1"), c2.await())
unlockC1.complete(Unit)
}
@ -420,11 +416,11 @@ class MulticastTest {
multicaster.newDownstream().toList()
}
runCurrent()
assertThat(collection.isActive).isTrue()
assertTrue(collection.isActive)
multicaster.close()
runCurrent()
assertThat(collection.isCompleted).isTrue()
assertThat(collection.await()).isEqualTo(listOf(1))
assertTrue(collection.isCompleted)
assertEquals(listOf(1), collection.await())
}
@Test
@ -443,8 +439,8 @@ class MulticastTest {
multicaster.newDownstream().toList()
}
runCurrent()
assertThat(collection.isActive).isFalse()
assertThat(collection.await()).isEmpty()
assertFalse(collection.isActive)
assertTrue(collection.await().isEmpty())
}
@Test
@ -472,8 +468,8 @@ class MulticastTest {
multicaster.newDownstream().toList()
}
runCurrent()
assertThat(collection2.isActive).isFalse()
assertThat(collection2.await()).isEmpty()
assertFalse(collection2.isActive)
assertTrue(collection2.await().isEmpty())
}
@Test
@ -489,25 +485,28 @@ class MulticastTest {
val piggybackDownstream = multicaster.newDownstream(piggybackOnly = true)
val piggybackValue = testScope.async { piggybackDownstream.first() }
testScope.advanceUntilIdle()
assertThat(createCount).isEqualTo(0)
assertThat(piggybackValue.isCompleted).isEqualTo(false)
assertEquals(0, createCount)
assertFalse(piggybackValue.isCompleted)
val downstream = multicaster.newDownstream(piggybackOnly = false)
val value = testScope.async { downstream.first() }
testScope.advanceUntilIdle()
assertThat(createCount).isEqualTo(1)
assertThat(piggybackValue.isCompleted).isEqualTo(true)
assertThat(piggybackValue.getCompleted()).isEqualTo("value")
assertThat(value.isCompleted).isEqualTo(true)
assertThat(value.getCompleted()).isEqualTo("value")
assertEquals(1, createCount)
assertTrue(piggybackValue.isCompleted)
assertEquals("value", piggybackValue.getCompleted())
assertTrue(value.isCompleted)
assertEquals("value", value.getCompleted())
}
@Test(expected = IllegalStateException::class)
fun `GIVEN no piggybackDownstream WHEN adding a piggybackOnly downstream THEN throws IllegalStateException`() =
testScope.runBlockingTest {
val multicaster = createMulticaster(flowOf("a"), piggybackDownstream = false)
multicaster.newDownstream(piggybackOnly = true)
@Test
fun `GIVEN no piggybackDownstream WHEN adding a piggybackOnly downstream THEN throws IllegalStateException`() {
assertFailsWith(IllegalStateException::class, "Must fail with IllegalStateException") {
testScope.runBlockingTest {
val multicaster = createMulticaster(flowOf("a"), piggybackDownstream = false)
multicaster.newDownstream(piggybackOnly = true)
}
}
}
private fun versionedMulticaster(
bufferSize: Int = 0,
@ -520,7 +519,7 @@ class MulticastTest {
bufferSize = bufferSize,
source = flow<String> {
val id = counter++
assertThat(counter).isAtMost(collectionLimit)
assertTrue(counter <= collectionLimit)
emitAll(values.asFlow().map {
"${it}_$id"
})

View file

@ -15,12 +15,13 @@
*/
package com.dropbox.flow.multicast
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStdlibApi::class)
class SharedFlowProducerTest {
@ -43,24 +44,24 @@ class SharedFlowProducerTest {
scope.pauseDispatcher()
producer.start()
producer.cancel()
assertThat(upstreamMessages).isEmpty()
assertTrue(upstreamMessages.isEmpty())
}
@Test
fun `Producer forwards all values from source when acked`() {
val producer = createProducer(flowOf("a", "b", "c"))
assertThat(upstreamMessages).isEmpty()
assertTrue(upstreamMessages.isEmpty())
producer.start()
assertThat(upstreamMessages).containsExactly("a", "b", "c")
assertEquals(listOf("a", "b", "c"), upstreamMessages)
}
@Test
fun `Calling start should be idempotent`() {
val producer = createProducer(flowOf("a", "b", "c"))
assertThat(upstreamMessages).isEmpty()
assertTrue(upstreamMessages.isEmpty())
producer.start()
producer.start()
producer.start()
assertThat(upstreamMessages).containsExactly("a", "b", "c")
assertEquals(listOf("a", "b", "c"), upstreamMessages)
}
}

View file

@ -22,15 +22,13 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import com.google.common.truth.Truth.assertThat
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@ExperimentalCoroutinesApi
@RunWith(JUnit4::class)
class StoreRealActorTest {
/**
@ -52,7 +50,7 @@ class StoreRealActorTest {
}
override fun onClosed() {
assertThat(active.get()).isFalse()
assertFalse(active.get())
didClose.set(true)
}
}
@ -71,6 +69,6 @@ class StoreRealActorTest {
actor.close()
sender.join()
}
assertThat(didClose.get()).isTrue()
assertTrue(didClose.get())
}
}