Update PeopleInSpaceRepo to use KMP-NativeCoroutines

This commit is contained in:
Rick Clephas 2021-11-06 12:32:53 +01:00
parent a41f29dcbc
commit 163882c121

View file

@ -1,6 +1,7 @@
package com.surrus.common.repository
import co.touchlab.kermit.Kermit
import com.rickclephas.kmp.nativecoroutines.NativeCoroutineScope
import com.squareup.sqldelight.runtime.coroutines.asFlow
import com.squareup.sqldelight.runtime.coroutines.mapToList
import com.surrus.common.di.PeopleInSpaceDatabaseWrapper
@ -11,7 +12,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.koin.core.component.KoinComponent
import org.koin.core.component.inject
import kotlin.coroutines.CoroutineContext
interface PeopleInSpaceRepositoryInterface {
fun fetchPeopleAsFlow(): Flow<List<Assignment>>
@ -24,12 +24,11 @@ class PeopleInSpaceRepository : KoinComponent, PeopleInSpaceRepositoryInterface
private val peopleInSpaceApi: PeopleInSpaceApi by inject()
private val logger: Kermit by inject()
@NativeCoroutineScope
private val coroutineScope: CoroutineScope = MainScope()
private val peopleInSpaceDatabase: PeopleInSpaceDatabaseWrapper by inject()
private val peopleInSpaceQueries = peopleInSpaceDatabase.instance?.peopleInSpaceQueries
var peopleJob: Job? = null
init {
coroutineScope.launch {
fetchAndStorePeople()
@ -74,51 +73,22 @@ class PeopleInSpaceRepository : KoinComponent, PeopleInSpaceRepositoryInterface
// Used by web client atm
override suspend fun fetchPeople(): List<Assignment> = peopleInSpaceApi.fetchPeople().people
// called from Kotlin/Native clients
fun startObservingPeopleUpdates(success: (List<Assignment>) -> Unit) {
logger.d { "startObservingPeopleUpdates" }
peopleJob = coroutineScope.launch {
fetchPeopleAsFlow().collect {
success(it)
override fun pollISSPosition(): Flow<IssPosition> {
// The returned will be frozen in Kotlin Native. We can't freeze the Koin internals
// so we'll use local variables to prevent the Koin internals from freezing.
val api = peopleInSpaceApi
val logger = logger
return flow {
while (true) {
val position = api.fetchISSPosition().iss_position
emit(position)
logger.d("PeopleInSpaceRepository") { position.toString() }
delay(POLL_INTERVAL)
}
}
}
fun stopObservingPeopleUpdates() {
logger.d { "stopObservingPeopleUpdates, peopleJob = $peopleJob" }
peopleJob?.cancel()
}
override fun pollISSPosition(): Flow<IssPosition> = flow {
while (true) {
val position = peopleInSpaceApi.fetchISSPosition().iss_position
emit(position)
logger.d("PeopleInSpaceRepository") { position.toString() }
delay(POLL_INTERVAL)
}
}
val iosScope: CoroutineScope = object : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = SupervisorJob() + Dispatchers.Main
}
fun iosPollISSPosition() = KotlinNativeFlowWrapper(pollISSPosition())
companion object {
private const val POLL_INTERVAL = 10000L
}
}
class KotlinNativeFlowWrapper<T>(private val flow: Flow<T>) {
fun subscribe(
scope: CoroutineScope,
onEach: (item: T) -> Unit,
onComplete: () -> Unit,
onThrow: (error: Throwable) -> Unit
) = flow
.onEach { onEach(it) }
.catch { onThrow(it) }
.onCompletion { onComplete() }
.launchIn(scope)
}