Fix App Crash (UndeliverableException - RxJava2)

This commit is contained in:
David Luhmer 2017-05-28 23:31:01 +02:00
parent d3c3b709a9
commit 8d4b2e2a70
2 changed files with 167 additions and 131 deletions

View file

@ -7,6 +7,9 @@ import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -31,7 +34,7 @@ import okio.BufferedSource;
/**
* onNext returns the current amount of synced items
*/
public class RssItemObservable extends Observable<Integer> {
public class RssItemObservable implements Publisher<Integer> {
private DatabaseConnectionOrm mDbConn;
private API mApi;
@ -46,127 +49,130 @@ public class RssItemObservable extends Observable<Integer> {
}
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
public void subscribe(Subscriber<? super Integer> s) {
try {
mDbConn.clearDatabaseOverSize();
//String authKey = AuthenticationManager.getGoogleAuthKey(username, password);
//SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context);
//int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200"));
long lastModified = mDbConn.getLastModified();
//dbConn.clearDatabaseOverSize();
int requestCount = 0;
int totalCount = 0;
int maxSyncSize = maxSizePerSync;
if(lastModified == 0)//Only on first sync
{
long offset = 0;
Log.v(TAG, "First sync!!");
int maxItemsInDatabase = Constants.maxItemsCount;
do {
Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + "");
List<RssItem> buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body());
requestCount = 0;
if(buffer != null) {
requestCount = buffer.size();
performDatabaseBatchInsert(mDbConn, buffer);
}
if(requestCount > 0)
offset = mDbConn.getHighestItemId();
totalCount += requestCount;
observer.onNext(totalCount);
} while(requestCount == maxSyncSize);
Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize);
Log.v(TAG, "Sync all items done - Starting starred now");
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply();
do {
offset = mDbConn.getLowestItemId(true);
List<RssItem> buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body();
requestCount = 0;
if(buffer != null) {
requestCount = buffer.size();
performDatabaseBatchInsert(mDbConn, buffer);
}
//if(requestCount > 0)
// offset = dbConn.getLowestItemId(true);
totalCount += requestCount;
observer.onNext(totalCount);
} while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase);
}
else
{
Log.v(TAG, "Incremental sync!!");
//First reset the count of last updated items
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply();
long highestItemIdBeforeSync = mDbConn.getHighestItemId();
//Get all updated items
mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync)
.flatMap(new Function<ResponseBody, ObservableSource<RssItem>>() {
@Override
public ObservableSource<RssItem> apply(@NonNull ResponseBody responseBody) throws Exception {
return events(responseBody.source());
}
})
.subscribe(new Observer<RssItem>() {
int totalUpdatedUnreadItemCount = 0;
final int bufferSize = 150;
List<RssItem> buffer = new ArrayList<>(bufferSize); //Buffer of size X
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.v(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(@NonNull RssItem rssItem) {
if(!rssItem.getRead()) { //If updates item is unread
totalUpdatedUnreadItemCount++;
}
buffer.add(rssItem);
if (buffer.size() >= bufferSize) {
performDatabaseBatchInsert(mDbConn, buffer);
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.v(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.v(TAG, "onComplete() called");
performDatabaseBatchInsert(mDbConn, buffer);
//If no exception occurs, set the number of updated items
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply();
}
});
}
//throw new RuntimeException("");
sync(s);
s.onComplete();
} catch (Exception ex) {
ex.printStackTrace();
s.onError(ex);
}
}
observer.onComplete();
public void sync(Subscriber<? super Integer> subscriber) throws IOException {
mDbConn.clearDatabaseOverSize();
//String authKey = AuthenticationManager.getGoogleAuthKey(username, password);
//SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context);
//int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200"));
long lastModified = mDbConn.getLastModified();
//dbConn.clearDatabaseOverSize();
int requestCount = 0;
int totalCount = 0;
int maxSyncSize = maxSizePerSync;
if(lastModified == 0)//Only on first sync
{
long offset = 0;
Log.v(TAG, "First sync!!");
int maxItemsInDatabase = Constants.maxItemsCount;
do {
Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + "");
List<RssItem> buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body());
requestCount = 0;
if(buffer != null) {
requestCount = buffer.size();
performDatabaseBatchInsert(mDbConn, buffer);
}
if(requestCount > 0)
offset = mDbConn.getHighestItemId();
totalCount += requestCount;
subscriber.onNext(totalCount);
} while(requestCount == maxSyncSize);
Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize);
Log.v(TAG, "Sync all items done - Starting starred now");
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply();
do {
offset = mDbConn.getLowestItemId(true);
List<RssItem> buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body();
requestCount = 0;
if(buffer != null) {
requestCount = buffer.size();
performDatabaseBatchInsert(mDbConn, buffer);
}
//if(requestCount > 0)
// offset = dbConn.getLowestItemId(true);
totalCount += requestCount;
subscriber.onNext(totalCount);
} while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase);
}
else
{
Log.v(TAG, "Incremental sync!!");
//First reset the count of last updated items
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply();
long highestItemIdBeforeSync = mDbConn.getHighestItemId();
//Get all updated items
mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync)
.flatMap(new Function<ResponseBody, ObservableSource<RssItem>>() {
@Override
public ObservableSource<RssItem> apply(@NonNull ResponseBody responseBody) throws Exception {
return events(responseBody.source());
}
})
.subscribe(new Observer<RssItem>() {
int totalUpdatedUnreadItemCount = 0;
final int bufferSize = 150;
List<RssItem> buffer = new ArrayList<>(bufferSize); //Buffer of size X
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.v(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(@NonNull RssItem rssItem) {
if(!rssItem.getRead()) { //If updates item is unread
totalUpdatedUnreadItemCount++;
}
buffer.add(rssItem);
if (buffer.size() >= bufferSize) {
performDatabaseBatchInsert(mDbConn, buffer);
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.v(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.v(TAG, "onComplete() called");
performDatabaseBatchInsert(mDbConn, buffer);
//If no exception occurs, set the number of updated items
mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply();
}
});
}
}
public static boolean performDatabaseBatchInsert(DatabaseConnectionOrm dbConn, List<RssItem> buffer) {
@ -197,7 +203,6 @@ public class RssItemObservable extends Observable<Integer> {
}
}
reader.beginArray();
while (reader.hasNext()) {
JsonObject jsonObj = getJsonObjectFromReader(reader);
@ -275,5 +280,4 @@ public class RssItemObservable extends Observable<Integer> {
}
return null;
}
}

