Skip to content

Commit

Permalink
decouple dataset extraction from dataset handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Poelen committed Jan 2, 2025
1 parent 747677c commit abdd21c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 41 deletions.
66 changes: 33 additions & 33 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import org.eol.globi.data.NodeFactory;
import org.eol.globi.domain.LogContext;
import org.eol.globi.tool.NullImportLogger;

import org.globalbioticinteractions.dataset.Dataset;
import org.globalbioticinteractions.dataset.DatasetImpl;
import org.globalbioticinteractions.elton.util.DatasetRegistryUtil;
import org.globalbioticinteractions.elton.util.ProgressCursor;
import org.globalbioticinteractions.elton.util.ProgressCursorFactory;
Expand All @@ -29,9 +30,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -46,10 +45,6 @@ public class CmdStream extends CmdDefaultParams {
public static final String DESCRIPTION = "stream interactions associated with dataset configuration provided by globi.json line-json as input.\n" +
"example input:" +
"{ \"namespace\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"citation\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"format\": \"dwca\", \"url\": \"https://linker.bio/hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\" }\n";
public static final String ASSOCIATED_WITH = " <http://www.w3.org/ns/prov#wasAssociatedWith> ";
public static final String FORMAT = " <http://purl.org/dc/elements/1.1/format> ";
public static final String HAS_VERSION = " <http://purl.org/pav/hasVersion> ";
public static final String URN_LSID_GLOBALBIOTICINTERACTIONS_ORG = "urn:lsid:globalbioticinteractions.org:";

public void setRecordType(String recordType) {
this.recordType = recordType;
Expand All @@ -73,12 +68,14 @@ public void doRun() {

String line;
while ((line = reader.readLine()) != null) {
boolean handled = jsonLineHandler.processLine(line, shouldWriteHeader.get());
if (!handled) {
provLineHandler.processLine(line, shouldWriteHeader.get());
Dataset dataset = jsonLineHandler.extractDataset(line, shouldWriteHeader.get());
if (dataset == null) {
dataset = provLineHandler.extractDataset(line, shouldWriteHeader.get());
}
if (handled) {
shouldWriteHeader.set(false);
if (dataset != null) {
if (handleDatasetConfig(dataset.getNamespace(), shouldWriteHeader.get(), dataset.getConfig())) {
shouldWriteHeader.set(false);
}
}
}
} catch (IOException ex) {
Expand All @@ -87,21 +84,7 @@ public void doRun() {

}

private boolean handleAsGloBIJson(String line, boolean isFirst) throws IOException {
boolean handled = false;
try {
JsonNode jsonNode = new ObjectMapper().readTree(line);
String namespace = jsonNode.at("/namespace").asText(DatasetRegistryUtil.NAMESPACE_LOCAL);
if (StringUtils.isNotBlank(namespace)) {
handled = handleGloBIJson(namespace, isFirst, jsonNode);
}
} catch (JsonProcessingException e) {
// ignore non-json lines
}
return handled;
}

private boolean handleGloBIJson(final String namespace, boolean shouldWriteHeader, JsonNode jsonNode) throws IOException {
private boolean handleDatasetConfig(final String namespace, boolean shouldWriteHeader, JsonNode jsonNode) throws IOException {
boolean handled = false;
ImportLoggerFactory loggerFactory = new ImportLoggerFactoryImpl(
recordType,
Expand Down Expand Up @@ -214,20 +197,36 @@ public NodeFactory createNodeFactory() {
public class LineHandlerJson implements LineHandler {

@Override
public boolean processLine(String line, boolean isFirstLine) throws IOException {
return handleAsGloBIJson(line, isFirstLine);
public Dataset extractDataset(String line, boolean isFirstLine) throws IOException {
Dataset dataset = null;
try {
JsonNode jsonNode = new ObjectMapper().readTree(line);
String namespace = jsonNode.at("/namespace").asText(DatasetRegistryUtil.NAMESPACE_LOCAL);
if (StringUtils.isNotBlank(namespace)) {
dataset = new DatasetImpl(namespace, null, null);
dataset.setConfig(jsonNode);
}
} catch (JsonProcessingException e) {
// ignore non-json lines
}
return dataset;
}
}

public class LineHandlerProv implements LineHandler {
static final String ASSOCIATED_WITH = " <http://www.w3.org/ns/prov#wasAssociatedWith> ";
static final String FORMAT = " <http://purl.org/dc/elements/1.1/format> ";
static final String HAS_VERSION = " <http://purl.org/pav/hasVersion> ";
static final String URN_LSID_GLOBALBIOTICINTERACTIONS_ORG = "urn:lsid:globalbioticinteractions.org:";

IRI resourceLocation = null;
IRI resourceNamespace = null;
String resourceFormat = null;
IRI resourceVersion = null;

@Override
public boolean processLine(String line, boolean isFirstLine) throws IOException {
boolean handled = false;
public Dataset extractDataset(String line, boolean isFirstLine) throws IOException {
Dataset dataset = null;
if (StringUtils.contains(line, ASSOCIATED_WITH)) {
// possible namespace statement
Pattern namespacePattern = Pattern.compile("<(?<namespace>" + URN_LSID_GLOBALBIOTICINTERACTIONS_ORG + "[^>]+)>" + ASSOCIATED_WITH + "<(?<location>[^>]+)>.*");
Expand Down Expand Up @@ -270,10 +269,11 @@ public boolean processLine(String line, boolean isFirstLine) throws IOException
objectNode.put(resourceLocation.getIRIString(), "https://linker.bio/" + resourceVersion.getIRIString());
resourceMapping.add(objectNode);
//globiConfig.set("resources", resourceMapping);
handled = handleGloBIJson(namespace, isFirstLine, globiConfig);
dataset = new DatasetImpl(namespace, null, null);
dataset.setConfig(globiConfig);
resetContext();
}
return handled;
return dataset;
}

private void resetOnLocationSwitch(Matcher matcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ static DatasetRegistry createDataFinderLoggingCaching(
ActivityListener activityListener,
ActivityContext ctx,
Supplier<IRI> iriFactory,
DatasetRegistry registryProvenanceLogger) {
DatasetRegistry datasetRegistry) {

CacheFactory cacheFactory = createCacheFactory(
namespace,
Expand All @@ -85,7 +85,7 @@ static DatasetRegistry createDataFinderLoggingCaching(
iriFactory
);
return new DatasetRegistryWithCache(
registryProvenanceLogger,
datasetRegistry,
cacheFactory
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.globalbioticinteractions.elton.cmd;

import org.globalbioticinteractions.dataset.Dataset;

import java.io.IOException;

public interface LineHandler {

boolean processLine(String line, boolean isFirstLine) throws IOException;
Dataset extractDataset(String line, boolean isFirstLine) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

class StreamingDatasetsHandler implements NamespaceHandler {
private final static Logger LOG = LoggerFactory.getLogger(StreamingDatasetsHandler.class);
private final String cacheDir;
private final String dataDir;
private final PrintStream stderr;

private InputStreamFactory factory;
Expand All @@ -44,7 +44,7 @@ class StreamingDatasetsHandler implements NamespaceHandler {
private Supplier<IRI> activityIdFactory;

public StreamingDatasetsHandler(JsonNode config,
String cacheDir,
String dataDir,
String provDir,
PrintStream stderr,
InputStreamFactory inputStreamFactory,
Expand All @@ -55,7 +55,7 @@ public StreamingDatasetsHandler(JsonNode config,
ActivityContext ctx,
Supplier<IRI> activityIdFactory) {
this.factory = inputStreamFactory;
this.cacheDir = cacheDir;
this.dataDir = dataDir;
this.provDir = provDir;
this.stderr = stderr;
this.config = config;
Expand All @@ -74,7 +74,7 @@ public void onNamespace(String namespace) throws Exception {

CacheFactory cacheFactory = CmdUtil.createCacheFactory(
namespace,
cacheDir,
dataDir,
provDir,
factory,
contentPathFactory,
Expand All @@ -101,7 +101,7 @@ public void onNamespace(String namespace) throws Exception {
datasetWithCache,
nodeFactory,
loggerFactory.createImportLogger(),
new File(cacheDir)
new File(dataDir)
);
stderr.println("done.");
} catch (StudyImporterException ex) {
Expand Down

0 comments on commit abdd21c

Please sign in to comment.