Skip to content

Commit

Permalink
feat: track event seen on relays
Browse files Browse the repository at this point in the history
fix: dump/clear commands
  • Loading branch information
v0l committed Sep 16, 2024
1 parent b491443 commit 21e88b0
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 54 deletions.
8 changes: 3 additions & 5 deletions packages/app/src/Components/Event/Note/ReactionsModal.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "./ReactionsModal.css";

import { NostrLink, socialGraphInstance, TaggedNostrEvent } from "@snort/system";
import { NostrLink, TaggedNostrEvent } from "@snort/system";
import { useEventReactions, useReactions } from "@snort/system-react";
import { useMemo, useState } from "react";
import { FormattedMessage, MessageDescriptor, useIntl } from "react-intl";
Expand All @@ -11,6 +11,7 @@ import Modal from "@/Components/Modal/Modal";
import TabSelectors, { Tab } from "@/Components/TabSelectors/TabSelectors";
import ProfileImage from "@/Components/User/ProfileImage";
import ZapAmount from "@/Components/zap-amount";
import useWoT from "@/Hooks/useWoT";

import messages from "../../messages";

Expand All @@ -29,10 +30,7 @@ const ReactionsModal = ({ onClose, event, initialTab = 0 }: ReactionsModalProps)
const { reactions, zaps, reposts } = useEventReactions(link, related);
const { positive, negative } = reactions;

const sortEvents = (events: Array<TaggedNostrEvent>) =>
events.sort(
(a, b) => socialGraphInstance.getFollowDistance(a.pubkey) - socialGraphInstance.getFollowDistance(b.pubkey),
);
const { sortEvents } = useWoT();

const likes = useMemo(() => sortEvents([...positive]), [positive]);
const dislikes = useMemo(() => sortEvents([...negative]), [negative]);
Expand Down
6 changes: 4 additions & 2 deletions packages/app/src/Components/Feed/Timeline.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import "./Timeline.css";

import { unixNow } from "@snort/shared";
import { socialGraphInstance, TaggedNostrEvent } from "@snort/system";
import { TaggedNostrEvent } from "@snort/system";
import { useCallback, useMemo, useState } from "react";

import { DisplayAs, DisplayAsSelector } from "@/Components/Feed/DisplayAsSelector";
import { TimelineRenderer } from "@/Components/Feed/TimelineRenderer";
import useTimelineFeed, { TimelineFeed, TimelineSubject } from "@/Feed/TimelineFeed";
import useHistoryState from "@/Hooks/useHistoryState";
import useLogin from "@/Hooks/useLogin";
import useWoT from "@/Hooks/useWoT";
import { dedupeByPubkey } from "@/Utils";

