Skip to content

Commit

Permalink
Fix API gateway integration, added to templates (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
cybermaggedon authored Nov 20, 2024
1 parent 92b8444 commit ba6d6c1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 24 deletions.
42 changes: 42 additions & 0 deletions templates/components/trustgraph.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,51 @@ local prompt = import "prompt-template.jsonnet";

{

"api-gateway-port":: 8088,
"api-gateway-timeout":: 600,

"chunk-size":: 250,
"chunk-overlap":: 15,

"api-gateway" +: {

create:: function(engine)

local port = $["api-gateway-port"];

local container =
engine.container("api-gateway")
.with_image(images.trustgraph)
.with_command([
"api-gateway",
"-p",
url.pulsar,
"--timeout",
std.toString($["api-gateway-timeout"]),
"--port",
std.toString(port),
])
.with_limits("0.5", "256M")
.with_reservations("0.1", "256M")
.with_port(8000, 8000, "metrics")
.with_port(port, port, "api");

local containerSet = engine.containers(
"api-gateway", [ container ]
);

local service =
engine.internalService(containerSet)
.with_port(8000, 8000, "metrics")
.with_port(port, port, "api");

engine.resources([
containerSet,
service,
])

},

"chunker" +: {

create:: function(engine)
Expand Down
61 changes: 37 additions & 24 deletions trustgraph-flow/scripts/api-gateway
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
#!/usr/bin/env python3

# FIXME: Subscribes to Pulsar unnecessarily, should only do it when there
# are active listeners

# FIXME: Connection errors in publishers / subscribers cause those threads
# to fail and are not failed or retried

import asyncio
from aiohttp import web
import json
import logging
import uuid
import os

import pulsar
from pulsar.asyncio import Client
Expand Down Expand Up @@ -42,7 +49,7 @@ from trustgraph.schema import embeddings_response_queue
logger = logging.getLogger("api")
logger.setLevel(logging.INFO)

pulsar_host = "pulsar://localhost:6650"
pulsar_host = os.getenv("PULSAR_HOST", "pulsar://pulsar:6650")
TIME_OUT = 600

class Publisher:
Expand All @@ -54,15 +61,18 @@ class Publisher:
self.q = asyncio.Queue(maxsize=max_size)

async def run(self):
async with aiopulsar.connect(self.pulsar_host) as client:
async with client.create_producer(
topic=self.topic,
schema=self.schema,
) as producer:
while True:
id, item = await self.q.get()
await producer.send(item, { "id": id })
# print("message out")
try:
async with aiopulsar.connect(self.pulsar_host) as client:
async with client.create_producer(
topic=self.topic,
schema=self.schema,
) as producer:
while True:
id, item = await self.q.get()
await producer.send(item, { "id": id })
# print("message out")
except Exception as e:
print("Exception:", e, flush=True)

async def send(self, id, msg):
await self.q.put((id, msg))
Expand All @@ -79,20 +89,23 @@ class Subscriber:
self.q = {}

async def run(self):
async with aiopulsar.connect(pulsar_host) as client:
async with client.subscribe(
topic=self.topic,
subscription_name=self.subscription,
consumer_name=self.consumer_name,
schema=self.schema,
) as consumer:
while True:
msg = await consumer.receive()
# print("message in", self.topic)
id = msg.properties()["id"]
value = msg.value()
if id in self.q:
await self.q[id].put(value)
try:
async with aiopulsar.connect(pulsar_host) as client:
async with client.subscribe(
topic=self.topic,
subscription_name=self.subscription,
consumer_name=self.consumer_name,
schema=self.schema,
) as consumer:
while True:
msg = await consumer.receive()
# print("message in", self.topic)
id = msg.properties()["id"]
value = msg.value()
if id in self.q:
await self.q[id].put(value)
except Exception as e:
print("Exception:", e, flush=True)

async def subscribe(self, id):
q = asyncio.Queue()
Expand Down
2 changes: 2 additions & 0 deletions trustgraph-flow/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@
"ibis",
"jsonschema",
"aiohttp",
"aiopulsar-py",
],
scripts=[
"scripts/api-gateway",
"scripts/agent-manager-react",
"scripts/chunker-recursive",
"scripts/chunker-token",
Expand Down

0 comments on commit ba6d6c1

Please sign in to comment.