diff --git a/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java index e99b93db3..d8fd38ec9 100644 --- a/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/BaseSerializingTranscoder.java @@ -22,11 +22,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; import net.spy.memcached.CachedData; -import net.spy.memcached.compat.CloseUtil; import net.spy.memcached.compat.SpyObject; /** @@ -34,17 +31,9 @@ * compressed data. */ public abstract class BaseSerializingTranscoder extends SpyObject { - - /** - * Default compression threshold value. - */ - public static final int DEFAULT_COMPRESSION_THRESHOLD = 16384; - private static final String DEFAULT_CHARSET = "UTF-8"; - - protected int compressionThreshold = DEFAULT_COMPRESSION_THRESHOLD; protected String charset = DEFAULT_CHARSET; - + private final Compressor compressor; private final int maxSize; /** @@ -52,6 +41,16 @@ public abstract class BaseSerializingTranscoder extends SpyObject { */ public BaseSerializingTranscoder(int max) { super(); + this.compressor = new GzipCompressor(); + maxSize = max; + } + + /** + * Initialize a serializing transcoder with the given maximum data size and compressor. + */ + public BaseSerializingTranscoder(int max, Compressor compressor) { + super(); + this.compressor = compressor; maxSize = max; } @@ -67,7 +66,10 @@ public boolean asyncDecode(CachedData d) { * @param to the number of bytes */ public void setCompressionThreshold(int to) { - compressionThreshold = to; + if (compressor == null) { + return; + } + compressor.setCompressionThreshold(to); } /** @@ -83,6 +85,35 @@ public void setCharset(String to) { charset = to; } + protected CachedData doCompress(byte[] before, int flags, Class type) { + if (compressor == null) { + return new CachedData(flags, before, getMaxSize()); + } + if (before.length > compressor.getCompressionThreshold()) { + byte[] compressed = compressor.compress(before); + if (compressed.length < before.length) { + getLogger().debug("Compressed %s from %d to %d", type.getName(), + before.length, compressed.length); + before = compressed; + flags |= SerializingTranscoder.COMPRESSED; + } else if (compressed.length > before.length) { + getLogger().info("Compression increased the size of %s from %d to %d", + type.getName(), before.length, compressed.length); + } else { + getLogger().info("Compression makes same length of %s : %d", + type.getName(), before.length); + } + } + return new CachedData(flags, before, getMaxSize()); + } + + protected byte[] doDecompress(CachedData cachedData) { + if (compressor == null) { + return cachedData.getData(); + } + return compressor.decompress(cachedData.getData()); + } + /** * Get the bytes representing the given serialized object. */ @@ -127,56 +158,6 @@ protected Object deserialize(byte[] in) { return rv; } - /** - * Compress the given array of bytes. - */ - protected byte[] compress(byte[] in) { - if (in == null) { - throw new NullPointerException("Can't compress null"); - } - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - GZIPOutputStream gz = null; - try { - gz = new GZIPOutputStream(bos); - gz.write(in); - } catch (IOException e) { - throw new RuntimeException("IO exception compressing data", e); - } finally { - CloseUtil.close(gz); - CloseUtil.close(bos); - } - byte[] rv = bos.toByteArray(); - getLogger().debug("Compressed %d bytes to %d", in.length, rv.length); - return rv; - } - - /** - * Decompress the given array of bytes. - * - * @return null if the bytes cannot be decompressed - */ - protected byte[] decompress(byte[] in) { - ByteArrayOutputStream bos = null; - if (in != null) { - ByteArrayInputStream bis = new ByteArrayInputStream(in); - bos = new ByteArrayOutputStream(); - GZIPInputStream gis; - try { - gis = new GZIPInputStream(bis); - - byte[] buf = new byte[8192]; - int r = -1; - while ((r = gis.read(buf)) > 0) { - bos.write(buf, 0, r); - } - } catch (IOException e) { - getLogger().warn("Failed to decompress data", e); - bos = null; - } - } - return bos == null ? null : bos.toByteArray(); - } - /** * Decode the string with the current character set. */ diff --git a/src/main/java/net/spy/memcached/transcoders/Compressor.java b/src/main/java/net/spy/memcached/transcoders/Compressor.java new file mode 100644 index 000000000..ec9b5dd0e --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/Compressor.java @@ -0,0 +1,24 @@ +package net.spy.memcached.transcoders; + +public interface Compressor { + /** + * Default compression threshold value. + */ + int DEFAULT_COMPRESSION_THRESHOLD = 16384; + + /** + * Compress the given array of bytes. + */ + public byte[] compress(byte[] in); + + /** + * Decompress the given array of bytes. + * + * @return null if the bytes cannot be decompressed + */ + public byte[] decompress(byte[] in); + + public int getCompressionThreshold(); + + public void setCompressionThreshold(int to); +} diff --git a/src/main/java/net/spy/memcached/transcoders/GzipCompressor.java b/src/main/java/net/spy/memcached/transcoders/GzipCompressor.java new file mode 100644 index 000000000..8f5d013ae --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/GzipCompressor.java @@ -0,0 +1,66 @@ +package net.spy.memcached.transcoders; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import net.spy.memcached.compat.CloseUtil; +import net.spy.memcached.compat.SpyObject; + +public class GzipCompressor extends SpyObject implements Compressor { + private int compressionThreshold = DEFAULT_COMPRESSION_THRESHOLD; + + @Override + public byte[] compress(byte[] in) { + if (in == null) { + throw new NullPointerException("Can't compress null"); + } + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + GZIPOutputStream gz = null; + try { + gz = new GZIPOutputStream(bos); + gz.write(in); + } catch (IOException e) { + throw new RuntimeException("IO exception compressing data", e); + } finally { + CloseUtil.close(gz); + CloseUtil.close(bos); + } + return bos.toByteArray(); + } + + @Override + public byte[] decompress(byte[] in) { + ByteArrayOutputStream bos = null; + if (in != null) { + ByteArrayInputStream bis = new ByteArrayInputStream(in); + bos = new ByteArrayOutputStream(); + GZIPInputStream gis; + try { + gis = new GZIPInputStream(bis); + byte[] buf = new byte[8192]; + int r = -1; + while ((r = gis.read(buf)) > 0) { + bos.write(buf, 0, r); + } + } catch (IOException e) { + getLogger().warn("Failed to decompress data", e); + bos = null; + } + } + return bos == null ? null : bos.toByteArray(); + } + + @Override + public int getCompressionThreshold() { + return compressionThreshold; + } + + @Override + public void setCompressionThreshold(int compressionThreshold) { + this.compressionThreshold = compressionThreshold; + } + +} diff --git a/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java b/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java index 8cd58bd75..a2918e5b9 100644 --- a/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/SerializingTranscoder.java @@ -59,6 +59,20 @@ public SerializingTranscoder(int max) { super(max); } + /** + * Get a serializing transcoder customized compressor. + */ + public SerializingTranscoder(Compressor compressor) { + super(CachedData.MAX_SIZE, compressor); + } + + /** + * Get a serializing transcoder that specifies the max data size and customized compressor. + */ + public SerializingTranscoder(int max, Compressor compressor) { + super(max, compressor); + } + @Override public boolean asyncDecode(CachedData d) { if ((d.getFlags() & COMPRESSED) != 0 @@ -72,7 +86,7 @@ public Object decode(CachedData d) { byte[] data = d.getData(); Object rv = null; if ((d.getFlags() & COMPRESSED) != 0) { - data = decompress(d.getData()); + data = doDecompress(d); } int flags = d.getFlags() & SPECIAL_MASK; if ((d.getFlags() & SERIALIZED) != 0 && data != null) { @@ -146,20 +160,6 @@ public CachedData encode(Object o) { flags |= SERIALIZED; } assert b != null; - if (b.length > compressionThreshold) { - byte[] compressed = compress(b); - if (compressed.length < b.length) { - getLogger().debug("Compressed %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - b = compressed; - flags |= COMPRESSED; - } else { - getLogger().info( - "Compression increased the size of %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - } - } - return new CachedData(flags, b, getMaxSize()); + return doCompress(b, flags, o.getClass()); } - } diff --git a/src/main/java/net/spy/memcached/transcoders/WhalinTranscoder.java b/src/main/java/net/spy/memcached/transcoders/WhalinTranscoder.java index b407f8235..471baa130 100644 --- a/src/main/java/net/spy/memcached/transcoders/WhalinTranscoder.java +++ b/src/main/java/net/spy/memcached/transcoders/WhalinTranscoder.java @@ -48,11 +48,15 @@ public WhalinTranscoder() { super(CachedData.MAX_SIZE); } + public WhalinTranscoder(Compressor compressor) { + super(CachedData.MAX_SIZE, compressor); + } + public Object decode(CachedData d) { byte[] data = d.getData(); Object rv = null; if ((d.getFlags() & COMPRESSED) != 0) { - data = decompress(d.getData()); + data = doDecompress(d); } if ((d.getFlags() & SERIALIZED) != 0) { rv = deserialize(data); @@ -152,20 +156,7 @@ public CachedData encode(Object o) { flags |= SERIALIZED; } assert b != null; - if (b.length > compressionThreshold) { - byte[] compressed = compress(b); - if (compressed.length < b.length) { - getLogger().debug("Compressed %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - b = compressed; - flags |= COMPRESSED; - } else { - getLogger().info( - "Compression increased the size of %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - } - } - return new CachedData(flags, b, getMaxSize()); + return doCompress(b, flags, o.getClass()); } protected Character decodeCharacter(byte[] b) { diff --git a/src/main/java/net/spy/memcached/transcoders/WhalinV1Transcoder.java b/src/main/java/net/spy/memcached/transcoders/WhalinV1Transcoder.java index 093a9d43b..70caaf1e2 100644 --- a/src/main/java/net/spy/memcached/transcoders/WhalinV1Transcoder.java +++ b/src/main/java/net/spy/memcached/transcoders/WhalinV1Transcoder.java @@ -34,6 +34,10 @@ public WhalinV1Transcoder() { super(CachedData.MAX_SIZE); } + public WhalinV1Transcoder(Compressor compressor) { + super(CachedData.MAX_SIZE, compressor); + } + public CachedData encode(Object o) { byte[] b = null; int flags = 0; @@ -66,27 +70,14 @@ public CachedData encode(Object o) { flags |= SERIALIZED; } assert b != null; - if (b.length > compressionThreshold) { - byte[] compressed = compress(b); - if (compressed.length < b.length) { - getLogger().info("Compressed %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - b = compressed; - flags |= COMPRESSED; - } else { - getLogger().info( - "Compression increased the size of %s from %d to %d", - o.getClass().getName(), b.length, compressed.length); - } - } - return new CachedData(flags, b, getMaxSize()); + return doCompress(b, flags, o.getClass()); } public Object decode(CachedData d) { byte[] data = d.getData(); Object rv = null; if ((d.getFlags() & COMPRESSED) != 0) { - data = decompress(d.getData()); + data = doDecompress(d); } if ((d.getFlags() & SERIALIZED) != 0) { rv = deserialize(data); diff --git a/src/test/java/net/spy/memcached/transcoders/BaseSerializingTranscoderTest.java b/src/test/java/net/spy/memcached/transcoders/BaseSerializingTranscoderTest.java index 99189a803..6371e0f2b 100644 --- a/src/test/java/net/spy/memcached/transcoders/BaseSerializingTranscoderTest.java +++ b/src/test/java/net/spy/memcached/transcoders/BaseSerializingTranscoderTest.java @@ -12,6 +12,7 @@ public class BaseSerializingTranscoderTest extends TestCase { private Exposer ex; + private Compressor compressor = new GzipCompressor(); @Override protected void setUp() throws Exception { @@ -33,7 +34,7 @@ public void testInvalidCharacterSet() { public void testCompressNull() { try { - ex.compress(null); + compressor.compress(null); fail("Expected an assertion error"); } catch (NullPointerException e) { // pass @@ -67,7 +68,7 @@ public void testSerializeNull() { } public void testDecompressNull() { - assertNull(ex.decompress(null)); + assertNull(compressor.decompress(null)); } public void testUndeserializable() throws Exception { @@ -117,21 +118,11 @@ public void overrideCharsetSet(String to) { charset = to; } - @Override - public byte[] compress(byte[] in) { - return super.compress(in); - } - @Override public String decodeString(byte[] data) { return super.decodeString(data); } - @Override - public byte[] decompress(byte[] in) { - return super.decompress(in); - } - @Override public Object deserialize(byte[] in) { return super.deserialize(in);