Skip to content

Commit

Permalink
Feature/pinecone integration (#170)
Browse files Browse the repository at this point in the history
* Added Pinecone for GE write & query

* Add templates

* Doc embedding support
  • Loading branch information
cybermaggedon authored Nov 22, 2024
1 parent ae1264f commit 319f9ac
Show file tree
Hide file tree
Showing 17 changed files with 842 additions and 0 deletions.
2 changes: 2 additions & 0 deletions templates/components.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"trustgraph-base": import "components/trustgraph.jsonnet",
"vector-store-milvus": import "components/milvus.jsonnet",
"vector-store-qdrant": import "components/qdrant.jsonnet",
"vector-store-pinecone": import "components/pinecone.jsonnet",
"vertexai": import "components/vertexai.jsonnet",
"null": {},

Expand All @@ -34,6 +35,7 @@
"cassandra": import "components/cassandra.jsonnet",
"neo4j": import "components/neo4j.jsonnet",
"qdrant": import "components/qdrant.jsonnet",
"pinecone": import "components/pinecone.jsonnet",
"milvus": import "components/milvus.jsonnet",
"trustgraph": import "components/trustgraph.jsonnet",

Expand Down
153 changes: 153 additions & 0 deletions templates/components/pinecone.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
local base = import "base/base.jsonnet";
local images = import "values/images.jsonnet";
local url = import "values/url.jsonnet";
local cassandra_hosts = "cassandra";

// Builds the definition of one Pinecone-backed processor.
//
// All four Pinecone processors are deployed identically: a single container
// running the trustgraph image, the Pinecone API key injected from the
// "pinecone-api-key" secret, and an internal service exposing port 8080
// under the name "metrics".  They differ only in the component name and the
// command that is run, so both are taken as parameters here.
//
//   name:    used for the container and for the container set
//   command: executable placed first in the container command line
local pinecone_processor(name, command) = {

    create:: function(engine)

        // Secret that supplies PINECONE_API_KEY to the container.
        local envSecrets = engine.envSecrets("pinecone-api-key")
            .with_env_var("PINECONE_API_KEY", "pinecone-api-key");

        local container =
            engine.container(name)
                .with_image(images.trustgraph)
                .with_command([
                    command,
                    "-p",
                    url.pulsar,
                ])
                .with_env_var_secrets(envSecrets)
                .with_limits("0.5", "128M")
                .with_reservations("0.1", "128M");

        local containerSet = engine.containers(
            name, [ container ]
        );

        local service =
            engine.internalService(containerSet)
            .with_port(8080, 8080, "metrics");

        engine.resources([
            envSecrets,
            containerSet,
            service,
        ])

};

{

    // Hidden settings; not referenced by the processors defined below.
    "pinecone-cloud":: "aws",
    "pinecone-region":: "us-east-1",

    // Graph embeddings: write and query.
    "store-graph-embeddings" +:
        pinecone_processor("store-graph-embeddings", "ge-write-pinecone"),

    "query-graph-embeddings" +:
        pinecone_processor("query-graph-embeddings", "ge-query-pinecone"),

    // Document embeddings: write and query.
    "store-doc-embeddings" +:
        pinecone_processor("store-doc-embeddings", "de-write-pinecone"),

    "query-doc-embeddings" +:
        pinecone_processor("query-doc-embeddings", "de-query-pinecone"),

}

6 changes: 6 additions & 0 deletions trustgraph-flow/scripts/ge-query-pinecone
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3

# Command-line launcher: imports the entry point of the Pinecone graph
# embeddings query package and invokes it immediately.
from trustgraph.query.graph_embeddings.pinecone import run

run()

6 changes: 6 additions & 0 deletions trustgraph-flow/scripts/ge-write-pinecone
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python3

# Command-line launcher: imports the entry point of the Pinecone graph
# embeddings storage package and invokes it immediately.
from trustgraph.storage.graph_embeddings.pinecone import run

run()

3 changes: 3 additions & 0 deletions trustgraph-flow/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"jsonschema",
"aiohttp",
"aiopulsar-py",
"pinecone[grpc]",
],
scripts=[
"scripts/api-gateway",
Expand All @@ -74,8 +75,10 @@
"scripts/embeddings-ollama",
"scripts/embeddings-vectorize",
"scripts/ge-query-milvus",
"scripts/ge-query-pinecone",
"scripts/ge-query-qdrant",
"scripts/ge-write-milvus",
"scripts/ge-write-pinecone",
"scripts/ge-write-qdrant",
"scripts/graph-rag",
"scripts/kg-extract-definitions",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

from . service import *

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env python3

# Entry point for running this package with `python -m`.
# `run` is defined in the sibling `service` module (the same module the
# package __init__ re-exports from), so it is imported from there.
from . service import run

if __name__ == '__main__':
    run()

142 changes: 142 additions & 0 deletions trustgraph-flow/trustgraph/query/doc_embeddings/pinecone/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@

"""
Document embeddings query service. Input is vector, output is an array
of chunks. Pinecone implementation.
"""

from pinecone import Pinecone, ServerlessSpec
from pinecone.grpc import PineconeGRPC, GRPCClientConfig

import uuid
import os

from .... schema import DocumentEmbeddingsRequest, DocumentEmbeddingsResponse
from .... schema import Error, Value
from .... schema import document_embeddings_request_queue
from .... schema import document_embeddings_response_queue
from .... base import ConsumerProducer

# Dotted module path with its first and last components removed; used as
# the default subscriber name and passed to Processor.start() by run().
module = ".".join(__name__.split(".")[1:-1])

# Defaults applied when the corresponding parameter is not supplied.
default_input_queue = document_embeddings_request_queue
default_output_queue = document_embeddings_response_queue
default_subscriber = module
# Read from the environment once, at import time; the literal
# "not-specified" is used when PINECONE_API_KEY is not set.
default_api_key = os.getenv("PINECONE_API_KEY", "not-specified")

class Processor(ConsumerProducer):
    """
    Document embeddings query processor backed by Pinecone.

    For every vector in a DocumentEmbeddingsRequest, queries the Pinecone
    index named "d-<user>-<dimension>" in the namespace given by the
    request's collection, and replies with the "doc" metadata value of each
    match in a DocumentEmbeddingsResponse.
    """

    def __init__(self, **params):
        """
        Create the Pinecone client and initialise the consumer/producer.

        Recognised params (all optional): input_queue, output_queue,
        subscriber, url, api_key.  When url is set a PineconeGRPC client is
        created with that host, otherwise a plain Pinecone client is used.
        Remaining params are forwarded unchanged to the base class.
        """

        input_queue = params.get("input_queue", default_input_queue)
        output_queue = params.get("output_queue", default_output_queue)
        subscriber = params.get("subscriber", default_subscriber)

        self.url = params.get("url", None)
        self.api_key = params.get("api_key", default_api_key)

        if self.url:

            self.pinecone = PineconeGRPC(
                api_key = self.api_key,
                host = self.url
            )

        else:

            self.pinecone = Pinecone(api_key = self.api_key)

        super(Processor, self).__init__(
            **params | {
                "input_queue": input_queue,
                "output_queue": output_queue,
                "subscriber": subscriber,
                "input_schema": DocumentEmbeddingsRequest,
                "output_schema": DocumentEmbeddingsResponse,
                "url": self.url,
            }
        )

    def handle(self, msg):
        """
        Answer one document embeddings request.

        On success sends a response carrying the matched documents; on any
        exception sends a response carrying an Error instead.  Both replies
        are tagged with the request's sender-produced "id" property.
        """

        # Sender-produced ID.  Read before entering the try block so that
        # the error path below can always tag its response with it.
        id = msg.properties()["id"]

        try:

            v = msg.value()

            print(f"Handling input {id}...", flush=True)

            chunks = []

            for vec in v.vectors:

                # One index per user and vector dimension
                dim = len(vec)

                index_name = (
                    "d-" + v.user + "-" + str(dim)
                )

                index = self.pinecone.Index(index_name)

                results = index.query(
                    namespace=v.collection,
                    vector=vec,
                    top_k=v.limit,
                    include_values=False,
                    include_metadata=True
                )

                for r in results.matches:
                    doc = r.metadata["doc"]
                    chunks.append(doc)

            print("Send response...", flush=True)
            r = DocumentEmbeddingsResponse(documents=chunks, error=None)
            self.producer.send(r, properties={"id": id})

            print("Done.", flush=True)

        except Exception as e:

            print(f"Exception: {e}", flush=True)

            print("Send error response...", flush=True)

            r = DocumentEmbeddingsResponse(
                error=Error(
                    type = "llm-error",
                    message = str(e),
                ),
                documents=None,
            )

            self.producer.send(r, properties={"id": id})

            # NOTE(review): acknowledged explicitly on the error path only;
            # presumably the base class acknowledges messages whose handler
            # returns normally -- confirm against ConsumerProducer.
            self.consumer.acknowledge(msg)

    @staticmethod
    def add_args(parser):
        """
        Register command-line arguments: the standard consumer/producer
        arguments plus the Pinecone API key and optional host URL.
        """

        ConsumerProducer.add_args(
            parser, default_input_queue, default_subscriber,
            default_output_queue,
        )

        # Destination names (api_key, url) match the params read by
        # __init__.  NOTE(review): assumes parsed arguments are passed to
        # the constructor as keyword params -- confirm in Processor.start.
        parser.add_argument(
            '--api-key',
            default=default_api_key,
            help='Pinecone API key (default: value of PINECONE_API_KEY)'
        )

        parser.add_argument(
            '--url',
            default=None,
            help='Pinecone host URL; if unset, the default Pinecone '
            'client is used'
        )

def run():
    """
    Start the processor, passing the module name and this module's
    docstring to Processor.start().
    """

    # __doc__ here resolves to the module-level docstring, not to the
    # docstring of this function.
    Processor.start(module, __doc__)

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

from . service import *

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env python3

# Entry point for running this package with `python -m`.
# The package __init__ re-exports from the sibling `service` module, which
# is where `run` is defined, so it is imported from there.
from . service import run

if __name__ == '__main__':
    run()

Loading

0 comments on commit 319f9ac

Please sign in to comment.