-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add schema load util * Added a sample schema turtle file will be useful for future testing and tutorials. * Fixed graph label metadata confusion, was created incorrect subjectOf edges.
- Loading branch information
1 parent
dedb663
commit 24d0997
Showing
6 changed files
with
196 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
@prefix ns1: <http://trustgraph.ai/e/> . | ||
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . | ||
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . | ||
@prefix schema: <https://schema.org/> . | ||
@prefix skos: <http://www.w3.org/2004/02/skos/core#> . | ||
|
||
schema:subjectOf rdfs:label "subject of" . | ||
skos:definition rdfs:label "definition" . | ||
|
||
rdf:type rdfs:label "type" . | ||
|
||
schema:DigitalDocument rdfs:label "digital document" . | ||
schema:Organization rdfs:label "organization" . | ||
schema:PublicationEvent rdfs:label "publication event" . | ||
|
||
schema:copyrightNotice rdfs:label "copyright notice" . | ||
schema:copyrightHolder rdfs:label "copyright holder" . | ||
schema:copyrightYear rdfs:label "copyright year" . | ||
schema:license rdfs:label "license" . | ||
schema:publication rdfs:label "publication" . | ||
schema:startDate rdfs:label "start date" . | ||
schema:endDate rdfs:label "end date" . | ||
schema:publishedBy rdfs:label "published by" . | ||
schema:datePublished rdfs:label "date published" . | ||
schema:publication rdfs:label "publication" . | ||
schema:datePublished rdfs:label "date published" . | ||
schema:url rdfs:label "url" . | ||
schema:identifier rdfs:label "identifier" . | ||
schema:keywords rdfs:label "keyword" . | ||
|
||
skos:definition rdfs:label "definition" . | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
Loads Graph embeddings into TrustGraph processing. | ||
""" | ||
|
||
import pulsar | ||
from pulsar.schema import JsonSchema | ||
from trustgraph.schema import Triples, Triple, Value, Metadata | ||
from trustgraph.schema import triples_store_queue | ||
import argparse | ||
import os | ||
import time | ||
import pyarrow as pa | ||
import rdflib | ||
|
||
from trustgraph.log_level import LogLevel | ||
|
||
default_user = 'trustgraph' | ||
default_collection = 'default' | ||
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://localhost:6650') | ||
default_output_queue = triples_store_queue | ||
|
||
class Loader: | ||
|
||
def __init__( | ||
self, | ||
pulsar_host, | ||
output_queue, | ||
log_level, | ||
files, | ||
user, | ||
collection, | ||
): | ||
|
||
self.client = pulsar.Client( | ||
pulsar_host, | ||
logger=pulsar.ConsoleLogger(log_level.to_pulsar()) | ||
) | ||
|
||
self.producer = self.client.create_producer( | ||
topic=output_queue, | ||
schema=JsonSchema(Triples), | ||
chunking_enabled=True, | ||
) | ||
|
||
self.files = files | ||
self.user = user | ||
self.collection = collection | ||
|
||
def run(self): | ||
|
||
try: | ||
|
||
for file in self.files: | ||
self.load_file(file) | ||
|
||
except Exception as e: | ||
print(e, flush=True) | ||
|
||
def load_file(self, file): | ||
|
||
g = rdflib.Graph() | ||
g.parse(file, format="turtle") | ||
|
||
for e in g: | ||
s = Value(value=str(e[0]), is_uri=True) | ||
p = Value(value=str(e[1]), is_uri=True) | ||
if type(e[2]) == rdflib.term.URIRef: | ||
o = Value(value=str(e[2]), is_uri=True) | ||
else: | ||
o = Value(value=str(e[2]), is_uri=False) | ||
|
||
r = Triples( | ||
metadata=Metadata( | ||
id=None, | ||
metadata=[], | ||
user=self.user, | ||
collection=self.collection, | ||
), | ||
triples=[ Triple(s=s, p=p, o=o) ] | ||
) | ||
|
||
self.producer.send(r) | ||
|
||
def __del__(self): | ||
self.client.close() | ||
|
||
def main(): | ||
|
||
parser = argparse.ArgumentParser( | ||
prog='loader', | ||
description=__doc__, | ||
) | ||
|
||
parser.add_argument( | ||
'-p', '--pulsar-host', | ||
default=default_pulsar_host, | ||
help=f'Pulsar host (default: {default_pulsar_host})', | ||
) | ||
|
||
parser.add_argument( | ||
'-o', '--output-queue', | ||
default=default_output_queue, | ||
help=f'Output queue (default: {default_output_queue})' | ||
) | ||
|
||
parser.add_argument( | ||
'-u', '--user', | ||
default=default_user, | ||
help=f'User ID (default: {default_user})' | ||
) | ||
|
||
parser.add_argument( | ||
'-c', '--collection', | ||
default=default_collection, | ||
help=f'Collection ID (default: {default_collection})' | ||
) | ||
|
||
parser.add_argument( | ||
'-l', '--log-level', | ||
type=LogLevel, | ||
default=LogLevel.ERROR, | ||
choices=list(LogLevel), | ||
help=f'Output queue (default: info)' | ||
) | ||
|
||
parser.add_argument( | ||
'files', nargs='+', | ||
help=f'Turtle files to load' | ||
) | ||
|
||
args = parser.parse_args() | ||
|
||
while True: | ||
|
||
try: | ||
p = Loader( | ||
pulsar_host=args.pulsar_host, | ||
output_queue=args.output_queue, | ||
log_level=args.log_level, | ||
files=args.files, | ||
user=args.user, | ||
collection=args.collection, | ||
) | ||
|
||
p.run() | ||
|
||
print("File loaded.") | ||
break | ||
|
||
except Exception as e: | ||
|
||
print("Exception:", e, flush=True) | ||
print("Will retry...", flush=True) | ||
|
||
time.sleep(10) | ||
|
||
main() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters