release fetcher in nonCancellable scope (#432)

* release fetcher in nonCancellable scope

* replace wildcard imports
This commit is contained in:
lukisk 2022-04-23 02:20:25 +02:00 committed by GitHub
parent fb55f6a0c7
commit 4cc5b052d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 1 deletions

View file

@ -22,12 +22,14 @@ import com.dropbox.android.external.store4.SourceOfTruth
import com.dropbox.android.external.store4.StoreResponse
import com.dropbox.flow.multicast.Multicaster
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.withContext
/**
* This class maintains one and only 1 fetcher for a given [Key].
@ -100,7 +102,9 @@ internal class FetcherController<Key : Any, Input : Any, Output : Any>(
try {
emitAll(fetcher.newDownstream(piggybackOnly))
} finally {
fetchers.release(key, fetcher)
withContext(NonCancellable) {
fetchers.release(key, fetcher)
}
}
}
}

View file

@ -18,13 +18,19 @@ package com.dropbox.android.external.store4
import com.dropbox.android.external.store4.StoreResponse.Data
import com.dropbox.android.external.store4.impl.FetcherController
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Test
@ -99,4 +105,40 @@ class FetcherControllerTest {
assertThat(fetcherController.fetcherSize()).isEqualTo(0)
assertThat(createdCnt).isEqualTo(1)
}
@Test
fun concurrent_when_cancelled() = runBlocking {
var createdCnt = 0
val job = SupervisorJob()
val scope = CoroutineScope(Dispatchers.Default + job)
val fetcherController = FetcherController<Int, Int, Int>(
scope = scope,
realFetcher = Fetcher.ofResultFlow { key: Int ->
createdCnt++
flow {
// make sure it takes time, otherwise, we may not share
delay(100)
emit(FetcherResult.Data(key * key) as FetcherResult<Int>)
}
},
sourceOfTruth = null
)
val fetcherCount = 20
fun createFetcher() = scope.launch {
fetcherController.getFetcher(3)
.onEach {
assertThat(fetcherController.fetcherSize()).isEqualTo(1)
}.first()
}
(0 until fetcherCount).map {
createFetcher()
}
delay(50)
job.cancelChildren()
delay(50)
assertThat(fetcherController.fetcherSize()).isEqualTo(0)
assertThat(createdCnt).isEqualTo(1)
}
}