Add StoreResult.NoNewData for empty fetchers. (#194)

* Add StoreResult.NoNewData for empty fetchers.

* handle no SoT case

* update comments
This commit is contained in:
Eyal Guthmann 2020-08-07 09:26:51 -07:00 committed by GitHub
parent eb5b8c2fe8
commit ecc3470d55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 265 additions and 32 deletions

View file

@ -233,6 +233,17 @@ public final class com/dropbox/android/external/store4/StoreResponse$Loading : c
public fun toString ()Ljava/lang/String;
}
public final class com/dropbox/android/external/store4/StoreResponse$NoNewData : com/dropbox/android/external/store4/StoreResponse {
public fun <init> (Lcom/dropbox/android/external/store4/ResponseOrigin;)V
public final fun component1 ()Lcom/dropbox/android/external/store4/ResponseOrigin;
public final fun copy (Lcom/dropbox/android/external/store4/ResponseOrigin;)Lcom/dropbox/android/external/store4/StoreResponse$NoNewData;
public static synthetic fun copy$default (Lcom/dropbox/android/external/store4/StoreResponse$NoNewData;Lcom/dropbox/android/external/store4/ResponseOrigin;ILjava/lang/Object;)Lcom/dropbox/android/external/store4/StoreResponse$NoNewData;
public fun equals (Ljava/lang/Object;)Z
public fun getOrigin ()Lcom/dropbox/android/external/store4/ResponseOrigin;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}
public final class com/dropbox/android/external/store4/StoreResponseKt {
public static final fun doThrow (Lcom/dropbox/android/external/store4/StoreResponse$Error;)Ljava/lang/Void;
}

View file

@ -60,19 +60,25 @@ interface Store<Key : Any, Output : Any> {
}
/**
* Helper factory that will return data for [key] if it is cached otherwise will return fresh/network data (updating your caches)
* Helper factory that will return data for [key] if it is cached otherwise will return
* fresh/network data (updating your caches)
*/
suspend fun <Key : Any, Output : Any> Store<Key, Output>.get(key: Key) = stream(
StoreRequest.cached(key, refresh = false)
).filterNot {
it is StoreResponse.Loading
it is StoreResponse.Loading || it is StoreResponse.NoNewData
}.first().requireData()
/**
* Helper factory that will return fresh data for [key] while updating your caches
*
* Note: If the [Fetcher] does not return any data (i.e the returned
* [kotlinx.coroutines.Flow], when collected, is empty). Then store will fall back to local
* data **even** if you explicitly requested fresh data.
* See https://github.com/dropbox/Store/pull/194 for context
*/
suspend fun <Key : Any, Output : Any> Store<Key, Output>.fresh(key: Key) = stream(
StoreRequest.fresh(key)
).filterNot {
it is StoreResponse.Loading
it is StoreResponse.Loading || it is StoreResponse.NoNewData
}.first().requireData()

View file

