Skip to content

Commit

Permalink
towards implementing [elton track | elton tee | preston append | elto…
Browse files Browse the repository at this point in the history
…n stream] workflow
  • Loading branch information
Jorrit Poelen committed Jan 2, 2025
1 parent f2bb29e commit 1461ebd
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 33 deletions.
149 changes: 116 additions & 33 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.globalbioticinteractions.elton.cmd;

import bio.guoda.preston.RefNodeFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.rdf.api.IRI;
import org.eol.globi.data.ImportLogger;
import org.eol.globi.data.NodeFactory;
import org.eol.globi.domain.LogContext;
Expand All @@ -28,6 +32,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@CommandLine.Command(
name = "stream",
Expand All @@ -40,6 +46,10 @@ public class CmdStream extends CmdDefaultParams {
/**
 * Human-readable description of this command, including one example line of
 * JSON input. Returned by {@link #getDescription()}.
 */
public static final String DESCRIPTION = "stream interactions associated with dataset configuration provided by globi.json line-json as input.\n" +
"example input:" +
"{ \"namespace\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"citation\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"format\": \"dwca\", \"url\": \"https://linker.bio/hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\" }\n";

/**
 * The prov#wasAssociatedWith predicate IRI in angle brackets, padded with one
 * space on each side. doRun() uses it as a substring test on each input line
 * and concatenates it verbatim (unquoted) into the regular expression that
 * captures a dataset namespace and its resource location.
 */
public static final String ASSOCIATED_WITH = " <http://www.w3.org/ns/prov#wasAssociatedWith> ";

/**
 * The dc/elements/1.1/format predicate IRI in angle brackets, padded with one
 * space on each side. doRun() uses it to recognise lines that pair a resource
 * location with a double-quoted format literal.
 */
public static final String FORMAT = " <http://purl.org/dc/elements/1.1/format> ";

/**
 * The pav/hasVersion predicate IRI in angle brackets, padded with one space on
 * each side. doRun() uses it to recognise lines that pair a resource location
 * with a version IRI.
 */
public static final String HAS_VERSION = " <http://purl.org/pav/hasVersion> ";

/**
 * Prefix of the IRIs that carry a dataset namespace. doRun() matches it in
 * association statements, appends "local" to it when no namespace was seen,
 * and strips it (StringUtils.removeStart) to obtain the plain namespace.
 */
public static final String URN_LSID_GLOBALBIOTICINTERACTIONS_ORG = "urn:lsid:globalbioticinteractions.org:";

public void setRecordType(String recordType) {
this.recordType = recordType;
Expand All @@ -56,43 +66,62 @@ public void doRun() {
BufferedReader reader = IOUtils.buffer(new InputStreamReader(getStdin(), StandardCharsets.UTF_8));
AtomicBoolean isFirst = new AtomicBoolean(true);
try {
IRI resourceLocation = null;
IRI resourceNamespace = null;
String resourceFormat = null;
IRI resourceVersion = null;
String line;
while ((line = reader.readLine()) != null) {
try {
JsonNode jsonNode = new ObjectMapper().readTree(line);
String namespace = jsonNode.at("/namespace").asText(DatasetRegistryUtil.NAMESPACE_LOCAL);
if (StringUtils.isNotBlank(namespace)) {
ImportLoggerFactory loggerFactory = new ImportLoggerFactoryImpl(recordType, namespace, Arrays.asList(ReviewCommentType.values()), getStdout());
try {
boolean shouldWriteHeader = isFirst.get();
StreamingDatasetsHandler namespaceHandler = new StreamingDatasetsHandler(
jsonNode,
getDataDir(),
getProvDir(),
getStderr(),
createInputStreamFactory(),
new NodeFactoryFactoryImpl(shouldWriteHeader, recordType, loggerFactory.createImportLogger()),
loggerFactory,
getContentPathFactory(),
getProvenancePathFactory()
);
namespaceHandler.onNamespace(namespace);
isFirst.set(false);
} catch (Exception e) {
String msg = "failed to add dataset associated with namespace [" + namespace + "]";
loggerFactory.createImportLogger().warn(new LogContext() {
@Override
public String toString() {
return "{ \"namespace\": \"" + namespace + "\" }";
}
}, msg);
LOG.error(msg, e);
} finally {
FileUtils.forceDelete(new File(this.getDataDir()));
boolean handled = handleAsGloBIJson(isFirst, line);
if (!handled) {
if (StringUtils.contains(line, ASSOCIATED_WITH)) {
// possible namespace statement
Pattern namespacePattern = Pattern.compile("<(?<namespace>" + URN_LSID_GLOBALBIOTICINTERACTIONS_ORG + "[^>]+)>" + ASSOCIATED_WITH + "<(?<location>[^>]+)>.*");
Matcher matcher = namespacePattern.matcher(line);
if (matcher.matches()) {
resourceNamespace = RefNodeFactory.toIRI(matcher.group("namespace"));
resourceLocation = RefNodeFactory.toIRI(matcher.group("location"));
}
} else if (StringUtils.contains(line, FORMAT)) {
Pattern namespacePattern = Pattern.compile("<(?<location>[^>]+)>" + FORMAT + "\"(?<format>[^\"]+)\".*");
Matcher matcher = namespacePattern.matcher(line);
if (matcher.matches()) {
resourceLocation = RefNodeFactory.toIRI(matcher.group("location"));
resourceFormat = matcher.group("format");
}
// possible format statement
} else if (StringUtils.contains(line, HAS_VERSION)) {
// possible version statement
Pattern namespacePattern = Pattern.compile("<(?<location>[^>]+)>" + HAS_VERSION + "<(?<version>[^>]+)>.*");
Matcher matcher = namespacePattern.matcher(line);
if (matcher.matches()) {
resourceLocation = RefNodeFactory.toIRI(matcher.group("location"));
resourceVersion = RefNodeFactory.toIRI(matcher.group("version"));
}
}

if (resourceLocation != null
&& resourceFormat != null
&& resourceVersion != null) {
String resourceNamespaceString = (resourceNamespace == null
? RefNodeFactory.toIRI(URN_LSID_GLOBALBIOTICINTERACTIONS_ORG + "local")
: resourceNamespace).getIRIString();
String namespace = StringUtils.removeStart(resourceNamespaceString, URN_LSID_GLOBALBIOTICINTERACTIONS_ORG);
ObjectNode globiConfig = new ObjectMapper().createObjectNode();
globiConfig.put("url", resourceLocation.getIRIString());
globiConfig.put("format", StringUtils.replace(resourceFormat, "application/globi", "globi"));
globiConfig.put("citation", resourceVersion.getIRIString());
ArrayNode resourceMapping = new ObjectMapper().createArrayNode();
ObjectNode objectNode = new ObjectMapper().createObjectNode();
objectNode.put(resourceLocation.getIRIString(), "https://linker.bio/" + resourceVersion.getIRIString());
resourceMapping.add(objectNode);
globiConfig.set("resources", resourceMapping);
handleGloBIJson(namespace, isFirst, globiConfig);
resourceLocation = null;
resourceFormat = null;
resourceVersion = null;
resourceNamespace = null;
}
} catch (JsonProcessingException e) {
// ignore non-json lines
}
}
} catch (IOException ex) {
Expand All @@ -101,6 +130,60 @@ public String toString() {

}

/**
 * Tries to interpret one input line as a JSON dataset configuration and, when
 * it is one, streams the associated dataset via
 * {@link #handleGloBIJson(String, AtomicBoolean, JsonNode)}.
 *
 * <p>Only JSON <em>objects</em> are treated as configurations. Without this
 * check any parseable JSON value (a blank line, a number, a string, an array)
 * would resolve {@code /namespace} to the default namespace and trigger a
 * dataset import with a non-object configuration. Depending on the Jackson
 * version, {@code readTree} may return {@code null} or a missing node for
 * content-free input, so both cases are guarded.
 *
 * @param isFirst flag handed through to {@code handleGloBIJson}
 * @param line    a single line read from the input
 * @return {@code true} only if the line was a JSON object with a non-blank
 *         namespace and {@code handleGloBIJson} reported success
 * @throws IOException if {@code handleGloBIJson} fails with an I/O error other
 *                     than a {@link JsonProcessingException}
 */
private boolean handleAsGloBIJson(AtomicBoolean isFirst, String line) throws IOException {
    boolean handled = false;
    try {
        JsonNode jsonNode = new ObjectMapper().readTree(line);
        // Skip null results and non-object values; only objects can be configurations.
        if (jsonNode != null && jsonNode.isObject()) {
            // A configuration without an explicit namespace falls back to the default one.
            String namespace = jsonNode.at("/namespace").asText(DatasetRegistryUtil.NAMESPACE_LOCAL);
            if (StringUtils.isNotBlank(namespace)) {
                handled = handleGloBIJson(namespace, isFirst, jsonNode);
            }
        }
    } catch (JsonProcessingException ignored) {
        // Deliberately ignored: the line is not JSON, so the caller may try
        // to interpret it some other way.
    }
    return handled;
}

/**
 * Streams the dataset configured by {@code jsonNode} under {@code namespace}.
 *
 * <p>A {@code StreamingDatasetsHandler} is built from the configuration and
 * this command's directories, streams and factories, then invoked for the
 * namespace. On success {@code isFirst} is cleared. Any exception raised while
 * building or running the handler is reported as a warning through a fresh
 * import logger, logged as an error, and turned into a {@code false} result
 * instead of being rethrown.
 *
 * <p>NOTE: the data directory is not removed afterwards; the former cleanup
 * ({@code FileUtils.forceDelete(new File(this.getDataDir()))}) is disabled.
 *
 * @param namespace namespace of the dataset being streamed
 * @param isFirst   read once before streaming and passed as the first argument
 *                  of {@code NodeFactoryFactoryImpl}; set to {@code false}
 *                  after a successful run
 * @param jsonNode  dataset configuration handed to the handler
 * @return {@code true} if the handler completed without throwing
 * @throws IOException declared for callers; not thrown by the guarded section
 */
private boolean handleGloBIJson(final String namespace, AtomicBoolean isFirst, JsonNode jsonNode) throws IOException {
    final ImportLoggerFactory importLoggerFactory = new ImportLoggerFactoryImpl(
            recordType,
            namespace,
            Arrays.asList(ReviewCommentType.values()),
            getStdout()
    );

    try {
        final boolean writeHeader = isFirst.get();
        new StreamingDatasetsHandler(
                jsonNode,
                getDataDir(),
                getProvDir(),
                getStderr(),
                createInputStreamFactory(),
                new NodeFactoryFactoryImpl(writeHeader, recordType, importLoggerFactory.createImportLogger()),
                importLoggerFactory,
                getContentPathFactory(),
                getProvenancePathFactory()
        ).onNamespace(namespace);
        isFirst.set(false);
        return true;
    } catch (Exception e) {
        final String failure = "failed to add dataset associated with namespace [" + namespace + "]";
        // Log context that renders as a small JSON object naming the namespace.
        final LogContext namespaceContext = new LogContext() {
            @Override
            public String toString() {
                return "{ \"namespace\": \"" + namespace + "\" }";
            }
        };
        importLoggerFactory.createImportLogger().warn(namespaceContext, failure);
        LOG.error(failure, e);
        return false;
    }
}

@Override
public String getDescription() {
return DESCRIPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import org.apache.commons.io.IOUtils;
import org.hamcrest.core.Is;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

Expand All @@ -15,6 +19,9 @@

public class CmdStreamTest {

// JUnit rule supplying temporary folders to the tests in this class;
// streamSomeProvStatements() uses it to obtain its working/data directory.
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Test
public void streamNothing() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Expand Down Expand Up @@ -46,6 +53,37 @@ public void streamSomeInteractions() {
assertThat(new String(errorStream.toByteArray(), StandardCharsets.UTF_8), startsWith("tracking [local]..."));
}


/**
 * Feeds a provenance log (association, format, used and version statements
 * about the template dataset) to {@code CmdStream} on stdin and expects the
 * command to stream the dataset: stdout starts with the interactions header
 * and stderr reports tracking and writing of the
 * {@code globalbioticinteractions/template-dataset} namespace.
 *
 * <p>NOTE(review): only the work and data directories are pointed at the
 * temporary folder; the provenance directory ({@code getProvDir()} in
 * {@code CmdStream}) keeps its default. Confirm whether it should be
 * redirected as well.
 */
@Test
public void streamSomeProvStatements() throws IOException {

    String provLogGeneratedByElton = "<urn:lsid:globalbioticinteractions.org:globalbioticinteractions/template-dataset> <http://www.w3.org/ns/prov#wasAssociatedWith> <https://github.com/globalbioticinteractions/template-dataset/archive/b92cd44dcba945c760229a14d3b9becb2dd0c147.zip> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<https://github.com/globalbioticinteractions/template-dataset/archive/b92cd44dcba945c760229a14d3b9becb2dd0c147.zip> <http://purl.org/dc/elements/1.1/format> \"application/globi\" <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<urn:uuid:41389744-0f4d-47e2-8506-76999e1b5c34> <http://www.w3.org/ns/prov#used> <https://github.com/globalbioticinteractions/template-dataset/archive/b92cd44dcba945c760229a14d3b9becb2dd0c147.zip> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<https://github.com/globalbioticinteractions/template-dataset/archive/b92cd44dcba945c760229a14d3b9becb2dd0c147.zip> <http://purl.org/pav/hasVersion> <hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<urn:uuid:cce97773-a8e2-4af4-94f9-0ac2699cb28e> <http://www.w3.org/ns/prov#used> <jar:hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44!/template-dataset-b92cd44dcba945c760229a14d3b9becb2dd0c147/globi.json> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<jar:hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44!/template-dataset-b92cd44dcba945c760229a14d3b9becb2dd0c147/globi.json> <http://purl.org/pav/hasVersion> <hash://sha256/94bc19a3b0f172f63138fdc9384bb347f110e6fae6d42613a6eba019df6268d2> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<urn:uuid:c7b1a849-8230-4e34-a0d5-7b663bc87e01> <http://www.w3.org/ns/prov#used> <jar:hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44!/template-dataset-b92cd44dcba945c760229a14d3b9becb2dd0c147/interactions.tsv> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n" +
            "<jar:hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44!/template-dataset-b92cd44dcba945c760229a14d3b9becb2dd0c147/interactions.tsv> <http://purl.org/pav/hasVersion> <hash://sha256/d84999936296e4b85086f2851f4459605502f4eb80b9484049b81d34f43b2ff1> <urn:uuid:16b63a6d-153b-4f16-afed-a67fa09383a7> .\n";

    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    ByteArrayOutputStream errorStream = new ByteArrayOutputStream();

    CmdStream cmdStream = new CmdStream();
    // newFolder() already creates the directory, so no extra mkdirs() is needed.
    File tmpDir = folder.newFolder("tmpDir");
    cmdStream.setWorkDir(tmpDir.getAbsolutePath());
    cmdStream.setDataDir(tmpDir.getAbsolutePath());
    cmdStream.setStdout(new PrintStream(outputStream));
    cmdStream.setStderr(new PrintStream(errorStream));
    cmdStream.setStdin(IOUtils.toInputStream(provLogGeneratedByElton, StandardCharsets.UTF_8));
    cmdStream.run();

    assertThat(new String(outputStream.toByteArray(), StandardCharsets.UTF_8), startsWith(headerInteractions()));
    assertThat(new String(errorStream.toByteArray(), StandardCharsets.UTF_8), Is.is("tracking [globalbioticinteractions/template-dataset]...done.\nwrote [globalbioticinteractions/template-dataset]\n"));
}

@Test
public void streamSomeInteractionsCustomNamespace() {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Expand Down

0 comments on commit 1461ebd

Please sign in to comment.