diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 9ce78be2632c..870573679370 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -53,7 +53,7 @@ impl TryFrom for RowInsertRequests { // tags if let Some(tags) = tags { - let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.as_str())); + let kvs = tags.iter().map(|(k, v)| (k.to_string(), v.to_string())); row_writer::write_tags(table_data, kvs, &mut one_row)?; } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 3c114f335d13..d4017cd4e107 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -303,7 +303,7 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe let mut multi_table_data = MultiTableData::new(); - for series in &request.timeseries { + for series in request.timeseries { let table_name = &series .labels .iter() @@ -326,29 +326,54 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe series.samples.len(), ); - for Sample { value, timestamp } in &series.samples { + // labels + let kvs = series.labels.into_iter().filter_map(|label| { + if label.name == METRIC_NAME_LABEL { + None + } else { + Some((label.name, label.value)) + } + }); + + if series.samples.len() == 1 { let mut one_row = table_data.alloc_one_row(); - // labels - let kvs = series.labels.iter().filter_map(|label| { - if label.name == METRIC_NAME_LABEL { - None - } else { - Some((label.name.to_string(), label.value.as_str())) - } - }); row_writer::write_tags(table_data, kvs, &mut one_row)?; // value - row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?; + row_writer::write_f64( + table_data, + GREPTIME_VALUE, + series.samples[0].value, + &mut one_row, + )?; // timestamp row_writer::write_ts_millis( table_data, GREPTIME_TIMESTAMP, - Some(*timestamp), + Some(series.samples[0].timestamp), &mut one_row, )?; table_data.add_row(one_row); + } else { + for Sample { value, timestamp } in &series.samples { + let mut one_row = table_data.alloc_one_row(); + + // labels + let kvs = kvs.clone(); + row_writer::write_tags(table_data, kvs, &mut one_row)?; + // value + row_writer::write_f64(table_data, GREPTIME_VALUE, *value, &mut one_row)?; + // timestamp + row_writer::write_ts_millis( + table_data, + GREPTIME_TIMESTAMP, + Some(*timestamp), + &mut one_row, + )?; + + table_data.add_row(one_row); + } } } diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index c11f3bfc9fc8..4b8956015ef1 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -140,16 +140,10 @@ impl MultiTableData { pub fn write_tags( table_data: &mut TableData, - kvs: impl Iterator, + kvs: impl Iterator, one_row: &mut Vec, ) -> Result<()> { - let ktv_iter = kvs.map(|(k, v)| { - ( - k, - ColumnDataType::String, - ValueData::StringValue(v.to_string()), - ) - }); + let ktv_iter = kvs.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v))); write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row) } @@ -209,18 +203,22 @@ fn write_by_semantic_type( } = table_data; for (name, datatype, value) in ktv_iter { - let index = column_indexes.entry(name.clone()).or_insert(schema.len()); - if *index == schema.len() { + let index = column_indexes.get(&name); + if let Some(index) = index { + check_schema(datatype, semantic_type, &schema[*index])?; + one_row[*index].value_data = Some(value); + } else { + let index = schema.len(); schema.push(ColumnSchema { - column_name: name, + column_name: name.clone(), datatype: datatype as i32, semantic_type: semantic_type as i32, ..Default::default() }); - one_row.push(value.into()); - } else { - check_schema(datatype, semantic_type, &schema[*index])?; - one_row[*index].value_data = Some(value); + column_indexes.insert(name, index); + one_row.push(Value { + value_data: Some(value), + }); } } @@ -264,22 +262,24 @@ pub fn write_ts_precision( } }; - let index = column_indexes.entry(name.clone()).or_insert(schema.len()); - if *index == schema.len() { - schema.push(ColumnSchema { - column_name: name, - datatype: ColumnDataType::TimestampMillisecond as i32, - semantic_type: SemanticType::Timestamp as i32, - ..Default::default() - }); - one_row.push(ValueData::TimestampMillisecondValue(ts).into()) - } else { + let index = column_indexes.get(&name); + if let Some(index) = index { check_schema( ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, &schema[*index], )?; one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts)); + } else { + let index = schema.len(); + schema.push(ColumnSchema { + column_name: name.clone(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }); + column_indexes.insert(name, index); + one_row.push(ValueData::TimestampMillisecondValue(ts).into()) } Ok(())