Support Rx2 (#531)

* Add rx2 module

* Support Rx2

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

* Add unit tests

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

* Format

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>

---------

Signed-off-by: Matt Ramotar <mramotar@dropbox.com>
This commit is contained in:
Matt Ramotar 2023-04-24 16:54:26 -04:00 committed by GitHub
parent 66d18cb026
commit 7d73f08cc0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 772 additions and 0 deletions

View file

@ -39,6 +39,7 @@ object Deps {
const val serializationJson = "org.jetbrains.kotlinx:kotlinx-serialization-json:${Version.kotlinxSerialization}"
const val coroutinesAndroid = "org.jetbrains.kotlinx:kotlinx-coroutines-android:${Version.kotlinxCoroutines}"
const val coroutinesCore = "org.jetbrains.kotlinx:kotlinx-coroutines-core:${Version.kotlinxCoroutines}"
const val coroutinesRx2 = "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${Version.kotlinxCoroutines}"
const val dateTime = "org.jetbrains.kotlinx:kotlinx-datetime:0.4.0"
}
@ -47,6 +48,10 @@ object Deps {
const val clientCio = "io.ktor:ktor-client-cio:${Version.ktor}"
}
object Rx {
const val rx2 = "io.reactivex.rxjava2:rxjava:2.2.21"
}
object SqlDelight {
const val gradlePlugin = "com.squareup.sqldelight:gradle-plugin:${Version.sqlDelight}"
const val driverAndroid = "com.squareup.sqldelight:android-driver:${Version.sqlDelight}"
@ -61,6 +66,7 @@ object Deps {
const val core = "androidx.test:core:${Version.testCore}"
const val coroutinesTest = "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Version.kotlinxCoroutines}"
const val junit = "junit:junit:${Version.junit}"
const val truth = "com.google.truth:truth:${Version.truth}"
}
object Touchlab {

View file

@ -33,4 +33,5 @@ object Version {
const val sqlDelight = "1.5.4"
const val sqlDelightGradlePlugin = sqlDelight
const val store = "5.0.0-alpha05"
const val truth = "1.1.3"
}

64
rx2/build.gradle.kts Normal file
View file

@ -0,0 +1,64 @@
@file:Suppress("UnstableApiUsage")
import com.vanniktech.maven.publish.SonatypeHost.S01
import org.jetbrains.dokka.gradle.DokkaTask
plugins {
kotlin("android")
id("com.android.library")
id("com.vanniktech.maven.publish")
id("org.jetbrains.dokka")
id("org.jetbrains.kotlinx.kover")
`maven-publish`
kotlin("native.cocoapods")
}
dependencies {
implementation(Deps.Kotlinx.coroutinesRx2)
implementation(Deps.Kotlinx.coroutinesCore)
implementation(Deps.Kotlinx.coroutinesAndroid)
implementation(Deps.Rx.rx2)
implementation(project(":store"))
testImplementation(kotlin("test"))
with(Deps.Test) {
testImplementation(junit)
testImplementation(core)
testImplementation(coroutinesTest)
testImplementation(truth)
}
}
android {
compileSdk = 33
defaultConfig {
minSdk = 24
targetSdk = 33
}
lint {
disable += "ComposableModifierFactory"
disable += "ModifierFactoryExtensionFunction"
disable += "ModifierFactoryReturnType"
disable += "ModifierFactoryUnreferencedReceiver"
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
}
tasks.withType<DokkaTask>().configureEach {
dokkaSourceSets.configureEach {
reportUndocumented.set(false)
skipDeprecated.set(true)
jdkVersion.set(8)
}
}
mavenPublishing {
publishToMavenCentral(S01)
signAllPublications()
}

3
rx2/gradle.properties Normal file
View file

@ -0,0 +1,3 @@
POM_NAME=org.mobilenativefoundation.store
POM_ARTIFACT_ID=rx2
POM_PACKAGING=jar

View file

@ -0,0 +1,2 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest package="org.mobilenativefoundation.store.rx2" />

View file

@ -0,0 +1,70 @@
package org.mobilenativefoundation.store.rx2
import io.reactivex.Flowable
import io.reactivex.Single
import kotlinx.coroutines.reactive.asFlow
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.Store
/**
* Creates a new [Fetcher] from a [flowableFactory].
*
* [Store] does not catch exception thrown in [flowableFactory] or in the returned [Flowable]. These
* exception will be propagated to the caller.
*
* Use when creating a [Store] that fetches objects in a multiple responses per request
* network protocol (e.g Web Sockets).
*
* @param flowableFactory a factory for a [Flowable] source of network records.
*/
fun <Key : Any, Output : Any> Fetcher.Companion.ofResultFlowable(
flowableFactory: (key: Key) -> Flowable<FetcherResult<Output>>
): Fetcher<Key, Output> = ofResultFlow { key: Key -> flowableFactory(key).asFlow() }
/**
* "Creates" a [Fetcher] from a [singleFactory].
*
* [Store] does not catch exception thrown in [singleFactory] or in the returned [Single]. These
* exception will be propagated to the caller.
*
* Use when creating a [Store] that fetches objects in a single response per request network
* protocol (e.g Http).
*
* @param singleFactory a factory for a [Single] source of network records.
*/
fun <Key : Any, Output : Any> Fetcher.Companion.ofResultSingle(
singleFactory: (key: Key) -> Single<FetcherResult<Output>>
): Fetcher<Key, Output> = ofResultFlowable { key: Key -> singleFactory(key).toFlowable() }
/**
* "Creates" a [Fetcher] from a [flowableFactory] and translate the results to a [FetcherResult].
*
* Emitted values will be wrapped in [FetcherResult.Data]. if an exception disrupts the stream then
* it will be wrapped in [FetcherResult.Error]. Exceptions thrown in [flowableFactory] itself are
* not caught and will be returned to the caller.
*
* Use when creating a [Store] that fetches objects in a multiple responses per request
* network protocol (e.g Web Sockets).
*
* @param flowFactory a factory for a [Flowable] source of network records.
*/
fun <Key : Any, Output : Any> Fetcher.Companion.ofFlowable(
flowableFactory: (key: Key) -> Flowable<Output>
): Fetcher<Key, Output> = ofFlow { key: Key -> flowableFactory(key).asFlow() }
/**
* Creates a new [Fetcher] from a [singleFactory] and translate the results to a [FetcherResult].
*
* The emitted value will be wrapped in [FetcherResult.Data]. if an exception is returned then
* it will be wrapped in [FetcherResult.Error]. Exceptions thrown in [singleFactory] itself are
* not caught and will be returned to the caller.
*
* Use when creating a [Store] that fetches objects in a single response per request network
* protocol (e.g Http).
*
* @param singleFactory a factory for a [Single] source of network records.
*/
fun <Key : Any, Output : Any> Fetcher.Companion.ofSingle(
singleFactory: (key: Key) -> Single<Output>
): Fetcher<Key, Output> = ofFlowable { key: Key -> singleFactory(key).toFlowable() }

View file

@ -0,0 +1,64 @@
package org.mobilenativefoundation.store.rx2
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.rx2.awaitSingleOrNull
import org.mobilenativefoundation.store.store5.SourceOfTruth
/**
* Creates a [Maybe] source of truth that is accessible via [reader], [writer], [delete] and
* [deleteAll].
*
* @param reader function for reading records from the source of truth
* @param writer function for writing updates to the backing source of truth
* @param delete function for deleting records in the source of truth for the given key
* @param deleteAll function for deleting all records in the source of truth
*
*/
fun <Key : Any, Local : Any> SourceOfTruth.Companion.ofMaybe(
reader: (Key) -> Maybe<Local>,
writer: (Key, Local) -> Completable,
delete: ((Key) -> Completable)? = null,
deleteAll: (() -> Completable)? = null
): SourceOfTruth<Key, Local> {
val deleteFun: (suspend (Key) -> Unit)? =
if (delete != null) { key -> delete(key).await() } else null
val deleteAllFun: (suspend () -> Unit)? = deleteAll?.let { { deleteAll().await() } }
return of(
nonFlowReader = { key -> reader.invoke(key).awaitSingleOrNull() },
writer = { key, output -> writer.invoke(key, output).await() },
delete = deleteFun,
deleteAll = deleteAllFun
)
}
/**
* Creates a ([Flowable]) source of truth that is accessed via [reader], [writer], [delete] and
* [deleteAll].
*
* @param reader function for reading records from the source of truth
* @param writer function for writing updates to the backing source of truth
* @param delete function for deleting records in the source of truth for the given key
* @param deleteAll function for deleting all records in the source of truth
*
*/
fun <Key : Any, Local : Any> SourceOfTruth.Companion.ofFlowable(
reader: (Key) -> Flowable<Local>,
writer: (Key, Local) -> Completable,
delete: ((Key) -> Completable)? = null,
deleteAll: (() -> Completable)? = null
): SourceOfTruth<Key, Local> {
val deleteFun: (suspend (Key) -> Unit)? =
if (delete != null) { key -> delete(key).await() } else null
val deleteAllFun: (suspend () -> Unit)? = deleteAll?.let { { deleteAll().await() } }
return of(
reader = { key -> reader.invoke(key).asFlow() },
writer = { key, output -> writer.invoke(key, output).await() },
delete = deleteFun,
deleteAll = deleteAllFun
)
}

View file

@ -0,0 +1,49 @@
package org.mobilenativefoundation.store.rx2
import io.reactivex.Completable
import io.reactivex.Flowable
import kotlinx.coroutines.rx2.asFlowable
import kotlinx.coroutines.rx2.rxCompletable
import kotlinx.coroutines.rx2.rxSingle
import org.mobilenativefoundation.store.store5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.impl.extensions.fresh
import org.mobilenativefoundation.store.store5.impl.extensions.get
/**
* Return a [Flowable] for the given key
* @param request - see [StoreReadRequest] for configurations
*/
fun <Key : Any, Output : Any> Store<Key, Output>.observe(request: StoreReadRequest<Key>): Flowable<StoreReadResponse<Output>> =
stream(request).asFlowable()
/**
* Purge a particular entry from memory and disk cache.
* Persistent storage will only be cleared if a delete function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
fun <Key : Any, Output : Any> Store<Key, Output>.observeClear(key: Key): Completable =
rxCompletable { clear(key) }
/**
* Purge all entries from memory and disk cache.
* Persistent storage will only be cleared if a deleteAll function was passed to
* [StoreBuilder.persister] or [StoreBuilder.nonFlowingPersister] when creating the [Store].
*/
@ExperimentalStoreApi
fun <Key : Any, Output : Any> Store<Key, Output>.observeClearAll(): Completable =
rxCompletable { clear() }
/**
* Helper factory that will return data as a [Single] for [key] if it is cached otherwise will return fresh/network data (updating your caches)
*/
fun <Key : Any, Output : Any> Store<Key, Output>.getSingle(key: Key) = rxSingle { this@getSingle.get(key) }
/**
* Helper factory that will return fresh data as a [Single] for [key] while updating your caches
*/
fun <Key : Any, Output : Any> Store<Key, Output>.freshSingle(key: Key) = rxSingle { this@freshSingle.fresh(key) }

View file

@ -0,0 +1,26 @@
package org.mobilenativefoundation.store.rx2
import io.reactivex.Scheduler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.rx2.asCoroutineDispatcher
import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreBuilder
/**
* A store multicasts same [Output] value to many consumers (Similar to RxJava.share()), by default
* [Store] will open a global scope for management of shared responses, if instead you'd like to control
* the scheduler that sharing/multicasting happens in you can pass a @param [scheduler]
*
* Note this does not control what scheduler a response is emitted on but rather what thread/scheduler
* to use when managing in flight responses. This is usually used for things like testing where you
* may want to confine to a scheduler backed by a single thread executor
*
* @param scheduler - scheduler to use for sharing
* if a scheduler is not set Store will use [GlobalScope]
*/
fun <Key : Any, Network : Any, Output : Any, Local : Any> StoreBuilder<Key, Network, Output, Local>.withScheduler(
scheduler: Scheduler
): StoreBuilder<Key, Network, Output, Local> {
return scope(CoroutineScope(scheduler.asCoroutineDispatcher()))
}

View file

@ -0,0 +1,85 @@
package org.mobilenativefoundation.store.rx2.test
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.common.truth.FailureMetadata
import com.google.common.truth.Subject
import com.google.common.truth.Truth
import com.google.common.truth.Truth.assertWithMessage
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.advanceUntilIdle
@OptIn(ExperimentalCoroutinesApi::class)
internal fun <T> TestCoroutineScope.assertThat(flow: Flow<T>): FlowSubject<T> {
return Truth.assertAbout(FlowSubject.Factory<T>(this)).that(flow)
}
@OptIn(ExperimentalCoroutinesApi::class)
internal class FlowSubject<T> constructor(
failureMetadata: FailureMetadata,
private val testCoroutineScope: TestCoroutineScope,
private val actual: Flow<T>
) : Subject(failureMetadata, actual) {
/**
* Takes all items in the flow that are available by collecting on it as long as there are
* active jobs in the given [TestCoroutineScope].
*
* It ensures all expected items are dispatched as well as no additional unexpected items are
* dispatched.
*/
suspend fun emitsExactly(vararg expected: T) {
val collectedSoFar = mutableListOf<T>()
val collectionCoroutine = testCoroutineScope.async {
actual.collect {
collectedSoFar.add(it)
if (collectedSoFar.size > expected.size) {
assertWithMessage("Too many emissions in the flow (only first additional item is shown)")
.that(collectedSoFar)
.isEqualTo(expected)
}
}
}
testCoroutineScope.advanceUntilIdle()
if (!collectionCoroutine.isActive) {
collectionCoroutine.getCompletionExceptionOrNull()?.let {
throw it
}
}
collectionCoroutine.cancelAndJoin()
assertWithMessage("Flow didn't exactly emit expected items")
.that(collectedSoFar)
.isEqualTo(expected.toList())
}
class Factory<T>(
private val testCoroutineScope: TestCoroutineScope
) : Subject.Factory<FlowSubject<T>, Flow<T>> {
override fun createSubject(metadata: FailureMetadata, actual: Flow<T>): FlowSubject<T> {
return FlowSubject(
failureMetadata = metadata,
actual = actual,
testCoroutineScope = testCoroutineScope
)
}
}
}

View file

@ -0,0 +1,84 @@
package org.mobilenativefoundation.store.rx2.test
import com.google.common.truth.Truth.assertThat
import io.reactivex.Single
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mobilenativefoundation.store.rx2.ofResultSingle
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
@RunWith(JUnit4::class)
@FlowPreview
@ExperimentalCoroutinesApi
class HotRxSingleStoreTest {
private val testScope = TestCoroutineScope()
@Test
fun `GIVEN a hot fetcher WHEN two cached and one fresh call THEN fetcher is only called twice`() =
testScope.runBlockingTest {
val fetcher: FakeRxFetcher<Int, FetcherResult<String>> = FakeRxFetcher(
3 to FetcherResult.Data("three-1"),
3 to FetcherResult.Data("three-2")
)
val pipeline = StoreBuilder.from<Int, String, String>(Fetcher.ofResultSingle<Int, String> { fetcher.fetch(it) })
.scope(testScope)
.build()
assertThat(pipeline.stream(StoreReadRequest.cached(3, refresh = false)))
.emitsExactly(
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreReadResponse.Data(
value = "three-1",
origin = StoreReadResponseOrigin.Fetcher
)
)
assertThat(
pipeline.stream(StoreReadRequest.cached(3, refresh = false))
).emitsExactly(
StoreReadResponse.Data(
value = "three-1",
origin = StoreReadResponseOrigin.Cache
)
)
assertThat(pipeline.stream(StoreReadRequest.fresh(3)))
.emitsExactly(
StoreReadResponse.Loading(
origin = StoreReadResponseOrigin.Fetcher
),
StoreReadResponse.Data(
value = "three-2",
origin = StoreReadResponseOrigin.Fetcher
)
)
}
}
class FakeRxFetcher<Key, Output>(
vararg val responses: Pair<Key, Output>
) {
private var index = 0
@Suppress("RedundantSuspendModifier") // needed for function reference
fun fetch(key: Key): Single<Output> {
// will throw if fetcher called more than twice
if (index >= responses.size) {
throw AssertionError("unexpected fetch request")
}
val pair = responses[index++]
assertThat(pair.first).isEqualTo(key)
return Single.just(pair.second)
}
}

View file

@ -0,0 +1,110 @@
package org.mobilenativefoundation.store.rx2.test
import io.reactivex.BackpressureStrategy
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.schedulers.TestScheduler
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mobilenativefoundation.store.rx2.observe
import org.mobilenativefoundation.store.rx2.ofFlowable
import org.mobilenativefoundation.store.rx2.ofResultFlowable
import org.mobilenativefoundation.store.rx2.withScheduler
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import java.util.concurrent.atomic.AtomicInteger
@RunWith(JUnit4::class)
@FlowPreview
@ExperimentalCoroutinesApi
class RxFlowableStoreTest {
private val testScheduler = TestScheduler()
private val atomicInteger = AtomicInteger(0)
private val fakeDisk = mutableMapOf<Int, String>()
private val store = StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofResultFlowable<Int, String> {
Flowable.create(
{ emitter ->
emitter.onNext(
FetcherResult.Data("$it ${atomicInteger.incrementAndGet()} occurrence")
)
emitter.onNext(
FetcherResult.Data("$it ${atomicInteger.incrementAndGet()} occurrence")
)
emitter.onComplete()
},
BackpressureStrategy.BUFFER
)
},
sourceOfTruth = SourceOfTruth.ofFlowable<Int, String>(
reader = {
if (fakeDisk[it] != null)
Flowable.fromCallable { fakeDisk[it]!! }
else
Flowable.empty<String>()
},
writer = { key, value ->
Completable.fromAction { fakeDisk[key] = value }
}
)
)
.withScheduler(testScheduler)
.build()
@Test
fun simpleTest() {
val testSubscriber1 = store.observe(StoreReadRequest.fresh(3))
.subscribeOn(testScheduler)
.test()
testScheduler.triggerActions()
testSubscriber1
.awaitCount(3)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 1 occurrence", StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 2 occurrence", StoreReadResponseOrigin.Fetcher)
)
val testSubscriber2 = store.observe(StoreReadRequest.cached(3, false))
.subscribeOn(testScheduler)
.test()
testScheduler.triggerActions()
testSubscriber2
.awaitCount(2)
.assertValues(
StoreReadResponse.Data("3 2 occurrence", StoreReadResponseOrigin.Cache),
StoreReadResponse.Data("3 2 occurrence", StoreReadResponseOrigin.SourceOfTruth)
)
val testSubscriber3 = store.observe(StoreReadRequest.fresh(3))
.subscribeOn(testScheduler)
.test()
testScheduler.triggerActions()
testSubscriber3
.awaitCount(3)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 3 occurrence", StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 4 occurrence", StoreReadResponseOrigin.Fetcher)
)
val testSubscriber4 = store.observe(StoreReadRequest.cached(3, false))
.subscribeOn(testScheduler)
.test()
testScheduler.triggerActions()
testSubscriber4
.awaitCount(2)
.assertValues(
StoreReadResponse.Data("3 4 occurrence", StoreReadResponseOrigin.Cache),
StoreReadResponse.Data("3 4 occurrence", StoreReadResponseOrigin.SourceOfTruth)
)
}
}

