Tech/record stale fs (#111)

networkBeforeStale and refreshOnStale
This commit is contained in:
Mike Nakhimovich 2017-02-07 09:45:48 -05:00 committed by GitHub
parent e76f29261d
commit 1c7bc502b0
24 changed files with 593 additions and 78 deletions

View file

@ -0,0 +1,32 @@
package com.nytimes.android.external.fs;
import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.store.base.RecordProvider;
import com.nytimes.android.external.store.base.RecordState;
import com.nytimes.android.external.store.base.impl.BarCode;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.inject.Inject;
public class RecordPersister extends SourcePersister implements RecordProvider<BarCode> {
@Nonnull
private final TimeUnit expirationUnit;
private final long expirationDuration;
@Inject
public RecordPersister(FileSystem fileSystem,
long expirationDuration,
@Nonnull TimeUnit expirationUnit) {
super(fileSystem);
this.expirationDuration = expirationDuration;
this.expirationUnit = expirationUnit;
}
@Override
public RecordState getRecordState(@Nonnull BarCode barCode) {
return sourceFileReader.getRecordState(barCode, expirationUnit, expirationDuration);
}
}

View file

@ -2,10 +2,12 @@ package com.nytimes.android.external.fs;
import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.store.base.DiskRead;
import com.nytimes.android.external.store.base.RecordState;
import com.nytimes.android.external.store.base.impl.BarCode;
import java.io.FileNotFoundException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.inject.Inject;
@ -39,4 +41,11 @@ public class SourceFileReader implements DiskRead<BufferedSource, BarCode> {
public boolean exists(@Nonnull BarCode barCode) {
return fileSystem.exists(pathForBarcode(barCode));
}
@Nonnull
public RecordState getRecordState(@Nonnull BarCode barCode,
@Nonnull TimeUnit expirationUnit,
long expirationDuration) {
return fileSystem.getRecordState(expirationUnit, expirationDuration, pathForBarcode(barCode));
}
}

View file

@ -5,9 +5,8 @@ import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.store.base.Persister;
import com.nytimes.android.external.store.base.impl.BarCode;
import javax.inject.Inject;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import okio.BufferedSource;
import rx.Observable;
@ -21,12 +20,12 @@ import rx.Observable;
* .parser(new GsonSourceParser<>(gson, BookResults.class))
* .open();
*/
public class SourcePersister implements Persister<BufferedSource, BarCode> {
public class SourcePersister implements Persister<BufferedSource, BarCode>{
@Nonnull
private final SourceFileReader sourceFileReader;
final SourceFileReader sourceFileReader;
@Nonnull
private final SourceFileWriter sourceFileWriter;
final SourceFileWriter sourceFileWriter;
@Inject
public SourcePersister(FileSystem fileSystem) {

View file

@ -1,7 +1,5 @@
package com.nytimes.android.external.fs;
import javax.annotation.Nonnull;
import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.fs.filesystem.FileSystemFactory;
import com.nytimes.android.external.store.base.Persister;
@ -10,6 +8,8 @@ import com.nytimes.android.external.store.base.impl.BarCode;
import java.io.File;
import java.io.IOException;
import javax.annotation.Nonnull;
import okio.BufferedSource;
/**

View file

@ -82,5 +82,9 @@ class FSFile {
}
throw new FileNotFoundException(pathValue);
}
public long lastModified() {
return file.lastModified();
}
}

View file

@ -1,9 +1,12 @@
package com.nytimes.android.external.fs.filesystem;
import com.nytimes.android.external.store.base.RecordState;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@ -92,4 +95,12 @@ public interface FileSystem {
* @return exists, duh
*/
boolean exists(String file);
/**
* compares age of file with given expiration time and returns
* appropriate recordState
*/
RecordState getRecordState(@Nonnull TimeUnit expirationUnit,
long expirationDuration,
@Nonnull String path);
}

View file

@ -1,10 +1,10 @@
package com.nytimes.android.external.fs.filesystem;
import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import javax.annotation.Nonnull;
/**
* Factory for {@link FileSystem}.
*/

View file

@ -3,6 +3,7 @@ package com.nytimes.android.external.fs.filesystem;
import com.nytimes.android.external.cache.CacheLoader;
import com.nytimes.android.external.cache.LoadingCache;
import com.nytimes.android.external.fs.Util;
import com.nytimes.android.external.store.base.RecordState;
import java.io.File;
import java.io.FileNotFoundException;
@ -10,6 +11,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -91,6 +93,21 @@ class FileSystemImpl implements FileSystem {
return getFile(path).exists();
}
@Override
public RecordState getRecordState(@Nonnull TimeUnit expirationUnit, long expirationDuration, @Nonnull String path) {
FSFile file = getFile(path);
if (!file.exists()) {
return RecordState.MISSING;
}
long now = System.currentTimeMillis();
long cuttOffPoint = now - TimeUnit.MILLISECONDS.convert(expirationDuration, expirationUnit);
if (file.lastModified() < cuttOffPoint) {
return RecordState.STALE;
} else {
return RecordState.FRESH;
}
}
@Nullable
private FSFile getFile(@Nonnull String path) {
return files.getUnchecked(cleanPath(path));

View file

@ -0,0 +1,94 @@
package com.nytimes.android.external.fs;
import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.store.base.RecordState;
import com.nytimes.android.external.store.base.impl.BarCode;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import okio.BufferedSource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
public class RecordPersisterTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Mock
FileSystem fileSystem;
@Mock
BufferedSource bufferedSource;
private RecordPersister sourcePersister;
private final BarCode simple = new BarCode("type", "key");
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
sourcePersister = new RecordPersister(fileSystem, 1L, TimeUnit.DAYS);
}
@Test
public void readExists() throws FileNotFoundException {
when(fileSystem.exists(SourcePersister.pathForBarcode(simple)))
.thenReturn(true);
when(fileSystem.read(SourcePersister.pathForBarcode(simple))).thenReturn(bufferedSource);
BufferedSource returnedValue = sourcePersister.read(simple).toBlocking().single();
assertThat(returnedValue).isEqualTo(bufferedSource);
}
@Test
public void freshTest() {
when(fileSystem.getRecordState(TimeUnit.DAYS, 1L, SourcePersister.pathForBarcode(simple)))
.thenReturn(RecordState.FRESH);
assertThat(sourcePersister.getRecordState(simple)).isEqualTo(RecordState.FRESH);
}
@Test
public void staleTest() {
when(fileSystem.getRecordState(TimeUnit.DAYS, 1L, SourcePersister.pathForBarcode(simple)))
.thenReturn(RecordState.STALE);
assertThat(sourcePersister.getRecordState(simple)).isEqualTo(RecordState.STALE);
}
@Test
public void missingTest() {
when(fileSystem.getRecordState(TimeUnit.DAYS, 1L, SourcePersister.pathForBarcode(simple)))
.thenReturn(RecordState.MISSING);
assertThat(sourcePersister.getRecordState(simple)).isEqualTo(RecordState.MISSING);
}
@Test
public void readDoesNotExist() throws FileNotFoundException {
expectedException.expect(NoSuchElementException.class);
when(fileSystem.exists(SourcePersister.pathForBarcode(simple)))
.thenReturn(false);
sourcePersister.read(simple).toBlocking().single();
}
@Test
public void write() throws IOException {
assertThat(sourcePersister.write(simple, bufferedSource).toBlocking().single()).isTrue();
}
@Test
public void pathForBarcode() {
assertThat(SourcePersister.pathForBarcode(simple)).isEqualTo("typekey");
}
}

View file

@ -0,0 +1,106 @@
package com.nytimes.android.external.fs;
import com.nytimes.android.external.store.base.Fetcher;
import com.nytimes.android.external.store.base.RecordState;
import com.nytimes.android.external.store.base.beta.Store;
import com.nytimes.android.external.store.base.impl.BarCode;
import com.nytimes.android.external.store.base.impl.StoreBuilder;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
import okio.BufferedSource;
import rx.Observable;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StoreNetworkBeforeStaleTest {
@Mock
Fetcher<BufferedSource, BarCode> fetcher;
@Mock
RecordPersister persister;
@Mock
BufferedSource network1;
@Mock
BufferedSource network2;
@Mock
BufferedSource disk1;
@Mock
BufferedSource disk2;
private final BarCode barCode = new BarCode("key", "value");
private Store<BufferedSource, BarCode> store;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
store = StoreBuilder.<BufferedSource>barcode()
.fetcher(fetcher)
.persister(persister)
.networkBeforeStale()
.open();
}
@Test
public void networkBeforeDiskWhenStale() {
when(fetcher.fetch(barCode))
.thenReturn(Observable.<BufferedSource>error(new Exception()));
when(persister.read(barCode))
.thenReturn(Observable.just(disk1)); //get should return from disk
when(persister.getRecordState(barCode)).thenReturn(RecordState.STALE);
when(persister.write(barCode, network1))
.thenReturn(Observable.just(true));
store.get(barCode).test().awaitTerminalEvent();
InOrder inOrder = inOrder(fetcher, persister);
inOrder.verify(fetcher, times(1)).fetch(barCode);
inOrder.verify(persister, times(1)).read(barCode);
verify(persister, never()).write(barCode, network1);
}
@Test
public void noNetworkBeforeStaleWhenMissingRecord() {
when(fetcher.fetch(barCode))
.thenReturn(Observable.just(network1));
when(persister.read(barCode))
.thenReturn(Observable.just(disk1)); //get should return from disk
when(persister.getRecordState(barCode)).thenReturn(RecordState.MISSING);
when(persister.write(barCode, network1))
.thenReturn(Observable.just(true));
store.get(barCode).test().awaitTerminalEvent();
InOrder inOrder = inOrder(fetcher, persister);
inOrder.verify(fetcher, times(1)).fetch(barCode);
inOrder.verify(persister, times(1)).write(barCode, network1);
inOrder.verify(persister, times(1)).read(barCode);
}
@Test
public void noNetworkBeforeStaleWhenFreshRecord() {
when(persister.read(barCode))
.thenReturn(Observable.just(disk1)); //get should return from disk
when(persister.getRecordState(barCode)).thenReturn(RecordState.FRESH);
store.get(barCode).test().awaitTerminalEvent();
verify(fetcher, never()).fetch(barCode);
verify(persister, never()).write(barCode, network1);
verify(persister, times(1)).read(barCode);
}
}

View file

@ -0,0 +1,100 @@
package com.nytimes.android.external.fs;
import com.nytimes.android.external.store.base.Fetcher;
import com.nytimes.android.external.store.base.RecordState;
import com.nytimes.android.external.store.base.beta.Store;
import com.nytimes.android.external.store.base.impl.BarCode;
import com.nytimes.android.external.store.base.impl.StoreBuilder;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.runners.MockitoJUnitRunner;
import okio.BufferedSource;
import rx.Observable;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class StoreRefreshWhenStaleTest {
@Mock
Fetcher<BufferedSource, BarCode> fetcher;
@Mock
RecordPersister persister;
@Mock
BufferedSource network1;
@Mock
BufferedSource network2;
@Mock
BufferedSource disk1;
@Mock
BufferedSource disk2;
private final BarCode barCode = new BarCode("key", "value");
private Store<BufferedSource, BarCode> store;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
store = StoreBuilder.<BufferedSource>barcode()
.fetcher(fetcher)
.persister(persister)
.refreshOnStale()
.open();
}
@Test
public void diskWasRefreshedWhenStaleRecord() {
when(fetcher.fetch(barCode))
.thenReturn(Observable.just(network1));
when(persister.read(barCode))
.thenReturn(Observable.just(disk1)); //get should return from disk
when(persister.getRecordState(barCode)).thenReturn(RecordState.STALE);
when(persister.write(barCode, network1))
.thenReturn(Observable.just(true));
store.get(barCode).test().awaitTerminalEvent();
verify(fetcher, times(1)).fetch(barCode);
verify(persister, times(2)).getRecordState(barCode);
verify(persister, times(1)).write(barCode, network1);
verify(persister, times(2)).read(barCode); //reads from disk a second time when backfilling
}
@Test
public void diskWasNotRefreshedWhenFreshRecord() {
when(fetcher.fetch(barCode))
.thenReturn(Observable.just(network1));
when(persister.read(barCode))
.thenReturn(Observable.just(disk1)) //get should return from disk
.thenReturn(Observable.just(disk2)); //backfill should read from disk again
when(persister.getRecordState(barCode)).thenReturn(RecordState.FRESH);
when(persister.write(barCode, network1))
.thenReturn(Observable.just(true));
BufferedSource result = store.get(barCode)
.test()
.awaitTerminalEvent()
.getOnNextEvents()
.get(0);
assertThat(result).isEqualTo(disk1);
verify(fetcher, times(0)).fetch(barCode);
verify(persister, times(1)).getRecordState(barCode);
store.clearMemory(barCode);
result = store.get(barCode).test().awaitTerminalEvent().getOnNextEvents().get(0);
assertThat(result).isEqualTo(disk2);
verify(fetcher, times(0)).fetch(barCode);
verify(persister, times(2)).getRecordState(barCode);
}
}

