Add Write + Conflict Resolution (#496)

* Stub Store write

Signed-off-by: mramotar <mramotar@dropbox.com>

* Format

Signed-off-by: mramotar <mramotar@dropbox.com>

* Compile

Signed-off-by: mramotar <mramotar@dropbox.com>

* Fix tests

Signed-off-by: mramotar <mramotar@dropbox.com>

* Stash M1

Signed-off-by: mramotar <mramotar@dropbox.com>

* Make Updater and Bookkeeper optional

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

* Add conflict resolution

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

* Cover simple write

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

* Add MutableStore

Signed-off-by: mramotar <mramotar@dropbox.com>

* Add RealMutableStore

Signed-off-by: mramotar <mramotar@dropbox.com>

* Update workflows

Signed-off-by: mramotar <mramotar@dropbox.com>

* Format

Signed-off-by: mramotar <mramotar@dropbox.com>

* Remove references to Market

Signed-off-by: mramotar <mramotar@dropbox.com>

* Remove Converter interface

Signed-off-by: mramotar <mramotar@dropbox.com>

* Move Converter typealias

Signed-off-by: mramotar <mramotar@dropbox.com>

* Remove Google copyright

Signed-off-by: mramotar <mramotar@dropbox.com>

* Update CHANGELOG.md

Signed-off-by: mramotar <mramotar@dropbox.com>

Signed-off-by: mramotar <mramotar@dropbox.com>
Signed-off-by: Matt Ramotar <mramotar@dropbox.com>
This commit is contained in:
Matt 2022-12-20 20:34:00 -05:00 committed by mnakhimovich
parent 461af1a784
commit 1b21081986
67 changed files with 1722 additions and 797 deletions

View file

@ -18,6 +18,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.ref }}
- name: Set up our JDK environment
uses: actions/setup-java@v2

View file

@ -6,6 +6,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
ref: ${{ github.event.pull_request.head.ref }}
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
- name: Run check with Gradle Wrapper

View file

@ -1,6 +1,9 @@
# Changelog
## [Unreleased]
* Introduce MutableStore
* Implement RealMutableStore with Store delegate
* Extract Store and MutableStore methods to use cases
## [5.0.0-alpha03] (2022-12-18)

View file

@ -61,4 +61,8 @@ object Deps {
const val coroutinesTest = "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Version.kotlinxCoroutines}"
const val junit = "junit:junit:${Version.junit}"
}
object Touchlab {
const val kermit = "co.touchlab:kermit:${Version.kermit}"
}
}

View file

