Skip to content

Commit

Permalink
Modify according to review
Browse files Browse the repository at this point in the history
  • Loading branch information
Wei-hao-Li committed Jan 9, 2025
1 parent 9ec6ce1 commit 677697a
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public enum ColumnEncoding {
/** TEXT. */
BINARY_ARRAY((byte) 3),
/** All data types. */
RLE((byte) 4);
RLE((byte) 4),
/** All data types. */
DICTIONARY((byte) 5);

private final byte value;

Expand Down Expand Up @@ -61,6 +63,8 @@ private static ColumnEncoding getColumnEncoding(byte value) {
return BINARY_ARRAY;
case 4:
return RLE;
case 5:
return DICTIONARY;
default:
throw new IllegalArgumentException("Invalid value: " + value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private ColumnEncoderFactory() {
encodingToEncoder.put(ColumnEncoding.BYTE_ARRAY, new ByteArrayColumnEncoder());
encodingToEncoder.put(ColumnEncoding.BINARY_ARRAY, new BinaryArrayColumnEncoder());
encodingToEncoder.put(ColumnEncoding.RLE, new RunLengthColumnEncoder());
encodingToEncoder.put(ColumnEncoding.DICTIONARY, new DictionaryColumnEncoder());
}

public static ColumnEncoder get(ColumnEncoding columnEncoding) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static Column createInternal(
idsOffset, positionCount, dictionary, ids, false, false, dictionarySourceId);
}

private DictionaryColumn(
DictionaryColumn(
int idsOffset,
int positionCount,
Column dictionary,
Expand Down Expand Up @@ -243,7 +243,7 @@ public TSDataType getDataType() {

@Override
public ColumnEncoding getEncoding() {
return dictionary.getEncoding();
return ColumnEncoding.DICTIONARY;
}

@Override
Expand Down Expand Up @@ -279,8 +279,7 @@ public boolean[] isNull() {

@Override
public void setNull(int start, int end) {
throw new UnsupportedOperationException(
String.format("set null of %s is not supported !", DictionaryColumn.class.getSimpleName()));
throw new UnsupportedOperationException(getClass().getSimpleName());
}

@Override
Expand Down Expand Up @@ -420,49 +419,14 @@ public Object getObject(int position) {
return dictionary.getObject(getId(position));
}

@Override
public boolean[] getBooleans() {
return dictionary.getBooleans();
}

@Override
public int[] getInts() {
return dictionary.getInts();
}

@Override
public long[] getLongs() {
return dictionary.getLongs();
}

@Override
public float[] getFloats() {
return dictionary.getFloats();
}

@Override
public double[] getDoubles() {
return dictionary.getDoubles();
}

@Override
public Binary[] getBinaries() {
return dictionary.getBinaries();
}

@Override
public Object[] getObjects() {
return dictionary.getObjects();
}

/**
 * Returns the value at the given logical position wrapped as a {@link TsPrimitiveType}.
 *
 * <p>The position is first translated into a dictionary id through {@code getId(position)},
 * exactly as {@code getObject(int)} does, and the dictionary is then queried with that id.
 * Passing the raw position to the dictionary would read an unrelated entry whenever the ids
 * are not the identity mapping.
 *
 * @param position logical position within this column
 * @return the dictionary entry referenced by {@code position}
 */
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
  return dictionary.getTsPrimitiveType(getId(position));
}

@Override
public void reset() {
dictionary.reset();
throw new UnsupportedOperationException(getClass().getSimpleName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.read.common.block.column;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnEncoding;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * {@link ColumnEncoder} that serializes and deserializes {@link DictionaryColumn}s.
 *
 * <p>Serialized layout, in order: the encoding marker of the dictionary, the dictionary's
 * position count, the dictionary payload as written by the encoder registered for that
 * encoding, and finally the ids as written by {@code ReadWriteIOUtils.writeInts}.
 */
public class DictionaryColumnEncoder implements ColumnEncoder {

  /**
   * Reads a dictionary-encoded column and returns it in flattened form.
   *
   * @param input buffer positioned at the start of the serialized column
   * @param dataType data type handed to the dictionary's own encoder
   * @param positionCount number of ids to materialize from the dictionary
   * @return the result of {@code copyPositions} on the decoded dictionary, i.e. not a
   *     {@link DictionaryColumn} wrapper
   */
  @Override
  public Column readColumn(ByteBuffer input, TSDataType dataType, int positionCount) {
    // dictionary section: encoding marker, entry count, payload (read strictly in that order)
    ColumnEncoding dictionaryEncoding = ColumnEncoding.deserializeFrom(input);
    ColumnEncoder dictionaryDecoder = ColumnEncoderFactory.get(dictionaryEncoding);
    int dictionarySize = ReadWriteIOUtils.readInt(input);
    Column dictionary = dictionaryDecoder.readColumn(input, dataType, dictionarySize);

    // ids section
    int[] ids = ReadWriteIOUtils.readInts(input);

    // flatten: resolve every id against the dictionary
    return dictionary.copyPositions(ids, 0, positionCount);
  }

  /**
   * Writes a {@link DictionaryColumn} to {@code output}.
   *
   * @param output destination stream
   * @param column column to serialize; it is cast to {@link DictionaryColumn}
   * @throws IOException if writing to {@code output} fails
   */
  @Override
  public void writeColumn(DataOutputStream output, Column column) throws IOException {
    // compact before serialize
    DictionaryColumn compacted = ((DictionaryColumn) column).compact();
    Column dictionary = compacted.getDictionary();
    ColumnEncoding dictionaryEncoding = dictionary.getEncoding();

    // dictionary section
    dictionaryEncoding.serializeTo(output);
    int dictionarySize = dictionary.getPositionCount();
    ReadWriteIOUtils.write(dictionarySize, output);
    ColumnEncoderFactory.get(dictionaryEncoding).writeColumn(output, dictionary);

    // ids section: only the window [rawIdsOffset, rawIdsOffset + positionCount) is written
    int[] rawIds = compacted.getRawIds();
    int rawIdsOffset = compacted.getRawIdsOffset();
    int idCount = compacted.getPositionCount();
    ReadWriteIOUtils.writeInts(rawIds, rawIdsOffset, idCount, output);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import static java.util.Objects.requireNonNull;
import static org.apache.tsfile.read.common.block.column.ColumnUtil.checkArrayRange;
import static org.apache.tsfile.read.common.block.column.ColumnUtil.checkReadablePosition;
import static org.apache.tsfile.read.common.block.column.ColumnUtil.checkValidPosition;
import static org.apache.tsfile.read.common.block.column.ColumnUtil.checkValidRegion;

public class RunLengthEncodedColumn implements Column {
Expand Down Expand Up @@ -119,48 +119,6 @@ public boolean[] getBooleans() {
return res;
}

@Override
public int[] getInts() {
int[] res = new int[positionCount];
Arrays.fill(res, value.getInt(0));
return res;
}

@Override
public long[] getLongs() {
long[] res = new long[positionCount];
Arrays.fill(res, value.getLong(0));
return res;
}

@Override
public float[] getFloats() {
float[] res = new float[positionCount];
Arrays.fill(res, value.getFloat(0));
return res;
}

@Override
public double[] getDoubles() {
double[] res = new double[positionCount];
Arrays.fill(res, value.getDouble(0));
return res;
}

@Override
public Binary[] getBinaries() {
Binary[] res = new Binary[positionCount];
Arrays.fill(res, value.getBinary(0));
return res;
}

@Override
public Object[] getObjects() {
Object[] res = new Object[positionCount];
Arrays.fill(res, value.getObject(0));
return res;
}

@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
return value.getTsPrimitiveType(0);
Expand Down Expand Up @@ -227,19 +185,20 @@ public Column subColumnCopy(int fromIndex) {
public Column getPositions(int[] positions, int offset, int length) {
checkArrayRange(positions, offset, length);

return DictionaryColumn.createInternal(
offset, length, this, positions, DictionaryId.randomDictionaryId());
for (int i = offset; i < offset + length; i++) {
checkValidPosition(positions[i], positionCount);
}
return new RunLengthEncodedColumn(value, length);
}

@Override
public Column copyPositions(int[] positions, int offset, int length) {
checkArrayRange(positions, offset, length);

for (int position : positions) {
checkReadablePosition(this, position);
for (int i = offset; i < offset + length; i++) {
checkValidPosition(positions[i], positionCount);
}

return new RunLengthEncodedColumn(value, length);
return new RunLengthEncodedColumn(value.subColumnCopy(0), length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,23 @@ public static Object readObject(ByteBuffer buffer) {
}
}

/**
 * Writes {@code length} followed by the {@code length} elements of {@code ints} that start at
 * index {@code offset}, each through {@code write(int, OutputStream)}.
 *
 * @param ints source array
 * @param offset index of the first element to write
 * @param length number of elements to write; also emitted as the leading count
 * @param outputStream destination stream
 * @throws IOException if writing to {@code outputStream} fails
 */
public static void writeInts(int[] ints, int offset, int length, OutputStream outputStream)
    throws IOException {
  // leading element count, so the reader knows how many values follow
  write(length, outputStream);
  int index = offset;
  for (int remaining = length; remaining > 0; remaining--) {
    write(ints[index], outputStream);
    index++;
  }
}

/**
 * Reads a count via {@code readInt(ByteBuffer)} and then that many further ints from
 * {@code buffer}, returning them as a new array.
 *
 * @param buffer source buffer, positioned at the leading count
 * @return a newly allocated array holding the values read after the count
 */
public static int[] readInts(ByteBuffer buffer) {
  // the first int is the element count
  int[] result = new int[readInt(buffer)];
  for (int idx = 0; idx < result.length; idx++) {
    result[idx] = readInt(buffer);
  }
  return result;
}

public static ByteBuffer clone(ByteBuffer original) {
ByteBuffer clone = ByteBuffer.allocate(original.remaining());
while (original.hasRemaining()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.common.block;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.block.column.ColumnEncoding;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.column.ColumnEncoder;
import org.apache.tsfile.read.common.block.column.ColumnEncoderFactory;
import org.apache.tsfile.read.common.block.column.DictionaryColumn;
import org.apache.tsfile.read.common.block.column.LongColumn;
import org.apache.tsfile.read.common.block.column.LongColumnBuilder;

import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/** Round-trip test for the encoder registered under {@link ColumnEncoding#DICTIONARY}. */
public class DictionaryColumnEncodingTest {

  /**
   * Serializes a dictionary view over a long column and checks that reading it back yields a
   * flattened {@link LongColumn} with the same values and null flags.
   *
   * <p>Despite its name, this test exercises a long ({@code INT64}) column.
   *
   * @throws IOException if writing to the in-memory stream fails; propagated so the test runner
   *     reports the original exception instead of a bare failure
   */
  @Test
  public void testIntColumn() throws IOException {
    // Source values by position: 0, 1, 2, 3, 4, null, 1, 2, 3, 4
    ColumnBuilder columnBuilder = new LongColumnBuilder(null, 10);
    for (int i = 0; i < 10; i++) {
      if (i == 5) {
        columnBuilder.appendNull();
      } else {
        columnBuilder.writeLong(i % 5);
      }
    }
    Column originalColumn = columnBuilder.build();

    // offset 1, length 4 selects positions {3, 5, 8, 9} -> values 3, null, 3, 4
    DictionaryColumn input =
        (DictionaryColumn) originalColumn.getPositions(new int[] {1, 3, 5, 8, 9}, 1, 4);

    ColumnEncoder encoder = ColumnEncoderFactory.get(ColumnEncoding.DICTIONARY);

    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    try (DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
      encoder.writeColumn(dos, input);
    }

    ByteBuffer buffer = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
    Column output = encoder.readColumn(buffer, TSDataType.INT64, input.getPositionCount());
    Assert.assertTrue(output instanceof LongColumn);

    Assert.assertEquals(input.getPositionCount(), output.getPositionCount());
    Assert.assertTrue(output.mayHaveNull());
    Assert.assertEquals(3, output.getLong(0));
    Assert.assertTrue(output.isNull(1));
    Assert.assertEquals(3, output.getLong(2));
    Assert.assertEquals(4, output.getLong(3));
  }
}
Loading

0 comments on commit 677697a

Please sign in to comment.