
fixup! Create project Datablock #179
hdsdi3g committed Nov 12, 2024
1 parent 1fc2da4 commit f3b34f5
Showing 10 changed files with 430 additions and 94 deletions.

DatablockChunkPayloadExtractorImpl.java
@@ -30,7 +30,7 @@
import java.util.Optional;
import java.util.function.Function;

class DatablockChunkPayloadExtractorImpl implements DatablockChunkPayloadExtractor, IOTraits {// TODO tests
class DatablockChunkPayloadExtractorImpl implements DatablockChunkPayloadExtractor, IOTraits {// TODO tests via E2E

private final FileChannel channel;
private final long payloadPosition;
@@ -42,7 +42,8 @@ class DatablockChunkPayloadExtractorImpl implements DatablockChunkPayloadExtract
/**
* Always call clean() after getCurrentChunkPayload()
*/
DatablockChunkPayloadExtractorImpl(final FileChannel channel, final long payloadPosition,
DatablockChunkPayloadExtractorImpl(final FileChannel channel,
final long payloadPosition,
final int payloadSize) {
this.channel = Objects.requireNonNull(channel, "\"channel\" can't to be null");
this.payloadPosition = payloadPosition;
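The Javadoc contract above ("Always call clean() after getCurrentChunkPayload()") is easy to get wrong; a minimal sketch of the intended call pattern, assuming getCurrentChunkPayload() returns the chunk payload (only the two method names appear in this hunk, their exact signatures are assumptions):

final var extractor = new DatablockChunkPayloadExtractorImpl(channel, payloadPosition, payloadSize);
try {
	final var payload = extractor.getCurrentChunkPayload(); // accessor named in the Javadoc above
	// ... consume the payload ...
} finally {
	extractor.clean(); // per the contract: always clean up, even on failure
}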
60 changes: 20 additions & 40 deletions datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
@@ -20,25 +20,25 @@
import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_HEADER_LEN;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

// TODO add technical readme
// TODO add v1 + object storage + defrag
// TODO add indexed list
public class DatablockDocument implements IOTraits {// TODO test
public class DatablockDocument implements IOTraits {

public static final int CHUNK_SEPARATOR_SIZE = 1;

private final FileChannel channel;
private final ByteBuffer documentHeaderBuffer = ByteBuffer.allocate(DOCUMENT_HEADER_LEN);
private final ByteBuffer chunkHeaderBuffer = ByteBuffer.allocate(CHUNK_HEADER_LEN);
private final ByteBuffer chunkSeparator = ByteBuffer.allocate(1);
private final ByteBuffer chunkSeparator = ByteBuffer.allocate(CHUNK_SEPARATOR_SIZE);

public DatablockDocument(final FileChannel channel) throws IOException {
this.channel = Objects.requireNonNull(channel, "\"channel\" can't to be null");
@@ -47,35 +47,19 @@ public DatablockDocument(final FileChannel channel) throws IOException {
}
}

public synchronized DatablockDocumentHeader readDocumentHeader() throws IOException {
public synchronized DatablockDocumentHeader getDocumentHeader() throws IOException {
documentHeaderBuffer.clear();
checkedRead(channel, 0, documentHeaderBuffer);
return new DatablockDocumentHeader(documentHeaderBuffer.flip().asReadOnlyBuffer());
}

/*
* TODO needed ?
* @return newDocumentVersion
*/
/*public synchronized int incrementDocumentVersion() throws IOException {
final var buffer = ByteBuffer.allocate(4 /** documentVersion *
);
checkedRead(channel, DOCUMENT_VERSION_POS, buffer);
buffer.flip();
final var newDocumentVersion = buffer.getInt() + 1;
buffer.reset();
buffer.putInt(newDocumentVersion);
buffer.flip();
checkedWrite(channel, DOCUMENT_VERSION_POS, buffer);
return newDocumentVersion;
}*/

public synchronized void writeDocumentHeader(final DatablockDocumentHeader header) throws IOException {
public synchronized void putDocumentHeader(final DatablockDocumentHeader header) throws IOException {
final var buffer = header.toByteBuffer();
checkedWrite(channel, 0, buffer);
}

// FIXME + test APPEND FROM THE EOF !

/**
* @param chunkPayload No reset/flip will be done
*/
@@ -88,28 +72,21 @@ public synchronized void appendChunk(final byte[] fourCC,
final var header = chunkHeader.toByteBuffer();
checkedWrite(channel, header);
checkedWrite(channel, chunkPayload);

chunkSeparator.clear();
chunkSeparator.put(ZERO_BYTE);
chunkSeparator.flip();
checkedWrite(channel, chunkSeparator);
writeChunkSeparator();
}

public synchronized void appendChunk(final byte[] fourCC,
final short version,
final boolean archived,
final Consumer<OutputStream> writer) throws IOException {
final OutputStreamConsumer writer) throws IOException {
final var chunkHeader = new DatablockChunkHeader(fourCC, version, 0, archived);
final var header = chunkHeader.toByteBuffer();
checkedWrite(channel, header);

try (var outputStream = new DatablockOutputStreamChunk(channel)) {
writer.accept(outputStream);
writer.writeTo(outputStream);
} finally {
chunkSeparator.clear();
chunkSeparator.put(ZERO_BYTE);
chunkSeparator.flip();
checkedWrite(channel, chunkSeparator);
writeChunkSeparator();
}
}

@@ -123,19 +100,22 @@ public synchronized DataBlockChunkIndexItem appendEmptyChunk(final byte[] fourCC

final var payloadPosition = channel.position();
channel.position(payloadPosition + payloadSize);
writeChunkSeparator();

return new DataBlockChunkIndexItem(chunkHeader, payloadPosition);
}

private void writeChunkSeparator() throws IOException {
chunkSeparator.clear();
chunkSeparator.put(ZERO_BYTE);
chunkSeparator.flip();
checkedWrite(channel, chunkSeparator);

return new DataBlockChunkIndexItem(chunkHeader, payloadPosition);
}

public synchronized void documentCrawl(final FoundedDataBlockDocumentChunk chunkCallback) throws IOException {
channel.position(DOCUMENT_HEADER_LEN);

while (channel.position() + 1l < channel.size()) {
while (channel.position() <= channel.size()) {
chunkHeaderBuffer.clear();
checkedRead(channel, chunkHeaderBuffer);
final var header = new DatablockChunkHeader(chunkHeaderBuffer);
@@ -173,8 +153,8 @@ public synchronized void documentRefactor(final FileChannel newDocument,
newDocument.truncate(DOCUMENT_HEADER_LEN);
newDocument.position(0);

final var actualHeader = readDocumentHeader();
targetDocument.writeDocumentHeader(actualHeader.getIncrementedDocumentVersion());
final var actualHeader = getDocumentHeader();
targetDocument.putDocumentHeader(actualHeader.getIncrementedDocumentVersion());

final var actualPos = channel.position();

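With this change every chunk written by appendChunk() and appendEmptyChunk() is terminated by the same single zero byte, now emitted by the shared writeChunkSeparator() helper. A minimal sketch of the resulting on-disk footprint of one chunk, assuming CHUNK_HEADER_LEN is the same fixed chunk-header constant used for chunkHeaderBuffer above:

// fixed chunk header, then the payload, then the single zero separator byte
static long chunkFootprint(final int payloadSize) {
	return CHUNK_HEADER_LEN + payloadSize + CHUNK_SEPARATOR_SIZE;
}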

DatablockDocumentHeader.java
@@ -25,7 +25,7 @@

@Getter
@EqualsAndHashCode
public class DatablockDocumentHeader implements IOTraits {// TODO test + debug tools
public class DatablockDocumentHeader implements IOTraits {// TODO debug tools

public static final int MAGIC_NUMBER_EXPECTED_SIZE = 8;
public static final int DOCUMENT_TYPE_EXPECTED_SIZE = 8;
@@ -36,11 +36,6 @@ public class DatablockDocumentHeader implements IOTraits {// TODO test + debug t
+ 4 /** documentVersion */
+ BLANK_EXPECTED_SIZE;

public static final long DOCUMENT_VERSION_POS = MAGIC_NUMBER_EXPECTED_SIZE +
DOCUMENT_TYPE_EXPECTED_SIZE
+ 2 /** typeVersion */
;

private final byte[] magicNumber;
private final byte[] documentType;
private final short typeVersion;
@@ -94,7 +89,7 @@ ByteBuffer toByteBuffer() {
header.putInt(documentVersion);
header.put(new byte[BLANK_EXPECTED_SIZE]);
header.flip();
return header.asReadOnlyBuffer();
return header;
}

@Override
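Dropping DOCUMENT_VERSION_POS removes the last direct byte-offset access into the header; the layout itself is unchanged. For reference, the on-disk layout implied by DOCUMENT_HEADER_LEN and toByteBuffer() (field order grounded in the removed DOCUMENT_VERSION_POS computation; the BLANK_EXPECTED_SIZE value is not shown in this hunk):

// bytes  0..7   magicNumber      (MAGIC_NUMBER_EXPECTED_SIZE = 8)
// bytes  8..15  documentType     (DOCUMENT_TYPE_EXPECTED_SIZE = 8)
// bytes 16..17  typeVersion      (short)
// bytes 18..21  documentVersion  (int)
// bytes 22..    zero padding     (BLANK_EXPECTED_SIZE bytes)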
6 changes: 4 additions & 2 deletions datablock/src/main/java/tv/hd3g/datablock/IOTraits.java
@@ -40,7 +40,8 @@ default void checkedWrite(final WritableByteChannel channel, final ByteBuffer bu
default void checkedWrite(final FileChannel channel,
final long position,
final ByteBuffer buffer) throws IOException {
checkIOSize(channel.write(buffer, position), buffer.capacity());
channel.position(position);
checkIOSize(channel.write(buffer), buffer.capacity());
}

default void checkedRead(final ReadableByteChannel channel, final ByteBuffer buffer) throws IOException {
@@ -50,7 +51,8 @@ default void checkedRead(final ReadableByteChannel channel, final ByteBuffer buf
default void checkedRead(final FileChannel channel,
final long position,
final ByteBuffer buffer) throws IOException {
checkIOSize(channel.read(buffer, position), buffer.capacity());
channel.position(position);
checkIOSize(channel.read(buffer), buffer.capacity());
}

default void checkEndBlank(final ByteBuffer readFrom, final int blankExpectedSize) {
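The switch from the absolute FileChannel overloads to position() followed by a relative read/write is behaviourally significant: channel.write(buffer, position) and channel.read(buffer, position) leave the channel's current position untouched, whereas the new form moves it, presumably so that later relative reads and writes continue right after the block just handled. A minimal sketch of the difference, assuming a FileChannel opened for read/write:

channel.write(buffer, 0);   // absolute write: the channel's position is unchanged
buffer.rewind();            // reuse the same buffer for the relative form
channel.position(0);
channel.write(buffer);      // relative write: the position now sits after the written bytes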

OutputStreamConsumer.java (new file)
@@ -0,0 +1,27 @@
/*
* This file is part of datablock.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2024
*
*/
package tv.hd3g.datablock;

import java.io.IOException;
import java.io.OutputStream;

@FunctionalInterface
public interface OutputStreamConsumer {

void writeTo(OutputStream outputStream) throws IOException;

}
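OutputStreamConsumer replaces java.util.function.Consumer&lt;OutputStream&gt; in appendChunk() because writeTo() declares IOException: the caller can stream a payload with plain OutputStream calls instead of wrapping checked exceptions. A hedged usage sketch (the document instance, fourCC value and payload bytes are invented for illustration):

final byte[] fourCC = "DATA".getBytes(StandardCharsets.US_ASCII);
final byte[] payload = { 0x01, 0x02, 0x03 };
document.appendChunk(fourCC, (short) 1, false,
		outputStream -> outputStream.write(payload)); // write() throws IOException: allowed by writeTo(),
		                                               // but would not compile against Consumer.accept()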

DatablockDocumentHeaderTest.java (new file)
@@ -0,0 +1,120 @@
/*
* This file is part of datablock.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2024
*
*/
package tv.hd3g.datablock;

import static java.util.Arrays.fill;
import static org.apache.commons.codec.binary.Hex.encodeHexString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_HEADER_LEN;

import java.nio.ByteBuffer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import tv.hd3g.commons.testtools.Fake;
import tv.hd3g.commons.testtools.MockToolsExtendsJunit;

@ExtendWith(MockToolsExtendsJunit.class)
class DatablockDocumentHeaderTest {

@Fake(min = 8, max = 8)
byte[] magicNumber;
@Fake(min = 8, max = 8)
byte[] documentType;
@Fake
short typeVersion;
@Fake
int documentVersion;

DatablockDocumentHeader h;

@BeforeEach
void init() {
h = new DatablockDocumentHeader(magicNumber, documentType, typeVersion, documentVersion);
checkConsts(h);
assertEquals(documentVersion, h.getDocumentVersion());
}

@Test
void testInvalidConstructDataSize_magicNumber() {
final var newMagicNumber = new byte[0];
assertThrows(IllegalArgumentException.class,
() -> new DatablockDocumentHeader(newMagicNumber, documentType, typeVersion, documentVersion));
}

@Test
void testInvalidConstructDataSize_documentType() {
final var newDocumentType = new byte[0];
assertThrows(IllegalArgumentException.class,
() -> new DatablockDocumentHeader(magicNumber, newDocumentType, typeVersion, documentVersion));
}

@Test
void testGetIncrementedDocumentVersion() {
final var incremented = h.getIncrementedDocumentVersion();
checkConsts(incremented);
assertEquals(documentVersion + 1, incremented.getDocumentVersion());
}

@Test
void testImpExByteBuffer() {
final var bb = h.toByteBuffer();
assertNotNull(bb);
assertEquals(0, bb.position());
assertEquals(DOCUMENT_HEADER_LEN, bb.capacity());
assertEquals(DOCUMENT_HEADER_LEN, bb.remaining());

h = new DatablockDocumentHeader(bb);
checkConsts(h);
assertEquals(documentVersion, h.getDocumentVersion());

assertEquals(DOCUMENT_HEADER_LEN, bb.position());
assertEquals(0, bb.remaining());
}

@Test
void testImpByteBuffer_invalidData() {
final var content = new byte[DOCUMENT_HEADER_LEN];
fill(content, (byte) 1);
final var bb = ByteBuffer.wrap(content);

assertThrows(IllegalArgumentException.class,
() -> new DatablockDocumentHeader(bb));
}

@Test
void testToString() {
assertThat(h.toString()).contains(
encodeHexString(magicNumber),
encodeHexString(documentType),
String.valueOf(typeVersion),
String.valueOf(documentVersion));
}

private void checkConsts(final DatablockDocumentHeader compareTo) {
assertArrayEquals(magicNumber, compareTo.getMagicNumber());
assertArrayEquals(documentType, compareTo.getDocumentType());
assertEquals(typeVersion, compareTo.getTypeVersion());
}

}