View file

@ -2,9 +2,14 @@ package com.nytimes.android.external.fs;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import okio.BufferedSource;
import okio.Okio;
import static com.google.common.base.Charsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -34,4 +39,8 @@ public class UtilTest {
util.createParentDirs(child);
verify(parent).mkdirs();
}
static BufferedSource source(String data) {
return Okio.buffer(Okio.source(new ByteArrayInputStream(data.getBytes(UTF_8))));
}
}

View file

@ -3,6 +3,7 @@ package com.nytimes.android.external.fs.impl;
import com.nytimes.android.external.fs.filesystem.FileSystem;
import com.nytimes.android.external.fs.filesystem.FileSystemFactory;
import com.nytimes.android.external.store.base.RecordState;
import org.junit.Before;
import org.junit.Test;
@ -11,6 +12,7 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import okio.BufferedSource;
import okio.Okio;
@ -51,7 +53,15 @@ public class SimpleTest {
assertThat(fileSystem.read("/boo").readUtf8()).isEqualTo(testString1);
fileSystem.delete("/boo");
assertThat(fileSystem.exists("/boo")).isFalse();
assertThat(fileSystem.exists("/boo")).isFalse();
}
@Test
public void testIsRecordStale() throws IOException {
fileSystem.write("/boo", source(testString1));
assertThat(fileSystem.read("/boo").readUtf8()).isEqualTo(testString1);
assertThat(fileSystem.getRecordState(TimeUnit.MINUTES, 1, "/boo")).isEqualTo(RecordState.FRESH);
assertThat(fileSystem.getRecordState(TimeUnit.MICROSECONDS, 1, "/boo")).isEqualTo(RecordState.STALE);
assertThat(fileSystem.getRecordState(TimeUnit.DAYS, 1, "/notfound")).isEqualTo(RecordState.MISSING);
}
@Test

