diff --git a/Containerfile b/Containerfile index c2735feb..73c9285f 100644 --- a/Containerfile +++ b/Containerfile @@ -17,7 +17,7 @@ RUN pip3 install anthropic boto3 cohere openai google-cloud-aiplatform ollama go langchain langchain-core langchain-huggingface langchain-text-splitters \ langchain-community pymilvus sentence-transformers transformers \ huggingface-hub pulsar-client cassandra-driver pyyaml \ - neo4j tiktoken && \ + neo4j tiktoken falkordb && \ pip3 cache purge # ---------------------------------------------------------------------------- diff --git a/Makefile b/Makefile index 72d144a9..67094a90 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ TEMPLATES=azure bedrock claude cohere mix llamafile ollama openai vertexai \ DCS=$(foreach template,${TEMPLATES},${template:%=tg-launch-%.yaml}) MODELS=azure bedrock claude cohere llamafile ollama openai vertexai -GRAPHS=cassandra neo4j +GRAPHS=cassandra neo4j falkordb # tg-launch-%.yaml: templates/%.jsonnet templates/components/version.jsonnet # jsonnet -Jtemplates \ diff --git a/docs/README.quickstart-docker-compose.md b/docs/README.quickstart-docker-compose.md index a81da9bc..12cb8cf0 100644 --- a/docs/README.quickstart-docker-compose.md +++ b/docs/README.quickstart-docker-compose.md @@ -13,7 +13,7 @@ > [!TIP] > If using `Podman`, the only change will be to substitute `podman` instead of `docker` in all commands. -All `TrustGraph` components are deployed through a `Docker Compose` file. There are **16** `Docker Compose` files to choose from, depending on the desired model deployment and choosing between the graph stores `Cassandra` or `Neo4j`: +All `TrustGraph` components are deployed through a `Docker Compose` file. 
There are **16** `Docker Compose` files to choose from, depending on the desired model deployment and choosing between the graph stores `Cassandra` or `Neo4j` or `FalkorDB`: - `AzureAI` serverless endpoint for deployed models in Azure - `Bedrock` API for models deployed in AWS Bedrock diff --git a/templates/all-patterns.jsonnet b/templates/all-patterns.jsonnet index 47622939..f68f307d 100644 --- a/templates/all-patterns.jsonnet +++ b/templates/all-patterns.jsonnet @@ -5,6 +5,7 @@ import "patterns/grafana.jsonnet", import "patterns/triple-store-cassandra.jsonnet", import "patterns/triple-store-neo4j.jsonnet", + import "patterns/triple-store-falkordb.jsonnet", import "patterns/graph-rag.jsonnet", import "patterns/llm-azure.jsonnet", import "patterns/llm-azure-openai.jsonnet", diff --git a/templates/components.jsonnet b/templates/components.jsonnet index 1abf44a4..d3a4a112 100644 --- a/templates/components.jsonnet +++ b/templates/components.jsonnet @@ -12,6 +12,7 @@ "graph-rag": import "components/graph-rag.jsonnet", "triple-store-cassandra": import "components/cassandra.jsonnet", "triple-store-neo4j": import "components/neo4j.jsonnet", + "triple-store-falkordb": import "components/falkordb.jsonnet", "triple-store-memgraph": import "components/memgraph.jsonnet", "llamafile": import "components/llamafile.jsonnet", "ollama": import "components/ollama.jsonnet", @@ -39,6 +40,7 @@ "qdrant": import "components/qdrant.jsonnet", "pinecone": import "components/pinecone.jsonnet", "milvus": import "components/milvus.jsonnet", + "falkordb": import "components/falkordb.jsonnet", "trustgraph": import "components/trustgraph.jsonnet", } diff --git a/templates/components/falkordb.jsonnet b/templates/components/falkordb.jsonnet new file mode 100644 index 00000000..e238cebe --- /dev/null +++ b/templates/components/falkordb.jsonnet @@ -0,0 +1,76 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; +local url = import "values/url.jsonnet"; +local 
falkordb = import "stores/falkordb.jsonnet"; + +falkordb + { + + "falkordb-url":: "falkor://falkordb:6379", + + "store-triples" +: { + + create:: function(engine) + + local container = + engine.container("store-triples") + .with_image(images.trustgraph) + .with_command([ + "triples-write-falkordb", + "-p", + url.pulsar, + "-g", + $["falkordb-url"], + ]) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "store-triples", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + containerSet, + service, + ]) + + }, + + "query-triples" +: { + + create:: function(engine) + + local container = + engine.container("query-triples") + .with_image(images.trustgraph) + .with_command([ + "triples-query-falkordb", + "-p", + url.pulsar, + "-g", + $["falkordb-url"], + ]) + .with_limits("0.5", "128M") + .with_reservations("0.1", "128M"); + + local containerSet = engine.containers( + "query-triples", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8080, 8080, "metrics"); + + engine.resources([ + containerSet, + service, + ]) + + + } + +} + diff --git a/templates/generate-all b/templates/generate-all index 0b403620..70aa6436 100755 --- a/templates/generate-all +++ b/templates/generate-all @@ -125,7 +125,7 @@ def generate_all(output, version): "azure", "azure-openai", "bedrock", "claude", "cohere", "googleaistudio", "llamafile", "ollama", "openai", "vertexai", ]: - for graph in [ "cassandra", "neo4j" ]: + for graph in [ "cassandra", "neo4j", "falkordb" ]: y = generate_config( llm=model, graph_store=graph, platform=platform, diff --git a/templates/patterns/triple-store-falkordb.jsonnet b/templates/patterns/triple-store-falkordb.jsonnet new file mode 100644 index 00000000..40ef02e2 --- /dev/null +++ b/templates/patterns/triple-store-falkordb.jsonnet @@ -0,0 +1,13 @@ +{ + pattern: { + name: 
"triple-store-falkordb", + icon: "🖇️🙋‍♀️", + title: "Adds a FalkorDB store configured to act as a triple store.", + description: "GraphRAG processing needs a triple store. This pattern adds a FalkorDB store, along with plumbing so that FalkorDB is integrated with GraphRag indexing and querying.", + requires: ["pulsar", "trustgraph"], + features: ["falkordb", "triple-store"], + args: [], + category: [ "knowledge-graph" ], + }, + module: "components/falkordb.jsonnet", +} diff --git a/templates/patterns/triple-store-neo4j.jsonnet b/templates/patterns/triple-store-neo4j.jsonnet index b8a93e31..b111ebe3 100644 --- a/templates/patterns/triple-store-neo4j.jsonnet +++ b/templates/patterns/triple-store-neo4j.jsonnet @@ -3,7 +3,7 @@ name: "triple-store-neo4j", icon: "🖇️🙋‍♀️", title: "Adds a Neo4j store configured to act as a triple store.", - description: "GraphRAG processing needs a triple store. This pattern adds a Cassandra store, along with plumbing so that Cassandra is integrated with GraphRag indexing and querying.", + description: "GraphRAG processing needs a triple store. 
This pattern adds a Neo4j store, along with plumbing so that Neo4j is integrated with GraphRag indexing and querying.", requires: ["pulsar", "trustgraph"], features: ["neo4j", "triple-store"], args: [], diff --git a/templates/stores/falkordb.jsonnet b/templates/stores/falkordb.jsonnet new file mode 100644 index 00000000..1c7924a9 --- /dev/null +++ b/templates/stores/falkordb.jsonnet @@ -0,0 +1,39 @@ +local base = import "base/base.jsonnet"; +local images = import "values/images.jsonnet"; + +{ + + "falkordb" +: { + + create:: function(engine) + + local vol = engine.volume("falkordb").with_size("20G"); + + local container = + engine.container("falkordb") + .with_image(images.falkordb) + .with_limits("1.0", "768M") + .with_reservations("0.5", "768M") + .with_port(6379, 6379, "api") + .with_port(3000, 3000, "ui") + .with_volume_mount(vol, "/data"); + + local containerSet = engine.containers( + "falkordb", [ container ] + ); + + local service = + engine.service(containerSet) + .with_port(6379, 6379, "api") + .with_port(3000, 3000, "ui"); + + engine.resources([ + vol, + containerSet, + service, + ]) + + }, + +} + diff --git a/templates/values/images.jsonnet b/templates/values/images.jsonnet index c583815b..7a4ddba7 100644 --- a/templates/values/images.jsonnet +++ b/templates/values/images.jsonnet @@ -13,4 +13,5 @@ local version = import "version.jsonnet"; qdrant: "docker.io/qdrant/qdrant:v1.11.1", memgraph_mage: "docker.io/memgraph/memgraph-mage:1.22-memgraph-2.22", memgraph_lab: "docker.io/memgraph/lab:2.19.1", + falkordb: "falkordb/falkordb:latest" } diff --git a/trustgraph-flow/scripts/triples-query-falkordb b/trustgraph-flow/scripts/triples-query-falkordb new file mode 100755 index 00000000..7f9ab74c --- /dev/null +++ b/trustgraph-flow/scripts/triples-query-falkordb @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.query.triples.falkordb import run + +run() + diff --git a/trustgraph-flow/scripts/triples-write-falkordb 
b/trustgraph-flow/scripts/triples-write-falkordb new file mode 100755 index 00000000..916ee352 --- /dev/null +++ b/trustgraph-flow/scripts/triples-write-falkordb @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from trustgraph.storage.triples.falkordb import run + +run() + diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index c53f96e7..3de42f9f 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -60,6 +60,7 @@ "jsonschema", "aiohttp", "pinecone[grpc]", + "falkordb", ], scripts=[ "scripts/api-gateway", @@ -104,9 +105,11 @@ "scripts/triples-query-cassandra", "scripts/triples-query-neo4j", "scripts/triples-query-memgraph", + "scripts/triples-query-falkordb", "scripts/triples-write-cassandra", "scripts/triples-write-neo4j", "scripts/triples-write-memgraph", + "scripts/triples-write-falkordb", "scripts/wikipedia-lookup", ] ) diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py b/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py new file mode 100644 index 00000000..ba844705 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/__init__.py @@ -0,0 +1,3 @@ + +from . service import * + diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py b/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py new file mode 100755 index 00000000..89684e3e --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . service import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/query/triples/falkordb/service.py b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py new file mode 100755 index 00000000..43083832 --- /dev/null +++ b/trustgraph-flow/trustgraph/query/triples/falkordb/service.py @@ -0,0 +1,341 @@ + +""" +Triples query service for FalkorDB. +Input is a (s, p, o) triple, some values may be null. Output is a list of +triples. 
+""" + +from falkordb import FalkorDB + +from .... schema import TriplesQueryRequest, TriplesQueryResponse, Error +from .... schema import Value, Triple +from .... schema import triples_request_queue +from .... schema import triples_response_queue +from .... base import ConsumerProducer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_request_queue +default_output_queue = triples_response_queue +default_subscriber = module + +default_graph_url = 'falkor://falkordb:6379' +default_database = 'falkordb' + +class Processor(ConsumerProducer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + output_queue = params.get("output_queue", default_output_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_url = params.get("graph_url", default_graph_url) + database = params.get("database", default_database) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "output_queue": output_queue, + "subscriber": subscriber, + "input_schema": TriplesQueryRequest, + "output_schema": TriplesQueryResponse, + "graph_url": graph_url, + } + ) + + self.db = database + + self.io = FalkorDB.from_url(graph_url).select_graph(database) + + def create_value(self, ent): + + if ent.startswith("http://") or ent.startswith("https://"): + return Value(value=ent, is_uri=True) + else: + return Value(value=ent, is_uri=False) + + def handle(self, msg): + + try: + + v = msg.value() + + # Sender-produced ID + id = msg.properties()["id"] + + print(f"Handling input {id}...", flush=True) + + triples = [] + + if v.s is not None: + if v.p is not None: + if v.o is not None: + + # SPO + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal {value: $value}) " + "RETURN $src as src", + src=v.s.value, rel=v.p.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + triples.append((v.s.value, v.p.value, v.o.value)) 
+ + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node {uri: $uri}) " + "RETURN $src as src", + src=v.s.value, rel=v.p.value, uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + triples.append((v.s.value, v.p.value, v.o.value)) + + else: + + # SP + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Literal) " + "RETURN dest.value as dest", + src=v.s.value, rel=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, v.p.value, data["dest"])) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel {uri: $rel}]->(dest:Node) " + "RETURN dest.uri as dest", + src=v.s.value, rel=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, v.p.value, data["dest"])) + + else: + + if v.o is not None: + + # SO + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal {value: $value}) " + "RETURN rel.uri as rel", + src=v.s.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], v.o.value)) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node {uri: $uri}) " + "RETURN rel.uri as rel", + src=v.s.value, uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], v.o.value)) + + else: + + # S + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Literal) " + "RETURN rel.uri as rel, dest.value as dest", + src=v.s.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], data["dest"])) + + records = self.io.query( + "MATCH (src:Node {uri: $src})-[rel:Rel]->(dest:Node) " + "RETURN rel.uri as rel, dest.uri as dest", + src=v.s.value, + 
database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((v.s.value, data["rel"], data["dest"])) + + + else: + + if v.p is not None: + + if v.o is not None: + + # PO + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal {value: $value}) " + "RETURN src.uri as src", + uri=v.p.value, value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, v.o.value)) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node {uri: $dest}) " + "RETURN src.uri as src", + uri=v.p.value, dest=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, v.o.value)) + + else: + + # P + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Literal) " + "RETURN src.uri as src, dest.value as dest", + uri=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, data["dest"])) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel {uri: $uri}]->(dest:Node) " + "RETURN src.uri as src, dest.uri as dest", + uri=v.p.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], v.p.value, data["dest"])) + + else: + + if v.o is not None: + + # O + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Literal {value: $value}) " + "RETURN src.uri as src, rel.uri as rel", + value=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], v.o.value)) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Node {uri: $uri}) " + "RETURN src.uri as src, rel.uri as rel", + uri=v.o.value, + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], 
v.o.value)) + + else: + + # * + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Literal) " + "RETURN src.uri as src, rel.uri as rel, dest.value as dest", + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], data["dest"])) + + records = self.io.query( + "MATCH (src:Node)-[rel:Rel]->(dest:Node) " + "RETURN src.uri as src, rel.uri as rel, dest.uri as dest", + database_=self.db, + ).result_set + + for rec in records: + data = rec.data() + triples.append((data["src"], data["rel"], data["dest"])) + + triples = [ + Triple( + s=self.create_value(t[0]), + p=self.create_value(t[1]), + o=self.create_value(t[2]) + ) + for t in triples + ] + + print("Send response...", flush=True) + r = TriplesQueryResponse(triples=triples, error=None) + self.producer.send(r, properties={"id": id}) + + print("Done.", flush=True) + + except Exception as e: + + print(f"Exception: {e}") + + print("Send error response...", flush=True) + + r = TriplesQueryResponse( + error=Error( + type = "llm-error", + message = str(e), + ), + response=None, + ) + + self.producer.send(r, properties={"id": id}) + + self.consumer.acknowledge(msg) + + @staticmethod + def add_args(parser): + + ConsumerProducer.add_args( + parser, default_input_queue, default_subscriber, + default_output_queue, + ) + + parser.add_argument( + '-g', '--graph-url', + default=default_graph_url, + help=f'Graph url (default: {default_graph_url})' + ) + + parser.add_argument( + '--database', + default=default_database, + help=f'FalkorDB database (default: {default_database})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py new file mode 100644 index 00000000..d891d55f --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/__init__.py @@ -0,0 +1,3 @@ + +from . 
write import * + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py new file mode 100755 index 00000000..c05d8c6d --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/__main__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 + +from . write import run + +if __name__ == '__main__': + run() + diff --git a/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py new file mode 100755 index 00000000..9fb1e0ff --- /dev/null +++ b/trustgraph-flow/trustgraph/storage/triples/falkordb/write.py @@ -0,0 +1,150 @@ + +""" +Graph writer. Input is graph edge. Writes edges to FalkorDB graph. +""" + +import pulsar +import base64 +import os +import argparse +import time + +from falkordb import FalkorDB + +from .... schema import Triples +from .... schema import triples_store_queue +from .... log_level import LogLevel +from .... base import Consumer + +module = ".".join(__name__.split(".")[1:-1]) + +default_input_queue = triples_store_queue +default_subscriber = module + +default_graph_url = 'falkor://falkordb:6379' +default_database = 'falkordb' + +class Processor(Consumer): + + def __init__(self, **params): + + input_queue = params.get("input_queue", default_input_queue) + subscriber = params.get("subscriber", default_subscriber) + graph_url = params.get("graph_host", default_graph_url) + database = params.get("database", default_database) + + super(Processor, self).__init__( + **params | { + "input_queue": input_queue, + "subscriber": subscriber, + "input_schema": Triples, + "graph_url": graph_url, + } + ) + + self.db = database + + self.io = FalkorDB.from_url(graph_url).select_graph(database) + + def create_node(self, uri): + + print("Create node", uri) + + res = self.io.query( + "MERGE (n:Node {uri: $uri})", + uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + 
nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def create_literal(self, value): + + print("Create literal", value) + + res = self.io.query( + "MERGE (n:Literal {value: $value})", + value=value, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def relate_node(self, src, uri, dest): + + print("Create node rel", src, uri, dest) + + res = self.io.query( + "MATCH (src:Node {uri: $src}) " + "MATCH (dest:Node {uri: $dest}) " + "MERGE (src)-[:Rel {uri: $uri}]->(dest)", + src=src, dest=dest, uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def relate_literal(self, src, uri, dest): + + print("Create literal rel", src, uri, dest) + + res = self.io.query( + "MATCH (src:Node {uri: $src}) " + "MATCH (dest:Literal {value: $dest}) " + "MERGE (src)-[:Rel {uri: $uri}]->(dest)", + src=src, dest=dest, uri=uri, + database_=self.db, + ) + + print("Created {nodes_created} nodes in {time} ms.".format( + nodes_created=res.nodes_created, + time=res.run_time_ms + )) + + def handle(self, msg): + + v = msg.value() + + for t in v.triples: + + self.create_node(t.s.value) + + if t.o.is_uri: + self.create_node(t.o.value) + self.relate_node(t.s.value, t.p.value, t.o.value) + else: + self.create_literal(t.o.value) + self.relate_literal(t.s.value, t.p.value, t.o.value) + + @staticmethod + def add_args(parser): + + Consumer.add_args( + parser, default_input_queue, default_subscriber, + ) + + parser.add_argument( + '-g', '--graph_host', + default=default_graph_url, + help=f'Graph host (default: {default_graph_url})' + ) + + parser.add_argument( + '--database', + default=default_database, + help=f'FalkorDB database (default: {default_database})' + ) + +def run(): + + Processor.start(module, __doc__) + diff --git 
a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py index 929333e5..1aa25aa8 100755 --- a/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py +++ b/trustgraph-flow/trustgraph/storage/triples/neo4j/write.py @@ -1,6 +1,6 @@ """ -Graph writer. Input is graph edge. Writes edges to Cassandra graph. +Graph writer. Input is graph edge. Writes edges to Neo4j graph. """ import pulsar