diff --git a/tsfile/src/main/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriter.java index c66c6c6e8..60e818dec 100644 --- a/tsfile/src/main/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/RestorableTsFileIOWriter.java @@ -85,7 +85,6 @@ public RestorableTsFileIOWriter(File file) throws IOException { public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException { this(file, true); this.maxMetadataSize = maxMetadataSize; - this.enableMemoryControl = true; this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); this.checkMetadataSizeAndMayFlush(); } diff --git a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 2427e7e8d..b8201d7a8 100644 --- a/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -57,7 +57,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -111,7 +110,6 @@ public class TsFileIOWriter implements AutoCloseable { protected volatile boolean hasChunkMetadataInDisk = false; // record the total num of path in order to make bloom filter protected int pathCount = 0; - protected boolean enableMemoryControl = false; private Path lastSerializePath = null; protected LinkedList endPosInCMTForDevice = new LinkedList<>(); private volatile int chunkMetadataCount = 0; @@ -151,10 +149,8 @@ public TsFileIOWriter(TsFileOutput output, boolean test) { } /** for write with memory control */ - public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize) - throws IOException { + public TsFileIOWriter(File file, long maxMetadataSize) throws IOException { this(file); - this.enableMemoryControl = enableMemoryControl; this.maxMetadataSize = maxMetadataSize; chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); } @@ -306,9 +302,7 @@ public void writeChunk(Chunk chunk) throws IOException { /** end chunk and write some log. */ public void endCurrentChunk() { - if (enableMemoryControl) { - this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes(); - } + this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes(); chunkMetadataCount++; chunkMetadataList.add(currentChunkMetadata); currentChunkMetadata = null; @@ -508,40 +502,6 @@ public void setFile(File file) { this.file = file; } - /** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */ - public void filterChunks(Map> chunkStartTimes) { - Map startTimeIdxes = new HashMap<>(); - chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0)); - - Iterator chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator(); - while (chunkGroupMetaDataIterator.hasNext()) { - ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next(); - String deviceId = chunkGroupMetaData.getDevice(); - int chunkNum = chunkGroupMetaData.getChunkMetadataList().size(); - Iterator chunkMetaDataIterator = - chunkGroupMetaData.getChunkMetadataList().iterator(); - while (chunkMetaDataIterator.hasNext()) { - IChunkMetadata chunkMetaData = chunkMetaDataIterator.next(); - Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(), true); - int startTimeIdx = startTimeIdxes.get(path); - - List pathChunkStartTimes = chunkStartTimes.get(path); - boolean chunkValid = - startTimeIdx < pathChunkStartTimes.size() - && pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime(); - if (!chunkValid) { - chunkMetaDataIterator.remove(); - chunkNum--; - } else { - startTimeIdxes.put(path, startTimeIdx + 1); - } - } - if (chunkNum == 0) { - chunkGroupMetaDataIterator.remove(); - } - } - } - public void writePlanIndices() throws IOException { ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream()); ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream()); @@ -630,7 +590,7 @@ public void setMaxPlanIndex(long maxPlanIndex) { */ public int checkMetadataSizeAndMayFlush() throws IOException { // This function should be called after all data of an aligned device has been written - if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) { + if (currentChunkMetadataSize > maxMetadataSize) { try { if (logger.isDebugEnabled()) { logger.debug( @@ -700,7 +660,7 @@ private int writeChunkMetadataToTempFile( // for each device, we only serialize it once, in order to save io writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream()); } - if (isNewPath && iChunkMetadataList.size() > 0) { + if (isNewPath && !iChunkMetadataList.isEmpty()) { // serialize the public info of this measurement writtenSize += ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream()); @@ -719,10 +679,6 @@ private int writeChunkMetadataToTempFile( return writtenSize; } - public String getCurrentChunkGroupDeviceId() { - return currentChunkGroupDeviceId; - } - public List getChunkGroupMetadataList() { return chunkGroupMetadataList; } diff --git a/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java index 52b2add20..32b824bad 100644 --- a/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java +++ b/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java @@ -95,7 +95,7 @@ public void tearDown() throws IOException { /** The following tests is for ChunkMetadata serialization and deserialization. */ @Test public void testSerializeAndDeserializeChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); @@ -147,7 +147,7 @@ public void testSerializeAndDeserializeChunkMetadata() throws IOException { @Test public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); @@ -185,7 +185,7 @@ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException @Test public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) { List originChunkMetadataList = new ArrayList<>(); List seriesIds = new ArrayList<>(); for (int i = 0; i < 10; ++i) { @@ -258,7 +258,7 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException { @Test public void testWriteCompleteFileWithNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -311,7 +311,7 @@ public void testWriteCompleteFileWithNormalChunk() throws IOException { @Test public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -398,7 +398,7 @@ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException { @Test public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -490,7 +490,7 @@ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException { Map>>>> originData = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 2; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -581,7 +581,7 @@ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException { Map>>>> originTimes = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 1; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 2; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -672,7 +672,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException { Map>>>> originTimes = new HashMap<>(); long originTestChunkSize = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 1024; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -762,7 +762,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException { @Test public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -798,7 +798,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOExcept public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException { Map>>>> originData = new HashMap<>(); int chunkNum = 512, seriesNum = 6; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 1; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -840,7 +840,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IO long originTestPointNum = TEST_CHUNK_SIZE; TEST_CHUNK_SIZE = 10; try { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 10; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -881,7 +881,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException { TEST_CHUNK_SIZE = 10; int deviceNum = 1024; try { - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < deviceNum; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < chunkNum; ++k) { @@ -918,7 +918,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException { public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException { Map>>>> originValue = new HashMap<>(); TEST_CHUNK_SIZE = 10; - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -976,7 +976,7 @@ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOExcep @Test public void testWritingCompleteMixedFiles() throws IOException { Map>>>> originData = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; ++i) { String deviceId = sortedDeviceId.get(i); for (int k = 0; k < 10; ++k) { @@ -1075,7 +1075,7 @@ public void testWritingCompleteMixedFiles() throws IOException { @Test public void testWritingAlignedSeriesByColumn() throws IOException { Map>>>> originValue = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId); @@ -1129,7 +1129,7 @@ public void testWritingAlignedSeriesByColumn() throws IOException { @Test public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException { Map>>>> originValue = new HashMap<>(); - try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) { + try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) { for (int i = 0; i < 5; i++) { String deviceId = sortedDeviceId.get(i); writer.startChunkGroup(deviceId);