Skip to content

Commit

Permalink
remove mem control parameter (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
l2280212 authored Feb 1, 2024
1 parent f7d0a6c commit 930cef6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public RestorableTsFileIOWriter(File file) throws IOException {
public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException {
this(file, true);
this.maxMetadataSize = maxMetadataSize;
this.enableMemoryControl = true;
this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
this.checkMetadataSizeAndMayFlush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -111,7 +110,6 @@ public class TsFileIOWriter implements AutoCloseable {
protected volatile boolean hasChunkMetadataInDisk = false;
// record the total num of path in order to make bloom filter
protected int pathCount = 0;
protected boolean enableMemoryControl = false;
private Path lastSerializePath = null;
protected LinkedList<Long> endPosInCMTForDevice = new LinkedList<>();
private volatile int chunkMetadataCount = 0;
Expand Down Expand Up @@ -151,10 +149,8 @@ public TsFileIOWriter(TsFileOutput output, boolean test) {
}

/** for write with memory control */
public TsFileIOWriter(File file, boolean enableMemoryControl, long maxMetadataSize)
throws IOException {
public TsFileIOWriter(File file, long maxMetadataSize) throws IOException {
this(file);
this.enableMemoryControl = enableMemoryControl;
this.maxMetadataSize = maxMetadataSize;
chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX);
}
Expand Down Expand Up @@ -306,9 +302,7 @@ public void writeChunk(Chunk chunk) throws IOException {

/** end chunk and write some log. */
public void endCurrentChunk() {
if (enableMemoryControl) {
this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes();
}
this.currentChunkMetadataSize += currentChunkMetadata.getRetainedSizeInBytes();
chunkMetadataCount++;
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
Expand Down Expand Up @@ -508,40 +502,6 @@ public void setFile(File file) {
this.file = file;
}

/** Remove such ChunkMetadata that its startTime is not in chunkStartTimes */
public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
Map<Path, Integer> startTimeIdxes = new HashMap<>();
chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));

Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
while (chunkGroupMetaDataIterator.hasNext()) {
ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
String deviceId = chunkGroupMetaData.getDevice();
int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
Iterator<ChunkMetadata> chunkMetaDataIterator =
chunkGroupMetaData.getChunkMetadataList().iterator();
while (chunkMetaDataIterator.hasNext()) {
IChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
Path path = new Path(deviceId, chunkMetaData.getMeasurementUid(), true);
int startTimeIdx = startTimeIdxes.get(path);

List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
boolean chunkValid =
startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
chunkMetaDataIterator.remove();
chunkNum--;
} else {
startTimeIdxes.put(path, startTimeIdx + 1);
}
}
if (chunkNum == 0) {
chunkGroupMetaDataIterator.remove();
}
}
}

public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
Expand Down Expand Up @@ -630,7 +590,7 @@ public void setMaxPlanIndex(long maxPlanIndex) {
*/
public int checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has been written
if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
if (currentChunkMetadataSize > maxMetadataSize) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
Expand Down Expand Up @@ -700,7 +660,7 @@ private int writeChunkMetadataToTempFile(
// for each device, we only serialize it once, in order to save io
writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
}
if (isNewPath && iChunkMetadataList.size() > 0) {
if (isNewPath && !iChunkMetadataList.isEmpty()) {
// serialize the public info of this measurement
writtenSize +=
ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
Expand All @@ -719,10 +679,6 @@ private int writeChunkMetadataToTempFile(
return writtenSize;
}

public String getCurrentChunkGroupDeviceId() {
return currentChunkGroupDeviceId;
}

public List<ChunkGroupMetadata> getChunkGroupMetadataList() {
return chunkGroupMetadataList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void tearDown() throws IOException {
/** The following tests is for ChunkMetadata serialization and deserialization. */
@Test
public void testSerializeAndDeserializeChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
Expand Down Expand Up @@ -147,7 +147,7 @@ public void testSerializeAndDeserializeChunkMetadata() throws IOException {

@Test
public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testSerializeAndDeserializeAlignedChunkMetadata() throws IOException

@Test
public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024 * 1024 * 10)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
List<IChunkMetadata> originChunkMetadataList = new ArrayList<>();
List<String> seriesIds = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
@Test
public void testWriteCompleteFileWithNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testWriteCompleteFileWithNormalChunk() throws IOException {
@Test
public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -398,7 +398,7 @@ public void testWriteCompleteFileWithMultipleNormalChunk() throws IOException {
@Test
public void testWriteCompleteFileWithMetadataRemainsInMemoryWhenEndFile() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -490,7 +490,7 @@ public void testWriteCompleteFileWithEnormousNormalChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -581,7 +581,7 @@ public void testWriteCompleteFileWithEnormousSeriesNum() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 1;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 2; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -672,7 +672,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originTimes = new HashMap<>();
long originTestChunkSize = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1024; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -762,7 +762,7 @@ public void testWriteCompleteFileWithEnormousDeviceNum() throws IOException {
@Test
public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -798,7 +798,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithOneChunk() throws IOExcept
public void testWriteCompleteFileWithAlignedSeriesWithMultiChunks() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
int chunkNum = 512, seriesNum = 6;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 1; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -840,7 +840,7 @@ public void testWriteCompleteFileWithAlignedSeriesWithManyComponents() throws IO
long originTestPointNum = TEST_CHUNK_SIZE;
TEST_CHUNK_SIZE = 10;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 10; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -881,7 +881,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
TEST_CHUNK_SIZE = 10;
int deviceNum = 1024;
try {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < deviceNum; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < chunkNum; ++k) {
Expand Down Expand Up @@ -918,7 +918,7 @@ public void testWriteCompleteFileWithLotsAlignedSeries() throws IOException {
public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
TEST_CHUNK_SIZE = 10;
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -976,7 +976,7 @@ public void testWritingAlignedSeriesByColumnWithMultiComponents() throws IOExcep
@Test
public void testWritingCompleteMixedFiles() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originData = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; ++i) {
String deviceId = sortedDeviceId.get(i);
for (int k = 0; k < 10; ++k) {
Expand Down Expand Up @@ -1075,7 +1075,7 @@ public void testWritingCompleteMixedFiles() throws IOException {
@Test
public void testWritingAlignedSeriesByColumn() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down Expand Up @@ -1129,7 +1129,7 @@ public void testWritingAlignedSeriesByColumn() throws IOException {
@Test
public void testWritingAlignedSeriesByColumnWithMultiChunks() throws IOException {
Map<String, Map<String, List<List<Pair<Long, TsPrimitiveType>>>>> originValue = new HashMap<>();
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, true, 1024)) {
try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024)) {
for (int i = 0; i < 5; i++) {
String deviceId = sortedDeviceId.get(i);
writer.startChunkGroup(deviceId);
Expand Down

0 comments on commit 930cef6

Please sign in to comment.