Skip to content

Commit

Permalink
Convert column name and table name to lower case (#322)
Browse files Browse the repository at this point in the history
* Only perform the lower-case conversion in the table model

* modify DeviceTableModelWriter

* fix ut

* modify query

* modify query

* remove null check
  • Loading branch information
shuwenwei authored Dec 6, 2024
1 parent 00e6b7b commit f1466e6
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static void main(String[] args) throws IOException {
tablet.addValue(rowIndex, 0, "id1_field_2");

// id2 column
tablet.addValue(rowIndex, 1, "id1_field_2");
tablet.addValue(rowIndex, 1, "id2_field_2");

// s1 column
tablet.addValue(rowIndex, 2, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class TableSchema {

Expand All @@ -53,18 +54,45 @@ public class TableSchema {
private Map<String, Integer> idColumnOrder;

public TableSchema(String tableName) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>();
this.columnCategories = new ArrayList<>();
this.updatable = true;
}

// for deserialize
public TableSchema(
List<IMeasurementSchema> columnSchemas, List<ColumnCategory> columnCategories) {
this.measurementSchemas =
columnSchemas.stream()
.map(
measurementSchema ->
new MeasurementSchema(
measurementSchema.getMeasurementName().toLowerCase(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
measurementSchema.getProps()))
.collect(Collectors.toList());
this.columnCategories = columnCategories;
}

public TableSchema(
String tableName,
List<IMeasurementSchema> columnSchemas,
List<ColumnCategory> columnCategories) {
this.tableName = tableName;
this.measurementSchemas = columnSchemas;
this.tableName = tableName.toLowerCase();
this.measurementSchemas =
columnSchemas.stream()
.map(
measurementSchema ->
new MeasurementSchema(
measurementSchema.getMeasurementName().toLowerCase(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getCompressor(),
measurementSchema.getProps()))
.collect(Collectors.toList());
this.columnCategories = columnCategories;
}

Expand All @@ -73,22 +101,24 @@ public TableSchema(
List<String> columnNameList,
List<TSDataType> dataTypeList,
List<ColumnCategory> categoryList) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>(columnNameList.size());
for (int i = 0; i < columnNameList.size(); i++) {
measurementSchemas.add(new MeasurementSchema(columnNameList.get(i), dataTypeList.get(i)));
measurementSchemas.add(
new MeasurementSchema(columnNameList.get(i).toLowerCase(), dataTypeList.get(i)));
}
this.columnCategories = categoryList;
}

@TsFileApi
public TableSchema(String tableName, List<ColumnSchema> columnSchemaList) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
this.measurementSchemas = new ArrayList<>(columnSchemaList.size());
this.columnCategories = new ArrayList<>(columnSchemaList.size());
for (ColumnSchema columnSchema : columnSchemaList) {
this.measurementSchemas.add(
new MeasurementSchema(columnSchema.getColumnName(), columnSchema.getDataType()));
new MeasurementSchema(
columnSchema.getColumnName().toLowerCase(), columnSchema.getDataType()));
this.columnCategories.add(columnSchema.getColumnCategory());
}
}
Expand Down Expand Up @@ -126,12 +156,13 @@ public Map<String, Integer> getIdColumnOrder() {
* @return i if the given column is the i-th column, -1 if the column is not in the schema
*/
public int findColumnIndex(String columnName) {
final String lowerCaseColumnName = columnName.toLowerCase();
return getColumnPosIndex()
.computeIfAbsent(
columnName,
lowerCaseColumnName,
colName -> {
for (int i = 0; i < measurementSchemas.size(); i++) {
if (measurementSchemas.get(i).getMeasurementName().equals(columnName)) {
if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName)) {
return i;
}
}
Expand All @@ -144,13 +175,14 @@ public int findColumnIndex(String columnName) {
* not an ID column
*/
public int findIdColumnOrder(String columnName) {
final String lowerCaseColumnName = columnName.toLowerCase();
return getIdColumnOrder()
.computeIfAbsent(
columnName,
lowerCaseColumnName,
colName -> {
int columnOrder = 0;
for (int i = 0; i < measurementSchemas.size(); i++) {
if (measurementSchemas.get(i).getMeasurementName().equals(columnName)
if (measurementSchemas.get(i).getMeasurementName().equals(lowerCaseColumnName)
&& columnCategories.get(i) == ColumnCategory.ID) {
return columnOrder;
} else if (columnCategories.get(i) == ColumnCategory.ID) {
Expand All @@ -162,7 +194,7 @@ public int findIdColumnOrder(String columnName) {
}

public IMeasurementSchema findColumnSchema(String columnName) {
final int columnIndex = findColumnIndex(columnName);
final int columnIndex = findColumnIndex(columnName.toLowerCase());
return columnIndex >= 0 ? measurementSchemas.get(columnIndex) : null;
}

Expand Down Expand Up @@ -230,15 +262,15 @@ public static TableSchema deserialize(ByteBuffer buffer, DeserializeConfig conte
measurementSchemas.add(measurementSchema);
columnCategories.add(ColumnCategory.values()[buffer.getInt()]);
}
return new TableSchema(null, measurementSchemas, columnCategories);
return new TableSchema(measurementSchemas, columnCategories);
}

/**
 * Returns the name of the table this schema describes.
 *
 * @return the stored table name. NOTE(review): the two-argument constructor marked "for
 *     deserialize" in this class does not assign a name, so this presumably returns {@code
 *     null} until {@code setTableName} is called - confirm against the field declaration.
 */
public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
this.tableName = tableName.toLowerCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,33 @@ public List<TableSchema> getAllTableSchema() throws IOException {
public Optional<TableSchema> getTableSchemas(String tableName) throws IOException {
TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
Map<String, TableSchema> tableSchemaMap = tsFileMetadata.getTableSchemaMap();
return Optional.ofNullable(tableSchemaMap.get(tableName));
return Optional.ofNullable(tableSchemaMap.get(tableName.toLowerCase()));
}

@TsFileApi
public ResultSet query(String tableName, List<String> columnNames, long startTime, long endTime)
throws IOException, NoTableException, NoMeasurementException, ReadProcessException {
String lowerCaseTableName = tableName.toLowerCase();
TsFileMetadata tsFileMetadata = fileReader.readFileMetadata();
TableSchema tableSchema = tsFileMetadata.getTableSchemaMap().get(tableName);
TableSchema tableSchema = tsFileMetadata.getTableSchemaMap().get(lowerCaseTableName);
if (tableSchema == null) {
throw new NoTableException(tableName);
}
List<TSDataType> dataTypeList = new ArrayList<>(columnNames.size());
List<String> lowerCaseColumnNames = new ArrayList<>(columnNames.size());
for (String columnName : columnNames) {
Map<String, Integer> column2IndexMap = tableSchema.buildColumnPosIndex();
Integer columnIndex = column2IndexMap.get(columnName);
Integer columnIndex = column2IndexMap.get(columnName.toLowerCase());
if (columnIndex == null) {
throw new NoMeasurementException(columnName);
}
lowerCaseColumnNames.add(columnName.toLowerCase());
dataTypeList.add(tableSchema.getColumnSchemas().get(columnIndex).getType());
}
TsBlockReader tsBlockReader =
queryExecutor.query(
tableName,
columnNames,
lowerCaseTableName,
lowerCaseColumnNames,
new ExpressionTree.TimeBetweenAnd(startTime, endTime),
null,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.TableChunkGroupWriterImpl;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.datapoint.DataPoint;
Expand Down Expand Up @@ -347,7 +348,7 @@ private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
throws WriteProcessException, IOException {
// initial ChunkGroupWriter of this device in the TSRecord
final IDeviceID deviceID = record.deviceId;
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned, false);

// initial all SeriesWriters of measurements in this TSRecord
List<IMeasurementSchema> measurementSchemas;
Expand Down Expand Up @@ -411,7 +412,7 @@ private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId());
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned);
IChunkGroupWriter groupWriter = tryToInitialGroupWriter(deviceID, isAligned, false);

List<IMeasurementSchema> schemas = tablet.getSchemas();
if (getSchema().containsDevice(deviceID)) {
Expand Down Expand Up @@ -495,11 +496,15 @@ private List<IMeasurementSchema> checkIsAllMeasurementsInGroup(
return schemas;
}

private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) {
private IChunkGroupWriter tryToInitialGroupWriter(
IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
groupWriter =
isTableModel
? new TableChunkGroupWriterImpl(deviceId, encryptParam)
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
Expand Down Expand Up @@ -733,7 +738,7 @@ public boolean writeTable(Tablet tablet, List<Pair<IDeviceID, Integer>> deviceId
for (Pair<IDeviceID, Integer> pair : deviceIdEndIndexPairs) {
// get corresponding ChunkGroupWriter and write this Tablet
recordCount +=
tryToInitialGroupWriter(pair.left, isTableWriteAligned)
tryToInitialGroupWriter(pair.left, isTableWriteAligned, true)
.write(tablet, startIndex, pair.right);
startIndex = pair.right;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {

private long lastTime = Long.MIN_VALUE;
private boolean isInitLastTime = false;
private boolean convertColumnNameToLowerCase = false;

public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
Expand Down Expand Up @@ -100,17 +101,20 @@ public void tryToAddSeriesWriter(IMeasurementSchema measurementSchema) throws IO

public ValueChunkWriter tryToAddSeriesWriterInternal(IMeasurementSchema measurementSchema)
throws IOException {
ValueChunkWriter valueChunkWriter =
valueChunkWriterMap.get(measurementSchema.getMeasurementName());
String measurementName =
convertColumnNameToLowerCase
? measurementSchema.getMeasurementName().toLowerCase()
: measurementSchema.getMeasurementName();
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(measurementName);
if (valueChunkWriter == null) {
valueChunkWriter =
new ValueChunkWriter(
measurementSchema.getMeasurementName(),
measurementName,
measurementSchema.getCompressor(),
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getValueEncoder());
valueChunkWriterMap.put(measurementSchema.getMeasurementName(), valueChunkWriter);
valueChunkWriterMap.put(measurementName, valueChunkWriter);
tryToAddEmptyPageAndData(valueChunkWriter);
}
return valueChunkWriter;
Expand All @@ -119,15 +123,19 @@ public ValueChunkWriter tryToAddSeriesWriterInternal(IMeasurementSchema measurem
@Override
public void tryToAddSeriesWriter(List<IMeasurementSchema> measurementSchemas) throws IOException {
for (IMeasurementSchema schema : measurementSchemas) {
if (!valueChunkWriterMap.containsKey(schema.getMeasurementName())) {
String measurementName =
convertColumnNameToLowerCase
? schema.getMeasurementName().toLowerCase()
: schema.getMeasurementName();
if (!valueChunkWriterMap.containsKey(measurementName)) {
ValueChunkWriter valueChunkWriter =
new ValueChunkWriter(
schema.getMeasurementName(),
measurementName,
schema.getCompressor(),
schema.getType(),
schema.getEncodingType(),
schema.getValueEncoder());
valueChunkWriterMap.put(schema.getMeasurementName(), valueChunkWriter);
valueChunkWriterMap.put(measurementName, valueChunkWriter);
tryToAddEmptyPageAndData(valueChunkWriter);
}
}
Expand All @@ -138,15 +146,25 @@ public int write(long time, List<DataPoint> data) throws WriteProcessException,
checkIsHistoryData(time);
List<ValueChunkWriter> emptyValueChunkWriters = new ArrayList<>();
Set<String> existingMeasurements =
data.stream().map(DataPoint::getMeasurementId).collect(Collectors.toSet());
data.stream()
.map(
dataPoint ->
convertColumnNameToLowerCase
? dataPoint.getMeasurementId().toLowerCase()
: dataPoint.getMeasurementId())
.collect(Collectors.toSet());
for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
if (!existingMeasurements.contains(entry.getKey())) {
emptyValueChunkWriters.add(entry.getValue());
}
}
for (DataPoint point : data) {
boolean isNull = point.getValue() == null;
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(point.getMeasurementId());
String measurementId =
convertColumnNameToLowerCase
? point.getMeasurementId().toLowerCase()
: point.getMeasurementId();
ValueChunkWriter valueChunkWriter = valueChunkWriterMap.get(measurementId);
switch (point.getType()) {
case BOOLEAN:
valueChunkWriter.write(time, (boolean) point.getValue(), isNull);
Expand Down Expand Up @@ -201,7 +219,11 @@ public int write(Tablet tablet, int startRowIndex, int endRowIndex)
// TODO: should we allow duplicated measurements in a Tablet?
Set<String> existingMeasurements =
measurementSchemas.stream()
.map(IMeasurementSchema::getMeasurementName)
.map(
schema ->
convertColumnNameToLowerCase
? schema.getMeasurementName().toLowerCase()
: schema.getMeasurementName())
.collect(Collectors.toSet());
for (Map.Entry<String, ValueChunkWriter> entry : valueChunkWriterMap.entrySet()) {
if (!existingMeasurements.contains(entry.getKey())) {
Expand Down Expand Up @@ -413,4 +435,8 @@ public void setLastTime(Long lastTime) {
isInitLastTime = true;
}
}

/**
 * Enables or disables lower-casing of measurement (column) names in this writer.
 *
 * <p>When the flag is {@code true}, the series-writer registration and write paths of this
 * class use {@code getMeasurementName().toLowerCase()} (or {@code
 * getMeasurementId().toLowerCase()} for data points) as the key into the value chunk writer
 * map; when {@code false} the name is used as given. The backing field is initialised to
 * {@code false}.
 *
 * <p>NOTE(review): the conversions call {@code String.toLowerCase()} without a locale, so the
 * result depends on the JVM default locale - consider {@code Locale.ROOT}.
 *
 * @param convertColumnNameToLowerCase {@code true} to lower-case names before they are used
 */
public void setConvertColumnNameToLowerCase(boolean convertColumnNameToLowerCase) {
this.convertColumnNameToLowerCase = convertColumnNameToLowerCase;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.write.chunk;

import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.file.metadata.IDeviceID;

/**
 * An {@link AlignedChunkGroupWriterImpl} that always has column-name lower-casing switched on.
 *
 * <p>Both constructors delegate to the matching superclass constructor and then call {@code
 * setConvertColumnNameToLowerCase(true)}; the class adds no other state or behaviour.
 *
 * <p>NOTE(review): the setter invoked from the constructors is public and overridable, so a
 * subclass override would run before that subclass is fully constructed - confirm no subclass
 * overrides it.
 */
public class TableChunkGroupWriterImpl extends AlignedChunkGroupWriterImpl {

/**
 * Creates a writer for the given device with lower-case column-name conversion enabled.
 *
 * @param deviceId the device passed through to the superclass constructor
 */
public TableChunkGroupWriterImpl(IDeviceID deviceId) {
super(deviceId);
setConvertColumnNameToLowerCase(true);
}

/**
 * Creates a writer for the given device and encryption parameter with lower-case column-name
 * conversion enabled.
 *
 * @param deviceId the device passed through to the superclass constructor
 * @param encryptParam the encryption parameter passed through to the superclass constructor
 */
public TableChunkGroupWriterImpl(IDeviceID deviceId, EncryptParameter encryptParam) {
super(deviceId, encryptParam);
setConvertColumnNameToLowerCase(true);
}
}
Loading

0 comments on commit f1466e6

Please sign in to comment.