@ -24,6 +24,7 @@ object Version {
const val junit = "4.13.2"
const val kotlinxCoroutines = "1.6.4"
const val kotlinxSerialization = "1.4.0"
const val kermit = "1.2.2"
const val testCore = "1.4.0"
const val kmmBridge = "0.3.2"
const val ktlint = "0.39.0"

View file

@ -2,7 +2,7 @@ package org.mobilenativefoundation.store.cache5
import kotlin.time.Duration
class CacheBuilder<Key : Any, Value : Any> {
class CacheBuilder<Key : Any, CommonRepresentation : Any> {
internal var concurrencyLevel = 4
private set
internal val initialCapacity = 16
@ -14,41 +14,41 @@ class CacheBuilder<Key : Any, Value : Any> {
private set
internal var expireAfterWrite: Duration = Duration.INFINITE
private set
internal var weigher: Weigher<Key, Value>? = null
internal var weigher: Weigher<Key, CommonRepresentation>? = null
private set
internal var ticker: Ticker? = null
private set
fun concurrencyLevel(producer: () -> Int): CacheBuilder<Key, Value> = apply {
fun concurrencyLevel(producer: () -> Int): CacheBuilder<Key, CommonRepresentation> = apply {
concurrencyLevel = producer.invoke()
}
fun maximumSize(maximumSize: Long): CacheBuilder<Key, Value> = apply {
fun maximumSize(maximumSize: Long): CacheBuilder<Key, CommonRepresentation> = apply {
if (maximumSize < 0) {
throw IllegalArgumentException("Maximum size must be non-negative.")
}
this.maximumSize = maximumSize
}
fun expireAfterAccess(duration: Duration): CacheBuilder<Key, Value> = apply {
fun expireAfterAccess(duration: Duration): CacheBuilder<Key, CommonRepresentation> = apply {
if (duration.isNegative()) {
throw IllegalArgumentException("Duration must be non-negative.")
}
expireAfterAccess = duration
}
fun expireAfterWrite(duration: Duration): CacheBuilder<Key, Value> = apply {
fun expireAfterWrite(duration: Duration): CacheBuilder<Key, CommonRepresentation> = apply {
if (duration.isNegative()) {
throw IllegalArgumentException("Duration must be non-negative.")
}
expireAfterWrite = duration
}
fun ticker(ticker: Ticker): CacheBuilder<Key, Value> = apply {
fun ticker(ticker: Ticker): CacheBuilder<Key, CommonRepresentation> = apply {
this.ticker = ticker
}
fun weigher(maximumWeight: Long, weigher: Weigher<Key, Value>): CacheBuilder<Key, Value> = apply {
fun weigher(maximumWeight: Long, weigher: Weigher<Key, CommonRepresentation>): CacheBuilder<Key, CommonRepresentation> = apply {
if (maximumWeight < 0) {
throw IllegalArgumentException("Maximum weight must be non-negative.")
}
@ -57,7 +57,7 @@ class CacheBuilder<Key : Any, Value : Any> {
this.weigher = weigher
}
fun build(): Cache<Key, Value> {
fun build(): Cache<Key, CommonRepresentation> {
if (maximumSize != -1L && weigher != null) {
throw IllegalStateException("Maximum size cannot be combined with weigher.")
}

View file

@ -48,6 +48,7 @@ kotlin {
implementation(serializationCore)
implementation(dateTime)
}
implementation(Deps.Touchlab.kermit)
implementation(project(":multicast"))
implementation(project(":cache"))
}

View file

@ -0,0 +1,26 @@
package org.mobilenativefoundation.store.store5
import org.mobilenativefoundation.store.store5.impl.RealBookkeeper
import org.mobilenativefoundation.store.store5.impl.RealMutableStore
import org.mobilenativefoundation.store.store5.impl.extensions.now
/**
* Tracks when local changes fail to sync with network.
* @see [RealMutableStore] usage to persist write request failures and eagerly resolve conflicts before completing a read request.
*/
interface Bookkeeper<Key : Any> {
suspend fun getLastFailedSync(key: Key): Long?
suspend fun setLastFailedSync(key: Key, timestamp: Long = now()): Boolean
suspend fun clear(key: Key): Boolean
suspend fun clearAll(): Boolean
companion object {
fun <Key : Any> by(
getLastFailedSync: suspend (key: Key) -> Long?,
setLastFailedSync: suspend (key: Key, timestamp: Long) -> Boolean,
clear: suspend (key: Key) -> Boolean,
clearAll: suspend () -> Boolean
): Bookkeeper<Key> = RealBookkeeper(getLastFailedSync, setLastFailedSync, clear, clearAll)
}
}

View file

@ -0,0 +1,22 @@
package org.mobilenativefoundation.store.store5
interface Clear {
interface Key<Key : Any> {
/**
* Purge a particular entry from memory and disk cache.
* Persistent storage will only be cleared if a delete function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
suspend fun clear(key: Key)
}
interface All {
/**
* Purge all entries from memory and disk cache.
* Persistent storage will only be cleared if a clear function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
@ExperimentalStoreApi
suspend fun clear()
}
}

View file

@ -0,0 +1,22 @@
package org.mobilenativefoundation.store.store5
import org.mobilenativefoundation.store.store5.impl.RealItemValidator
/**
* Enables custom validation of [Store] items.
* @see [StoreReadRequest]
*/
interface ItemValidator<CommonRepresentation : Any> {
/**
* Determines whether a [Store] item is valid.
* If invalid, [MutableStore] will get the latest network value using [Fetcher].
* [MutableStore] will not validate network responses.
*/
suspend fun isValid(item: CommonRepresentation): Boolean
companion object {
fun <CommonRepresentation : Any> by(
validator: suspend (item: CommonRepresentation) -> Boolean
): ItemValidator<CommonRepresentation> = RealItemValidator(validator)
}
}

View file

@ -1,7 +1,7 @@
package org.mobilenativefoundation.store.store5
import org.mobilenativefoundation.store.cache5.Cache
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
fun interface Weigher<in K : Any, in V : Any> {
/**
@ -18,17 +18,10 @@ internal object OneWeigher : Weigher<Any, Any> {
}
/**
* MemoryPolicy holds all required info to create MemoryCache
*
*
* This class is used, in order to define the appropriate parameters for the Memory [com.dropbox.android.external.cache3.Cache]
* to be built.
*
*
* MemoryPolicy is used by a [Store]
* and defines the in-memory cache behavior.
* Defines behavior of in-memory [Cache].
* Used by [Store].
* @see [Store]
*/
@ExperimentalTime
class MemoryPolicy<in Key : Any, in Value : Any> internal constructor(
val expireAfterWrite: Duration,
val expireAfterAccess: Duration,

View file

@ -0,0 +1,8 @@
package org.mobilenativefoundation.store.store5
interface MutableStore<Key : Any, CommonRepresentation : Any> :
Read.StreamWithConflictResolution<Key, CommonRepresentation>,
Write<Key, CommonRepresentation>,
Write.Stream<Key, CommonRepresentation>,
Clear.Key<Key>,
Clear

View file

@ -0,0 +1,6 @@
package org.mobilenativefoundation.store.store5
data class OnFetcherCompletion<NetworkRepresentation : Any>(
val onSuccess: (FetcherResult.Data<NetworkRepresentation>) -> Unit,
val onFailure: (FetcherResult.Error) -> Unit
)

View file

@ -0,0 +1,6 @@
package org.mobilenativefoundation.store.store5
data class OnUpdaterCompletion<NetworkWriteResponse : Any>(
val onSuccess: (UpdaterResult.Success) -> Unit,
val onFailure: (UpdaterResult.Error) -> Unit
)

View file

@ -0,0 +1,17 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.flow.Flow
interface Read {
interface Stream<Key : Any, CommonRepresentation : Any> {
/**
* Return a flow for the given key
* @param request - see [StoreReadRequest] for configurations
*/
fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<CommonRepresentation>>
}
interface StreamWithConflictResolution<Key : Any, CommonRepresentation : Any> {
fun <NetworkWriteResponse : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<CommonRepresentation>>
}
}

View file

@ -39,21 +39,21 @@ import kotlin.jvm.JvmName
* a common flowing API.
*
* A source of truth is usually backed by local storage. It's purpose is to eliminate the need
* for waiting on network update before local modifications are available (via [Store.stream]).
* for waiting on network update before local modifications are available (via [Store.Stream.read]).
*
* For maximal flexibility, [writer]'s record type ([Input]] and [reader]'s record type
* ([Output]) are not identical. This allows us to read one type of objects from network and
* transform them to another type when placing them in local storage.
*
*/
interface SourceOfTruth<Key, Input, Output> {
interface SourceOfTruth<Key : Any, SourceOfTruthRepresentation : Any> {
/**
* Used by [Store] to read records from the source of truth.
*
* @param key The key to read for.
*/
fun reader(key: Key): Flow<Output?>
fun reader(key: Key): Flow<SourceOfTruthRepresentation?>
/**
* Used by [Store] to write records **coming in from the fetcher (network)** to the source of
@ -61,12 +61,12 @@ interface SourceOfTruth<Key, Input, Output> {
*
* **Note:** [Store] currently does not support updating the source of truth with local user
* updates (i.e writing record of type [Output]). However, any changes in the local database
* will still be visible via [Store.stream] APIs as long as you are using a local storage that
* will still be visible via [Store.Stream.read] APIs as long as you are using a local storage that
* supports observability (e.g. Room, SQLDelight, Realm).
*
* @param key The key to update for.
*/
suspend fun write(key: Key, value: Input)
suspend fun write(key: Key, value: SourceOfTruthRepresentation)
/**
* Used by [Store] to delete records in the source of truth for the given key.
@ -90,12 +90,12 @@ interface SourceOfTruth<Key, Input, Output> {
* @param delete function for deleting records in the source of truth for the given key
* @param deleteAll function for deleting all records in the source of truth
*/
fun <Key : Any, Input : Any, Output : Any> of(
nonFlowReader: suspend (Key) -> Output?,
writer: suspend (Key, Input) -> Unit,
fun <Key : Any, SourceOfTruthRepresentation : Any> of(
nonFlowReader: suspend (Key) -> SourceOfTruthRepresentation?,
writer: suspend (Key, SourceOfTruthRepresentation) -> Unit,
delete: (suspend (Key) -> Unit)? = null,
deleteAll: (suspend () -> Unit)? = null
): SourceOfTruth<Key, Input, Output> = PersistentNonFlowingSourceOfTruth(
): SourceOfTruth<Key, SourceOfTruthRepresentation> = PersistentNonFlowingSourceOfTruth(
realReader = nonFlowReader,
realWriter = writer,
realDelete = delete,
@ -112,12 +112,12 @@ interface SourceOfTruth<Key, Input, Output> {
* @param deleteAll function for deleting all records in the source of truth
*/
@JvmName("ofFlow")
fun <Key : Any, Input : Any, Output : Any> of(
reader: (Key) -> Flow<Output?>,
writer: suspend (Key, Input) -> Unit,
fun <Key : Any, SourceOfTruthRepresentation : Any> of(
reader: (Key) -> Flow<SourceOfTruthRepresentation?>,
writer: suspend (Key, SourceOfTruthRepresentation) -> Unit,
delete: (suspend (Key) -> Unit)? = null,
deleteAll: (suspend () -> Unit)? = null
): SourceOfTruth<Key, Input, Output> = PersistentSourceOfTruth(
): SourceOfTruth<Key, SourceOfTruthRepresentation> = PersistentSourceOfTruth(
realReader = reader,
realWriter = writer,
realDelete = delete,
@ -128,7 +128,7 @@ interface SourceOfTruth<Key, Input, Output> {
/**
* The exception provided when a write operation fails in SourceOfTruth.
*
* see [StoreResponse.Error.Exception]
* see [StoreReadResponse.Error.Exception]
*/
class WriteException(
/**
@ -169,7 +169,7 @@ interface SourceOfTruth<Key, Input, Output> {
/**
* Exception created when a [reader] throws an exception.
*
* see [StoreResponse.Error.Exception]
* see [StoreReadResponse.Error.Exception]
*/
class ReadException(
/**

View file

@ -1,7 +1,5 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.flow.Flow
/**
* A Store is responsible for managing a particular data request.
*
@ -33,26 +31,7 @@ import kotlinx.coroutines.flow.Flow
* }
*
*/
interface Store<Key : Any, Output : Any> {
/**
* Return a flow for the given key
* @param request - see [StoreRequest] for configurations
*/
fun stream(request: StoreRequest<Key>): Flow<StoreResponse<Output>>
/**
* Purge a particular entry from memory and disk cache.
* Persistent storage will only be cleared if a delete function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
suspend fun clear(key: Key)
/**
* Purge all entries from memory and disk cache.
* Persistent storage will only be cleared if a deleteAll function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
@ExperimentalStoreApi
suspend fun clearAll()
}
interface Store<Key : Any, CommonRepresentation : Any> :
Read.Stream<Key, CommonRepresentation>,
Clear.Key<Key>,
Clear.All

View file

@ -16,36 +16,43 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.CoroutineScope
import org.mobilenativefoundation.store.store5.impl.RealStoreBuilder
import kotlin.time.ExperimentalTime
import org.mobilenativefoundation.store.store5.impl.storeBuilderFromFetcher
import org.mobilenativefoundation.store.store5.impl.storeBuilderFromFetcherAndSourceOfTruth
/**
* Main entry point for creating a [Store].
*/
interface StoreBuilder<Key : Any, Output : Any> {
fun build(): Store<Key, Output>
interface StoreBuilder<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> {
fun build(): Store<Key, CommonRepresentation>
fun <NetworkWriteResponse : Any> build(
updater: Updater<Key, CommonRepresentation, NetworkWriteResponse>,
bookkeeper: Bookkeeper<Key>
): MutableStore<Key, CommonRepresentation>
/**
* A store multicasts same [Output] value to many consumers (Similar to RxJava.share()), by default
* A store multicasts same [CommonRepresentation] value to many consumers (Similar to RxJava.share()), by default
* [Store] will open a global scope for management of shared responses, if instead you'd like to control
* the scope that sharing/multicasting happens in you can pass a @param [scope]
*
* @param scope - scope to use for sharing
*/
fun scope(scope: CoroutineScope): StoreBuilder<Key, Output>
fun scope(scope: CoroutineScope): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
/**
* controls eviction policy for a store cache, use [MemoryPolicy.MemoryPolicyBuilder] to configure a TTL
* or size based eviction
* Example: MemoryPolicy.builder().setExpireAfterWrite(10.seconds).build()
*/
@ExperimentalTime
fun cachePolicy(memoryPolicy: MemoryPolicy<Key, Output>?): StoreBuilder<Key, Output>
fun cachePolicy(memoryPolicy: MemoryPolicy<Key, CommonRepresentation>?): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
/**
* by default a Store caches in memory with a default policy of max items = 100
*/
fun disableCache(): StoreBuilder<Key, Output>
fun disableCache(): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
fun converter(converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>):
StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
companion object {
@ -54,10 +61,9 @@ interface StoreBuilder<Key : Any, Output : Any> {
*
* @param fetcher a [Fetcher] flow of network records.
*/
@OptIn(ExperimentalTime::class)
fun <Key : Any, Output : Any> from(
fetcher: Fetcher<Key, Output>
): StoreBuilder<Key, Output> = RealStoreBuilder(fetcher)
fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any> from(
fetcher: Fetcher<Key, NetworkRepresentation>,
): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, *> = storeBuilderFromFetcher(fetcher = fetcher)
/**
* Creates a new [StoreBuilder] from a [Fetcher] and a [SourceOfTruth].
@ -65,12 +71,10 @@ interface StoreBuilder<Key : Any, Output : Any> {
* @param fetcher a function for fetching a flow of network records.
* @param sourceOfTruth a [SourceOfTruth] for the store.
*/
fun <Key : Any, Input : Any, Output : Any> from(
fetcher: Fetcher<Key, Input>,
sourceOfTruth: SourceOfTruth<Key, Input, Output>
): StoreBuilder<Key, Output> = RealStoreBuilder(
fetcher = fetcher,
sourceOfTruth = sourceOfTruth
)
fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> from(
fetcher: Fetcher<Key, NetworkRepresentation>,
sourceOfTruth: SourceOfTruth<Key, SourceOfTruthRepresentation>
): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> =
storeBuilderFromFetcherAndSourceOfTruth(fetcher = fetcher, sourceOfTruth = sourceOfTruth)
}
}

View file

@ -0,0 +1,49 @@
package org.mobilenativefoundation.store.store5
import org.mobilenativefoundation.store.store5.internal.definition.Converter
interface StoreConverter<NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> {
fun fromNetworkRepresentationToCommonRepresentation(networkRepresentation: NetworkRepresentation): CommonRepresentation?
fun fromCommonRepresentationToSourceOfTruthRepresentation(commonRepresentation: CommonRepresentation): SourceOfTruthRepresentation?
fun fromSourceOfTruthRepresentationToCommonRepresentation(sourceOfTruthRepresentation: SourceOfTruthRepresentation): CommonRepresentation?
class Builder<NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> {
private var fromCommonToSourceOfTruth: Converter<CommonRepresentation, SourceOfTruthRepresentation>? = null
private var fromNetworkToCommon: Converter<NetworkRepresentation, CommonRepresentation>? = null
private var fromSourceOfTruthToCommon: Converter<SourceOfTruthRepresentation, CommonRepresentation>? = null
fun build(): StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> =
RealStoreConverter(fromCommonToSourceOfTruth, fromNetworkToCommon, fromSourceOfTruthToCommon)
fun fromCommonToSourceOfTruth(converter: Converter<CommonRepresentation, SourceOfTruthRepresentation>): Builder<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
fromCommonToSourceOfTruth = converter
return this
}
fun fromSourceOfTruthToCommon(converter: Converter<SourceOfTruthRepresentation, CommonRepresentation>): Builder<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
fromSourceOfTruthToCommon = converter
return this
}
fun fromNetworkToCommon(converter: Converter<NetworkRepresentation, CommonRepresentation>): Builder<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
fromNetworkToCommon = converter
return this
}
}
}
private class RealStoreConverter<NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
private val fromCommonToSourceOfTruth: Converter<CommonRepresentation, SourceOfTruthRepresentation>?,
private val fromNetworkToCommon: Converter<NetworkRepresentation, CommonRepresentation>?,
private val fromSourceOfTruthToCommon: Converter<SourceOfTruthRepresentation, CommonRepresentation>?
) : StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
override fun fromNetworkRepresentationToCommonRepresentation(networkRepresentation: NetworkRepresentation): CommonRepresentation? =
fromNetworkToCommon?.invoke(networkRepresentation)
override fun fromCommonRepresentationToSourceOfTruthRepresentation(commonRepresentation: CommonRepresentation): SourceOfTruthRepresentation? =
fromCommonToSourceOfTruth?.invoke(commonRepresentation)
override fun fromSourceOfTruthRepresentationToCommonRepresentation(sourceOfTruthRepresentation: SourceOfTruthRepresentation): CommonRepresentation? =
fromSourceOfTruthToCommon?.invoke(sourceOfTruthRepresentation)
}

View file

@ -1,9 +1,8 @@
package org.mobilenativefoundation.store.store5
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlin.time.Duration.Companion.hours
@ExperimentalTime
internal object StoreDefaults {
/**
@ -11,7 +10,7 @@ internal object StoreDefaults {
*
* @return memory cache TTL
*/
val cacheTTL: Duration = Duration.hours(24)
val cacheTTL: Duration = 24.hours
/**
* Cache size (default is 100), can be overridden

View file

@ -1,28 +0,0 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
/**
* Helper factory that will return data for [key] if it is cached otherwise will return
* fresh/network data (updating your caches)
*/
suspend fun <Key : Any, Output : Any> Store<Key, Output>.get(key: Key) = stream(
StoreRequest.cached(key, refresh = false)
).filterNot {
it is StoreResponse.Loading || it is StoreResponse.NoNewData
}.first().requireData()
/**
* Helper factory that will return fresh data for [key] while updating your caches
*
* Note: If the [Fetcher] does not return any data (i.e the returned
* [kotlinx.coroutines.Flow], when collected, is empty). Then store will fall back to local
* data **even** if you explicitly requested fresh data.
* See https://github.com/dropbox/Store/pull/194 for context
*/
suspend fun <Key : Any, Output : Any> Store<Key, Output>.fresh(key: Key) = stream(
StoreRequest.fresh(key)
).filterNot {
it is StoreResponse.Loading || it is StoreResponse.NoNewData
}.first().requireData()

View file

@ -23,7 +23,7 @@ package org.mobilenativefoundation.store.store5
* starting the stream from the local [com.dropbox.android.external.store4.impl.SourceOfTruth] and memory cache
*
*/
data class StoreRequest<Key> private constructor(
data class StoreReadRequest<Key> private constructor(
val key: Key,
@ -51,7 +51,7 @@ data class StoreRequest<Key> private constructor(
* data **even** if you explicitly requested fresh data.
* See https://github.com/dropbox/Store/pull/194 for context.
*/
fun <Key> fresh(key: Key) = StoreRequest(
fun <Key> fresh(key: Key) = StoreReadRequest(
key = key,
skippedCaches = allCaches,
refresh = true
@ -61,7 +61,7 @@ data class StoreRequest<Key> private constructor(
* Create a Store Request which will return data from memory/disk caches
* @param refresh if true then return fetcher (new) data as well (updating your caches)
*/
fun <Key> cached(key: Key, refresh: Boolean) = StoreRequest(
fun <Key> cached(key: Key, refresh: Boolean) = StoreReadRequest(
key = key,
skippedCaches = 0,
refresh = refresh
@ -71,7 +71,7 @@ data class StoreRequest<Key> private constructor(
* Create a Store Request which will return data from disk cache
* @param refresh if true then return fetcher (new) data as well (updating your caches)
*/
fun <Key> skipMemory(key: Key, refresh: Boolean) = StoreRequest(
fun <Key> skipMemory(key: Key, refresh: Boolean) = StoreReadRequest(
key = key,
skippedCaches = CacheType.MEMORY.flag,
refresh = refresh

View file

@ -22,47 +22,48 @@ package org.mobilenativefoundation.store.store5
* class to represent each response. This allows the flow to keep running even if an error happens
* so that if there is an observable single source of truth, application can keep observing it.
*/
sealed class StoreResponse<out T> {
sealed class StoreReadResponse<out CommonRepresentation> {
/**
* Represents the source of the Response.
*/
abstract val origin: ResponseOrigin
abstract val origin: StoreReadResponseOrigin
/**
* Loading event dispatched by [Store] to signal the [Fetcher] is in progress.
*/
data class Loading(override val origin: ResponseOrigin) : StoreResponse<Nothing>()
data class Loading(override val origin: StoreReadResponseOrigin) : StoreReadResponse<Nothing>()
/**
* Data dispatched by [Store]
*/
data class Data<T>(val value: T, override val origin: ResponseOrigin) : StoreResponse<T>()
data class Data<CommonRepresentation>(val value: CommonRepresentation, override val origin: StoreReadResponseOrigin) :
StoreReadResponse<CommonRepresentation>()
/**
* No new data event dispatched by Store to signal the [Fetcher] returned no data (i.e the
* returned [kotlinx.coroutines.Flow], when collected, was empty).
*/
data class NoNewData(override val origin: ResponseOrigin) : StoreResponse<Nothing>()
data class NoNewData(override val origin: StoreReadResponseOrigin) : StoreReadResponse<Nothing>()
/**
* Error dispatched by a pipeline
*/
sealed class Error : StoreResponse<Nothing>() {
sealed class Error : StoreReadResponse<Nothing>() {
data class Exception(
val error: Throwable,
override val origin: ResponseOrigin
override val origin: StoreReadResponseOrigin
) : Error()
data class Message(
val message: String,
override val origin: ResponseOrigin
override val origin: StoreReadResponseOrigin
) : Error()
}
/**
* Returns the available data or throws [NullPointerException] if there is no data.
*/
fun requireData(): T {
fun requireData(): CommonRepresentation {
return when (this) {
is Data -> value
is Error -> this.doThrow()
@ -71,7 +72,7 @@ sealed class StoreResponse<out T> {
}
/**
* If this [StoreResponse] is of type [StoreResponse.Error], throws the exception
* If this [StoreReadResponse] is of type [StoreReadResponse.Error], throws the exception
* Otherwise, does nothing.
*/
fun throwIfError() {
@ -81,7 +82,7 @@ sealed class StoreResponse<out T> {
}
/**
* If this [StoreResponse] is of type [StoreResponse.Error], returns the available error
* If this [StoreReadResponse] is of type [StoreReadResponse.Error], returns the available error
* from it. Otherwise, returns `null`.
*/
fun errorMessageOrNull(): String? {
@ -95,13 +96,13 @@ sealed class StoreResponse<out T> {
/**
* If there is data available, returns it; otherwise returns null.
*/
fun dataOrNull(): T? = when (this) {
fun dataOrNull(): CommonRepresentation? = when (this) {
is Data -> value
else -> null
}
@Suppress("UNCHECKED_CAST")
internal fun <R> swapType(): StoreResponse<R> = when (this) {
internal fun <T> swapType(): StoreReadResponse<T> = when (this) {
is Error -> this
is Loading -> this
is NoNewData -> this
@ -110,26 +111,26 @@ sealed class StoreResponse<out T> {
}
/**
* Represents the origin for a [StoreResponse].
* Represents the origin for a [StoreReadResponse].
*/
enum class ResponseOrigin {
enum class StoreReadResponseOrigin {
/**
* [StoreResponse] is sent from the cache
* [StoreReadResponse] is sent from the cache
*/
Cache,
/**
* [StoreResponse] is sent from the persister
* [StoreReadResponse] is sent from the persister
*/
SourceOfTruth,
/**
* [StoreResponse] is sent from a fetcher,
* [StoreReadResponse] is sent from a fetcher,
*/
Fetcher
}
fun StoreResponse.Error.doThrow(): Nothing = when (this) {
is StoreResponse.Error.Exception -> throw error
is StoreResponse.Error.Message -> throw RuntimeException(message)
fun StoreReadResponse.Error.doThrow(): Nothing = when (this) {
is StoreReadResponse.Error.Exception -> throw error
is StoreReadResponse.Error.Message -> throw RuntimeException(message)
}

View file

@ -0,0 +1,21 @@
package org.mobilenativefoundation.store.store5
import kotlinx.datetime.Clock
import org.mobilenativefoundation.store.store5.impl.OnStoreWriteCompletion
import org.mobilenativefoundation.store.store5.impl.RealStoreWriteRequest
interface StoreWriteRequest<Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any> {
val key: Key
val input: CommonRepresentation
val created: Long
val onCompletions: List<OnStoreWriteCompletion>?
companion object {
fun <Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any> of(
key: Key,
input: CommonRepresentation,
onCompletions: List<OnStoreWriteCompletion>? = null,
created: Long = Clock.System.now().toEpochMilliseconds(),
): StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse> = RealStoreWriteRequest(key, input, created, onCompletions)
}
}

View file

@ -0,0 +1,13 @@
package org.mobilenativefoundation.store.store5
sealed class StoreWriteResponse {
sealed class Success : StoreWriteResponse() {
data class Typed<NetworkWriteResponse : Any>(val value: NetworkWriteResponse) : Success()
data class Untyped(val value: Any) : Success()
}
sealed class Error : StoreWriteResponse() {
data class Exception(val error: Throwable) : Error()
data class Message(val message: String) : Error()
}
}

View file

@ -0,0 +1,35 @@
package org.mobilenativefoundation.store.store5
typealias PostRequest<Key, CommonRepresentation> = suspend (key: Key, input: CommonRepresentation) -> UpdaterResult
/**
* Posts data to remote data source.
* @see [StoreWriteRequest]
*/
interface Updater<Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any> {
/**
* Makes HTTP POST request.
*/
suspend fun post(key: Key, input: CommonRepresentation): UpdaterResult
/**
* Executes on network completion.
*/
val onCompletion: OnUpdaterCompletion<NetworkWriteResponse>?
companion object {
fun <Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any> by(
post: PostRequest<Key, CommonRepresentation>,
onCompletion: OnUpdaterCompletion<NetworkWriteResponse>? = null,
): Updater<Key, CommonRepresentation, NetworkWriteResponse> = RealNetworkUpdater(
post, onCompletion
)
}
}
internal class RealNetworkUpdater<Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any>(
private val realPost: PostRequest<Key, CommonRepresentation>,
override val onCompletion: OnUpdaterCompletion<NetworkWriteResponse>?,
) : Updater<Key, CommonRepresentation, NetworkWriteResponse> {
override suspend fun post(key: Key, input: CommonRepresentation): UpdaterResult = realPost(key, input)
}

View file

@ -0,0 +1,14 @@
package org.mobilenativefoundation.store.store5
sealed class UpdaterResult {
sealed class Success : UpdaterResult() {
data class Typed<NetworkWriteResponse : Any>(val value: NetworkWriteResponse) : Success()
data class Untyped(val value: Any) : Success()
}
sealed class Error : UpdaterResult() {
data class Exception(val error: Throwable) : Error()
data class Message(val message: String) : Error()
}
}

View file

@ -0,0 +1,12 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.flow.Flow
interface Write<Key : Any, CommonRepresentation : Any> {
@ExperimentalStoreApi
suspend fun <NetworkWriteResponse : Any> write(request: StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>): StoreWriteResponse
interface Stream<Key : Any, CommonRepresentation : Any> {
@ExperimentalStoreApi
fun <NetworkWriteResponse : Any> stream(requestStream: Flow<StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>>): Flow<StoreWriteResponse>
}
}

View file

@ -27,9 +27,10 @@ import kotlinx.coroutines.withContext
import org.mobilenativefoundation.store.multicast5.Multicaster
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.ResponseOrigin
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreResponse
import org.mobilenativefoundation.store.store5.StoreConverter
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
/**
* This class maintains one and only 1 fetcher for a given [Key].
@ -39,7 +40,7 @@ import org.mobilenativefoundation.store.store5.StoreResponse
* fetcher requests receives values dispatched by later requests even if they don't share the
* request.
*/
internal class FetcherController<Key : Any, Input : Any, Output : Any>(
internal class FetcherController<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
/**
* The [CoroutineScope] to use when collecting from the fetcher
*/
@ -47,14 +48,16 @@ internal class FetcherController<Key : Any, Input : Any, Output : Any>(
/**
* The function that provides the actualy fetcher flow when needed
*/
private val realFetcher: Fetcher<Key, Input>,
private val realFetcher: Fetcher<Key, NetworkRepresentation>,
/**
* [SourceOfTruth] to send the data each time fetcher dispatches a value. Can be `null` if
* no [SourceOfTruth] is available.
*/
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, Input, Output>?,
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>?,
private val converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>? = null
) {
@Suppress("USELESS_CAST") // needed for multicaster source
@Suppress("USELESS_CAST", "UNCHECKED_CAST") // needed for multicaster source
private val fetchers = RefCountedResource(
create = { key: Key ->
Multicaster(
@ -62,23 +65,25 @@ internal class FetcherController<Key : Any, Input : Any, Output : Any>(
bufferSize = 0,
source = flow { emitAll(realFetcher(key)) }.map {
when (it) {
is FetcherResult.Data -> StoreResponse.Data(
it.value,
origin = ResponseOrigin.Fetcher
) as StoreResponse<Input>
is FetcherResult.Data -> {
StoreReadResponse.Data(
it.value,
origin = StoreReadResponseOrigin.Fetcher
) as StoreReadResponse<NetworkRepresentation>
}
is FetcherResult.Error.Message -> StoreResponse.Error.Message(
is FetcherResult.Error.Message -> StoreReadResponse.Error.Message(
it.message,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
is FetcherResult.Error.Exception -> StoreResponse.Error.Exception(
is FetcherResult.Error.Exception -> StoreReadResponse.Error.Exception(
it.error,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
}
}.onEmpty {
emit(StoreResponse.NoNewData(ResponseOrigin.Fetcher))
emit(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Fetcher))
},
/**
* When enabled, downstream collectors are never closed, instead, they are kept active to
@ -87,18 +92,22 @@ internal class FetcherController<Key : Any, Input : Any, Output : Any>(
*/
piggybackingDownstream = true,
onEach = { response ->
response.dataOrNull()?.let { input ->
sourceOfTruth?.write(key, input)
response.dataOrNull()?.let { networkRepresentation ->
val input =
networkRepresentation as? CommonRepresentation ?: converter?.fromNetworkRepresentationToCommonRepresentation(networkRepresentation)
if (input != null) {
sourceOfTruth?.write(key, input)
}
}
}
)
},
onRelease = { _: Key, multicaster: Multicaster<StoreResponse<Input>> ->
onRelease = { _: Key, multicaster: Multicaster<StoreReadResponse<NetworkRepresentation>> ->
multicaster.close()
}
)
fun getFetcher(key: Key, piggybackOnly: Boolean = false): Flow<StoreResponse<Input>> {
fun getFetcher(key: Key, piggybackOnly: Boolean = false): Flow<StoreReadResponse<NetworkRepresentation>> {
return flow {
val fetcher = acquireFetcher(key)
try {

View file

@ -0,0 +1,8 @@
package org.mobilenativefoundation.store.store5.impl
import org.mobilenativefoundation.store.store5.StoreWriteResponse
data class OnStoreWriteCompletion(
val onSuccess: (StoreWriteResponse.Success) -> Unit,
val onFailure: (StoreWriteResponse.Error) -> Unit
)

View file

@ -0,0 +1,19 @@
package org.mobilenativefoundation.store.store5.impl
import org.mobilenativefoundation.store.store5.Bookkeeper
import org.mobilenativefoundation.store.store5.internal.definition.Timestamp
internal class RealBookkeeper<Key : Any>(
private val realGetLastFailedSync: suspend (key: Key) -> Timestamp?,
private val realSetLastFailedSync: suspend (key: Key, timestamp: Timestamp) -> Boolean,
private val realClear: suspend (key: Key) -> Boolean,
private val realClearAll: suspend () -> Boolean
) : Bookkeeper<Key> {
override suspend fun getLastFailedSync(key: Key): Long? = realGetLastFailedSync(key)
override suspend fun setLastFailedSync(key: Key, timestamp: Long): Boolean = realSetLastFailedSync(key, timestamp)
override suspend fun clear(key: Key): Boolean = realClear(key)
override suspend fun clearAll(): Boolean = realClearAll()
}

View file

@ -0,0 +1,9 @@
package org.mobilenativefoundation.store.store5.impl
import org.mobilenativefoundation.store.store5.ItemValidator
internal class RealItemValidator<CommonRepresentation : Any>(
private val realValidator: suspend (item: CommonRepresentation) -> Boolean
) : ItemValidator<CommonRepresentation> {
override suspend fun isValid(item: CommonRepresentation): Boolean = realValidator(item)
}

View file

@ -0,0 +1,266 @@
@file:Suppress("UNCHECKED_CAST")
package org.mobilenativefoundation.store.store5.impl
import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.mobilenativefoundation.store.store5.Bookkeeper
import org.mobilenativefoundation.store.store5.Clear
import org.mobilenativefoundation.store.store5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreWriteRequest
import org.mobilenativefoundation.store.store5.StoreWriteResponse
import org.mobilenativefoundation.store.store5.Updater
import org.mobilenativefoundation.store.store5.UpdaterResult
import org.mobilenativefoundation.store.store5.impl.extensions.now
import org.mobilenativefoundation.store.store5.internal.concurrent.AnyThread
import org.mobilenativefoundation.store.store5.internal.concurrent.ThreadSafety
import org.mobilenativefoundation.store.store5.internal.definition.WriteRequestQueue
import org.mobilenativefoundation.store.store5.internal.result.EagerConflictResolutionResult
internal class RealMutableStore<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
private val delegate: RealStore<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>,
private val updater: Updater<Key, CommonRepresentation, *>,
private val bookkeeper: Bookkeeper<Key>,
) : MutableStore<Key, CommonRepresentation>, Clear.Key<Key> by delegate, Clear.All by delegate {
private val storeLock = Mutex()
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, CommonRepresentation, *>>()
private val keyToThreadSafety = mutableMapOf<Key, ThreadSafety>()
override fun <NetworkWriteResponse : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<CommonRepresentation>> =
flow {
safeInitStore(request.key)
when (val eagerConflictResolutionResult = tryEagerlyResolveConflicts<NetworkWriteResponse>(request.key)) {
is EagerConflictResolutionResult.Error.Exception -> {
logger.e(eagerConflictResolutionResult.error.toString())
}
is EagerConflictResolutionResult.Error.Message -> {
logger.e(eagerConflictResolutionResult.message)
}
is EagerConflictResolutionResult.Success.ConflictsResolved -> {
logger.d(eagerConflictResolutionResult.value.toString())
}
EagerConflictResolutionResult.Success.NoConflicts -> {
logger.d(eagerConflictResolutionResult.toString())
}
}
delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) }
}
@ExperimentalStoreApi
override fun <NetworkWriteResponse : Any> stream(requestStream: Flow<StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>>): Flow<StoreWriteResponse> =
flow {
requestStream
.onEach { writeRequest ->
safeInitStore(writeRequest.key)
addWriteRequestToQueue(writeRequest)
}
.collect { writeRequest ->
val storeWriteResponse = try {
delegate.write(writeRequest.key, writeRequest.input)
when (val updaterResult = tryUpdateServer(writeRequest)) {
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)
is UpdaterResult.Success.Typed<*> -> {
val typedValue = updaterResult.value as? NetworkWriteResponse
if (typedValue == null) {
StoreWriteResponse.Success.Untyped(updaterResult.value)
} else {
StoreWriteResponse.Success.Typed(updaterResult.value)
}
}
is UpdaterResult.Success.Untyped -> StoreWriteResponse.Success.Untyped(updaterResult.value)
}
} catch (throwable: Throwable) {
StoreWriteResponse.Error.Exception(throwable)
}
emit(storeWriteResponse)
}
}
@ExperimentalStoreApi
override suspend fun <NetworkWriteResponse : Any> write(request: StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>): StoreWriteResponse =
stream(flowOf(request)).first()
private suspend fun <NetworkWriteResponse : Any> tryUpdateServer(request: StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>): UpdaterResult {
val updaterResult = postLatest<NetworkWriteResponse>(request.key)
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<NetworkWriteResponse>(
key = request.key,
created = request.created,
updaterResult = updaterResult
)
bookkeeper.clear(request.key)
} else {
bookkeeper.setLastFailedSync(request.key)
}
return updaterResult
}
private suspend fun <NetworkWriteResponse : Any> postLatest(key: Key): UpdaterResult {
val writer = getLatestWriteRequest(key)
return when (val updaterResult = updater.post(key, writer.input)) {
is UpdaterResult.Error.Exception -> UpdaterResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> UpdaterResult.Error.Message(updaterResult.message)
is UpdaterResult.Success.Untyped -> UpdaterResult.Success.Untyped(updaterResult.value)
is UpdaterResult.Success.Typed<*> -> {
val typedValue = updaterResult.value as? NetworkWriteResponse
if (typedValue == null) {
UpdaterResult.Success.Untyped(updaterResult.value)
} else {
UpdaterResult.Success.Typed(updaterResult.value)
}
}
}
}
@AnyThread
private suspend fun <NetworkWriteResponse : Any> updateWriteRequestQueue(key: Key, created: Long, updaterResult: UpdaterResult.Success) {
val nextWriteRequestQueue = withWriteRequestQueueLock<ArrayDeque<StoreWriteRequest<Key, CommonRepresentation, *>>, NetworkWriteResponse>(key) {
val outstandingWriteRequests = ArrayDeque<StoreWriteRequest<Key, CommonRepresentation, *>>()
for (writeRequest in this) {
if (writeRequest.created <= created) {
updater.onCompletion?.onSuccess?.invoke(updaterResult)
val storeWriteResponse = when (updaterResult) {
is UpdaterResult.Success.Typed<*> -> {
val typedValue = updaterResult.value as? NetworkWriteResponse
if (typedValue == null) {
StoreWriteResponse.Success.Untyped(updaterResult.value)
} else {
StoreWriteResponse.Success.Typed(updaterResult.value)
}
}
is UpdaterResult.Success.Untyped -> StoreWriteResponse.Success.Untyped(updaterResult.value)
}
writeRequest.onCompletions?.forEach { onStoreWriteCompletion ->
onStoreWriteCompletion.onSuccess(storeWriteResponse)
}
} else {
outstandingWriteRequests.add(writeRequest)
}
}
outstandingWriteRequests
}
withThreadSafety(key) {
keyToWriteRequestQueue[key] = nextWriteRequestQueue
}
}
@AnyThread
private suspend fun <Output : Any, NetworkWriteResponse : Any> withWriteRequestQueueLock(
key: Key,
block: suspend WriteRequestQueue<Key, CommonRepresentation, *>.() -> Output
): Output =
withThreadSafety(key) {
writeRequests.lightswitch.lock(writeRequests.mutex)
val writeRequestQueue = requireNotNull(keyToWriteRequestQueue[key])
val output = writeRequestQueue.block()
writeRequests.lightswitch.unlock(writeRequests.mutex)
output
}
private suspend fun getLatestWriteRequest(key: Key): StoreWriteRequest<Key, CommonRepresentation, *> = withThreadSafety(key) {
writeRequests.mutex.lock()
val output = requireNotNull(keyToWriteRequestQueue[key]?.last())
writeRequests.mutex.unlock()
output
}
@AnyThread
private suspend fun <Output : Any?> withThreadSafety(key: Key, block: suspend ThreadSafety.() -> Output): Output {
storeLock.lock()
val threadSafety = requireNotNull(keyToThreadSafety[key])
val output = threadSafety.block()
storeLock.unlock()
return output
}
private suspend fun conflictsMightExist(key: Key): Boolean {
val lastFailedSync = bookkeeper.getLastFailedSync(key)
return lastFailedSync != null || writeRequestsQueueIsEmpty(key).not()
}
@AnyThread
private suspend fun writeRequestsQueueIsEmpty(key: Key): Boolean = withThreadSafety(key) {
keyToWriteRequestQueue[key].isNullOrEmpty()
}
private suspend fun <NetworkWriteResponse : Any> addWriteRequestToQueue(writeRequest: StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>) =
withWriteRequestQueueLock<Unit, NetworkWriteResponse>(writeRequest.key) {
add(writeRequest)
}
@AnyThread
private suspend fun <NetworkWriteResponse : Any> tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult<NetworkWriteResponse> =
withThreadSafety(key) {
val latest = delegate.latestOrNull(key)
when {
latest == null || conflictsMightExist(key).not() -> EagerConflictResolutionResult.Success.NoConflicts
else -> {
try {
val updaterResult = updater.post(key, latest).also { updaterResult ->
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<NetworkWriteResponse>(key = key, created = now(), updaterResult = updaterResult)
}
}
when (updaterResult) {
is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message)
is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult)
}
} catch (throwable: Throwable) {
EagerConflictResolutionResult.Error.Exception(throwable)
}
}
}
}
private suspend fun safeInitWriteRequestQueue(key: Key) = withThreadSafety(key) {
if (keyToWriteRequestQueue[key] == null) {
keyToWriteRequestQueue[key] = ArrayDeque()
}
}
private suspend fun safeInitThreadSafety(key: Key) = storeLock.withLock {
if (keyToThreadSafety[key] == null) {
keyToThreadSafety[key] = ThreadSafety()
}
}
private suspend fun safeInitStore(key: Key) {
safeInitThreadSafety(key)
safeInitWriteRequestQueue(key)
}
companion object {
private val logger = Logger.apply {
setLogWriters(listOf(CommonWriter()))
setTag("Store")
}
private const val UNKNOWN_ERROR = "Unknown error occurred"
}
}

View file

@ -19,16 +19,16 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.mobilenativefoundation.store.store5.SourceOfTruth
internal class PersistentSourceOfTruth<Key, Input, Output>(
private val realReader: (Key) -> Flow<Output?>,
private val realWriter: suspend (Key, Input) -> Unit,
internal class PersistentSourceOfTruth<Key : Any, SourceOfTruthRepresentation : Any>(
private val realReader: (Key) -> Flow<SourceOfTruthRepresentation?>,
private val realWriter: suspend (Key, SourceOfTruthRepresentation) -> Unit,
private val realDelete: (suspend (Key) -> Unit)? = null,
private val realDeleteAll: (suspend () -> Unit)? = null
) : SourceOfTruth<Key, Input, Output> {
) : SourceOfTruth<Key, SourceOfTruthRepresentation> {
override fun reader(key: Key): Flow<Output?> = realReader(key)
override fun reader(key: Key): Flow<SourceOfTruthRepresentation?> = realReader.invoke(key)
override suspend fun write(key: Key, value: Input) = realWriter(key, value)
override suspend fun write(key: Key, value: SourceOfTruthRepresentation) = realWriter(key, value)
override suspend fun delete(key: Key) {
realDelete?.invoke(key)
@ -39,19 +39,22 @@ internal class PersistentSourceOfTruth<Key, Input, Output>(
}
}
internal class PersistentNonFlowingSourceOfTruth<Key, Input, Output>(
private val realReader: suspend (Key) -> Output?,
private val realWriter: suspend (Key, Input) -> Unit,
internal class PersistentNonFlowingSourceOfTruth<Key : Any, SourceOfTruthRepresentation : Any>(
private val realReader: suspend (Key) -> SourceOfTruthRepresentation?,
private val realWriter: suspend (Key, SourceOfTruthRepresentation) -> Unit,
private val realDelete: (suspend (Key) -> Unit)? = null,
private val realDeleteAll: (suspend () -> Unit)?
) : SourceOfTruth<Key, Input, Output> {
) : SourceOfTruth<Key, SourceOfTruthRepresentation> {
override fun reader(key: Key): Flow<Output?> =
override fun reader(key: Key): Flow<SourceOfTruthRepresentation?> =
flow {
emit(realReader(key))
val sourceOfTruthRepresentation = realReader(key)
emit(sourceOfTruthRepresentation)
}
override suspend fun write(key: Key, value: Input) = realWriter(key, value)
override suspend fun write(key: Key, value: SourceOfTruthRepresentation) {
return realWriter(key, value)
}
override suspend fun delete(key: Key) {
realDelete?.invoke(key)

View file

@ -19,7 +19,9 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
@ -28,36 +30,37 @@ import org.mobilenativefoundation.store.store5.CacheType
import org.mobilenativefoundation.store.store5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.MemoryPolicy
import org.mobilenativefoundation.store.store5.ResponseOrigin
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreRequest
import org.mobilenativefoundation.store.store5.StoreResponse
import org.mobilenativefoundation.store.store5.StoreConverter
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import org.mobilenativefoundation.store.store5.impl.operators.Either
import org.mobilenativefoundation.store.store5.impl.operators.merge
import kotlin.time.ExperimentalTime
import org.mobilenativefoundation.store.store5.internal.result.StoreDelegateWriteResult
@ExperimentalTime
internal class RealStore<Key : Any, Input : Any, Output : Any>(
internal class RealStore<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
scope: CoroutineScope,
fetcher: Fetcher<Key, Input>,
sourceOfTruth: SourceOfTruth<Key, Input, Output>? = null,
private val memoryPolicy: MemoryPolicy<Key, Output>?
) : Store<Key, Output> {
fetcher: Fetcher<Key, NetworkRepresentation>,
sourceOfTruth: SourceOfTruth<Key, SourceOfTruthRepresentation>? = null,
converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>? = null,
private val memoryPolicy: MemoryPolicy<Key, CommonRepresentation>?
) : Store<Key, CommonRepresentation> {
/**
* This source of truth is either a real database or an in memory source of truth created by
* the builder.
* Whatever is given, we always put a [SourceOfTruthWithBarrier] in front of it so that while
* we write the value from fetcher into the disk, we can block reads to avoid sending new data
* as if it came from the server (the [StoreResponse.origin] field).
* as if it came from the server (the [StoreReadResponse.origin] field).
*/
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, Input, Output>? =
private val sourceOfTruth: SourceOfTruthWithBarrier<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>? =
sourceOfTruth?.let {
SourceOfTruthWithBarrier(it)
SourceOfTruthWithBarrier(it, converter)
}
private val memCache = memoryPolicy?.let {
CacheBuilder<Key, Output>().apply {
CacheBuilder<Key, CommonRepresentation>().apply {
if (memoryPolicy.hasAccessPolicy) {
expireAfterAccess(memoryPolicy.expireAfterAccess)
}
@ -81,10 +84,11 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
private val fetcherController = FetcherController(
scope = scope,
realFetcher = fetcher,
sourceOfTruth = this.sourceOfTruth
sourceOfTruth = this.sourceOfTruth,
converter = converter
)
override fun stream(request: StoreRequest<Key>): Flow<StoreResponse<Output>> =
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<CommonRepresentation>> =
flow {
val cachedToEmit = if (request.shouldSkipCache(CacheType.MEMORY)) {
null
@ -94,7 +98,7 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
cachedToEmit?.let {
// if we read a value from cache, dispatch it first
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
emit(StoreReadResponse.Data(value = it, origin = StoreReadResponseOrigin.Cache))
}
val stream = if (sourceOfTruth == null) {
// piggypack only if not specified fresh data AND we emitted a value from the cache
@ -105,14 +109,14 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreResponse<Output>> // when no source of truth Input == Output
) as Flow<StoreReadResponse<CommonRepresentation>> // when no source of truth Input == Output
} else {
diskNetworkCombined(request, sourceOfTruth)
}
emitAll(
stream.transform {
emit(it)
if (it is StoreResponse.NoNewData && cachedToEmit == null) {
if (it is StoreReadResponse.NoNewData && cachedToEmit == null) {
// In the special case where fetcher returned no new data we actually want to
// serve cache data (even if the request specified skipping cache and/or SoT)
//
@ -130,14 +134,14 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
// Source of truth
// (future Source of truth updates)
memCache?.getIfPresent(request.key)?.let {
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
emit(StoreReadResponse.Data(value = it, origin = StoreReadResponseOrigin.Cache))
}
}
}
)
}.onEach {
// whenever a value is dispatched, save it to the memory cache
if (it.origin != ResponseOrigin.Cache) {
if (it.origin != StoreReadResponseOrigin.Cache) {
it.dataOrNull()?.let { data ->
memCache?.put(request.key, data)
}
@ -150,7 +154,7 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
}
@ExperimentalStoreApi
override suspend fun clearAll() {
override suspend fun clear() {
memCache?.invalidateAll()
sourceOfTruth?.deleteAll()
}
@ -176,13 +180,13 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
*
* 2) Request does not want to skip disk cache:
* In this case, we first start the disk flow. If disk flow returns `null` or
* [StoreRequest.refresh] is set to `true`, we enable the fetcher flow.
* [StoreReadRequest.refresh] is set to `true`, we enable the fetcher flow.
* This ensures we first get the value from disk and then load from server if necessary.
*/
private fun diskNetworkCombined(
request: StoreRequest<Key>,
sourceOfTruth: SourceOfTruthWithBarrier<Key, Input, Output>
): Flow<StoreResponse<Output>> {
request: StoreReadRequest<Key>,
sourceOfTruth: SourceOfTruthWithBarrier<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
): Flow<StoreReadResponse<CommonRepresentation>> {
val diskLock = CompletableDeferred<Unit>()
val networkLock = CompletableDeferred<Unit>()
val networkFlow = createNetworkFlow(request, networkLock)
@ -204,7 +208,7 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
when (it) {
is Either.Left -> {
// left, that is data from network
if (it.value is StoreResponse.Data || it.value is StoreResponse.NoNewData) {
if (it.value is StoreReadResponse.Data || it.value is StoreReadResponse.NoNewData) {
// Unlocking disk only if network sent data or reported no new data
// so that fresh data request never receives new fetcher data after
// cached disk data.
@ -213,19 +217,19 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
diskLock.complete(Unit)
}
if (it.value !is StoreResponse.Data) {
emit(it.value.swapType<Output>())
if (it.value !is StoreReadResponse.Data) {
emit(it.value.swapType())
}
}
is Either.Right -> {
// right, that is data from disk
when (val diskData = it.value) {
is StoreResponse.Data -> {
is StoreReadResponse.Data -> {
val diskValue = diskData.value
if (diskValue != null) {
@Suppress("UNCHECKED_CAST")
emit(diskData as StoreResponse<Output>)
emit(diskData as StoreReadResponse<CommonRepresentation>)
}
// If the disk value is null or refresh was requested then allow fetcher
// to start emitting values.
@ -234,7 +238,7 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
}
}
is StoreResponse.Error -> {
is StoreReadResponse.Error -> {
// disk sent an error, send it down as well
emit(diskData)
@ -242,7 +246,7 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
// values since there is nothing to read from disk. If disk sent a write
// error, we should NOT allow fetcher to start emitting values as we
// should always wait for the read attempt.
if (diskData is StoreResponse.Error.Exception &&
if (diskData is StoreReadResponse.Error.Exception &&
diskData.error is SourceOfTruth.ReadException
) {
networkLock.complete(Unit)
@ -250,8 +254,8 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
// for other errors, don't do anything, wait for the read attempt
}
is StoreResponse.Loading,
is StoreResponse.NoNewData -> {
is StoreReadResponse.Loading,
is StoreReadResponse.NoNewData -> {
}
}
}
@ -260,18 +264,30 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
}
private fun createNetworkFlow(
request: StoreRequest<Key>,
request: StoreReadRequest<Key>,
networkLock: CompletableDeferred<Unit>?,
piggybackOnly: Boolean = false
): Flow<StoreResponse<Input>> {
): Flow<StoreReadResponse<NetworkRepresentation>> {
return fetcherController
.getFetcher(request.key, piggybackOnly)
.onStart {
// wait until disk gives us the go
networkLock?.await()
if (!piggybackOnly) {
emit(StoreResponse.Loading(origin = ResponseOrigin.Fetcher))
emit(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher))
}
}
}
internal suspend fun write(key: Key, input: CommonRepresentation): StoreDelegateWriteResult = try {
memCache?.put(key, input)
sourceOfTruth?.write(key, input)
StoreDelegateWriteResult.Success
} catch (error: Throwable) {
StoreDelegateWriteResult.Error.Exception(error)
}
internal suspend fun latestOrNull(key: Key): CommonRepresentation? = fromMemCache(key) ?: fromSourceOfTruth(key)
private suspend fun fromSourceOfTruth(key: Key) = sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()
private fun fromMemCache(key: Key) = memCache?.getIfPresent(key)
}

View file

@ -2,44 +2,70 @@ package org.mobilenativefoundation.store.store5.impl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import org.mobilenativefoundation.store.store5.Bookkeeper
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.MemoryPolicy
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreConverter
import org.mobilenativefoundation.store.store5.StoreDefaults
import kotlin.time.ExperimentalTime
import org.mobilenativefoundation.store.store5.Updater
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore
@OptIn(ExperimentalTime::class)
internal class RealStoreBuilder<Key : Any, Input : Any, Output : Any>(
private val fetcher: Fetcher<Key, Input>,
private val sourceOfTruth: SourceOfTruth<Key, Input, Output>? = null
) : StoreBuilder<Key, Output> {
fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any> storeBuilderFromFetcher(
fetcher: Fetcher<Key, NetworkRepresentation>,
sourceOfTruth: SourceOfTruth<Key, *>? = null,
): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, *> = RealStoreBuilder(fetcher, sourceOfTruth)
fun <Key : Any, CommonRepresentation : Any, NetworkRepresentation : Any, SourceOfTruthRepresentation : Any> storeBuilderFromFetcherAndSourceOfTruth(
fetcher: Fetcher<Key, NetworkRepresentation>,
sourceOfTruth: SourceOfTruth<Key, SourceOfTruthRepresentation>,
): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> = RealStoreBuilder(fetcher, sourceOfTruth)
internal class RealStoreBuilder<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
private val fetcher: Fetcher<Key, NetworkRepresentation>,
private val sourceOfTruth: SourceOfTruth<Key, SourceOfTruthRepresentation>? = null
) : StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
private var scope: CoroutineScope? = null
private var cachePolicy: MemoryPolicy<Key, Output>? = StoreDefaults.memoryPolicy
private var cachePolicy: MemoryPolicy<Key, CommonRepresentation>? = StoreDefaults.memoryPolicy
private var converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>? = null
override fun scope(scope: CoroutineScope): RealStoreBuilder<Key, Input, Output> {
override fun scope(scope: CoroutineScope): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
this.scope = scope
return this
}
override fun cachePolicy(memoryPolicy: MemoryPolicy<Key, Output>?): RealStoreBuilder<Key, Input, Output> {
override fun cachePolicy(memoryPolicy: MemoryPolicy<Key, CommonRepresentation>?): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
cachePolicy = memoryPolicy
return this
}
override fun disableCache(): RealStoreBuilder<Key, Input, Output> {
override fun disableCache(): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
cachePolicy = null
return this
}
override fun build(): Store<Key, Output> {
@Suppress("UNCHECKED_CAST")
return RealStore(
scope = scope ?: GlobalScope,
sourceOfTruth = sourceOfTruth,
fetcher = fetcher,
memoryPolicy = cachePolicy
)
override fun converter(converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>): StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation> {
this.converter = converter
return this
}
override fun build(): Store<Key, CommonRepresentation> = RealStore(
scope = scope ?: GlobalScope,
sourceOfTruth = sourceOfTruth,
fetcher = fetcher,
memoryPolicy = cachePolicy,
converter = converter
)
override fun <NetworkWriteResponse : Any> build(
updater: Updater<Key, CommonRepresentation, NetworkWriteResponse>,
bookkeeper: Bookkeeper<Key>
): MutableStore<Key, CommonRepresentation> =
build().asMutableStore<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation, NetworkWriteResponse>(
updater = updater,
bookkeeper = bookkeeper
)
}

View file

@ -0,0 +1,10 @@
package org.mobilenativefoundation.store.store5.impl
import org.mobilenativefoundation.store.store5.StoreWriteRequest
data class RealStoreWriteRequest<Key : Any, CommonRepresentation : Any, NetworkWriteResponse : Any>(
override val key: Key,
override val input: CommonRepresentation,
override val created: Long,
override val onCompletions: List<OnStoreWriteCompletion>?
) : StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>

View file

@ -26,19 +26,22 @@ import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import org.mobilenativefoundation.store.store5.ResponseOrigin
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreResponse
import org.mobilenativefoundation.store.store5.StoreConverter
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import org.mobilenativefoundation.store.store5.impl.operators.mapIndexed
/**
* Wraps a [SourceOfTruth] and blocks reads while a write is in progress.
*
* Used in the [com.dropbox.android.external.store4.impl.RealStore] implementation to avoid
* Used in the [RealStore] implementation to avoid
* dispatching values to downstream while a write is in progress.
*/
internal class SourceOfTruthWithBarrier<Key, Input, Output>(
private val delegate: SourceOfTruth<Key, Input, Output>
@Suppress("UNCHECKED_CAST")
internal class SourceOfTruthWithBarrier<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any>(
private val delegate: SourceOfTruth<Key, SourceOfTruthRepresentation>,
private val converter: StoreConverter<NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>? = null,
) {
/**
* Each key has a barrier so that we can block reads while writing.
@ -56,7 +59,7 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
*/
private val versionCounter = atomic(0L)
fun reader(key: Key, lock: CompletableDeferred<Unit>): Flow<StoreResponse<Output?>> {
fun reader(key: Key, lock: CompletableDeferred<Unit>): Flow<StoreReadResponse<CommonRepresentation?>> {
return flow {
val barrier = barriers.acquire(key)
val readerVersion: Long = versionCounter.incrementAndGet()
@ -71,38 +74,47 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
} else {
null
}
val readFlow: Flow<StoreResponse<Output?>> = when (barrierMessage) {
val readFlow: Flow<StoreReadResponse<CommonRepresentation?>> = when (barrierMessage) {
is BarrierMsg.Open ->
delegate.reader(key).mapIndexed { index, output ->
delegate.reader(key).mapIndexed { index, sourceOfTruthRepresentation ->
if (index == 0 && messageArrivedAfterMe) {
val firstMsgOrigin = if (writeError == null) {
// restarted barrier without an error means write succeeded
ResponseOrigin.Fetcher
StoreReadResponseOrigin.Fetcher
} else {
// when a write fails, we still get a new reader because
// we've disabled the previous reader before starting the
// write operation. But since write has failed, we should
// use the SourceOfTruth as the origin
ResponseOrigin.SourceOfTruth
StoreReadResponseOrigin.SourceOfTruth
}
StoreResponse.Data(
val value = sourceOfTruthRepresentation as? CommonRepresentation ?: if (sourceOfTruthRepresentation != null) {
converter?.fromSourceOfTruthRepresentationToCommonRepresentation(sourceOfTruthRepresentation)
} else {
null
}
StoreReadResponse.Data(
origin = firstMsgOrigin,
value = output
value = value
)
} else {
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
value = output
) as StoreResponse<Output?>
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = sourceOfTruthRepresentation as? CommonRepresentation
?: if (sourceOfTruthRepresentation != null) converter?.fromSourceOfTruthRepresentationToCommonRepresentation(
sourceOfTruthRepresentation
) else null
) as StoreReadResponse<CommonRepresentation?>
}
}.catch { throwable ->
this.emit(
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = SourceOfTruth.ReadException(
key = key,
cause = throwable.cause ?: throwable
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
}
@ -116,8 +128,8 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
// if we have a pending error, make sure to dispatch it first.
if (writeError != null) {
emit(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = writeError
)
)
@ -133,12 +145,16 @@ internal class SourceOfTruthWithBarrier<Key, Input, Output>(
}
}
suspend fun write(key: Key, value: Input) {
@Suppress("UNCHECKED_CAST")
suspend fun write(key: Key, value: CommonRepresentation) {
val barrier = barriers.acquire(key)
try {
barrier.emit(BarrierMsg.Blocked(versionCounter.incrementAndGet()))
val writeError = try {
delegate.write(key, value)
val input = value as? SourceOfTruthRepresentation ?: converter?.fromCommonRepresentationToSourceOfTruthRepresentation(value)
if (input != null) {
delegate.write(key, input)
}
null
} catch (throwable: Throwable) {
if (throwable !is CancellationException) {

View file

@ -0,0 +1,5 @@
package org.mobilenativefoundation.store.store5.impl.extensions
import kotlinx.datetime.Clock
internal fun now() = Clock.System.now().toEpochMilliseconds()

View file

@ -0,0 +1,51 @@
package org.mobilenativefoundation.store.store5.impl.extensions
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
import org.mobilenativefoundation.store.store5.Bookkeeper
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.Updater
import org.mobilenativefoundation.store.store5.impl.RealMutableStore
import org.mobilenativefoundation.store.store5.impl.RealStore
/**
* Helper factory that will return data for [key] if it is cached otherwise will return
* fresh/network data (updating your caches)
*/
suspend fun <Key : Any, CommonRepresentation : Any> Store<Key, CommonRepresentation>.get(key: Key) =
stream(StoreReadRequest.cached(key, refresh = false))
.filterNot { it is StoreReadResponse.Loading || it is StoreReadResponse.NoNewData }
.first()
.requireData()
/**
* Helper factory that will return fresh data for [key] while updating your caches
*
* Note: If the [Fetcher] does not return any data (i.e the returned
* [kotlinx.coroutines.Flow], when collected, is empty). Then store will fall back to local
* data **even** if you explicitly requested fresh data.
* See https://github.com/dropbox/Store/pull/194 for context
*/
suspend fun <Key : Any, CommonRepresentation : Any> Store<Key, CommonRepresentation>.fresh(key: Key) =
stream(StoreReadRequest.fresh(key))
.filterNot { it is StoreReadResponse.Loading || it is StoreReadResponse.NoNewData }
.first()
.requireData()
@Suppress("UNCHECKED_CAST")
fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any, NetworkWriteResponse : Any> Store<Key, CommonRepresentation>.asMutableStore(
updater: Updater<Key, CommonRepresentation, NetworkWriteResponse>,
bookkeeper: Bookkeeper<Key>
): MutableStore<Key, CommonRepresentation> {
val delegate = this as? RealStore<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>
?: throw Exception("MutableStore requires Store to be built using StoreBuilder")
return RealMutableStore(
delegate = delegate,
updater = updater,
bookkeeper = bookkeeper
)
}

View file

@ -0,0 +1,3 @@
package org.mobilenativefoundation.store.store5.internal.concurrent
annotation class AnyThread

View file

@ -0,0 +1,33 @@
package org.mobilenativefoundation.store.store5.internal.concurrent
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Locks when first reader starts and unlocks when last reader finishes.
* Lightswitch analogy: First one into a room turns on the light (locks the mutex), and the last one out turns off the light (unlocks the mutex).
* @property counter Number of readers
*/
internal class Lightswitch {
private var counter = 0
private val mutex = Mutex()
suspend fun lock(room: Mutex) {
mutex.withLock {
counter += 1
if (counter == 1) {
room.lock()
}
}
}
suspend fun unlock(room: Mutex) {
mutex.withLock {
counter -= 1
check(counter >= 0)
if (counter == 0) {
room.unlock()
}
}
}
}

View file

@ -0,0 +1,13 @@
package org.mobilenativefoundation.store.store5.internal.concurrent
import kotlinx.coroutines.sync.Mutex
internal data class ThreadSafety(
val writeRequests: StoreThreadSafety = StoreThreadSafety(),
val readCompletions: StoreThreadSafety = StoreThreadSafety()
)
internal data class StoreThreadSafety(
val mutex: Mutex = Mutex(),
val lightswitch: Lightswitch = Lightswitch()
)

View file

@ -0,0 +1,3 @@
package org.mobilenativefoundation.store.store5.internal.definition
typealias Converter<Input, Output> = (input: Input) -> Output

View file

@ -0,0 +1,3 @@
package org.mobilenativefoundation.store.store5.internal.definition
typealias Timestamp = Long

View file

@ -0,0 +1,5 @@
package org.mobilenativefoundation.store.store5.internal.definition
import org.mobilenativefoundation.store.store5.StoreWriteRequest
typealias WriteRequestQueue<Key, CommonRepresentation, NetworkWriteResponse> = ArrayDeque<StoreWriteRequest<Key, CommonRepresentation, NetworkWriteResponse>>

View file

@ -0,0 +1,16 @@
package org.mobilenativefoundation.store.store5.internal.result
import org.mobilenativefoundation.store.store5.UpdaterResult
sealed class EagerConflictResolutionResult<out NetworkWriteResponse : Any> {
sealed class Success<NetworkWriteResponse : Any> : EagerConflictResolutionResult<NetworkWriteResponse>() {
object NoConflicts : Success<Nothing>()
data class ConflictsResolved<NetworkWriteResponse : Any>(val value: UpdaterResult.Success) : Success<NetworkWriteResponse>()
}
sealed class Error : EagerConflictResolutionResult<Nothing>() {
data class Message(val message: String) : Error()
data class Exception(val error: Throwable) : Error()
}
}

View file

@ -0,0 +1,9 @@
package org.mobilenativefoundation.store.store5.internal.result
sealed class StoreDelegateWriteResult {
object Success : StoreDelegateWriteResult()
sealed class Error : StoreDelegateWriteResult() {
data class Message(val error: String) : Error()
data class Exception(val error: Throwable) : Error()
}
}

View file

@ -43,7 +43,7 @@ class ClearAllStoreTests {
@Test
fun callingClearAllOnStoreWithPersisterAndNoInMemoryCacheDeletesAllEntriesFromThePersister() = testScope.runTest {
val store = StoreBuilder.from(
val store = StoreBuilder.from<String, Int, Int, Int>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
).scope(testScope)
@ -54,8 +54,8 @@ class ClearAllStoreTests {
val responseOneA = store.getData(key1)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value1
),
responseOneA
@ -63,36 +63,33 @@ class ClearAllStoreTests {
val responseTwoA = store.getData(key2)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value2
),
responseTwoA
)
// should receive data from persister
val responseOneB = store.getData(key1)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = value1
),
responseOneB
)
val responseTwoB = store.getData(key2)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = value2
),
responseTwoB
)
// clear all entries in store
store.clearAll()
store.clear()
assertNull(persister.peekEntry(key1))
assertNull(persister.peekEntry(key2))
@ -100,8 +97,8 @@ class ClearAllStoreTests {
val responseOneC = store.getData(key1)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value1
),
responseOneC
@ -110,8 +107,8 @@ class ClearAllStoreTests {
val responseTwoC = store.getData(key2)
advanceUntilIdle()
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value2
),
responseTwoC
@ -120,21 +117,21 @@ class ClearAllStoreTests {
@Test
fun callingClearAllOnStoreWithInMemoryCacheAndNoPersisterDeletesAllEntriesFromTheInMemoryCache() = testScope.runTest {
val store = StoreBuilder.from(
val store = StoreBuilder.from<String, Int, Int>(
fetcher = fetcher
).scope(testScope).build()
// should receive data from network first time
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value1
),
store.getData(key1)
)
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value2
),
store.getData(key2)
@ -142,34 +139,34 @@ class ClearAllStoreTests {
// should receive data from cache
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Cache,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Cache,
value = value1
),
store.getData(key1)
)
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Cache,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Cache,
value = value2
),
store.getData(key2)
)
// clear all entries in store
store.clearAll()
store.clear()
// should fetch data from network again
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value1
),
store.getData(key1)
)
assertEquals(
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = value2
),
store.getData(key2)

View file

@ -4,7 +4,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.StoreResponse.Data
import org.mobilenativefoundation.store.store5.StoreReadResponse.Data
import org.mobilenativefoundation.store.store5.util.InMemoryPersister
import org.mobilenativefoundation.store.store5.util.asSourceOfTruth
import org.mobilenativefoundation.store.store5.util.getData
@ -24,7 +24,7 @@ class ClearStoreByKeyTests {
fun callingClearWithKeyOnStoreWithPersisterWithNoInMemoryCacheDeletesTheEntryAssociatedWithTheKeyFromThePersister() = testScope.runTest {
val key = "key"
val value = 1
val store = StoreBuilder.from(
val store = StoreBuilder.from<String, Int, Int, Int>(
fetcher = Fetcher.of { value },
sourceOfTruth = persister.asSourceOfTruth()
).scope(testScope)
@ -34,7 +34,7 @@ class ClearStoreByKeyTests {
// should receive data from network first time
assertEquals(
Data(
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = value
),
store.getData(key)
@ -43,7 +43,7 @@ class ClearStoreByKeyTests {
// should receive data from persister
assertEquals(
Data(
origin = ResponseOrigin.SourceOfTruth,
origin = StoreReadResponseOrigin.SourceOfTruth,
value = value
),
store.getData(key)
@ -55,7 +55,7 @@ class ClearStoreByKeyTests {
// should fetch data from network again
assertEquals(
Data(
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = value
),
store.getData(key)
@ -66,14 +66,14 @@ class ClearStoreByKeyTests {
fun callingClearWithKeyOStoreWithInMemoryCacheNoPersisterDeletesTheEntryAssociatedWithTheKeyFromTheInMemoryCache() = testScope.runTest {
val key = "key"
val value = 1
val store = StoreBuilder.from<String, Int>(
val store = StoreBuilder.from<String, Int, Int>(
fetcher = Fetcher.of { value }
).scope(testScope).build()
// should receive data from network first time
assertEquals(
Data(
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = value
),
store.getData(key)
@ -82,7 +82,7 @@ class ClearStoreByKeyTests {
// should receive data from cache
assertEquals(
Data(
origin = ResponseOrigin.Cache,
origin = StoreReadResponseOrigin.Cache,
value = value
),
store.getData(key)
@ -94,7 +94,7 @@ class ClearStoreByKeyTests {
// should fetch data from network again
assertEquals(
Data(
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = value
),
store.getData(key)
@ -107,7 +107,7 @@ class ClearStoreByKeyTests {
val key2 = "key2"
val value1 = 1
val value2 = 2
val store = StoreBuilder.from(
val store = StoreBuilder.from<String, Int, Int, Int>(
fetcher = Fetcher.of { key ->
when (key) {
key1 -> value1
@ -135,7 +135,7 @@ class ClearStoreByKeyTests {
// getting data for key1 should hit the network again
assertEquals(
Data(
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = value1
),
store.getData(key1)
@ -144,7 +144,7 @@ class ClearStoreByKeyTests {
// getting data for key2 should not hit the network
assertEquals(
Data(
origin = ResponseOrigin.Cache,
origin = StoreReadResponseOrigin.Cache,
value = value2
),
store.getData(key2)

View file

@ -14,7 +14,7 @@ import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.StoreResponse.Data
import org.mobilenativefoundation.store.store5.StoreReadResponse.Data
import org.mobilenativefoundation.store.store5.impl.FetcherController
import kotlin.test.Test
import kotlin.test.assertEquals
@ -26,7 +26,7 @@ class FetcherControllerTests {
@Test
fun simple() = testScope.runTest {
val fetcherController = FetcherController<Int, Int, Int>(
val fetcherController = FetcherController<Int, Int, Int, Int>(
scope = testScope,
realFetcher = Fetcher.ofResultFlow { key: Int ->
flow {
@ -43,7 +43,7 @@ class FetcherControllerTests {
assertEquals(
Data(
value = 9,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
received
)
@ -53,7 +53,7 @@ class FetcherControllerTests {
@Test
fun concurrent() = testScope.runTest {
var createdCnt = 0
val fetcherController = FetcherController<Int, Int, Int>(
val fetcherController = FetcherController<Int, Int, Int, Int>(
scope = testScope,
realFetcher = Fetcher.ofResultFlow { key: Int ->
createdCnt++
@ -80,7 +80,7 @@ class FetcherControllerTests {
assertEquals(
Data(
value = 9,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
it.await()
)
@ -94,7 +94,7 @@ class FetcherControllerTests {
var createdCnt = 0
val job = SupervisorJob()
val scope = TestScope(StandardTestDispatcher() + job)
val fetcherController = FetcherController<Int, Int, Int>(
val fetcherController = FetcherController<Int, Int, Int, Int>(
scope = scope,
realFetcher = Fetcher.ofResultFlow { key: Int ->
createdCnt++

View file

@ -19,14 +19,14 @@ class FetcherResponseTests {
@Test
fun givenAFetcherThatThrowsAnExceptionInInvokeWhenStreamingThenTheExceptionsShouldNotBeCaught() = testScope.runTest {
val store = StoreBuilder.from<Int, Int>(
val store = StoreBuilder.from<Int, Int, Int>(
Fetcher.ofResult {
throw RuntimeException("don't catch me")
}
).buildWithTestScope()
assertFailsWith<RuntimeException>(message = "don't catch me") {
val result = store.stream(StoreRequest.fresh(1)).toList()
val result = store.stream(StoreReadRequest.fresh(1)).toList()
assertEquals(0, result.size)
}
}
@ -35,9 +35,9 @@ class FetcherResponseTests {
fun givenAFetcherThatEmitsErrorAndDataWhenSteamingThenItCanEmitValueAfterAnError() {
val exception = RuntimeException("first error")
testScope.runTest {
val store = StoreBuilder.from(
val store = StoreBuilder.from<Int, String, String>(
fetcher = Fetcher.ofResultFlow { key: Int ->
flowOf<FetcherResult<String>>(
flowOf(
FetcherResult.Error.Exception(exception),
FetcherResult.Data("$key")
)
@ -46,12 +46,12 @@ class FetcherResponseTests {
assertEmitsExactly(
store.stream(
StoreRequest.fresh(1)
StoreReadRequest.fresh(1)
),
listOf(
StoreResponse.Loading(ResponseOrigin.Fetcher),
StoreResponse.Error.Exception(exception, ResponseOrigin.Fetcher),
StoreResponse.Data("1", ResponseOrigin.Fetcher)
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Error.Exception(exception, StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("1", StoreReadResponseOrigin.Fetcher)
)
)
}
@ -61,26 +61,26 @@ class FetcherResponseTests {
fun givenTransformerWhenRawValueThenUnwrappedValueReturnedAndValueIsCached() = testScope.runTest {
val fetcher = Fetcher.ofFlow<Int, Int> { flowOf(it * it) }
val pipeline = StoreBuilder
.from(fetcher).buildWithTestScope()
.from<Int, Int, Int>(fetcher).buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = 9,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Data(
StoreReadResponse.Data(
value = 9,
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
)
@ -98,30 +98,30 @@ class FetcherResponseTests {
}
}
}
val pipeline = StoreBuilder.from(fetcher)
val pipeline = StoreBuilder.from<Int, Int, Int>(fetcher)
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Message(
StoreReadResponse.Error.Message(
message = "zero",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = 1,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -141,30 +141,30 @@ class FetcherResponseTests {
}
}
val pipeline = StoreBuilder
.from(fetcher)
.from<Int, Int, Int>(fetcher)
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = e,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = 1,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -182,35 +182,35 @@ class FetcherResponseTests {
count - 1
}
val pipeline = StoreBuilder
.from(fetcher = fetcher)
.from<Int, Int, Int>(fetcher = fetcher)
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = e,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = 1,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
}
private fun <Key : Any, Output : Any> StoreBuilder<Key, Output>.buildWithTestScope() =
private fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>.buildWithTestScope() =
scope(testScope).build()
}

View file

@ -31,8 +31,9 @@ import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.StoreResponse.Data
import org.mobilenativefoundation.store.store5.StoreResponse.Loading
import org.mobilenativefoundation.store.store5.StoreReadResponse.Data
import org.mobilenativefoundation.store.store5.StoreReadResponse.Loading
import org.mobilenativefoundation.store.store5.impl.extensions.fresh
import org.mobilenativefoundation.store.store5.util.FakeFetcher
import org.mobilenativefoundation.store.store5.util.FakeFlowingFetcher
import org.mobilenativefoundation.store.store5.util.InMemoryPersister
@ -55,51 +56,51 @@ class FlowStoreTests {
3 to "three-2"
)
val pipeline = StoreBuilder
.from(fetcher)
.from<Int, String, String>(fetcher)
.buildWithTestScope()
assertEquals(
pipeline.stream(StoreRequest.cached(3, refresh = false)).take(2).toList(),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)).take(2).toList(),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEquals(
pipeline.stream(StoreRequest.cached(3, refresh = false)).take(1).toList(),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)).take(1).toList(),
listOf(
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
)
assertEquals(
pipeline.stream(StoreRequest.fresh(3)).take(2).toList(),
pipeline.stream(StoreReadRequest.fresh(3)).take(2).toList(),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEquals(
pipeline.stream(StoreRequest.cached(3, refresh = false)).take(1).toList(),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)).take(1).toList(),
listOf(
Data(
value = "three-2",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
)
@ -112,64 +113,64 @@ class FlowStoreTests {
3 to "three-2"
)
val persister = InMemoryPersister<Int, String>()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
).buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
// note that we still get the data from persister as well as we don't listen to
// the persister for the cached items unless there is an active stream, which
// means cache can go out of sync w/ the persister
Data(
value = "three-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
Data(
value = "three-2",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Data(
value = "three-2",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
@ -183,41 +184,41 @@ class FlowStoreTests {
)
val persister = InMemoryPersister<Int, String>()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
).buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Data(
value = "three-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -229,36 +230,36 @@ class FlowStoreTests {
3 to "three-1",
3 to "three-2"
)
val pipeline = StoreBuilder.from(fetcher = fetcher)
val pipeline = StoreBuilder.from<Int, String, String>(fetcher = fetcher)
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf
(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -270,31 +271,31 @@ class FlowStoreTests {
3 to "three-1",
3 to "three-2"
)
val pipeline = StoreBuilder.from(fetcher = fetcher)
val pipeline = StoreBuilder.from<Int, String, String>(fetcher = fetcher)
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.skipMemory(3, refresh = false)),
pipeline.stream(StoreReadRequest.skipMemory(3, refresh = false)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.skipMemory(3, refresh = false)),
pipeline.stream(StoreReadRequest.skipMemory(3, refresh = false)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -308,7 +309,7 @@ class FlowStoreTests {
)
val persister = InMemoryPersister<Int, String>()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -316,38 +317,38 @@ class FlowStoreTests {
.buildWithTestScope()
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(
value = "three-2",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -356,7 +357,7 @@ class FlowStoreTests {
@Test
fun diskChangeWhileNetworkIsFlowing_simple() = testScope.runTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
Fetcher.ofFlow {
flow {
delay(20)
@ -373,18 +374,18 @@ class FlowStoreTests {
persister.flowWriter(3, "local-1")
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
@ -394,7 +395,7 @@ class FlowStoreTests {
@Test
fun diskChangeWhileNetworkIsFlowing_overwrite() = testScope.runTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofFlow {
flow {
delay(10)
@ -415,26 +416,26 @@ class FlowStoreTests {
persister.flowWriter(3, "local-2")
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "local-2",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -444,7 +445,7 @@ class FlowStoreTests {
fun errorTest() = testScope.runTest {
val exception = IllegalArgumentException("wow")
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
Fetcher.of {
throw exception
},
@ -458,34 +459,34 @@ class FlowStoreTests {
persister.flowWriter(3, "local-1")
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(key = 3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(key = 3, refresh = true)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = exception,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(key = 3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(key = 3, refresh = true)),
listOf(
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = exception,
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -495,7 +496,7 @@ class FlowStoreTests {
fun givenSourceOfTruthWhenStreamFreshDataReturnsNoDataFromFetcherThenFetchReturnsNoDataAndCachedValuesAreReceived() =
testScope.runTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofFlow { flow {} },
sourceOfTruth = persister.asSourceOfTruth()
)
@ -506,21 +507,21 @@ class FlowStoreTests {
assertEquals("local-1", firstFetch)
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
StoreReadResponse.NoNewData(
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "local-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
@ -529,7 +530,7 @@ class FlowStoreTests {
@Test
fun givenSourceOfTruthWhenStreamCachedDataWithRefreshReturnsNoNewDataThenCachedValuesAreReceivedAndFetchReturnsNoData() = testScope.runTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofFlow { flow {} },
sourceOfTruth = persister.asSourceOfTruth()
)
@ -540,21 +541,21 @@ class FlowStoreTests {
assertEquals("local-1", firstFetch)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(
value = "local-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
StoreReadResponse.NoNewData(
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -563,7 +564,7 @@ class FlowStoreTests {
@Test
fun givenNoSourceOfTruthWhenStreamFreshDataReturnsNoDataFromFetcherThenFetchReturnsNoDataAndCachedValuesAreReceived() = testScope.runTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
val pipeline = StoreBuilder.from<Int, String, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
@ -578,17 +579,17 @@ class FlowStoreTests {
assertEquals("remote-1", firstFetch)
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
StoreReadResponse.NoNewData(
origin = StoreReadResponseOrigin.Fetcher
),
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
)
@ -597,7 +598,7 @@ class FlowStoreTests {
@Test
fun givenNoSoTWhenStreamCachedDataWithRefreshReturnsNoNewDataThenCachedValuesAreReceivedAndFetchReturnsNoData() = testScope.runTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
val pipeline = StoreBuilder.from<Int, String, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
@ -612,17 +613,17 @@ class FlowStoreTests {
assertEquals("remote-1", firstFetch)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
Loading(
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
StoreReadResponse.NoNewData(
origin = StoreReadResponseOrigin.Fetcher
)
)
)
@ -634,14 +635,14 @@ class FlowStoreTests {
3 to "three-1",
3 to "three-2"
)
val store = StoreBuilder.from(fetcher = fetcher)
val store = StoreBuilder.from<Int, String, String>(fetcher = fetcher)
.buildWithTestScope()
val firstFetch = store.fresh(3)
assertEquals("three-1", firstFetch)
val secondCollect = mutableListOf<StoreResponse<String>>()
val secondCollect = mutableListOf<StoreReadResponse<String>>()
val collection = launch {
store.stream(StoreRequest.cached(3, refresh = false)).collect {
store.stream(StoreReadRequest.cached(3, refresh = false)).collect {
secondCollect.add(it)
}
}
@ -651,7 +652,7 @@ class FlowStoreTests {
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
// trigger another fetch from network
@ -665,14 +666,14 @@ class FlowStoreTests {
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
assertContains(
secondCollect,
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
@ -686,16 +687,16 @@ class FlowStoreTests {
3 to "three-2"
)
val persister = InMemoryPersister<Int, String>()
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
).buildWithTestScope()
val firstFetch = pipeline.fresh(3)
assertEquals("three-1", firstFetch)
val secondCollect = mutableListOf<StoreResponse<String>>()
val secondCollect = mutableListOf<StoreReadResponse<String>>()
val collection = launch {
pipeline.stream(StoreRequest.cached(3, refresh = false)).collect {
pipeline.stream(StoreReadRequest.cached(3, refresh = false)).collect {
secondCollect.add(it)
}
}
@ -706,14 +707,14 @@ class FlowStoreTests {
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
)
assertContains(
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
@ -728,14 +729,14 @@ class FlowStoreTests {
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
),
)
assertContains(
secondCollect,
Data(
value = "three-1",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
)
@ -743,7 +744,7 @@ class FlowStoreTests {
secondCollect,
Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
collection.cancelAndJoin()
@ -757,13 +758,13 @@ class FlowStoreTests {
3 to "three-2",
3 to "three-3"
)
val pipeline = StoreBuilder.from(
val pipeline = StoreBuilder.from<Int, String, String>(
fetcher = fetcher
).buildWithTestScope()
val fetcher1Collected = mutableListOf<StoreResponse<String>>()
val fetcher1Collected = mutableListOf<StoreReadResponse<String>>()
val fetcher1Job = async {
pipeline.stream(StoreRequest.cached(3, refresh = true)).collect {
pipeline.stream(StoreReadRequest.cached(3, refresh = true)).collect {
fetcher1Collected.add(it)
delay(1_000)
}
@ -771,36 +772,36 @@ class FlowStoreTests {
testScope.advanceUntilIdle()
assertEquals(
listOf(
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-1")
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-1")
),
fetcher1Collected
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(origin = ResponseOrigin.Cache, value = "three-1"),
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-2")
Data(origin = StoreReadResponseOrigin.Cache, value = "three-1"),
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-2")
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(origin = ResponseOrigin.Cache, value = "three-2"),
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-3")
Data(origin = StoreReadResponseOrigin.Cache, value = "three-2"),
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-3")
)
)
testScope.advanceUntilIdle()
assertEquals(
listOf(
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-1"),
Data(origin = ResponseOrigin.Fetcher, value = "three-2"),
Data(origin = ResponseOrigin.Fetcher, value = "three-3")
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-1"),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-2"),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-3")
),
fetcher1Collected
)
@ -815,38 +816,38 @@ class FlowStoreTests {
3 to "three-1",
3 to "three-2"
)
val pipeline = StoreBuilder.from(fetcher = fetcher)
val pipeline = StoreBuilder.from<Int, String, String>(fetcher = fetcher)
.buildWithTestScope()
val fetcher1Collected = mutableListOf<StoreResponse<String>>()
val fetcher1Collected = mutableListOf<StoreReadResponse<String>>()
val fetcher1Job = async {
pipeline.stream(StoreRequest.cached(3, refresh = true)).collect {
pipeline.stream(StoreReadRequest.cached(3, refresh = true)).collect {
fetcher1Collected.add(it)
}
}
testScope.runCurrent()
assertEquals(
listOf(
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-1")
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-1")
),
fetcher1Collected
)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
Data(origin = ResponseOrigin.Cache, value = "three-1"),
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-2")
Data(origin = StoreReadResponseOrigin.Cache, value = "three-1"),
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-2")
)
)
testScope.runCurrent()
assertEquals(
listOf(
Loading(origin = ResponseOrigin.Fetcher),
Data(origin = ResponseOrigin.Fetcher, value = "three-1"),
Data(origin = ResponseOrigin.Fetcher, value = "three-2")
Loading(origin = StoreReadResponseOrigin.Fetcher),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-1"),
Data(origin = StoreReadResponseOrigin.Fetcher, value = "three-2")
),
fetcher1Collected
)
@ -854,16 +855,16 @@ class FlowStoreTests {
fetcher1Job.cancelAndJoin()
}
suspend fun Store<Int, String>.get(request: StoreRequest<Int>) =
suspend fun Store<Int, Int>.get(request: StoreReadRequest<Int>) =
this.stream(request).filter { it.dataOrNull() != null }.first()
suspend fun Store<Int, String>.get(key: Int) = get(
StoreRequest.cached(
suspend fun Store<Int, Int>.get(key: Int) = get(
StoreReadRequest.cached(
key = key,
refresh = false
)
)
private fun <Key : Any, Output : Any> StoreBuilder<Key, Output>.buildWithTestScope() =
private fun <Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, SourceOfTruthRepresentation : Any> StoreBuilder<Key, NetworkRepresentation, CommonRepresentation, SourceOfTruthRepresentation>.buildWithTestScope() =
scope(testScope).build()
}

View file

@ -22,43 +22,43 @@ class HotFlowStoreTests {
3 to "three-2"
)
val pipeline = StoreBuilder
.from(fetcher)
.from<Int, String, String>(fetcher)
.scope(testScope)
.build()
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEmitsExactly(
pipeline.stream(
StoreRequest.cached(3, refresh = false)
StoreReadRequest.cached(3, refresh = false)
),
listOf(
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-1",
origin = ResponseOrigin.Cache
origin = StoreReadResponseOrigin.Cache
)
)
)
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)

View file

@ -1,5 +1,6 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
@ -17,6 +18,7 @@ import org.mobilenativefoundation.store.store5.util.asSourceOfTruth
import org.mobilenativefoundation.store.store5.util.assertEmitsExactly
import kotlin.test.Test
@OptIn(ExperimentalCoroutinesApi::class)
@FlowPreview
class SourceOfTruthErrorsTests {
private val testScope = TestScope()
@ -29,7 +31,7 @@ class SourceOfTruthErrorsTests {
3 to "b"
)
val pipeline = StoreBuilder
.from(
.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -40,16 +42,16 @@ class SourceOfTruthErrorsTests {
}
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(ResponseOrigin.Fetcher),
StoreResponse.Error.Exception(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Error.Exception(
error = WriteException(
key = 3,
value = "a",
cause = TestException("i fail")
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
@ -63,7 +65,7 @@ class SourceOfTruthErrorsTests {
3 to "b"
)
val pipeline = StoreBuilder
.from(
.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -75,27 +77,27 @@ class SourceOfTruthErrorsTests {
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = false)),
pipeline.stream(StoreReadRequest.cached(3, refresh = false)),
listOf(
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = ReadException(
key = 3,
cause = TestException("null")
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
// after disk fails, we should still invoke fetcher
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
// and after fetcher writes the value, it will trigger another read which will also
// fail
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = ReadException(
key = 3,
cause = TestException("a")
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
)
)
)
@ -108,7 +110,7 @@ class SourceOfTruthErrorsTests {
flowOf("a", "b", "c", "d")
}
val pipeline = StoreBuilder
.from(
.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -122,107 +124,107 @@ class SourceOfTruthErrorsTests {
value
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = WriteException(
key = 3,
value = "a",
cause = TestException("a")
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "b",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Error.Exception(
StoreReadResponse.Error.Exception(
error = WriteException(
key = 3,
value = "c",
cause = TestException("c")
),
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
// disk flow will restart after a failed write (because we stopped it before the
// write attempt starts, so we will get the disk value again).
StoreResponse.Data(
StoreReadResponse.Data(
value = "b",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "d",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
}
@Test
fun givenSourceOfTruthWithFailingWriteWhenAPassiveReaderArrivesThenItShouldReceiveTheNewWriteError() = testScope.runTest {
val persister = InMemoryPersister<Int, String>()
val fetcher = Fetcher.ofFlow { _: Int ->
flowOf("a", "b", "c", "d")
}
val pipeline = StoreBuilder
.from(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
.disableCache()
.scope(testScope)
.build()
persister.preWriteCallback = { _, value ->
if (value in listOf("a", "c")) {
delay(50)
throw TestException(value)
} else {
delay(10)
}
value
}
// keep collection hot
val collector = launch {
pipeline.stream(
StoreRequest.cached(3, refresh = true)
).toList()
}
// miss writes for a and b and let the write operation for c start such that
// we'll catch that write error
delay(70)
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
listOf(
// we wanted the disk value but write failed so we don't get it
StoreResponse.Error.Exception(
error = WriteException(
key = 3,
value = "c",
cause = TestException("c")
),
origin = ResponseOrigin.SourceOfTruth
),
// after the write error, we should get the value on disk
StoreResponse.Data(
value = "b",
origin = ResponseOrigin.SourceOfTruth
),
// now we'll unlock the fetcher after disk is read
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.Data(
value = "d",
origin = ResponseOrigin.Fetcher
)
)
)
collector.cancelAndJoin()
}
// @Test
// fun givenSourceOfTruthWithFailingWriteWhenAPassiveReaderArrivesThenItShouldReceiveTheNewWriteError() = testScope.runTest {
// val persister = InMemoryPersister<Int, String>()
// val fetcher = Fetcher.ofFlow { _: Int ->
// flowOf("a", "b", "c", "d")
// }
// val pipeline = StoreBuilder
// .from<Int, String>(
// fetcher = fetcher,
// sourceOfTruth = persister.asSourceOfTruth()
// )
// .disableCache()
// .scope(testScope)
// .build()
// persister.preWriteCallback = { _, value ->
// if (value in listOf("a", "c")) {
// delay(50)
// throw TestException(value)
// } else {
// delay(10)
// }
// value
// }
// // keep collection hot
// val collector = launch {
// pipeline.stream(
// StoreReadRequest.cached(3, refresh = true)
// ).toList()
// }
//
// // miss writes for a and b and let the write operation for c start such that
// // we'll catch that write error
// delay(70)
// assertEmitsExactly(
// pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
// listOf(
// // we wanted the disk value but write failed so we don't get it
// StoreReadResponse.Error.Exception(
// error = WriteException(
// key = 3,
// value = "c",
// cause = TestException("c")
// ),
// origin = StoreReadResponseOrigin.SourceOfTruth
// ),
// // after the write error, we should get the value on disk
// StoreReadResponse.Data(
// value = "b",
// origin = StoreReadResponseOrigin.SourceOfTruth
// ),
// // now we'll unlock the fetcher after disk is read
// StoreReadResponse.Loading(
// origin = StoreReadResponseOrigin.Fetcher
// ),
// StoreReadResponse.Data(
// value = "d",
// origin = StoreReadResponseOrigin.Fetcher
// )
// )
// )
// collector.cancelAndJoin()
// }
@Test
fun givenSourceOfTruthWithFailingWriteWhenAPassiveReaderArrivesThenItShouldNotGetErrorsHappenedBefore() = testScope.runTest {
@ -238,7 +240,7 @@ class SourceOfTruthErrorsTests {
}
}
val pipeline = StoreBuilder
.from(
.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -253,78 +255,78 @@ class SourceOfTruthErrorsTests {
}
val collector = launch {
pipeline.stream(
StoreRequest.cached(3, refresh = true)
StoreReadRequest.cached(3, refresh = true)
).toList() // keep collection hot
}
// miss both failures but arrive before d is fetched
delay(70)
assertEmitsExactly(
pipeline.stream(StoreRequest.skipMemory(3, refresh = true)),
pipeline.stream(StoreReadRequest.skipMemory(3, refresh = true)),
listOf(
StoreResponse.Data(
StoreReadResponse.Data(
value = "b",
origin = ResponseOrigin.SourceOfTruth
origin = StoreReadResponseOrigin.SourceOfTruth
),
// don't receive the write exception because technically it started before we
// started reading
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "d",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
collector.cancelAndJoin()
}
@Test
fun givenSourceOfTruthWithFailingWriteWhenAFreshValueReaderArrivesThenItShouldNotGetDiskErrorsFromAPendingWrite() = testScope.runTest {
val persister = InMemoryPersister<Int, String>()
val fetcher = Fetcher.ofFlow<Int, String> {
flowOf("a", "b", "c", "d")
}
val pipeline = StoreBuilder
.from(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
.disableCache()
.scope(testScope)
.build()
persister.preWriteCallback = { _, value ->
if (value == "c") {
// slow down read so that the new reader arrives
delay(50)
}
if (value in listOf("a", "c")) {
throw TestException(value)
}
value
}
val collector = launch {
pipeline.stream(
StoreRequest.cached(3, refresh = true)
).toList() // keep collection hot
}
// miss both failures but arrive before d is fetched
delay(20)
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.Data(
value = "d",
origin = ResponseOrigin.Fetcher
)
)
)
collector.cancelAndJoin()
}
// @Test
// fun givenSourceOfTruthWithFailingWriteWhenAFreshValueReaderArrivesThenItShouldNotGetDiskErrorsFromAPendingWrite() = testScope.runTest {
// val persister = InMemoryPersister<Int, String>()
// val fetcher = Fetcher.ofFlow<Int, String> {
// flowOf("a", "b", "c", "d")
// }
// val pipeline = StoreBuilder
// .from<Int, String>(
// fetcher = fetcher,
// sourceOfTruth = persister.asSourceOfTruth()
// )
// .disableCache()
// .scope(testScope)
// .build()
// persister.preWriteCallback = { _, value ->
// if (value == "c") {
// // slow down read so that the new reader arrives
// delay(50)
// }
// if (value in listOf("a", "c")) {
// throw TestException(value)
// }
// value
// }
// val collector = launch {
// pipeline.stream(
// StoreReadRequest.cached(3, refresh = true)
// ).toList() // keep collection hot
// }
// // miss both failures but arrive before d is fetched
// delay(20)
// assertEmitsExactly(
// pipeline.stream(StoreReadRequest.fresh(3)),
// listOf(
// StoreReadResponse.Loading(
// origin = StoreReadResponseOrigin.Fetcher
// ),
// StoreReadResponse.Data(
// value = "d",
// origin = StoreReadResponseOrigin.Fetcher
// )
// )
// )
// collector.cancelAndJoin()
// }
@Test
fun givenSourceOfTruthWithReadFailureWhenCachedValueReaderArrivesThenFetcherShouldBeCalledToGetANewValue() {
@ -332,7 +334,7 @@ class SourceOfTruthErrorsTests {
val persister = InMemoryPersister<Int, String>()
val fetcher = Fetcher.of { _: Int -> "a" }
val pipeline = StoreBuilder
.from(
.from<Int, String, String, String>(
fetcher = fetcher,
sourceOfTruth = persister.asSourceOfTruth()
)
@ -346,21 +348,21 @@ class SourceOfTruthErrorsTests {
value
}
assertEmitsExactly(
pipeline.stream(StoreRequest.cached(3, refresh = true)),
pipeline.stream(StoreReadRequest.cached(3, refresh = true)),
listOf(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = ReadException(
key = 3,
cause = TestException("first read")
)
),
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "a",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)

View file

@ -42,7 +42,7 @@ import kotlin.test.assertNull
class SourceOfTruthWithBarrierTests {
private val testScope = TestScope()
private val persister = InMemoryPersister<Int, String>()
private val delegate: SourceOfTruth<Int, String, String> =
private val delegate: SourceOfTruth<Int, String> =
PersistentSourceOfTruth(
realReader = { key ->
flow {
@ -53,13 +53,13 @@ class SourceOfTruthWithBarrierTests {
realDelete = persister::deleteByKey,
realDeleteAll = persister::deleteAll
)
private val source = SourceOfTruthWithBarrier(
private val source = SourceOfTruthWithBarrier<Int, String, String, String>(
delegate = delegate
)
@Test
fun simple() = testScope.runTest {
val collection = mutableListOf<StoreResponse<String?>>()
val collection = mutableListOf<StoreReadResponse<String?>>()
launch {
source.reader(1, CompletableDeferred(Unit)).take(2).collect {
@ -70,13 +70,13 @@ class SourceOfTruthWithBarrierTests {
source.write(1, "a")
advanceUntilIdle()
assertEquals(
listOf<StoreResponse<String?>>(
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
listOf<StoreReadResponse<String?>>(
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = null
),
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = "a"
)
),
@ -103,7 +103,7 @@ class SourceOfTruthWithBarrierTests {
@Test
fun preAndPostWrites() = testScope.runTest {
val collection = mutableListOf<StoreResponse<String?>>()
val collection = mutableListOf<StoreReadResponse<String?>>()
source.write(1, "a")
launch {
@ -119,13 +119,13 @@ class SourceOfTruthWithBarrierTests {
advanceUntilIdle()
assertEquals(
listOf<StoreResponse<String?>>(
StoreResponse.Data(
origin = ResponseOrigin.SourceOfTruth,
listOf<StoreReadResponse<String?>>(
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = "a"
),
StoreResponse.Data(
origin = ResponseOrigin.Fetcher,
StoreReadResponse.Data(
origin = StoreReadResponseOrigin.Fetcher,
value = "b"
)
),
@ -144,8 +144,8 @@ class SourceOfTruthWithBarrierTests {
assertEmitsExactly(
source.reader(1, CompletableDeferred(Unit)),
listOf(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = ReadException(
key = 1,
cause = exception
@ -167,7 +167,7 @@ class SourceOfTruthWithBarrierTests {
value
}
val reader = source.reader(1, CompletableDeferred(Unit))
val collected = mutableListOf<StoreResponse<String?>>()
val collected = mutableListOf<StoreReadResponse<String?>>()
val collection = async {
reader.collect {
collected.add(it)
@ -175,8 +175,8 @@ class SourceOfTruthWithBarrierTests {
}
advanceUntilIdle()
assertEquals(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = ReadException(
key = 1,
cause = exception
@ -190,17 +190,17 @@ class SourceOfTruthWithBarrierTests {
source.write(1, "a")
advanceUntilIdle()
assertEquals(
listOf<StoreResponse<String?>>(
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
listOf<StoreReadResponse<String?>>(
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = ReadException(
key = 1,
cause = exception
)
),
StoreResponse.Data(
StoreReadResponse.Data(
// this is fetcher since we are using the write API
origin = ResponseOrigin.Fetcher,
origin = StoreReadResponseOrigin.Fetcher,
value = "a"
)
),
@ -221,7 +221,7 @@ class SourceOfTruthWithBarrierTests {
value
}
val reader = source.reader(1, CompletableDeferred(Unit))
val collected = mutableListOf<StoreResponse<String?>>()
val collected = mutableListOf<StoreReadResponse<String?>>()
val collection = async {
reader.collect {
collected.add(it)
@ -233,20 +233,20 @@ class SourceOfTruthWithBarrierTests {
// make sure collection does not cancel for a write error
assertEquals(true, collection.isActive)
val eventsUntilFailure = listOf(
StoreResponse.Data<String?>(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Data<String?>(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = null
),
StoreResponse.Error.Exception(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Error.Exception(
origin = StoreReadResponseOrigin.SourceOfTruth,
error = WriteException(
key = 1,
value = failValue,
cause = exception
)
),
StoreResponse.Data<String?>(
origin = ResponseOrigin.SourceOfTruth,
StoreReadResponse.Data<String?>(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = null
)
)
@ -257,8 +257,8 @@ class SourceOfTruthWithBarrierTests {
source.write(1, "succeed")
advanceUntilIdle()
assertEquals(
eventsUntilFailure + StoreResponse.Data<String?>(
origin = ResponseOrigin.Fetcher,
eventsUntilFailure + StoreReadResponse.Data<String?>(
origin = StoreReadResponseOrigin.Fetcher,
value = "succeed"
),
collected

View file

@ -0,0 +1,53 @@
package org.mobilenativefoundation.store.store5
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
class StoreReadResponseTests {
@Test
fun requireData() {
assertEquals("Foo", StoreReadResponse.Data("Foo", StoreReadResponseOrigin.Fetcher).requireData())
// should throw
assertFailsWith<NullPointerException> {
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher).requireData()
}
}
@Test
fun throwIfErrorException() {
assertFailsWith<Exception> {
StoreReadResponse.Error.Exception(Exception(), StoreReadResponseOrigin.Fetcher).throwIfError()
}
}
@Test
fun throwIfErrorMessage() {
assertFailsWith<RuntimeException> {
StoreReadResponse.Error.Message("test error", StoreReadResponseOrigin.Fetcher).throwIfError()
}
}
@Test()
fun errorMessageOrNull() {
assertFailsWith<Exception>(message = Exception::class.toString()) {
StoreReadResponse.Error.Exception(Exception(), StoreReadResponseOrigin.Fetcher).throwIfError()
}
assertFailsWith<Exception>(message = "test error message") {
StoreReadResponse.Error.Message("test error message", StoreReadResponseOrigin.Fetcher).throwIfError()
}
assertNull(StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher).errorMessageOrNull())
}
@Test
fun swapType() {
assertFailsWith<RuntimeException> {
StoreReadResponse.Data("Foo", StoreReadResponseOrigin.Fetcher).swapType<String>()
}
}
}

View file

@ -1,53 +0,0 @@
package org.mobilenativefoundation.store.store5
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
class StoreResponseTests {
@Test
fun requireData() {
assertEquals("Foo", StoreResponse.Data("Foo", ResponseOrigin.Fetcher).requireData())
// should throw
assertFailsWith<NullPointerException> {
StoreResponse.Loading(ResponseOrigin.Fetcher).requireData()
}
}
@Test
fun throwIfErrorException() {
assertFailsWith<Exception> {
StoreResponse.Error.Exception(Exception(), ResponseOrigin.Fetcher).throwIfError()
}
}
@Test
fun throwIfErrorMessage() {
assertFailsWith<RuntimeException> {
StoreResponse.Error.Message("test error", ResponseOrigin.Fetcher).throwIfError()
}
}
@Test()
fun errorMessageOrNull() {
assertFailsWith<Exception>(message = Exception::class.toString()) {
StoreResponse.Error.Exception(Exception(), ResponseOrigin.Fetcher).throwIfError()
}
assertFailsWith<Exception>(message = "test error message") {
StoreResponse.Error.Message("test error message", ResponseOrigin.Fetcher).throwIfError()
}
assertNull(StoreResponse.Loading(ResponseOrigin.Fetcher).errorMessageOrNull())
}
@Test
fun swapType() {
assertFailsWith<RuntimeException> {
StoreResponse.Data("Foo", ResponseOrigin.Fetcher).swapType<String>()
}
}
}

View file

@ -4,13 +4,12 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.impl.extensions.get
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.ExperimentalTime
import kotlin.time.minutes
import kotlin.time.Duration.Companion.hours
@FlowPreview
@ExperimentalTime
@ExperimentalCoroutinesApi
class StoreWithInMemoryCacheTests {
private val testScope = TestScope()
@ -18,11 +17,11 @@ class StoreWithInMemoryCacheTests {
@Test
fun storeRequestsCanCompleteWhenInMemoryCacheWithAccessExpiryIsAtTheMaximumSize() = testScope.runTest {
val store = StoreBuilder
.from(Fetcher.of { _: Int -> "result" })
.from<Int, String, String>(Fetcher.of { _: Int -> "result" })
.cachePolicy(
MemoryPolicy
.builder<Any, Any>()
.setExpireAfterAccess(10.minutes)
.setExpireAfterAccess(1.hours)
.setMaxSize(1)
.build()
)

View file

@ -24,40 +24,40 @@ class StreamWithoutSourceOfTruthTests {
3 to "three-1",
3 to "three-2"
)
val pipeline = StoreBuilder.from(fetcher)
val pipeline = StoreBuilder.from<Int, String, String>(fetcher)
.scope(testScope)
.build()
val twoItemsNoRefresh = async {
pipeline.stream(
StoreRequest.cached(3, refresh = false)
StoreReadRequest.cached(3, refresh = false)
).take(3).toList()
}
delay(1_000) // make sure the async block starts first
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEquals(
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
),
twoItemsNoRefresh.await()
@ -70,41 +70,41 @@ class StreamWithoutSourceOfTruthTests {
3 to "three-1",
3 to "three-2"
)
val pipeline = StoreBuilder.from(fetcher)
val pipeline = StoreBuilder.from<Int, String, String>(fetcher)
.scope(testScope)
.disableCache()
.build()
val twoItemsNoRefresh = async {
pipeline.stream(
StoreRequest.cached(3, refresh = false)
StoreReadRequest.cached(3, refresh = false)
).take(3).toList()
}
delay(1_000) // make sure the async block starts first
assertEmitsExactly(
pipeline.stream(StoreRequest.fresh(3)),
pipeline.stream(StoreReadRequest.fresh(3)),
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
)
)
assertEquals(
listOf(
StoreResponse.Loading(
origin = ResponseOrigin.Fetcher
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
),
StoreResponse.Data(
StoreReadResponse.Data(
value = "three-2",
origin = ResponseOrigin.Fetcher
origin = StoreReadResponseOrigin.Fetcher
)
),
twoItemsNoRefresh.await()

View file

@ -0,0 +1,77 @@
package org.mobilenativefoundation.store.store5
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore
import org.mobilenativefoundation.store.store5.util.fake.NoteApi
import org.mobilenativefoundation.store.store5.util.fake.NoteBookkeeping
import org.mobilenativefoundation.store.store5.util.model.Note
import org.mobilenativefoundation.store.store5.util.model.NoteCommonRepresentation
import org.mobilenativefoundation.store.store5.util.model.NoteData
import org.mobilenativefoundation.store.store5.util.model.NoteNetworkRepresentation
import org.mobilenativefoundation.store.store5.util.model.NoteNetworkWriteResponse
import org.mobilenativefoundation.store.store5.util.model.NoteSourceOfTruthRepresentation
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class)
class UpdaterTests {
private val testScope = TestScope()
private lateinit var api: NoteApi
private lateinit var bookkeeping: NoteBookkeeping
@BeforeTest
fun before() {
api = NoteApi()
bookkeeping = NoteBookkeeping()
}
@Test
fun givenEmptyMarketWhenWriteThenSuccessResponsesAndApiUpdated() = testScope.runTest {
val updater = Updater.by<String, NoteCommonRepresentation, NoteNetworkWriteResponse>(
post = { key, commonRepresentation ->
val networkWriteResponse = api.post(key, commonRepresentation)
if (networkWriteResponse.ok) {
UpdaterResult.Success.Typed(networkWriteResponse)
} else {
UpdaterResult.Error.Message("Failed to sync")
}
}
)
val bookkeeper = Bookkeeper.by(
getLastFailedSync = bookkeeping::getLastFailedSync,
setLastFailedSync = bookkeeping::setLastFailedSync,
clear = bookkeeping::clear,
clearAll = bookkeeping::clear
)
val store = StoreBuilder.from<String, NoteNetworkRepresentation, NoteCommonRepresentation>(
fetcher = Fetcher.ofFlow { key ->
val networkRepresentation = NoteNetworkRepresentation(NoteData.Single(Note("$key-id", "$key-title", "$key-content")))
flow { emit(networkRepresentation) }
}
)
.build()
.asMutableStore<String, NoteNetworkRepresentation, NoteCommonRepresentation, NoteSourceOfTruthRepresentation, NoteNetworkWriteResponse>(
updater = updater,
bookkeeper = bookkeeper
)
val noteKey = "1-id"
val noteTitle = "1-title"
val noteContent = "1-content"
val noteData = NoteData.Single(Note(noteKey, noteTitle, noteContent))
val writeRequest = StoreWriteRequest.of<String, NoteCommonRepresentation, NoteNetworkWriteResponse>(
key = noteKey,
input = NoteCommonRepresentation(noteData)
)
val storeWriteResponse = store.write(writeRequest)
assertEquals(StoreWriteResponse.Success.Typed(NoteNetworkWriteResponse(noteKey, true)), storeWriteResponse)
assertEquals(NoteNetworkRepresentation(noteData), api.db[noteKey])
}
}

View file

@ -12,9 +12,9 @@ import org.mobilenativefoundation.store.store5.SourceOfTruth
/**
* Only used in FlowStoreTest. We should get rid of it eventually.
*/
class SimplePersisterAsFlowable<Key, Input, Output>(
private val reader: suspend (Key) -> Output?,
private val writer: suspend (Key, Input) -> Unit,
class SimplePersisterAsFlowable<Key : Any, SourceOfTruthRepresentation : Any>(
private val reader: suspend (Key) -> SourceOfTruthRepresentation?,
private val writer: suspend (Key, SourceOfTruthRepresentation) -> Unit,
private val delete: (suspend (Key) -> Unit)? = null
) {
@ -23,13 +23,13 @@ class SimplePersisterAsFlowable<Key, Input, Output>(
private val versionTracker = KeyTracker<Key>()
fun flowReader(key: Key): Flow<Output?> = flow {
fun flowReader(key: Key): Flow<SourceOfTruthRepresentation?> = flow {
versionTracker.keyFlow(key).collect {
emit(reader(key))
}
}
suspend fun flowWriter(key: Key, input: Input) {
suspend fun flowWriter(key: Key, input: SourceOfTruthRepresentation) {
writer(key, input)
versionTracker.invalidate(key)
}
@ -42,7 +42,7 @@ class SimplePersisterAsFlowable<Key, Input, Output>(
}
}
fun <Key : Any, Input : Any, Output : Any> SimplePersisterAsFlowable<Key, Input, Output>.asSourceOfTruth() =
fun <Key : Any, SourceOfTruthRepresentation : Any> SimplePersisterAsFlowable<Key, SourceOfTruthRepresentation>.asSourceOfTruth() =
SourceOfTruth.of(
reader = ::flowReader,
writer = ::flowWriter,

View file

@ -0,0 +1,6 @@
package org.mobilenativefoundation.store.store5.util
internal interface TestApi<Key : Any, NetworkRepresentation : Any, CommonRepresentation : Any, NetworkWriteResponse : Any> {
fun get(key: Key, fail: Boolean = false): NetworkRepresentation?
fun post(key: Key, value: CommonRepresentation, fail: Boolean = false): NetworkWriteResponse
}

View file

@ -3,18 +3,21 @@ package org.mobilenativefoundation.store.store5.util
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreRequest
import org.mobilenativefoundation.store.store5.StoreResponse
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.impl.operators.mapIndexed
/**
* Helper factory that will return [StoreResponse.Data] for [key]
* Helper factory that will return [StoreReadResponse.Data] for [key]
* if it is cached otherwise will return fresh/network data (updating your caches)
*/
suspend fun <Key : Any, Output : Any> Store<Key, Output>.getData(key: Key) =
suspend fun <Key : Any, CommonRepresentation : Any> Store<Key, CommonRepresentation>.getData(key: Key) =
stream(
StoreRequest.cached(key, refresh = false)
StoreReadRequest.cached(key, refresh = false)
).filterNot {
it is StoreResponse.Loading
it is StoreReadResponse.Loading
}.mapIndexed { index, value ->
value
}.first().let {
StoreResponse.Data(it.requireData(), it.origin)
StoreReadResponse.Data(it.requireData(), it.origin)
}

View file

@ -0,0 +1,40 @@
package org.mobilenativefoundation.store.store5.util.fake
import org.mobilenativefoundation.store.store5.util.TestApi
import org.mobilenativefoundation.store.store5.util.model.Note
import org.mobilenativefoundation.store.store5.util.model.NoteCommonRepresentation
import org.mobilenativefoundation.store.store5.util.model.NoteData
import org.mobilenativefoundation.store.store5.util.model.NoteNetworkRepresentation
import org.mobilenativefoundation.store.store5.util.model.NoteNetworkWriteResponse
internal class NoteApi : TestApi<String, NoteNetworkRepresentation, NoteCommonRepresentation, NoteNetworkWriteResponse> {
internal val db = mutableMapOf<String, NoteNetworkRepresentation>()
init {
seed()
}
override fun get(key: String, fail: Boolean): NoteNetworkRepresentation? {
if (fail) {
throw Exception()
}
return db[key]
}
override fun post(key: String, value: NoteCommonRepresentation, fail: Boolean): NoteNetworkWriteResponse {
if (fail) {
throw Exception()
}
db[key] = NoteNetworkRepresentation(value.data)
return NoteNetworkWriteResponse(key, true)
}
private fun seed() {
db["1-id"] = NoteNetworkRepresentation(NoteData.Single(Note("1-id", "1-title", "1-content")))
db["2-id"] = NoteNetworkRepresentation(NoteData.Single(Note("2-id", "2-title", "2-content")))
db["3-id"] = NoteNetworkRepresentation(NoteData.Single(Note("3-id", "3-title", "3-content")))
}
}

View file

@ -0,0 +1,36 @@
package org.mobilenativefoundation.store.store5.util.fake
class NoteBookkeeping {
private val log: MutableMap<String, Long?> = mutableMapOf()
fun setLastFailedSync(key: String, timestamp: Long, fail: Boolean = false): Boolean {
if (fail) {
throw Exception()
}
log[key] = timestamp
return true
}
fun getLastFailedSync(key: String, fail: Boolean = false): Long? {
if (fail) {
throw Exception()
}
return log[key]
}
fun clear(key: String, fail: Boolean = false): Boolean {
if (fail) {
throw Exception()
}
log.remove(key)
return true
}
fun clear(fail: Boolean = false): Boolean {
if (fail) {
throw Exception()
}
log.clear()
return true
}
}

View file

@ -0,0 +1,29 @@
package org.mobilenativefoundation.store.store5.util.model
internal sealed class NoteData {
data class Single(val item: Note) : NoteData()
data class Collection(val items: List<Note>) : NoteData()
}
internal data class NoteNetworkWriteResponse(
val key: String,
val ok: Boolean
)
internal data class NoteNetworkRepresentation(
val data: NoteData? = null
)
internal data class NoteCommonRepresentation(
val data: NoteData? = null
)
internal data class NoteSourceOfTruthRepresentation(
val data: NoteData? = null
)
internal data class Note(
val id: String,
val title: String,
val content: String
)