export interface TimelineProps {
Expand Down Expand Up @@ -41,14 +42,15 @@ const Timeline = (props: TimelineProps) => {
const feed: TimelineFeed = useTimelineFeed(props.subject, feedOptions);
const displayAsInitial = props.displayAs ?? login.feedDisplayAs ?? "list";
const [displayAs, setDisplayAs] = useState<DisplayAs>(displayAsInitial);
const wot = useWoT();

const filterPosts = useCallback(
(nts: readonly TaggedNostrEvent[]) => {
const checkFollowDistance = (a: TaggedNostrEvent) => {
if (props.followDistance === undefined) {
return true;
}
const followDistance = socialGraphInstance.getFollowDistance(a.pubkey);
const followDistance = wot.followDistance(a.pubkey);
return followDistance === props.followDistance;
};
return nts
Expand Down
11 changes: 11 additions & 0 deletions packages/app/src/Hooks/useWoT.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { TaggedNostrEvent } from "@snort/system";
import { socialGraphInstance } from "@snort/system/dist/SocialGraph/SocialGraph";

export default function useWoT() {
const sg = socialGraphInstance;
return {
sortEvents: (events: Array<TaggedNostrEvent>) =>
events.sort((a, b) => sg.getFollowDistance(a.pubkey) - sg.getFollowDistance(b.pubkey)),
followDistance: sg.getFollowDistance,
};
}
47 changes: 29 additions & 18 deletions packages/app/src/Pages/settings/Cache.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,35 @@ function RelayCacheStats() {
</table>
</div>
<div className="flex flex-col gap-2">
<AsyncButton onClick={() => {}}>
<FormattedMessage defaultMessage="Clear" />
</AsyncButton>
<AsyncButton
onClick={async () => {
const data = new Uint8Array();
const url = URL.createObjectURL(
new File([data], "snort.db", {
type: "application/octet-stream",
}),
);
const a = document.createElement("a");
a.href = url;
a.download = "snort.db";
a.click();
}}>
<FormattedMessage defaultMessage="Dump" />
</AsyncButton>
{Relay instanceof WorkerRelayInterface && (
<>
<AsyncButton
onClick={async () => {
if (Relay instanceof WorkerRelayInterface) {
await Relay.wipe();
}
}}>
<FormattedMessage defaultMessage="Clear" />
</AsyncButton>
<AsyncButton
onClick={async () => {
const data = Relay instanceof WorkerRelayInterface ? await Relay.dump() : undefined;
if (data) {
const url = URL.createObjectURL(
new File([data], "snort.db", {
type: "application/octet-stream",
}),
);
const a = document.createElement("a");
a.href = url;
a.download = "snort.db";
a.click();
}
}}>
<FormattedMessage defaultMessage="Dump" />
</AsyncButton>
</>
)}
<AsyncButton onClick={() => navigate("/cache-debug")}>
<FormattedMessage defaultMessage="Debug" />
</AsyncButton>
Expand Down
6 changes: 3 additions & 3 deletions packages/system/src/cache-relay.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { NostrEvent, OkResponse, ReqCommand } from "./nostr";
import { OkResponse, ReqCommand, TaggedNostrEvent } from "./nostr";

/**
* A cache relay is an always available local (local network / browser worker) relay
Expand All @@ -8,12 +8,12 @@ export interface CacheRelay {
/**
* Write event to cache relay
*/
event(ev: NostrEvent): Promise<OkResponse>;
event(ev: TaggedNostrEvent): Promise<OkResponse>;

/**
* Read event from cache relay
*/
query(req: ReqCommand): Promise<Array<NostrEvent>>;
query(req: ReqCommand): Promise<Array<TaggedNostrEvent>>;

/**
* Delete events by filter
Expand Down
2 changes: 1 addition & 1 deletion packages/system/src/query-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ export class QueryManager extends EventEmitter<QueryManagerEvents> {
// fetch results from cache first, flag qSend for sync
if (this.#system.cacheRelay) {
const data = await this.#system.cacheRelay.query(["REQ", q.id, ...filters]);
syncFrom = data;
if (data.length > 0) {
syncFrom = data.map(a => ({ ...a, relays: [] }));
this.#log("Adding from cache %s %O", q.id, data);
q.feed.add(syncFrom);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/worker-relay/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@snort/worker-relay",
"version": "1.2.0",
"version": "1.3.0",
"description": "A nostr relay in a service worker",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
4 changes: 4 additions & 0 deletions packages/worker-relay/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ export class WorkerRelayInterface {
return await this.#workerRpc<void, Uint8Array>("dumpDb");
}

async wipe() {
return await this.#workerRpc<void, boolean>("wipe");
}

async forYouFeed(pubkey: string) {
return await this.#workerRpc<string, Array<NostrEvent>>("forYouFeed", pubkey);
}
Expand Down
8 changes: 7 additions & 1 deletion packages/worker-relay/src/memory-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@ export class InMemoryRelay extends EventEmitter<RelayHandlerEvents> implements R
}

dump(): Promise<Uint8Array> {
return Promise.resolve(new Uint8Array());
const enc = new TextEncoder();
return Promise.resolve(enc.encode(JSON.stringify(this.#events.values())));
}

close(): void {
// nothing
}

wipe() {
this.#events = new Map();
return Promise.resolve();
}

event(ev: NostrEvent) {
if (this.#events.has(ev.id)) return false;
this.#events.set(ev.id, ev);
Expand Down
10 changes: 10 additions & 0 deletions packages/worker-relay/src/sqlite/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const migrations = [
{ version: 3, script: migrate_v3 },
{ version: 4, script: migrate_v4 },
{ version: 5, script: migrate_v5 },
{ version: 6, script: migrate_v6 },
];

async function migrate(relay: SqliteRelay) {
Expand Down Expand Up @@ -103,4 +104,13 @@ async function migrate_v5(relay: SqliteRelay) {
});
}

async function migrate_v6(relay: SqliteRelay) {
relay.db?.transaction(db => {
db.exec("ALTER TABLE events ADD COLUMN relays TEXT");
db.exec("insert into __migration values(6, ?)", {
bind: [new Date().getTime() / 1000],
});
});
}

export default migrate;
82 changes: 60 additions & 22 deletions packages/worker-relay/src/sqlite/sqlite-relay.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import sqlite3InitModule, { Database, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import sqlite3InitModule, { Database, SAHPoolUtil, Sqlite3Static } from "@sqlite.org/sqlite-wasm";
import { EventEmitter } from "eventemitter3";
import { EventMetadata, NostrEvent, RelayHandler, RelayHandlerEvents, ReqFilter, unixNowMs } from "../types";
import migrate from "./migrations";
Expand All @@ -12,6 +12,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
#sqlite?: Sqlite3Static;
#log = (msg: string, ...args: Array<any>) => debugLog("SqliteRelay", msg, ...args);
db?: Database;
#pool?: SAHPoolUtil;
#seenInserts = new Set<string>();

/**
Expand Down Expand Up @@ -45,15 +46,28 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
if (!this.#sqlite) throw new Error("Must call init first");
if (this.db) return;

const pool = await this.#sqlite.installOpfsSAHPoolVfs({});
this.db = new pool.OpfsSAHPoolDb(path);
this.#pool = await this.#sqlite.installOpfsSAHPoolVfs({});
this.db = new this.#pool.OpfsSAHPoolDb(path);
this.#log(`Opened ${this.db.filename}`);
/*this.db.exec(
`PRAGMA cache_size=${32 * 1024
}; PRAGMA page_size=8192; PRAGMA journal_mode=MEMORY; PRAGMA temp_store=MEMORY;`,
);*/
}

/**
* Delete all data
*/
async wipe() {
if (this.#pool && this.db) {
const dbName = this.db.filename;
this.close();
await this.#pool.wipeFiles();
await this.#open(dbName);
await migrate(this);
}
}

close() {
this.db?.close();
this.db = undefined;
Expand Down Expand Up @@ -157,8 +171,15 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
this.#deleteById(db, oldEvents);
}
}
db.exec("insert or ignore into events(id, pubkey, created, kind, json) values(?,?,?,?,?)", {
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(ev)],

// remove relays from event json
const evInsert = {
...ev,
} as NostrEvent;
delete evInsert["relays"];

db.exec("insert or ignore into events(id, pubkey, created, kind, json, relays) values(?,?,?,?,?,?)", {
bind: [ev.id, ev.pubkey, ev.created_at, ev.kind, JSON.stringify(evInsert), (ev.relays ?? []).join(",")],
});
const insertedEvents = db.changes();
if (insertedEvents > 0) {
Expand All @@ -169,12 +190,33 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
}
this.insertIntoSearchIndex(db, ev);
} else {
this.#updateRelays(db, ev);
return 0;
}
this.#seenInserts.add(ev.id);
return insertedEvents;
}

/**
* Append relays
*/
#updateRelays(db: Database, ev: NostrEvent) {
const relays = db.selectArrays("select relays from events where id = ?", [ev.id]);
const oldRelays = new Set((relays?.at(0)?.at(0) as string | null)?.split(",") ?? []);
let hasNew = false;
for (const r of ev.relays ?? []) {
if (!oldRelays.has(r)) {
oldRelays.add(r);
hasNew = true;
}
}
if (hasNew) {
db.exec("update events set relays = ? where id = ?", {
bind: [[...oldRelays].join(","), ev.id],
});
}
}

/**
* Query relay by nostr filter
*/
Expand All @@ -188,7 +230,11 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
if (req.ids_only === true) {
return a[0] as string;
}
return JSON.parse(a[0] as string) as NostrEvent;
const ev = JSON.parse(a[0] as string) as NostrEvent;
return {
...ev,
relays: (a[1] as string | null)?.split(","),
};
}) ?? [];
const time = unixNowMs() - start;
this.#log(`Query ${id} results took ${time.toLocaleString()}ms`);
Expand Down Expand Up @@ -252,22 +298,14 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
*/
async dump() {
const filePath = String(this.db?.filename ?? "");
try {
this.db?.close();
this.db = undefined;
const dir = await navigator.storage.getDirectory();
// @ts-expect-error
for await (const [name, file] of dir) {
if (`/${name}` === filePath) {
const fh = await (file as FileSystemFileHandle).getFile();
const ret = new Uint8Array(await fh.arrayBuffer());
return ret;
}
if (this.db && this.#pool) {
try {
return await this.#pool.exportFile(`/${filePath}`);
} catch (e) {
console.error(e);
} finally {
await this.#open(filePath);
}
} catch (e) {
console.error(e);
} finally {
await this.#open(filePath);
}
return new Uint8Array();
}
Expand All @@ -276,7 +314,7 @@ export class SqliteRelay extends EventEmitter<RelayHandlerEvents> implements Rel
const conditions: Array<string> = [];
const params: Array<any> = [];

let resultType = "json";
let resultType = "json,relays";
if (count) {
resultType = "count(json)";
} else if (req.ids_only === true) {
Expand Down
4 changes: 3 additions & 1 deletion packages/worker-relay/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ export type WorkerMessageCommand =
| "forYouFeed"
| "setEventMetadata"
| "debug"
| "delete";
| "delete"
| "wipe";

export interface WorkerMessage<T> {
id: string;
Expand Down Expand Up @@ -73,6 +74,7 @@ export interface RelayHandler extends EventEmitter<RelayHandlerEvents> {
dump(): Promise<Uint8Array>;
delete(req: ReqFilter): Array<string>;
setEventMetadata(id: string, meta: EventMetadata): void;
wipe(): Promise<void>;
}

export interface RelayHandlerEvents {
Expand Down
5 changes: 5 additions & 0 deletions packages/worker-relay/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ const handleMsg = async (port: MessagePort | DedicatedWorkerGlobalScope, ev: Mes
reply(msg.id, res);
break;
}
case "wipe": {
await relay!.wipe();
reply(msg.id, true);
break;
}
case "forYouFeed": {
const res = await getForYouFeed(relay!, msg.args as string);
reply(msg.id, res);
Expand Down

0 comments on commit 21e88b0

Please sign in to comment.