Removed extra generic
This commit is contained in:
parent
1ba634e7c3
commit
1d912ea898
6 changed files with 27 additions and 32 deletions
|
@ -5,46 +5,43 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
|
||||||
// used when there is no input
|
|
||||||
class NoInput
|
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, Output> beginPipeline(
|
fun <Key, Output> beginPipeline(
|
||||||
fetcher: (Key) -> Flow<Output>
|
fetcher: (Key) -> Flow<Output>
|
||||||
): PipelineStore<Key, NoInput, Output> {
|
): PipelineStore<Key, Output> {
|
||||||
return PipelineFetcherStore(fetcher)
|
return PipelineFetcherStore(fetcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, Input, OldOutput, NewOutput> PipelineStore<Key, Input, OldOutput>.withConverter(
|
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withConverter(
|
||||||
converter: suspend (OldOutput) -> NewOutput
|
converter: suspend (OldOutput) -> NewOutput
|
||||||
): PipelineStore<Key, Input, NewOutput> {
|
): PipelineStore<Key, NewOutput> {
|
||||||
return PipelineConverterStore(this) { key, value ->
|
return PipelineConverterStore(this) { key, value ->
|
||||||
converter(value)
|
converter(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, Input, OldOutput, NewOutput> PipelineStore<Key, Input, OldOutput>.withKeyConverter(
|
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withKeyConverter(
|
||||||
converter: suspend (Key, OldOutput) -> NewOutput
|
converter: suspend (Key, OldOutput) -> NewOutput
|
||||||
): PipelineStore<Key, Input, NewOutput> {
|
): PipelineStore<Key, NewOutput> {
|
||||||
return PipelineConverterStore(this, converter)
|
return PipelineConverterStore(this, converter)
|
||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, Input, Output> PipelineStore<Key, Input, Output>.withCache(
|
fun <Key, Output> PipelineStore<Key, Output>.withCache(
|
||||||
memoryPolicy: MemoryPolicy? = null
|
memoryPolicy: MemoryPolicy? = null
|
||||||
): PipelineStore<Key, Input, Output> {
|
): PipelineStore<Key, Output> {
|
||||||
return PipelineCacheStore(this, memoryPolicy)
|
return PipelineCacheStore(this, memoryPolicy)
|
||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, OldInput, OldOutput, NewOutput> PipelineStore<Key, OldInput, OldOutput>.withPersister(
|
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withPersister(
|
||||||
reader: (Key) -> Flow<NewOutput>,
|
reader: (Key) -> Flow<NewOutput>,
|
||||||
writer: suspend (Key, OldOutput) -> Unit,
|
writer: suspend (Key, OldOutput) -> Unit,
|
||||||
delete: (suspend (Key) -> Unit)? = null
|
delete: (suspend (Key) -> Unit)? = null
|
||||||
): PipelineStore<Key, OldOutput, NewOutput> {
|
): PipelineStore<Key, NewOutput> {
|
||||||
return PipelinePersister(
|
return PipelinePersister(
|
||||||
fetcher = this,
|
fetcher = this,
|
||||||
reader = reader,
|
reader = reader,
|
||||||
|
@ -55,11 +52,11 @@ fun <Key, OldInput, OldOutput, NewOutput> PipelineStore<Key, OldInput, OldOutput
|
||||||
|
|
||||||
@ExperimentalCoroutinesApi
|
@ExperimentalCoroutinesApi
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, OldInput, OldOutput, NewOutput> PipelineStore<Key, OldInput, OldOutput>.withNonFlowPersister(
|
fun <Key, OldOutput, NewOutput> PipelineStore<Key, OldOutput>.withNonFlowPersister(
|
||||||
reader: suspend (Key) -> NewOutput?,
|
reader: suspend (Key) -> NewOutput?,
|
||||||
writer: suspend (Key, OldOutput) -> Unit,
|
writer: suspend (Key, OldOutput) -> Unit,
|
||||||
delete: (suspend (Key) -> Unit)? = null
|
delete: (suspend (Key) -> Unit)? = null
|
||||||
): PipelineStore<Key, OldOutput, NewOutput> {
|
): PipelineStore<Key, NewOutput> {
|
||||||
val flowable = SimplePersisterAsFlowable(
|
val flowable = SimplePersisterAsFlowable(
|
||||||
reader = reader,
|
reader = reader,
|
||||||
writer = writer,
|
writer = writer,
|
||||||
|
|
|
@ -8,10 +8,10 @@ import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
internal class PipelineCacheStore<Key, Input, Output>(
|
internal class PipelineCacheStore<Key, Output>(
|
||||||
private val delegate: PipelineStore<Key, Input, Output>,
|
private val delegate: PipelineStore<Key, Output>,
|
||||||
memoryPolicy: MemoryPolicy? = null
|
memoryPolicy: MemoryPolicy? = null
|
||||||
) : PipelineStore<Key, Input, Output> {
|
) : PipelineStore<Key, Output> {
|
||||||
private val memCache = StoreCache.from(
|
private val memCache = StoreCache.from(
|
||||||
loader = { key: Key ->
|
loader = { key: Key ->
|
||||||
delegate.get(key)
|
delegate.get(key)
|
||||||
|
|
|
@ -12,10 +12,10 @@ private fun <Key, In, Out> castConverter(): suspend (Key, In) -> Out {
|
||||||
}
|
}
|
||||||
|
|
||||||
@UseExperimental(FlowPreview::class)
|
@UseExperimental(FlowPreview::class)
|
||||||
class PipelineConverterStore<Key, Input, OldOutput, NewOutput>(
|
class PipelineConverterStore<Key, OldOutput, NewOutput>(
|
||||||
private val delegate: PipelineStore<Key, Input, OldOutput>,
|
private val delegate: PipelineStore<Key, OldOutput>,
|
||||||
private val converter: (suspend (Key, OldOutput) -> NewOutput) = castConverter()
|
private val converter: (suspend (Key, OldOutput) -> NewOutput) = castConverter()
|
||||||
) : PipelineStore<Key, Input, NewOutput> {
|
) : PipelineStore<Key, NewOutput> {
|
||||||
override suspend fun get(key: Key): NewOutput? {
|
override suspend fun get(key: Key): NewOutput? {
|
||||||
return delegate.get(key)?.let {
|
return delegate.get(key)?.let {
|
||||||
converter(key, it)
|
converter(key, it)
|
||||||
|
|
|
@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.Flow
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
internal class PipelineFetcherStore<Key, Output>(
|
internal class PipelineFetcherStore<Key, Output>(
|
||||||
private val fetcher: (Key) -> Flow<Output>
|
private val fetcher: (Key) -> Flow<Output>
|
||||||
) : PipelineStore<Key, NoInput, Output> {
|
) : PipelineStore<Key, Output> {
|
||||||
override suspend fun get(key: Key): Output? {
|
override suspend fun get(key: Key): Output? {
|
||||||
return fetcher(key).singleOrNull()
|
return fetcher(key).singleOrNull()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,20 +1,18 @@
|
||||||
package com.nytimes.android.external.store3.pipeline
|
package com.nytimes.android.external.store3.pipeline
|
||||||
|
|
||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.coroutineScope
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.collect
|
import kotlinx.coroutines.flow.collect
|
||||||
import kotlinx.coroutines.flow.flow
|
import kotlinx.coroutines.flow.flow
|
||||||
import kotlinx.coroutines.flow.switchMap
|
import kotlinx.coroutines.flow.switchMap
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
class PipelinePersister<Key, Input, Output>(
|
class PipelinePersister<Key, Input, Output>(
|
||||||
private val fetcher: PipelineStore<Key, *, Input>,
|
private val fetcher: PipelineStore<Key, Input>,
|
||||||
private val reader: (Key) -> Flow<Output?>,
|
private val reader: (Key) -> Flow<Output?>,
|
||||||
private val writer: suspend (Key, Input) -> Unit,
|
private val writer: suspend (Key, Input) -> Unit,
|
||||||
private val delete: (suspend (Key) -> Unit)? = null
|
private val delete: (suspend (Key) -> Unit)? = null
|
||||||
) : PipelineStore<Key, Input, Output> {
|
) : PipelineStore<Key, Output> {
|
||||||
override suspend fun get(key: Key): Output? {
|
override suspend fun get(key: Key): Output? {
|
||||||
val value: Output? = reader(key).singleOrNull()
|
val value: Output? = reader(key).singleOrNull()
|
||||||
value?.let {
|
value?.let {
|
||||||
|
|
|
@ -14,7 +14,7 @@ internal class AbortFlowException :
|
||||||
// if this class becomes public, should probaly be named IntermediateStore to distingush from
|
// if this class becomes public, should probaly be named IntermediateStore to distingush from
|
||||||
// Store and also clarify that it still needs to be built/open? (how do we ensure?)
|
// Store and also clarify that it still needs to be built/open? (how do we ensure?)
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
interface PipelineStore<Key, Input, Output> {
|
interface PipelineStore<Key, Output> {
|
||||||
/**
|
/**
|
||||||
* Return a flow for the given key
|
* Return a flow for the given key
|
||||||
*/
|
*/
|
||||||
|
@ -48,7 +48,7 @@ interface PipelineStore<Key, Input, Output> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <Key, Input, Output> PipelineStore<Key, Input, Output>.open(): Store<Output, Key> {
|
fun <Key, Output> PipelineStore<Key, Output>.open(): Store<Output, Key> {
|
||||||
val self = this
|
val self = this
|
||||||
return object : Store<Output, Key> {
|
return object : Store<Output, Key> {
|
||||||
override suspend fun get(key: Key) = self.get(key)!!
|
override suspend fun get(key: Key) = self.get(key)!!
|
||||||
|
|
Loading…
Reference in a new issue