Skip to content

Commit

Permalink
cache query & use share delay
Browse files Browse the repository at this point in the history
  • Loading branch information
GrandSchtroumpf committed Jul 21, 2021
1 parent eeb372c commit c9dcb86
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 22 deletions.
2 changes: 1 addition & 1 deletion projects/akita-ng-fire/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "akita-ng-fire",
"version": "6.0.3",
"version": "6.0.4",
"peerDependencies": {
"@angular/common": ">=9.0.0 <13",
"@angular/core": ">=9.0.0 <13",
Expand Down
59 changes: 38 additions & 21 deletions projects/akita-ng-fire/src/lib/collection/collection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {
AngularFirestore,
DocumentChangeAction,
QueryFn,
QueryGroupFn
QueryGroupFn,
Query
} from '@angular/fire/firestore';
import {
EntityStore,
Expand All @@ -26,10 +27,11 @@ import {
} from '../utils/sync-from-action';
import { WriteOptions, SyncOptions, PathParams, UpdateCallback, AtomicWrite } from '../utils/types';
import { Observable, isObservable, of, combineLatest } from 'rxjs';
import { tap, map, switchMap, shareReplay } from 'rxjs/operators';
import { tap, map, switchMap } from 'rxjs/operators';
import { getStoreName } from '../utils/store-options';
import { pathWithParams } from '../utils/path-with-params';
import { hasChildGetter } from '../utils/has-path-getter';
import { shareWithDelay } from '../utils/share-delay';

export type CollectionState<E = any> = EntityState<E, string> & ActiveState<string>;

Expand All @@ -52,7 +54,8 @@ export function isTransaction(write: AtomicWrite): write is firebase.firestore.T
export class CollectionService<S extends EntityState<EntityType, string>, EntityType = getEntityType<S>> {
// keep memory of the current ids to listen to (for syncManyDocs)
private idsToListen: Record<string, string[]> = {};
private memo: Record<string, Observable<EntityType>> = {};
private memoPath: Record<string, Observable<EntityType>> = {};
private memoQuery: Map<Query, Observable<EntityType[]>> = new Map();
protected db: AngularFirestore;

protected onCreate?(entity: EntityType, options: WriteOptions): any;
Expand Down Expand Up @@ -81,14 +84,22 @@ export class CollectionService<S extends EntityState<EntityType, string>, Entity
}
}

private fromMemo(path: string): Observable<EntityType> {
if (!this.memo[path]) {
this.memo[path] = this.db.doc<EntityType>(path).valueChanges().pipe(
map(doc => this.formatFromFirestore(doc)),
shareReplay({ bufferSize: 1, refCount: true })
);
private fromMemo(key: string | Query, cb: () => Observable<any>): Observable<any> {
if (!this.useMemorization) return cb();
if (typeof key === 'string') {
if (!this.memoPath[key]) {
this.memoPath[key] = cb().pipe(shareWithDelay());
}
return this.memoPath[key];
} else {
for (const query of this.memoQuery.keys()) {
if (typeof query !== 'string' && query.isEqual(key)) {
return this.memoQuery.get(query)!;
}
}
this.memoQuery.set(key, cb().pipe(shareWithDelay()));
return this.memoQuery.get(key)!;
}
return this.memo[path];
}

protected getPath(options: PathParams) {
Expand Down Expand Up @@ -496,28 +507,34 @@ export class CollectionService<S extends EntityState<EntityType, string>, Entity
const path = this.getPath(options);
// If path targets a collection ( odd number of segments after the split )
if (typeof idOrQuery === 'string') {
if (this.useMemorization) {
return this.fromMemo(`${path}/${idOrQuery}`);
}
return this.db.doc<EntityType>(`${path}/${idOrQuery}`).valueChanges().pipe(
const key = `${path}/${idOrQuery}`;
const query = () => this.db.doc<EntityType>(key).valueChanges();
return this.fromMemo(key, query).pipe(
map(doc => this.formatFromFirestore(doc))
);
}
let docs$: Observable<EntityType[]>;
if (Array.isArray(idOrQuery)) {
if (!idOrQuery.length) return of([]);
if (this.useMemorization) {
return combineLatest(idOrQuery.map(id => this.fromMemo(`${path}/${id}`)));
}
const queries = idOrQuery.map(id => this.db.doc<EntityType>(`${path}/${id}`).valueChanges());
const queries = idOrQuery.map(id => {
const key = `${path}/${id}`;
const query = () => this.db.doc<EntityType>(key).valueChanges();
return this.fromMemo(key, query) as Observable<EntityType>;
});
docs$ = combineLatest(queries);
} else if (typeof idOrQuery === 'function') {
docs$ = this.db.collection<EntityType>(path, idOrQuery).valueChanges();
const query = idOrQuery(this.db.collection(path).ref);
const cb = () => this.db.collection<EntityType>(path, idOrQuery).valueChanges();
docs$ = this.fromMemo(query, cb);
} else if (typeof idOrQuery === 'object') {
const subpath = this.getPath(idOrQuery);
docs$ = this.db.collection<EntityType>(subpath).valueChanges();
const query = this.db.collection(subpath).ref;
const cb = () => this.db.collection<EntityType>(subpath).valueChanges();
docs$ = this.fromMemo(query, cb);
} else {
docs$ = this.db.collection<EntityType>(path, idOrQuery).valueChanges();
const query = this.db.collection(path).ref;
const cb = () => this.db.collection<EntityType>(path, idOrQuery).valueChanges();
docs$ = this.fromMemo(query, cb);
}
return docs$.pipe(
map(docs => docs.map(doc => this.formatFromFirestore(doc)))
Expand Down
72 changes: 72 additions & 0 deletions projects/akita-ng-fire/src/lib/utils/share-delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {
MonoTypeOperatorFunction,
Observable,
ReplaySubject,
Subscriber,
Subscription
} from 'rxjs';

/**
* Replay the data and share it across source.
* It will unsubscribe after a delay when there is no more subscriber
* This is useful if you unsubscribe from a page & resubscribe on the other
* @note code based on shareReplay of rxjs v6.6.7: https://github.com/ReactiveX/rxjs/blob/6.6.7/src/internal/operators/shareReplay.ts
* @param delay Delay in ms to wait before unsubscribing
*/
export function shareWithDelay<T>(delay: number = 100): MonoTypeOperatorFunction<T> {
let subject: ReplaySubject<T> | undefined;
let subscription: Subscription | undefined;
let refCount = 0;
let hasError = false;
let isComplete = false;
function operation(this: Subscriber<T>, source: Observable<T>) {
refCount++;
let innerSub: Subscription | undefined;
if (!subject || hasError) {
hasError = false;
subject = new ReplaySubject<T>(1, Infinity);
innerSub = subject.subscribe(this);
subscription = source.subscribe({
next(value) {
subject?.next(value);
},
error(err) {
hasError = true;
subject?.error(err);
},
complete() {
isComplete = true;
subscription = undefined;
subject?.complete();
}
});

// Here we need to check to see if the source synchronously completed. Although
// we're setting `subscription = undefined` in the completion handler, if the source
// is synchronous, that will happen *before* subscription is set by the return of
// the `subscribe` call.
if (isComplete) {
subscription = undefined;
}
} else {
innerSub = subject.subscribe(this);
}

this.add(() => {
refCount--;
innerSub?.unsubscribe();
innerSub = undefined;

// await some ms before unsubscribing
setTimeout(() => {
if (subscription && !isComplete && refCount === 0) {
subscription.unsubscribe();
subscription = undefined;
subject = undefined;
}
}, delay);
});
}

return (source: Observable<T>) => source.lift(operation);
}

0 comments on commit c9dcb86

Please sign in to comment.