From 89d925273d3151e27fe0948e1be002ce37c08593 Mon Sep 17 00:00:00 2001 From: Jey Puget Gil Date: Tue, 1 Oct 2024 14:30:42 +0200 Subject: [PATCH] Quads Loader + Docker compose: - Use streams only - Use sha256 images --- docker-compose.yml | 6 +- .../converg/services/QuadImportService.java | 196 ++++++++---------- 2 files changed, 95 insertions(+), 107 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d1ba350..60e5b93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: postgres: - image: postgres:16.2 + image: postgres@sha256:4ec37d2a07a0067f176fdcc9d4bb633a5724d2cc4f892c7a2046d054bb6939e5 ports: - "5432:5432" environment: @@ -17,7 +17,7 @@ services: memory: ${BD_RAM_LIMITATION:-16Gb} postgres-test: - image: postgres:16.2 + image: postgres@sha256:4ec37d2a07a0067f176fdcc9d4bb633a5724d2cc4f892c7a2046d054bb6939e5 ports: - "5433:5432" environment: @@ -33,7 +33,7 @@ services: memory: ${BD_RAM_LIMITATION:-16Gb} blazegraph: - image: vcity/blazegraph-cors + image: vcity/blazegraph-cors@sha256:c6f9556ca53ff01304557e349d2f10b3e121dae7230426f4c64fa42b2cbaf805 ports: - "9999:8080" container_name: blazegraph diff --git a/quads-loader/src/main/java/fr/vcity/converg/services/QuadImportService.java b/quads-loader/src/main/java/fr/vcity/converg/services/QuadImportService.java index 3843254..1a904fe 100644 --- a/quads-loader/src/main/java/fr/vcity/converg/services/QuadImportService.java +++ b/quads-loader/src/main/java/fr/vcity/converg/services/QuadImportService.java @@ -5,14 +5,13 @@ import fr.vcity.converg.repository.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FilenameUtils; -import org.apache.jena.graph.Graph; -import org.apache.jena.graph.Node; import org.apache.jena.graph.Triple; import org.apache.jena.riot.RDFLanguages; import org.apache.jena.riot.RDFParser; import org.apache.jena.riot.RiotException; -import org.apache.jena.riot.system.ErrorHandlerFactory; -import org.apache.jena.sparql.core.DatasetGraph; 
+import org.apache.jena.riot.system.StreamRDF; +import org.apache.jena.riot.system.StreamRDFBase; +import org.apache.jena.sparql.core.Quad; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -20,8 +19,9 @@ import java.io.InputStream; import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; +import java.util.Set; @Service @Slf4j @@ -66,6 +66,10 @@ public record QuadValueType(TripleValueType tripleValueType, String namedGraph, IVersionRepository versionRepository; VersionedQuadComponent versionedQuadComponent; + List quadValueTypes = new ArrayList<>(); + Set namedGraphs = new HashSet<>(); + List tripleValueTypes = new ArrayList<>(); + public QuadImportService( IFlatModelQuadRepository flatModelQuadRepository, IFlatModelTripleRepository flatModelTripleRepository, @@ -106,18 +110,12 @@ public Integer importModel(MultipartFile file) throws RiotException { log.info("Current file: {}", file.getOriginalFilename()); - try (InputStream inputStream = file.getInputStream()) { - DatasetGraph datasetGraph = - RDFParser.create() - .source(inputStream) - .lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(file.getOriginalFilename()))) - .errorHandler(ErrorHandlerFactory.errorHandlerStrict) - .toDatasetGraph(); + try (InputStream inputStream = file.getInputStream()) { Long start = System.nanoTime(); - extractAndInsertVersionedNamedGraph(file, datasetGraph, version); - extractAndInsertQuads(datasetGraph, version); + getQuadsStreamRDF(inputStream, file.getOriginalFilename(), version.getIndexVersion()) + .finish(); log.info("Saving quads to catalog"); rdfResourceRepository.flatModelQuadsToCatalog(); @@ -149,23 +147,10 @@ public void importMetadata(MultipartFile file) throws RiotException { log.info("Current file: {}", file.getOriginalFilename()); try (InputStream inputStream = file.getInputStream()) { - DatasetGraph datasetGraph = - 
RDFParser.create() - .source(inputStream) - .lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(file.getOriginalFilename()))) - .errorHandler(ErrorHandlerFactory.errorHandlerStrict) - .toDatasetGraph(); - Long start = System.nanoTime(); - if (!datasetGraph.getDefaultGraph().isEmpty()) { - importDefaultModel(datasetGraph.getDefaultGraph()); - } - - for (Iterator i = datasetGraph.listGraphNodes(); i.hasNext(); ) { - var graphNode = i.next(); - importDefaultModel(datasetGraph.getGraph(graphNode)); - } + getTriplesStreamRDF(inputStream, file.getOriginalFilename()) + .finish(); log.info("Saving triples to catalog"); rdfResourceRepository.flatModelTriplesToCatalog(); @@ -200,96 +185,99 @@ public void removeMetadata() { } /** - * Import RDF default model statements + * Save the triples in batch + */ + private void saveBatchTriples() { + metadataComponent.saveTriples(tripleValueTypes); + tripleValueTypes.clear(); + } + + /** + * Save the quads in batch + */ + private void saveBatchQuads() { + versionedQuadComponent.saveQuads(quadValueTypes); + quadValueTypes.clear(); + } + + /** + * Save the versioned named graph in batch * - * @param graph The default graph + * @param filename The input filename + * @param version The version number */ - private void importDefaultModel(Graph graph) { - List tripleValueTypes = new ArrayList<>(); - - graph.stream() - .forEach(triple -> { - tripleValueTypes.add(getTripleValueType(triple)); - - if (tripleValueTypes.size() == 50000) { - log.info("50000 records found. 
Executing batch save triples"); - metadataComponent.saveTriples(tripleValueTypes); - tripleValueTypes.clear(); - } - }); - - if (!tripleValueTypes.isEmpty()) { - metadataComponent.saveTriples(tripleValueTypes); - } + private void saveBatchVersionedNamedGraph(String filename, Integer version) { + versionedNamedGraphComponent.saveVersionedNamedGraph(namedGraphs.stream().toList(), filename, version); + namedGraphs.clear(); } /** - * Extract and insert the quads + * Converts a stream into a stream of quads * - * @param datasetGraph The datasetGraph - * @param version The version + * @param in The input stream of the dataset + * @return A stream of quads */ - private void extractAndInsertQuads(DatasetGraph datasetGraph, Version version) { - List quadValueTypes = new ArrayList<>(); - - datasetGraph.stream() - .forEach(quad -> { - QuadValueType quadValueType = new QuadValueType( - getTripleValueType( - quad.asTriple() - ), - quad.getGraph().getURI(), - version.getIndexVersion() - 1 - ); - quadValueTypes.add(quadValueType); - - if (quadValueTypes.size() == 50000) { - log.info("50000 records found. Executing batch save quads"); - versionedQuadComponent.saveQuads(quadValueTypes); - quadValueTypes.clear(); - } - }); - - datasetGraph.getDefaultGraph() - .stream() - .forEach(triple -> { - QuadValueType quadValueType = new QuadValueType(getTripleValueType(triple), defaultGraphURI.getName(), version.getIndexVersion() - 1); - quadValueTypes.add(quadValueType); - - if (quadValueTypes.size() == 50000) { - log.info("50000 records found. 
Executing batch save quads"); - versionedQuadComponent.saveQuads(quadValueTypes); - quadValueTypes.clear(); - } - }); - - if (!quadValueTypes.isEmpty()) { - versionedQuadComponent.saveQuads(quadValueTypes); - } + private StreamRDF getQuadsStreamRDF(InputStream in, String filename, Integer version) { + StreamRDF quadStreamRDF = new StreamRDFBase() { + @Override + public void quad(Quad quad) { + namedGraphs.add(quad.getGraph().getURI()); + quadValueTypes.add(new QuadValueType( + getTripleValueType(quad.asTriple()), + quad.getGraph().getURI(), + version - 1 + )); + + if (namedGraphs.size() == 50000) { + log.info("50000 named graph records found. Executing batch save named graph"); + saveBatchVersionedNamedGraph(filename, version); + } + + if (quadValueTypes.size() == 50000) { + log.info("50000 quads records found. Executing batch save quads"); + saveBatchQuads(); + } + } + }; + + RDFParser.create() + .source(in) + .lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(filename))) + .parse(quadStreamRDF); + + saveBatchVersionedNamedGraph(filename, version); + saveBatchQuads(); + + return quadStreamRDF; } /** - * Extract and insert the versioned named graph + * Converts a stream into a stream of triples * - * @param file The input file - * @param datasetGraph The datasetGraph - * @param version The version + * @param in The input stream of the dataset + * @return A stream of triples */ - private void extractAndInsertVersionedNamedGraph(MultipartFile file, DatasetGraph datasetGraph, Version version) { - List namedGraphs = new ArrayList<>(); + private StreamRDF getTriplesStreamRDF(InputStream in, String filename) { + StreamRDF tripleStreamRDF = new StreamRDFBase() { + @Override + public void triple(Triple triple) { + tripleValueTypes.add(getTripleValueType(triple)); + + if (tripleValueTypes.size() == 50000) { + log.info("50000 triples records found. 
Executing batch save quads"); + saveBatchTriples(); + } + } + }; - for (Iterator i = datasetGraph.listGraphNodes(); i.hasNext(); ) { - var namedModel = i.next(); - namedGraphs.add(namedModel.getURI()); - } + RDFParser.create() + .source(in) + .lang(RDFLanguages.nameToLang(FilenameUtils.getExtension(filename))) + .parse(tripleStreamRDF); - if (!datasetGraph.getDefaultGraph().isEmpty()) { - namedGraphs.add(defaultGraphURI.getName()); - } + saveBatchTriples(); - if (!namedGraphs.isEmpty()) { - versionedNamedGraphComponent.saveVersionedNamedGraph(namedGraphs, file.getOriginalFilename(), version.getIndexVersion()); - } + return tripleStreamRDF; } /**