Skip to content

Commit

Permalink
fixup! Create project Datablock #179
Browse files Browse the repository at this point in the history
  • Loading branch information
hdsdi3g committed Nov 5, 2024
1 parent bbfa057 commit 5969faf
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 41 deletions.
6 changes: 6 additions & 0 deletions datablock/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>tv.hd3g.commons</groupId>
<artifactId>testtools</artifactId>
<version>22.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ public record DataBlockChunkIndexItem(DatablockChunkHeader header, long payloadP

public <T> T extractPayload(final Function<DatablockChunkPayloadExtractor, T> extractor,
final DatablockDocument document) {
final var reader = document.new ChunkReader(payloadPosition, header.getSize());
final var result = extractor.apply(reader);
reader.clean();
return result;
final var reader = document.new ChunkReader(payloadPosition, header.getPayloadSize());
try {
return extractor.apply(reader);
} finally {
reader.clean();
}
}

public void setArchived(final boolean archived, final DatablockDocument document) throws IOException {
document.new ChunkReader(payloadPosition, header.getSize()).updateHeader(archived, header.isDeleted());
document.new ChunkReader(payloadPosition, header.getPayloadSize()).updateHeader(archived, header.isDeleted());
}

public void setDeleted(final boolean deleted, final DatablockDocument document) throws IOException {
document.new ChunkReader(payloadPosition, header.getSize()).updateHeader(header.isArchived(), deleted);
document.new ChunkReader(payloadPosition, header.getPayloadSize()).updateHeader(header.isArchived(), deleted);
}

}
28 changes: 14 additions & 14 deletions datablock/src/main/java/tv/hd3g/datablock/DatablockChunkHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool

public static final int FOURCC_EXPECTED_SIZE = 4;
public static final int BLANK_EXPECTED_SIZE = 8;
public static final int HEADER_LEN = FOURCC_EXPECTED_SIZE
public static final int CHUNK_HEADER_LEN = FOURCC_EXPECTED_SIZE
+ 2 /** version */
+ 4 /** size */
+ 4 /** payloadSize */
+ 8 /** createdDate */
+ 1 /** deleted/archived */
+ 1 /** compressed */
Expand All @@ -46,7 +46,7 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool

