Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Poelen committed Jan 9, 2025
1 parent e68233c commit eab3ba4
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 11 deletions.
99 changes: 88 additions & 11 deletions src/main/java/org/globalbioticinteractions/elton/cmd/CmdStream.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package org.globalbioticinteractions.elton.cmd;

import bio.guoda.preston.DateUtil;
import bio.guoda.preston.DerefProgressListener;
import bio.guoda.preston.DerefState;
import bio.guoda.preston.HashType;
import bio.guoda.preston.RefNodeFactory;
import bio.guoda.preston.store.BlobStoreAppendOnly;
import bio.guoda.preston.store.BlobStoreReadOnly;
import bio.guoda.preston.store.KeyTo3LevelPath;
import bio.guoda.preston.store.KeyValueStore;
import bio.guoda.preston.store.KeyValueStoreConfig;
import bio.guoda.preston.store.KeyValueStoreFactoryImpl;
import bio.guoda.preston.store.KeyValueStoreLocalFileSystem;
import bio.guoda.preston.store.ValidatingKeyValueStreamContentAddressedFactory;
import bio.guoda.preston.store.ValidatingKeyValueStreamFactory;
import bio.guoda.preston.stream.ContentHashDereferencer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
Expand All @@ -37,6 +35,7 @@
import java.io.PrintStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -53,6 +52,42 @@ public class CmdStream extends CmdDefaultParams {
"example input:" +
"{ \"namespace\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"citation\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\", \"format\": \"dwca\", \"url\": \"hash://sha256/9cd053d40ef148e16389982ea16d724063b82567f7ba1799962670fc97876fbf\" }\n";

@CommandLine.Option(
names = {"--remote", "--remotes", "--include", "--repos", "--repositories"},
split = ",",
description = "Included repository dependencies (e.g., https://linker.bio/,https://softwareheritage.org,https://wikimedia.org,https://dataone.org,https://zenodo.org)"
)

private List<URI> remotes = new ArrayList<>();

@CommandLine.Option(
names = {"--no-cache", "--disable-cache"},
defaultValue = "false",
description = "Disable local content cache"
)
private Boolean disableCache = false;

@CommandLine.Option(
names = {"--no-progress", "--no-process"},
description = "Disable progress monitor"
)
private Boolean disableProgress = false;

@CommandLine.Option(
names = {"-d", "--depth"},
defaultValue = "2",
description = "folder depth of data dir"
)
private int depth = 2;

@CommandLine.Option(
names = {"--hash-algorithm", "--algo", "-a"},
description = "Hash algorithm used to generate primary content identifiers. Supported values: ${COMPLETION-CANDIDATES}."
)
private HashType hashType = HashType.sha256;

private boolean supportTarGzDiscovery = true;


public void setRecordType(String recordType) {
this.recordType = recordType;
Expand All @@ -66,8 +101,26 @@ public void setRecordType(String recordType) {
@Override
public void doRun() {

KeyValueStoreConfig config
= new KeyValueStoreConfig(
new File(getDataDir()),
new File(getWorkDir()),
2,
isCacheEnabled(),
getRemotes(),
getHashType(),
new DerefProgressListener() {
@Override
public void onProgress(IRI iri, DerefState derefState, long l, long l1) {

}
},
isSupportTarGzDiscovery()
);

BlobStoreReadOnly blobStore = new BlobStoreAppendOnly(
getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()),
new KeyValueStoreFactoryImpl(config)
.getKeyValueStore(new ValidatingKeyValueStreamContentAddressedFactory()),
true,
HashType.sha256
);
Expand Down Expand Up @@ -98,12 +151,8 @@ public void doRun() {
}
}

private KeyValueStore getKeyValueStore(ValidatingKeyValueStreamFactory keyValueStreamFactory) {
KeyValueStoreConfig config
= new KeyValueStoreConfig(new File(getDataDir()), new File(getWorkDir()), 2);

return new KeyValueStoreFactoryImpl(config)
.getKeyValueStore(keyValueStreamFactory);
private boolean isCacheEnabled() {
return !getDisableCache();
}

private boolean handleDataset(final Dataset dataset, boolean shouldWriteHeader, Cache cache) throws IOException {
Expand Down Expand Up @@ -148,6 +197,34 @@ public String getDescription() {
return DESCRIPTION;
}

public List<URI> getRemotes() {
return remotes;
}

public Boolean getDisableCache() {
return disableCache;
}

public Boolean getDisableProgress() {
return disableProgress;
}

public int getDepth() {
return depth;
}

public HashType getHashType() {
return hashType;
}

public boolean isSupportTarGzDiscovery() {
return supportTarGzDiscovery;
}

public void setRemotes(List<URI> remotes) {
this.remotes = remotes;
}

public static class ImportLoggerFactoryImpl implements ImportLoggerFactory {
private final String recordType;
private final String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -86,6 +87,42 @@ public void streamSomeInteractions() throws IOException {
assertThat(new String(errorStream.toByteArray(), StandardCharsets.UTF_8), startsWith("processing data stream from [local]..."));
}

@Test
public void streamSomeInteractionsUsingLocalRepository() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();

File tmpDir = folder.newFolder("tmpDir");
tmpDir.mkdirs();

CmdStream cmdStream = new CmdStream();


cmdStream.setWorkDir(tmpDir.getAbsolutePath());
cmdStream.setDataDir(tmpDir.getAbsolutePath());
cmdStream.setWorkDir(tmpDir.getAbsolutePath());
cmdStream.setStdout(new PrintStream(outputStream));
cmdStream.setStderr(new PrintStream(errorStream));
cmdStream.setStdin(IOUtils.toInputStream("{ \"url\": \"hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44\", \"citation\": \"some citation\" }", StandardCharsets.UTF_8));
cmdStream.run();

String stdout = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
assertThat(stdout, Is.is(""));

File repoDir = folder.newFolder("repoDir");
repoDir.mkdirs();

populateCacheWithResource(repoDir, "/b92cd44dcba945c760229a14d3b9becb2dd0c147.zip");

cmdStream.setStdin(IOUtils.toInputStream("{ \"url\": \"hash://sha256/76c00c8b64e422800b85d29db93bcfa9ebee999f52f21e16cbd00ba750e98b44\", \"citation\": \"some citation\" }", StandardCharsets.UTF_8));
cmdStream.setRemotes(Arrays.asList(repoDir.toURI()));
cmdStream.run();

assertHeaderAndMore(outputStream, headerInteractions());

assertThat(new String(errorStream.toByteArray(), StandardCharsets.UTF_8), startsWith("processing data stream from [local]..."));
}


@Test
public void streamSomeProvStatements() throws IOException {
Expand Down

0 comments on commit eab3ba4

Please sign in to comment.