Add JMAP message sync (part 2) - delta sync
This commit is contained in:
parent
5dce1101ed
commit
b78c2e295e
8 changed files with 227 additions and 28 deletions
|
@ -270,11 +270,15 @@ class K9BackendFolder(
|
|||
}
|
||||
}
|
||||
|
||||
override fun setFolderExtraString(name: String, value: String) {
|
||||
override fun setFolderExtraString(name: String, value: String?) {
|
||||
database.execute(false) { db ->
|
||||
val contentValues = ContentValues().apply {
|
||||
put("name", name)
|
||||
put("value_text", value)
|
||||
if (value != null) {
|
||||
put("value_text", value)
|
||||
} else {
|
||||
putNull("value_text")
|
||||
}
|
||||
put("folder_id", databaseId)
|
||||
}
|
||||
db.insertWithOnConflict("folder_extra_values", null, contentValues, SQLiteDatabase.CONFLICT_REPLACE)
|
||||
|
|
|
@ -28,7 +28,7 @@ interface BackendFolder {
|
|||
fun setLatestOldMessageSeenTime(date: Date)
|
||||
fun getOldestMessageDate(): Date?
|
||||
fun getFolderExtraString(name: String): String?
|
||||
fun setFolderExtraString(name: String, value: String)
|
||||
fun setFolderExtraString(name: String, value: String?)
|
||||
fun getFolderExtraNumber(name: String): Long?
|
||||
fun setFolderExtraNumber(name: String, value: Long)
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import okhttp3.HttpUrl
|
|||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import rs.ltt.jmap.client.JmapClient
|
||||
import rs.ltt.jmap.client.api.MethodErrorResponseException
|
||||
import rs.ltt.jmap.client.api.UnauthorizedException
|
||||
import rs.ltt.jmap.client.http.HttpAuthentication
|
||||
import rs.ltt.jmap.client.session.Session
|
||||
|
@ -19,8 +20,10 @@ import rs.ltt.jmap.common.entity.capability.CoreCapability
|
|||
import rs.ltt.jmap.common.entity.filter.EmailFilterCondition
|
||||
import rs.ltt.jmap.common.entity.query.EmailQuery
|
||||
import rs.ltt.jmap.common.method.call.email.GetEmailMethodCall
|
||||
import rs.ltt.jmap.common.method.call.email.QueryChangesEmailMethodCall
|
||||
import rs.ltt.jmap.common.method.call.email.QueryEmailMethodCall
|
||||
import rs.ltt.jmap.common.method.response.email.GetEmailMethodResponse
|
||||
import rs.ltt.jmap.common.method.response.email.QueryChangesEmailMethodResponse
|
||||
import rs.ltt.jmap.common.method.response.email.QueryEmailMethodResponse
|
||||
import timber.log.Timber
|
||||
|
||||
|
@ -39,8 +42,12 @@ class CommandSync(
|
|||
|
||||
val limit = if (backendFolder.visibleLimit > 0) backendFolder.visibleLimit.toLong() else null
|
||||
|
||||
// FIXME: Only do full sync when we can't do a delta sync
|
||||
fullSync(backendFolder, folderServerId, limit, listener)
|
||||
val queryState = backendFolder.getFolderExtraString(EXTRA_QUERY_STATE)
|
||||
if (queryState == null) {
|
||||
fullSync(backendFolder, folderServerId, limit, listener)
|
||||
} else {
|
||||
deltaSync(backendFolder, folderServerId, limit, queryState, listener)
|
||||
}
|
||||
|
||||
listener.syncFinished(folderServerId)
|
||||
} catch (e: UnauthorizedException) {
|
||||
|
@ -63,17 +70,82 @@ class CommandSync(
|
|||
} else {
|
||||
Timber.d("Fetching all messages in %s (%s)", backendFolder.name, folderServerId)
|
||||
}
|
||||
val remoteServerIds = fetchMessageServerIds(folderServerId, limit)
|
||||
|
||||
val destroyServerIds = cachedServerIds - remoteServerIds
|
||||
if (destroyServerIds.isNotEmpty()) {
|
||||
Timber.d("Removing messages no longer on server: %s", destroyServerIds)
|
||||
backendFolder.destroyMessages(destroyServerIds.toList())
|
||||
val queryEmailMethod = QueryEmailMethodCall(accountId, createEmailQuery(folderServerId), limit)
|
||||
val queryEmailCall = jmapClient.call(queryEmailMethod)
|
||||
val queryEmailResponse = queryEmailCall.getMainResponseBlocking<QueryEmailMethodResponse>()
|
||||
val queryState = if (queryEmailResponse.isCanCalculateChanges) queryEmailResponse.queryState else null
|
||||
val remoteServerIds = queryEmailResponse.ids.toSet()
|
||||
|
||||
val destroyServerIds = (cachedServerIds - remoteServerIds).toList()
|
||||
val newServerIds = remoteServerIds - cachedServerIds
|
||||
|
||||
handleFolderUpdates(backendFolder, folderServerId, destroyServerIds, newServerIds, queryState, listener)
|
||||
|
||||
// TODO: Refresh flags of messages we've already downloaded before
|
||||
}
|
||||
|
||||
private fun createEmailQuery(folderServerId: String): EmailQuery? {
|
||||
val filter = EmailFilterCondition.builder()
|
||||
.inMailbox(folderServerId)
|
||||
.build()
|
||||
|
||||
// FIXME: Add sort parameter
|
||||
return EmailQuery.of(filter)
|
||||
}
|
||||
|
||||
private fun deltaSync(
|
||||
backendFolder: BackendFolder,
|
||||
folderServerId: String,
|
||||
limit: Long?,
|
||||
queryState: String,
|
||||
listener: SyncListener
|
||||
) {
|
||||
Timber.d("Updating messages in %s (%s)", backendFolder.name, folderServerId)
|
||||
|
||||
val emailQuery = createEmailQuery(folderServerId)
|
||||
val queryChangesEmailMethod = QueryChangesEmailMethodCall(accountId, queryState, emailQuery)
|
||||
val queryChangesEmailCall = jmapClient.call(queryChangesEmailMethod)
|
||||
|
||||
val queryChangesEmailResponse = try {
|
||||
queryChangesEmailCall.getMainResponseBlocking<QueryChangesEmailMethodResponse>()
|
||||
} catch (e: MethodErrorResponseException) {
|
||||
if (e.methodErrorResponse.type == ERROR_CANNOT_CALCULATE_CHANGES) {
|
||||
Timber.d("Server responded with '$ERROR_CANNOT_CALCULATE_CHANGES'; switching to full sync")
|
||||
|
||||
backendFolder.saveQueryState(null)
|
||||
fullSync(backendFolder, folderServerId, limit, listener)
|
||||
return
|
||||
}
|
||||
|
||||
throw e
|
||||
}
|
||||
|
||||
val destroyServerIds = queryChangesEmailResponse.removed.toList()
|
||||
val newServerIds = queryChangesEmailResponse.added.map { it.item }.toSet()
|
||||
val newQueryState = queryChangesEmailResponse.newQueryState
|
||||
|
||||
handleFolderUpdates(backendFolder, folderServerId, destroyServerIds, newServerIds, newQueryState, listener)
|
||||
|
||||
// TODO: Refresh flags of messages we've already downloaded before
|
||||
}
|
||||
|
||||
private fun handleFolderUpdates(
|
||||
backendFolder: BackendFolder,
|
||||
folderServerId: String,
|
||||
destroyServerIds: List<String>,
|
||||
newServerIds: Set<String>,
|
||||
newQueryState: String?,
|
||||
listener: SyncListener
|
||||
) {
|
||||
if (destroyServerIds.isNotEmpty()) {
|
||||
Timber.d("Removing messages no longer on server: %s", destroyServerIds)
|
||||
backendFolder.destroyMessages(destroyServerIds)
|
||||
}
|
||||
|
||||
val newServerIds = remoteServerIds - cachedServerIds
|
||||
if (newServerIds.isEmpty()) {
|
||||
Timber.d("No new messages on server")
|
||||
backendFolder.saveQueryState(newQueryState)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -101,21 +173,7 @@ class CommandSync(
|
|||
listener.syncProgress(folderServerId, index + 1, total)
|
||||
}
|
||||
|
||||
// TODO: Refresh flags of messages we've already downloaded before
|
||||
}
|
||||
|
||||
private fun fetchMessageServerIds(folderServerId: String, limit: Long?): Set<String> {
|
||||
val filter = EmailFilterCondition.builder()
|
||||
.inMailbox(folderServerId)
|
||||
.build()
|
||||
|
||||
// FIXME: Add sort parameter
|
||||
val queryEmailMethod = QueryEmailMethodCall(accountId, EmailQuery.of(filter), limit)
|
||||
val queryEmailCall = jmapClient.call(queryEmailMethod)
|
||||
|
||||
val queryEmailResponse = queryEmailCall.getMainResponseBlocking<QueryEmailMethodResponse>()
|
||||
|
||||
return queryEmailResponse.ids.toSet()
|
||||
backendFolder.saveQueryState(newQueryState)
|
||||
}
|
||||
|
||||
private fun fetchMessageInfo(session: Session, maxObjectsInGet: Int, emailIds: Set<String>): List<MessageInfo> {
|
||||
|
@ -194,6 +252,15 @@ class CommandSync(
|
|||
val coreCapability = getCapability(CoreCapability::class.java)
|
||||
return minOf(Int.MAX_VALUE.toLong(), coreCapability.maxObjectsInGet).toInt()
|
||||
}
|
||||
|
||||
private fun BackendFolder.saveQueryState(queryState: String?) {
|
||||
setFolderExtraString(EXTRA_QUERY_STATE, queryState)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val EXTRA_QUERY_STATE = "jmapQueryState"
|
||||
private const val ERROR_CANNOT_CALCULATE_CHANGES = "cannotCalculateChanges"
|
||||
}
|
||||
}
|
||||
|
||||
private data class MessageInfo(
|
||||
|
|
|
@ -58,6 +58,7 @@ class CommandSyncTest {
|
|||
"M001" to "/jmap_responses/blob/email/email_1.eml",
|
||||
"M002" to "/jmap_responses/blob/email/email_2.eml"
|
||||
)
|
||||
backendFolder.assertQueryState("50:0")
|
||||
syncListener.assertSyncEvents(
|
||||
SyncListenerEvent.SyncStarted(FOLDER_SERVER_ID),
|
||||
SyncListenerEvent.SyncProgress(FOLDER_SERVER_ID, completed = 1, total = 2),
|
||||
|
@ -133,6 +134,72 @@ class CommandSyncTest {
|
|||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun deltaSyncWithoutChanges() {
|
||||
val backendFolder = backendStorage.getFolder(FOLDER_SERVER_ID)
|
||||
backendFolder.createMessages(
|
||||
"M001" to "/jmap_responses/blob/email/email_1.eml",
|
||||
"M002" to "/jmap_responses/blob/email/email_2.eml"
|
||||
)
|
||||
backendFolder.setQueryState("50:0")
|
||||
val command = createCommandSync(
|
||||
responseBodyFromResource("/jmap_responses/session/valid_session.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_query_changes_empty_result.json")
|
||||
)
|
||||
|
||||
command.sync(FOLDER_SERVER_ID, syncListener)
|
||||
|
||||
assertEquals(setOf("M001", "M002"), backendFolder.getMessageServerIds())
|
||||
backendFolder.assertQueryState("50:0")
|
||||
syncListener.assertSyncEvents(
|
||||
SyncListenerEvent.SyncStarted(FOLDER_SERVER_ID),
|
||||
SyncListenerEvent.SyncFinished(FOLDER_SERVER_ID)
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun deltaSyncWithLocalMessagesAndDifferentMessagesInRemoteMailbox() {
|
||||
val backendFolder = backendStorage.getFolder(FOLDER_SERVER_ID)
|
||||
backendFolder.createMessages(
|
||||
"M001" to "/jmap_responses/blob/email/email_1.eml",
|
||||
"M002" to "/jmap_responses/blob/email/email_2.eml"
|
||||
)
|
||||
backendFolder.setQueryState("50:0")
|
||||
val command = createCommandSync(
|
||||
responseBodyFromResource("/jmap_responses/session/valid_session.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_query_changes_M001_deleted_M003_added.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_get_ids_M003.json"),
|
||||
responseBodyFromResource("/jmap_responses/blob/email/email_3.eml")
|
||||
)
|
||||
|
||||
command.sync(FOLDER_SERVER_ID, syncListener)
|
||||
|
||||
assertEquals(setOf("M002", "M003"), backendFolder.getMessageServerIds())
|
||||
backendFolder.assertQueryState("51:0")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun deltaSyncCannotCalculateChanges() {
|
||||
val backendFolder = backendStorage.getFolder(FOLDER_SERVER_ID)
|
||||
backendFolder.createMessages(
|
||||
"M001" to "/jmap_responses/blob/email/email_1.eml",
|
||||
"M002" to "/jmap_responses/blob/email/email_2.eml"
|
||||
)
|
||||
backendFolder.setQueryState("10:0")
|
||||
val command = createCommandSync(
|
||||
responseBodyFromResource("/jmap_responses/session/valid_session.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_query_changes_cannot_calculate_changes_error.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_query_M002_and_M003.json"),
|
||||
responseBodyFromResource("/jmap_responses/email/email_get_ids_M003.json"),
|
||||
responseBodyFromResource("/jmap_responses/blob/email/email_3.eml")
|
||||
)
|
||||
|
||||
command.sync(FOLDER_SERVER_ID, syncListener)
|
||||
|
||||
assertEquals(setOf("M002", "M003"), backendFolder.getMessageServerIds())
|
||||
backendFolder.assertQueryState("50:0")
|
||||
}
|
||||
|
||||
private fun createCommandSync(vararg mockResponses: MockResponse): CommandSync {
|
||||
val server = createMockWebServer(*mockResponses)
|
||||
return createCommandSync(server.url("/jmap/"))
|
||||
|
@ -155,6 +222,14 @@ class CommandSyncTest {
|
|||
assertEquals(expected, requestUrlPath)
|
||||
}
|
||||
|
||||
private fun InMemoryBackendFolder.assertQueryState(expected: String) {
|
||||
assertEquals(expected, getFolderExtraString("jmapQueryState"))
|
||||
}
|
||||
|
||||
private fun InMemoryBackendFolder.setQueryState(queryState: String) {
|
||||
setFolderExtraString("jmapQueryState", queryState)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val FOLDER_SERVER_ID = "id_folder"
|
||||
private const val USERNAME = "username"
|
||||
|
|
|
@ -128,8 +128,12 @@ class InMemoryBackendFolder(override var name: String, var type: FolderType) : B
|
|||
|
||||
override fun getFolderExtraString(name: String): String? = extraStrings[name]
|
||||
|
||||
override fun setFolderExtraString(name: String, value: String) {
|
||||
extraStrings[name] = value
|
||||
override fun setFolderExtraString(name: String, value: String?) {
|
||||
if (value != null) {
|
||||
extraStrings[name] = value
|
||||
} else {
|
||||
extraStrings.remove(name)
|
||||
}
|
||||
}
|
||||
|
||||
override fun getFolderExtraNumber(name: String): Long? = extraNumbers[name]
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"methodResponses": [
|
||||
[
|
||||
"Email/queryChanges",
|
||||
{
|
||||
"accountId": "test@example.com",
|
||||
"oldQueryState": "50:0",
|
||||
"newQueryState": "51:0",
|
||||
"removed": ["M001"],
|
||||
"added": [
|
||||
{
|
||||
"id": "M003",
|
||||
"index": 1
|
||||
}
|
||||
]
|
||||
},
|
||||
"0"
|
||||
]
|
||||
],
|
||||
"sessionState": "0"
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"methodResponses": [
|
||||
[
|
||||
"error",
|
||||
{
|
||||
"type": "cannotCalculateChanges"
|
||||
},
|
||||
"0"
|
||||
]
|
||||
],
|
||||
"sessionState": "0"
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"methodResponses": [
|
||||
[
|
||||
"Email/queryChanges",
|
||||
{
|
||||
"accountId": "test@example.com",
|
||||
"oldQueryState": "50:0",
|
||||
"newQueryState": "50:0",
|
||||
"removed": [],
|
||||
"added": []
|
||||
},
|
||||
"0"
|
||||
]
|
||||
],
|
||||
"sessionState": "0"
|
||||
}
|
Loading…
Reference in a new issue