diff --git a/packages/socket-apollo-link/src/createAbsintheSocketLink.js b/packages/socket-apollo-link/src/createAbsintheSocketLink.js index b65cb8b1..d07a0b0a 100644 --- a/packages/socket-apollo-link/src/createAbsintheSocketLink.js +++ b/packages/socket-apollo-link/src/createAbsintheSocketLink.js @@ -1,9 +1,10 @@ // @flow import {ApolloLink} from "apollo-link"; -import {cancel, send, toObservable} from "@absinthe/socket"; +import {cancel, send, observe} from "@absinthe/socket"; import {compose} from "flow-static-land/lib/Fun"; import {print} from "graphql/language/printer"; +import Observable from "zen-observable"; import type { AbsintheSocket, @@ -11,6 +12,7 @@ import type { Observer } from "@absinthe/socket/compat/cjs/types"; import type {DocumentNode} from "graphql/language/ast"; +import type {Subscriber} from "zen-observable"; type ApolloOperation = { query: DocumentNode, @@ -25,28 +27,43 @@ const getRequest = ({ variables }); +const onResult = (notifier, subscriber: Subscriber) => result => { + subscriber.next(result); + + if (notifier.operationType !== "subscription") { + subscriber.complete(); + } +}; + const notifierToObservable = (absintheSocket, onError, onStart) => notifier => { let notifierStarted; let unsubscribed = false; - - return toObservable(absintheSocket, notifier, { + const observer: Observer = { onError, onStart: notifierLatest => { notifierStarted = notifierLatest; if (unsubscribed) { - cancel(absintheSocket, notifierStarted); + cancel(absintheSocket, notifierStarted, observer); } onStart && onStart(notifierLatest); - }, - unsubscribe: () => { - unsubscribed = true; + } + }; - if (notifierStarted) { - cancel(absintheSocket, notifierStarted); - } + const unsubscribe = () => { + unsubscribed = true; + if (notifierStarted) { + cancel(absintheSocket, notifierStarted, observer); } + }; + + return new Observable(subscriber => { + observer.onAbort = subscriber.error; + observer.onResult = onResult(notifier, subscriber); + observe(absintheSocket, notifier, observer); + + return unsubscribe; }); }; diff --git a/packages/socket/src/cancel.js b/packages/socket/src/cancel.js index ef7d72e9..f8028821 100644 --- a/packages/socket/src/cancel.js +++ b/packages/socket/src/cancel.js @@ -3,10 +3,12 @@ import createPushHandler from "./createPushHandler"; import handlePush from "./handlePush"; import notifierNotify from "./notifier/notify"; +import notifierRefresh from "./notifier/refresh"; import notifierRemove from "./notifier/remove"; +import notifierUnobserve from "./notifier/unobserve"; import updateNotifiers from "./updateNotifiers"; -import type {AbsintheSocket, Notifier, NotifierPushHandler} from "./types"; +import type {AbsintheSocket, Notifier, Observer, NotifierPushHandler} from "./types"; // TODO: improve this type type UnsubscribeResponse = {}; @@ -52,12 +54,24 @@ const unsubscribe = (absintheSocket, notifier) => */ const cancel = ( absintheSocket: AbsintheSocket, - notifier: Notifier + notifier: Notifier, + observer: Observer ): AbsintheSocket => { - if (notifier.operationType === "subscription") { - unsubscribe(absintheSocket, notifier); - } else { + observer.onCancel && observer.onCancel(); + + notifier = notifierUnobserve(notifier, observer); + if (notifier.observers.length === 0) { + // this was the last observer -> remove the whole notifier and + // unsubscribe the subscription if necessary. removeNotifiers(absintheSocket, notifier); + if (notifier.operationType === "subscription") { + unsubscribe(absintheSocket, notifier); + } + } + else { + // there are other observers left -> only refresh the list + // with the updated notifier. + updateNotifiers(absintheSocket, notifierRefresh(notifier)); } return absintheSocket;