View file

@ -36,9 +36,10 @@ import android.widget.Toast;
import org.apache.commons.lang3.time.StopWatch;
import org.greenrobot.eventbus.EventBus;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.List;
import java.util.concurrent.Callable;
import javax.inject.Inject;
@ -58,6 +59,7 @@ import de.luhmer.owncloudnewsreader.reader.nextcloud.RssItemObservable;
import de.luhmer.owncloudnewsreader.services.events.SyncFailedEvent;
import de.luhmer.owncloudnewsreader.services.events.SyncFinishedEvent;
import de.luhmer.owncloudnewsreader.services.events.SyncStartedEvent;
import de.luhmer.owncloudnewsreader.ssl.MemorizingTrustManager;
import de.luhmer.owncloudnewsreader.ssl.OkHttpSSLClient;
import de.luhmer.owncloudnewsreader.widget.WidgetProvider;
import io.reactivex.Observable;
@ -94,6 +96,7 @@ public class OwnCloudSyncService extends Service {
@Inject SharedPreferences mPrefs;
@Inject ApiProvider mApi;
@Inject MemorizingTrustManager mMTM;
public void startSync() {
@ -133,12 +136,14 @@ public class OwnCloudSyncService extends Service {
}
private class SyncResult {
SyncResult(List<Folder> folders, List<Feed> feeds) {
SyncResult(List<Folder> folders, List<Feed> feeds, Boolean stateSyncSuccessful) {
this.folders = folders;
this.feeds = feeds;
this.stateSyncSuccessful = stateSyncSuccessful;
}
List<Folder> folders;
List<Feed> feeds;
boolean stateSyncSuccessful;
}
//Sync state of items e.g. read/unread/starred/unstarred
@ -146,15 +151,41 @@ public class OwnCloudSyncService extends Service {
syncStopWatch = new StopWatch();
syncStopWatch.start();
//Delete all pinned/stored SSL Certificates
/*
final ArrayList<String> aliases = Collections.list(mMTM.getCertificates());
for(int i = 0; i < aliases.size(); i++) {
try {
mMTM.deleteCertificate(aliases.get(i));
} catch (KeyStoreException e) {
e.printStackTrace();
}
}*/
final DatabaseConnectionOrm dbConn = new DatabaseConnectionOrm(OwnCloudSyncService.this);
Observable rssStateSync = Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn);
return true;
}
}).subscribeOn(Schedulers.newThread());
Observable rssStateSync = Observable.fromPublisher(
new Publisher() {
@Override
public void subscribe(Subscriber s) {
try {
ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn);
s.onNext(true);
s.onComplete();
} catch(Exception ex) {
s.onError(ex);
}
}
}).subscribeOn(Schedulers.newThread());
// First sync Feeds and Folders and rss item states (in parallel)
Observable<List<Folder>> folderObservable = mApi
@ -171,7 +202,8 @@ public class OwnCloudSyncService extends Service {
Observable<SyncResult> combined = Observable.zip(folderObservable, feedsObservable, rssStateSync, new Function3<List<Folder>, List<Feed>, Boolean, SyncResult>() {
@Override
public SyncResult apply(@NonNull List<Folder> folders, @NonNull List<Feed> feeds, @NonNull Boolean mRes) throws Exception {
return new SyncResult(folders, feeds);
Log.v(TAG, "apply() called with: folders = [" + folders + "], feeds = [" + feeds + "], mRes = [" + mRes + "]");
return new SyncResult(folders, feeds, mRes);
}
});
@ -198,7 +230,7 @@ public class OwnCloudSyncService extends Service {
private void syncRssItems(final DatabaseConnectionOrm dbConn) {
new RssItemObservable(dbConn, mApi.getAPI(), mPrefs)
Observable.fromPublisher(new RssItemObservable(dbConn, mApi.getAPI(), mPrefs))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {