Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Note: this fix changes the signature of the cancel function which is also used in socket-relay and socket-graphiql. However, I did not yet adapt them.
  • Loading branch information
mpoeter committed Aug 14, 2018
1 parent 88a8322 commit 86e5d47
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 15 deletions.
37 changes: 27 additions & 10 deletions packages/socket-apollo-link/src/createAbsintheSocketLink.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
// @flow

import {ApolloLink} from "apollo-link";
import {cancel, send, toObservable} from "@absinthe/socket";
import {cancel, send, observe} from "@absinthe/socket";
import {compose} from "flow-static-land/lib/Fun";
import {print} from "graphql/language/printer";
import Observable from "zen-observable";

import type {
AbsintheSocket,
GqlRequest,
Observer
} from "@absinthe/socket/compat/cjs/types";
import type {DocumentNode} from "graphql/language/ast";
import type {Subscriber} from "zen-observable";

type ApolloOperation<Variables> = {
query: DocumentNode,
Expand All @@ -25,28 +27,43 @@ const getRequest = <Variables: Object>({
variables
});

const onResult = (notifier, subscriber: Subscriber<any>) => result => {
subscriber.next(result);

if (notifier.operationType !== "subscription") {
subscriber.complete();
}
};

const notifierToObservable = (absintheSocket, onError, onStart) => notifier => {
let notifierStarted;
let unsubscribed = false;

return toObservable(absintheSocket, notifier, {
const observer: Observer<any> = {
onError,
onStart: notifierLatest => {
notifierStarted = notifierLatest;

if (unsubscribed) {
cancel(absintheSocket, notifierStarted);
cancel(absintheSocket, notifierStarted, observer);
}

onStart && onStart(notifierLatest);
},
unsubscribe: () => {
unsubscribed = true;
}
};

if (notifierStarted) {
cancel(absintheSocket, notifierStarted);
}
const unsubscribe = () => {
unsubscribed = true;
if (notifierStarted) {
cancel(absintheSocket, notifierStarted, observer);
}
};

return new Observable(subscriber => {
observer.onAbort = subscriber.error;
observer.onResult = onResult(notifier, subscriber);
observe(absintheSocket, notifier, observer);

return unsubscribe;
});
};

Expand Down
24 changes: 19 additions & 5 deletions packages/socket/src/cancel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import createPushHandler from "./createPushHandler";
import handlePush from "./handlePush";
import notifierNotify from "./notifier/notify";
import notifierRefresh from "./notifier/refresh";
import notifierRemove from "./notifier/remove";
import notifierUnobserve from "./notifier/unobserve";
import updateNotifiers from "./updateNotifiers";

import type {AbsintheSocket, Notifier, NotifierPushHandler} from "./types";
import type {AbsintheSocket, Notifier, Observer, NotifierPushHandler} from "./types";

// TODO: improve this type
type UnsubscribeResponse = {};
Expand Down Expand Up @@ -52,12 +54,24 @@ const unsubscribe = (absintheSocket, notifier) =>
*/
const cancel = (
absintheSocket: AbsintheSocket,
notifier: Notifier<any>
notifier: Notifier<any>,
observer: Observer<any>
): AbsintheSocket => {
if (notifier.operationType === "subscription") {
unsubscribe(absintheSocket, notifier);
} else {
observer.onCancel && observer.onCancel();

notifier = notifierUnobserve(notifier, observer);
if (notifier.observers.length === 0) {
// this was the last observer -> remove the whole notifier and
// unsubscribe the subscription if necessary.
removeNotifiers(absintheSocket, notifier);
if (notifier.operationType === "subscription") {
unsubscribe(absintheSocket, notifier);
}
}
else {
// there are other observers left -> only refresh the list
// with the updated notifier.
updateNotifiers(absintheSocket, notifierRefresh(notifier));
}

return absintheSocket;
Expand Down

0 comments on commit 86e5d47

Please sign in to comment.