parent
1d912ea898
commit
e63c6bbef8
6 changed files with 32 additions and 27 deletions
|
@ -5,43 +5,46 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
|
|||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
|
||||
// used when there is no input
|
||||
class NoInput
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, Output> beginPipeline(
|
||||
fetcher: (Key) -> Flow<Output>
|
||||
): PipelineStore<Key, Output> {
|
||||
): PipelineStore<Key, NoInput, Output> {
|
||||
return PipelineFetcherStore(fetcher)
|
||||
}
|
||||
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withConverter(
|
||||
fun <Key, Input, OldOutput, NewOutput> PipelineStore<Key, Input, OldOutput>.withConverter(
|
||||
converter: suspend (OldOutput) -> NewOutput
|
||||
): PipelineStore<Key, NewOutput> {
|
||||
): PipelineStore<Key, Input, NewOutput> {
|
||||
return PipelineConverterStore(this) { key, value ->
|
||||
converter(value)
|
||||
}
|
||||
}
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withKeyConverter(
|
||||
fun <Key, Input, OldOutput, NewOutput> PipelineStore<Key, Input, OldOutput>.withKeyConverter(
|
||||
converter: suspend (Key, OldOutput) -> NewOutput
|
||||
): PipelineStore<Key, NewOutput> {
|
||||
): PipelineStore<Key, Input, NewOutput> {
|
||||
return PipelineConverterStore(this, converter)
|
||||
}
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, Output> PipelineStore<Key, Output>.withCache(
|
||||
fun <Key, Input, Output> PipelineStore<Key, Input, Output>.withCache(
|
||||
memoryPolicy: MemoryPolicy? = null
|
||||
): PipelineStore<Key, Output> {
|
||||
): PipelineStore<Key, Input, Output> {
|
||||
return PipelineCacheStore(this, memoryPolicy)
|
||||
}
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withPersister(
|
||||
fun <Key, OldInput, OldOutput, NewOutput> PipelineStore<Key, OldInput, OldOutput>.withPersister(
|
||||
reader: (Key) -> Flow<NewOutput>,
|
||||
writer: suspend (Key, OldOutput) -> Unit,
|
||||
delete: (suspend (Key) -> Unit)? = null
|
||||
): PipelineStore<Key, NewOutput> {
|
||||
): PipelineStore<Key, OldOutput, NewOutput> {
|
||||
return PipelinePersister(
|
||||
fetcher = this,
|
||||
reader = reader,
|
||||
|
@ -52,11 +55,11 @@ fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withPersister(
|
|||
|
||||
@ExperimentalCoroutinesApi
|
||||
@FlowPreview
|
||||
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withNonFlowPersister(
|
||||
fun <Key, OldInput, OldOutput, NewOutput> PipelineStore<Key, OldInput, OldOutput>.withNonFlowPersister(
|
||||
reader: suspend (Key) -> NewOutput?,
|
||||
writer: suspend (Key, OldOutput) -> Unit,
|
||||
delete: (suspend (Key) -> Unit)? = null
|
||||
): PipelineStore<Key, NewOutput> {
|
||||
): PipelineStore<Key, OldOutput, NewOutput> {
|
||||
val flowable = SimplePersisterAsFlowable(
|
||||
reader = reader,
|
||||
writer = writer,
|
||||
|
|
|
@ -8,10 +8,10 @@ import kotlinx.coroutines.flow.Flow
|
|||
import kotlinx.coroutines.flow.onEach
|
||||
|
||||
@FlowPreview
|
||||
internal class PipelineCacheStore<Key, Output>(
|
||||
private val delegate: PipelineStore<Key, Output>,
|
||||
memoryPolicy: MemoryPolicy? = null
|
||||
) : PipelineStore<Key, Output> {
|
||||
internal class PipelineCacheStore<Key, Input, Output>(
|
||||
private val delegate: PipelineStore<Key, Input, Output>,
|
||||
memoryPolicy: MemoryPolicy? = null
|
||||
) : PipelineStore<Key, Input, Output> {
|
||||
private val memCache = StoreCache.from(
|
||||
loader = { key: Key ->
|
||||
delegate.get(key)
|
||||
|
|
|
@ -12,10 +12,10 @@ private fun <Key, In, Out> castConverter(): suspend (Key, In) -> Out {
|
|||
}
|
||||
|
||||
@UseExperimental(FlowPreview::class)
|
||||
class PipelineConverterStore<Key, OldOutput, NewOutput>(
|
||||
private val delegate: PipelineStore<Key, OldOutput>,
|
||||
private val converter: (suspend (Key, OldOutput) -> NewOutput) = castConverter()
|
||||
) : PipelineStore<Key, NewOutput> {
|
||||
class PipelineConverterStore<Key, Input, OldOutput, NewOutput>(
|
||||
private val delegate: PipelineStore<Key, Input, OldOutput>,
|
||||
private val converter: (suspend (Key, OldOutput) -> NewOutput) = castConverter()
|
||||
) : PipelineStore<Key, Input, NewOutput> {
|
||||
override suspend fun get(key: Key): NewOutput? {
|
||||
return delegate.get(key)?.let {
|
||||
converter(key, it)
|
||||
|
|
|
@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.Flow
|
|||
@FlowPreview
|
||||
internal class PipelineFetcherStore<Key, Output>(
|
||||
private val fetcher: (Key) -> Flow<Output>
|
||||
) : PipelineStore<Key, Output> {
|
||||
) : PipelineStore<Key, NoInput, Output> {
|
||||
override suspend fun get(key: Key): Output? {
|
||||
return fetcher(key).singleOrNull()
|
||||
}
|
||||
|
|
|
@ -1,18 +1,20 @@
|
|||
package com.nytimes.android.external.store3.pipeline
|
||||
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import kotlinx.coroutines.flow.switchMap
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
@FlowPreview
|
||||
class PipelinePersister<Key, Input, Output>(
|
||||
private val fetcher: PipelineStore<Key, Input>,
|
||||
private val reader: (Key) -> Flow<Output?>,
|
||||
private val writer: suspend (Key, Input) -> Unit,
|
||||
private val delete: (suspend (Key) -> Unit)? = null
|
||||
) : PipelineStore<Key, Output> {
|
||||
private val fetcher: PipelineStore<Key, *, Input>,
|
||||
private val reader: (Key) -> Flow<Output?>,
|
||||
private val writer: suspend (Key, Input) -> Unit,
|
||||
private val delete: (suspend (Key) -> Unit)? = null
|
||||
) : PipelineStore<Key, Input, Output> {
|
||||
override suspend fun get(key: Key): Output? {
|
||||
val value: Output? = reader(key).singleOrNull()
|
||||
value?.let {
|
||||
|
|
|
@ -14,7 +14,7 @@ internal class AbortFlowException :
|
|||
// if this class becomes public, should probaly be named IntermediateStore to distingush from
|
||||
// Store and also clarify that it still needs to be built/open? (how do we ensure?)
|
||||
@FlowPreview
|
||||
interface PipelineStore<Key, Output> {
|
||||
interface PipelineStore<Key, Input, Output> {
|
||||
/**
|
||||
* Return a flow for the given key
|
||||
*/
|
||||
|
@ -48,7 +48,7 @@ interface PipelineStore<Key, Output> {
|
|||
}
|
||||
|
||||
@FlowPreview
|
||||
fun <Key, Output> PipelineStore<Key, Output>.open(): Store<Output, Key> {
|
||||
fun <Key, Input, Output> PipelineStore<Key, Input, Output>.open(): Store<Output, Key> {
|
||||
val self = this
|
||||
return object : Store<Output, Key> {
|
||||
override suspend fun get(key: Key) = self.get(key)!!
|
||||
|
|
Loading…
Reference in a new issue