diff --git a/Cargo.lock b/Cargo.lock index 698e3138d806..e9b91d20379a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1661,6 +1661,20 @@ dependencies = [ "url", ] +[[package]] +name = "common-decimal" +version = "0.4.2" +dependencies = [ + "arrow", + "bigdecimal 0.4.2", + "common-error", + "common-macro", + "rust_decimal", + "serde", + "serde_json", + "snafu", +] + [[package]] name = "common-error" version = "0.4.2" @@ -2707,6 +2721,7 @@ dependencies = [ "arrow-array", "arrow-schema", "common-base", + "common-decimal", "common-error", "common-macro", "common-telemetry", @@ -2717,6 +2732,7 @@ dependencies = [ "num-traits", "ordered-float 3.9.2", "paste", + "rust_decimal", "serde", "serde_json", "snafu", @@ -2731,20 +2747,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "decimal" -version = "0.4.2" -dependencies = [ - "arrow", - "bigdecimal 0.4.2", - "common-error", - "common-macro", - "rust_decimal", - "serde", - "serde_json", - "snafu", -] - [[package]] name = "der" version = "0.5.1" @@ -4843,6 +4845,7 @@ dependencies = [ "common-base", "common-catalog", "common-datasource", + "common-decimal", "common-error", "common-macro", "common-procedure", @@ -8559,6 +8562,7 @@ dependencies = [ "common-base", "common-catalog", "common-datasource", + "common-decimal", "common-error", "common-macro", "common-query", diff --git a/Cargo.toml b/Cargo.toml index b178fd9b97ea..a348a82c96ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,7 @@ common-base = { path = "src/common/base" } common-catalog = { path = "src/common/catalog" } common-config = { path = "src/common/config" } common-datasource = { path = "src/common/datasource" } +common-decimal = { path = "src/common/decimal" } common-error = { path = "src/common/error" } common-function = { path = "src/common/function" } common-greptimedb-telemetry = { path = "src/common/greptimedb-telemetry" } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 5ebea9c4bbe1..9328540bdf0a 100644 --- a/src/api/src/helper.rs 
+++ b/src/api/src/helper.rs @@ -158,7 +158,8 @@ impl TryFrom for ColumnDataTypeWrapper { }, ConcreteDataType::Null(_) | ConcreteDataType::List(_) - | ConcreteDataType::Dictionary(_) => { + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { return error::IntoColumnDataTypeSnafu { from: datatype }.fail() } }); @@ -341,7 +342,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { TimeUnit::Microsecond => values.duration_microsecond_values.push(val.value()), TimeUnit::Nanosecond => values.duration_nanosecond_values.push(val.value()), }, - Value::List(_) => unreachable!(), + Value::List(_) | Value::Decimal128(_) => unreachable!(), }); column.null_mask = null_mask.into_vec(); } @@ -522,7 +523,10 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> values.duration_nanosecond_values, )), }, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { unreachable!() } } @@ -692,7 +696,10 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .into_iter() .map(|v| Value::Duration(Duration::new_nanosecond(v))) .collect(), - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => { unreachable!() } } @@ -816,7 +823,7 @@ pub fn to_proto_value(value: Value) -> Option { value_data: Some(ValueData::DurationNanosecondValue(v.value())), }, }, - Value::List(_) => return None, + Value::List(_) | Value::Decimal128(_) => return None, }; Some(proto_value) @@ -908,9 +915,10 @@ pub fn to_column_data_type(data_type: &ConcreteDataType) -> Option ColumnDataType::IntervalDayTime, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | 
ConcreteDataType::Dictionary(_) => { - return None - } + ConcreteDataType::Null(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Decimal128(_) => return None, }; Some(column_data_type) @@ -974,7 +982,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { TimeUnit::Microsecond => ValueData::DurationMicrosecondValue(v.value()), TimeUnit::Nanosecond => ValueData::DurationNanosecondValue(v.value()), }), - Value::List(_) => unreachable!(), + Value::List(_) | Value::Decimal128(_) => unreachable!(), }, } } diff --git a/src/common/decimal/Cargo.toml b/src/common/decimal/Cargo.toml index 14e714de0688..dd0ba90c440f 100644 --- a/src/common/decimal/Cargo.toml +++ b/src/common/decimal/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "decimal" +name = "common-decimal" version.workspace = true edition.workspace = true license.workspace = true diff --git a/src/common/decimal/src/decimal128.rs b/src/common/decimal/src/decimal128.rs index 98becbe9c8a7..c68039fb57a7 100644 --- a/src/common/decimal/src/decimal128.rs +++ b/src/common/decimal/src/decimal128.rs @@ -43,7 +43,7 @@ const BYTES_TO_OVERFLOW_RUST_DECIMAL: usize = 28; /// **precision**: the total number of digits in the number, it's range is \[1, 38\]. /// /// **scale**: the number of digits to the right of the decimal point, it's range is \[0, precision\]. -#[derive(Debug, Default, Eq, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Eq, Copy, Clone, Serialize, Deserialize)] pub struct Decimal128 { value: i128, precision: u8, @@ -51,8 +51,18 @@ pub struct Decimal128 { } impl Decimal128 { - /// Create a new Decimal128 from i128, precision and scale. - pub fn new_unchecked(value: i128, precision: u8, scale: i8) -> Self { + /// Create a new Decimal128 from i128, precision and scale without any validation. 
+ pub fn new(value: i128, precision: u8, scale: i8) -> Self { + // debug assert precision and scale is valid + debug_assert!( + precision > 0 && precision <= DECIMAL128_MAX_PRECISION, + "precision should be in [1, {}]", + DECIMAL128_MAX_PRECISION + ); + debug_assert!( + scale >= 0 && scale <= precision as i8, + "scale should be in [0, precision]" + ); Self { value, precision, @@ -60,6 +70,7 @@ impl Decimal128 { } } + /// Try new Decimal128 from i128, precision and scale with validation. pub fn try_new(value: i128, precision: u8, scale: i8) -> error::Result { // make sure the precision and scale is valid. valid_precision_and_scale(precision, scale)?; @@ -70,6 +81,7 @@ impl Decimal128 { }) } + /// Return underlying value without precision and scale pub fn val(&self) -> i128 { self.value } @@ -90,6 +102,17 @@ impl Decimal128 { } } +/// The default value of Decimal128 is 0, and its precision is 1 and scale is 0. +impl Default for Decimal128 { + fn default() -> Self { + Self { + value: 0, + precision: 1, + scale: 0, + } + } +} + impl PartialEq for Decimal128 { fn eq(&self, other: &Self) -> bool { self.precision.eq(&other.precision) @@ -270,7 +293,7 @@ mod tests { #[test] fn test_common_decimal128() { - let decimal = Decimal128::new_unchecked(123456789, 9, 3); + let decimal = Decimal128::new(123456789, 9, 3); assert_eq!(decimal.to_string(), "123456.789"); let decimal = Decimal128::try_new(123456789, 9, 0); diff --git a/src/common/decimal/src/lib.rs b/src/common/decimal/src/lib.rs index 815c79fa0fad..694b32449def 100644 --- a/src/common/decimal/src/lib.rs +++ b/src/common/decimal/src/lib.rs @@ -14,3 +14,5 @@ pub mod decimal128; pub mod error; + +pub use decimal128::Decimal128; diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 85643ea82530..4c6e4a6af99c 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -71,7 +71,8 @@ macro_rules! 
convert_arrow_array_to_grpc_vals { return Ok(vals); }, )+ - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => unreachable!("Should not send {:?} in gRPC", $data_type), + // TODO(QuenKar): support gRPC for Decimal128 + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Decimal128(_) => unreachable!("Should not send {:?} in gRPC", $data_type), } }}; } diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index f1fd406caac5..79ec3099367b 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -13,6 +13,7 @@ arrow-array.workspace = true arrow-schema.workspace = true arrow.workspace = true common-base.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true @@ -23,6 +24,7 @@ num = "0.4" num-traits = "0.2" ordered-float = { version = "3.0", features = ["serde"] } paste = "1.0" +rust_decimal = "1.32.0" serde.workspace = true serde_json = "1.0" snafu.workspace = true diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 938c8ba498dc..877bf47e3bf5 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -19,6 +19,8 @@ use arrow::compute::cast as arrow_array_cast; use arrow::datatypes::{ DataType as ArrowDataType, IntervalUnit as ArrowIntervalUnit, TimeUnit as ArrowTimeUnit, }; +use arrow_schema::DECIMAL_DEFAULT_SCALE; +use common_decimal::decimal128::DECIMAL128_MAX_PRECISION; use common_time::interval::IntervalUnit; use common_time::timestamp::TimeUnit; use paste::paste; @@ -27,13 +29,13 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; use crate::types::{ - BinaryType, BooleanType, DateTimeType, DateType, DictionaryType, DurationMicrosecondType, - DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, - 
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, ListType, NullType, StringType, - TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, TimestampType, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, + BinaryType, BooleanType, DateTimeType, DateType, Decimal128Type, DictionaryType, + DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, + DurationType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, ListType, + NullType, StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, + UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use crate::value::Value; use crate::vectors::MutableVector; @@ -56,6 +58,9 @@ pub enum ConcreteDataType { Float32(Float32Type), Float64(Float64Type), + // Decimal128 type: + Decimal128(Decimal128Type), + // String types: Binary(BinaryType), String(StringType), @@ -102,6 +107,9 @@ impl fmt::Display for ConcreteDataType { ConcreteDataType::Dictionary(_) => write!(f, "Dictionary"), ConcreteDataType::Interval(_) => write!(f, "Interval"), ConcreteDataType::Duration(_) => write!(f, "Duration"), + ConcreteDataType::Decimal128(d) => { + write!(f, "Decimal128({},{})", d.precision(), d.scale()) + } } } } @@ -150,6 +158,7 @@ impl ConcreteDataType { | ConcreteDataType::Time(_) | ConcreteDataType::Interval(_) | ConcreteDataType::Duration(_) + | ConcreteDataType::Decimal128(_) ) } @@ -183,6 +192,10 @@ impl ConcreteDataType { matches!(self, ConcreteDataType::Timestamp(_)) } + pub fn is_decimal(&self) -> bool { + matches!(self, ConcreteDataType::Decimal128(_)) + } + pub fn numerics() -> Vec { vec![ ConcreteDataType::int8_datatype(), @@ -235,6 
+248,13 @@ impl ConcreteDataType { } } + pub fn as_decimal128(&self) -> Option { + match self { + ConcreteDataType::Decimal128(d) => Some(*d), + _ => None, + } + } + /// Checks if the data type can cast to another data type. pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool { let array = arrow_array::new_empty_array(&self.as_arrow_type()); @@ -292,6 +312,9 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Duration(u) => { ConcreteDataType::Duration(DurationType::from_unit(u.into())) } + ArrowDataType::Decimal128(precision, scale) => { + ConcreteDataType::decimal128_datatype(*precision, *scale) + } _ => { return error::UnsupportedArrowTypeSnafu { arrow_type: dt.clone(), @@ -454,6 +477,14 @@ impl ConcreteDataType { ) -> ConcreteDataType { ConcreteDataType::Dictionary(DictionaryType::new(key_type, value_type)) } + + pub fn decimal128_datatype(precision: u8, scale: i8) -> ConcreteDataType { + ConcreteDataType::Decimal128(Decimal128Type::new(precision, scale)) + } + + pub fn decimal128_default_datatype() -> ConcreteDataType { + Self::decimal128_datatype(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE) + } } /// Data type abstraction. 
@@ -613,6 +644,14 @@ mod tests { assert!(ConcreteDataType::boolean_datatype().is_boolean()); } + #[test] + fn test_is_decimal() { + assert!(!ConcreteDataType::int32_datatype().is_decimal()); + assert!(!ConcreteDataType::float32_datatype().is_decimal()); + assert!(ConcreteDataType::decimal128_datatype(10, 2).is_decimal()); + assert!(ConcreteDataType::decimal128_datatype(18, 6).is_decimal()); + } + #[test] fn test_is_stringifiable() { assert!(!ConcreteDataType::int32_datatype().is_stringifiable()); @@ -670,6 +709,8 @@ mod tests { assert!(!ConcreteDataType::float32_datatype().is_signed()); assert!(!ConcreteDataType::float64_datatype().is_signed()); + + assert!(ConcreteDataType::decimal128_datatype(10, 2).is_signed()); } #[test] @@ -695,6 +736,7 @@ mod tests { assert!(!ConcreteDataType::duration_millisecond_datatype().is_unsigned()); assert!(!ConcreteDataType::duration_microsecond_datatype().is_unsigned()); assert!(!ConcreteDataType::duration_nanosecond_datatype().is_unsigned()); + assert!(!ConcreteDataType::decimal128_datatype(10, 2).is_unsigned()); assert!(ConcreteDataType::uint8_datatype().is_unsigned()); assert!(ConcreteDataType::uint16_datatype().is_unsigned()); @@ -808,5 +850,9 @@ mod tests { ConcreteDataType::duration_second_datatype().to_string(), "Duration" ); + assert_eq!( + ConcreteDataType::decimal128_datatype(10, 2).to_string(), + "Decimal128(10,2)" + ); } } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index f96239bf3a36..316b50e3276c 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -122,6 +122,23 @@ pub enum Error { #[snafu(display("Failed to unpack value to given type: {}", reason))] TryFromValue { reason: String, location: Location }, + + #[snafu(display("Failed to specify the precision {} and scale {}", precision, scale))] + InvalidPrecisionOrScale { + precision: u8, + scale: i8, + #[snafu(source)] + error: arrow::error::ArrowError, + location: Location, + }, + + #[snafu(display("Value exceeds the 
precision {} bound", precision))] + ValueExceedsPrecision { + precision: u8, + #[snafu(source)] + error: arrow::error::ArrowError, + location: Location, + }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index ceb38964fb5f..8539819a439c 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -14,6 +14,7 @@ use std::any::Any; +use common_decimal::Decimal128; use common_time::{Date, DateTime}; use crate::types::{ @@ -22,8 +23,8 @@ use crate::types::{ }; use crate::value::{ListValue, ListValueRef, Value}; use crate::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, ListVector, MutableVector, - PrimitiveVector, StringVector, Vector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, ListVector, + MutableVector, PrimitiveVector, StringVector, Vector, }; fn get_iter_capacity>(iter: &I) -> usize { @@ -277,6 +278,27 @@ impl<'a> ScalarRef<'a> for Date { } } +impl Scalar for Decimal128 { + type VectorType = Decimal128Vector; + type RefType<'a> = Decimal128; + + fn as_scalar_ref(&self) -> Self::RefType<'_> { + *self + } + + fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> { + long + } +} + +impl<'a> ScalarRef<'a> for Decimal128 { + type ScalarType = Decimal128; + + fn to_owned_scalar(&self) -> Self::ScalarType { + *self + } +} + impl Scalar for DateTime { type VectorType = DateTimeVector; type RefType<'a> = DateTime; @@ -396,6 +418,13 @@ mod tests { assert_eq!(date, date.to_owned_scalar()); } + #[test] + fn test_decimal_scalar() { + let decimal = Decimal128::new(1, 1, 1); + assert_eq!(decimal, decimal.as_scalar_ref()); + assert_eq!(decimal, decimal.to_owned_scalar()); + } + #[test] fn test_datetime_scalar() { let dt = DateTime::new(123); diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index 3d8635a5fb6d..29e3065abe58 100644 --- a/src/datatypes/src/type_id.rs +++ 
b/src/datatypes/src/type_id.rs @@ -30,6 +30,8 @@ pub enum LogicalTypeId { Float32, Float64, + Decimal128, + // String types: String, Binary, @@ -123,6 +125,7 @@ impl LogicalTypeId { LogicalTypeId::DurationMillisecond => ConcreteDataType::duration_millisecond_datatype(), LogicalTypeId::DurationMicrosecond => ConcreteDataType::duration_microsecond_datatype(), LogicalTypeId::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), + LogicalTypeId::Decimal128 => ConcreteDataType::decimal128_default_datatype(), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index aaaf8655dd85..686fd9c49f10 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -17,6 +17,7 @@ mod boolean_type; pub mod cast; mod date_type; mod datetime_type; +mod decimal_type; mod dictionary_type; mod duration_type; mod interval_type; @@ -29,9 +30,10 @@ mod timestamp_type; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; -pub use cast::cast_with_opt; +pub use cast::{cast, cast_with_opt}; pub use date_type::DateType; pub use datetime_type::DateTimeType; +pub use decimal_type::Decimal128Type; pub use dictionary_type::DictionaryType; pub use duration_type::{ DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, diff --git a/src/datatypes/src/types/cast.rs b/src/datatypes/src/types/cast.rs index 7d2754f5bb2b..80ca310ac893 100644 --- a/src/datatypes/src/types/cast.rs +++ b/src/datatypes/src/types/cast.rs @@ -16,6 +16,21 @@ use crate::data_type::{ConcreteDataType, DataType}; use crate::error::{self, Error, Result}; use crate::types::TimeType; use crate::value::Value; +use crate::vectors::Helper; + +/// Used to cast the value to dest ConcreteDataType temporarily. +/// To keep the same behavior as arrow-rs. 
+pub fn cast(src_value: Value, dest_type: &ConcreteDataType) -> Result { + if src_value == Value::Null { + return Ok(Value::Null); + } + let src_type = src_value.data_type(); + let scalar_value = src_value.try_to_scalar_value(&src_type)?; + let new_value = Helper::try_from_scalar_value(scalar_value, 1)? + .cast(dest_type)? + .get(0); + Ok(new_value) +} /// Cast options for cast functions. #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] diff --git a/src/datatypes/src/types/decimal_type.rs b/src/datatypes/src/types/decimal_type.rs new file mode 100644 index 000000000000..edda8fe9f7eb --- /dev/null +++ b/src/datatypes/src/types/decimal_type.rs @@ -0,0 +1,75 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow_schema::DataType as ArrowDataType; +use common_decimal::Decimal128; +use serde::{Deserialize, Serialize}; + +use crate::prelude::{DataType, ScalarVectorBuilder}; +use crate::type_id::LogicalTypeId; +use crate::value::Value; +use crate::vectors::{Decimal128VectorBuilder, MutableVector}; + +/// Decimal type with precision and scale information. 
+#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct Decimal128Type { + precision: u8, + scale: i8, +} + +impl Decimal128Type { + pub fn new(precision: u8, scale: i8) -> Self { + Self { precision, scale } + } + + pub fn precision(&self) -> u8 { + self.precision + } + + pub fn scale(&self) -> i8 { + self.scale + } +} + +impl DataType for Decimal128Type { + fn name(&self) -> &str { + "decimal128" + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Decimal128 + } + + fn default_value(&self) -> Value { + Value::Decimal128(Decimal128::default()) + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Decimal128(self.precision, self.scale) + } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(Decimal128VectorBuilder::with_capacity(capacity)) + } + + fn try_cast(&self, val: Value) -> Option { + match val { + Value::Null => Some(Value::Null), + Value::Decimal128(_) => Some(val), + _ => None, + } + } +} diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index febff36324a1..5f01207a6f74 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -80,6 +80,7 @@ impl DataType for StringType { Value::Time(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))), Value::Interval(v) => Some(Value::String(StringBytes::from(v.to_iso8601_string()))), Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))), + Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))), // StringBytes is only support for utf-8, Value::Binary is not allowed. 
Value::Binary(_) | Value::List(_) => None, diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 024f45f2b36c..23135a6e44a8 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType as ArrowDataType, Field}; use common_base::bytes::{Bytes, StringBytes}; +use common_decimal::Decimal128; use common_telemetry::logging; use common_time::date::Date; use common_time::datetime::DateTime; @@ -61,6 +62,9 @@ pub enum Value { Float32(OrderedF32), Float64(OrderedF64), + // Decimal type: + Decimal128(Decimal128), + // String types: String(StringBytes), Binary(Bytes), @@ -116,6 +120,7 @@ impl Display for Value { .join(", "); write!(f, "{}[{}]", v.datatype.name(), items) } + Value::Decimal128(v) => write!(f, "{}", v), } } } @@ -148,6 +153,7 @@ impl Value { Value::Interval(v) => ConcreteDataType::interval_datatype(v.unit()), Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()), Value::Duration(d) => ConcreteDataType::duration_datatype(d.unit()), + Value::Decimal128(d) => ConcreteDataType::decimal128_datatype(d.precision(), d.scale()), } } @@ -192,6 +198,7 @@ impl Value { Value::Time(v) => ValueRef::Time(*v), Value::Interval(v) => ValueRef::Interval(*v), Value::Duration(v) => ValueRef::Duration(*v), + Value::Decimal128(v) => ValueRef::Decimal128(*v), } } @@ -271,6 +278,7 @@ impl Value { TimeUnit::Microsecond => LogicalTypeId::DurationMicrosecond, TimeUnit::Nanosecond => LogicalTypeId::DurationNanosecond, }, + Value::Decimal128(_) => LogicalTypeId::Decimal128, } } @@ -318,6 +326,10 @@ impl Value { IntervalUnit::MonthDayNano => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())), }, Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())), + Value::Decimal128(d) => { + let (v, p, s) = d.to_scalar_value(); + ScalarValue::Decimal128(v, p, s) + } }; Ok(scalar_value) @@ -357,6 +369,7 @@ pub fn to_null_scalar_value(output_type: 
&ConcreteDataType) -> Result time_to_scalar_value(t.unit(), None)?, ConcreteDataType::Duration(d) => duration_to_scalar_value(d.unit(), None), + ConcreteDataType::Decimal128(d) => ScalarValue::Decimal128(None, d.precision(), d.scale()), }) } @@ -533,6 +546,8 @@ impl_try_from_value!(Time, Time); impl_try_from_value!(DateTime, DateTime); impl_try_from_value!(Timestamp, Timestamp); impl_try_from_value!(Interval, Interval); +impl_try_from_value!(Duration, Duration); +impl_try_from_value!(Decimal128, Decimal128); macro_rules! impl_value_from { ($Variant: ident, $Type: ident) => { @@ -575,6 +590,7 @@ impl_value_from!(Timestamp, Timestamp); impl_value_from!(Interval, Interval); impl_value_from!(Duration, Duration); impl_value_from!(String, String); +impl_value_from!(Decimal128, Decimal128); impl From<&str> for Value { fn from(string: &str) -> Value { @@ -620,6 +636,7 @@ impl TryFrom for serde_json::Value { Value::Time(v) => serde_json::to_value(v.value())?, Value::Interval(v) => serde_json::to_value(v.to_i128())?, Value::Duration(v) => serde_json::to_value(v.value())?, + Value::Decimal128(v) => serde_json::to_value(v.to_string())?, }; Ok(json_value) @@ -840,6 +857,7 @@ impl From> for Value { ValueRef::Interval(v) => Value::Interval(v), ValueRef::Duration(v) => Value::Duration(v), ValueRef::List(v) => v.to_value(), + ValueRef::Decimal128(v) => Value::Decimal128(v), } } } @@ -862,6 +880,9 @@ pub enum ValueRef<'a> { Float32(OrderedF32), Float64(OrderedF64), + // Decimal type: + Decimal128(Decimal128), + // String types: String(&'a str), Binary(&'a [u8]), @@ -1003,6 +1024,11 @@ impl<'a> ValueRef<'a> { pub fn as_list(&self) -> Result> { impl_as_for_value_ref!(self, List) } + + /// Cast itself to [Decimal128]. 
+ pub fn as_decimal128(&self) -> Result> { + impl_as_for_value_ref!(self, Decimal128) + } } impl<'a> PartialOrd for ValueRef<'a> { @@ -1053,6 +1079,7 @@ impl_value_ref_from!(Timestamp, Timestamp); impl_value_ref_from!(Time, Time); impl_value_ref_from!(Interval, Interval); impl_value_ref_from!(Duration, Duration); +impl_value_ref_from!(Decimal128, Decimal128); impl<'a> From<&'a str> for ValueRef<'a> { fn from(string: &'a str) -> ValueRef<'a> { @@ -1143,6 +1170,7 @@ impl<'a> ValueRef<'a> { ValueRef::Time(_) => 16, ValueRef::Duration(_) => 16, ValueRef::Interval(_) => 24, + ValueRef::Decimal128(_) => 32, ValueRef::List(v) => match v { ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(), ListValueRef::Ref { val } => val.estimated_size(), @@ -2227,8 +2255,6 @@ mod tests { #[test] fn test_value_ref_estimated_size() { - assert_eq!(std::mem::size_of::(), 24); - check_value_ref_size_eq(&ValueRef::Boolean(true), 1); check_value_ref_size_eq(&ValueRef::UInt8(1), 1); check_value_ref_size_eq(&ValueRef::UInt16(1), 2); @@ -2304,6 +2330,7 @@ mod tests { idx: 2, }), 85, - ) + ); + check_value_ref_size_eq(&ValueRef::Decimal128(Decimal128::new(1234, 3, 1)), 32) } } diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index d69d1bb82926..4bfcb82e76ea 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -30,6 +30,7 @@ mod boolean; mod constant; mod date; mod datetime; +mod decimal; mod duration; mod eq; mod helper; @@ -48,6 +49,7 @@ pub use boolean::{BooleanVector, BooleanVectorBuilder}; pub use constant::ConstantVector; pub use date::{DateVector, DateVectorBuilder}; pub use datetime::{DateTimeVector, DateTimeVectorBuilder}; +pub use decimal::{Decimal128Vector, Decimal128VectorBuilder}; pub use duration::{ DurationMicrosecondVector, DurationMicrosecondVectorBuilder, DurationMillisecondVector, DurationMillisecondVectorBuilder, DurationNanosecondVector, DurationNanosecondVectorBuilder, diff --git 
a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs new file mode 100644 index 000000000000..bb3402470bd1 --- /dev/null +++ b/src/datatypes/src/vectors/decimal.rs @@ -0,0 +1,521 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_array::builder::{ArrayBuilder, Decimal128Builder}; +use arrow_array::iterator::ArrayIter; +use arrow_array::{Array, ArrayRef, Decimal128Array}; +use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION}; +use common_decimal::Decimal128; +use snafu::{OptionExt, ResultExt}; + +use super::{MutableVector, Validity, Vector, VectorRef}; +use crate::arrow::datatypes::DataType as ArrowDataType; +use crate::data_type::ConcreteDataType; +use crate::error::{ + self, CastTypeSnafu, InvalidPrecisionOrScaleSnafu, Result, ValueExceedsPrecisionSnafu, +}; +use crate::prelude::{ScalarVector, ScalarVectorBuilder}; +use crate::serialize::Serializable; +use crate::value::{Value, ValueRef}; +use crate::vectors; + +/// Decimal128Vector is a vector keep i128 values with precision and scale. 
+#[derive(Debug, PartialEq)] +pub struct Decimal128Vector { + array: Decimal128Array, +} + +impl Decimal128Vector { + /// New a Decimal128Vector from Arrow Decimal128Array + pub fn new(array: Decimal128Array) -> Self { + Self { array } + } + + /// Construct Vector from i128 values + pub fn from_values>(iter: I) -> Self { + Self { + array: Decimal128Array::from_iter_values(iter), + } + } + + /// Construct Vector from i128 values slice + pub fn from_slice>(slice: P) -> Self { + let iter = slice.as_ref().iter().copied(); + Self { + array: Decimal128Array::from_iter_values(iter), + } + } + + /// Construct Vector from Wrapper(Decimal128) values slice + pub fn from_wrapper_slice>(slice: P) -> Self { + let iter = slice.as_ref().iter().copied().map(|v| v.val()); + Self { + array: Decimal128Array::from_iter_values(iter), + } + } + + /// Get decimal128 value from vector by offset and length. + pub fn get_slice(&self, offset: usize, length: usize) -> Self { + let array = self.array.slice(offset, length); + Self { array } + } + + /// Returns a Decimal vector with the same data as self, with the + /// specified precision and scale(should in Decimal128 range), + /// and return error if value is out of precision bound. + /// + /// + /// For example: + /// value = 12345, precision = 3, return error. + pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result { + // validate if precision is too small + self.validate_decimal_precision(precision)?; + let array = self + .array + .with_precision_and_scale(precision, scale) + .context(InvalidPrecisionOrScaleSnafu { precision, scale })?; + Ok(Self { array }) + } + + /// Returns a Decimal vector with the same data as self, with the + /// specified precision and scale(should in Decimal128 range), + /// and return null if value is out of precision bound. + /// + /// For example: + /// value = 12345, precision = 3, the value will be casted to null. 
+ pub fn with_precision_and_scale_to_null(self, precision: u8, scale: i8) -> Result { + self.null_if_overflow_precision(precision) + .with_precision_and_scale(precision, scale) + } + + /// Return decimal value as string + pub fn value_as_string(&self, idx: usize) -> String { + self.array.value_as_string(idx) + } + + /// Return decimal128 vector precision + pub fn precision(&self) -> u8 { + self.array.precision() + } + + /// Return decimal128 vector scale + pub fn scale(&self) -> i8 { + self.array.scale() + } + + /// Return decimal128 vector inner array + pub(crate) fn as_arrow(&self) -> &dyn Array { + &self.array + } + + /// Validate decimal precision, if precision is invalid, return error. + fn validate_decimal_precision(&self, precision: u8) -> Result<()> { + self.array + .validate_decimal_precision(precision) + .context(ValueExceedsPrecisionSnafu { precision }) + } + + /// Values that exceed the precision bounds will be casted to Null. + fn null_if_overflow_precision(&self, precision: u8) -> Self { + Self { + array: self.array.null_if_overflow_precision(precision), + } + } + + /// Get decimal128 Value from array by index. + fn get_decimal128_value_from_array(&self, index: usize) -> Option { + if self.array.is_valid(index) { + // Safety: The index have been checked by `is_valid()`. + let value = unsafe { self.array.value_unchecked(index) }; + // Safety: The precision and scale have been checked by Vector. 
+ Some(Decimal128::new(value, self.precision(), self.scale())) + } else { + None + } + } +} + +impl Vector for Decimal128Vector { + fn data_type(&self) -> ConcreteDataType { + if let ArrowDataType::Decimal128(p, s) = self.array.data_type() { + ConcreteDataType::decimal128_datatype(*p, *s) + } else { + ConcreteDataType::decimal128_default_datatype() + } + } + + fn vector_type_name(&self) -> String { + "Decimal128Vector".to_string() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn len(&self) -> usize { + self.array.len() + } + + fn to_arrow_array(&self) -> ArrayRef { + Arc::new(self.array.clone()) + } + + fn to_boxed_arrow_array(&self) -> Box { + Box::new(self.array.clone()) + } + + fn validity(&self) -> Validity { + vectors::impl_validity_for_vector!(self.array) + } + + fn memory_size(&self) -> usize { + self.array.get_buffer_memory_size() + } + + fn null_count(&self) -> usize { + self.array.null_count() + } + + fn is_null(&self, row: usize) -> bool { + self.array.is_null(row) + } + + fn slice(&self, offset: usize, length: usize) -> VectorRef { + let array = self.array.slice(offset, length); + Arc::new(Self { array }) + } + + fn get(&self, index: usize) -> Value { + if let Some(decimal) = self.get_decimal128_value_from_array(index) { + Value::Decimal128(decimal) + } else { + Value::Null + } + } + + fn get_ref(&self, index: usize) -> ValueRef { + if let Some(decimal) = self.get_decimal128_value_from_array(index) { + ValueRef::Decimal128(decimal) + } else { + ValueRef::Null + } + } +} + +impl From for Decimal128Vector { + fn from(array: Decimal128Array) -> Self { + Self { array } + } +} + +impl From>> for Decimal128Vector { + fn from(vec: Vec>) -> Self { + let array = Decimal128Array::from_iter(vec); + Self { array } + } +} + +impl Serializable for Decimal128Vector { + fn serialize_to_json(&self) -> Result> { + self.iter_data() + .map(|v| match v { + None => Ok(serde_json::Value::Null), // if decimal vector not present, map to NULL + Some(d) => 
serde_json::to_value(d), + }) + .collect::>() + .context(error::SerializeSnafu) + } +} + +pub struct Decimal128Iter<'a> { + precision: u8, + scale: i8, + iter: ArrayIter<&'a Decimal128Array>, +} + +impl<'a> Iterator for Decimal128Iter<'a> { + type Item = Option<Decimal128>; + + /** Advances the underlying Arrow iterator. Returns `None` once the array is exhausted; otherwise `Some(None)` for a null slot or `Some(Some(decimal))` built with this iterator's precision and scale. */ + fn next(&mut self) -> Option<Self::Item> { + /* Use `map` rather than wrapping in `Some(..)`: the outer `None` must propagate so the iterator terminates instead of yielding `Some(None)` forever. */ + self.iter + .next() + .map(|slot| slot.map(|v| Decimal128::new(v, self.precision, self.scale))) + } + + /** Forwards the size hint of the underlying Arrow iterator. */ + fn size_hint(&self) -> (usize, Option<usize>) { + self.iter.size_hint() + } +} + +impl ScalarVector for Decimal128Vector { + type OwnedItem = Decimal128; + + type RefItem<'a> = Decimal128; + + type Iter<'a> = Decimal128Iter<'a>; + + type Builder = Decimal128VectorBuilder; + + fn get_data(&self, idx: usize) -> Option> { + self.get_decimal128_value_from_array(idx) + } + + fn iter_data(&self) -> Self::Iter<'_> { + Self::Iter { + precision: self.precision(), + scale: self.scale(), + iter: self.array.iter(), + } + } +} + +pub struct Decimal128VectorBuilder { + precision: u8, + scale: i8, + mutable_array: Decimal128Builder, +} + +impl MutableVector for Decimal128VectorBuilder { + fn data_type(&self) -> ConcreteDataType { + ConcreteDataType::decimal128_datatype(self.precision, self.scale) + } + + fn len(&self) -> usize { + self.mutable_array.len() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } + + fn to_vector(&mut self) -> VectorRef { + Arc::new(self.finish()) + } + + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { + let decimal_val = value.as_decimal128()?.map(|v| v.val()); + self.mutable_array.append_option(decimal_val); + Ok(()) + } + + fn push_null(&mut self) { + self.mutable_array.append_null(); + } + + fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { + let decimal_vector = + vector + .as_any() + .downcast_ref::() + .context(CastTypeSnafu { + msg: format!( + "Failed to cast vector from {} to Decimal128Vector", + vector.vector_type_name(), + 
), + })?; + let slice = decimal_vector.get_slice(offset, length); + self.mutable_array + .extend(slice.iter_data().map(|v| v.map(|d| d.val()))); + Ok(()) + } +} + +impl ScalarVectorBuilder for Decimal128VectorBuilder { + type VectorType = Decimal128Vector; + + fn with_capacity(capacity: usize) -> Self { + Self { + precision: DECIMAL128_MAX_PRECISION, + scale: DECIMAL128_DEFAULT_SCALE, + mutable_array: Decimal128Builder::with_capacity(capacity), + } + } + + fn push(&mut self, value: Option<::RefItem<'_>>) { + self.mutable_array.append_option(value.map(|v| v.val())); + } + + fn finish(&mut self) -> Self::VectorType { + Decimal128Vector { + array: self.mutable_array.finish(), + } + } +} + +impl Decimal128VectorBuilder { + /// Change the precision and scale of the Decimal128VectorBuilder. + pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result { + let mutable_array = self + .mutable_array + .with_precision_and_scale(precision, scale) + .context(InvalidPrecisionOrScaleSnafu { precision, scale })?; + Ok(Self { + precision, + scale, + mutable_array, + }) + } +} + +vectors::impl_try_from_arrow_array_for_vector!(Decimal128Array, Decimal128Vector); + +#[cfg(test)] +pub mod tests { + use arrow_array::Decimal128Array; + use common_decimal::Decimal128; + + use super::*; + use crate::vectors::operations::VectorOp; + use crate::vectors::Int8Vector; + + #[test] + fn test_from_arrow_decimal128_array() { + let decimal_array = Decimal128Array::from(vec![Some(123), Some(456)]); + let decimal_vector = Decimal128Vector::from(decimal_array); + let expect = Decimal128Vector::from_values(vec![123, 456]); + + assert_eq!(decimal_vector, expect); + } + + #[test] + fn test_from_slice() { + let decimal_vector = Decimal128Vector::from_slice([123, 456]); + let decimal_vector2 = Decimal128Vector::from_wrapper_slice([ + Decimal128::new(123, 10, 2), + Decimal128::new(456, 10, 2), + ]); + let expect = Decimal128Vector::from_values(vec![123, 456]); + + assert_eq!(decimal_vector, 
expect); + assert_eq!(decimal_vector2, expect); + } + + #[test] + fn test_decimal128_vector_slice() { + let data = vec![100, 200, 300]; + // create a decimal vector + let decimal_vector = Decimal128Vector::from_values(data.clone()) + .with_precision_and_scale(10, 2) + .unwrap(); + let decimal_vector2 = decimal_vector.slice(1, 2); + assert_eq!(decimal_vector2.len(), 2); + assert_eq!( + decimal_vector2.get(0), + Value::Decimal128(Decimal128::new(200, 10, 2)) + ); + assert_eq!( + decimal_vector2.get(1), + Value::Decimal128(Decimal128::new(300, 10, 2)) + ); + } + + #[test] + fn test_decimal128_vector_basic() { + let data = vec![100, 200, 300]; + // create a decimal vector + let decimal_vector = Decimal128Vector::from_values(data.clone()) + .with_precision_and_scale(10, 2) + .unwrap(); + + // can use value_of_string(idx) get a decimal string + assert_eq!(decimal_vector.value_as_string(0), "1.00"); + + // iterator for-loop + for i in 0..data.len() { + assert_eq!( + decimal_vector.get_data(i), + Some(Decimal128::new((i + 1) as i128 * 100, 10, 2)) + ); + assert_eq!( + decimal_vector.get(i), + Value::Decimal128(Decimal128::new((i + 1) as i128 * 100, 10, 2)) + ); + assert_eq!( + decimal_vector.get_ref(i), + ValueRef::Decimal128(Decimal128::new((i + 1) as i128 * 100, 10, 2)) + ); + } + + // directly convert vector with precision = 2 and scale = 1, + // then all of value will be null because of precision. 
+ let decimal_vector = decimal_vector + .with_precision_and_scale_to_null(2, 1) + .unwrap(); + assert_eq!(decimal_vector.len(), 3); + assert!(decimal_vector.is_null(0)); + assert!(decimal_vector.is_null(1)); + assert!(decimal_vector.is_null(2)); + } + + #[test] + fn test_decimal128_vector_builder() { + let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3) + .with_precision_and_scale(10, 2) + .unwrap(); + decimal_builder.push(Some(Decimal128::new(100, 10, 2))); + decimal_builder.push(Some(Decimal128::new(200, 10, 2))); + decimal_builder.push(Some(Decimal128::new(300, 10, 2))); + let decimal_vector = decimal_builder.finish(); + assert_eq!(decimal_vector.len(), 3); + assert_eq!(decimal_vector.precision(), 10); + assert_eq!(decimal_vector.scale(), 2); + assert_eq!( + decimal_vector.get(0), + Value::Decimal128(Decimal128::new(100, 10, 2)) + ); + assert_eq!( + decimal_vector.get(1), + Value::Decimal128(Decimal128::new(200, 10, 2)) + ); + assert_eq!( + decimal_vector.get(2), + Value::Decimal128(Decimal128::new(300, 10, 2)) + ); + + // push value error + let mut decimal_builder = Decimal128VectorBuilder::with_capacity(3); + decimal_builder.push(Some(Decimal128::new(123, 38, 10))); + decimal_builder.push(Some(Decimal128::new(1234, 38, 10))); + decimal_builder.push(Some(Decimal128::new(12345, 38, 10))); + let decimal_vector = decimal_builder.finish(); + assert_eq!(decimal_vector.precision(), 38); + assert_eq!(decimal_vector.scale(), 10); + let result = decimal_vector.with_precision_and_scale(3, 2); + assert_eq!( + "Value exceeds the precision 3 bound", + result.unwrap_err().to_string() + ); + } + + #[test] + fn test_cast_to_decimal128() { + let vector = Int8Vector::from_values(vec![1, 2, 3, 4, 100]); + let casted_vector = vector.cast(&ConcreteDataType::decimal128_datatype(3, 1)); + assert!(casted_vector.is_ok()); + let vector = casted_vector.unwrap(); + let array = vector.as_any().downcast_ref::().unwrap(); + // because 100 is out of Decimal(3, 1) range, so it 
will be null + assert!(array.is_null(4)); + } +} diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index f9c02f76a4f5..fcf97515ee27 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -20,12 +20,12 @@ use crate::data_type::DataType; use crate::types::{DurationType, TimeType, TimestampType}; use crate::vectors::constant::ConstantVector; use crate::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, DurationMicrosecondVector, - DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, - IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, ListVector, - PrimitiveVector, StringVector, TimeMicrosecondVector, TimeMillisecondVector, - TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, - TimestampNanosecondVector, TimestampSecondVector, Vector, + BinaryVector, BooleanVector, DateTimeVector, DateVector, Decimal128Vector, + DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, + DurationSecondVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, + IntervalYearMonthVector, ListVector, PrimitiveVector, StringVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, + TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector, }; use crate::with_match_primitive_type_id; @@ -151,6 +151,9 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { is_vector_eq!(DurationNanosecondVector, lhs, rhs) } }, + Decimal128(_) => { + is_vector_eq!(Decimal128Vector, lhs, rhs) + } } } @@ -242,6 +245,9 @@ mod tests { assert_vector_ref_eq(Arc::new(DurationMillisecondVector::from_values([300, 310]))); assert_vector_ref_eq(Arc::new(DurationMicrosecondVector::from_values([300, 310]))); assert_vector_ref_eq(Arc::new(DurationNanosecondVector::from_values([300, 310]))); + 
assert_vector_ref_eq(Arc::new(Decimal128Vector::from_values(vec![ + 1i128, 2i128, 3i128, + ]))); } #[test] @@ -312,5 +318,10 @@ mod tests { Arc::new(DurationSecondVector::from_values([300, 310])), Arc::new(DurationSecondVector::from_values([300, 320])), ); + + assert_vector_ref_ne( + Arc::new(Decimal128Vector::from_values([300i128, 310i128])), + Arc::new(Decimal128Vector::from_values([300i128, 320i128])), + ); } } diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 0bd5cd9d891f..f37048838cc8 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -25,7 +25,10 @@ use arrow_schema::IntervalUnit; use datafusion_common::ScalarValue; use snafu::{OptionExt, ResultExt}; -use super::{IntervalDayTimeVector, IntervalYearMonthVector}; +use super::{ + Decimal128Vector, DurationMicrosecondVector, DurationMillisecondVector, + DurationNanosecondVector, DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector, +}; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; use crate::scalars::{Scalar, ScalarVectorBuilder}; @@ -218,12 +221,23 @@ impl Helper { ScalarValue::IntervalMonthDayNano(v) => { ConstantVector::new(Arc::new(IntervalMonthDayNanoVector::from(vec![v])), length) } - ScalarValue::Decimal128(_, _, _) - | ScalarValue::Decimal256(_, _, _) - | ScalarValue::DurationSecond(_) - | ScalarValue::DurationMillisecond(_) - | ScalarValue::DurationMicrosecond(_) - | ScalarValue::DurationNanosecond(_) + ScalarValue::DurationSecond(v) => { + ConstantVector::new(Arc::new(DurationSecondVector::from(vec![v])), length) + } + ScalarValue::DurationMillisecond(v) => { + ConstantVector::new(Arc::new(DurationMillisecondVector::from(vec![v])), length) + } + ScalarValue::DurationMicrosecond(v) => { + ConstantVector::new(Arc::new(DurationMicrosecondVector::from(vec![v])), length) + } + ScalarValue::DurationNanosecond(v) => { + 
ConstantVector::new(Arc::new(DurationNanosecondVector::from(vec![v])), length) + } + ScalarValue::Decimal128(v, p, s) => { + let vector = Decimal128Vector::from(vec![v]).with_precision_and_scale(p, s)?; + ConstantVector::new(Arc::new(vector), length) + } + ScalarValue::Decimal256(_, _, _) | ScalarValue::Struct(_, _) | ScalarValue::Dictionary(_, _) => { return error::ConversionSnafu { @@ -318,14 +332,29 @@ impl Helper { IntervalMonthDayNanoVector::try_from_arrow_interval_array(array)?, ), }, + ArrowDataType::Duration(unit) => match unit { + TimeUnit::Second => { + Arc::new(DurationSecondVector::try_from_arrow_duration_array(array)?) + } + TimeUnit::Millisecond => Arc::new( + DurationMillisecondVector::try_from_arrow_duration_array(array)?, + ), + TimeUnit::Microsecond => Arc::new( + DurationMicrosecondVector::try_from_arrow_duration_array(array)?, + ), + TimeUnit::Nanosecond => Arc::new( + DurationNanosecondVector::try_from_arrow_duration_array(array)?, + ), + }, + ArrowDataType::Decimal128(_, _) => { + Arc::new(Decimal128Vector::try_from_arrow_array(array)?) 
+ } ArrowDataType::Float16 - | ArrowDataType::Duration(_) | ArrowDataType::LargeList(_) | ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Struct(_) | ArrowDataType::Union(_, _) | ArrowDataType::Dictionary(_, _) - | ArrowDataType::Decimal128(_, _) | ArrowDataType::Decimal256(_, _) | ArrowDataType::Map(_, _) | ArrowDataType::RunEndEncoded(_, _) => { @@ -375,8 +404,10 @@ mod tests { }; use arrow::datatypes::{Field, Int32Type}; use arrow_array::DictionaryArray; + use common_decimal::Decimal128; use common_time::time::Time; - use common_time::{Date, DateTime, Interval}; + use common_time::timestamp::TimeUnit; + use common_time::{Date, DateTime, Duration, Interval}; use super::*; use crate::value::Value; @@ -428,6 +459,37 @@ mod tests { } } + #[test] + fn test_try_from_scalar_duration_value() { + let vector = + Helper::try_from_scalar_value(ScalarValue::DurationSecond(Some(42)), 3).unwrap(); + assert_eq!( + ConcreteDataType::duration_second_datatype(), + vector.data_type() + ); + assert_eq!(3, vector.len()); + for i in 0..vector.len() { + assert_eq!( + Value::Duration(Duration::new(42, TimeUnit::Second)), + vector.get(i) + ); + } + } + + #[test] + fn test_try_from_scalar_decimal128_value() { + let vector = + Helper::try_from_scalar_value(ScalarValue::Decimal128(Some(42), 3, 1), 3).unwrap(); + assert_eq!( + ConcreteDataType::decimal128_datatype(3, 1), + vector.data_type() + ); + assert_eq!(3, vector.len()); + for i in 0..vector.len() { + assert_eq!(Value::Decimal128(Decimal128::new(42, 3, 1)), vector.get(i)); + } + } + #[test] fn test_try_from_list_value() { let value = ScalarValue::List( diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs index 748bcd3ff5ce..b2de83c6e6f3 100644 --- a/src/datatypes/src/vectors/operations.rs +++ b/src/datatypes/src/vectors/operations.rs @@ -24,8 +24,8 @@ use crate::error::{self, Result}; use crate::types::LogicalPrimitiveType; use crate::vectors::constant::ConstantVector; use crate::vectors::{ 
- BinaryVector, BooleanVector, ConcreteDataType, ListVector, NullVector, PrimitiveVector, - StringVector, UInt32Vector, Vector, VectorRef, + BinaryVector, BooleanVector, ConcreteDataType, Decimal128Vector, ListVector, NullVector, + PrimitiveVector, StringVector, UInt32Vector, Vector, VectorRef, }; /// Vector compute operations. @@ -99,7 +99,13 @@ macro_rules! impl_scalar_vector_op { )+}; } -impl_scalar_vector_op!(BinaryVector, BooleanVector, ListVector, StringVector); +impl_scalar_vector_op!( + BinaryVector, + BooleanVector, + ListVector, + StringVector, + Decimal128Vector +); impl VectorOp for PrimitiveVector { fn replicate(&self, offsets: &[usize]) -> VectorRef { diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 58477ee30f01..7f340b04a724 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -22,6 +22,7 @@ chrono.workspace = true common-base.workspace = true common-catalog.workspace = true common-datasource.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-procedure.workspace = true diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index f6b3119e3f9f..4cc6fd3274ac 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -14,6 +14,7 @@ use bytes::Buf; use common_base::bytes::Bytes; +use common_decimal::Decimal128; use common_time::time::Time; use common_time::{Date, Duration, Interval}; use datatypes::data_type::ConcreteDataType; @@ -74,6 +75,7 @@ impl SortField { ConcreteDataType::Time(_) => 10, ConcreteDataType::Duration(_) => 10, ConcreteDataType::Interval(_) => 18, + ConcreteDataType::Decimal128(_) => 19, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => 0, @@ -138,7 +140,8 @@ impl SortField { DateTime, datetime, Time, time, Interval, interval, - Duration, duration + Duration, duration, + Decimal128, decimal128 ); Ok(()) @@ -204,7 +207,8 @@ impl SortField { Time, Time, DateTime, 
DateTime, Interval, Interval, - Duration, Duration + Duration, Duration, + Decimal128, Decimal128 ) } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index a358df1c14be..6c909deb41b3 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -200,6 +200,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { } Value::Time(v) => row_writer .write_col(v.to_timezone_aware_string(query_context.time_zone()))?, + Value::Decimal128(v) => row_writer.write_col(v.to_string())?, } } row_writer.end_row().await?; @@ -246,6 +247,7 @@ pub(crate) fn create_mysql_column( ConcreteDataType::DateTime(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR), ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME), + ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL), _ => error::InternalSnafu { err_msg: format!("not implemented for column datatype {:?}", data_type), } diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index f33b42ebd618..fd75d0c1cf12 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -103,6 +103,7 @@ pub(super) fn encode_value(value: &Value, builder: &mut DataRowEncoder) -> PgWir } } Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)), + Value::Decimal128(v) => builder.encode_field(&v.to_string()), Value::List(_) | Value::Duration(_) => { Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!( @@ -131,6 +132,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP), &ConcreteDataType::Time(_) => Ok(Type::TIME), &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL), + &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC), &ConcreteDataType::Duration(_) | &ConcreteDataType::List(_) | &ConcreteDataType::Dictionary(_) => error::InternalSnafu { diff --git 
a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 17ec57afd3ba..d72a66721fd1 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true api.workspace = true common-base.workspace = true common-catalog.workspace = true +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-query.workspace = true diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 69d3b987ece3..eae6551c30e1 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -39,11 +39,11 @@ use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY}; -use datatypes::types::cast::CastOption; use datatypes::types::{cast, TimestampType}; use datatypes::value::{OrderedF32, OrderedF64, Value}; pub use option_map::OptionMap; use snafu::{ensure, OptionExt, ResultExt}; +use sqlparser::ast::ExactNumberInfo; pub use transform::{get_data_type_by_alias_name, transform_statements}; use crate::ast::{ @@ -146,6 +146,7 @@ macro_rules! 
parse_number_to_value { let n = parse_sql_number::($n)?; Ok(Value::Timestamp(Timestamp::new(n, t.unit()))) }, + // TODO(QuenKar): parse decimal128 string with precision and scale _ => ParseSqlValueSnafu { msg: format!("Fail to parse number {}, invalid column type: {:?}", @@ -223,11 +224,9 @@ pub fn sql_value_to_value( } }; if value.data_type() != *data_type { - cast::cast_with_opt(value, data_type, &CastOption { strict: true }).with_context(|_| { - InvalidCastSnafu { - sql_value: sql_val.clone(), - datatype: data_type, - } + cast(value, data_type).with_context(|_| InvalidCastSnafu { + sql_value: sql_val.clone(), + datatype: data_type, }) } else { Ok(value) @@ -412,6 +411,15 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result Ok(ConcreteDataType::interval_month_day_nano_datatype()), + SqlDataType::Decimal(exact_info) => match exact_info { + ExactNumberInfo::None => Ok(ConcreteDataType::decimal128_default_datatype()), + // refer to https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html + // In standard SQL, the syntax DECIMAL(M) is equivalent to DECIMAL(M,0). 
+ ExactNumberInfo::Precision(p) => Ok(ConcreteDataType::decimal128_datatype(*p as u8, 0)), + ExactNumberInfo::PrecisionAndScale(p, s) => { + Ok(ConcreteDataType::decimal128_datatype(*p as u8, *s as i8)) + } + }, _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), } @@ -445,6 +453,9 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu )), ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval), ConcreteDataType::Binary(_) => Ok(SqlDataType::Varbinary(None)), + ConcreteDataType::Decimal128(d) => Ok(SqlDataType::Decimal( + ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as u64), + )), ConcreteDataType::Duration(_) | ConcreteDataType::Null(_) | ConcreteDataType::List(_) diff --git a/tests/cases/standalone/common/order/order_variable_size_payload.result b/tests/cases/standalone/common/order/order_variable_size_payload.result index 8fc3593ce539..becf9aa5c14f 100644 --- a/tests/cases/standalone/common/order/order_variable_size_payload.result +++ b/tests/cases/standalone/common/order/order_variable_size_payload.result @@ -9,17 +9,17 @@ Affected Rows: 7 -- SQLNESS SORT_RESULT 2 2 SELECT * FROM t0 ORDER BY t0.c0 DESC; -+------+-------------------------+ -| c0 | t | -+------+-------------------------+ -| null | 1970-01-01T00:00:00.002 | -| null | 1970-01-01T00:00:00.003 | -| null | 1970-01-01T00:00:00.004 | -| null | 1970-01-01T00:00:00.005 | -| null | 1970-01-01T00:00:00.006 | -| null | 1970-01-01T00:00:00.007 | -| a | 1970-01-01T00:00:00.001 | -+------+-------------------------+ ++----+-------------------------+ +| c0 | t | ++----+-------------------------+ +| | 1970-01-01T00:00:00.002 | +| | 1970-01-01T00:00:00.003 | +| | 1970-01-01T00:00:00.004 | +| | 1970-01-01T00:00:00.005 | +| | 1970-01-01T00:00:00.006 | +| | 1970-01-01T00:00:00.007 | +| a | 1970-01-01T00:00:00.001 | ++----+-------------------------+ CREATE TABLE test0 (job VARCHAR, host VARCHAR, t TIMESTAMP TIME INDEX); @@ -75,7 +75,7 @@ SELECT * FROM test1 
ORDER BY s; | 3555555555552 | 1970-01-01T00:00:00.003 | | 3555555555553 | 1970-01-01T00:00:00.007 | | 355555555556 | 1970-01-01T00:00:00.005 | -| null | 1970-01-01T00:00:00.002 | +| | 1970-01-01T00:00:00.002 | +---------------+-------------------------+ CREATE TABLE test4 (i INT, j INT, t TIMESTAMP TIME INDEX); @@ -342,7 +342,7 @@ select i, split_part(s, 'b', 1) from test8 order by i; | i | split_part(test8.s,Utf8("b"),Int64(1)) | +---+----------------------------------------+ | 1 | cc | -| 2 | null | +| 2 | | | 3 | a | | | d | +---+----------------------------------------+