From bdce0f6c7227e7f85ec50d13188af7e4e22cb232 Mon Sep 17 00:00:00 2001 From: kandha Date: Wed, 31 Aug 2016 17:30:31 -0700 Subject: [PATCH] Implementation of bytebuffer based API --- .../distributedlog/BKLogSegmentWriter.java | 24 +++++----- .../distributedlog/EnvelopedEntry.java | 47 ++++++++++--------- .../distributedlog/EnvelopedEntryWriter.java | 3 +- .../impl/BKLogSegmentEntryWriter.java | 5 +- .../logsegment/LogSegmentEntryWriter.java | 7 +-- .../com/twitter/distributedlog/TestEntry.java | 11 ++--- .../distributedlog/TestEnvelopedEntry.java | 25 +++++----- .../EnvelopedRecordSetReader.java | 4 +- .../EnvelopedRecordSetWriter.java | 29 +++++++----- .../com/twitter/distributedlog/io/Buffer.java | 38 +++++++++++++-- .../distributedlog/io/CompressionCodec.java | 8 ++-- .../io/IdentityCompressionCodec.java | 19 ++++---- .../io/LZ4CompressionCodec.java | 31 ++++++------ pom.xml | 2 +- 14 files changed, 148 insertions(+), 105 deletions(-) diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java index 004b2fbf7..d9edaf4ac 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java @@ -17,27 +17,18 @@ */ package com.twitter.distributedlog; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.FlushException; +import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; import com.twitter.distributedlog.exceptions.LockingException; import com.twitter.distributedlog.exceptions.LogRecordTooLongException; import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; import com.twitter.distributedlog.exceptions.WriteCancelledException; import com.twitter.distributedlog.exceptions.WriteException; -import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; import com.twitter.distributedlog.feature.CoreFeatureKeys; import com.twitter.distributedlog.injector.FailureInjector; import com.twitter.distributedlog.injector.RandomDelayFailureInjector; @@ -77,9 +68,18 @@ import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + import static com.google.common.base.Charsets.UTF_8; -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE; +import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; /** * BookKeeper Based Log Segment Writer. 
@@ -1081,7 +1081,7 @@ private Future transmit() synchronized (this) { BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit); packetPrevious = packet; - entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(), + entryWriter.asyncAddEntry(toSend.getData(), this, packet); if (recordSetToTransmit.hasUserRecords()) { diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java index 55d3be97c..bf713a1da 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java @@ -17,23 +17,22 @@ */ package com.twitter.distributedlog; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; - import com.google.common.base.Preconditions; - +import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression; import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; +import com.twitter.distributedlog.io.CompressionCodec; +import com.twitter.distributedlog.io.CompressionUtils; +import com.twitter.distributedlog.util.BitMaskUtils; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.distributedlog.io.CompressionUtils; -import com.twitter.distributedlog.util.BitMaskUtils; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; /** * An enveloped entry written to BookKeeper. 
@@ -112,7 +111,7 @@ public EnvelopedEntry(byte version, */ public EnvelopedEntry(byte version, CompressionCodec.Type compressionType, - byte[] decompressed, + ByteBuffer decompressed, int length, StatsLogger statsLogger) throws InvalidEnvelopedEntryException { @@ -141,12 +140,12 @@ public void writeFully(DataOutputStream out) throws IOException { header.write(out); // Compress CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); - byte[] compressed = codec.compress( + ByteBuffer compressed = codec.compress( payloadDecompressed.payload, 0, payloadDecompressed.length, compressionStat); - this.payloadCompressed = new Payload(compressed.length, compressed); + this.payloadCompressed = new Payload(compressed.limit(), compressed); this.compressedEntryBytes.add(payloadCompressed.length); this.decompressedEntryBytes.add(payloadDecompressed.length); payloadCompressed.write(out); @@ -165,18 +164,18 @@ public void readFully(DataInputStream in) throws IOException { payloadCompressed.read(in); // Decompress CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); - byte[] decompressed = codec.decompress( + ByteBuffer decompressed = codec.decompress( payloadCompressed.payload, 0, payloadCompressed.length, header.decompressedSize, decompressionStat); - this.payloadDecompressed = new Payload(decompressed.length, decompressed); + this.payloadDecompressed = new Payload(decompressed.limit(), decompressed); this.compressedEntryBytes.add(payloadCompressed.length); this.decompressedEntryBytes.add(payloadDecompressed.length); } - public byte[] getDecompressedPayload() throws IOException { + public ByteBuffer getDecompressedPayload() throws IOException { if (!isReady()) { throw new IOException("Decompressed payload is not initialized"); } @@ -245,7 +244,7 @@ private void read(DataInputStream in) throws IOException { public static class Payload { private int length = 0; - private byte[] payload = null; + private ByteBuffer payload = null; // Whether this struct is ready for reading/writing. 
private boolean ready = false; @@ -254,7 +253,7 @@ public static class Payload { Payload() { } - Payload(int length, byte[] payload) { + Payload(int length, ByteBuffer payload) { this.length = length; this.payload = payload; this.ready = true; @@ -262,13 +261,14 @@ public static class Payload { private void write(DataOutputStream out) throws IOException { out.writeInt(length); - out.write(payload, 0, length); + out.write(payload.array(), 0, length); } private void read(DataInputStream in) throws IOException { this.length = in.readInt(); - this.payload = new byte[length]; - in.readFully(payload); + this.payload = ByteBuffer.wrap(new byte[length]); + //TODO: Fix this + in.readFully(payload.array()); this.ready = true; } } @@ -290,7 +290,8 @@ public static InputStream fromInputStream(InputStream src, src.reset(); EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger); entry.readFully(new DataInputStream(src)); - return new ByteArrayInputStream(entry.getDecompressedPayload()); + return new ByteArrayInputStream(entry.getDecompressedPayload().array()); + } } diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java index df5628869..ef62bc084 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java @@ -31,6 +31,7 @@ import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; @@ -162,7 +163,7 @@ public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IO // We can't escape this allocation because things need to be read from one byte array // and then written to another. This is the destination. Buffer toSend = new Buffer(buffer.size()); - byte[] decompressed = buffer.getData(); + ByteBuffer decompressed = buffer.getData(); int length = buffer.size(); EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, codec, diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java index 57b4e6996..db816f2e8 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/BKLogSegmentEntryWriter.java @@ -21,6 +21,7 @@ import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.LedgerHandle; +import java.nio.ByteBuffer; /** * Ledger based log segment entry writer. 
@@ -49,9 +50,9 @@ public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) { } @Override - public void asyncAddEntry(byte[] data, int offset, int length, + public void asyncAddEntry(ByteBuffer data, AsyncCallback.AddCallback callback, Object ctx) { - lh.asyncAddEntry(data, offset, length, callback, ctx); + lh.asyncAddEntry(data, callback, ctx); } @Override diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java index 8b7d9b27e..a0fe32e5c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryWriter.java @@ -21,6 +21,7 @@ import com.twitter.distributedlog.Entry; import com.twitter.distributedlog.util.Sizable; import org.apache.bookkeeper.client.AsyncCallback; +import java.nio.ByteBuffer; /** * An interface class to write the enveloped entry (serialized bytes of @@ -56,10 +57,6 @@ public interface LogSegmentEntryWriter extends Sizable { * * @param data * data to add - * @param offset - * offset in the data - * @param length - * length of the data * @param callback * callback * @param ctx @@ -67,6 +64,6 @@ public interface LogSegmentEntryWriter extends Sizable { * @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry( * byte[], int, int, AsyncCallback.AddCallback, Object) */ - void asyncAddEntry(byte[] data, int offset, int length, + void asyncAddEntry(ByteBuffer data, AsyncCallback.AddCallback callback, Object ctx); } diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntry.java index 0e4737bac..821edb8bd 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntry.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEntry.java @@ -17,14 +17,12 @@ */ package com.twitter.distributedlog; -import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.twitter.distributedlog.Entry.Reader; import com.twitter.distributedlog.Entry.Writer; import com.twitter.distributedlog.exceptions.LogRecordTooLongException; import com.twitter.distributedlog.io.Buffer; import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.io.Buf; import com.twitter.util.Await; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; @@ -38,7 +36,8 @@ import static com.google.common.base.Charsets.UTF_8; import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; /** * Test Case of {@link Entry} @@ -58,7 +57,7 @@ public void testEmptyRecordSet() throws Exception { Buffer buffer = writer.getBuffer(); Entry recordSet = Entry.newBuilder() - .setData(buffer.getData(), 0, buffer.size()) + .setData(buffer.getData().array(), 0, buffer.size()) .setLogSegmentInfo(1L, 0L) .setEntryId(0L) .build(); @@ -145,7 +144,7 @@ public void testWriteRecords() throws Exception { // Test reading from buffer Entry recordSet = Entry.newBuilder() - .setData(buffer.getData(), 0, buffer.size()) + .setData(buffer.getData().array(), 0, buffer.size()) .setLogSegmentInfo(1L, 1L) .setEntryId(0L) .build(); @@ -278,7 +277,7 @@ void verifyReadResult(Buffer data, DLSN expectedDLSN, long 
expectedTxId) throws Exception { Entry recordSet = Entry.newBuilder() - .setData(data.getData(), 0, data.size()) + .setData(data.getData().array(), 0, data.size()) .setLogSegmentInfo(lssn, startSequenceId) .setEntryId(entryId) .deserializeRecordSet(deserializeRecordSet) diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEnvelopedEntry.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEnvelopedEntry.java index 37f261d25..84f60a3c8 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEnvelopedEntry.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestEnvelopedEntry.java @@ -17,10 +17,6 @@ */ package com.twitter.distributedlog; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; - import com.twitter.distributedlog.io.Buffer; import com.twitter.distributedlog.io.CompressionCodec; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -29,6 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.ByteBuffer; + public class TestEnvelopedEntry { static final Logger LOG = LoggerFactory.getLogger(TestEnvelopedEntry.class); @@ -49,16 +50,16 @@ public void testEnvelope() throws Exception { byte[] data = getString(false).getBytes(); EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, CompressionCodec.Type.NONE, - data, + ByteBuffer.wrap(data), data.length, new NullStatsLogger()); Buffer outBuf = new Buffer(2 * data.length); writeEntry.writeFully(new DataOutputStream(outBuf)); EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, new NullStatsLogger()); - readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData()))); - byte[] newData = readEntry.getDecompressedPayload(); - Assert.assertEquals("Written data should equal read data", new String(data), new String(newData)); + readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData().array()))); + ByteBuffer newData = readEntry.getDecompressedPayload(); + Assert.assertEquals("Written data should equal read data", new String(data), new String(newData.array())); } @Test(timeout = 20000) @@ -66,7 +67,7 @@ public void testLZ4Compression() throws Exception { byte[] data = getString(true).getBytes(); EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, CompressionCodec.Type.LZ4, - data, + ByteBuffer.wrap(data), data.length, new NullStatsLogger()); Buffer outBuf = new Buffer(data.length); @@ -74,8 +75,8 @@ public void testLZ4Compression() throws Exception { Assert.assertTrue(data.length > outBuf.size()); EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION, new NullStatsLogger()); - readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData()))); - byte[] newData = readEntry.getDecompressedPayload(); - Assert.assertEquals("Written data should equal read data", new String(data), new String(newData)); + readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData().array()))); + ByteBuffer newData = readEntry.getDecompressedPayload(); + Assert.assertEquals("Written data should equal read data", new String(data), new String(newData.array())); } } diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetReader.java 
b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetReader.java index d3f7a99e4..12698d5ae 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetReader.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetReader.java @@ -75,9 +75,9 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader { if (COMPRESSION_CODEC_LZ4 == codecCode) { CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4); - byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen, + ByteBuffer decompressedData = codec.decompress(ByteBuffer.wrap(compressedData), 0, actualDataLen, originDataLen, NullOpStatsLogger); - this.reader = ByteBuffer.wrap(decompressedData); + this.reader = decompressedData; } else { if (originDataLen != actualDataLen) { throw new IOException("Inconsistent data length found for a non-compressed record set : original = " diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetWriter.java index 32b3cf4fa..18015cb35 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetWriter.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/EnvelopedRecordSetWriter.java @@ -17,11 +17,11 @@ */ package com.twitter.distributedlog; +import com.twitter.distributedlog.exceptions.LogRecordTooLongException; +import com.twitter.distributedlog.exceptions.WriteException; import com.twitter.distributedlog.io.Buffer; import com.twitter.distributedlog.io.CompressionCodec; import com.twitter.distributedlog.io.CompressionUtils; -import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.exceptions.WriteException; import com.twitter.util.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +35,13 @@ import java.util.List; import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; -import static com.twitter.distributedlog.LogRecordSet.*; +import static com.twitter.distributedlog.LogRecordSet.COMPRESSION_CODEC_LZ4; +import static com.twitter.distributedlog.LogRecordSet.COMPRESSION_CODEC_NONE; +import static com.twitter.distributedlog.LogRecordSet.HEADER_LEN; +import static com.twitter.distributedlog.LogRecordSet.METADATA_COMPRESSION_MASK; +import static com.twitter.distributedlog.LogRecordSet.METADATA_VERSION_MASK; +import static com.twitter.distributedlog.LogRecordSet.NullOpStatsLogger; +import static com.twitter.distributedlog.LogRecordSet.VERSION; /** * {@link Buffer} based log record set writer. 
@@ -139,17 +145,18 @@ public synchronized ByteBuffer getBuffer() { } ByteBuffer createBuffer() { - byte[] data = buffer.getData(); + ByteBuffer data = buffer.getData(); int dataOffset = HEADER_LEN; int dataLen = buffer.size() - HEADER_LEN; if (COMPRESSION_CODEC_LZ4 != codecCode) { - ByteBuffer recordSetBuffer = ByteBuffer.wrap(data, 0, buffer.size()); + ByteBuffer recordSetBuffer = data; // update count recordSetBuffer.putInt(4, count); // update data len recordSetBuffer.putInt(8, dataLen); recordSetBuffer.putInt(12, dataLen); + recordSetBuffer.flip(); return recordSetBuffer; } @@ -157,16 +164,16 @@ ByteBuffer createBuffer() { CompressionCodec compressor = CompressionUtils.getCompressionCodec(codec); - byte[] compressed = + ByteBuffer compressed = compressor.compress(data, dataOffset, dataLen, NullOpStatsLogger); ByteBuffer recordSetBuffer; - if (compressed.length > dataLen) { - byte[] newData = new byte[HEADER_LEN + compressed.length]; - System.arraycopy(data, 0, newData, 0, HEADER_LEN + dataLen); + if (compressed.limit() > dataLen) { + byte[] newData = new byte[HEADER_LEN + compressed.limit()]; + System.arraycopy(data.array(), 0, newData, 0, HEADER_LEN + dataLen); recordSetBuffer = ByteBuffer.wrap(newData); } else { - recordSetBuffer = ByteBuffer.wrap(data); + recordSetBuffer = data; } // version recordSetBuffer.position(4); @@ -174,7 +181,7 @@ ByteBuffer createBuffer() { recordSetBuffer.putInt(count); // update data len recordSetBuffer.putInt(dataLen); - recordSetBuffer.putInt(compressed.length); + recordSetBuffer.putInt(compressed.limit()); recordSetBuffer.put(compressed); recordSetBuffer.flip(); return recordSetBuffer; diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/Buffer.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/Buffer.java index 4c9b23fd5..d796fd88d 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/Buffer.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/Buffer.java @@ -18,16 +18,48 @@ package com.twitter.distributedlog.io; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; /** * {@link ByteArrayOutputStream} based buffer. */ -public class Buffer extends ByteArrayOutputStream { +public class Buffer extends OutputStream { + ByteBuffer buf; public Buffer(int initialCapacity) { - super(initialCapacity); + buf = ByteBuffer.allocate(initialCapacity); } - public byte[] getData() { + public ByteBuffer getData() { return buf; } + + /** + * Writes the specified byte to this output stream. The general + * contract for write is that one byte is written + * to the output stream. The byte to be written is the eight + * low-order bits of the argument b. The 24 + * high-order bits of b are ignored. + *
+     * <p/>
+ * Subclasses of OutputStream must provide an + * implementation for this method. + * + * @param b the byte. + * @throws IOException if an I/O error occurs. In particular, + * an IOException may be thrown if the + * output stream has been closed. + */ + @Override + public void write(int b) throws IOException { + buf.put((byte)b); + } + + public int size() { + return buf.position(); + } + + //TODO: Implement this + public void reset() { + } } diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/CompressionCodec.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/CompressionCodec.java index 2637f8756..2aa2582a2 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/CompressionCodec.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/CompressionCodec.java @@ -19,6 +19,8 @@ import org.apache.bookkeeper.stats.OpStatsLogger; +import java.nio.ByteBuffer; + /** * Common interface for compression/decompression operations using different * compression codecs. @@ -45,7 +47,7 @@ public static enum Type { * The compressed data * The returned byte array is sized to the length of the compressed data */ - byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat); + ByteBuffer compress(ByteBuffer data, int offset, int length, OpStatsLogger compressionStat); /** * Return the decompressed data as a byte array. @@ -60,7 +62,7 @@ public static enum Type { * @return * The decompressed data */ - byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat); + ByteBuffer decompress(ByteBuffer data, int offset, int length, OpStatsLogger decompressionStat); /** * Return the decompressed data as a byte array. @@ -77,5 +79,5 @@ public static enum Type { * @return * The decompressed data */ - byte[] decompress(byte[] data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat); + ByteBuffer decompress(ByteBuffer data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat); } diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/IdentityCompressionCodec.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/IdentityCompressionCodec.java index 4cc776284..2a2c26c2d 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/IdentityCompressionCodec.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/IdentityCompressionCodec.java @@ -17,30 +17,33 @@ */ package com.twitter.distributedlog.io; -import java.util.Arrays; - import com.google.common.base.Preconditions; - import org.apache.bookkeeper.stats.OpStatsLogger; +import java.nio.ByteBuffer; + public class IdentityCompressionCodec implements CompressionCodec { @Override - public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) { + public ByteBuffer compress(ByteBuffer data, int offset, int length, OpStatsLogger compressionStat) { Preconditions.checkNotNull(data); Preconditions.checkArgument(length >= 0); - return Arrays.copyOfRange(data, offset, offset + length); + data.limit(offset+length); + data.position(offset); + return data; } @Override - public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) { + public ByteBuffer decompress(ByteBuffer data, int offset, int length, OpStatsLogger decompressionStat) { Preconditions.checkNotNull(data); - return Arrays.copyOfRange(data, offset, offset + length); + 
data.limit(offset+length); + data.position(offset); + return data; } @Override // Decompressed size is the same as the length of the data because this is an // Identity compressor - public byte[] decompress(byte[] data, int offset, int length, + public ByteBuffer decompress(ByteBuffer data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat) { return decompress(data, offset, length, decompressionStat); } diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/LZ4CompressionCodec.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/LZ4CompressionCodec.java index 2137452a6..6a4784f62 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/LZ4CompressionCodec.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/io/LZ4CompressionCodec.java @@ -17,19 +17,18 @@ */ package com.twitter.distributedlog.io; -import java.util.concurrent.TimeUnit; - import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; - import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FastDecompressor; import net.jpountz.lz4.LZ4SafeDecompressor; - import org.apache.bookkeeper.stats.OpStatsLogger; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + /** * All functions are thread safe. */ @@ -49,22 +48,22 @@ public LZ4CompressionCodec() { } @Override - public byte[] compress(byte[] data, int offset, int length, OpStatsLogger compressionStat) { + public ByteBuffer compress(ByteBuffer data, int offset, int length, OpStatsLogger compressionStat) { Preconditions.checkNotNull(data); - Preconditions.checkArgument(offset >= 0 && offset < data.length); + Preconditions.checkArgument(offset >= 0 && offset < data.limit()); Preconditions.checkArgument(length >= 0); Preconditions.checkNotNull(compressionStat); Stopwatch watch = Stopwatch.createStarted(); - byte[] compressed = compressor.compress(data, offset, length); + byte[] compressed = compressor.compress(data.array(), offset, length); compressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS)); - return compressed; + return ByteBuffer.wrap(compressed); } @Override - public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger decompressionStat) { + public ByteBuffer decompress(ByteBuffer data, int offset, int length, OpStatsLogger decompressionStat) { Preconditions.checkNotNull(data); - Preconditions.checkArgument(offset >= 0 && offset < data.length); + Preconditions.checkArgument(offset >= 0 && offset < data.limit()); Preconditions.checkArgument(length >= 0); Preconditions.checkNotNull(decompressionStat); @@ -73,9 +72,9 @@ public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger deco int outLength = length * 3; while (true) { try { - byte[] decompressed = safeDecompressor.decompress(data, offset, length, outLength); + byte[] decompressed = safeDecompressor.decompress(data.array(), offset, length, outLength); decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS)); - return decompressed; + return ByteBuffer.wrap(decompressed); } catch (LZ4Exception e) { outLength *= 2; } @@ -84,17 +83,17 @@ public byte[] decompress(byte[] data, int offset, int length, OpStatsLogger deco @Override // length parameter is ignored here because of the way the fastDecompressor works. 
- public byte[] decompress(byte[] data, int offset, int length, int decompressedSize, + public ByteBuffer decompress(ByteBuffer data, int offset, int length, int decompressedSize, OpStatsLogger decompressionStat) { Preconditions.checkNotNull(data); - Preconditions.checkArgument(offset >= 0 && offset < data.length); + Preconditions.checkArgument(offset >= 0 && offset < data.limit()); Preconditions.checkArgument(length >= 0); Preconditions.checkArgument(decompressedSize >= 0); Preconditions.checkNotNull(decompressionStat); Stopwatch watch = Stopwatch.createStarted(); - byte[] decompressed = fastDecompressor.decompress(data, offset, decompressedSize); + byte[] decompressed = fastDecompressor.decompress(data.array(), offset, decompressedSize); decompressionStat.registerSuccessfulEvent(watch.elapsed(TimeUnit.MICROSECONDS)); - return decompressed; + return ByteBuffer.wrap(decompressed); } } diff --git a/pom.xml b/pom.xml index 57f6d1f0c..6f3e07f91 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ UTF-8 UTF-8 3.5.1-alpha - 4.3.5-TWTTR-OSS + 4.3.4-TWTTR-SNAPSHOT 6.34.0 4.6.0 3.17.0
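For reference, the intended call pattern for the ByteBuffer-based codec API introduced above is sketched below. This is an illustrative example, not part of the patch: the class and method names are taken from the diff, while the stats-logger plumbing (NullStatsLogger.INSTANCE) is assumed for brevity. Note that the codec implementations read input via ByteBuffer.array(), so callers are expected to pass heap buffers whose backing array starts at offset 0.

import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.distributedlog.io.CompressionUtils;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;

import java.nio.ByteBuffer;

import static com.google.common.base.Charsets.UTF_8;

public class ByteBufferCodecExample {

    public static void main(String[] args) {
        byte[] payload = "hello, distributedlog".getBytes(UTF_8);
        OpStatsLogger stats = NullStatsLogger.INSTANCE.getOpStatsLogger("codec");

        CompressionCodec codec =
                CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4);

        // compress() now takes and returns a ByteBuffer; the returned buffer wraps
        // the compressed bytes, so limit() is the compressed length.
        ByteBuffer compressed =
                codec.compress(ByteBuffer.wrap(payload), 0, payload.length, stats);

        // The decompress() overload that takes the original (decompressed) size lets
        // the fast decompressor allocate its output array exactly.
        ByteBuffer decompressed =
                codec.decompress(compressed, 0, compressed.limit(), payload.length, stats);

        System.out.println(new String(decompressed.array(), UTF_8));
    }
}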