diff --git a/quesma/model/pipeline_aggregations/bucket_script.go b/quesma/model/pipeline_aggregations/bucket_script.go index 543823759..e7c319291 100644 --- a/quesma/model/pipeline_aggregations/bucket_script.go +++ b/quesma/model/pipeline_aggregations/bucket_script.go @@ -4,16 +4,20 @@ package pipeline_aggregations import ( "context" + "fmt" "quesma/logger" "quesma/model" + "quesma/util" + "strings" ) type BucketScript struct { - ctx context.Context + *PipelineAggregation + script string } -func NewBucketScript(ctx context.Context) BucketScript { - return BucketScript{ctx: ctx} +func NewBucketScript(ctx context.Context, script string) BucketScript { + return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")} } func (query BucketScript) AggregationType() model.AggregationType { @@ -21,27 +25,64 @@ func (query BucketScript) AggregationType() model.AggregationType { } func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap { - if len(rows) == 0 { - logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation") - return model.JsonMap{"value": 0} - } - var response []model.JsonMap - for _, row := range rows { - response = append(response, model.JsonMap{"value": row.Cols[0].Value}) - } - return model.JsonMap{ - "buckets": response, + const defaultValue = 0. + switch { + case query.script == "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0": + numerator := query.findFilterValue(rows, "numerator") + denominator := query.findFilterValue(rows, "denominator") + if denominator == 0 { + return model.JsonMap{"value": defaultValue} + } + return model.JsonMap{"value": numerator / denominator} + case len(rows) == 1: + for _, row := range rows { + return model.JsonMap{"value": util.ExtractNumeric64(row.LastColValue())} + } } + + logger.WarnWithCtx(query.ctx).Msgf("unexpected result in bucket_script: %s, len(rows): %d. Returning default.", query.String(), len(rows)) + return model.JsonMap{"value": defaultValue} } -func (query BucketScript) CalculateResultWhenMissing(*model.Query, []model.QueryResultRow) []model.QueryResultRow { - return []model.QueryResultRow{} +func (query BucketScript) CalculateResultWhenMissing(parentRows []model.QueryResultRow) []model.QueryResultRow { + resultRows := make([]model.QueryResultRow, 0, len(parentRows)) + for _, parentRow := range parentRows { + resultRow := parentRow.Copy() + if len(resultRow.Cols) != 0 { + resultRow.Cols[len(resultRow.Cols)-1].Value = util.ExtractNumeric64(parentRow.LastColValue()) + } else { + logger.ErrorWithCtx(query.ctx).Msgf("unexpected empty parent row in bucket_script: %s", query.String()) + } + resultRows = append(resultRows, resultRow) + } + return resultRows } func (query BucketScript) String() string { - return "bucket script" + return fmt.Sprintf("bucket_script(isCount: %v, parent: %s, pathToParent: %v, parentBucketAggregation: %v, script: %v)", + query.isCount, query.Parent, query.PathToParent, query.parentBucketAggregation, query.script) } func (query BucketScript) PipelineAggregationType() model.PipelineAggregationType { - return model.PipelineParentAggregation // not sure, maybe it's sibling. + return model.PipelineParentAggregation // not sure, maybe it's sibling. change hasn't changed the result when running some tests. +} + +func (query BucketScript) findFilterValue(rows []model.QueryResultRow, filterName string) float64 { + const defaultValue = 0.0 + for _, row := range rows { + for _, col := range row.Cols { + colName := col.ColName + if !strings.HasSuffix(colName, "_col_0") { + continue + } + colName = strings.TrimSuffix(colName, "_col_0") + if strings.HasSuffix(colName, "-"+filterName) { + return float64(util.ExtractInt64(col.Value)) + } + } + } + + logger.WarnWithCtx(query.ctx).Msgf("could not find filter value for filter: %s, bucket_script: %s, len(rows): %d."+ + "Returning default", filterName, query.String(), len(rows)) + return defaultValue } diff --git a/quesma/model/pipeline_aggregations/pipeline_aggregation.go b/quesma/model/pipeline_aggregations/pipeline_aggregation.go index 787351008..d79da6a10 100644 --- a/quesma/model/pipeline_aggregations/pipeline_aggregation.go +++ b/quesma/model/pipeline_aggregations/pipeline_aggregation.go @@ -27,7 +27,7 @@ func newPipelineAggregation(ctx context.Context, bucketsPath string) *PipelineAg const delimiter = ">" if len(bucketsPath) == 0 { logger.WarnWithCtx(ctx).Msgf("invalid bucketsPath: %s. Using empty string as parent.", bucketsPath) - return &PipelineAggregation{} + return &PipelineAggregation{isCount: true} // count, as it's the simplest case } parent := "" @@ -54,6 +54,10 @@ func (p *PipelineAggregation) IsCount() bool { return p.isCount } +func (p *PipelineAggregation) GetParentBucketAggregation() model.QueryType { + return p.parentBucketAggregation +} + func (p *PipelineAggregation) SetParentBucketAggregation(parentBucketAggregation model.QueryType) { p.parentBucketAggregation = parentBucketAggregation } diff --git a/quesma/model/pipeline_query_type.go b/quesma/model/pipeline_query_type.go index c9155ac55..b41f9846c 100644 --- a/quesma/model/pipeline_query_type.go +++ b/quesma/model/pipeline_query_type.go @@ -26,5 +26,6 @@ type PipelineQueryType interface { GetPathToParent() []string IsCount() bool + GetParentBucketAggregation() QueryType SetParentBucketAggregation(parentBucketAggregation QueryType) } diff --git a/quesma/queryparser/aggregation_parser_test.go b/quesma/queryparser/aggregation_parser_test.go index 2e40ed546..4d8d17ac9 100644 --- a/quesma/queryparser/aggregation_parser_test.go +++ b/quesma/queryparser/aggregation_parser_test.go @@ -666,6 +666,7 @@ func allAggregationTests() []testdata.AggregationTestCase { add(kibana_visualize.PipelineAggregationTests, "kibana-visualize/pipeline_agg_req") add(clients.KunkkaTests, "clients/kunkka") add(clients.OpheliaTests, "clients/ophelia") + add(clients.CloverTests, "clients/clover") return allTests } diff --git a/quesma/queryparser/pancake_pipelines.go b/quesma/queryparser/pancake_pipelines.go index f27937eea..004af6545 100644 --- a/quesma/queryparser/pancake_pipelines.go +++ b/quesma/queryparser/pancake_pipelines.go @@ -19,6 +19,10 @@ func (p pancakePipelinesProcessor) selectPipelineRows(pipeline model.PipelineQue bucketAggregation *pancakeModelBucketAggregation) ( result []model.QueryResultRow) { + if bucketAggregation == nil { + return rows + } + isCount := pipeline.IsCount() for _, row := range rows { newRow := model.QueryResultRow{Index: row.Index} diff --git a/quesma/queryparser/pancake_sql_query_generation_test.go b/quesma/queryparser/pancake_sql_query_generation_test.go index 39abb223f..05c6d079b 100644 --- a/quesma/queryparser/pancake_sql_query_generation_test.go +++ b/quesma/queryparser/pancake_sql_query_generation_test.go @@ -60,6 +60,10 @@ func TestPancakeQueryGeneration(t *testing.T) { t.Skip("Need to implement order by top metrics (talk with Jacek, he has an idea)") } + if test.TestName == "multiple buckets_path(file:clients/clover,nr:1)" { + t.Skip("Unskip after merge of auto_date_histogram") + } + fmt.Println("i:", i, "test:", test.TestName) jsonp, err := types.ParseJSON(test.QueryRequestJson) diff --git a/quesma/queryparser/pipeline_aggregations.go b/quesma/queryparser/pipeline_aggregations.go index c4b9bd952..b5dc33d41 100644 --- a/quesma/queryparser/pipeline_aggregations.go +++ b/quesma/queryparser/pipeline_aggregations.go @@ -6,6 +6,7 @@ import ( "quesma/logger" "quesma/model" "quesma/model/pipeline_aggregations" + "strings" ) // CAUTION: maybe "return" everywhere isn't corrent, as maybe there can be multiple pipeline aggregations at one level. @@ -168,17 +169,20 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) ( if !ok { return } - if bucketsPath != pipeline_aggregations.BucketsPathCount { + if !strings.HasSuffix(bucketsPath, pipeline_aggregations.BucketsPathCount) { logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath) return } - // if ["script"]["source"] != "_value", skip the aggregation scriptRaw, exists := bucketScript["script"] if !exists { logger.WarnWithCtx(cw.Ctx).Msg("no script in bucket_script. Skipping this aggregation") return } + if script, ok := scriptRaw.(string); ok { + return pipeline_aggregations.NewBucketScript(cw.Ctx, script), true + } + script, ok := scriptRaw.(QueryMap) if !ok { logger.WarnWithCtx(cw.Ctx).Msgf("script is not a map, but %T, value: %v. Skipping this aggregation", scriptRaw, scriptRaw) @@ -186,8 +190,8 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) ( } if sourceRaw, exists := script["source"]; exists { if source, ok := sourceRaw.(string); ok { - if source != "_value" { - logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value', but %s. Skipping this aggregation", source) + if source != "_value" && source != "count * 1" { + logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value'/'count * 1', but %s. Skipping this aggregation", source) return } } else { @@ -200,10 +204,10 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) ( } // okay, we've checked everything, it's indeed a simple count - return pipeline_aggregations.NewBucketScript(cw.Ctx), true + return pipeline_aggregations.NewBucketScript(cw.Ctx, ""), true } -func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPath string, success bool) { +func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) { queryMap, ok := shouldBeQueryMap.(QueryMap) if !ok { logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", aggregationName, shouldBeQueryMap, shouldBeQueryMap) @@ -214,10 +218,27 @@ func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggr logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in avg_bucket") return } - bucketsPath, ok = bucketsPathRaw.(string) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw) - return + + switch bucketsPath := bucketsPathRaw.(type) { + case string: + return bucketsPath, true + case QueryMap: + // TODO: handle arbitrary nr of keys (and arbitrary scripts, because we also handle only one special case) + if len(bucketsPath) == 1 || len(bucketsPath) == 2 { + for _, bucketPath := range bucketsPath { + if _, ok = bucketPath.(string); !ok { + logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with string values, but %T. Skipping this aggregation", bucketPath) + return + } + // Kinda weird to return just the first value, but seems working on all cases so far. + // After fixing the TODO above, it should also get fixed. + return bucketPath.(string), true + } + } else { + logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with one or two keys, but %d. Skipping this aggregation", len(bucketsPath)) + } } - return bucketsPath, true + + logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path in wrong format, type: %T, value: %v", bucketsPathRaw, bucketsPathRaw) + return } diff --git a/quesma/testdata/clients/clover.go b/quesma/testdata/clients/clover.go new file mode 100644 index 000000000..978edfc0d --- /dev/null +++ b/quesma/testdata/clients/clover.go @@ -0,0 +1,348 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package clients + +import ( + "quesma/model" + "quesma/testdata" +) + +var CloverTests = []testdata.AggregationTestCase{ + { // [0] TODO: add empty bucket for 1 of date_histogram buckets, because of min_doc_count=0 and extended_bounds. After extended_bounds PR. + TestName: "todo", + QueryRequestJson: ` + { + "aggs": { + "1": { + "aggs": { + "timeseries": { + "aggs": { + "2": { + "bucket_script": { + "buckets_path": { + "count": "_count" + }, + "gap_policy": "skip", + "script": { + "lang": "expression", + "source": "count * 1" + } + } + } + }, + "date_histogram": { + "extended_bounds": { + "max": 1726264800000, + "min": 1726264800000 + }, + "field": "@timestamp", + "fixed_interval": "2592000s", + "min_doc_count": 0, + "time_zone": "Europe/Warsaw" + } + } + }, + "meta": { + "indexPatternString": "ab*", + "intervalString": "2592000s", + "panelId": "0", + "seriesId": "1", + "timeField": "timestamp" + }, + "terms": { + "field": "nobel_laureate", + "order": { + "_count": "desc" + } + } + } + }, + "size": 0 + }`, + ExpectedResponse: ` + { + "took": 0, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "failed": 0, + "skipped": 0 + }, + "hits": { + "total": { + "value": 14074, + "relation": "eq" + }, + "max_score": null, + "hits": [] + }, + "aggregations": { + "1": { + "buckets": [ + { + "doc_count": 672, + "key": "/apm", + "timeseries": { + "buckets": [ + { + "2": { + "value": 319 + }, + "doc_count": 319, + "key": 1728856800000, + "key_as_string": "2024-10-13T22:00:00.000" + } + ] + } + }, + { + "doc_count": 655, + "key": "/", + "timeseries": { + "buckets": [ + { + "2": { + "value": 12 + }, + "doc_count": 12, + "key": 1726264800000, + "key_as_string": "2024-09-13T22:00:00.000" + }, + { + "2": { + "value": 301 + }, + "doc_count": 301, + "key": 1728856800000, + "key_as_string": "2024-10-13T22:00:00.000" + } + ] + } + } + ], + "doc_count_error_upper_bound": 0, + "meta": { + "indexPatternString": "ab*", + "intervalString": "2592000s", + "panelId": "0", + "seriesId": "1", + "timeField": "timestamp" + }, + "sum_other_doc_count": 49100 + } + } + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(50427)), + model.NewQueryResultCol("aggr__1__key_0", "/apm"), + model.NewQueryResultCol("aggr__1__count", int64(672)), + model.NewQueryResultCol("aggr__1__timeseries__key_0", int64(1728864000000/2592000000)), + model.NewQueryResultCol("aggr__1__timeseries__count", int64(319)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(50427)), + model.NewQueryResultCol("aggr__1__key_0", "/"), + model.NewQueryResultCol("aggr__1__count", int64(655)), + model.NewQueryResultCol("aggr__1__timeseries__key_0", int64(1726272000000/2592000000)), + model.NewQueryResultCol("aggr__1__timeseries__count", int64(12)), + }}, + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__1__parent_count", int64(50427)), + model.NewQueryResultCol("aggr__1__key_0", "/"), + model.NewQueryResultCol("aggr__1__count", int64(655)), + model.NewQueryResultCol("aggr__1__timeseries__key_0", int64(1728864000000/2592000000)), + model.NewQueryResultCol("aggr__1__timeseries__count", int64(301)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT "aggr__1__parent_count", "aggr__1__key_0", "aggr__1__count", + "aggr__1__timeseries__key_0", "aggr__1__timeseries__count" + FROM ( + SELECT "aggr__1__parent_count", "aggr__1__key_0", "aggr__1__count", + "aggr__1__timeseries__key_0", "aggr__1__timeseries__count", + dense_rank() OVER (ORDER BY "aggr__1__count" DESC, "aggr__1__key_0" ASC) AS + "aggr__1__order_1_rank", + dense_rank() OVER (PARTITION BY "aggr__1__key_0" ORDER BY + "aggr__1__timeseries__key_0" ASC) AS "aggr__1__timeseries__order_1_rank" + FROM ( + SELECT sum(count(*)) OVER () AS "aggr__1__parent_count", + "nobel_laureate" AS "aggr__1__key_0", + sum(count(*)) OVER (PARTITION BY "aggr__1__key_0") AS "aggr__1__count", + toInt64((toUnixTimestamp64Milli("@timestamp")+timeZoneOffset(toTimezone( + "@timestamp", 'Europe/Warsaw'))*1000) / 2592000000) AS + "aggr__1__timeseries__key_0", count(*) AS "aggr__1__timeseries__count" + FROM __quesma_table_name + GROUP BY "nobel_laureate" AS "aggr__1__key_0", + toInt64((toUnixTimestamp64Milli("@timestamp")+timeZoneOffset(toTimezone( + "@timestamp", 'Europe/Warsaw'))*1000) / 2592000000) AS + "aggr__1__timeseries__key_0")) + WHERE "aggr__1__order_1_rank"<=11 + ORDER BY "aggr__1__order_1_rank" ASC, "aggr__1__timeseries__order_1_rank" ASC + `, + }, + { // [1] + TestName: "multiple buckets_path", + QueryRequestJson: ` + { + "aggs": { + "timeseries": { + "aggs": { + "a2": { + "bucket_script": { + "buckets_path": { + "denominator": "a2-denominator>_count", + "numerator": "a2-numerator>_count" + }, + "script": "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0" + } + }, + "a2-denominator": { + "filter": { + "bool": { + "filter": [], + "must": [], + "must_not": [], + "should": [] + } + } + }, + "a2-numerator": { + "filter": { + "bool": { + "filter": [], + "must": [ + { + "query_string": { + "analyze_wildcard": true, + "query": "NOT table.flower : clover" + } + } + ], + "must_not": [], + "should": [] + } + } + } + }, + "auto_date_histogram": { + "buckets": 1, + "field": "@timestamp" + }, + "meta": { + "indexPatternString": "ab*", + "intervalString": "900000ms", + "normalized": true, + "panelId": "0", + "seriesId": "1", + "timeField": "@timestamp" + } + } + }, + "query": { + "bool": { + "filter": [], + "must": [ + { + "range": { + "@timestamp": { + "format": "strict_date_optional_time", + "gte": "2024-10-11T09:58:03.723Z", + "lte": "2024-10-11T10:13:03.723Z" + } + } + }, + { + "bool": { + "filter": [], + "must": [], + "must_not": [], + "should": [] + } + }, + { + "bool": { + "filter": [], + "must": [], + "must_not": [], + "should": [] + } + } + ], + "must_not": [], + "should": [] + } + }, + "runtime_mappings": {}, + "size": 0, + "timeout": "30000ms", + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "completion_time_in_millis": 1728635627258, + "expiration_time_in_millis": 1728635687254, + "id": "FlhaTzBhMkpQU3lLMmlzNHhBeU9FMHcbaUp3ZGNYdDNSaGF3STVFZ2xWY3RuQTo2MzU4", + "is_partial": false, + "is_running": false, + "response": { + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "aggregations": { + "timeseries": { + "buckets": [ + { + "a2": { + "value": 202.0 + }, + "doc_count": 202, + "key": 1728518400000, + "key_as_string": "2024-10-10T00:00:00.000Z" + } + ], + "interval": "7d", + "meta": { + "dataViewId": "d3d7af60-4c81-11e8-b3d7-01146121b73d", + "indexPatternString": "kibana_sample_data_flights", + "intervalString": "54000000ms", + "normalized": true, + "panelId": "1a1d745d-0c21-4103-a2ae-df41d4fbd366", + "seriesId": "866fb08f-b9a4-43eb-a400-38ebb6c13aed", + "timeField": "timestamp" + } + } + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "eq", + "value": 202 + } + }, + "timed_out": false, + "took": 4 + }, + "start_time_in_millis": 1728635627254 + }`, + ExpectedPancakeResults: []model.QueryResultRow{ + {Cols: []model.QueryResultCol{ + model.NewQueryResultCol("aggr__timeseries__count", int64(202)), + }}, + }, + ExpectedPancakeSQL: ` + SELECT count(*) AS "aggr__timeseries__count", + countIf(True) AS + "name", + countIf(NOT ("field" = 'sth')) AS + "name" + FROM __quesma_table_name + WHERE ("@timestamp">=parseDateTime64BestEffort('2024-10-11T09:58:03.723Z') AND + "@timestamp"<=parseDateTime64BestEffort('2024-10-11T10:13:03.723Z'))`, + }, +} diff --git a/quesma/util/utils.go b/quesma/util/utils.go index b22e889a9..221e99219 100644 --- a/quesma/util/utils.go +++ b/quesma/util/utils.go @@ -718,6 +718,13 @@ func ExtractNumeric64Maybe(value any) (asFloat64 float64, success bool) { return 0.0, false } +// ExtractNumeric64 returns float64 value behind `value`, if it's numeric (some kind of (*)int or (*)float). +// Returns 0 if `value` is not numeric. +func ExtractNumeric64(value any) float64 { + asFloat64, _ := ExtractNumeric64Maybe(value) + return asFloat64 +} + type sqlMockMismatchSql struct { expected string actual string