View file

@ -9,7 +9,7 @@ import rx.Observable;
*
* @param <Raw> data type before parsing
*/
public interface Persister<Raw, Key> {
public interface Persister<Raw, Key> extends DiskRead<Raw, Key>, DiskWrite<Raw, Key> {
/**
* @param barCode to use to get data from persister

View file

@ -0,0 +1,10 @@
package com.nytimes.android.external.store.base;
import javax.annotation.Nonnull;
/**
* Created by 206847 on 2/6/17.
*/
public interface RecordProvider<Key> {
RecordState getRecordState(@Nonnull Key barCode);
}

View file

@ -0,0 +1,5 @@
package com.nytimes.android.external.store.base;
public enum RecordState {
FRESH, STALE, MISSING;
}

View file

@ -117,9 +117,11 @@ public class ParsingStoreBuilder<Raw, Parsed> {
RealInternalStore<Raw, Parsed, BarCode> realInternalStore;
if (memCache == null) {
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser);
realInternalStore = new RealInternalStore<>(fetcher, persister,
multiParser, StalePolicy.UNSPECIFIED);
} else {
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser, memCache);
realInternalStore = new RealInternalStore<>(fetcher, persister,
multiParser, memCache, StalePolicy.UNSPECIFIED);
}
return new ProxyStore<>(realInternalStore);

View file

@ -15,6 +15,8 @@ import javax.annotation.Nonnull;
import rx.Observable;
import rx.functions.Func1;
import static com.nytimes.android.external.store.base.impl.StalePolicy.UNSPECIFIED;
@Deprecated
public class ProxyStore<Parsed> implements Store<Parsed> {
@ -26,32 +28,33 @@ public class ProxyStore<Parsed> implements Store<Parsed> {
public ProxyStore(Fetcher<Parsed, BarCode> fetcher) {
internalStore = new RealInternalStore<>(fetcher, new NoopPersister<Parsed, BarCode>(),
new NoopParserFunc<Parsed, Parsed>());
new NoopParserFunc<Parsed, Parsed>(), UNSPECIFIED);
}
public ProxyStore(Fetcher<Parsed, BarCode> fetcher, Persister<Parsed, BarCode> persister) {
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Parsed, Parsed>());
new NoopParserFunc<Parsed, Parsed>(), UNSPECIFIED);
}
public <Raw> ProxyStore(Fetcher<Raw, BarCode> fetcher,
Persister<Raw, BarCode> persister,
Parser<Raw, Parsed> parser) {
internalStore = new RealInternalStore<>(fetcher, persister, parser);
internalStore = new RealInternalStore<>(fetcher, persister, parser, UNSPECIFIED);
}
public <Raw> ProxyStore(Fetcher<Raw, BarCode> fetcher,
Persister<Raw, BarCode> persister,
Func1<Raw, Parsed> parser, Cache<BarCode, Observable<Parsed>> memCache) {
internalStore = new RealInternalStore<>(fetcher, persister, parser, memCache);
internalStore = new RealInternalStore<>(fetcher, persister, parser, memCache, UNSPECIFIED);
}
public <Raw> ProxyStore(Fetcher<Raw, BarCode> fetcher,
Persister<Raw, BarCode> persister,
Cache<BarCode, Observable<Parsed>> memCache) {
internalStore = new RealInternalStore<>(fetcher, persister, new NoopParserFunc<Raw, Parsed>(), memCache);
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Raw, Parsed>(), memCache, UNSPECIFIED);
}

