From 69fb039ba3a3ac9d3f625786b23993b7255a44c0 Mon Sep 17 00:00:00 2001 From: Jackie Tien Date: Wed, 13 Nov 2024 09:28:12 +0800 Subject: [PATCH] Add LongConsumer ioSizeRecorder in TsFileSequenceReader for IoTDB scan --- .../tsfile/file/header/ChunkHeader.java | 21 ++ .../tsfile/read/TsFileSequenceReader.java | 269 +++++++++++++++--- .../tsfile/read/UnClosedTsFileReader.java | 12 +- .../read/TimeSeriesMetadataReadTest.java | 4 +- .../tsfile/read/UnClosedTsFileReaderTest.java | 2 +- 5 files changed, 256 insertions(+), 52 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java index 6cb3bee56..c07071fb7 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java @@ -36,6 +36,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.function.LongConsumer; public class ChunkHeader { @@ -189,7 +190,23 @@ public static ChunkHeader deserializeFrom(InputStream inputStream, byte chunkTyp * @throws IOException IOException */ public static ChunkHeader deserializeFrom(TsFileInput input, long offset) throws IOException { + return deserializeFrom(input, offset, null); + } + /** + * deserialize from TsFileInput, the marker has not been read. + * + * @param input TsFileInput + * @param offset offset + * @param ioSizeRecorder can be null + * @return CHUNK_HEADER object + * @throws IOException IOException + */ + public static ChunkHeader deserializeFrom( + TsFileInput input, long offset, LongConsumer ioSizeRecorder) throws IOException { + + // only 6 bytes, no need to call ioSizeRecorder.accept alone, combine into the remaining raed + // operation ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES + 1); input.read(buffer, offset); buffer.flip(); @@ -208,6 +225,10 @@ public static ChunkHeader deserializeFrom(TsFileInput input, long offset) throws + TSEncoding.getSerializedSize(); buffer = ByteBuffer.allocate(remainingBytes); + if (ioSizeRecorder != null) { + ioSizeRecorder.accept((long) alreadyReadLength + remainingBytes); + } + input.read(buffer, offset + alreadyReadLength); buffer.flip(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 1374a6d48..af8e0f73d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -103,6 +103,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongConsumer; import java.util.stream.Collectors; public class TsFileSequenceReader implements AutoCloseable { @@ -144,7 +145,21 @@ public class TsFileSequenceReader implements AutoCloseable { * @throws IOException If some I/O error occurs */ public TsFileSequenceReader(String file) throws IOException { - this(file, true); + this(file, null); + } + + /** + * Create a file reader of the given file. The reader will read the tail of the file to get the + * file metadata size.Then the reader will skip the first + * TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.NUMBER_VERSION.getBytes().length + * bytes of the file for preparing reading real data. + * + * @param file the data file + * @param ioSizeRecorder can be null + * @throws IOException If some I/O error occurs + */ + public TsFileSequenceReader(String file, LongConsumer ioSizeRecorder) throws IOException { + this(file, true, ioSizeRecorder); } /** @@ -154,6 +169,18 @@ public TsFileSequenceReader(String file) throws IOException { * @param loadMetadataSize -whether load meta data size */ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOException { + this(file, loadMetadataSize, null); + } + + /** + * construct function for TsFileSequenceReader. + * + * @param file -given file name + * @param loadMetadataSize -whether load meta data size + * @param ioSizeRecorder can be null + */ + public TsFileSequenceReader(String file, boolean loadMetadataSize, LongConsumer ioSizeRecorder) + throws IOException { if (resourceLogger.isDebugEnabled()) { resourceLogger.debug("{} reader is opened. {}", file, getClass().getName()); } @@ -161,9 +188,9 @@ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOExce tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file); try { - loadFileVersion(); + loadFileVersion(ioSizeRecorder); if (loadMetadataSize) { - loadMetadataSize(); + loadMetadataSize(ioSizeRecorder); } } catch (Throwable e) { tsFileInput.close(); @@ -224,10 +251,14 @@ public TsFileSequenceReader(TsFileInput input, long fileMetadataPos, int fileMet this.fileMetadataSize = fileMetadataSize; } - private void loadFileVersion() throws IOException { + // ioSizeRecorder can be null + private void loadFileVersion(LongConsumer ioSizeRecorder) throws IOException { try { tsFileInput.position(TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length); - final ByteBuffer buffer = ByteBuffer.allocate(1); + final ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES); + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(Byte.BYTES); + } tsFileInput.read(buffer); buffer.flip(); fileVersion = buffer.get(); @@ -255,8 +286,19 @@ private void checkFileVersion() throws FileVersionTooOldException { } public void loadMetadataSize() throws IOException { - ByteBuffer metadataSize = ByteBuffer.allocate(Integer.BYTES); + loadMetadataSize(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public void loadMetadataSize(LongConsumer ioSizeRecorder) throws IOException { + int readSize = Integer.BYTES; + ByteBuffer metadataSize = ByteBuffer.allocate(readSize); if (readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(readSize); + } tsFileInput.read( metadataSize, tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES); @@ -329,7 +371,8 @@ public String readHeadMagic() throws IOException { /** this function reads version number and checks compatibility of TsFile. */ public byte readVersionNumber() throws IOException { ByteBuffer versionNumberByte = ByteBuffer.allocate(Byte.BYTES); - tsFileInput.read(versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes().length); + tsFileInput.read( + versionNumberByte, TSFileConfig.MAGIC_STRING.getBytes(TSFileConfig.STRING_CHARSET).length); versionNumberByte.flip(); return versionNumberByte.get(); } @@ -340,13 +383,20 @@ public byte readVersionNumber() throws IOException { * @throws IOException io error */ public TsFileMetadata readFileMetadata() throws IOException { + return readFileMetadata(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public TsFileMetadata readFileMetadata(LongConsumer ioSizeRecorder) throws IOException { try { if (tsFileMetaData == null) { synchronized (this) { if (tsFileMetaData == null) { tsFileMetaData = deserializeConfig.tsFileMetadataBufferDeserializer.deserialize( - readData(fileMetadataPos, fileMetadataSize), deserializeConfig); + readData(fileMetadataPos, fileMetadataSize, ioSizeRecorder), deserializeConfig); } } } @@ -365,7 +415,14 @@ public TsFileMetadata readFileMetadata() throws IOException { * @throws IOException io error */ public BloomFilter readBloomFilter() throws IOException { - readFileMetadata(); + return readBloomFilter(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public BloomFilter readBloomFilter(LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); return tsFileMetaData.getBloomFilter(); } @@ -378,8 +435,15 @@ public BloomFilter readBloomFilter() throws IOException { * @throws IOException if an I/O error occurs while reading the file metadata */ public EncryptParameter getEncryptParam() throws IOException { + return getEncryptParam(null); + } + + /** + * @param ioSizeRecorder can be null + */ + public EncryptParameter getEncryptParam(LongConsumer ioSizeRecorder) throws IOException { if (fileMetadataSize != 0) { - readFileMetadata(); + readFileMetadata(ioSizeRecorder); return tsFileMetaData.getEncryptParam(); } return EncryptUtils.encryptParam; @@ -438,18 +502,28 @@ private Map readDeviceMetadataFromDisk(IDeviceID dev public TimeseriesMetadata readTimeseriesMetadata( IDeviceID device, String measurement, boolean ignoreNotExists) throws IOException { - readFileMetadata(); + return readTimeseriesMetadata(device, measurement, ignoreNotExists, null); + } + + public TimeseriesMetadata readTimeseriesMetadata( + IDeviceID device, + String measurement, + boolean ignoreNotExistDevice, + LongConsumer ioSizeConsumer) + throws IOException { + readFileMetadata(ioSizeConsumer); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); Pair metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true); + getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeConsumer); if (metadataIndexPair == null) { - if (ignoreNotExists) { + if (ignoreNotExistDevice) { return null; } throw new IOException("Device {" + device + "} is not in tsFileMetaData"); } - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { try { @@ -461,14 +535,16 @@ public TimeseriesMetadata readTimeseriesMetadata( throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeConsumer); } if (metadataIndexPair == null) { return null; } List timeseriesMetadataList = new ArrayList<>(); if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < Integer.MAX_VALUE) { - buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeConsumer); while (buffer.hasRemaining()) { try { timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); @@ -479,6 +555,9 @@ public TimeseriesMetadata readTimeseriesMetadata( } } } else { + if (ioSizeConsumer != null) { + ioSizeConsumer.accept(metadataIndexPair.right - metadataIndexPair.left.getOffset()); + } // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList tsFileInput.position(metadataIndexPair.left.getOffset()); @@ -499,7 +578,7 @@ public TimeseriesMetadata readTimeseriesMetadata( } // This method is only used for TsFile - public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExists) + public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotExistDevice) throws IOException { readFileMetadata(); MetadataIndexNode deviceMetadataIndexNode = @@ -507,7 +586,7 @@ public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotE Pair metadataIndexPair = getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, path.getIDeviceID(), true); if (metadataIndexPair == null) { - if (ignoreNotExists) { + if (ignoreNotExistDevice) { return null; } throw new IOException("Device {" + path.getDeviceString() + "} is not in tsFileMetaData"); @@ -526,7 +605,8 @@ public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotE } firstTimeseriesMetadata = getTimeColumnMetadata(metadataIndexNode); metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, path.getMeasurement(), false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, path.getMeasurement(), false, null); if (metadataIndexPair == null) { return null; @@ -558,18 +638,33 @@ public ITimeSeriesMetadata readITimeseriesMetadata(Path path, boolean ignoreNotE } } - /* Find the leaf node that contains path, return all the sensors in that leaf node which are also in allSensors set */ + /** + * Find the leaf node that contains path, return all the sensors in that leaf node which are also + * in allSensors set + * + * @param ignoreNotExistDevice whether throw IOException if device not found + * @param ioSizeRecorder can be null + */ public List readTimeseriesMetadata( - IDeviceID device, String measurement, Set allSensors) throws IOException { + IDeviceID device, + String measurement, + Set allSensors, + boolean ignoreNotExistDevice, + LongConsumer ioSizeRecorder) + throws IOException { Pair metadataIndexPair = - getLeafMetadataIndexPair(device, measurement); + getLeafMetadataIndexPair(device, measurement, ioSizeRecorder); if (metadataIndexPair == null) { - return Collections.emptyList(); + if (ignoreNotExistDevice) { + return Collections.emptyList(); + } + throw new IOException("Device {" + device + "} is not in tsFileMetaData"); } List timeseriesMetadataList = new ArrayList<>(); if (metadataIndexPair.right - metadataIndexPair.left.getOffset() < Integer.MAX_VALUE) { - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); while (buffer.hasRemaining()) { TimeseriesMetadata timeseriesMetadata; try { @@ -587,6 +682,9 @@ public List readTimeseriesMetadata( // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList synchronized (this) { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(metadataIndexPair.right - metadataIndexPair.left.getOffset()); + } tsFileInput.position(metadataIndexPair.left.getOffset()); while (tsFileInput.position() < metadataIndexPair.right) { TimeseriesMetadata timeseriesMetadata; @@ -610,16 +708,17 @@ public List readTimeseriesMetadata( /* Get leaf MetadataIndexPair which contains path */ private Pair getLeafMetadataIndexPair( - IDeviceID device, String measurement) throws IOException { - readFileMetadata(); + IDeviceID device, String measurement, LongConsumer ioSizeRecorder) throws IOException { + readFileMetadata(ioSizeRecorder); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getTableMetadataIndexNode(device.getTableName()); Pair metadataIndexPair = - getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true); + getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, true, ioSizeRecorder); if (metadataIndexPair == null) { return null; } - ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, ioSizeRecorder); MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { try { @@ -631,7 +730,8 @@ private Pair getLeafMetadataIndexPair( throw e; } metadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(metadataIndexNode, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode( + metadataIndexNode, measurement, false, ioSizeRecorder); } return metadataIndexPair; } @@ -700,7 +800,7 @@ public boolean readITimeseriesMetadata( List timeseriesMetadataList, MetadataIndexNode node, String measurement) throws IOException { Pair measurementMetadataIndexPair = - getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false); + getMetadataAndEndOffsetOfMeasurementNode(node, measurement, false, null); if (measurementMetadataIndexPair == null) { return false; @@ -1400,6 +1500,18 @@ private List getDeviceTimeseriesMetadata( */ protected Pair getMetadataAndEndOffsetOfDeviceNode( MetadataIndexNode metadataIndex, IDeviceID deviceID, boolean exactSearch) throws IOException { + return getMetadataAndEndOffsetOfDeviceNode(metadataIndex, deviceID, exactSearch, null); + } + + /** + * @param ioSizeRecorder can be null + */ + protected Pair getMetadataAndEndOffsetOfDeviceNode( + MetadataIndexNode metadataIndex, + IDeviceID deviceID, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws IOException { if (metadataIndex == null) { return null; } @@ -1411,12 +1523,14 @@ protected Pair getMetadataAndEndOffsetOfDeviceNode( if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType())) { Pair childIndexEntry = metadataIndex.getChildIndexEntry(deviceID, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfDeviceNode( deserializeConfig.deviceMetadataIndexNodeBufferDeserializer.deserialize( buffer, deserializeConfig), deviceID, - exactSearch); + exactSearch, + ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(deviceID, exactSearch); } @@ -1435,10 +1549,15 @@ protected Pair getMetadataAndEndOffsetOfDeviceNode( * @param measurement target measurement * @param exactSearch whether is in exact search mode, return null when there is no entry with * name; or else return the nearest MetadataIndexEntry before it (for deeper search) + * @param ioSizeRecorder can be null * @return target MetadataIndexEntry, endOffset pair */ protected Pair getMetadataAndEndOffsetOfMeasurementNode( - MetadataIndexNode metadataIndex, String measurement, boolean exactSearch) throws IOException { + MetadataIndexNode metadataIndex, + String measurement, + boolean exactSearch, + LongConsumer ioSizeRecorder) + throws IOException { if (MetadataIndexNodeType.INTERNAL_DEVICE.equals(metadataIndex.getNodeType()) || MetadataIndexNodeType.LEAF_DEVICE.equals(metadataIndex.getNodeType())) { throw new IllegalArgumentException(); @@ -1447,12 +1566,14 @@ protected Pair getMetadataAndEndOffsetOfMeasurementNo if (MetadataIndexNodeType.INTERNAL_MEASUREMENT.equals(metadataIndex.getNodeType())) { Pair childIndexEntry = metadataIndex.getChildIndexEntry(measurement, false); - ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right); + ByteBuffer buffer = + readData(childIndexEntry.left.getOffset(), childIndexEntry.right, ioSizeRecorder); return getMetadataAndEndOffsetOfMeasurementNode( deserializeConfig.measurementMetadataIndexNodeBufferDeserializer.deserialize( buffer, deserializeConfig), measurement, - exactSearch); + exactSearch, + ioSizeRecorder); } else { return metadataIndex.getChildIndexEntry(measurement, exactSearch); } @@ -1527,10 +1648,12 @@ public ChunkHeader readChunkHeader(byte chunkType) throws IOException { * read the chunk's header. * * @param position the file offset of this chunk's header + * @param ioSizeRecorder can be null */ - private ChunkHeader readChunkHeader(long position) throws IOException { + private ChunkHeader readChunkHeader(long position, LongConsumer ioSizeRecorder) + throws IOException { try { - return ChunkHeader.deserializeFrom(tsFileInput, position); + return ChunkHeader.deserializeFrom(tsFileInput, position, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1547,8 +1670,21 @@ private ChunkHeader readChunkHeader(long position) throws IOException { * @return the pages of this chunk */ public ByteBuffer readChunk(long position, int dataSize) throws IOException { + return readChunk(position, dataSize, null); + } + + /** + * notice, this function will modify channel's position. + * + * @param dataSize the size of chunkdata + * @param position the offset of the chunk data + * @param ioSizeRecorder can be null + * @return the pages of this chunk + */ + public ByteBuffer readChunk(long position, int dataSize, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(position, dataSize); + return readData(position, dataSize, ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1563,10 +1699,18 @@ public ByteBuffer readChunk(long position, int dataSize) throws IOException { * @return -chunk */ public Chunk readMemChunk(long offset) throws IOException { + return readMemChunk(offset, null); + } + + /** + * @param ioSizeRecorder can be null + */ + public Chunk readMemChunk(long offset, LongConsumer ioSizeRecorder) throws IOException { try { - ChunkHeader header = readChunkHeader(offset); - ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), header.getDataSize()); - return new Chunk(header, buffer, getEncryptParam()); + ChunkHeader header = readChunkHeader(offset, ioSizeRecorder); + ByteBuffer buffer = + readChunk(offset + header.getSerializedSize(), header.getDataSize(), ioSizeRecorder); + return new Chunk(header, buffer, getEncryptParam(ioSizeRecorder)); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { @@ -1583,7 +1727,7 @@ public Chunk readMemChunk(long offset) throws IOException { */ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { try { - ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(metaData.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize()); @@ -1608,7 +1752,7 @@ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { * @return chunk */ public Chunk readMemChunk(CachedChunkLoaderImpl.ChunkCacheKey chunkCacheKey) throws IOException { - ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(chunkCacheKey.getOffsetOfChunkHeader(), null); ByteBuffer buffer = readChunk( chunkCacheKey.getOffsetOfChunkHeader() + header.getSerializedSize(), @@ -1648,7 +1792,7 @@ public MeasurementSchema getMeasurementSchema(List chunkMetadata return null; } IChunkMetadata lastChunkMetadata = chunkMetadataList.get(chunkMetadataList.size() - 1); - ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader()); + ChunkHeader header = readChunkHeader(lastChunkMetadata.getOffsetOfChunkHeader(), null); return new MeasurementSchema( lastChunkMetadata.getMeasurementUid(), header.getDataType(), @@ -1776,6 +1920,26 @@ public long fileSize() throws IOException { * @return data that been read. */ protected ByteBuffer readData(long position, int totalSize) throws IOException { + return readData(position, totalSize, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position.
+ * if position = -1, the tsFileInput's position will be changed to the current position + real + * data size that been read. Other wise, the tsFileInput's position is not changed. + * + * @param position the start position of data in the tsFileInput, or the current position if + * position = -1 + * @param totalSize the size of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long position, int totalSize, LongConsumer ioSizeRecorder) + throws IOException { + if (ioSizeRecorder != null) { + ioSizeRecorder.accept(totalSize); + } int allocateSize = Math.min(MAX_READ_BUFFER_SIZE, totalSize); int allocateNum = (int) Math.ceil((double) totalSize / allocateSize); ByteBuffer buffer = ByteBuffer.allocate(totalSize); @@ -1817,8 +1981,23 @@ protected ByteBuffer readData(long position, int totalSize) throws IOException { * @return data that been read. */ protected ByteBuffer readData(long start, long end) throws IOException { + return readData(start, end, null); + } + + /** + * read data from tsFileInput, from the current position (if position = -1), or the given + * position. + * + * @param start the start position of data in the tsFileInput, or the current position if position + * = -1 + * @param end the end position of data that want to read + * @param ioSizeRecorder can be null + * @return data that been read. + */ + protected ByteBuffer readData(long start, long end, LongConsumer ioSizeRecorder) + throws IOException { try { - return readData(start, (int) (end - start)); + return readData(start, (int) (end - start), ioSizeRecorder); } catch (StopReadTsFileByInterruptException e) { throw e; } catch (Throwable t) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java index 817f0f462..53be3fec6 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/UnClosedTsFileReader.java @@ -25,19 +25,23 @@ import org.apache.tsfile.file.metadata.TsFileMetadata; import java.io.IOException; +import java.util.function.LongConsumer; /** A class for reading unclosed tsfile. */ public class UnClosedTsFileReader extends TsFileSequenceReader { private EncryptParameter encryptParam; - public UnClosedTsFileReader(String file) throws IOException { - super(file, false); + // ioSizeRecorder can be null + public UnClosedTsFileReader(String file, LongConsumer ioSizeRecorder) throws IOException { + super(file, false, ioSizeRecorder); encryptParam = EncryptUtils.encryptParam; } - public UnClosedTsFileReader(String file, EncryptParameter decryptParam) throws IOException { - super(file, false); + // ioSizeRecorder can be null + public UnClosedTsFileReader( + String file, EncryptParameter decryptParam, LongConsumer ioSizeRecorder) throws IOException { + super(file, false, ioSizeRecorder); this.encryptParam = encryptParam; } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java index bf284515d..e4bfbb44d 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TimeSeriesMetadataReadTest.java @@ -73,7 +73,7 @@ public void testReadTimeseriesMetadata() throws IOException { // s4 should not be returned as result set.add("s4"); List timeseriesMetadataList = - reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set); + reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set, false, null); Assert.assertEquals(3, timeseriesMetadataList.size()); for (int i = 1; i <= timeseriesMetadataList.size(); i++) { Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 1).getMeasurementId()); @@ -87,7 +87,7 @@ public void testReadTimeseriesMetadata() throws IOException { // so the result is not supposed to contain this measurement's timeseries metadata set.add("s8"); timeseriesMetadataList = - reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set); + reader.readTimeseriesMetadata(path.getIDeviceID(), path.getMeasurement(), set, false, null); Assert.assertEquals(2, timeseriesMetadataList.size()); for (int i = 5; i < 7; i++) { Assert.assertEquals("s" + i, timeseriesMetadataList.get(i - 5).getMeasurementId()); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java index 62dc9bda9..8e865413a 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/UnClosedTsFileReaderTest.java @@ -57,7 +57,7 @@ public void testRead() throws IOException { ChunkMetadata chunkMetadata = writer.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0); - UnClosedTsFileReader reader = new UnClosedTsFileReader(file.getAbsolutePath()); + UnClosedTsFileReader reader = new UnClosedTsFileReader(file.getAbsolutePath(), null); Chunk chunk = reader.readMemChunk(chunkMetadata); ChunkReader chunkReader = new ChunkReader(chunk); BatchData batchData = chunkReader.nextPageData();