@ -43,7 +43,13 @@ data class StoreRequest<Key> private constructor(
}
/**
* Create a Store Request which will skip all caches and hit your fetcher (filling your caches)
* Create a Store Request which will skip all caches and hit your fetcher
* (filling your caches).
*
* Note: If the [Fetcher] does not return any data (i.e the returned
* [kotlinx.coroutines.Flow], when collected, is empty). Then store will fall back to local
* data **even** if you explicitly requested fresh data.
* See https://github.com/dropbox/Store/pull/194 for context.
*/
fun <Key> fresh(key: Key) = StoreRequest(
key = key,

View file

@ -29,15 +29,21 @@ sealed class StoreResponse<out T> {
abstract val origin: ResponseOrigin
/**
* Loading event dispatched by a Pipeline
* Loading event dispatched by [Store] to signal the [Fetcher] is in progress.
*/
data class Loading<T>(override val origin: ResponseOrigin) : StoreResponse<T>()
/**
* Data dispatched by a pipeline
* Data dispatched by [Store]
*/
data class Data<T>(val value: T, override val origin: ResponseOrigin) : StoreResponse<T>()
/**
* No new data event dispatched by Store to signal the [Fetcher] returned no data (i.e the
* returned [kotlinx.coroutines.Flow], when collected, was empty).
*/
data class NoNewData<T>(override val origin: ResponseOrigin) : StoreResponse<T>()
/**
* Error dispatched by a pipeline
*/
@ -98,6 +104,7 @@ sealed class StoreResponse<out T> {
internal fun <R> swapType(): StoreResponse<R> = when (this) {
is Error -> this as Error<R>
is Loading -> this as Loading<R>
is NoNewData -> this as NoNewData<R>
is Data -> throw RuntimeException("cannot swap type for StoreResponse.Data")
}
}

View file

@ -29,6 +29,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEmpty
/**
* This class maintains one and only 1 fetcher for a given [Key].
@ -82,6 +83,8 @@ internal class FetcherController<Key : Any, Input : Any, Output : Any>(
origin = ResponseOrigin.Fetcher
)
}
}.onEmpty {
emit(StoreResponse.NoNewData(ResponseOrigin.Fetcher))
},
piggybackingDownstream = enablePiggyback,
onEach = { response ->

View file

@ -86,30 +86,53 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
override fun stream(request: StoreRequest<Key>): Flow<StoreResponse<Output>> =
flow<StoreResponse<Output>> {
val cached = if (request.shouldSkipCache(CacheType.MEMORY)) {
val cachedToEmit = if (request.shouldSkipCache(CacheType.MEMORY)) {
null
} else {
memCache?.get(request.key)
}
cached?.let {
cachedToEmit?.let {
// if we read a value from cache, dispatch it first
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
}
if (sourceOfTruth == null) {
val stream = if (sourceOfTruth == null) {
// piggypack only if not specified fresh data AND we emitted a value from the cache
val piggybackOnly = !request.refresh && cached != null
val piggybackOnly = !request.refresh && cachedToEmit != null
@Suppress("UNCHECKED_CAST")
emitAll(
createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreResponse<Output>> // when no source of truth Input == Output
)
createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreResponse<Output>> // when no source of truth Input == Output
} else {
emitAll(diskNetworkCombined(request, sourceOfTruth))
diskNetworkCombined(request, sourceOfTruth)
}
emitAll(stream.transform {
emit(it)
if (it is StoreResponse.NoNewData && cachedToEmit == null) {
// In the special case where fetcher returned no new data we actually want to
// serve cache data (even if the request specified skipping cache and/or SoT)
//
// For stream(Request.cached(key, refresh=true)) we will return:
// Cache
// Source of truth
// Fetcher - > Loading
// Fetcher - > NoNewData
// (future Source of truth updates)
//
// For stream(Request.fresh(key)) we will return:
// Fetcher - > Loading
// Fetcher - > NoNewData
// Cache
// Source of truth
// (future Source of truth updates)
memCache?.get(request.key)?.let {
emit(StoreResponse.Data(value = it, origin = ResponseOrigin.Cache))
}
}
})
}.onEach {
// whenever a value is dispatched, save it to the memory cache
if (it.origin != ResponseOrigin.Cache) {
@ -179,13 +202,17 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
when (it) {
is Either.Left -> {
// left, that is data from network
when (it.value) {
is StoreResponse.Data ->
// unlocking disk only if network sent data so that fresh data request
// never receives disk data by mistake
diskLock.complete(Unit)
else ->
emit(it.value.swapType())
if (it.value is StoreResponse.Data || it.value is StoreResponse.NoNewData) {
// Unlocking disk only if network sent data or reported no new data
// so that fresh data request never receives new fetcher data after
// cached disk data.
// This means that if the user asked for fresh data but the network returned
// no new data we will still unblock disk.
diskLock.complete(Unit)
}
if (it.value !is StoreResponse.Data) {
emit(it.value.swapType())
}
}
is Either.Right -> {
@ -194,12 +221,8 @@ internal class RealStore<Key : Any, Input : Any, Output : Any>(
is StoreResponse.Data -> {
val diskValue = diskData.value
if (diskValue != null) {
emit(
StoreResponse.Data(
value = diskValue,
origin = diskData.origin
)
)
@Suppress("UNCHECKED_CAST")
emit(diskData as StoreResponse<Output>)
}
// If the disk value is null or refresh was requested then allow fetcher
// to start emitting values.

View file

@ -163,6 +163,40 @@ class StoreTest(
verify(persister, never()).read(any())
}
@Test
fun `GIVEN no new data WHEN get THEN returns disk data`() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
whenever(fetcher.invoke(barCode)) doReturn
flowOf()
whenever(persister.read(barCode)) doReturn DISK
val value = simpleStore.get(barCode)
assertThat(value).isEqualTo(DISK)
}
@Test
fun `GIVEN no new data WHEN fresh THEN returns disk data`() = testScope.runBlockingTest {
val simpleStore = TestStoreBuilder.from(
scope = testScope,
fetcher = fetcher,
persister = persister
).build(storeType)
whenever(fetcher.invoke(barCode)) doReturn
flowOf()
whenever(persister.read(barCode)) doReturn DISK
val value = simpleStore.fresh(barCode)
assertThat(value).isEqualTo(DISK)
}
companion object {
private const val DISK = "disk"
private const val NETWORK = "fresh"

View file

@ -40,6 +40,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
@ -325,7 +326,12 @@ class FlowStoreTest {
fun diskChangeWhileNetworkIsFlowing_simple() = testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
Fetcher.ofFlow { flow {} },
Fetcher.ofFlow {
flow {
delay(20)
emit("three-1")
}
},
sourceOfTruth = persister.asSourceOfTruth()
)
.disableCache()
@ -343,7 +349,12 @@ class FlowStoreTest {
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
),
Data(
value = "three-1",
origin = ResponseOrigin.Fetcher
)
)
}
@ -441,6 +452,138 @@ class FlowStoreTest {
)
}
@Test
fun `GIVEN SoT WHEN stream fresh data returns no data from fetcher THEN fetch returns no data AND cached values are recevied`() =
testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
fetcher = Fetcher.ofFlow { flow {} },
sourceOfTruth = persister.asSourceOfTruth()
)
.buildWithTestScope()
persister.flowWriter(3, "local-1")
val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("local-1")
assertThat(pipeline.stream(StoreRequest.fresh(3)))
.emitsExactly(
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
),
Data(
value = "local-1",
origin = ResponseOrigin.Cache
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
)
)
}
@Test
fun `GIVEN SoT WHEN stream cached data with refresh returns NoNewData THEN cached values are recevied AND fetch returns no data`() =
testScope.runBlockingTest {
val persister = InMemoryPersister<Int, String>().asFlowable()
val pipeline = StoreBuilder.from(
fetcher = Fetcher.ofFlow { flow {} },
sourceOfTruth = persister.asSourceOfTruth()
)
.buildWithTestScope()
persister.flowWriter(3, "local-1")
val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("local-1")
assertThat(pipeline.stream(StoreRequest.cached(3, refresh = true)))
.emitsExactly(
Data(
value = "local-1",
origin = ResponseOrigin.Cache
),
Data(
value = "local-1",
origin = ResponseOrigin.SourceOfTruth
),
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
)
)
}
@Test
fun `GIVEN no SoT WHEN stream fresh data returns no data from fetcher THEN fetch returns no data AND cached values are recevied`() =
testScope.runBlockingTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
} else {
flowOf()
}
}
)
.buildWithTestScope()
val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("remote-1")
assertThat(pipeline.stream(StoreRequest.fresh(3)))
.emitsExactly(
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
),
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
)
)
}
@Test
fun `GIVEN no SoT WHEN stream cached data with refresh returns NoNewData THEN cached values are recevied AND fetch returns no data`() =
testScope.runBlockingTest {
var createCount = 0
val pipeline = StoreBuilder.from<Int, String>(
fetcher = Fetcher.ofFlow {
if (createCount++ == 0) {
flowOf("remote-1")
} else {
flowOf()
}
}
)
.buildWithTestScope()
val firstFetch = pipeline.fresh(3) // prime the cache
assertThat(firstFetch).isEqualTo("remote-1")
assertThat(pipeline.stream(StoreRequest.cached(3, refresh = true)))
.emitsExactly(
Data(
value = "remote-1",
origin = ResponseOrigin.Cache
),
Loading(
origin = ResponseOrigin.Fetcher
),
StoreResponse.NoNewData(
origin = ResponseOrigin.Fetcher
)
)
}
@Test
fun `GIVEN no sourceOfTruth and cache hit WHEN stream cached data without refresh THEN no fetch is triggered AND receives following network updates`() =
testScope.runBlockingTest {