From adce15c7a55724c378b8468afcc7a84544384391 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Mon, 9 Dec 2024 11:58:39 +0100 Subject: [PATCH 1/7] Add support for compressed files in FileSource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- .../plugins/source/file/FileSource.java | 19 +++--- .../plugins/source/file/FileSourceConfig.java | 8 +++ .../plugins/source/file/FileSourceTests.java | 59 +++++++++++++------ 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java index a0da7461f1..670c47dc07 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -23,11 +24,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; @@ -48,6 +48,7 @@ public class FileSource implements Source> { private final FileSourceConfig 
fileSourceConfig; private final FileStrategy fileStrategy; private final EventFactory eventFactory; + private final DecompressionEngine decompressionEngine; private Thread readThread; @@ -63,6 +64,7 @@ public FileSource( this.fileSourceConfig = fileSourceConfig; this.isStopRequested = false; this.writeTimeout = FileSourceConfig.DEFAULT_TIMEOUT; + this.decompressionEngine = fileSourceConfig.getCompression().getDecompressionEngine(); if(fileSourceConfig.getCodec() != null) { fileStrategy = new CodecFileStrategy(pluginFactory); @@ -104,7 +106,8 @@ private interface FileStrategy { private class ClassicFileStrategy implements FileStrategy { @Override public void start(Buffer> buffer) { - try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileSourceConfig.getFilePathToRead()), StandardCharsets.UTF_8)) { + Path filePath = Paths.get(fileSourceConfig.getFilePathToRead()); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(decompressionEngine.createInputStream(Files.newInputStream(filePath)), StandardCharsets.UTF_8))) { String line; while ((line = reader.readLine()) != null && !isStopRequested) { writeLineAsEventOrString(line, buffer); @@ -166,13 +169,13 @@ private class CodecFileStrategy implements FileStrategy { final PluginModel codecConfiguration = fileSourceConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); - } @Override public void start(final Buffer> buffer) { - try { - codec.parse(new FileInputStream(fileSourceConfig.getFilePathToRead()), eventRecord -> { + Path filePath = Paths.get(fileSourceConfig.getFilePathToRead()); + try(InputStream is = decompressionEngine.createInputStream(Files.newInputStream(filePath))) { + codec.parse(is, eventRecord -> { try { buffer.write((Record) eventRecord, writeTimeout); } catch (TimeoutException e) { @@ -186,4 +189,4 @@ 
public void start(final Buffer> buffer) { } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java index 255857a4bb..9eb8dd961d 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSourceConfig.java @@ -10,6 +10,7 @@ import com.google.common.base.Preconditions; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; import java.util.Objects; @@ -35,6 +36,9 @@ public class FileSourceConfig { @JsonProperty("codec") private PluginModel codec; + @JsonProperty("compression") + private CompressionOption compression = CompressionOption.NONE; + public String getFilePathToRead() { return filePathToRead; } @@ -52,6 +56,10 @@ public PluginModel getCodec() { return codec; } + public CompressionOption getCompression() { + return compression; + } + void validate() { Objects.requireNonNull(filePathToRead, "File path is required"); Preconditions.checkArgument(recordType.equals(EVENT_TYPE) || recordType.equals(DEFAULT_TYPE), "Invalid type: must be either [event] or [string]"); diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java index aedacdcbb2..fc8a08065d 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java @@ 
-14,17 +14,21 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) public class FileSourceTests { @@ -85,6 +85,31 @@ private FileSource createObjectUnderTest() { return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory()); } + /** + * Variant of createObjectUnderTest that uses mocks for the configuration instead of object mapper, so we can + * pass concrete mocks to the FileSource through the FileSourceConfig. 
+ * @param codec the codec to use in the configuration + * @param engine the {@link DecompressionEngine} to use in the configuration + * @return a {@link FileSource} constructed from the mocked configuration + */ + private FileSource createObjectUnderTest(PluginModel codec, DecompressionEngine engine) { + FileSourceConfig fileSourceConfig = mock(FileSourceConfig.class); + + when(fileSourceConfig.getFilePathToRead()).thenReturn(TEST_FILE_PATH_PLAIN); + + if (codec != null) { + when(fileSourceConfig.getCodec()).thenReturn(codec); + } + + if (engine != null) { + CompressionOption compressionOption = mock(CompressionOption.class); + when(compressionOption.getDecompressionEngine()).thenReturn(engine); + when(fileSourceConfig.getCompression()).thenReturn(compressionOption); + } + + return new FileSource(fileSourceConfig, pluginMetrics, pluginFactory, TestEventFactory.getTestEventFactory()); + } + @Nested class WithRecord { private static final String TEST_PIPELINE_NAME = "pipeline"; @@ -278,6 +303,9 @@ class WithCodec { @Mock private Buffer buffer; + @Mock + private DecompressionEngine decompressionEngine; + @BeforeEach void setUp() { Map codecConfiguration = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); @@ -290,21 +318,18 @@ void setUp() { @Test void start_will_parse_codec_with_correct_inputStream() throws IOException { - createObjectUnderTest().start(buffer); + final FileInputStream decompressedStream = new FileInputStream(TEST_FILE_PATH_PLAIN); + DecompressionEngine mockEngine = mock(DecompressionEngine.class); + when(mockEngine.createInputStream(any(InputStream.class))).thenReturn(decompressedStream); - final ArgumentCaptor inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class); + PluginModel fakeCodec = mock(PluginModel.class); + when(fakeCodec.getPluginName()).thenReturn("fake_codec"); + when(fakeCodec.getPluginSettings()).thenReturn(Map.of()); - await().atMost(2, TimeUnit.SECONDS) - .untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class))); - 
verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class)); - - final InputStream actualInputStream = inputStreamArgumentCaptor.getValue(); + createObjectUnderTest(fakeCodec, mockEngine).start(buffer); - final byte[] actualBytes = actualInputStream.readAllBytes(); - final FileInputStream fileInputStream = new FileInputStream(TEST_FILE_PATH_PLAIN); - final byte[] expectedBytes = fileInputStream.readAllBytes(); - - assertThat(actualBytes, equalTo(expectedBytes)); + await().atMost(2, TimeUnit.SECONDS) + .untilAsserted(() -> verify(inputCodec).parse(eq(decompressedStream), any(Consumer.class))); } @Test From 18d92c601b2377d028ed91716efe395b9009a6e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Mon, 9 Dec 2024 11:58:39 +0100 Subject: [PATCH 2/7] Add doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- data-prepper-plugins/common/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-prepper-plugins/common/README.md b/data-prepper-plugins/common/README.md index 96e5e560a2..cbcd185d2e 100644 --- a/data-prepper-plugins/common/README.md +++ b/data-prepper-plugins/common/README.md @@ -35,6 +35,8 @@ A source plugin to read input data from the specified file path. The file source Temporarily, `type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok, change this to `event`. +* `compression` (String): The source file compression format, if any. Default is `none`. + ## `file` (sink) A sink plugin to write output data to the specified file path. 
From 44c7a5f1da196058d4cd33df1668947386f39fd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Mon, 9 Dec 2024 11:58:39 +0100 Subject: [PATCH 3/7] Review doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- data-prepper-plugins/common/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/common/README.md b/data-prepper-plugins/common/README.md index cbcd185d2e..c774d72e95 100644 --- a/data-prepper-plugins/common/README.md +++ b/data-prepper-plugins/common/README.md @@ -35,7 +35,7 @@ A source plugin to read input data from the specified file path. The file source Temporarily, `type` can either be `event` or `string`. If you would like to use the file source for log analytics use cases like grok, change this to `event`. -* `compression` (String): The source file compression format, if any. Default is `none`. +* `compression` (String): The source file compression format, if any. Valid options are `none`, `gzip` and `snappy`. Default is `none`. ## `file` (sink) From 85e11f21fc7543d8af5f47db54326b45abae86cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Mon, 9 Dec 2024 11:58:39 +0100 Subject: [PATCH 4/7] Add notice MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- NOTICE | 3 +++ 1 file changed, 3 insertions(+) diff --git a/NOTICE b/NOTICE index 6c7dc983f8..55b46e01d4 100644 --- a/NOTICE +++ b/NOTICE @@ -10,3 +10,6 @@ Foundation (http://www.apache.org/). This product includes software developed by Joda.org (http://www.joda.org/). + +This product includes software developed by +Twilio Inc. (https://www.twilio.com/). 
From a55115190b17eda0050743746a3a11bea8bd044e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Mon, 9 Dec 2024 11:58:39 +0100 Subject: [PATCH 5/7] Update copyright notice MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- NOTICE | 1 + 1 file changed, 1 insertion(+) diff --git a/NOTICE b/NOTICE index 55b46e01d4..4a671c614a 100644 --- a/NOTICE +++ b/NOTICE @@ -13,3 +13,4 @@ Joda.org (http://www.joda.org/). This product includes software developed by Twilio Inc. (https://www.twilio.com/). +Copyright 2024 Twilio Inc. From 710dd13fa2560990f47511a9020c99cfa436c4fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Wed, 11 Dec 2024 18:30:39 +0100 Subject: [PATCH 6/7] Do not use .* import for checkstyle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- .../dataprepper/plugins/source/file/FileSource.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java index 670c47dc07..9698144097 100644 --- a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/source/file/FileSource.java @@ -24,7 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; From 4684124bbb14fd6d01e4933bebc53f58d8d58d67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= Date: Thu, 12 Dec 2024 10:20:59 +0100 Subject: 
[PATCH 7/7] Remove wildcard imports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Joël Marty --- .../dataprepper/plugins/source/file/FileSourceTests.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java index fc8a08065d..e1111207d4 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/source/file/FileSourceTests.java @@ -14,7 +14,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -52,7 +51,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class FileSourceTests {