Skip to content

Commit

Permalink
fixup! Create project Datablock #179
Browse files Browse the repository at this point in the history
  • Loading branch information
hdsdi3g committed Nov 10, 2024
1 parent 6401d32 commit 2be11eb
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public class DatablockChunkHeader implements IOTraits {// TODO debug tools
* @param fourCC 4 bytes to identify and route to process the chunk
* @param version chunk type version
* @param payloadSize data payload payloadSize
* @param compressed is payload is compressed
* @param crc payload crc result
* @param archived marked as archived
*/
DatablockChunkHeader(final byte[] fourCC,
Expand Down Expand Up @@ -89,8 +87,11 @@ public class DatablockChunkHeader implements IOTraits {// TODO debug tools
checkEndBlank(readFrom, BLANK_EXPECTED_SIZE);
}

ByteBuffer toByteBuffer() {
final var header = ByteBuffer.allocate(CHUNK_HEADER_LEN);
void toByteBuffer(final ByteBuffer header) {// TODO needed 2 toByteBuffer ?
if (header.remaining() < CHUNK_HEADER_LEN) {
throw new IllegalArgumentException(
"No left space (" + header.remaining() + "/" + CHUNK_HEADER_LEN + ") on buffer");
}
header.put(fourCC);
header.putShort(version);
header.putInt(payloadSize);
Expand All @@ -99,8 +100,13 @@ ByteBuffer toByteBuffer() {
final var flag = getFlag(deleted, archived);
header.put(flag);
header.put(new byte[BLANK_EXPECTED_SIZE]);
}

ByteBuffer toByteBuffer() {
final var header = ByteBuffer.allocate(CHUNK_HEADER_LEN);
toByteBuffer(header);
header.flip();
return header.asReadOnlyBuffer();
return header;
}

private static byte getFlag(final boolean deleted, final boolean archived) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ synchronized void clean() {
try {
Optional.ofNullable(currentMemorySegment)
.ifPresent(MemorySegment::unload);
Optional.ofNullable(arena)
.ifPresent(Arena::close);
} finally {
currentMemorySegment = null;
arena = null;
try {
Optional.ofNullable(arena)
.ifPresent(Arena::close);
} finally {
arena = null;
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static tv.hd3g.datablock.DatablockChunkHeader.CHUNK_HEADER_LEN;
import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_HEADER_LEN;
import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_VERSION_POS;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -54,13 +53,14 @@ public synchronized DatablockDocumentHeader readDocumentHeader() throws IOExcept
return new DatablockDocumentHeader(documentHeaderBuffer.flip().asReadOnlyBuffer());
}

/**
/*
* TODO needed ?
* @return newDocumentVersion
*/
public synchronized int incrementDocumentVersion() throws IOException {
final var buffer = ByteBuffer.allocate(4 /** documentVersion */
/*public synchronized int incrementDocumentVersion() throws IOException {
final var buffer = ByteBuffer.allocate(4 /** documentVersion *
);

checkedRead(channel, DOCUMENT_VERSION_POS, buffer);
buffer.flip();
final var newDocumentVersion = buffer.getInt() + 1;
Expand All @@ -69,7 +69,7 @@ public synchronized int incrementDocumentVersion() throws IOException {
buffer.flip();
checkedWrite(channel, DOCUMENT_VERSION_POS, buffer);
return newDocumentVersion;
}
}*/

public synchronized void writeDocumentHeader(final DatablockDocumentHeader header) throws IOException {
final var buffer = header.toByteBuffer();
Expand Down Expand Up @@ -98,13 +98,13 @@ public synchronized void appendChunk(final byte[] fourCC,
public synchronized void appendChunk(final byte[] fourCC,
final short version,
final boolean archived,
final Consumer<OutputStream> reader) throws IOException {
final Consumer<OutputStream> writer) throws IOException {
final var chunkHeader = new DatablockChunkHeader(fourCC, version, 0, archived);
final var header = chunkHeader.toByteBuffer();
checkedWrite(channel, header);

try (var outputStream = new DatablockOutputStreamChunk(channel)) {
reader.accept(outputStream);
writer.accept(outputStream);
} finally {
chunkSeparator.clear();
chunkSeparator.put(ZERO_BYTE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,76 +29,45 @@ class DatablockInputStreamChunk extends InputStream {// TODO test
private final FileChannel channel;
private final long payloadPosition;
private final int payloadSize;
private final ByteBuffer buffer;
private final ByteBuffer oneByteBuffer;

DatablockInputStreamChunk(final FileChannel channel,
final long payloadPosition,
final int payloadSize,
final int maxBufferSize) throws IOException {
final int payloadSize) throws IOException {
this.channel = Objects.requireNonNull(channel, "\"channel\" can't to be null");
this.payloadPosition = payloadPosition;
this.payloadSize = payloadSize;
channel.position(payloadPosition);
buffer = ByteBuffer.allocate(min(payloadSize, maxBufferSize));
}

private boolean readNextBuffer() throws IOException {
if (buffer.hasRemaining() == false) {
final var available = getCurrentAvailable();
if (available == 0) {
return false;
}
buffer.clear();
buffer.limit(min(buffer.capacity(), available));
final var readed = channel.read(buffer);
if (readed == 0) {
return false;
}
buffer.flip();
}
return true;
oneByteBuffer = ByteBuffer.allocate(1);
}

@Override
public int read() throws IOException {
if (readNextBuffer() == false) {
if (getCurrentAvailable() <= 0) {
return -1;
}
return buffer.get() & 0xFF;
}

@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
var pos = off;
var writeRemain = len;

while (writeRemain > 0) {
if (readNextBuffer() == false) {
break;
}

final var ioSize = min(writeRemain, buffer.remaining());
buffer.get(b, pos, ioSize);

pos += ioSize;
writeRemain -= ioSize;
oneByteBuffer.clear();
if (channel.read(oneByteBuffer) <= 0) {
return -1;
}

return pos - off;

oneByteBuffer.flip();
return oneByteBuffer.get() & 0xFF;
}

@Override
public void close() throws IOException {
buffer.clear();
}

private int getCurrentPosition() throws IOException {
return (int) (channel.position() - payloadPosition);
public int read(final byte[] b, final int off, final int len) throws IOException {
final var available = getCurrentAvailable();
if (getCurrentAvailable() <= 0) {
return -1;
}
final var buffer = ByteBuffer.wrap(b, off, min(available, len));
return channel.read(buffer);
}

private int getCurrentAvailable() throws IOException {
return payloadSize - getCurrentPosition();
int getCurrentAvailable() throws IOException {
return payloadSize - (int) (channel.position() - payloadPosition);
}

}

0 comments on commit 2be11eb

Please sign in to comment.