diff --git a/datablock/pom.xml b/datablock/pom.xml
index b36cedd0..c142f5ba 100644
--- a/datablock/pom.xml
+++ b/datablock/pom.xml
@@ -32,5 +32,11 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>tv.hd3g.commons</groupId>
+			<artifactId>testtools</artifactId>
+			<version>22.0.1-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/datablock/src/main/java/tv/hd3g/datablock/DataBlockChunkIndexItem.java b/datablock/src/main/java/tv/hd3g/datablock/DataBlockChunkIndexItem.java
index 0c940c7e..8d0406d3 100644
--- a/datablock/src/main/java/tv/hd3g/datablock/DataBlockChunkIndexItem.java
+++ b/datablock/src/main/java/tv/hd3g/datablock/DataBlockChunkIndexItem.java
@@ -23,18 +23,20 @@ public record DataBlockChunkIndexItem(DatablockChunkHeader header, long payloadP
 
 	public <T> T extractPayload(final Function<DatablockChunkPayloadExtractor, T> extractor,
 	                            final DatablockDocument document) {
-		final var reader = document.new ChunkReader(payloadPosition, header.getSize());
-		final var result = extractor.apply(reader);
-		reader.clean();
-		return result;
+		final var reader = document.new ChunkReader(payloadPosition, header.getPayloadSize());
+		try {
+			return extractor.apply(reader);
+		} finally {
+			reader.clean();
+		}
 	}
 
 	public void setArchived(final boolean archived, final DatablockDocument document) throws IOException {
-		document.new ChunkReader(payloadPosition, header.getSize()).updateHeader(archived, header.isDeleted());
+		document.new ChunkReader(payloadPosition, header.getPayloadSize()).updateHeader(archived, header.isDeleted());
 	}
 
 	public void setDeleted(final boolean deleted, final DatablockDocument document) throws IOException {
-		document.new ChunkReader(payloadPosition, header.getSize()).updateHeader(header.isArchived(), deleted);
+		document.new ChunkReader(payloadPosition, header.getPayloadSize()).updateHeader(header.isArchived(), deleted);
 	}
 
 }
diff --git a/datablock/src/main/java/tv/hd3g/datablock/DatablockChunkHeader.java b/datablock/src/main/java/tv/hd3g/datablock/DatablockChunkHeader.java
index 6b6c1c14..8a53e623 100644
--- a/datablock/src/main/java/tv/hd3g/datablock/DatablockChunkHeader.java
+++ b/datablock/src/main/java/tv/hd3g/datablock/DatablockChunkHeader.java
@@ -32,9 +32,9 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool
 
 	public static final int FOURCC_EXPECTED_SIZE = 4;
 	public static final int BLANK_EXPECTED_SIZE = 8;
-	public static final int HEADER_LEN = FOURCC_EXPECTED_SIZE
+	public static final int CHUNK_HEADER_LEN = FOURCC_EXPECTED_SIZE
			+ 2 /** version */
-			+ 4 /** size */
+			+ 4 /** payloadSize */
			+ 8 /** createdDate */
			+ 1 /** deleted/archived */
			+ 1 /** compressed */
@@ -46,7 +46,7 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool
 
 	private final byte[] fourCC;
 	private final short version;
-	private final int size;
+	private final int payloadSize;
 	private final long createdDate;
 	private final boolean archived;
 	private final boolean deleted;
@@ -56,14 +56,14 @@ public class DatablockChunkHeader implements IOTraits {// TODO test + debug tool
 	/**
 	 * @param fourCC 4 bytes to identify and route to process the chunk
 	 * @param version chunk type version
-	 * @param size data payload size
+	 * @param payloadSize data payload payloadSize
 	 * @param compressed is payload is compressed
 	 * @param crc payload crc result
 	 * @param archived marked as archived
 	 */
 	public DatablockChunkHeader(final byte[] fourCC,
			final short version,
-			final int size,
+			final int payloadSize,
			final boolean compressed,
			final boolean archived,
			final int crc) {
@@ -73,7 +73,7 @@ public DatablockChunkHeader(final byte[] fourCC,
 		}
 		this.fourCC = fourCC;
 		this.version = version;
-		this.size = size;
+		this.payloadSize = payloadSize;
 		createdDate = System.currentTimeMillis();
 		deleted = false;
 		this.compressed = compressed;
@@ -82,12 +82,12 @@ public DatablockChunkHeader(final byte[] fourCC,
 	}
 
 	public DatablockChunkHeader(final ByteBuffer readFrom) {
-		checkRemaining(readFrom, HEADER_LEN);
+		checkRemaining(readFrom, CHUNK_HEADER_LEN);
 		fourCC = new byte[FOURCC_EXPECTED_SIZE];
 		readFrom.get(fourCC);
 
 		version = readFrom.getShort();
-		size = readFrom.getInt();
+		payloadSize = readFrom.getInt();
 		createdDate = readFrom.getLong();
 
 		final var flag = readFrom.get();
@@ -100,10 +100,10 @@ public DatablockChunkHeader(final ByteBuffer readFrom) {
 	}
 
 	public ByteBuffer toByteBuffer() {
-		final var header = ByteBuffer.allocate(HEADER_LEN);
+		final var header = ByteBuffer.allocate(CHUNK_HEADER_LEN);
 		header.put(fourCC);
 		header.putShort(version);
-		header.putInt(size);
+		header.putInt(payloadSize);
 		header.putLong(createdDate);
 
 		final var flag = getFlag(deleted, archived);
@@ -123,7 +123,7 @@ private static byte getFlag(final boolean deleted, final boolean archived) {
 	/**
 	 * At the end, the file position will be put on the previous (actual) position.
 	 * @param channel pos must be set on the first chunk header byte
-	 * @return payload size extracted from header
+	 * @return payload payloadSize extracted from header
 	 */
 	static void updateHeader(final FileChannel channel,
			final boolean setArchived,
@@ -136,7 +136,7 @@ static void updateHeader(final FileChannel channel,
				channel.position()
				+ FOURCC_EXPECTED_SIZE
				+ 2 /** version */
-				+ 4 /** size */
+				+ 4 /** payloadSize */
				+ 8 /** createdDate */
		),
				1);
@@ -150,8 +150,8 @@ public String toString() {
 		builder.append(encodeHexString(fourCC));
 		builder.append(", version=");
 		builder.append(version);
-		builder.append(", size=");
-		builder.append(size);
+		builder.append(", payloadSize=");
+		builder.append(payloadSize);
 		builder.append(", createdDate=");
 		builder.append(new Date(createdDate));
 		builder.append(", archived=");
diff --git a/datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java b/datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
index e4064aa2..6ed0dfd3 100644
--- a/datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
+++ b/datablock/src/main/java/tv/hd3g/datablock/DatablockDocument.java
@@ -17,8 +17,11 @@
 package tv.hd3g.datablock;
 
 import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
+import static tv.hd3g.datablock.DatablockChunkHeader.CHUNK_HEADER_LEN;
+import static tv.hd3g.datablock.DatablockDocumentHeader.DOCUMENT_HEADER_LEN;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.foreign.Arena;
 import java.lang.foreign.MemorySegment;
 import java.nio.ByteBuffer;
@@ -26,13 +29,14 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.function.Predicate;
+import java.util.function.Function;
 
-public class DatablockDocument implements IOTraits {
+// TODO add technical readme
+public class DatablockDocument implements IOTraits {// TODO test
 
	private final FileChannel channel;
-	private final ByteBuffer documentHeaderBuffer = ByteBuffer.allocate(DatablockDocumentHeader.HEADER_LEN);
-	private final ByteBuffer chunkHeaderBuffer = ByteBuffer.allocate(DatablockChunkHeader.HEADER_LEN);
+	private final ByteBuffer documentHeaderBuffer = ByteBuffer.allocate(DOCUMENT_HEADER_LEN);
+	private final ByteBuffer chunkHeaderBuffer = ByteBuffer.allocate(CHUNK_HEADER_LEN);
	private final ByteBuffer chunkSeparator = ByteBuffer.allocate(1);
 
	public DatablockDocument(final FileChannel channel) throws IOException {
@@ -69,7 +73,7 @@ public synchronized void appendChunk(final DatablockChunkHeader chunkHeader,
 	}
 
 	public synchronized void documentCrawl(final FoundedDataBlockDocumentChunk chunkCallback) throws IOException {
-		channel.position(DatablockDocumentHeader.HEADER_LEN);
+		channel.position(DOCUMENT_HEADER_LEN);
 
 		while (channel.position() + 1l < channel.size()) {
 			chunkHeaderBuffer.clear();
@@ -77,12 +81,15 @@ public synchronized void documentCrawl(final FoundedDataBlockDocumentChunk chunk
 			final var header = new DatablockChunkHeader(chunkHeaderBuffer);
 
 			final var chunkPayloadDocumentPosition = channel.position();
-			final var currentChunkReader = new ChunkReader(chunkPayloadDocumentPosition, header.getSize());
-			chunkCallback.onChunk(header, chunkPayloadDocumentPosition, currentChunkReader);
-			currentChunkReader.clean();
+			final var currentChunkReader = new ChunkReader(chunkPayloadDocumentPosition, header.getPayloadSize());
+			try {
+				chunkCallback.onChunk(header, chunkPayloadDocumentPosition, currentChunkReader);
+			} finally {
+				currentChunkReader.clean();
+			}
 
 			final var nextChunkPosition = chunkPayloadDocumentPosition
-					+ header.getSize();
+					+ header.getPayloadSize();
 			chunkSeparator.clear();
 			checkIOSize(channel.read(chunkSeparator, nextChunkPosition), chunkSeparator);
 			chunkSeparator.flip();
@@ -94,32 +101,59 @@ public synchronized List getDocumentMap() throws IOExce
 		final var result = new ArrayList<DataBlockChunkIndexItem>();
 		documentCrawl((chunkHeader, chunkPayloadDocumentPosition,
-		               payloadExtractor) -> {
-			result.add(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));
-		});
+		               payloadExtractor) -> result
+		                       .add(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition)));
 		return result;
 	}
 
 	public synchronized void documentRefactor(final FileChannel newDocument,
-	                                          final Predicate<DataBlockChunkIndexItem> keepChunk) throws IOException {
+	                                          final Function<DataBlockChunkIndexItem, DatablockMigrateChunkPolicy> keepChunkPolicy) throws IOException {
 		final var targetDocument = new DatablockDocument(newDocument);
-		newDocument.truncate(DatablockDocumentHeader.HEADER_LEN);
+		newDocument.truncate(DOCUMENT_HEADER_LEN);
 		newDocument.position(0);
 
 		final var actualHeader = readDocumentHeader();
 		targetDocument.writeDocumentHeader(actualHeader.getIncrementedDocumentVersion());
 
+		final var actualPos = channel.position();
+
 		documentCrawl((chunkHeader, chunkPayloadDocumentPosition, payloadExtractor) -> {
-			final var keepIt = keepChunk.test(new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));
-			if (keepIt) {
-				// TODO write
+			final var policy = keepChunkPolicy.apply(
+					new DataBlockChunkIndexItem(chunkHeader, chunkPayloadDocumentPosition));
+
+			if (policy.keepActualChunk()) {
+				final var startChunkPos = chunkPayloadDocumentPosition - CHUNK_HEADER_LEN;
+				final var chunkLen = CHUNK_HEADER_LEN
+						+ chunkHeader.getPayloadSize()
+						+ chunkSeparator.capacity();
+
+				try {
+					channel.transferTo(startChunkPos, chunkLen, newDocument);
+				} catch (final IOException e) {
+					throw new UncheckedIOException(
+							"Can't transfert to newDocument (readed from=" + startChunkPos + ", len=" + chunkLen + ")",
+							e);
+				}
+			}
+
+			if (policy.changeActualChunk()) {
+				try {
+					new ChunkReader(chunkPayloadDocumentPosition, chunkHeader.getPayloadSize())
+							.updateHeader(
+									policy.markActualChunkAsArchived(),
+									policy.markActualChunkAsDeleted());
+				} catch (final IOException e) {
+					throw new UncheckedIOException("Can't write to actual document", e);
+				}
 			}
-		});
+		});
+		channel.position(actualPos);
 	}
 
-	// TODO document defrag/cleanup
 
 	class ChunkReader implements DatablockChunkPayloadExtractor {
@@ -129,6 +163,9 @@ class ChunkReader implements DatablockChunkPayloadExtractor {
 		private MemorySegment currentMemorySegment;
 		private Arena arena;
 
+		/**
+		 * Always call clean() after getCurrentChunkPayload()
+		 */
 		ChunkReader(final long position, final long size) {
 			this.position = position;
 			this.size = size;
diff --git a/datablock/src/main/java/tv/hd3g/datablock/DatablockDocumentHeader.java b/datablock/src/main/java/tv/hd3g/datablock/DatablockDocumentHeader.java
index aecac2f7..6187d0d9 100644
--- a/datablock/src/main/java/tv/hd3g/datablock/DatablockDocumentHeader.java
+++ b/datablock/src/main/java/tv/hd3g/datablock/DatablockDocumentHeader.java
@@ -30,7 +30,7 @@ public class DatablockDocumentHeader implements IOTraits {// TODO test + debug t
 	public static final int MAGIC_NUMBER_EXPECTED_SIZE = 8;
 	public static final int DOCUMENT_TYPE_EXPECTED_SIZE = 8;
 	public static final int BLANK_EXPECTED_SIZE = 2;
-	public static final int HEADER_LEN = MAGIC_NUMBER_EXPECTED_SIZE +
+	public static final int DOCUMENT_HEADER_LEN = MAGIC_NUMBER_EXPECTED_SIZE +
			DOCUMENT_TYPE_EXPECTED_SIZE
			+ 2 /** typeVersion */
			+ 4 /** documentVersion */
@@ -70,7 +70,7 @@ public DatablockDocumentHeader getIncrementedDocumentVersion() {
 	}
 
 	public DatablockDocumentHeader(final ByteBuffer readFrom) {
-		checkRemaining(readFrom, HEADER_LEN);
+		checkRemaining(readFrom, DOCUMENT_HEADER_LEN);
 		magicNumber = new byte[MAGIC_NUMBER_EXPECTED_SIZE];
 		readFrom.get(magicNumber);
 		documentType = new byte[DOCUMENT_TYPE_EXPECTED_SIZE];
@@ -82,7 +82,7 @@ public DatablockDocumentHeader(final ByteBuffer readFrom) {
 	}
 
 	public ByteBuffer toByteBuffer() {
-		final var header = ByteBuffer.allocate(HEADER_LEN);
+		final var header = ByteBuffer.allocate(DOCUMENT_HEADER_LEN);
 		header.put(magicNumber);
 		header.put(documentType);
 		header.putShort(typeVersion);
diff --git a/datablock/src/main/java/tv/hd3g/datablock/DatablockMigrateChunkPolicy.java b/datablock/src/main/java/tv/hd3g/datablock/DatablockMigrateChunkPolicy.java
new file mode 100644
index 00000000..2821e329
--- /dev/null
+++ b/datablock/src/main/java/tv/hd3g/datablock/DatablockMigrateChunkPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * This file is part of datablock.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * Copyright (C) hdsdi3g for hd3g.tv 2024
+ *
+ */
+package tv.hd3g.datablock;
+
+public record DatablockMigrateChunkPolicy(boolean keepActualChunk,
+                                          boolean markActualChunkAsArchived,
+                                          boolean markActualChunkAsDeleted) {
+
+	public static DatablockMigrateChunkPolicy keepChunk() {
+		return new DatablockMigrateChunkPolicy(true, false, false);
+	}
+
+	public static DatablockMigrateChunkPolicy dontKeepChunk() {
+		return new DatablockMigrateChunkPolicy(false, false, false);
+	}
+
+	public static DatablockMigrateChunkPolicy keepChunkMarkActualDeleted() {
+		return new DatablockMigrateChunkPolicy(true, false, true);
+	}
+
+	public static DatablockMigrateChunkPolicy keepChunkMarkActualArchived() {
+		return new DatablockMigrateChunkPolicy(true, true, false);
+	}
+
+	public static DatablockMigrateChunkPolicy dontKeepChunkMarkActualDeleted() {
+		return new DatablockMigrateChunkPolicy(false, false, true);
+	}
+
+	public static DatablockMigrateChunkPolicy dontKeepChunkMarkActualArchived() {
+		return new DatablockMigrateChunkPolicy(false, true, false);
+	}
+
+	boolean changeActualChunk() {
+		return markActualChunkAsArchived || markActualChunkAsDeleted;
+	}
+
+}
diff --git a/datablock/src/test/java/tv/hd3g/datablock/DatablockMigrateChunkPolicyTest.java b/datablock/src/test/java/tv/hd3g/datablock/DatablockMigrateChunkPolicyTest.java
new file mode 100644
index 00000000..15b7340f
--- /dev/null
+++ b/datablock/src/test/java/tv/hd3g/datablock/DatablockMigrateChunkPolicyTest.java
@@ -0,0 +1,107 @@
+/*
+ * This file is part of datablock.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * Copyright (C) hdsdi3g for hd3g.tv 2024
+ *
+ */
+package tv.hd3g.datablock;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.dontKeepChunk;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.dontKeepChunkMarkActualArchived;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.dontKeepChunkMarkActualDeleted;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.keepChunk;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.keepChunkMarkActualArchived;
+import static tv.hd3g.datablock.DatablockMigrateChunkPolicy.keepChunkMarkActualDeleted;
+
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class DatablockMigrateChunkPolicyTest {
+
+	DatablockMigrateChunkPolicy p;
+
+	@Test
+	void testKeepChunk() {
+		p = keepChunk();
+
+		assertThat(p.keepActualChunk()).isTrue();
+		assertThat(p.markActualChunkAsArchived()).isFalse();
+		assertThat(p.markActualChunkAsDeleted()).isFalse();
+	}
+
+	@Test
+	void testDontKeepChunk() {
+		p = dontKeepChunk();
+
+		assertThat(p.keepActualChunk()).isFalse();
+		assertThat(p.markActualChunkAsArchived()).isFalse();
+		assertThat(p.markActualChunkAsDeleted()).isFalse();
+	}
+
+	@Test
+	void testKeepChunkMarkActualDeleted() {
+		p = keepChunkMarkActualDeleted();
+
+		assertThat(p.keepActualChunk()).isTrue();
+		assertThat(p.markActualChunkAsArchived()).isFalse();
+		assertThat(p.markActualChunkAsDeleted()).isTrue();
+	}
+
+	@Test
+	void testKeepChunkMarkActualArchived() {
+		p = keepChunkMarkActualArchived();
+
+		assertThat(p.keepActualChunk()).isTrue();
+		assertThat(p.markActualChunkAsArchived()).isTrue();
+		assertThat(p.markActualChunkAsDeleted()).isFalse();
+	}
+
+	@Test
+	void testDontKeepChunkMarkActualDeleted() {
+		p = dontKeepChunkMarkActualDeleted();
+
+		assertThat(p.keepActualChunk()).isFalse();
+		assertThat(p.markActualChunkAsArchived()).isFalse();
+		assertThat(p.markActualChunkAsDeleted()).isTrue();
+	}
+
+	@Test
+	void testDontKeepChunkMarkActualArchived() {
+		p = dontKeepChunkMarkActualArchived();
+
+		assertThat(p.keepActualChunk()).isFalse();
+		assertThat(p.markActualChunkAsArchived()).isTrue();
+		assertThat(p.markActualChunkAsDeleted()).isFalse();
+	}
+
+	private static Stream<Arguments> provideBooleans() {
+		return IntStream.range(0, 8)
+				.mapToObj(i -> Arguments.of(
+						(i & 1) == 1,
+						(i & 2) == 2,
+						(i & 4) == 4));
+	}
+
+	@ParameterizedTest
+	@MethodSource("provideBooleans")
+	void testChangeActualChunk(final boolean keep, final boolean pArchived, final boolean pDeleted) {
+		assertThat(new DatablockMigrateChunkPolicy(keep, pArchived, pDeleted).changeActualChunk())
+				.isEqualTo(pArchived || pDeleted);
+	}
+}
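
For illustration only, a minimal sketch of a caller driving the new documentRefactor(FileChannel, Function<DataBlockChunkIndexItem, DatablockMigrateChunkPolicy>) entry point introduced above. The class name, file names, open options and the keep-unless-deleted rule are assumptions; only the datablock types and calls come from the patch itself. Note that, per the patch, I/O failures raised inside the crawl callback surface as UncheckedIOException rather than IOException.

package tv.hd3g.datablock;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical caller (not part of the patch): copies every not-deleted chunk
 * from an existing document to a freshly created one, and flags the copied
 * chunks as archived in the source document.
 */
public class DatablockRefactorSketch {

	public static void main(final String[] args) throws IOException {
		try (var sourceChannel = FileChannel.open(Path.of("current.datablock"),
				StandardOpenOption.READ, StandardOpenOption.WRITE);
				var targetChannel = FileChannel.open(Path.of("refactored.datablock"),
						StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

			final var document = new DatablockDocument(sourceChannel);

			document.documentRefactor(targetChannel, item -> {
				if (item.header().isDeleted()) {
					// Already deleted: don't copy it to the new document.
					return DatablockMigrateChunkPolicy.dontKeepChunk();
				}
				// Copy it, and mark the original chunk header as archived.
				return DatablockMigrateChunkPolicy.keepChunkMarkActualArchived();
			});
		}
	}
}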