private final byte[] fourCC;
private final short version;
private final int size;
private final int payloadSize;
private final long createdDate;
private final boolean archived;
private final boolean deleted;
Expand All @@ -56,14 +56,14 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool
/**
* @param fourCC 4 bytes to identify and route to process the chunk
* @param version chunk type version
* @param size data payload size
* @param payloadSize data payload size, in bytes
* @param compressed is payload is compressed
* @param crc payload crc result
* @param archived marked as archived
*/
public DatablockChunkHeader(final byte[] fourCC,
final short version,
final int size,
final int payloadSize,
final boolean compressed,
final boolean archived,
final int crc) {
Expand All @@ -73,7 +73,7 @@ public DatablockChunkHeader(final byte[] fourCC,
}
this.fourCC = fourCC;
this.version = version;
this.size = size;
this.payloadSize = payloadSize;
createdDate = System.currentTimeMillis();
deleted = false;
this.compressed = compressed;
Expand All @@ -82,12 +82,12 @@ public DatablockChunkHeader(final byte[] fourCC,
}

public DatablockChunkHeader(final ByteBuffer readFrom) {
checkRemaining(readFrom, HEADER_LEN);
checkRemaining(readFrom, CHUNK_HEADER_LEN);
fourCC = new byte[FOURCC_EXPECTED_SIZE];
readFrom.get(fourCC);

version = readFrom.getShort();
size = readFrom.getInt();
payloadSize = readFrom.getInt();
createdDate = readFrom.getLong();

final var flag = readFrom.get();
Expand All @@ -100,10 +100,10 @@ public DatablockChunkHeader(final ByteBuffer readFrom) {
}

public ByteBuffer toByteBuffer() {
final var header = ByteBuffer.allocate(HEADER_LEN);
final var header = ByteBuffer.allocate(CHUNK_HEADER_LEN);
header.put(fourCC);
header.putShort(version);
header.putInt(size);
header.putInt(payloadSize);
header.putLong(createdDate);

final var flag = getFlag(deleted, archived);
Expand All @@ -123,7 +123,7 @@ private static byte getFlag(final boolean deleted, final boolean archived) {
/**
* At the end, the file position will be put on the previous (actual) position.
* @param channel pos must be set on the first chunk header byte
* @return payload size extracted from header
* @return payload size extracted from header
*/
static void updateHeader(final FileChannel channel,
final boolean setArchived,
Expand All @@ -136,7 +136,7 @@ static void updateHeader(final FileChannel channel,
channel.position()
+ FOURCC_EXPECTED_SIZE
+ 2 /** version */
+ 4 /** size */
+ 4 /** payloadSize */
+ 8 /** createdDate */
), 1);

Expand All @@ -150,8 +150,8 @@ public String toString() {
builder.append(encodeHexString(fourCC));
builder.append(", version=");
builder.append(version);
builder.append(", size=");
builder.append(size);
builder.append(", payloadSize=");
builder.append(payloadSize);
builder.append(", createdDate=");
builder.append(new Date(createdDate));
builder.append(", archived=");
Expand Down
73 changes: 55 additions & 18 deletions datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@
package tv.hd3g.datablock;

import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
import static tv.hd3g.datablock.DatablockChunkHeader.CHUNK_HEADER_LEN;
import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_HEADER_LEN;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Function;

public class DatablockDocument implements IOTraits {
// TODO add technical readme
public class DatablockDocument implements IOTraits {// TODO test

private final FileChannel channel;
private final ByteBuffer documentHeaderBuffer = ByteBuffer.allocate(DatablockDocumentHeader.HEADER_LEN);
private final ByteBuffer chunkHeaderBuffer = ByteBuffer.allocate(DatablockChunkHeader.HEADER_LEN);
private final ByteBuffer documentHeaderBuffer = ByteBuffer.allocate(DOCUMENT_HEADER_LEN);
private final ByteBuffer chunkHeaderBuffer = ByteBuffer.allocate(CHUNK_HEADER_LEN);
private final ByteBuffer chunkSeparator = ByteBuffer.allocate(1);

public DatablockDocument(final FileChannel channel) throws IOException {
Expand Down Expand Up @@ -69,20 +73,23 @@ public synchronized void appendChunk(final DatablockChunkHeader chunkHeader,
}

public synchronized void documentCrawl(final FoundedDataBlockDocumentChunk chunkCallback) throws IOException {
channel.position(DatablockDocumentHeader.HEADER_LEN);
channel.position(DOCUMENT_HEADER_LEN);

while (channel.position() + 1l < channel.size()) {
chunkHeaderBuffer.clear();
checkIOSize(channel.read(chunkHeaderBuffer), chunkHeaderBuffer);
final var header = new DatablockChunkHeader(chunkHeaderBuffer);
final var chunkPayloadDocumentPosition = channel.position();

final var currentChunkReader = new ChunkReader(chunkPayloadDocumentPosition, header.getSize());
chunkCallback.onChunk(header, chunkPayloadDocumentPosition, currentChunkReader);
currentChunkReader.clean();
final var currentChunkReader = new ChunkReader(chunkPayloadDocumentPosition, header.getPayloadSize());
try {
chunkCallback.onChunk(header, chunkPayloadDocumentPosition, currentChunkReader);
} finally {
currentChunkReader.clean();
}

final var nextChunkPosition = chunkPayloadDocumentPosition
+ header.getSize();
+ header.getPayloadSize();
chunkSeparator.clear();
checkIOSize(channel.read(chunkSeparator, nextChunkPosition), chunkSeparator);
chunkSeparator.flip();
Expand All @@ -94,32 +101,59 @@ public synchronized List<DataBlockChunkIndexItem> getDocumentMap() throws IOExce
final var result = new ArrayList<DataBlockChunkIndexItem>();
documentCrawl((chunkHeader,
chunkPayloadDocumentPosition,
payloadExtractor) -> {
result.add(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));
});
payloadExtractor) -> result
.add(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition)));
return result;
}

public synchronized void documentRefactor(final FileChannel newDocument,
final Predicate<DataBlockChunkIndexItem> keepChunk) throws IOException {
final Function<DataBlockChunkIndexItem, DatablockMigrateChunkPolicy> keepChunkPolicy) throws IOException {
final var targetDocument = new DatablockDocument(newDocument);
newDocument.truncate(DatablockDocumentHeader.HEADER_LEN);
newDocument.truncate(DOCUMENT_HEADER_LEN);
newDocument.position(0);

final var actualHeader = readDocumentHeader();
targetDocument.writeDocumentHeader(actualHeader.getIncrementedDocumentVersion());

final var actualPos = channel.position();

documentCrawl((chunkHeader,
chunkPayloadDocumentPosition,
payloadExtractor) -> {
final var keepIt = keepChunk.test(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));
if (keepIt) {
// TODO write
final var policy = keepChunkPolicy.apply(
new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));

if (policy.keepActualChunk()) {
final var startChunkPos = chunkPayloadDocumentPosition - CHUNK_HEADER_LEN;
final var chunkLen = CHUNK_HEADER_LEN
+ chunkHeader.getPayloadSize()
+ chunkSeparator.capacity();

try {
channel.transferTo(startChunkPos, chunkLen, newDocument);
} catch (final IOException e) {
throw new UncheckedIOException(
"Can't transfert to newDocument (readed from=" + startChunkPos + ", len=" + chunkLen + ")",
e);
}

}

if (policy.changeActualChunk()) {
try {
new ChunkReader(chunkPayloadDocumentPosition, chunkHeader.getPayloadSize())
.updateHeader(
policy.markActualChunkAsArchived(),
policy.markActualChunkAsDeleted());
} catch (final IOException e) {
throw new UncheckedIOException("Can't write to actual document", e);
}
}

});

channel.position(actualPos);
}
// TODO document defrag/cleanup

class ChunkReader implements DatablockChunkPayloadExtractor {

Expand All @@ -129,6 +163,9 @@ class ChunkReader implements DatablockChunkPayloadExtractor {
private MemorySegment currentMemorySegment;
private Arena arena;

/**
* Always call clean() after getCurrentChunkPayload()
*/
ChunkReader(final long position, final long size) {
this.position = position;
this.size = size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DatablockDocumentHeader implements IOTraits {// TODO test + debug t
public static final int MAGIC_NUMBER_EXPECTED_SIZE = 8;
public static final int DOCUMENT_TYPE_EXPECTED_SIZE = 8;
public static final int BLANK_EXPECTED_SIZE = 2;
public static final int HEADER_LEN = MAGIC_NUMBER_EXPECTED_SIZE +
public static final int DOCUMENT_HEADER_LEN = MAGIC_NUMBER_EXPECTED_SIZE +
DOCUMENT_TYPE_EXPECTED_SIZE
+ 2 /** typeVersion */
+ 4 /** documentVersion */
Expand Down Expand Up @@ -70,7 +70,7 @@ public DatablockDocumentHeader getIncrementedDocumentVersion() {
}

public DatablockDocumentHeader(final ByteBuffer readFrom) {
checkRemaining(readFrom, HEADER_LEN);
checkRemaining(readFrom, DOCUMENT_HEADER_LEN);
magicNumber = new byte[MAGIC_NUMBER_EXPECTED_SIZE];
readFrom.get(magicNumber);
documentType = new byte[DOCUMENT_TYPE_EXPECTED_SIZE];
Expand All @@ -82,7 +82,7 @@ public DatablockDocumentHeader(final ByteBuffer readFrom) {
}

public ByteBuffer toByteBuffer() {
final var header = ByteBuffer.allocate(HEADER_LEN);
final var header = ByteBuffer.allocate(DOCUMENT_HEADER_LEN);
header.put(magicNumber);
header.put(documentType);
header.putShort(typeVersion);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* This file is part of datablock.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2024
*
*/
package tv.hd3g.datablock;

/**
 * Immutable decision produced by a chunk-migration callback during a document
 * refactor: whether the current chunk must be copied to the new document
 * ({@code keepActualChunk}), and which status flags, archived and/or deleted,
 * must be written back onto the chunk header in the original document.
 */
public record DatablockMigrateChunkPolicy(boolean keepActualChunk,
                                          boolean markActualChunkAsArchived,
                                          boolean markActualChunkAsDeleted) {

	/** Copy the chunk to the new document; leave the original chunk untouched. */
	public static DatablockMigrateChunkPolicy keepChunk() {
		return policy(true, false, false);
	}

	/** Drop the chunk from the new document; leave the original chunk untouched. */
	public static DatablockMigrateChunkPolicy dontKeepChunk() {
		return policy(false, false, false);
	}

	/** Copy the chunk to the new document and flag the original as deleted. */
	public static DatablockMigrateChunkPolicy keepChunkMarkActualDeleted() {
		return policy(true, false, true);
	}

	/** Copy the chunk to the new document and flag the original as archived. */
	public static DatablockMigrateChunkPolicy keepChunkMarkActualArchived() {
		return policy(true, true, false);
	}

	/** Drop the chunk from the new document and flag the original as deleted. */
	public static DatablockMigrateChunkPolicy dontKeepChunkMarkActualDeleted() {
		return policy(false, false, true);
	}

	/** Drop the chunk from the new document and flag the original as archived. */
	public static DatablockMigrateChunkPolicy dontKeepChunkMarkActualArchived() {
		return policy(false, true, false);
	}

	/**
	 * @return true when at least one flag (archived or deleted) must be
	 *         written back onto the original chunk header
	 */
	boolean changeActualChunk() {
		// De Morgan form of (archived || deleted)
		return !(!markActualChunkAsArchived && !markActualChunkAsDeleted);
	}

	// Single construction point shared by all named factories.
	private static DatablockMigrateChunkPolicy policy(final boolean keep,
	                                                  final boolean archive,
	                                                  final boolean delete) {
		return new DatablockMigrateChunkPolicy(keep, archive, delete);
	}

}
Loading

0 comments on commit 5969faf

Please sign in to comment.