Skip to content

Commit

Permalink
Refactoring and fix for e2e acks (#1)
Browse files Browse the repository at this point in the history
* Refactoring and fix for e2e acks

* fix: remove WIP code from StreamRecordValidator

Signed-off-by: Amanda Xiang <[email protected]>
  • Loading branch information
melzareix authored and Amanda Xiang committed Jan 2, 2025
1 parent 62c278a commit e525d90
Show file tree
Hide file tree
Showing 20 changed files with 725 additions and 538 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
/*
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.dataprepper.plugins.source.neptune.client;

Expand Down Expand Up @@ -43,7 +49,7 @@ public static StreamType fromString(String name) {
}
}

private NeptuneDataClientWrapper(final NeptuneSourceConfig sourceConfig, final long batchSize) {
protected NeptuneDataClientWrapper(final NeptuneSourceConfig sourceConfig, final long batchSize) {
this.client = NeptuneDataClientFactory.provideNeptuneDataClient(sourceConfig);
this.streamType = StreamType.fromString(sourceConfig.getStreamType());
this.batchSize = batchSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.neptune.client;

import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig;
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord;
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.StreamPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.neptunedata.model.StreamRecordsNotFoundException;

import java.time.Duration;
import java.util.List;

public class NeptuneStreamClient {
private static final Logger LOG = LoggerFactory.getLogger(NeptuneStreamClient.class);
private static final long MAX_BACKOFF_TIME = 60;
private final NeptuneDataClientWrapper dataClient;
private final NeptuneStreamEventListener listener;

@Getter
private StreamPosition streamPositionInfo;
private long retryCount;

public NeptuneStreamClient(final NeptuneSourceConfig config, final int batchSize, final NeptuneStreamEventListener listener) {
this.dataClient = new NeptuneDataClientWrapper(config, batchSize);
this.listener = listener;
this.streamPositionInfo = StreamPosition.empty();
this.retryCount = 0;
}


public void setStreamPosition(final long commitNum, final long opNum) {
streamPositionInfo = new StreamPosition(commitNum, opNum);
}

public void start() throws InterruptedException {
while (!Thread.currentThread().isInterrupted() && !listener.shouldStopNeptuneStream(streamPositionInfo)) {
try {
final List<NeptuneStreamRecord> streamRecords =
this.dataClient.getStreamRecords(streamPositionInfo.getCommitNum(), streamPositionInfo.getOpNum());
retryCount = 0;
if (!streamRecords.isEmpty()) {
final NeptuneStreamRecord<?> lastRecord = streamRecords.get(streamRecords.size() - 1);
setStreamPosition(lastRecord.getCommitNum(), lastRecord.getOpNum());
}
listener.onNeptuneStreamEvents(streamRecords, streamPositionInfo);
} catch (final StreamRecordsNotFoundException exception) {
final long nextBackoff = getNextBackoff();
LOG.info("Stream is up-to-date, Sleeping for {} seconds before retrying again.", nextBackoff);
Thread.sleep(Duration.ofSeconds(nextBackoff).toMillis());
} catch (final Exception exception) {
if (!listener.onNeptuneStreamException(exception, streamPositionInfo)) {
break;
}
}
}
}

private long getNextBackoff() {
final long nextBackoff = (long) Math.pow(2.0f, retryCount++);
return Math.min(MAX_BACKOFF_TIME, nextBackoff);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.neptune.client;

import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord;
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.StreamPosition;

import java.util.List;

public interface NeptuneStreamEventListener {
/**
* @param records the records returned from the Stream.
* @param streamPosition current commitNum and OpNum in the stream.
*/
void onNeptuneStreamEvents(final List<NeptuneStreamRecord> records, final StreamPosition streamPosition);

/**
*
* @param exception
* @param streamPosition current commitNum and OpNum in the stream.
* @return boolean if the execution should continue after that exception is encountered.
*/
boolean onNeptuneStreamException(final Exception exception, final StreamPosition streamPosition);

/**
*
* @param streamPosition
* @return boolean if stream processing should be stopped
*/
boolean shouldStopNeptuneStream(final StreamPosition streamPosition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ public class NeptuneSourceConfig {
@JsonProperty("s3_region")
private String s3Region;

@JsonProperty("enable_non_string_indexing")
private boolean enableNonStringIndexing = false;

@JsonProperty("acknowledgments")
private boolean acknowledgments = false;
@JsonProperty("partition_acknowledgment_timeout")
@JsonDeserialize(using = DurationDeserializer.class)
private Duration partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT;

@JsonProperty("aws_config")
@JsonProperty("aws")
@NotNull
@Valid
private AwsConfig awsConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.neptune.converter;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.opensearch.dataprepper.plugins.source.neptune.stream.StreamUtils;
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord;
import software.amazon.awssdk.services.neptunedata.model.PropertygraphData;
import software.amazon.awssdk.services.neptunedata.model.SparqlData;

import java.io.IOException;

/**
* Represents a single record stored in S3.
*/
@Builder
@Getter
public class NeptuneS3Record {

private static final String DOCUMENT_TYPE_VERTEX = "vertex";
private static final String DOCUMENT_TYPE_EDGE = "edge";
private static final String DOCUMENT_TYPE_RDF = "rdf-resource";
private static final String VERTEX_ID_PREFIX = "v://";
private static final String EDGE_ID_PREFIX = "e://";
private static final String PG_ENTITY_ID_KEY = "label";

// Reference to Neptune entity corresponding to document. For Gremlin, it will be Vertex Id for Vertex document &
// Edge ID for Edge Document. For Sparql, it will be RDF subject URI
@JsonProperty("entity_id")
private String entityId;

// Store the Neptune entity type. Vertex/Edge label for gremlin. rdf:type for Sparql
@JsonProperty("entity_type")
private String entityType;

// Classify Open Search document. It could be one of vertex / edge / rdf-resource
@JsonProperty("document_type")
private String documentType;

// Nested Field for storing predicate corresponding to Graph vertex / Edge
@JsonProperty("predicate")
private ImmutablePair<String, NeptuneS3RecordPredicate> predicate;

private static NeptuneS3Record convertPropertyGraphStreamRecord(final NeptuneStreamRecord record) {
if (!(record.getData() instanceof PropertygraphData)) {
throw new IllegalArgumentException("Data must be a PropertygraphData");
}

String entityType = null;
ImmutablePair<String, NeptuneS3RecordPredicate> predicate = null;
final PropertygraphData propertygraphData = ((PropertygraphData) record.getData());
final String key = propertygraphData.key();
if (key.equalsIgnoreCase(PG_ENTITY_ID_KEY)) {
entityType = propertygraphData.value().asMap().get("value").asString();
} else {
predicate = ImmutablePair.of(key, NeptuneS3RecordPredicate.fromPropertGraphData(propertygraphData));
}

return NeptuneS3Record
.builder()
.entityId(getEntityIdForPropertyGraph(propertygraphData.type(), propertygraphData.id()))
.documentType(getDocumentTypeForPropertyGraph(propertygraphData.type()))
.entityType(entityType)
.predicate(predicate)
.build();
}

private static NeptuneS3Record convertSparqlStreamRecord(final NeptuneStreamRecord record) throws IOException {
if (!(record.getData() instanceof SparqlData)) {
throw new IllegalArgumentException("Data must be a SparqlData");
}
final Statement stmt = StreamUtils.parseSparqlStatement(((SparqlData) record.getData()).stmt());
final boolean isTypeStmt = stmt.getPredicate().isIRI() && stmt.getPredicate().equals(RDF.TYPE);
ImmutablePair<String, NeptuneS3RecordPredicate> predicate = null;
String entityType = null;

if (isTypeStmt) {
entityType = stmt.getObject().stringValue();
} else {
final String predicateName = stmt.getPredicate().stringValue();
final NeptuneS3RecordPredicate predicateValue = NeptuneS3RecordPredicate
.builder()
.value(stmt.getObject().stringValue())
.graph(stmt.getContext().stringValue())
.language(stmt.getObject() instanceof Literal ? ((Literal) stmt.getObject()).getLanguage().orElse(null) : null)
.build();
predicate = ImmutablePair.of(predicateName, predicateValue);
}
return NeptuneS3Record
.builder()
.entityId(stmt.getSubject().stringValue())
.documentType(DOCUMENT_TYPE_RDF)
.entityType(entityType)
.predicate(predicate)
.build();

}

public static NeptuneS3Record fromNeptuneStreamRecord(final NeptuneStreamRecord neptuneStreamRecord) throws IOException {
if (neptuneStreamRecord.getData() instanceof PropertygraphData) {
return convertPropertyGraphStreamRecord(neptuneStreamRecord);
}
return convertSparqlStreamRecord(neptuneStreamRecord);
}

private static String getEntityIdForPropertyGraph(final String type, final String entityId) {
return String.format("%s%s", type.startsWith("v") ? VERTEX_ID_PREFIX : EDGE_ID_PREFIX, entityId);
}

private static String getDocumentTypeForPropertyGraph(final String type) {
if (type.equalsIgnoreCase("vl") || type.equalsIgnoreCase("vp")) {
return DOCUMENT_TYPE_VERTEX;
}
return DOCUMENT_TYPE_EDGE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

@Builder
@Getter
public class OpenSearchDocumentPredicate {
public class NeptuneS3RecordPredicate {

@JsonProperty("value")
private String value;
Expand All @@ -24,7 +24,7 @@ public class OpenSearchDocumentPredicate {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OpenSearchDocumentPredicate that = (OpenSearchDocumentPredicate) o;
NeptuneS3RecordPredicate that = (NeptuneS3RecordPredicate) o;
return Objects.equals(value, that.value) && Objects.equals(graph, that.graph) &&
Objects.equals(language, that.language);
}
Expand All @@ -34,9 +34,9 @@ public int hashCode() {
return Objects.hash(value, graph, language);
}

public static OpenSearchDocumentPredicate fromPropertGraphData(final PropertygraphData propertygraphData) {
public static NeptuneS3RecordPredicate fromPropertGraphData(final PropertygraphData propertygraphData) {
final String value = propertygraphData.value().asMap().get("value").asString();
return OpenSearchDocumentPredicate
return NeptuneS3RecordPredicate
.builder()
.value(value)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.neptune.converter;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.XSD;
import org.opensearch.dataprepper.plugins.source.neptune.configuration.NeptuneSourceConfig;
import org.opensearch.dataprepper.plugins.source.neptune.stream.StreamUtils;
import org.opensearch.dataprepper.plugins.source.neptune.stream.model.NeptuneStreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.neptunedata.model.PropertygraphData;
import software.amazon.awssdk.services.neptunedata.model.SparqlData;

import java.io.IOException;


/**
* Validates if the record from Neptune Streams is a valid record.
* (1) If {@link NeptuneSourceConfig#isEnableNonStringIndexing()} is true, then all datatypes are valid and mapped to
* OS datatypes as defined in https://docs.aws.amazon.com/neptune/latest/userguide/full-text-search-non-string-indexing-mapping.html.
* (2) Otherwise, only String datatypes are supported and any non-string record is dropped.
*/
public class NeptuneStreamRecordValidator {
private static final Logger LOG = LoggerFactory.getLogger(NeptuneStreamRecordValidator.class);
private final boolean allowNonStringDatatypes;

public NeptuneStreamRecordValidator(final boolean allowNonStringDatatypes) {
this.allowNonStringDatatypes = allowNonStringDatatypes;
}

public boolean isValid(final NeptuneStreamRecord record) {
if (record.getData() instanceof SparqlData) {
return isValidSparqlRecord(record);
}
return isValidPropertyGraphRecord(record);
}

private boolean isValidPropertyGraphRecord(final NeptuneStreamRecord record) {
if (allowNonStringDatatypes) {
return true;
}
final PropertygraphData data = (PropertygraphData) record.getData();
final String datatype = data.value().asMap().get("dataType").asString();
return datatype.equalsIgnoreCase("String");
}

private boolean isValidSparqlRecord(final NeptuneStreamRecord record) {
if (allowNonStringDatatypes) {
return true;
}
final SparqlData data = (SparqlData) record.getData();
try {
final Statement statement = StreamUtils.parseSparqlStatement(data.stmt());
if (!statement.getObject().isLiteral()) {
return false;
}
return isSparqlStringDatatype(((Literal) statement.getObject()).getDatatype());
} catch (IOException e) {
LOG.error("Failed to parse Sparql statement, Skipping record: ", e);
return false;
}
}

private static boolean isSparqlStringDatatype(IRI datatype) {
return XSD.STRING.equals(datatype) || RDF.LANGSTRING.equals(datatype);
}
}
Loading

0 comments on commit e525d90

Please sign in to comment.