Filesource compression support #5255

Open · wants to merge 7 commits into main
4 changes: 4 additions & 0 deletions NOTICE
@@ -10,3 +10,7 @@ Foundation (http://www.apache.org/).

This product includes software developed by
Joda.org (http://www.joda.org/).

This product includes software developed by

Member:
Is this a requirement from your company? We don't require contributors to add to this.

Member:
Please see if your company requires this. If not, it would be best to remove this change.

In the meantime, I'm also checking with the OpenSearch project to determine if this is an acceptable change or not.

Contributor Author:
Yes, they asked me to make this change. According to our OSS team, it's how the company can retain the copyright for my contributions as an employee.
I am not sure whether the NOTICE file is the appropriate place to capture this; the information I found after some searching suggests it is not.
I think it would be better to get some guidance.

Twilio Inc. (https://www.twilio.com/).
Copyright 2024 Twilio Inc.
2 changes: 2 additions & 0 deletions data-prepper-plugins/common/README.md
@@ -35,6 +35,8 @@ A source plugin to read input data from the specified file path. The file source
Temporarily, `type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok,
change this to `event`.

* `compression` (String): The source file compression format, if any. Valid options are `none`, `gzip` and `snappy`. Default is `none`.
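
For example, a pipeline reading a gzipped input file could be configured as follows. This is a minimal sketch: the pipeline name and file path are illustrative, and `path`/`type` are the existing file-source options documented in this README.

```yaml
sample-pipeline:
  source:
    file:
      path: "/full/path/to/input.log.gz"   # hypothetical input file
      type: "event"
      compression: "gzip"
  sink:
    - stdout:
```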

## `file` (sink)

A sink plugin to write output data to the specified file path.
FileSource.java
@@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
@@ -23,11 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@@ -48,6 +52,7 @@ public class FileSource implements Source<Record<Object>> {
private final FileSourceConfig fileSourceConfig;
private final FileStrategy fileStrategy;
private final EventFactory eventFactory;
private final DecompressionEngine decompressionEngine;

private Thread readThread;

@@ -63,6 +68,7 @@ public FileSource(
this.fileSourceConfig = fileSourceConfig;
this.isStopRequested = false;
this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT;
this.decompressionEngine = fileSourceConfig.getCompression().getDecompressionEngine();

if(fileSourceConfig.getCodec() != null) {
fileStrategy = new CodecFileStrategy(pluginFactory);
@@ -104,7 +110,8 @@ private interface FileStrategy {
private class ClassicFileStrategy implements FileStrategy {
@Override
public void start(Buffer<Record<Object>> buffer) {
- try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) {
+ Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(decompressionEngine.createInputStream(Files.newInputStream(filePath)), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null && !isStopRequested) {
writeLineAsEventOrString(line, buffer);
@@ -166,13 +173,13 @@ private class CodecFileStrategy implements FileStrategy {
final PluginModel codecConfiguration = fileSourceConfig.getCodec();
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);

}

@Override
public void start(final Buffer<Record<Object>> buffer) {
- try {
- codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> {
+ Path filePath = Paths.get(fileSourceConfig.getFilePathToRead());
+ try(InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) {
+ codec.parse(is, eventRecord -> {
try {
buffer.write((Record) eventRecord, writeTimeout);
} catch (TimeoutException e) {
Expand All @@ -186,4 +193,4 @@ public void start(final Buffer<Record<Object>> buffer) {
}
}

}
}
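
For context, `createInputStream` above is the single hook the file source needs from a `DecompressionEngine`: it wraps the raw file stream so the rest of the code reads decompressed bytes. Below is a minimal gzip-backed sketch, assuming the interface declares just that one method; the real interface in `data-prepper-api` is authoritative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

// Sketch only: Data Prepper ships its own engine implementations.
class GzipDecompressionEngineSketch implements DecompressionEngine {
    @Override
    public InputStream createInputStream(final InputStream inputStream) throws IOException {
        // Callers read plain bytes; decompression happens transparently.
        return new GZIPInputStream(inputStream);
    }
}
```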
FileSourceConfig.java
@@ -10,6 +10,7 @@
import com.google.common.base.Preconditions;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;

import java.util.Objects;

@@ -35,6 +36,9 @@ public class FileSourceConfig {
@JsonProperty("codec")
private PluginModel codec;

@JsonProperty("compression")
private CompressionOption compression = CompressionOption.NONE;

public String getFilePathToRead() {
return filePathToRead;
}
@@ -52,6 +56,10 @@ public PluginModel getCodec() {
return codec;
}

public CompressionOption getCompression() {
return compression;
}

void validate() {
Objects.requireNonNull(filePathToRead, "File path is required");
Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]");
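
`CompressionOption` (imported above) is what ties the `compression` setting to a `DecompressionEngine`. A rough sketch of that mapping follows, assuming `DecompressionEngine` is the single-method interface used in `FileSource`; the constant names `NONE`/`GZIP`/`SNAPPY` match the README's valid options, but everything else is illustrative and the real enum in `org.opensearch.dataprepper.plugins.codec` is the source of truth.

```java
import com.fasterxml.jackson.annotation.JsonCreator;

import org.opensearch.dataprepper.model.codec.DecompressionEngine;

// Illustrative shape only; names beyond NONE/GZIP/SNAPPY and
// getDecompressionEngine() are assumptions.
enum CompressionOptionSketch {
    NONE,
    GZIP,
    SNAPPY;

    @JsonCreator
    static CompressionOptionSketch fromOptionValue(final String option) {
        // Maps the YAML value (e.g. "gzip") onto the enum constant.
        return valueOf(option.toUpperCase());
    }

    public DecompressionEngine getDecompressionEngine() {
        switch (this) {
            case GZIP:
                return new GzipDecompressionEngineSketch(); // from the sketch above
            case SNAPPY:
                return in -> in; // placeholder: a real build would return a Snappy engine
            default:
                return in -> in; // NONE: hand the stream back unchanged
        }
    }
}
```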
FileSourceTest.java
@@ -18,13 +18,16 @@
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -85,6 +88,31 @@ private FileSource createObjectUnderTest() {
return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory());
}

/**
* Variant of createObjectUnderTest that uses mocks for the configuration instead of the object mapper, so we can
* pass concrete mocks to the FileSource through the FileSourceConfig.
* @param codec the codec to use in the configuration
* @param engine the {@link DecompressionEngine} to use in the configuration
* @return a FileSource built from the mocked configuration
*/
private FileSource createObjectUnderTest(PluginModel codec, DecompressionEngine engine) {
FileSourceConfig fileSourceConfig = mock(FileSourceConfig.class);

when(fileSourceConfig.getFilePathToRead()).thenReturn(TEST_FILE_PATH_PLAIN);

if (codec != null) {
when(fileSourceConfig.getCodec()).thenReturn(codec);
}

if (engine != null) {
CompressionOption compressionOption = mock(CompressionOption.class);
when(compressionOption.getDecompressionEngine()).thenReturn(engine);
when(fileSourceConfig.getCompression()).thenReturn(compressionOption);
}

return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory());
}

@Nested
class WithRecord {
private static final String TEST_PIPELINE_NAME = "pipeline";
@@ -278,6 +306,9 @@ class WithCodec {
@Mock
private Buffer buffer;

@Mock
private DecompressionEngine decompressionEngine;

@BeforeEach
void setUp() {
Map<String, String> codecConfiguration = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
@@ -290,21 +321,18 @@

@Test
void start_will_parse_codec_with_correct_inputStream() throws IOException {
- createObjectUnderTest().start(buffer);
+ final FileInputStream decompressedStream = new FileInputStream(TEST_FILE_PATH_PLAIN);

Contributor Author:
Rewrote the test: the previous implementation worked by testing equality between the bytes produced by the source and the bytes produced by reading the file directly.
The test now checks that the codec is called with the InputStream returned by the decompression engine.

Member:
Rather than changing the whole test, could you just mock the DecompressionEngine to return exactly the bytes given?

Perhaps an alternative is to keep both tests.

Contributor Author (@joelmarty, Dec 16, 2024):
I cannot keep the test as it is: it relies on the previous implementation's behavior. The test tries to read from the input stream created in FileSource.CodecFileStrategy#start, but because that stream is now created in a try-with-resources (see FileSource.java L181), it is closed by the time the test tries to read it again at FileSourceTest.java L303.
In other words, the test used to work because the input stream was never properly closed in the previous implementation of the WithCodec strategy. I could reintroduce what I believe is a bug to keep the test as-is, but IMHO it makes more sense to validate that the new test is equivalent to the previous implementation.
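
To illustrate the closed-stream behavior described above, here is a minimal standalone sketch (hypothetical file name, not project code):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

class StreamClosedDemo {
    public static void main(String[] args) throws IOException {
        InputStream in = new FileInputStream("plain-data.txt"); // hypothetical test file
        try (in) {
            in.read(); // fine: the stream is still open inside the block
        }
        // The try-with-resources closed the stream on exit, so any further
        // read fails -- which is why the old test only passed while the
        // stream was never closed.
        in.read(); // throws IOException: stream closed
    }
}
```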

+ DecompressionEngine mockEngine = mock(DecompressionEngine.class);
+ when(mockEngine.createInputStream(any(InputStream.class))).thenReturn(decompressedStream);

- final ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
+ PluginModel fakeCodec = mock(PluginModel.class);
+ when(fakeCodec.getPluginName()).thenReturn("fake_codec");
+ when(fakeCodec.getPluginSettings()).thenReturn(Map.of());

- await().atMost(2, TimeUnit.SECONDS)
-         .untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class)));
- verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class));

- final InputStream actualInputStream = inputStreamArgumentCaptor.getValue();
+ createObjectUnderTest(fakeCodec, mockEngine).start(buffer);

- final byte[] actualBytes = actualInputStream.readAllBytes();
- final FileInputStream fileInputStream = new FileInputStream(TEST_FILE_PATH_PLAIN);
- final byte[] expectedBytes = fileInputStream.readAllBytes();

- assertThat(actualBytes, equalTo(expectedBytes));
+ await().atMost(2, TimeUnit.SECONDS)
+         .untilAsserted(() -> verify(inputCodec).parse(eq(decompressedStream), any(Consumer.class)));
}

@Test