View file

@ -0,0 +1,78 @@
package org.mobilenativefoundation.store.rx2.test
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mobilenativefoundation.store.rx2.freshSingle
import org.mobilenativefoundation.store.rx2.getSingle
import org.mobilenativefoundation.store.rx2.ofMaybe
import org.mobilenativefoundation.store.rx2.ofResultSingle
import org.mobilenativefoundation.store.rx2.withScheduler
import org.mobilenativefoundation.store.store5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreBuilder
import java.util.concurrent.atomic.AtomicInteger
@ExperimentalStoreApi
@RunWith(JUnit4::class)
@FlowPreview
@ExperimentalCoroutinesApi
class RxSingleStoreExtensionsTest {
private val atomicInteger = AtomicInteger(0)
private var fakeDisk = mutableMapOf<Int, String>()
private val store =
StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofResultSingle {
Single.fromCallable { FetcherResult.Data("$it ${atomicInteger.incrementAndGet()}") }
},
sourceOfTruth = SourceOfTruth.ofMaybe(
reader = { Maybe.fromCallable<String> { fakeDisk[it] } },
writer = { key, value ->
Completable.fromAction { fakeDisk[key] = value }
},
delete = { key ->
Completable.fromAction { fakeDisk.remove(key) }
},
deleteAll = {
Completable.fromAction { fakeDisk.clear() }
}
)
)
.withScheduler(Schedulers.trampoline())
.build()
@Test
fun `store rx extension tests`() {
// Return from cache - after initial fetch
store.getSingle(3)
.test()
.await()
.assertValue("3 1")
// Return from cache
store.getSingle(3)
.test()
.await()
.assertValue("3 1")
// Return from fresh - forcing a new fetch
store.freshSingle(3)
.test()
.await()
.assertValue("3 2")
// Return from cache - different to initial
store.getSingle(3)
.test()
.await()
.assertValue("3 2")
}
}