View file

@ -4,6 +4,7 @@ import com.nytimes.android.external.cache.Cache;
import com.nytimes.android.external.cache.CacheBuilder;
import com.nytimes.android.external.store.base.Fetcher;
import com.nytimes.android.external.store.base.InternalStore;
import com.nytimes.android.external.store.base.Parser;
import com.nytimes.android.external.store.base.Persister;
import com.nytimes.android.external.store.util.NoopPersister;
import com.nytimes.android.external.store.util.OnErrorResumeWithEmpty;
@ -25,7 +26,7 @@ import rx.functions.Func1;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import static java.util.Objects.requireNonNull;
import static com.nytimes.android.external.store.base.impl.StoreUtil.*;
/**
* Store to be used for loading an object different data sources
@ -39,38 +40,43 @@ import static java.util.Objects.requireNonNull;
final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed, Key> {
Cache<Key, Observable<Parsed>> inFlightRequests;
Cache<Key, Observable<Parsed>> memCache;
private StalePolicy stalePolicy;
private final PublishSubject<Key> refreshSubject = PublishSubject.create();
private Fetcher<Raw, Key> fetcher;
private Persister<Raw, Key> persister;
private Func1<Raw, Parsed> parser;
private BehaviorSubject<Parsed> subject;
RealInternalStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Func1<Raw, Parsed> parser) {
memCache = CacheBuilder.newBuilder()
.maximumSize(getCacheSize())
.expireAfterAccess(getCacheTTL(), TimeUnit.SECONDS)
.build();
init(fetcher, persister, parser, memCache);
}
RealInternalStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Func1<Raw, Parsed> parser,
Cache<Key, Observable<Parsed>> memCache) {
init(fetcher, persister, parser, memCache);
Cache<Key, Observable<Parsed>> memCache,
StalePolicy stalePolicy) {
init(fetcher, persister, parser, memCache, stalePolicy);
}
RealInternalStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Parser<Raw, Parsed> parser,
StalePolicy stalePolicy) {
memCache = CacheBuilder.newBuilder()
.maximumSize(getCacheSize())
.expireAfterAccess(getCacheTTL(), TimeUnit.SECONDS)
.build();
init(fetcher, persister, parser, memCache, stalePolicy);
}
private void init(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Func1<Raw, Parsed> parser,
Cache<Key, Observable<Parsed>> memCache) {
Cache<Key, Observable<Parsed>> memCache, StalePolicy stalePolicy) {
this.fetcher = fetcher;
this.persister = persister;
this.parser = parser;
this.memCache = memCache;
this.stalePolicy = stalePolicy;
inFlightRequests = CacheBuilder.newBuilder()
.expireAfterWrite(TimeUnit.MINUTES.toSeconds(1), TimeUnit.SECONDS)
.build();
@ -95,39 +101,8 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Nonnull
@Experimental
public Observable<Parsed> getRefreshing(@Nonnull final Key barCode) {
return get(barCode).compose(repeatWhenCacheEvicted(barCode));
}
@Nonnull
private Observable.Transformer<Parsed, Parsed> repeatWhenCacheEvicted(@Nonnull final Key key) {
Observable<Key> filter = refreshSubject.filter(new Func1<Key, Boolean>() {
@Override
public Boolean call(Key barCode) {
return barCode.equals(key);
}
});
return from(filter);
}
@Nonnull
private <T> Observable.Transformer<T, T> from(@Nonnull final Observable retrySource) {
requireNonNull(retrySource);
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> source) {
return source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> events) {
return events.switchMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
return retrySource;
}
});
}
});
}
};
return get(barCode)
.compose(StoreUtil.<Parsed, Key>repeatWhenCacheEvicted(refreshSubject, barCode));
}
@ -176,6 +151,14 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
*/
@Override
public Observable<Parsed> disk(@Nonnull final Key barCode) {
if (shouldReturnNetworkBeforeStale(persister, stalePolicy, barCode)) {
return Observable.empty();
}
return readDisk(barCode);
}
private Observable<Parsed> readDisk(@Nonnull final Key barCode) {
return persister().read(barCode)
.onErrorResumeNext(new OnErrorResumeWithEmpty<Raw>())
.map(parser)
@ -183,10 +166,29 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
@Override
public void call(Parsed parsed) {
updateMemory(barCode, parsed);
if (stalePolicy == StalePolicy.REFRESH_ON_STALE
&& persisterIsStale(barCode, persister)) {
backfillCache(barCode);
}
}
}).cache();
}
private void backfillCache(@Nonnull Key barCode) {
fetch(barCode).subscribe(new Action1<Parsed>() {
@Override
public void call(Parsed parsed) {
//do Nothing we are just backfilling cache
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
//do nothing as we are just backfilling cache
}
});
}
/**
* Will check to see if there exists an in flight observable and return it before
* going to nerwork
@ -239,16 +241,21 @@ final class RealInternalStore<Raw, Parsed, Key> implements InternalStore<Parsed,
public Observable<Parsed> call(Raw raw) {
return persister().write(barCode, raw)
.flatMap(new Func1<Boolean, Observable<Parsed>>() {
@Nonnull
@Override
public Observable<Parsed> call(Boolean aBoolean) {
return disk(barCode);
return readDisk(barCode);
}
});
}
})
.onErrorResumeNext(new Func1<Throwable, Observable<? extends Parsed>>() {
public Observable<? extends Parsed> call(Throwable throwable) {
if (stalePolicy == StalePolicy.NETWORK_BEFORE_STALE) {
return readDisk(barCode);
}
return Observable.error(throwable);
}
})
.doOnNext(new Action1<Parsed>() {
@Override
public void call(Parsed data) {
notifySubscribers(data);
}

View file

@ -14,6 +14,8 @@ import javax.annotation.Nonnull;
import rx.Observable;
import rx.functions.Func1;
import static com.nytimes.android.external.store.base.impl.StalePolicy.UNSPECIFIED;
public class RealStore<Parsed, Key> implements Store<Parsed, Key> {
private final InternalStore<Parsed, Key> internalStore;
@ -24,32 +26,33 @@ public class RealStore<Parsed, Key> implements Store<Parsed, Key> {
public RealStore(Fetcher<Parsed, Key> fetcher) {
internalStore = new RealInternalStore<>(fetcher, new NoopPersister<Parsed, Key>(),
new NoopParserFunc<Parsed, Parsed>());
new NoopParserFunc<Parsed, Parsed>(), UNSPECIFIED);
}
public RealStore(Fetcher<Parsed, Key> fetcher, Persister<Parsed, Key> persister) {
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Parsed, Parsed>());
new NoopParserFunc<Parsed, Parsed>(), UNSPECIFIED);
}
public <Raw> RealStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Parser<Raw, Parsed> parser) {
internalStore = new RealInternalStore<>(fetcher, persister, parser);
internalStore = new RealInternalStore<>(fetcher, persister, parser, UNSPECIFIED);
}
public <Raw> RealStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Func1<Raw, Parsed> parser, Cache<Key, Observable<Parsed>> memCache) {
internalStore = new RealInternalStore<>(fetcher, persister, parser, memCache);
internalStore = new RealInternalStore<>(fetcher, persister, parser, memCache, UNSPECIFIED);
}
public <Raw> RealStore(Fetcher<Raw, Key> fetcher,
Persister<Raw, Key> persister,
Cache<Key, Observable<Parsed>> memCache) {
internalStore = new RealInternalStore<>(fetcher, persister, new NoopParserFunc<Raw, Parsed>(), memCache);
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Raw, Parsed>(), memCache, UNSPECIFIED);
}

View file

@ -27,6 +27,9 @@ public class RealStoreBuilder<Raw, Parsed, Key> {
private Cache<Key, Observable<Parsed>> memCache;
private Fetcher<Raw, Key> fetcher;
@SuppressWarnings("PMD.UnusedPrivateField") //remove when it is implemented...
private StalePolicy stalePolicy = StalePolicy.UNSPECIFIED;
@Nonnull
public static <Raw, Parsed, Key> RealStoreBuilder<Raw, Parsed, Key> builder() {
return new RealStoreBuilder<>();
@ -82,6 +85,19 @@ public class RealStoreBuilder<Raw, Parsed, Key> {
this.memCache = memCache;
return this;
}
//Store will backfill the disk cache anytime a record is stale
//User will still get the stale record returned to them
public RealStoreBuilder<Raw, Parsed, Key> refreshOnStale() {
stalePolicy = StalePolicy.REFRESH_ON_STALE;
return this;
}
//Store will try to get network source when disk data is stale
//if network source throws error or is empty, stale disk data will be returned
@Nonnull
public RealStoreBuilder<Raw, Parsed, Key> networkBeforeStale() {
stalePolicy = StalePolicy.NETWORK_BEFORE_STALE;
return this;
}
@Nonnull
public Store<Parsed, Key> open() {
@ -97,9 +113,9 @@ public class RealStoreBuilder<Raw, Parsed, Key> {
RealInternalStore<Raw, Parsed, Key> realInternalStore;
if (memCache == null) {
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser);
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser, stalePolicy);
} else {
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser, memCache);
realInternalStore = new RealInternalStore<>(fetcher, persister, multiParser, memCache, stalePolicy);
}
return new RealStore<>(realInternalStore);

View file

@ -0,0 +1,5 @@
package com.nytimes.android.external.store.base.impl;
public enum StalePolicy {
UNSPECIFIED, REFRESH_ON_STALE, NETWORK_BEFORE_STALE
}

View file

@ -15,6 +15,8 @@ import javax.annotation.Nonnull;
import rx.Observable;
import rx.annotations.Beta;
import static com.nytimes.android.external.store.base.impl.StalePolicy.UNSPECIFIED;
/**
* Builder where there parser is used.
@ -24,6 +26,7 @@ public final class StoreBuilder<Raw> {
private Persister<Raw, BarCode> persister;
private Cache<BarCode, Observable<Raw>> memCache;
@Nonnull
@Deprecated
//Please Use fromTypes to build Stores, allowing customization of Barcode Type
@ -108,9 +111,11 @@ public final class StoreBuilder<Raw> {
InternalStore<Raw, BarCode> internalStore;
if (memCache == null) {
internalStore = new RealInternalStore<>(fetcher, persister, new NoopParserFunc<Raw, Raw>());
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Raw, Raw>(), UNSPECIFIED);
} else {
internalStore = new RealInternalStore<>(fetcher, persister, new NoopParserFunc<Raw, Raw>(), memCache);
internalStore = new RealInternalStore<>(fetcher, persister,
new NoopParserFunc<Raw, Raw>(), memCache, UNSPECIFIED);
}
return new ProxyStore<>(internalStore);

View file

@ -0,0 +1,68 @@
package com.nytimes.android.external.store.base.impl;
import com.nytimes.android.external.store.base.Persister;
import com.nytimes.android.external.store.base.RecordProvider;
import com.nytimes.android.external.store.base.RecordState;
import java.util.Objects;
import javax.annotation.Nonnull;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import static com.nytimes.android.external.store.base.RecordState.FRESH;
final class StoreUtil {
private StoreUtil() {
}
@Nonnull
static <Parsed, Key> Observable.Transformer<Parsed, Parsed>
repeatWhenCacheEvicted(PublishSubject<Key> refreshSubject, @Nonnull final Key key) {
Observable<Key> filter = refreshSubject.filter(new Func1<Key, Boolean>() {
@Override
public Boolean call(Key barCode) {
return barCode.equals(key);
}
});
return from(filter);
}
@Nonnull
static <T> Observable.Transformer<T, T> from(@Nonnull final Observable retrySource) {
Objects.requireNonNull(retrySource);
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> source) {
return source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> events) {
return events.switchMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
return retrySource;
}
});
}
});
}
};
}
static <Raw, Key> boolean shouldReturnNetworkBeforeStale(
Persister<Raw, Key> persister, StalePolicy stalePolicy, Key barCode) {
return stalePolicy == StalePolicy.NETWORK_BEFORE_STALE
&& persisterIsStale(barCode, persister);
}
static <Raw, Key> boolean persisterIsStale(@Nonnull Key barCode, Persister<Raw, Key> persister) {
if (persister instanceof RecordProvider) {
RecordProvider<Key> provider = (RecordProvider<Key>) persister;
RecordState recordState = provider.getRecordState(barCode);
return recordState != FRESH;
}
return false;
}
}