Skip to content

Commit

Permalink
Fix schema match (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
cybermaggedon authored Oct 31, 2024
1 parent 24d0997 commit bc1b38c
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import argparse
import time

from .... schema import Triple
from .... schema import Triples
from .... schema import triples_store_queue
from .... base import Consumer

Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(self, **params):
**params | {
"input_queue": input_queue,
"subscriber": subscriber,
"input_schema": Triple,
"input_schema": Triples,
}
)

Expand All @@ -51,7 +51,9 @@ def __del__(self):
def handle(self, msg):

v = msg.value()
self.writer.write(v.s.value, v.p.value, v.o.value)

for t in v.triples:
self.writer.write(t.s.value, t.p.value, t.o.value)

@staticmethod
def add_args(parser):
Expand Down

0 comments on commit bc1b38c

Please sign in to comment.