View file

@ -0,0 +1,129 @@
package org.mobilenativefoundation.store.rx2.test
import io.reactivex.Completable
import io.reactivex.Maybe
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.mobilenativefoundation.store.rx2.observe
import org.mobilenativefoundation.store.rx2.observeClear
import org.mobilenativefoundation.store.rx2.observeClearAll
import org.mobilenativefoundation.store.rx2.ofMaybe
import org.mobilenativefoundation.store.rx2.ofResultSingle
import org.mobilenativefoundation.store.rx2.withScheduler
import org.mobilenativefoundation.store.store5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.Fetcher
import org.mobilenativefoundation.store.store5.FetcherResult
import org.mobilenativefoundation.store.store5.SourceOfTruth
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import java.util.concurrent.atomic.AtomicInteger
@ExperimentalStoreApi
@RunWith(JUnit4::class)
@FlowPreview
@ExperimentalCoroutinesApi
class RxSingleStoreTest {
private val atomicInteger = AtomicInteger(0)
private var fakeDisk = mutableMapOf<Int, String>()
private val store =
StoreBuilder.from<Int, String, String, String>(
fetcher = Fetcher.ofResultSingle {
Single.fromCallable { FetcherResult.Data("$it ${atomicInteger.incrementAndGet()}") }
},
sourceOfTruth = SourceOfTruth.ofMaybe(
reader = { Maybe.fromCallable<String> { fakeDisk[it] } },
writer = { key, value ->
Completable.fromAction { fakeDisk[key] = value }
},
delete = { key ->
Completable.fromAction { fakeDisk.remove(key) }
},
deleteAll = {
Completable.fromAction { fakeDisk.clear() }
}
)
)
.withScheduler(Schedulers.trampoline())
.build()
@Test
fun simpleTest() {
store.observe(StoreReadRequest.cached(3, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 1", StoreReadResponseOrigin.Fetcher)
)
store.observe(StoreReadRequest.cached(3, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Data("3 1", StoreReadResponseOrigin.Cache),
StoreReadResponse.Data("3 1", StoreReadResponseOrigin.SourceOfTruth)
)
store.observe(StoreReadRequest.fresh(3))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 2", StoreReadResponseOrigin.Fetcher)
)
store.observe(StoreReadRequest.cached(3, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Data("3 2", StoreReadResponseOrigin.Cache),
StoreReadResponse.Data("3 2", StoreReadResponseOrigin.SourceOfTruth)
)
}
@Test
fun `GIVEN a store with persister values WHEN observeClear is Called THEN next Store get hits network`() {
fakeDisk[3] = "seeded occurrence"
store.observeClear(3).blockingGet()
store.observe(StoreReadRequest.cached(3, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 1", StoreReadResponseOrigin.Fetcher)
)
}
@Test
fun `GIVEN a store with persister values WHEN observeClearAll is called THEN next Store get calls both hit network`() {
fakeDisk[3] = "seeded occurrence"
fakeDisk[4] = "another seeded occurrence"
store.observeClearAll().blockingGet()
store.observe(StoreReadRequest.cached(3, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("3 1", StoreReadResponseOrigin.Fetcher)
)
store.observe(StoreReadRequest.cached(4, false))
.test()
.awaitCount(2)
.assertValues(
StoreReadResponse.Loading(StoreReadResponseOrigin.Fetcher),
StoreReadResponse.Data("4 2", StoreReadResponseOrigin.Fetcher)
)
}
}

View file

@ -1,3 +1,4 @@
include ':store'
include ':cache'
include ':multicast'
include ':rx2'