move multicast to its own module (#9)

Fixes #8
This commit is contained in:
Yigit Boyar 2019-12-08 19:12:11 +01:00 committed by Mike Nakhimovich
parent a362c33c0a
commit 6d97c399fa
14 changed files with 171 additions and 127 deletions

View file

@ -70,10 +70,6 @@ project.ext.preDexLibs = !project.hasProperty('disablePreDex')
subprojects {
apply plugin: 'com.diffplug.gradle.spotless'
if (project.path == ":store") {
apply plugin: 'org.jetbrains.dokka'
}
spotless {
kotlin {
target 'src/**/*.kt'

29
multicast/build.gradle Normal file
View file

@ -0,0 +1,29 @@
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.dokka'
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
implementation libraries.coroutinesCore
testImplementation libraries.junit
testImplementation libraries.coroutinesTest
testImplementation libraries.assertJ
}
group = GROUP
version = VERSION_NAME
apply from: rootProject.file("gradle/maven-push.gradle")
apply from: rootProject.file("gradle/checkstyle.gradle")
apply from: rootProject.file("gradle/pmd.gradle")
targetCompatibility = 1.8
sourceCompatibility = 1.8
compileKotlin {
kotlinOptions {
jvmTarget = "1.8"
}
}
compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
}
}

View file

@ -0,0 +1,18 @@
#
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
POM_NAME=com.dropbox.android
POM_ARTIFACT_ID=multicast
POM_PACKAGING=jar

View file

@ -14,14 +14,13 @@
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import java.util.ArrayDeque
import java.util.Collections
import java.util.*
/**
* This actor helps tracking active channels and is able to dispatch values to each of them
@ -30,29 +29,29 @@ import java.util.Collections
*/
@ExperimentalCoroutinesApi
internal class ChannelManager<T>(
/**
* The scope in which ChannelManager actor runs
*/
scope: CoroutineScope,
/**
* The buffer size that is used while the upstream is active
*/
bufferSize: Int,
/**
* If true, downstream is never closed by the ChannelManager unless upstream throws an error.
* Instead, it is kept open and if a new downstream shows up that causes us to restart the flow,
* it will receive values as well.
*/
private val piggybackingDownstream: Boolean = false,
/**
* Called when a value is dispatched
*/
private val onEach: suspend (T) -> Unit,
/**
* Called when the channel manager is active (e.g. it has downstream collectors and needs a
* producer)
*/
private val onActive: (ChannelManager<T>) -> SharedFlowProducer<T>
/**
* The scope in which ChannelManager actor runs
*/
scope: CoroutineScope,
/**
* The buffer size that is used while the upstream is active
*/
bufferSize: Int,
/**
* If true, downstream is never closed by the ChannelManager unless upstream throws an error.
* Instead, it is kept open and if a new downstream shows up that causes us to restart the flow,
* it will receive values as well.
*/
private val piggybackingDownstream: Boolean = false,
/**
* Called when a value is dispatched
*/
private val onEach: suspend (T) -> Unit,
/**
* Called when the channel manager is active (e.g. it has downstream collectors and needs a
* producer)
*/
private val onActive: (ChannelManager<T>) -> SharedFlowProducer<T>
) : StoreRealActor<ChannelManager.Message<T>>(scope) {
private val buffer = Buffer<T>(bufferSize)
/**
@ -175,9 +174,9 @@ internal class ChannelManager<T>(
*/
private suspend fun doAdd(msg: Message.AddChannel<T>) {
addEntry(
entry = ChannelEntry(
channel = msg.channel
)
entry = ChannelEntry(
channel = msg.channel
)
)
activateIfNecessary()
}
@ -216,14 +215,14 @@ internal class ChannelManager<T>(
* Holder for each downstream collector
*/
internal data class ChannelEntry<T>(
/**
* The channel used by the collector
*/
private val channel: Channel<Message.DispatchValue<T>>,
/**
* Tracking whether we've ever dispatched a value or an error to downstream
*/
private var _receivedValue: Boolean = false
/**
* The channel used by the collector
*/
private val channel: Channel<Message.DispatchValue<T>>,
/**
* Tracking whether we've ever dispatched a value or an error to downstream
*/
private var _receivedValue: Boolean = false
) {
val receivedValue
get() = _receivedValue
@ -255,7 +254,7 @@ internal class ChannelManager<T>(
* Add a new channel, that means a new downstream subscriber
*/
class AddChannel<T>(
val channel: Channel<DispatchValue<T>>
val channel: Channel<DispatchValue<T>>
) : Message<T>()
/**
@ -267,32 +266,32 @@ internal class ChannelManager<T>(
* Upstream dispatched a new value, send it to all downstream items
*/
class DispatchValue<T>(
/**
* The value dispatched by the upstream
*/
val value: T,
/**
* Ack that is completed by all receiver. Upstream producer will await this before asking
* for a new value from upstream
*/
val delivered: CompletableDeferred<Unit>
/**
* The value dispatched by the upstream
*/
val value: T,
/**
* Ack that is completed by all receiver. Upstream producer will await this before asking
* for a new value from upstream
*/
val delivered: CompletableDeferred<Unit>
) : Message<T>()
/**
* Upstream dispatched an error, send it to all downstream items
*/
class DispatchError<T>(
/**
* The error sent by the upstream
*/
val error: Throwable
/**
* The error sent by the upstream
*/
val error: Throwable
) : Message<T>()
class UpstreamFinished<T>(
/**
* SharedFlowProducer finished emitting
*/
val producer: SharedFlowProducer<T>
/**
* SharedFlowProducer finished emitting
*/
val producer: SharedFlowProducer<T>
) : Message<T>()
}
@ -331,7 +330,7 @@ internal class ChannelManager<T>(
* A real buffer implementation that has a FIFO queue.
*/
private class BufferImpl<T>(private val limit: Int) :
Buffer<T> {
Buffer<T> {
override val items = ArrayDeque<Message.DispatchValue<T>>(limit.coerceAtMost(10))
override fun add(item: Message.DispatchValue<T>) {
while (items.size >= limit) {

View file

@ -14,7 +14,7 @@
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
@ -33,7 +33,7 @@ import kotlinx.coroutines.flow.transform
*/
@FlowPreview
@ExperimentalCoroutinesApi
internal class Multicaster<T>(
class Multicaster<T>(
/**
* The [CoroutineScope] to use for upstream subscription
*/
@ -64,17 +64,17 @@ internal class Multicaster<T>(
private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
ChannelManager(
scope = scope,
bufferSize = bufferSize,
onActive = {
SharedFlowProducer(
scope = scope,
src = source(),
channelManager = it
)
},
piggybackingDownstream = piggybackingDownstream,
onEach = onEach
scope = scope,
bufferSize = bufferSize,
onActive = {
SharedFlowProducer(
scope = scope,
src = source(),
channelManager = it
)
},
piggybackingDownstream = piggybackingDownstream,
onEach = onEach
)
}

View file

@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.DispatchError
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.DispatchValue
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.UpstreamFinished
import com.dropbox.flow.multicast.ChannelManager.Message.DispatchError
import com.dropbox.flow.multicast.ChannelManager.Message.DispatchValue
import com.dropbox.flow.multicast.ChannelManager.Message.UpstreamFinished
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope

View file

@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.AddChannel
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.DispatchValue
import com.dropbox.android.external.store4.impl.multicast.ChannelManager.Message.RemoveChannel
import com.dropbox.flow.multicast.ChannelManager.Message.AddChannel
import com.dropbox.flow.multicast.ChannelManager.Message.DispatchValue
import com.dropbox.flow.multicast.ChannelManager.Message.RemoveChannel
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
@ -41,18 +41,18 @@ import org.junit.runners.JUnit4
class ChannelManagerTest {
private val scope = TestCoroutineScope()
private val manager = ChannelManager<String>(
scope,
0,
onEach = {}
scope,
0,
onEach = {}
) {
SharedFlowProducer(
scope, src =
flow {
suspendCancellableCoroutine<String> {
// never end
}
},
channelManager = it
scope, src =
flow {
suspendCancellableCoroutine<String> {
// never end
}
},
channelManager = it
)
}

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
@ -47,13 +47,13 @@ class InfiniteMulticastTest {
private fun <T> createMulticaster(f: () -> Flow<T>): Multicaster<T> {
return Multicaster(
scope = testScope,
bufferSize = 0,
source = f,
piggybackingDownstream = true,
onEach = {
dispatchLog.add(it.toString())
})
scope = testScope,
bufferSize = 0,
source = f,
piggybackingDownstream = true,
onEach = {
dispatchLog.add(it.toString())
})
}
@Test

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
@ -45,10 +45,10 @@ class MulticastTest {
private fun <T> createMulticaster(f: () -> Flow<T>): Multicaster<T> {
return Multicaster(
scope = testScope,
bufferSize = 0,
source = f,
onEach = {})
scope = testScope,
bufferSize = 0,
source = f,
onEach = {})
}
@Test
@ -168,7 +168,7 @@ class MulticastTest {
@Test
fun upstreamError() = testScope.runBlockingTest {
val exception =
MyCustomException("hey")
MyCustomException("hey")
val activeFlow = createMulticaster {
flow {
emit("a")
@ -197,7 +197,7 @@ class MulticastTest {
@Test
fun upstreamError_secondJustGetsError() = testScope.runBlockingTest {
val exception =
MyCustomException("hey")
MyCustomException("hey")
val dispatchedFirstValue = CompletableDeferred<Unit>()
val registeredSecondCollector = CompletableDeferred<Unit>()
val activeFlow = createMulticaster {
@ -273,23 +273,23 @@ class MulticastTest {
fun lateArrival_buffered() = testScope.runBlockingTest {
var createdCount = 0
val activeFlow = Multicaster(
scope = testScope,
bufferSize = 2,
source = {
createdCount++
flow {
emit("a")
delay(5)
emit("b")
emit("c")
emit("d")
delay(100)
emit("e")
// dont finish to see the buffer behavior
delay(2000)
}
},
onEach = {}
scope = testScope,
bufferSize = 2,
source = {
createdCount++
flow {
emit("a")
delay(5)
emit("b")
emit("c")
emit("d")
delay(100)
emit("e")
// dont finish to see the buffer behavior
delay(2000)
}
},
onEach = {}
)
val c1 = async {
activeFlow.create().toList()

View file

@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dropbox.android.external.store4.impl.multicast
package com.dropbox.flow.multicast
import com.dropbox.flow.multicast.StoreRealActor
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi

View file

@ -1 +1 @@
include ':app', ':store', ':cache', ':filesystem'
include ':app', ':store', ':cache', ':filesystem', ':multicast'

View file

@ -26,6 +26,7 @@ version = VERSION_NAME
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
implementation project(path: ':cache')
implementation project(path: ':multicast')
implementation libraries.coroutinesCore

View file

@ -17,7 +17,7 @@ package com.dropbox.android.external.store4.impl
import com.dropbox.android.external.store4.ResponseOrigin
import com.dropbox.android.external.store4.StoreResponse
import com.dropbox.android.external.store4.impl.multicast.Multicaster
import com.dropbox.flow.multicast.Multicaster
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview