Port to RxJava 2 (#155)

* Port Store artifact to RxJava 2
This commit is contained in:
Maksim Moiseikin 2017-03-23 21:29:47 +01:00 committed by Mike Nakhimovich
parent de6f55ed1b
commit a9ed871834
68 changed files with 427 additions and 345 deletions

View file

@ -28,6 +28,10 @@ android {
abortOnError false
disable 'InvalidPackage'
}
packagingOptions {
exclude 'META-INF/rxjava.properties'
}
}
dependencies {
@ -40,21 +44,20 @@ dependencies {
compile libraries.supportDesign
compile libraries.retrofit
compile libraries.retrofitGsonConverter
compile libraries.retrofitRx
compile libraries.retrofitRx2
compile libraries.picasso
compile libraries.guava
annotationProcessor libraries.immutablesValue // <-- for annotation processor
provided libraries.immutablesValue // <-- for annotation API
provided libraries.immutablesGson // for annotations
compile 'com.nytimes.android:store:2.0.2'
compile'com.nytimes.android:cache:2.0.2'
compile'com.nytimes.android:middleware:2.0.2'
compile 'com.nytimes.android:filesystem:2.0.2'
//compile project(path: ':store')
//compile project(path: ':cache')
//compile project(path: ':middleware')
//compile project(path: ':filesystem')
//compile 'com.nytimes.android:store:2.0.2'
//compile'com.nytimes.android:cache:2.0.2'
//compile'com.nytimes.android:middleware:2.0.2'
//compile 'com.nytimes.android:filesystem:2.0.2'
compile project(path: ':store')
compile project(path: ':cache')
compile project(path: ':middleware')
compile project(path: ':filesystem')
retrolambdaConfig libraries.retrolambda
compile libraries.rxAndroid
compile libraries.rxAndroid2
}

View file

@ -1,12 +1,10 @@
package com.nytimes.android.sample;
import android.app.Application;
import android.util.Log;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.nytimes.android.external.fs.SourcePersisterFactory;
import com.nytimes.android.external.store.base.Fetcher;
import com.nytimes.android.external.store.base.Persister;
import com.nytimes.android.external.store.base.impl.BarCode;
import com.nytimes.android.external.store.base.impl.MemoryPolicy;
@ -20,13 +18,11 @@ import com.nytimes.android.sample.data.remote.Api;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import io.reactivex.Observable;
import okio.BufferedSource;
import retrofit2.GsonConverterFactory;
import retrofit2.Retrofit;
import retrofit2.RxJavaCallAdapterFactory;
import rx.Observable;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
public class SampleApp extends Application {
@ -63,11 +59,11 @@ public class SampleApp extends Application {
return StoreBuilder.<RedditData>barcode()
.fetcher(barCode -> provideRetrofit().fetchSubreddit(barCode.getKey(), "10"))
.memoryPolicy(
MemoryPolicy
.builder()
.setExpireAfter(10)
.setExpireAfterTimeUnit(TimeUnit.SECONDS)
.build()
MemoryPolicy
.builder()
.setExpireAfter(10)
.setExpireAfterTimeUnit(TimeUnit.SECONDS)
.build()
)
.open();
}
@ -93,7 +89,7 @@ public class SampleApp extends Application {
return new Retrofit.Builder()
.baseUrl("http://reddit.com/")
.addConverterFactory(GsonConverterFactory.create(provideGson()))
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.validateEagerly(BuildConfig.DEBUG) // Fail early: check Retrofit configuration at creation time in Debug build.
.build()
.create(Api.class);

View file

@ -18,9 +18,9 @@ import com.nytimes.android.sample.reddit.PostAdapter;
import java.util.List;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;
import static android.widget.Toast.makeText;
@ -52,6 +52,7 @@ public class PersistingStoreActivity extends AppCompatActivity {
}
}
@SuppressWarnings("CheckReturnValue")
public void loadPosts() {
BarCode awwRequest = new BarCode(RedditData.class.getSimpleName(), "aww");
@ -74,7 +75,7 @@ public class PersistingStoreActivity extends AppCompatActivity {
}
private Observable<Post> sanitizeData(RedditData redditData) {
return Observable.from(redditData.data().children())
return Observable.fromIterable(redditData.data().children())
.map(Children::data);
}

View file

@ -19,9 +19,12 @@ import com.nytimes.android.sample.reddit.PostAdapter;
import java.util.List;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import static android.widget.Toast.makeText;
@ -51,12 +54,18 @@ public class StoreActivity extends AppCompatActivity {
}
}
@SuppressWarnings("CheckReturnValue")
public void loadPosts() {
BarCode awwRequest = new BarCode(RedditData.class.getSimpleName(), "aww");
this.nonPersistedStore
.get(awwRequest)
.flatMap(this::sanitizeData)
.flatMap(new Function<RedditData, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(@NonNull RedditData redditData) throws Exception {
return sanitizeData(redditData);
}
})
.toList()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
@ -74,7 +83,7 @@ public class StoreActivity extends AppCompatActivity {
}
private Observable<Post> sanitizeData(RedditData redditData) {
return Observable.from(redditData.data().children())
return Observable.fromIterable(redditData.data().children())
.map(Children::data);
}

View file

@ -2,12 +2,11 @@ package com.nytimes.android.sample.data.remote;
import com.nytimes.android.sample.data.model.RedditData;
import okhttp3.Response;
import io.reactivex.Observable;
import okhttp3.ResponseBody;
import retrofit2.http.GET;
import retrofit2.http.Path;
import retrofit2.http.Query;
import rx.Observable;
public interface Api {

View file

@ -7,7 +7,8 @@ import com.nytimes.android.external.store.base.impl.StoreBuilder;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import io.reactivex.Observable;
import static junit.framework.Assert.assertEquals;
@ -36,7 +37,7 @@ public class StoreIntegrationTest {
@Test
public void testRepeatedGet() throws Exception {
String first = testStore.get(BarCode.empty()).toBlocking().first();
String first = testStore.get(BarCode.empty()).blockingFirst();
assertEquals(first, "hello");
}

View file

@ -26,12 +26,14 @@ ext.versions = [
// Reactive.
rxJava : '1.2.6',
rxJava2 : '2.0.7',
rxJavaProGuardRules : '1.1.6.0',
rxJavaAsyncUtil : '0.21.0',
rxAndroid : '1.2.1',
rxAndroid2 : '2.0.1',
// Others.
retrofit : '2.0.0-beta3',
retrofit : '2.2.0',
retrolambda : '2.3.0',
dagger : '2.9',
jsr305 : '3.0.1',
@ -80,14 +82,16 @@ ext.libraries = [
// Reactive.
rxJava : "io.reactivex:rxjava:$versions.rxJava",
rxJava2 : "io.reactivex.rxjava2:rxjava:$versions.rxJava2",
rxJavaAsyncUtil : "io.reactivex:rxjava-async-util:$versions.rxJavaAsyncUtil",
rxJavaProGuardRules : "com.artemzin.rxjava:proguard-rules:$versions.rxJavaProGuardRules",
rxAndroid : "io.reactivex:rxandroid:$versions.rxAndroid",
rxAndroid2 : "io.reactivex.rxjava2:rxandroid:$versions.rxAndroid2",
// Others.
retrofit : "com.squareup.retrofit2:retrofit:$versions.retrofit",
retrofitGsonConverter : "com.squareup.retrofit2:converter-gson:$versions.retrofit",
retrofitRx : "com.squareup.retrofit2:adapter-rxjava:$versions.retrofit",
retrofitRx2 : "com.squareup.retrofit2:adapter-rxjava2:$versions.retrofit",
retrolambda : "net.orfjackal.retrolambda:retrolambda:$versions.retrolambda",
dagger : "com.google.dagger:dagger:$versions.dagger",
daggerCompiler : "com.google.dagger:dagger-compiler:$versions.dagger",

View file

@ -7,10 +7,10 @@ import java.io.FileNotFoundException;
import javax.annotation.Nonnull;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import okio.BufferedSource;
import rx.Emitter;
import rx.Observable;
import rx.functions.Action1;
/**
* FSReader is used when persisting from file system
@ -31,24 +31,24 @@ public class FSReader<T> implements DiskRead<BufferedSource, T> {
@Nonnull
@Override
public Observable<BufferedSource> read(@Nonnull final T key) {
return Observable.fromEmitter(new Action1<Emitter<BufferedSource>>() {
return Observable.create(new ObservableOnSubscribe<BufferedSource>() {
@Override
public void call(Emitter<BufferedSource> emitter) {
public void subscribe(ObservableEmitter<BufferedSource> emitter) {
String resolvedKey = pathResolver.resolve(key);
boolean exists = fileSystem.exists(resolvedKey);
if (exists) {
try {
BufferedSource bufferedSource = fileSystem.read(resolvedKey);
emitter.onNext(bufferedSource);
emitter.onCompleted();
emitter.onComplete();
} catch (FileNotFoundException e) {
emitter.onError(e);
}
} else {
emitter.onCompleted();
emitter.onComplete();
}
}
}, Emitter.BackpressureMode.NONE);
});
}
}

View file

@ -7,8 +7,9 @@ import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import io.reactivex.Observable;
import okio.BufferedSource;
import rx.Observable;
/**
* FSReader is used when persisting to file system
* PathResolver will be used in creating file system paths based on cache keys.

View file

@ -5,8 +5,9 @@ import com.nytimes.android.external.store.base.Persister;
import javax.annotation.Nonnull;
import io.reactivex.Observable;
import okio.BufferedSource;
import rx.Observable;
/**
* FileSystemPersister is used when persisting to/from file system
* PathResolver will be used in creating file system paths based on cache keys.

View file

@ -9,8 +9,8 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import io.reactivex.Observable;
import okio.BufferedSource;
import rx.Observable;
/**
* FileSystemRecordPersister is used when persisting to/from file system while being stale aware

View file

@ -8,8 +8,8 @@ import com.nytimes.android.external.store.base.impl.BarCode;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import io.reactivex.Observable;
import okio.BufferedSource;
import rx.Observable;
/**
* Persister to be used when storing something to persister from a BufferedSource

View file

@ -47,25 +47,27 @@ public class FilePersisterTest {
.thenReturn(true);
when(fileSystem.read(resolvedPath)).thenReturn(bufferedSource);
BufferedSource returnedValue = fileSystemPersister.read(simple).toBlocking().single();
BufferedSource returnedValue = fileSystemPersister.read(simple).blockingFirst();
assertThat(returnedValue).isEqualTo(bufferedSource);
}
@Test
@SuppressWarnings("CheckReturnValue")
public void readDoesNotExist() throws FileNotFoundException {
expectedException.expect(NoSuchElementException.class);
when(fileSystem.exists(resolvedPath))
.thenReturn(false);
fileSystemPersister.read(simple).toBlocking().single();
fileSystemPersister.read(simple).blockingFirst();
}
@Test
@SuppressWarnings("CheckReturnValue")
public void writeThenRead() throws IOException {
when(fileSystem.read(resolvedPath)).thenReturn(bufferedSource);
when(fileSystem.exists(resolvedPath)).thenReturn(true);
fileSystemPersister.write(simple, bufferedSource).toBlocking().single();
BufferedSource source = fileSystemPersister.read(simple).toBlocking().first();
fileSystemPersister.write(simple, bufferedSource).blockingFirst();
BufferedSource source = fileSystemPersister.read(simple).blockingFirst();
InOrder inOrder = inOrder(fileSystem);
inOrder.verify(fileSystem).write(resolvedPath, bufferedSource);
inOrder.verify(fileSystem).exists(resolvedPath);

View file

@ -50,25 +50,27 @@ public class FileSystemRecordPersisterTest {
.thenReturn(true);
when(fileSystem.read(resolvedPath)).thenReturn(bufferedSource);
BufferedSource returnedValue = fileSystemPersister.read(simple).toBlocking().single();
BufferedSource returnedValue = fileSystemPersister.read(simple).blockingFirst();
assertThat(returnedValue).isEqualTo(bufferedSource);
}
@Test
@SuppressWarnings("CheckReturnValue")
public void readDoesNotExist() throws FileNotFoundException {
expectedException.expect(NoSuchElementException.class);
when(fileSystem.exists(resolvedPath))
.thenReturn(false);
fileSystemPersister.read(simple).toBlocking().single();
fileSystemPersister.read(simple).blockingFirst();
}
@Test
@SuppressWarnings("CheckReturnValue")
public void writeThenRead() throws IOException {
when(fileSystem.read(resolvedPath)).thenReturn(bufferedSource);
when(fileSystem.exists(resolvedPath)).thenReturn(true);
fileSystemPersister.write(simple, bufferedSource).toBlocking().single();
BufferedSource source = fileSystemPersister.read(simple).toBlocking().first();
fileSystemPersister.write(simple, bufferedSource).blockingFirst();
BufferedSource source = fileSystemPersister.read(simple).blockingFirst();
InOrder inOrder = inOrder(fileSystem);
inOrder.verify(fileSystem).write(resolvedPath, bufferedSource);
inOrder.verify(fileSystem).exists(resolvedPath);

View file

@ -45,7 +45,7 @@ public class RecordPersisterTest {
.thenReturn(true);
when(fileSystem.read(simple.toString())).thenReturn(bufferedSource);
BufferedSource returnedValue = sourcePersister.read(simple).toBlocking().single();
BufferedSource returnedValue = sourcePersister.read(simple).blockingFirst();
assertThat(returnedValue).isEqualTo(bufferedSource);
}
@ -74,17 +74,18 @@ public class RecordPersisterTest {
}
@Test
@SuppressWarnings("CheckReturnValue")
public void readDoesNotExist() throws FileNotFoundException {
expectedException.expect(NoSuchElementException.class);
when(fileSystem.exists(SourcePersister.pathForBarcode(simple)))
.thenReturn(false);
sourcePersister.read(simple).toBlocking().single();
sourcePersister.read(simple).blockingFirst();
}
@Test
public void write() throws IOException {
assertThat(sourcePersister.write(simple, bufferedSource).toBlocking().single()).isTrue();
assertThat(sourcePersister.write(simple, bufferedSource).blockingSingle()).isTrue();
}
@Test

View file

@ -14,9 +14,9 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static com.google.common.base.Charsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
@ -65,9 +65,9 @@ public class SourceDiskDaoStoreTest {
when(diskDAO.write(barCode, source))
.thenReturn(Observable.just(true));
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
result = store.get(barCode).toBlocking().first();
result = store.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
verify(fetcher, times(1)).fetch(barCode);
}

View file

@ -13,9 +13,9 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static com.google.common.base.Charsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
@ -65,9 +65,9 @@ public class SourceFilerReaderWriterStoreTest {
when(fileWriter.write(barCode, source))
.thenReturn(Observable.just(true));
Foo result = simpleStore.get(barCode).toBlocking().first();
Foo result = simpleStore.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
result = simpleStore.get(barCode).toBlocking().first();
result = simpleStore.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
verify(fetcher, times(1)).fetch(barCode);
}

View file

@ -44,22 +44,23 @@ public class SourcePersisterTest {
.thenReturn(true);
when(fileSystem.read(simple.toString())).thenReturn(bufferedSource);
BufferedSource returnedValue = sourcePersister.read(simple).toBlocking().single();
BufferedSource returnedValue = sourcePersister.read(simple).blockingSingle();
assertThat(returnedValue).isEqualTo(bufferedSource);
}
@Test
@SuppressWarnings("CheckReturnValue")
public void readDoesNotExist() throws FileNotFoundException {
expectedException.expect(NoSuchElementException.class);
when(fileSystem.exists(SourcePersister.pathForBarcode(simple)))
.thenReturn(false);
sourcePersister.read(simple).toBlocking().single();
sourcePersister.read(simple).blockingSingle();
}
@Test
public void write() throws IOException {
assertThat(sourcePersister.write(simple, bufferedSource).toBlocking().single()).isTrue();
assertThat(sourcePersister.write(simple, bufferedSource).blockingSingle()).isTrue();
}
@Test

View file

@ -14,8 +14,8 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
import io.reactivex.Observable;
import okio.BufferedSource;
import rx.Observable;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;

View file

@ -13,10 +13,10 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import okio.BufferedSource;
import rx.Observable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -81,20 +81,22 @@ public class StoreRefreshWhenStaleTest {
when(persister.write(barCode, network1))
.thenReturn(Observable.just(true));
BufferedSource result = store.get(barCode)
.test()
.awaitTerminalEvent()
.getOnNextEvents()
.get(0);
assertThat(result).isEqualTo(disk1);
TestObserver testObserver = store
.get(barCode)
.test();
testObserver.awaitTerminalEvent();
testObserver.assertNoErrors();
testObserver.assertResult(disk1);
verify(fetcher, times(0)).fetch(barCode);
verify(persister, times(1)).getRecordState(barCode);
store.clear(barCode);
result = store.get(barCode).test().awaitTerminalEvent().getOnNextEvents().get(0);
assertThat(result).isEqualTo(disk2);
testObserver = store
.get(barCode)
.test();
testObserver.awaitTerminalEvent();
testObserver.assertResult(disk2);
verify(fetcher, times(0)).fetch(barCode);
verify(persister, times(2)).getRecordState(barCode);
}
}

View file

@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.io.IOException;
import java.io.Reader;
@ -13,6 +14,8 @@ import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
public class JacksonReaderParser<Parsed> implements Parser<Reader, Parsed> {
private final ObjectMapper objectMapper;
@ -30,11 +33,11 @@ public class JacksonReaderParser<Parsed> implements Parser<Reader, Parsed> {
}
@Override
public Parsed call(@Nonnull Reader reader) {
public Parsed apply(@NonNull Reader reader) throws ParserException {
try {
return objectMapper.readValue(reader, parsedType);
} catch (IOException e) {
return null;
throw new ParserException(e.getMessage(), e);
}
}
}

View file

@ -4,15 +4,16 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
import okio.BufferedSource;
public class JacksonSourceParser<Parsed> implements Parser<BufferedSource, Parsed> {
@ -32,14 +33,13 @@ public class JacksonSourceParser<Parsed> implements Parser<BufferedSource, Parse
}
@Override
@Nullable
@SuppressWarnings("PMD.EmptyCatchBlock")
public Parsed call(@Nonnull BufferedSource source) {
InputStream inputStream = source.inputStream();
@SuppressWarnings({"PMD.EmptyCatchBlock"})
public Parsed apply(@NonNull BufferedSource bufferedSource) throws ParserException {
InputStream inputStream = bufferedSource.inputStream();
try {
return objectMapper.readValue(inputStream, parsedType);
} catch (IOException e) {
return null;
throw new ParserException(e.getMessage(), e);
} finally {
try {
if (inputStream != null) {

View file

@ -4,14 +4,16 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.io.IOException;
import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
public class JacksonStringParser<Parsed> implements Parser<String, Parsed> {
private final ObjectMapper objectMapper;
@ -29,12 +31,11 @@ public class JacksonStringParser<Parsed> implements Parser<String, Parsed> {
}
@Override
@Nullable
public Parsed call(@Nonnull String source) {
public Parsed apply(@NonNull String s) throws ParserException {
try {
return objectMapper.readValue(source, parsedType);
return objectMapper.readValue(s, parsedType);
} catch (IOException e) {
return null;
throw new ParserException(e.getMessage(), e);
}
}
}

View file

@ -20,7 +20,8 @@ import org.mockito.MockitoAnnotations;
import java.io.Reader;
import java.io.StringReader;
import rx.Observable;
import io.reactivex.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -66,7 +67,7 @@ public class JacksonReaderParserStoreTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);
@ -85,7 +86,7 @@ public class JacksonReaderParserStoreTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);

View file

@ -20,9 +20,9 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -74,7 +74,7 @@ public class JacksonSourceParserStoreTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);
@ -93,7 +93,7 @@ public class JacksonSourceParserStoreTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);

View file

@ -17,7 +17,8 @@ import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import io.reactivex.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -61,7 +62,7 @@ public class JacksonStringParserStoreTest {
.parser(JacksonParserFactory.createStringParser(Foo.class))
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);
@ -80,7 +81,7 @@ public class JacksonStringParserStoreTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
validateFoo(result);

View file

@ -1,6 +1,7 @@
package com.nytimes.android.external.store.middleware.moshi;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
@ -8,9 +9,9 @@ import java.io.IOException;
import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
import okio.BufferedSource;
public class MoshiSourceParser<Parsed> implements Parser<BufferedSource, Parsed> {
@ -23,12 +24,11 @@ public class MoshiSourceParser<Parsed> implements Parser<BufferedSource, Parsed>
}
@Override
@Nullable
public Parsed call(BufferedSource source) {
public Parsed apply(@NonNull BufferedSource bufferedSource) throws ParserException {
try {
return jsonAdapter.fromJson(source);
return jsonAdapter.fromJson(bufferedSource);
} catch (IOException e) {
return null;
throw new ParserException(e.getMessage(), e);
}
}
}

View file

@ -1,6 +1,7 @@
package com.nytimes.android.external.store.middleware.moshi;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
@ -8,9 +9,10 @@ import java.io.IOException;
import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
public class MoshiStringParser<Parsed> implements Parser<String, Parsed> {
private final JsonAdapter<Parsed> jsonAdapter;
@ -20,13 +22,13 @@ public class MoshiStringParser<Parsed> implements Parser<String, Parsed> {
jsonAdapter = moshi.adapter(type);
}
@Override
@Nullable
public Parsed call(@Nonnull String source) {
public Parsed apply(@NonNull String s) throws ParserException {
try {
return jsonAdapter.fromJson(source);
return jsonAdapter.fromJson(s);
} catch (IOException e) {
return null;
throw new ParserException(e.getMessage(), e);
}
}
}

View file

@ -18,9 +18,9 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -74,7 +74,7 @@ public class MoshiSourceParserTest {
.parser(parser)
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
assertEquals(result.number, 123);
assertEquals(result.string, "abc");

View file

@ -15,7 +15,8 @@ import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import io.reactivex.Observable;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.times;
@ -58,7 +59,7 @@ public class MoshiStringParserStoreTest {
.parser(MoshiParserFactory.createStringParser(Foo.class))
.open();
Foo result = store.get(barCode).toBlocking().first();
Foo result = store.get(barCode).blockingFirst();
assertEquals(result.number, 123);
assertEquals(result.string, "abc");

View file

@ -2,13 +2,15 @@ package com.nytimes.android.external.store.middleware;
import com.google.gson.Gson;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.io.Reader;
import java.lang.reflect.Type;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
public class GsonReaderParser<Parsed> implements Parser<Reader, Parsed> {
@ -25,7 +27,7 @@ public class GsonReaderParser<Parsed> implements Parser<Reader, Parsed> {
}
@Override
public Parsed call(@Nonnull Reader reader) {
public Parsed apply(@NonNull Reader reader) throws ParserException {
return gson.fromJson(reader, type);
}
}

View file

@ -3,15 +3,16 @@ package com.nytimes.android.external.store.middleware;
import com.google.gson.Gson;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
import okio.BufferedSource;
import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
@ -42,11 +43,11 @@ public class GsonSourceParser<Parsed> implements Parser<BufferedSource, Parsed>
}
@Override
public Parsed call(@Nonnull BufferedSource source) {
try (InputStreamReader reader = new InputStreamReader(source.inputStream(), Charset.forName("UTF-8"))) {
public Parsed apply(@NonNull BufferedSource bufferedSource) throws ParserException {
try (InputStreamReader reader = new InputStreamReader(bufferedSource.inputStream(), Charset.forName("UTF-8"))) {
return gson.fromJson(reader, type);
} catch (IOException e) {
throw new RuntimeException(e);
throw new ParserException(e.getMessage(), e);
}
}
}

View file

@ -2,10 +2,14 @@ package com.nytimes.android.external.store.middleware;
import com.google.gson.Gson;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.util.ParserException;
import java.lang.reflect.Type;
import javax.inject.Inject;
import io.reactivex.annotations.NonNull;
import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
public class GsonStringParser<Parsed> implements Parser<String, Parsed> {
@ -22,7 +26,7 @@ public class GsonStringParser<Parsed> implements Parser<String, Parsed> {
}
@Override
public Parsed call(String source) {
return gson.fromJson(source, type);
public Parsed apply(@NonNull String s) throws ParserException {
return gson.fromJson(s, type);
}
}

View file

@ -15,9 +15,9 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static com.google.common.base.Charsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
@ -67,9 +67,9 @@ public class GenericParserStoreTest {
when(persister.write(barCode, source))
.thenReturn(Observable.just(true));
Foo result = simpleStore.get(barCode).toBlocking().first();
Foo result = simpleStore.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
result = simpleStore.get(barCode).toBlocking().first();
result = simpleStore.get(barCode).blockingFirst();
assertThat(result.bar).isEqualTo(KEY);
verify(fetcher, times(1)).fetch(barCode);
}

View file

@ -18,9 +18,9 @@ import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.List;
import io.reactivex.Observable;
import okio.BufferedSource;
import okio.Okio;
import rx.Observable;
import static com.google.common.base.Charsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
@ -76,7 +76,7 @@ public class GsonSourceListParserTest {
when(persister.write(barCode, source))
.thenReturn(Observable.just(true));
List<Foo> result = simpleStore.get(barCode).toBlocking().first();
List<Foo> result = simpleStore.get(barCode).blockingFirst();
assertThat(result.get(0).value).isEqualTo("a");
assertThat(result.get(1).value).isEqualTo("b");
assertThat(result.get(2).value).isEqualTo("c");

View file

@ -5,7 +5,7 @@ version = VERSION_NAME
dependencies {
compile project(path: ':cache')
compile libraries.rxJava
compile libraries.rxJava2
compile libraries.jsr305
testCompile libraries.mockito

View file

@ -2,7 +2,8 @@ package com.nytimes.android.external.store.base;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
public interface DiskRead<Raw, Key> {
@Nonnull

View file

@ -2,7 +2,7 @@ package com.nytimes.android.external.store.base;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
public interface DiskWrite<Raw, Key> {
/**

View file

@ -2,7 +2,8 @@ package com.nytimes.android.external.store.base;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
/**
* Interface for fetching new data for a Store

View file

@ -4,7 +4,7 @@ import com.nytimes.android.external.store.base.impl.Store;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
/**
* this interface allows us to mark a {@link Store} as "internal", exposing methods for retrieving data

View file

@ -1,7 +1,14 @@
package com.nytimes.android.external.store.base;
import rx.functions.Func1;
import com.nytimes.android.external.store.util.ParserException;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
//just a marker interface allowing for a reimplementation of how the parser is implemented
public interface Parser<Raw, Parsed> extends Func1<Raw, Parsed> {
public interface Parser<Raw, Parsed> extends Function<Raw, Parsed> {
@Override
Parsed apply(@NonNull Raw raw) throws ParserException;
}

View file

@ -2,7 +2,7 @@ package com.nytimes.android.external.store.base;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
/**
* Interface for fetching data from persister

View file

@ -6,6 +6,8 @@ import com.nytimes.android.external.store.util.ParserException;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.annotations.NonNull;
import static com.nytimes.android.external.cache.Preconditions.checkArgument;
import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
@ -28,12 +30,13 @@ public class MultiParser<Key, Raw, Parsed> implements KeyParser<Key, Raw, Parsed
}
@Override
@NonNull
@SuppressWarnings("unchecked")
public Parsed call(Key key, Raw raw) {
public Parsed apply(@NonNull Key key, @NonNull Raw raw) throws ParserException {
Object parsed = raw;
for (KeyParser parser : parsers) {
try {
parsed = parser.call(key, parsed);
parsed = parser.apply(key, parsed);
} catch (ClassCastException exception) {
throw createParserException();
}

View file

@ -17,14 +17,15 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import rx.Observable;
import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import static com.nytimes.android.external.store.base.impl.StoreUtil.persisterIsStale;
import static com.nytimes.android.external.store.base.impl.StoreUtil.shouldReturnNetworkBeforeStale;
@ -41,9 +42,9 @@ import static com.nytimes.android.external.store.base.impl.StoreUtil.shouldRetur
final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed, Key> {
Cache<Key, Observable<Parsed>> inFlightRequests;
Cache<Key, Observable<Parsed>> memCache;
StalePolicy stalePolicy;
Persister<Raw, Key> persister;
KeyParser<Key, Raw, Parsed> parser;
StalePolicy stalePolicy;
private final PublishSubject<Key> refreshSubject = PublishSubject.create();
private Fetcher<Raw, Key> fetcher;
@ -69,11 +70,11 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
if (memoryPolicy == null) {
memoryPolicy = MemoryPolicy
.builder()
.setMemorySize(getCacheSize())
.setExpireAfter(getCacheTTL())
.setExpireAfterTimeUnit(getCacheTTLTimeUnit())
.build();
.builder()
.setMemorySize(getCacheSize())
.setExpireAfter(getCacheTTL())
.setExpireAfterTimeUnit(getCacheTTLTimeUnit())
.build();
}
initMemCache(memoryPolicy);
@ -88,22 +89,22 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
if (expireAfterToSeconds > maximumInFlightRequestsDuration) {
inFlightRequests = CacheBuilder
.newBuilder()
.expireAfterWrite(maximumInFlightRequestsDuration, TimeUnit.SECONDS)
.build();
.newBuilder()
.expireAfterWrite(maximumInFlightRequestsDuration, TimeUnit.SECONDS)
.build();
} else {
inFlightRequests = CacheBuilder.newBuilder()
.expireAfterWrite(memoryPolicy.getExpireAfter(), memoryPolicy.getExpireAfterTimeUnit())
.build();
.expireAfterWrite(memoryPolicy.getExpireAfter(), memoryPolicy.getExpireAfterTimeUnit())
.build();
}
}
private void initMemCache(MemoryPolicy memoryPolicy) {
memCache = CacheBuilder
.newBuilder()
.maximumSize(memoryPolicy.getMaxSize())
.expireAfterWrite(memoryPolicy.getExpireAfter(), memoryPolicy.getExpireAfterTimeUnit())
.build();
.newBuilder()
.maximumSize(memoryPolicy.getMaxSize())
.expireAfterWrite(memoryPolicy.getExpireAfter(), memoryPolicy.getExpireAfterTimeUnit())
.build();
}
/**
@ -114,8 +115,8 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Override
public Observable<Parsed> get(@Nonnull final Key key) {
return Observable.concat(
lazyCache(key),
fetch(key)
lazyCache(key),
fetch(key)
).take(1);
}
@ -124,7 +125,7 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Experimental
public Observable<Parsed> getRefreshing(@Nonnull final Key key) {
return get(key)
.compose(StoreUtil.<Parsed, Key>repeatWhenCacheEvicted(refreshSubject, key));
.compose(StoreUtil.<Parsed, Key>repeatWhenCacheEvicted(refreshSubject, key));
}
@ -133,13 +134,13 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
*/
private Observable<Parsed> lazyCache(@Nonnull final Key key) {
return Observable
.defer(new Func0<Observable<Parsed>>() {
@Override
public Observable<Parsed> call() {
return cache(key);
}
})
.onErrorResumeNext(new OnErrorResumeWithEmpty<Parsed>());
.defer(new Callable<ObservableSource<? extends Parsed>>() {
@Override
public ObservableSource<? extends Parsed> call() {
return cache(key);
}
})
.onErrorResumeNext(new OnErrorResumeWithEmpty<Parsed>());
}
Observable<Parsed> cache(@Nonnull final Key key) {
@ -147,8 +148,7 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
return memCache.get(key, new Callable<Observable<Parsed>>() {
@Nonnull
@Override
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public Observable<Parsed> call() throws Exception {
public Observable<Parsed> call() {
return disk(key);
}
});
@ -184,35 +184,36 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
Observable<Parsed> readDisk(@Nonnull final Key key) {
return persister().read(key)
.onErrorResumeNext(new OnErrorResumeWithEmpty<Raw>())
.map(new Func1<Raw, Parsed>() {
@Override
public Parsed call(Raw raw) {
return parser.call(key, raw);
}
})
.doOnNext(new Action1<Parsed>() {
@Override
public void call(Parsed parsed) {
updateMemory(key, parsed);
if (stalePolicy == StalePolicy.REFRESH_ON_STALE
&& persisterIsStale(key, persister)) {
backfillCache(key);
.onErrorResumeNext(new OnErrorResumeWithEmpty<Raw>())
.map(new Function<Raw, Parsed>() {
@Override
public Parsed apply(@NonNull Raw raw) {
return parser.apply(key, raw);
}
}
}).cache();
})
.doOnNext(new Consumer<Parsed>() {
@Override
public void accept(@NonNull Parsed parsed) {
updateMemory(key, parsed);
if (stalePolicy == StalePolicy.REFRESH_ON_STALE
&& persisterIsStale(key, persister)) {
backfillCache(key);
}
}
}).cache();
}
@SuppressWarnings("CheckReturnValue")
void backfillCache(@Nonnull Key key) {
fetch(key).subscribe(new Action1<Parsed>() {
fetch(key).subscribe(new Consumer<Parsed>() {
@Override
public void call(Parsed parsed) {
//do Nothing we are just backfilling cache
public void accept(@NonNull Parsed parsed) {
// do Nothing we are just backfilling cache
}
}, new Action1<Throwable>() {
}, new Consumer<Throwable>() {
@Override
public void call(Throwable throwable) {
//do nothing as we are just backfilling cache
public void accept(@NonNull Throwable throwable) {
// do nothing as we are just backfilling cache
}
});
}
@ -227,10 +228,9 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Nonnull
@Override
public Observable<Parsed> fetch(@Nonnull final Key key) {
return Observable.defer(new Func0<Observable<Parsed>>() {
@Nullable
return Observable.defer(new Callable<ObservableSource<? extends Parsed>>() {
@Override
public Observable<Parsed> call() {
public ObservableSource<? extends Parsed> call() {
return fetchAndPersist(key);
}
});
@ -264,41 +264,41 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Nonnull
Observable<Parsed> response(@Nonnull final Key key) {
return fetcher()
.fetch(key)
.flatMap(new Func1<Raw, Observable<Parsed>>() {
@Override
public Observable<Parsed> call(Raw raw) {
return persister().write(key, raw)
.flatMap(new Func1<Boolean, Observable<Parsed>>() {
@Override
public Observable<Parsed> call(Boolean aBoolean) {
return readDisk(key);
}
});
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends Parsed>>() {
@Override
public Observable<? extends Parsed> call(Throwable throwable) {
if (stalePolicy == StalePolicy.NETWORK_BEFORE_STALE) {
return readDisk(key);
.fetch(key)
.flatMap(new Function<Raw, ObservableSource<Parsed>>() {
@Override
public ObservableSource<Parsed> apply(@NonNull Raw raw) {
return persister().write(key, raw)
.flatMap(new Function<Boolean, ObservableSource<Parsed>>() {
@Override
public ObservableSource<Parsed> apply(@NonNull Boolean aBoolean) {
return readDisk(key);
}
});
}
return Observable.error(throwable);
}
})
.doOnNext(new Action1<Parsed>() {
@Override
public void call(Parsed data) {
notifySubscribers(data);
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
inFlightRequests.invalidate(key);
}
})
.cache();
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Parsed>>() {
@Override
public ObservableSource<? extends Parsed> apply(@NonNull Throwable throwable) {
if (stalePolicy == StalePolicy.NETWORK_BEFORE_STALE) {
return readDisk(key);
}
return Observable.error(throwable);
}
})
.doOnNext(new Consumer<Parsed>() {
@Override
public void accept(@NonNull Parsed parsed) {
notifySubscribers(parsed);
}
})
.doOnTerminate(new Action() {
@Override
public void run() {
inFlightRequests.invalidate(key);
}
})
.cache();
}
void notifySubscribers(Parsed data) {
@ -314,7 +314,7 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Override
public Observable<Parsed> stream(@Nonnull Key key) {
Observable<Parsed> stream = subject.asObservable();
Observable<Parsed> stream = subject.hide();
//If nothing was emitted through the subject yet, start stream with get() value
if (!subject.hasValue()) {
@ -327,7 +327,7 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Nonnull
@Override
public Observable<Parsed> stream() {
return subject.asObservable();
return subject.hide();
}
/**

View file

@ -12,7 +12,8 @@ import com.nytimes.android.external.store.util.NoopPersister;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
import static com.nytimes.android.external.store.base.impl.StalePolicy.UNSPECIFIED;

View file

@ -16,7 +16,8 @@ import java.util.List;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
/**
* Builder where there parser is used.

View file

@ -2,8 +2,11 @@ package com.nytimes.android.external.store.base.impl;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.functions.Func1;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
@ -11,7 +14,7 @@ import static com.nytimes.android.external.cache.Preconditions.checkNotNull;
* A Transformer that takes a source observable and re-subscribes to the upstream Observable when
* it emits.
*/
final class RepeatWhenEmits<T> implements Observable.Transformer<T, T> {
final class RepeatWhenEmits<T> implements ObservableTransformer<T, T> {
private final Observable source;
@ -25,13 +28,13 @@ final class RepeatWhenEmits<T> implements Observable.Transformer<T, T> {
}
@Override
public Observable<T> call(Observable<T> upstream) {
return upstream.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> events) {
return events.switchMap(new Func1<Void, Observable<?>>() {
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) {
return objectObservable.switchMap(new Function<Object, ObservableSource<?>>() {
@Override
public Observable<?> call(Void aVoid) {
public ObservableSource<?> apply(@NonNull Object o) {
return source;
}
});

View file

@ -3,8 +3,9 @@ package com.nytimes.android.external.store.base.impl;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.annotations.Experimental;
import io.reactivex.Observable;
import io.reactivex.annotations.Experimental;
/**
* a {@link com.nytimes.android.external.store.base.impl.StoreBuilder StoreBuilder}

View file

@ -2,7 +2,7 @@ package com.nytimes.android.external.store.base.impl;
import javax.annotation.Nonnull;
import rx.annotations.Beta;
import io.reactivex.annotations.Beta;
/**

View file

@ -6,9 +6,12 @@ import com.nytimes.android.external.store.base.RecordState;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Predicate;
import io.reactivex.subjects.PublishSubject;
import static com.nytimes.android.external.store.base.RecordState.STALE;
@ -17,11 +20,11 @@ final class StoreUtil {
}
@Nonnull
static <Parsed, Key> Observable.Transformer<Parsed, Parsed>
static <Parsed, Key> ObservableTransformer<Parsed, Parsed>
repeatWhenCacheEvicted(PublishSubject<Key> refreshSubject, @Nonnull final Key keyForRepeat) {
Observable<Key> filter = refreshSubject.filter(new Func1<Key, Boolean>() {
Observable<Key> filter = refreshSubject.filter(new Predicate<Key>() {
@Override
public Boolean call(Key key) {
public boolean test(@NonNull Key key) throws Exception {
return key.equals(keyForRepeat);
}
});

View file

@ -1,13 +1,11 @@
package com.nytimes.android.external.store.util;
import javax.annotation.Nonnull;
import rx.functions.Func2;
public interface KeyParser<Key, Raw, Parsed> extends Func2<Key, Raw, Parsed> {
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.BiFunction;
public interface KeyParser<Key, Raw, Parsed> extends BiFunction<Key, Raw, Parsed> {
@Override
@Nonnull
Parsed call(@Nonnull Key key, @Nonnull Raw raw);
Parsed apply(@NonNull Key key, @NonNull Raw raw) throws ParserException;
}

View file

@ -4,6 +4,8 @@ import com.nytimes.android.external.store.base.Parser;
import javax.annotation.Nonnull;
import io.reactivex.annotations.NonNull;
public class NoKeyParser<Key, Raw, Parsed> implements KeyParser<Key, Raw, Parsed> {
private final Parser<Raw, Parsed> parser;
@ -12,8 +14,7 @@ public class NoKeyParser<Key, Raw, Parsed> implements KeyParser<Key, Raw, Parsed
}
@Override
@Nonnull
public Parsed call(@Nonnull Key key, @Nonnull Raw raw) {
return parser.call(raw);
public Parsed apply(@NonNull Key key, @NonNull Raw raw) throws ParserException {
return parser.apply(raw);
}
}

View file

@ -2,12 +2,15 @@ package com.nytimes.android.external.store.util;
import com.nytimes.android.external.store.base.Parser;
import io.reactivex.annotations.NonNull;
/**
* Pass-through parser for stores that parse externally
*/
public class NoopParserFunc<Raw, Parsed> implements Parser<Raw, Parsed> {
@Override
public Object call(Object object) {
return object;
public Parsed apply(@NonNull Raw raw) throws ParserException {
return (Parsed) raw;
}
}

View file

@ -11,7 +11,8 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
/**
* Pass-through diskdao for stores that don't want to use persister

View file

@ -1,15 +1,17 @@
package com.nytimes.android.external.store.util;
import rx.Observable;
import rx.functions.Func1;
import io.reactivex.Observable;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
/**
* Resume with empty observable on error
*/
public class OnErrorResumeWithEmpty<Parsed> implements Func1<Throwable, Observable<? extends Parsed>> {
public class OnErrorResumeWithEmpty<Parsed> implements Function<Throwable, Observable<? extends Parsed>> {
@Override
public Observable<? extends Parsed> call(Throwable throwable) {
public Observable<? extends Parsed> apply(@NonNull Throwable throwable) {
return Observable.empty();
}
}

View file

@ -5,7 +5,15 @@ package com.nytimes.android.external.store.util;
*/
public class ParserException extends RuntimeException {
public ParserException(Throwable cause) {
super(cause);
}
public ParserException(String message) {
super(message);
}
public ParserException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -12,7 +12,8 @@ import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
import static org.assertj.core.api.Assertions.assertThat;

View file

@ -16,7 +16,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
import static com.nytimes.android.external.store.GetRefreshingTest.ClearingPersister;
import static org.assertj.core.api.Assertions.assertThat;

View file

@ -12,7 +12,8 @@ import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
public class DontCacheErrorsTest {
@ -47,12 +48,13 @@ public class DontCacheErrorsTest {
shouldThrow = true;
store.get(barcode).test()
.awaitTerminalEvent()
.assertError(Exception.class);
.assertTerminated()
.assertError(Exception.class)
.awaitTerminalEvent();
shouldThrow = false;
store.get(barcode).test()
.awaitTerminalEvent()
.assertNoErrors();
.assertNoErrors()
.awaitTerminalEvent();
}
}

View file

@ -18,8 +18,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.observers.AssertableSubscriber;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@ -63,16 +63,16 @@ public class GetRefreshingTest {
when(persister.write(barcode, 2)).thenReturn(Observable.just(true));
AssertableSubscriber<Integer> refreshingObservable = store.getRefreshing(barcode).test();
assertThat(refreshingObservable.getValueCount()).isEqualTo(1);
TestObserver<Integer> refreshingObservable = store.getRefreshing(barcode).test();
refreshingObservable.assertValueCount(1);
assertThat(networkCalls.intValue()).isEqualTo(1);
//clearing the store should produce another network call
store.clear(barcode);
assertThat(refreshingObservable.getValueCount()).isEqualTo(2);
refreshingObservable.assertValueCount(2);
assertThat(networkCalls.intValue()).isEqualTo(2);
store.get(barcode).test().awaitTerminalEvent();
assertThat(refreshingObservable.getValueCount()).isEqualTo(2);
refreshingObservable.assertValueCount(2);
assertThat(networkCalls.intValue()).isEqualTo(2);
}
@ -98,10 +98,10 @@ public class GetRefreshingTest {
when(persister.write(barcode2, 1)).thenReturn(Observable.just(true));
when(persister.write(barcode2, 2)).thenReturn(Observable.just(true));
AssertableSubscriber<Integer> testObservable1 = store.getRefreshing(barcode1).test();
AssertableSubscriber<Integer> testObservable2 = store.getRefreshing(barcode2).test();
assertThat(testObservable1.getValueCount()).isEqualTo(1);
assertThat(testObservable2.getValueCount()).isEqualTo(1);
TestObserver<Integer> testObservable1 = store.getRefreshing(barcode1).test();
TestObserver<Integer> testObservable2 = store.getRefreshing(barcode2).test();
testObservable1.assertValueCount(1);
testObservable2.assertValueCount(1);
assertThat(networkCalls.intValue()).isEqualTo(2);

View file

@ -12,8 +12,10 @@ import org.mockito.runners.MockitoJUnitRunner;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.observers.AssertableSubscriber;
import io.reactivex.Observable;
import io.reactivex.annotations.NonNull;
import io.reactivex.observers.TestObserver;
@RunWith(MockitoJUnitRunner.class)
public class KeyParserTest {
@ -27,7 +29,7 @@ public class KeyParserTest {
store = StoreBuilder.<Integer, String, String>parsedWithKey()
.parser(new KeyParser<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
public String apply(@NonNull Integer integer, @NonNull String s) {
return s + integer;
}
})
@ -42,12 +44,11 @@ public class KeyParserTest {
}
@Test
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public void testStoreWithKeyParserFuncNoPersister() throws Exception {
AssertableSubscriber<String> testObservable = store.get(KEY).test().awaitTerminalEvent();
TestObserver<String> testObservable = store.get(KEY).test().await();
testObservable.assertNoErrors()
.assertValues(NETWORK + KEY)
.assertUnsubscribed();
.awaitTerminalEvent();
}
}

View file

@ -12,7 +12,8 @@ import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
import static org.assertj.core.api.Assertions.assertThat;

View file

@ -14,7 +14,8 @@ import java.util.Date;
import javax.annotation.Nonnull;
import rx.Observable;
import io.reactivex.Observable;
import io.reactivex.annotations.NonNull;
import static org.assertj.core.api.Assertions.assertThat;
@ -48,7 +49,7 @@ public class StoreBuilderTest {
})
.parser(new Parser<String, Date>() {
@Override
public Date call(String s) {
public Date apply(@NonNull String s) {
return DATE;
}
})
@ -73,9 +74,9 @@ public class StoreBuilderTest {
}
})
.open();
Date result = store.get(5).toBlocking().first();
result = barCodeStore.get(new BarCode("test", "5")).toBlocking().first();
result = keyStore.get(5).toBlocking().first();
Date result = store.get(5).blockingFirst();
result = barCodeStore.get(new BarCode("test", "5")).blockingFirst();
result = keyStore.get(5).blockingFirst();
assertThat(result).isNotNull();
}

View file

@ -4,9 +4,9 @@ import com.nytimes.android.external.cache.Cache;
import com.nytimes.android.external.cache.CacheBuilder;
import com.nytimes.android.external.store.base.Fetcher;
import com.nytimes.android.external.store.base.Persister;
import com.nytimes.android.external.store.base.impl.Store;
import com.nytimes.android.external.store.base.impl.BarCode;
import com.nytimes.android.external.store.base.impl.RealStore;
import com.nytimes.android.external.store.base.impl.Store;
import com.nytimes.android.external.store.base.impl.StoreBuilder;
import com.nytimes.android.external.store.util.NoopPersister;
@ -18,10 +18,11 @@ import org.mockito.MockitoAnnotations;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Emitter;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.BiFunction;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
@ -65,10 +66,10 @@ public class StoreTest {
when(persister.write(barCode, NETWORK))
.thenReturn(Observable.just(true));
String value = simpleStore.get(barCode).toBlocking().first();
String value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(DISK);
value = simpleStore.get(barCode).toBlocking().first();
value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(DISK);
verify(fetcher, times(1)).fetch(barCode);
}
@ -83,9 +84,9 @@ public class StoreTest {
.open();
Observable<String> networkObservable =
Observable.fromEmitter(new Action1<Emitter<String>>() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void call(Emitter<String> emitter) {
public void subscribe(ObservableEmitter<String> emitter) {
if (counter.incrementAndGet() == 1) {
emitter.onNext(NETWORK);
@ -93,7 +94,9 @@ public class StoreTest {
emitter.onError(new RuntimeException("Yo Dawg your inflight is broken"));
}
}
}, Emitter.BackpressureMode.NONE);
});
when(fetcher.fetch(barCode))
.thenReturn(networkObservable);
@ -106,14 +109,13 @@ public class StoreTest {
String response = simpleStore.get(barCode).zipWith(simpleStore.get(barCode),
new Func2<String, String, String>() {
new BiFunction<String, String, String>() {
@Override
public String call(String s, String s2) {
public String apply(@NonNull String s, @NonNull String s2) {
return "hello";
}
})
.toBlocking()
.first();
.blockingFirst();
assertThat(response).isEqualTo("hello");
verify(fetcher, times(1)).fetch(barCode);
}
@ -133,9 +135,9 @@ public class StoreTest {
.thenReturn(Observable.just(DISK));
when(persister.write(barCode, NETWORK)).thenReturn(Observable.just(true));
String value = simpleStore.get(barCode).toBlocking().first();
String value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(DISK);
value = simpleStore.get(barCode).toBlocking().first();
value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(DISK);
verify(fetcher, times(1)).fetch(barCode);
}
@ -151,14 +153,14 @@ public class StoreTest {
when(fetcher.fetch(barCode))
.thenReturn(Observable.just(NETWORK));
String value = simpleStore.get(barCode).toBlocking().first();
String value = simpleStore.get(barCode).blockingFirst();
verify(fetcher, times(1)).fetch(barCode);
verify(persister, times(1)).write(barCode, NETWORK);
verify(persister, times(2)).read(barCode);
assertThat(value).isEqualTo(NETWORK);
value = simpleStore.get(barCode).toBlocking().first();
value = simpleStore.get(barCode).blockingFirst();
verify(persister, times(2)).read(barCode);
verify(persister, times(1)).write(barCode, NETWORK);
verify(fetcher, times(1)).fetch(barCode);

View file

@ -11,7 +11,8 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import io.reactivex.Observable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
@ -32,7 +33,7 @@ public class StoreWithParserTest {
private final BarCode barCode = new BarCode("key", "value");
@Test
public void testSimple() {
public void testSimple() throws Exception {
MockitoAnnotations.initMocks(this);
@ -52,17 +53,17 @@ public class StoreWithParserTest {
when(persister.write(barCode, NETWORK))
.thenReturn(Observable.just(true));
when(parser.call(DISK)).thenReturn(barCode.getKey());
when(parser.apply(DISK)).thenReturn(barCode.getKey());
String value = simpleStore.get(barCode).toBlocking().first();
String value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(barCode.getKey());
value = simpleStore.get(barCode).toBlocking().first();
value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(barCode.getKey());
verify(fetcher, times(1)).fetch(barCode);
}
@Test
public void testSubclass() {
public void testSubclass() throws Exception {
MockitoAnnotations.initMocks(this);
Store<String, BarCode> simpleStore = new SampleParsingStore(fetcher, persister, parser);
@ -77,11 +78,11 @@ public class StoreWithParserTest {
when(persister.write(barCode, NETWORK))
.thenReturn(Observable.just(true));
when(parser.call(DISK)).thenReturn(barCode.getKey());
when(parser.apply(DISK)).thenReturn(barCode.getKey());
String value = simpleStore.get(barCode).toBlocking().first();
String value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(barCode.getKey());
value = simpleStore.get(barCode).toBlocking().first();
value = simpleStore.get(barCode).blockingFirst();
assertThat(value).isEqualTo(barCode.getKey());
verify(fetcher, times(1)).fetch(barCode);
}

View file

@ -19,21 +19,21 @@ public class MultiParserTest {
private static final Parser<Integer, String> PARSER_1 = new Parser<Integer, String>() {
@Override
public String call(Integer value) {
public String apply(Integer value) {
return String.valueOf(value);
}
};
private static final Parser<String, BarCode> PARSER_2 = new Parser<String, BarCode>() {
@Override
public BarCode call(String value) {
public BarCode apply(String value) {
return new BarCode(value, "KEY");
}
};
private static final Parser<BarCode, UUID> PARSER_3 = new Parser<BarCode, UUID>() {
@Override
public UUID call(BarCode barCode) {
public UUID apply(BarCode barCode) {
return UUID.randomUUID();
}
};
@ -49,7 +49,7 @@ public class MultiParserTest {
parsersChain.add(new NoKeyParser<>(PARSER_3));
KeyParser<Object, Integer, UUID> parser = new MultiParser<>(parsersChain);
UUID parsed = parser.call(new Object(), 100);
UUID parsed = parser.apply(new Object(), 100);
assertNotNull(parsed);
}
@ -64,7 +64,7 @@ public class MultiParserTest {
parsersChain.add(new NoKeyParser<>(PARSER_2));
KeyParser<Object, Integer, UUID> parser = new MultiParser<>(parsersChain);
UUID parsed = parser.call(new Object(), 100);
UUID parsed = parser.apply(new Object(), 100);
assertNotNull(parsed);
}

View file

@ -4,9 +4,9 @@ import org.junit.Test;
import java.util.concurrent.Callable;
import rx.Observable;
import rx.observers.AssertableSubscriber;
import rx.subjects.PublishSubject;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.PublishSubject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -25,7 +25,7 @@ public class RepeatWhenEmitsTest {
// create an observable and apply the transformer to test
PublishSubject<String> source = PublishSubject.create();
AssertableSubscriber<String> testSubscriber = Observable.fromCallable(mockCallable)
TestObserver<String> testSubscriber = Observable.fromCallable(mockCallable)
.compose(RepeatWhenEmits.<String>from(source))
.test();

View file

@ -12,11 +12,10 @@ public class NoopPersisterTest {
@Test
public void writeReadTest() {
NoopPersister<String, BarCode> persister = NoopPersister.create();
boolean success = persister.write(barCode, "foo").toBlocking().first();
boolean success = persister.write(barCode, "foo").blockingFirst();
assertThat(success).isTrue();
String rawValue = persister.read(barCode).toBlocking().first();
String rawValue = persister.read(barCode).blockingFirst();
assertThat(rawValue).isEqualTo("foo");
}
@ -24,7 +23,7 @@ public class NoopPersisterTest {
public void noopParserFuncTest() {
NoopParserFunc<String, String> noopParserFunc = new NoopParserFunc<>();
String input = "foo";
String output = (String) noopParserFunc.call(input);
String output = (String) noopParserFunc.apply(input);
assertThat(input).isEqualTo(output);
//intended object ref comparison
assertThat(input).isSameAs(output);