Suspend cache (#8)
This PR adds a cache implementation that has all APIs as suspend functions. It uses guava's cache under the hood and handles query deduplication in a custom Entry type that we keep in the cache
This commit is contained in:
parent
823dbcc3e2
commit
dc9b1bd51c
22 changed files with 580 additions and 134 deletions
|
@ -61,6 +61,7 @@ dependencies {
|
||||||
implementation project(':middleware')
|
implementation project(':middleware')
|
||||||
implementation project(':middleware-moshi')
|
implementation project(':middleware-moshi')
|
||||||
implementation project(':filesystem')
|
implementation project(':filesystem')
|
||||||
|
implementation project(':suspendCache')
|
||||||
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$versions.kotlin"
|
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$versions.kotlin"
|
||||||
|
|
||||||
implementation libraries.coroutinesCore
|
implementation libraries.coroutinesCore
|
||||||
|
|
|
@ -2,7 +2,7 @@ apply from: 'buildsystem/dependencies.gradle'
|
||||||
|
|
||||||
// Top-level build file where you can add configuration options common to all sub-projects/modules.
|
// Top-level build file where you can add configuration options common to all sub-projects/modules.
|
||||||
buildscript {
|
buildscript {
|
||||||
ext.kotlin_version = '1.3.31'
|
ext.kotlin_version = '1.3.40'
|
||||||
repositories {
|
repositories {
|
||||||
mavenCentral()
|
mavenCentral()
|
||||||
maven {
|
maven {
|
||||||
|
|
|
@ -14,7 +14,7 @@ ext.versions = [
|
||||||
targetSdk : 28,
|
targetSdk : 28,
|
||||||
compileSdk : 28,
|
compileSdk : 28,
|
||||||
buildTools : '28.0.3',
|
buildTools : '28.0.3',
|
||||||
kotlin : '1.3.0',
|
kotlin : '1.3.40',
|
||||||
|
|
||||||
// UI libs.
|
// UI libs.
|
||||||
supportLibs : '28.0.0',
|
supportLibs : '28.0.0',
|
||||||
|
@ -53,7 +53,7 @@ ext.versions = [
|
||||||
supportTestRunner : '0.4.1',
|
supportTestRunner : '0.4.1',
|
||||||
espresso : '2.2.1',
|
espresso : '2.2.1',
|
||||||
compileTesting : '0.8',
|
compileTesting : '0.8',
|
||||||
coroutines : '1.2.1',
|
coroutines : '1.2.2',
|
||||||
]
|
]
|
||||||
|
|
||||||
ext.libraries = [
|
ext.libraries = [
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
include ':app', ':store', ':store-kotlin', ':middleware', ':cache', ':filesystem', ':middleware-moshi'
|
include ':app', ':store', ':store-kotlin', ':middleware', ':cache', ':filesystem', ':middleware-moshi', ':suspendCache'
|
||||||
|
|
|
@ -16,6 +16,7 @@ javadoc.dependsOn dokka
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation project(':store')
|
implementation project(':store')
|
||||||
apiElements project(':store')
|
apiElements project(':store')
|
||||||
|
implementation project(':suspendCache')
|
||||||
implementation libraries.kotlinStdLib
|
implementation libraries.kotlinStdLib
|
||||||
apiElements libraries.kotlinStdLib
|
apiElements libraries.kotlinStdLib
|
||||||
|
|
||||||
|
|
|
@ -24,8 +24,10 @@ version = VERSION_NAME
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||||
implementation project(path: ':cache')
|
implementation project(path: ':cache')
|
||||||
|
implementation project(path: ':suspendCache')
|
||||||
implementation libraries.coroutinesCore
|
implementation libraries.coroutinesCore
|
||||||
|
|
||||||
|
|
||||||
compileOnly libraries.jsr305
|
compileOnly libraries.jsr305
|
||||||
|
|
||||||
testImplementation libraries.mockito
|
testImplementation libraries.mockito
|
||||||
|
|
|
@ -1,68 +0,0 @@
|
||||||
package com.nytimes.android.external.store3.base.impl
|
|
||||||
|
|
||||||
import com.nytimes.android.external.cache3.Cache
|
|
||||||
import com.nytimes.android.external.cache3.CacheBuilder
|
|
||||||
import com.nytimes.android.external.cache3.CacheLoader
|
|
||||||
import com.nytimes.android.external.cache3.LoadingCache
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
object CacheFactory {
|
|
||||||
|
|
||||||
internal fun <Key, Parsed> createCache(memoryPolicy: MemoryPolicy?, cacheLoader: CacheLoader<Key, Parsed>): LoadingCache<Key, Parsed> {
|
|
||||||
return createBaseCache(memoryPolicy, cacheLoader)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun <Key, Parsed> createInflighter(memoryPolicy: MemoryPolicy?): Cache<Key, Parsed> {
|
|
||||||
return createBaseInFlighter(memoryPolicy)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun <Key, Value> createBaseInFlighter(memoryPolicy: MemoryPolicy?): Cache<Key, Value> {
|
|
||||||
val expireAfterToSeconds = memoryPolicy?.expireAfterTimeUnit?.toSeconds(memoryPolicy.expireAfterWrite)
|
|
||||||
?: StoreDefaults.cacheTTLTimeUnit
|
|
||||||
.toSeconds(StoreDefaults.cacheTTL)
|
|
||||||
val maximumInFlightRequestsDuration = TimeUnit.MINUTES.toSeconds(1)
|
|
||||||
|
|
||||||
return if (expireAfterToSeconds > maximumInFlightRequestsDuration) {
|
|
||||||
CacheBuilder
|
|
||||||
.newBuilder()
|
|
||||||
.expireAfterWrite(maximumInFlightRequestsDuration, TimeUnit.SECONDS)
|
|
||||||
.build()
|
|
||||||
} else {
|
|
||||||
val expireAfter = memoryPolicy?.expireAfterWrite ?: StoreDefaults.cacheTTL
|
|
||||||
val expireAfterUnit = if (memoryPolicy == null)
|
|
||||||
StoreDefaults.cacheTTLTimeUnit
|
|
||||||
else
|
|
||||||
memoryPolicy.expireAfterTimeUnit
|
|
||||||
CacheBuilder.newBuilder()
|
|
||||||
.expireAfterWrite(expireAfter, expireAfterUnit)
|
|
||||||
.build()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private fun <Key, Value> createBaseCache(memoryPolicy: MemoryPolicy?,
|
|
||||||
cacheLoader: CacheLoader<Key,Value>): LoadingCache<Key, Value> {
|
|
||||||
return if (memoryPolicy == null) {
|
|
||||||
CacheBuilder
|
|
||||||
.newBuilder()
|
|
||||||
.maximumSize(StoreDefaults.cacheSize)
|
|
||||||
.expireAfterWrite(StoreDefaults.cacheTTL, StoreDefaults.cacheTTLTimeUnit)
|
|
||||||
.build(cacheLoader)
|
|
||||||
} else {
|
|
||||||
if (memoryPolicy.expireAfterAccess == MemoryPolicy.DEFAULT_POLICY) {
|
|
||||||
CacheBuilder
|
|
||||||
.newBuilder()
|
|
||||||
.maximumSize(memoryPolicy.maxSize)
|
|
||||||
.expireAfterWrite(memoryPolicy.expireAfterWrite, memoryPolicy.expireAfterTimeUnit)
|
|
||||||
.build(cacheLoader)
|
|
||||||
} else {
|
|
||||||
CacheBuilder
|
|
||||||
.newBuilder()
|
|
||||||
.maximumSize(memoryPolicy.maxSize)
|
|
||||||
.expireAfterAccess(memoryPolicy.expireAfterAccess, memoryPolicy.expireAfterTimeUnit)
|
|
||||||
.build(cacheLoader)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -55,13 +55,13 @@ interface Store<T, V> {
|
||||||
/**
|
/**
|
||||||
* Clear the memory cache of all entries
|
* Clear the memory cache of all entries
|
||||||
*/
|
*/
|
||||||
fun clearMemory()
|
suspend fun clearMemory()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Purge a particular entry from memory and disk cache.
|
* Purge a particular entry from memory and disk cache.
|
||||||
* Persister will only be cleared if they implements Clearable
|
* Persister will only be cleared if they implements Clearable
|
||||||
*/
|
*/
|
||||||
fun clear(key: V)
|
suspend fun clear(key: V)
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun <V, K> from(inflight: Boolean = true, f: suspend (K) -> V) =
|
fun <V, K> from(inflight: Boolean = true, f: suspend (K) -> V) =
|
||||||
|
|
|
@ -22,4 +22,10 @@ internal object StoreDefaults {
|
||||||
|
|
||||||
val cacheTTLTimeUnit: TimeUnit
|
val cacheTTLTimeUnit: TimeUnit
|
||||||
get() = TimeUnit.SECONDS
|
get() = TimeUnit.SECONDS
|
||||||
|
|
||||||
|
val memoryPolicy = MemoryPolicy.builder()
|
||||||
|
.setMemorySize(cacheSize)
|
||||||
|
.setExpireAfterWrite(cacheTTL)
|
||||||
|
.setExpireAfterTimeUnit(cacheTTLTimeUnit)
|
||||||
|
.build()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
package com.nytimes.android.external.store3.base.wrappers
|
package com.nytimes.android.external.store3.base.wrappers
|
||||||
|
|
||||||
import com.nytimes.android.external.cache3.CacheLoader
|
import com.com.nytimes.suspendCache.StoreCache
|
||||||
import com.nytimes.android.external.cache3.LoadingCache
|
|
||||||
import com.nytimes.android.external.store3.base.impl.CacheFactory
|
|
||||||
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
||||||
import com.nytimes.android.external.store3.base.impl.Store
|
import com.nytimes.android.external.store3.base.impl.Store
|
||||||
import kotlinx.coroutines.*
|
import com.nytimes.android.external.store3.base.impl.StoreDefaults
|
||||||
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
|
||||||
fun <V, K> Store4Builder<V, K>.cache(
|
fun <V, K> Store4Builder<V, K>.cache(
|
||||||
|
@ -17,42 +16,26 @@ internal class MemoryCacheStore<V, K>(
|
||||||
memoryPolicy: MemoryPolicy?
|
memoryPolicy: MemoryPolicy?
|
||||||
) : Store<V, K> {
|
) : Store<V, K> {
|
||||||
|
|
||||||
//TODO this could be a Cache<K, V> but it uses a deferred because memCache.get doesn't support suspending methods
|
private val memCache = StoreCache.from(
|
||||||
private val memCache: LoadingCache<K, Deferred<V>> = CacheFactory.createCache(memoryPolicy,
|
loader = { key: K ->
|
||||||
object : CacheLoader<K, Deferred<V>>() {
|
wrappedStore.get(key)
|
||||||
override fun load(key: K): Deferred<V>? =
|
},
|
||||||
memoryScope.async {
|
memoryPolicy = memoryPolicy ?: StoreDefaults.memoryPolicy
|
||||||
wrappedStore.get(key)
|
)
|
||||||
}
|
|
||||||
|
|
||||||
})
|
override suspend fun get(key: K): V = memCache.get(key)
|
||||||
private val memoryScope = CoroutineScope(SupervisorJob())
|
|
||||||
|
|
||||||
|
override suspend fun fresh(key: K): V = memCache.fresh(key)
|
||||||
override suspend fun get(key: K): V {
|
|
||||||
return try {
|
|
||||||
memCache.get(key)!!.await()
|
|
||||||
} catch (e: Exception) {
|
|
||||||
memCache.invalidate(key)
|
|
||||||
throw e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun fresh(key: K): V {
|
|
||||||
val value = wrappedStore.fresh(key)
|
|
||||||
memCache.put(key, memoryScope.async { value })
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
||||||
|
|
||||||
override fun clearMemory() {
|
override suspend fun clearMemory() {
|
||||||
memCache.invalidateAll()
|
memCache.clearAll()
|
||||||
wrappedStore.clearMemory()
|
wrappedStore.clearMemory()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clear(key: K) {
|
override suspend fun clear(key: K) {
|
||||||
memCache.invalidate(key)
|
memCache.invalidate(key)
|
||||||
wrappedStore.clear(key)
|
wrappedStore.clear(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,10 +34,10 @@ internal class FetcherStore<Raw, Key>(
|
||||||
.drop(1)
|
.drop(1)
|
||||||
.map { it!! }
|
.map { it!! }
|
||||||
|
|
||||||
override fun clearMemory() {
|
override suspend fun clearMemory() {
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clear(key: Key) {
|
override suspend fun clear(key: Key) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,26 +1,26 @@
|
||||||
package com.nytimes.android.external.store3.base.wrappers
|
package com.nytimes.android.external.store3.base.wrappers
|
||||||
|
|
||||||
import com.nytimes.android.external.cache3.Cache
|
import com.com.nytimes.suspendCache.StoreCache
|
||||||
import com.nytimes.android.external.store3.base.impl.CacheFactory
|
|
||||||
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
||||||
import com.nytimes.android.external.store3.base.impl.Store
|
import com.nytimes.android.external.store3.base.impl.Store
|
||||||
import kotlinx.coroutines.*
|
import com.nytimes.android.external.store3.base.impl.StoreDefaults
|
||||||
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
|
||||||
internal class InflightStore<V, K>(
|
internal class InflightStore<V, K>(
|
||||||
private val wrappedStore: Store<V, K>,
|
private val wrappedStore: Store<V, K>,
|
||||||
memoryPolicy: MemoryPolicy?
|
memoryPolicy: MemoryPolicy?
|
||||||
) : Store<V, K> {
|
) : Store<V, K> {
|
||||||
|
private val inFlightRequests = StoreCache.from(
|
||||||
private val inFlightRequests: Cache<K, Deferred<V>> = CacheFactory.createInflighter(memoryPolicy)
|
loader = { key: K ->
|
||||||
|
wrappedStore.get(key)
|
||||||
private val inFlightScope = CoroutineScope(SupervisorJob())
|
},
|
||||||
|
memoryPolicy = memoryPolicy ?: StoreDefaults.memoryPolicy
|
||||||
|
)
|
||||||
|
|
||||||
override suspend fun get(key: K): V {
|
override suspend fun get(key: K): V {
|
||||||
return try {
|
return try {
|
||||||
inFlightRequests
|
inFlightRequests.get(key)
|
||||||
.get(key) { inFlightScope.async { wrappedStore.get(key) } }
|
|
||||||
.await()
|
|
||||||
} finally {
|
} finally {
|
||||||
inFlightRequests.invalidate(key)
|
inFlightRequests.invalidate(key)
|
||||||
}
|
}
|
||||||
|
@ -28,9 +28,7 @@ internal class InflightStore<V, K>(
|
||||||
|
|
||||||
override suspend fun fresh(key: K): V {
|
override suspend fun fresh(key: K): V {
|
||||||
return try {
|
return try {
|
||||||
inFlightRequests
|
inFlightRequests.fresh(key)
|
||||||
.get(key) { inFlightScope.async { wrappedStore.fresh(key) } }
|
|
||||||
.await()
|
|
||||||
} finally {
|
} finally {
|
||||||
inFlightRequests.invalidate(key)
|
inFlightRequests.invalidate(key)
|
||||||
}
|
}
|
||||||
|
@ -39,12 +37,12 @@ internal class InflightStore<V, K>(
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
||||||
|
|
||||||
override fun clearMemory() {
|
override suspend fun clearMemory() {
|
||||||
inFlightRequests.invalidateAll()
|
inFlightRequests.clearAll()
|
||||||
wrappedStore.clearMemory()
|
wrappedStore.clearMemory()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clear(key: K) {
|
override suspend fun clear(key: K) {
|
||||||
inFlightRequests.invalidate(key)
|
inFlightRequests.invalidate(key)
|
||||||
wrappedStore.clear(key)
|
wrappedStore.clear(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,11 +30,11 @@ internal class ParserStore<V1, V2, K>(
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
override fun stream(): Flow<Pair<K, V2>> = wrappedStore.stream().map { (key, value) -> key to parser.apply(key, value) }
|
override fun stream(): Flow<Pair<K, V2>> = wrappedStore.stream().map { (key, value) -> key to parser.apply(key, value) }
|
||||||
|
|
||||||
override fun clearMemory() {
|
override suspend fun clearMemory() {
|
||||||
wrappedStore.clearMemory()
|
wrappedStore.clearMemory()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clear(key: K) {
|
override suspend fun clear(key: K) {
|
||||||
wrappedStore.clear(key)
|
wrappedStore.clear(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -5,11 +5,8 @@ import com.nytimes.android.external.store3.base.impl.StalePolicy
|
||||||
import com.nytimes.android.external.store3.base.impl.StalePolicy.*
|
import com.nytimes.android.external.store3.base.impl.StalePolicy.*
|
||||||
import com.nytimes.android.external.store3.base.impl.Store
|
import com.nytimes.android.external.store3.base.impl.Store
|
||||||
import com.nytimes.android.external.store3.base.impl.StoreUtil
|
import com.nytimes.android.external.store3.base.impl.StoreUtil
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.FlowPreview
|
|
||||||
import kotlinx.coroutines.SupervisorJob
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
|
|
||||||
fun <V, K> Store4Builder<V, K>.persister(
|
fun <V, K> Store4Builder<V, K>.persister(
|
||||||
persister: Persister<V, K>,
|
persister: Persister<V, K>,
|
||||||
|
@ -60,11 +57,14 @@ internal class PersisterStore<V, K>(
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
override fun stream(): Flow<Pair<K, V>> = wrappedStore.stream()
|
||||||
|
|
||||||
override fun clearMemory() {
|
override suspend fun clearMemory() {
|
||||||
wrappedStore.clearMemory()
|
wrappedStore.clearMemory()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun clear(key: K) {
|
override suspend fun clear(key: K) {
|
||||||
StoreUtil.clearPersister<Any, K>(persister, key)
|
// TODO we should somehow receive it or not make this suspend
|
||||||
|
withContext(Dispatchers.IO) {
|
||||||
|
StoreUtil.clearPersister<Any, K>(persister, key)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
2
suspendCache/README.md
Normal file
2
suspendCache/README.md
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
re-implementation for:
|
||||||
|
https://github.com/nytimes/Store/compare/tech/removeCache
|
18
suspendCache/build.gradle
Normal file
18
suspendCache/build.gradle
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
plugins {
|
||||||
|
id 'org.jetbrains.kotlin.jvm'
|
||||||
|
}
|
||||||
|
group = GROUP
|
||||||
|
version = VERSION_NAME
|
||||||
|
sourceCompatibility = "8"
|
||||||
|
targetCompatibility = "8"
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||||
|
implementation project(path: ":cache")
|
||||||
|
implementation libraries.coroutinesCore
|
||||||
|
testImplementation libraries.junit
|
||||||
|
testImplementation libraries.coroutinesTest
|
||||||
|
testImplementation libraries.assertJ
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
package com.com.nytimes.suspendCache
|
||||||
|
|
||||||
|
import com.nytimes.android.external.cache3.CacheBuilder
|
||||||
|
import com.nytimes.android.external.cache3.CacheLoader
|
||||||
|
import com.nytimes.android.external.cache3.Ticker
|
||||||
|
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
||||||
|
|
||||||
|
internal class RealStoreCache<K, V>(
|
||||||
|
private val loader: suspend (K) -> V,
|
||||||
|
private val memoryPolicy: MemoryPolicy,
|
||||||
|
ticker: Ticker = Ticker.systemTicker()
|
||||||
|
) : StoreCache<K, V> {
|
||||||
|
private val realCache = CacheBuilder.newBuilder()
|
||||||
|
.ticker(ticker)
|
||||||
|
.also {
|
||||||
|
if (memoryPolicy.hasAccessPolicy()) {
|
||||||
|
it.expireAfterAccess(memoryPolicy.expireAfterAccess, memoryPolicy.expireAfterTimeUnit)
|
||||||
|
}
|
||||||
|
if (memoryPolicy.hasWritePolicy()) {
|
||||||
|
it.expireAfterWrite(memoryPolicy.expireAfterWrite, memoryPolicy.expireAfterTimeUnit)
|
||||||
|
}
|
||||||
|
if (memoryPolicy.hasMaxSize()) {
|
||||||
|
it.maximumSize(memoryPolicy.maxSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.build(object : CacheLoader<K, StoreRecord<K, V>>() {
|
||||||
|
override fun load(key: K): StoreRecord<K, V>? {
|
||||||
|
return StoreRecord(
|
||||||
|
key = key,
|
||||||
|
loader = loader)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
override suspend fun fresh(key: K): V {
|
||||||
|
return realCache.get(key)!!.freshValue()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun get(key: K): V {
|
||||||
|
return realCache.get(key)!!.value()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun put(key: K, value: V) {
|
||||||
|
realCache.put(key, StoreRecord(
|
||||||
|
key = key,
|
||||||
|
loader = loader,
|
||||||
|
precomputedValue = value))
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun invalidate(key: K) {
|
||||||
|
realCache.invalidate(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun clearAll() {
|
||||||
|
realCache.cleanUp()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun getIfPresent(key: K): V? {
|
||||||
|
return realCache.getIfPresent(key)?.cachedValue()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package com.com.nytimes.suspendCache
|
||||||
|
|
||||||
|
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
||||||
|
|
||||||
|
typealias Loader<K, V> = suspend (K) -> V
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache definition used by Store internally.
|
||||||
|
*/
|
||||||
|
interface StoreCache<K, V> {
|
||||||
|
suspend fun get(key: K): V
|
||||||
|
suspend fun fresh(key: K): V
|
||||||
|
suspend fun put(key: K, value: V)
|
||||||
|
suspend fun invalidate(key: K)
|
||||||
|
suspend fun clearAll()
|
||||||
|
suspend fun getIfPresent(key: K): V?
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun <K, V> from(
|
||||||
|
loader: suspend (K) -> V,
|
||||||
|
memoryPolicy: MemoryPolicy
|
||||||
|
): StoreCache<K, V> {
|
||||||
|
return RealStoreCache(
|
||||||
|
loader = loader,
|
||||||
|
memoryPolicy = memoryPolicy
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
package com.com.nytimes.suspendCache
|
||||||
|
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The value we keep in guava's cache and it handles custom logic for store
|
||||||
|
* * not caching failures
|
||||||
|
* * not having concurrent fetches for the same key
|
||||||
|
* * maybe fresh?
|
||||||
|
* * deduplication
|
||||||
|
*/
|
||||||
|
internal class StoreRecord<K, V>(
|
||||||
|
private val key: K,
|
||||||
|
precomputedValue : V? = null,
|
||||||
|
private val loader: Loader<K, V>
|
||||||
|
) {
|
||||||
|
private var inFlight = Mutex(false)
|
||||||
|
@Volatile
|
||||||
|
private var _value: V? = precomputedValue
|
||||||
|
|
||||||
|
fun cachedValue(): V? = _value
|
||||||
|
|
||||||
|
suspend fun freshValue(): V {
|
||||||
|
// first try to lock inflight request so that we can avoid get() from making a call
|
||||||
|
// but if we failed to lock, just request w/o a lock.
|
||||||
|
// we want fresh to be really fresh and we don't want it to wait for another request
|
||||||
|
if (inFlight.tryLock()) {
|
||||||
|
try {
|
||||||
|
return internalDoLoadAndCache()
|
||||||
|
} finally {
|
||||||
|
inFlight.unlock()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return inFlight.withLock {
|
||||||
|
return internalDoLoadAndCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private inline suspend fun internalDoLoadAndCache(): V {
|
||||||
|
return runCatching {
|
||||||
|
loader(key)
|
||||||
|
}.also {
|
||||||
|
it.getOrNull()?.let {
|
||||||
|
_value = it
|
||||||
|
}
|
||||||
|
}.getOrThrow()
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun value(): V {
|
||||||
|
val cached = _value
|
||||||
|
if (cached != null) {
|
||||||
|
return cached
|
||||||
|
}
|
||||||
|
return inFlight.withLock {
|
||||||
|
_value?.let {
|
||||||
|
return it
|
||||||
|
} ?: internalDoLoadAndCache()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,6 @@
|
||||||
package com.nytimes.android.external.store3.base.impl
|
package com.nytimes.android.external.store3.base.impl
|
||||||
|
|
||||||
|
|
||||||
import com.nytimes.android.external.store3.util.NoopPersister
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
/**
|
/**
|
|
@ -0,0 +1,166 @@
|
||||||
|
package com.nytimes.suspendCache
|
||||||
|
|
||||||
|
import com.com.nytimes.suspendCache.RealStoreCache
|
||||||
|
import com.nytimes.android.external.cache3.Ticker
|
||||||
|
import com.nytimes.android.external.store3.base.impl.MemoryPolicy
|
||||||
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import kotlinx.coroutines.Deferred
|
||||||
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.test.TestCoroutineScope
|
||||||
|
import kotlinx.coroutines.test.runBlockingTest
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
|
import org.assertj.core.api.Assertions.fail
|
||||||
|
import org.junit.Test
|
||||||
|
import org.junit.runner.RunWith
|
||||||
|
import org.junit.runners.JUnit4
|
||||||
|
import java.util.*
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
@Suppress("UsePropertyAccessSyntax") // for isTrue() / isFalse()
|
||||||
|
@ExperimentalCoroutinesApi
|
||||||
|
@RunWith(JUnit4::class)
|
||||||
|
class RealStoreCacheTest {
|
||||||
|
private val testScope = TestCoroutineScope()
|
||||||
|
private val loader = TestLoader()
|
||||||
|
private val ticker = object : Ticker() {
|
||||||
|
override fun read(): Long {
|
||||||
|
return TimeUnit.MILLISECONDS.toNanos(testScope.currentTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun sanity_notEnquedShouldThrow() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
try {
|
||||||
|
cache.get("unused key")
|
||||||
|
fail("should've failed")
|
||||||
|
} catch (assertionError: AssertionError) {
|
||||||
|
assertThat(assertionError.localizedMessage).isEqualTo("nothing enqueued")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun cache() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
loader.enqueueResponse("foo", "bar")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
// get again, is cached
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun cache_expired() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache(
|
||||||
|
MemoryPolicy.builder()
|
||||||
|
.setExpireAfterAccess(10)
|
||||||
|
.setExpireAfterTimeUnit(TimeUnit.MILLISECONDS)
|
||||||
|
.build())
|
||||||
|
loader.enqueueResponse("foo", "bar")
|
||||||
|
loader.enqueueResponse("foo", "bar_updated")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
// get again, is cached
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
testScope.advanceTimeBy(11)
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar_updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun getIfPresent() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
assertThat(cache.getIfPresent("foo")).isNull()
|
||||||
|
loader.enqueueResponse("foo", "bar")
|
||||||
|
assertThat(cache.getIfPresent("foo")).isNull()
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
assertThat(cache.getIfPresent("foo")).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun getIfPresent_pendingFetch() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
val deferredResult = CompletableDeferred<String>()
|
||||||
|
loader.enqueueResponse("foo", deferredResult)
|
||||||
|
val asyncGet = async {
|
||||||
|
cache.get("foo")
|
||||||
|
}
|
||||||
|
testScope.advanceUntilIdle()
|
||||||
|
assertThat(asyncGet.isCompleted).isFalse()
|
||||||
|
assertThat(cache.getIfPresent("foo")).isNull()
|
||||||
|
deferredResult.complete("bar")
|
||||||
|
testScope.advanceUntilIdle()
|
||||||
|
assertThat(asyncGet.isCompleted).isTrue()
|
||||||
|
assertThat(cache.getIfPresent("foo")).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun invalidate() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
loader.enqueueResponse("foo", "bar")
|
||||||
|
loader.enqueueResponse("foo", "bar_updated")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
cache.invalidate("foo")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar_updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun clearAll() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
loader.enqueueResponse("foo", "bar")
|
||||||
|
loader.enqueueResponse("foo", "bar_updated")
|
||||||
|
loader.enqueueResponse("baz", "bat")
|
||||||
|
loader.enqueueResponse("baz", "bat_updated")
|
||||||
|
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
assertThat(cache.get("baz")).isEqualTo("bat")
|
||||||
|
cache.clearAll()
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar_updated")
|
||||||
|
assertThat(cache.get("baz")).isEqualTo("bat_updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun put() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
cache.put("foo", "bar")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar")
|
||||||
|
cache.put("foo", "bar_updated")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar_updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fresh() = testScope.runBlockingTest {
|
||||||
|
val cache = createCache()
|
||||||
|
cache.put("foo", "bar")
|
||||||
|
loader.enqueueResponse("foo", "bar_updated")
|
||||||
|
assertThat(cache.fresh("foo")).isEqualTo("bar_updated")
|
||||||
|
assertThat(cache.get("foo")).isEqualTo("bar_updated")
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun createCache(
|
||||||
|
memoryPolicy: MemoryPolicy = MemoryPolicy.builder().build()
|
||||||
|
): RealStoreCache<String, String> {
|
||||||
|
return RealStoreCache(
|
||||||
|
loader = loader::invoke,
|
||||||
|
ticker = ticker,
|
||||||
|
memoryPolicy = memoryPolicy
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestLoader {
|
||||||
|
private val enqueued = mutableMapOf<String, LinkedList<Deferred<String>>>()
|
||||||
|
|
||||||
|
fun enqueueResponse(key: String, value: String) {
|
||||||
|
enqueueResponse(key, CompletableDeferred(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
fun enqueueResponse(key: String, deferred: Deferred<String>) {
|
||||||
|
enqueued.getOrPut(key) {
|
||||||
|
LinkedList()
|
||||||
|
}.add(deferred)
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun invoke(key: String): String {
|
||||||
|
val response = enqueued[key]?.pop() ?: throw AssertionError("nothing enqueued")
|
||||||
|
return response.await()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,188 @@
|
||||||
|
package com.nytimes.suspendCache
|
||||||
|
|
||||||
|
import com.com.nytimes.suspendCache.StoreRecord
|
||||||
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.test.TestCoroutineScope
|
||||||
|
import kotlinx.coroutines.test.runBlockingTest
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
|
import org.junit.Test
|
||||||
|
import org.junit.runner.RunWith
|
||||||
|
import org.junit.runners.JUnit4
|
||||||
|
|
||||||
|
@Suppress("UsePropertyAccessSyntax") // for isTrue()/isFalse()
|
||||||
|
@ExperimentalCoroutinesApi
|
||||||
|
@RunWith(JUnit4::class)
|
||||||
|
class StoreRecordTest {
|
||||||
|
private val testScope = TestCoroutineScope()
|
||||||
|
@Test
|
||||||
|
fun precomputed() = testScope.runBlockingTest {
|
||||||
|
val record = StoreRecord(
|
||||||
|
key = "foo",
|
||||||
|
loader = { TODO() },
|
||||||
|
precomputedValue = "bar")
|
||||||
|
assertThat(record.cachedValue()).isEqualTo("bar")
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fetched() = testScope.runBlockingTest {
|
||||||
|
val record = StoreRecord("foo") { key ->
|
||||||
|
assertThat(key).isEqualTo("foo")
|
||||||
|
"bar"
|
||||||
|
}
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fetched_multipleValueGet() = testScope.runBlockingTest {
|
||||||
|
var runCount = 0
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
runCount++
|
||||||
|
"bar"
|
||||||
|
}
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
assertThat(runCount).isEqualTo(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fetched_multipleValueGet_firstError() = testScope.runBlockingTest {
|
||||||
|
var runCount = 0
|
||||||
|
val errorMsg = "i'd like to fail"
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
runCount++
|
||||||
|
if (runCount == 1) {
|
||||||
|
|
||||||
|
throw RuntimeException(errorMsg)
|
||||||
|
} else {
|
||||||
|
"bar"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val first = runCatching {
|
||||||
|
record.value()
|
||||||
|
}
|
||||||
|
assertThat(first.isFailure).isTrue()
|
||||||
|
assertThat(first.exceptionOrNull()?.localizedMessage).isEqualTo(errorMsg)
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
assertThat(runCount).isEqualTo(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fetched_multipleValueGet_firstOneFails_delayed() = testScope.runBlockingTest {
|
||||||
|
var runCount = 0
|
||||||
|
val firstResponse = CompletableDeferred<String>()
|
||||||
|
val secondResponse = CompletableDeferred<String>()
|
||||||
|
val errorMsg = "i'd like to fail"
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
runCount++
|
||||||
|
if (runCount == 1) {
|
||||||
|
return@StoreRecord firstResponse.await()
|
||||||
|
} else {
|
||||||
|
return@StoreRecord secondResponse.await()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val first = async {
|
||||||
|
record.value()
|
||||||
|
}
|
||||||
|
val second = async {
|
||||||
|
record.value()
|
||||||
|
}
|
||||||
|
testScope.advanceUntilIdle()
|
||||||
|
assertThat(first.isCompleted).isFalse()
|
||||||
|
assertThat(second.isCompleted).isFalse()
|
||||||
|
firstResponse.completeExceptionally(RuntimeException(errorMsg))
|
||||||
|
|
||||||
|
assertThat(first.isCompleted).isTrue()
|
||||||
|
assertThat(second.isCompleted).isFalse()
|
||||||
|
|
||||||
|
assertThat(first.getCompletionExceptionOrNull()?.localizedMessage).isEqualTo(errorMsg)
|
||||||
|
|
||||||
|
secondResponse.complete("bar")
|
||||||
|
assertThat(second.await()).isEqualTo("bar")
|
||||||
|
assertThat(runCount).isEqualTo(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun freshSimple_alreadyCached() = testScope.runBlockingTest {
|
||||||
|
var runCount = 0
|
||||||
|
val responses = listOf(
|
||||||
|
"bar",
|
||||||
|
"bar2"
|
||||||
|
)
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
val index = runCount
|
||||||
|
runCount++
|
||||||
|
return@StoreRecord responses[index]
|
||||||
|
}
|
||||||
|
assertThat(record.value()).isEqualTo("bar")
|
||||||
|
assertThat(record.freshValue()).isEqualTo("bar2")
|
||||||
|
assertThat(record.value()).isEqualTo("bar2")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun freshSimple_notCached() = testScope.runBlockingTest {
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
"bar"
|
||||||
|
}
|
||||||
|
assertThat(record.freshValue()).isEqualTo("bar")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fresh_multipleParallel() = testScope.runBlockingTest {
|
||||||
|
val responses = listOf<CompletableDeferred<String>>(
|
||||||
|
CompletableDeferred(),
|
||||||
|
CompletableDeferred()
|
||||||
|
)
|
||||||
|
var runCount = 0
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
val index = runCount
|
||||||
|
runCount++
|
||||||
|
responses[index].await()
|
||||||
|
}
|
||||||
|
val first = async {
|
||||||
|
record.freshValue()
|
||||||
|
}
|
||||||
|
val second = async {
|
||||||
|
record.freshValue()
|
||||||
|
}
|
||||||
|
assertThat(first.isActive).isTrue()
|
||||||
|
assertThat(second.isActive).isTrue()
|
||||||
|
responses[0].complete("bar")
|
||||||
|
assertThat(first.await()).isEqualTo("bar")
|
||||||
|
assertThat(second.isActive).isTrue()
|
||||||
|
responses[1].complete("bar2")
|
||||||
|
assertThat(second.await()).isEqualTo("bar2")
|
||||||
|
assertThat(runCount).isEqualTo(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun fresh_multipleParallel_firstOneFails() = testScope.runBlockingTest {
|
||||||
|
val responses = listOf(
|
||||||
|
CompletableDeferred<String>(),
|
||||||
|
CompletableDeferred()
|
||||||
|
)
|
||||||
|
var runCount = 0
|
||||||
|
val record = StoreRecord("foo") {
|
||||||
|
val index = runCount
|
||||||
|
runCount++
|
||||||
|
responses[index].await()
|
||||||
|
}
|
||||||
|
val first = async {
|
||||||
|
record.freshValue()
|
||||||
|
}
|
||||||
|
val second = async {
|
||||||
|
record.freshValue()
|
||||||
|
}
|
||||||
|
val errorMsg = "i'd like to fail"
|
||||||
|
assertThat(first.isActive).isTrue()
|
||||||
|
assertThat(second.isActive).isTrue()
|
||||||
|
responses[0].completeExceptionally(RuntimeException(errorMsg))
|
||||||
|
assertThat(first.getCompletionExceptionOrNull()?.localizedMessage).isEqualTo(errorMsg)
|
||||||
|
assertThat(second.isActive).isTrue()
|
||||||
|
responses[1].complete("bar")
|
||||||
|
assertThat(second.await()).isEqualTo("bar")
|
||||||
|
assertThat(runCount).isEqualTo(2)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue