Refactoring out serialization-related duties and capturing the essence of a digest as DigestModel #99

Open · wants to merge 3 commits into base: main
109 changes: 35 additions & 74 deletions core/src/main/java/com/tdunning/math/stats/AVLTreeDigest.java
@@ -17,6 +17,9 @@

package com.tdunning.math.stats;

import com.tdunning.math.stats.serde.AVLTreeDigestCompactSerde;
import com.tdunning.math.stats.serde.DigestModelDefaultSerde;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
@@ -315,7 +318,7 @@ public double compression() {
*/
@Override
public int byteSize() {
return 32 + summary.size() * 12;
return DigestModelDefaultSerde.byteSize(summary.size());
}

/**
@@ -324,53 +327,20 @@ public int byteSize() {
*/
@Override
public int smallByteSize() {
int bound = byteSize();
ByteBuffer buf = ByteBuffer.allocate(bound);
asSmallBytes(buf);
return buf.position();
return AVLTreeDigestCompactSerde.byteSize(this);
}

private final static int VERBOSE_ENCODING = 1;
private final static int SMALL_ENCODING = 2;

/**
* Outputs a histogram as bytes using a particularly cheesy encoding.
*/
@Override
public void asBytes(ByteBuffer buf) {
buf.putInt(VERBOSE_ENCODING);
buf.putDouble(min);
buf.putDouble(max);
buf.putDouble((float) compression());
buf.putInt(summary.size());
for (Centroid centroid : summary) {
buf.putDouble(centroid.mean());
}

for (Centroid centroid : summary) {
buf.putInt(centroid.count());
}
DigestModelDefaultSerde.serialize(toModel(), buf);
}

@Override
public void asSmallBytes(ByteBuffer buf) {
buf.putInt(SMALL_ENCODING);
buf.putDouble(min);
buf.putDouble(max);
buf.putDouble(compression());
buf.putInt(summary.size());

double x = 0;
for (Centroid centroid : summary) {
double delta = centroid.mean() - x;
x = centroid.mean();
buf.putFloat((float) delta);
}

for (Centroid centroid : summary) {
int n = centroid.count();
encode(buf, n);
}
AVLTreeDigestCompactSerde.serialize(this, buf);
}

/**
Expand All @@ -381,45 +351,36 @@ public void asSmallBytes(ByteBuffer buf) {
*/
@SuppressWarnings("WeakerAccess")
public static AVLTreeDigest fromBytes(ByteBuffer buf) {
int encoding = buf.getInt();
if (encoding == VERBOSE_ENCODING) {
double min = buf.getDouble();
double max = buf.getDouble();
double compression = buf.getDouble();
AVLTreeDigest r = new AVLTreeDigest(compression);
r.setMinMax(min, max);
int n = buf.getInt();
double[] means = new double[n];
for (int i = 0; i < n; i++) {
means[i] = buf.getDouble();
}
for (int i = 0; i < n; i++) {
r.add(means[i], buf.getInt());
}
return r;
} else if (encoding == SMALL_ENCODING) {
double min = buf.getDouble();
double max = buf.getDouble();
double compression = buf.getDouble();
AVLTreeDigest r = new AVLTreeDigest(compression);
r.setMinMax(min, max);
int n = buf.getInt();
double[] means = new double[n];
double x = 0;
for (int i = 0; i < n; i++) {
double delta = buf.getFloat();
x += delta;
means[i] = x;
}
try {
DigestModel digestModel = DigestModelDefaultSerde.deserialize(buf);
return fromModel(digestModel);
} catch (IllegalArgumentException ex) {
buf.rewind(); // reset the buffer position so the fallback can read from the start
return AVLTreeDigestCompactSerde.deserialize(buf);
}
}

for (int i = 0; i < n; i++) {
int z = decode(buf);
r.add(means[i], z);
}
return r;
} else {
throw new IllegalStateException("Invalid format for serialized histogram");
public DigestModel toModel() {
double[] positions = new double[summary.size()];
double[] weights = new double[summary.size()];
int i = 0;
for (Centroid centroid : summary) {
positions[i] = centroid.mean();
weights[i] = centroid.count();
i++;
}
return new DigestModel(compression, min, max, i, positions, weights);
}

public static AVLTreeDigest fromModel(DigestModel model) {
AVLTreeDigest r = new AVLTreeDigest(model.compression());
r.setMinMax(model.min(), model.max());
double[] mean = model.centroidPositions();
double[] weight = model.centroidWeights();
for (int i = 0; i < model.centroidCount(); i++) {
r.add(mean[i], (int) weight[i]);
}

return r;
}
}
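
A minimal round-trip sketch of the new model-centric API. Only toModel/fromModel as introduced in this diff and the existing TDigest add/size methods are assumed:

AVLTreeDigest digest = new AVLTreeDigest(100);
digest.add(1.0, 5);
digest.add(2.0, 3);

// Detach the digest's state into the serialization-neutral model...
DigestModel model = digest.toModel();

// ...and rebuild an equivalent digest from it.
AVLTreeDigest copy = AVLTreeDigest.fromModel(model);
assert copy.size() == digest.size();

Note that fromModel narrows each centroid weight with (int) weight[i], mirroring the existing add(double, int) signature.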
29 changes: 0 additions & 29 deletions core/src/main/java/com/tdunning/math/stats/AbstractTDigest.java
@@ -57,35 +57,6 @@ static double interpolate(double x, double x0, double x1) {
return (x - x0) / (x1 - x0);
}

static void encode(ByteBuffer buf, int n) {
int k = 0;
while (n < 0 || n > 0x7f) {
byte b = (byte) (0x80 | (0x7f & n));
buf.put(b);
n = n >>> 7;
k++;
if (k >= 6) {
throw new IllegalStateException("Size is implausibly large");
}
}
buf.put((byte) n);
}

static int decode(ByteBuffer buf) {
int v = buf.get();
int z = 0x7f & v;
int shift = 7;
while ((v & 0x80) != 0) {
if (shift > 28) {
throw new IllegalStateException("Shift too large in decode");
}
v = buf.get();
z += (v & 0x7f) << shift;
shift += 7;
}
return z;
}

abstract void add(double x, int w, Centroid base);

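For reference, the deleted encode/decode pair is a little-endian base-128 varint codec (7 payload bits per byte, continuation flag in 0x80) that the compact encoding uses for centroid counts; this PR presumably relocates it into the serde package. A self-contained sketch of the same round-trip — the class name here is illustrative only:

import java.nio.ByteBuffer;

// Illustrative standalone copy of the varint codec removed above.
public class VarintSketch {
    static void encode(ByteBuffer buf, int n) {
        int k = 0;
        while (n < 0 || n > 0x7f) {
            buf.put((byte) (0x80 | (0x7f & n))); // low 7 bits, continuation set
            n = n >>> 7;
            if (++k >= 6) {
                throw new IllegalStateException("Size is implausibly large");
            }
        }
        buf.put((byte) n); // final byte, continuation clear
    }

    static int decode(ByteBuffer buf) {
        int v = buf.get();
        int z = 0x7f & v;
        int shift = 7;
        while ((v & 0x80) != 0) { // keep reading while the flag bit is set
            if (shift > 28) {
                throw new IllegalStateException("Shift too large in decode");
            }
            v = buf.get();
            z += (v & 0x7f) << shift;
            shift += 7;
        }
        return z;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(5);
        encode(buf, 1_000_000); // fits in three bytes
        buf.flip();
        System.out.println(decode(buf)); // prints 1000000
    }
}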
71 changes: 71 additions & 0 deletions core/src/main/java/com/tdunning/math/stats/DigestModel.java
@@ -0,0 +1,71 @@
package com.tdunning.math.stats;

public class DigestModel {
private final double compression;
private final double min;
private final double max;
private final int centroidCount;
private final double[] centroidPositions;
private final double[] centroidWeights;

// For compact encoding of MergingDigest
private Integer mainBufferSize = null;
private Integer tempBufferSize = null;

private boolean compactEncoding = false;

public DigestModel(double compression, double min, double max, int centroidCount, double[] centroidPositions, double[] centroidWeights) {
this.compression = compression;
this.min = min;
this.max = max;
this.centroidCount = centroidCount;
this.centroidPositions = centroidPositions;
this.centroidWeights = centroidWeights;
}

public DigestModel(double compression, double min, double max, int centroidCount, double[] centroidPositions, double[] centroidWeights, int mainBufferSize, int tempBufferSize) {
this(compression, min, max, centroidCount, centroidPositions, centroidWeights);
this.mainBufferSize = mainBufferSize;
this.tempBufferSize = tempBufferSize;
}

public void setCompactEncoding(boolean compactEncoding) {
this.compactEncoding = compactEncoding;
}

public double compression() {
return compression;
}

public double min() {
return min;
}

public double max() {
return max;
}

public int centroidCount() {
return centroidCount;
}

public double[] centroidPositions() {
return centroidPositions;
}

public double[] centroidWeights() {
return centroidWeights;
}

public boolean compactEncoding() {
return compactEncoding;
}

public Integer mainBufferSize() {
return mainBufferSize;
}

public Integer tempBufferSize() {
return tempBufferSize;
}
}
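
A short usage sketch — the numbers are made-up sample values, and the two-constructor split mirrors what the digests need: the extra buffer sizes only matter for MergingDigest's compact path:

// Plain model, as produced by AVLTreeDigest.toModel(); sample values only.
double[] positions = {1.0, 2.5, 9.0};
double[] weights = {3.0, 5.0, 2.0};
DigestModel model = new DigestModel(100.0, 1.0, 9.0, 3, positions, weights);

// MergingDigest.toModel() additionally records its buffer sizes so that
// fromModel can reconstruct a compact-encoded digest with the same geometry.
DigestModel mergingModel =
        new DigestModel(100.0, 1.0, 9.0, 3, positions, weights, 190, 380);
mergingModel.setCompactEncoding(true);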
105 changes: 36 additions & 69 deletions core/src/main/java/com/tdunning/math/stats/MergingDigest.java
@@ -17,6 +17,9 @@

package com.tdunning.math.stats;

import com.tdunning.math.stats.serde.DigestModelDefaultSerde;
import com.tdunning.math.stats.serde.MergingDigestCompactSerde;

import java.nio.ByteBuffer;
import java.util.AbstractCollection;
import java.util.ArrayList;
@@ -752,96 +755,60 @@ public double compression() {
@Override
public int byteSize() {
compress();
// format code, compression(float), buffer-size(int), temp-size(int), #centroids-1(int),
// then two doubles per centroid
return lastUsedCell * 16 + 32;
return DigestModelDefaultSerde.byteSize(lastUsedCell);
}

@Override
public int smallByteSize() {
compress();
// format code(int), compression(float), buffer-size(short), temp-size(short), #centroids-1(short),
// then two floats per centroid
return lastUsedCell * 8 + 30;
}

public enum Encoding {
VERBOSE_ENCODING(1), SMALL_ENCODING(2);

private final int code;

Encoding(int code) {
this.code = code;
}
return MergingDigestCompactSerde.byteSize(lastUsedCell);
}

@Override
public void asBytes(ByteBuffer buf) {
compress();
buf.putInt(Encoding.VERBOSE_ENCODING.code);
buf.putDouble(min);
buf.putDouble(max);
buf.putDouble(compression);
buf.putInt(lastUsedCell);
for (int i = 0; i < lastUsedCell; i++) {
buf.putDouble(weight[i]);
buf.putDouble(mean[i]);
}
DigestModelDefaultSerde.serialize(toModel(), buf);
}

@Override
public void asSmallBytes(ByteBuffer buf) {
compress();
buf.putInt(Encoding.SMALL_ENCODING.code); // 4
buf.putDouble(min); // + 8
buf.putDouble(max); // + 8
buf.putFloat((float) compression); // + 4
buf.putShort((short) mean.length); // + 2
buf.putShort((short) tempMean.length); // + 2
buf.putShort((short) lastUsedCell); // + 2 = 30
for (int i = 0; i < lastUsedCell; i++) {
buf.putFloat((float) weight[i]);
buf.putFloat((float) mean[i]);
}
MergingDigestCompactSerde.serialize(this, buf);
}

@SuppressWarnings("WeakerAccess")
public static MergingDigest fromBytes(ByteBuffer buf) {
int encoding = buf.getInt();
if (encoding == Encoding.VERBOSE_ENCODING.code) {
double min = buf.getDouble();
double max = buf.getDouble();
double compression = buf.getDouble();
int n = buf.getInt();
MergingDigest r = new MergingDigest(compression);
r.setMinMax(min, max);
r.lastUsedCell = n;
for (int i = 0; i < n; i++) {
r.weight[i] = buf.getDouble();
r.mean[i] = buf.getDouble();

r.totalWeight += r.weight[i];
}
return r;
} else if (encoding == Encoding.SMALL_ENCODING.code) {
double min = buf.getDouble();
double max = buf.getDouble();
double compression = buf.getFloat();
int n = buf.getShort();
int bufferSize = buf.getShort();
MergingDigest r = new MergingDigest(compression, bufferSize, n);
r.setMinMax(min, max);
r.lastUsedCell = buf.getShort();
for (int i = 0; i < r.lastUsedCell; i++) {
r.weight[i] = buf.getFloat();
r.mean[i] = buf.getFloat();

r.totalWeight += r.weight[i];
}
return r;
try {
DigestModel digestModel = DigestModelDefaultSerde.deserialize(buf);
return fromModel(digestModel);
} catch (IllegalArgumentException ex) {
buf.rewind(); // reset the buffer position so the fallback can read from the start
return MergingDigestCompactSerde.deserialize(buf);
}
}

public DigestModel toModel() {
compress();
return new DigestModel(compression, min, max, lastUsedCell, mean, weight, mean.length, tempMean.length);
}

public static MergingDigest fromModel(DigestModel model) {
MergingDigest r;
if (model.compactEncoding()) {
r = new MergingDigest(model.compression(), model.tempBufferSize(), model.mainBufferSize());
} else {
throw new IllegalStateException("Invalid format for serialized histogram");
r = new MergingDigest(model.compression());
}

r.setMinMax(model.min(), model.max());
r.lastUsedCell = model.centroidCount();
double[] mean = model.centroidPositions();
double[] weight = model.centroidWeights();
for (int i = 0; i < model.centroidCount(); i++) {
r.mean[i] = mean[i];
r.weight[i] = weight[i];
r.totalWeight += weight[i];
}
return r;
}
}
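
Putting the MergingDigest changes together, a minimal serialization round-trip under the new scheme — nothing beyond the methods in this diff and standard ByteBuffer usage is assumed:

MergingDigest digest = new MergingDigest(100);
for (int i = 0; i < 10_000; i++) {
    digest.add(Math.random());
}

// Default path: state goes through DigestModel and DigestModelDefaultSerde.
ByteBuffer buf = ByteBuffer.allocate(digest.byteSize());
digest.asBytes(buf);
buf.flip();
MergingDigest restored = MergingDigest.fromBytes(buf);

// Legacy payloads still deserialize: fromBytes first tries the model serde,
// then rewinds and falls back to MergingDigestCompactSerde.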