diff --git a/quesma/ab_testing/collector/collector.go b/quesma/ab_testing/collector/collector.go index e9390d09e..e75cbdf08 100644 --- a/quesma/ab_testing/collector/collector.go +++ b/quesma/ab_testing/collector/collector.go @@ -83,12 +83,13 @@ func NewCollector(ctx context.Context, ingester ingest.Ingester, healthQueue cha &diffTransformer{}, //&ppPrintFanout{}, //&mismatchedOnlyFilter{}, + &redactOkResults{}, //&elasticSearchFanout{ // url: "http://localhost:8080", // indexName: "ab_testing_logs", //}, &internalIngestFanout{ - indexName: "ab_testing_logs", + indexName: ab_testing.ABTestingTableName, ingestProcessor: ingester, }, }, diff --git a/quesma/ab_testing/collector/diff.go b/quesma/ab_testing/collector/diff.go index 0e7a038ee..612d83b61 100644 --- a/quesma/ab_testing/collector/diff.go +++ b/quesma/ab_testing/collector/diff.go @@ -66,22 +66,31 @@ func (t *diffTransformer) process(in EnrichedResults) (out EnrichedResults, drop } if len(mismatches) > 0 { - b, err := json.MarshalIndent(mismatches, "", " ") - - if err != nil { - return in, false, fmt.Errorf("failed to marshal mismatches: %w", err) - } - in.Mismatch.Mismatches = string(b) in.Mismatch.IsOK = false in.Mismatch.Count = len(mismatches) - in.Mismatch.Message = mismatches.String() topMismatchType, _ := t.mostCommonMismatchType(mismatches) if topMismatchType != "" { in.Mismatch.TopMismatchType = topMismatchType } + // if there are too many mismatches, we only show the first 20 + // this is to avoid overwhelming the user with too much information + const mismatchesSize = 20 + + if len(mismatches) > mismatchesSize { + mismatches = mismatches[:mismatchesSize] + } + + b, err := json.MarshalIndent(mismatches, "", " ") + + if err != nil { + return in, false, fmt.Errorf("failed to marshal mismatches: %w", err) + } + in.Mismatch.Mismatches = string(b) + in.Mismatch.Message = mismatches.String() + } else { in.Mismatch.Mismatches = "[]" in.Mismatch.IsOK = true diff --git a/quesma/ab_testing/collector/filters.go b/quesma/ab_testing/collector/filters.go index 73bcca505..bad21eea3 100644 --- a/quesma/ab_testing/collector/filters.go +++ b/quesma/ab_testing/collector/filters.go @@ -40,3 +40,27 @@ func (t *mismatchedOnlyFilter) process(in EnrichedResults) (out EnrichedResults, // avoid unused struct error var _ = &mismatchedOnlyFilter{} + +type redactOkResults struct { +} + +func (t *redactOkResults) name() string { + return "redactOkResults" +} + +func (t *redactOkResults) process(in EnrichedResults) (out EnrichedResults, drop bool, err error) { + + // we're not interested in the details of the request and responses if the mismatch is OK + + redactMsg := "***REDACTED***" + if in.Mismatch.IsOK { + in.Request.Body = redactMsg + in.A.Body = redactMsg + in.B.Body = redactMsg + in.Mismatch.Message = "OK" + } + + return in, false, nil +} + +var _ = &redactOkResults{} diff --git a/quesma/ab_testing/model.go b/quesma/ab_testing/model.go index 02d9d8531..3e21ee638 100644 --- a/quesma/ab_testing/model.go +++ b/quesma/ab_testing/model.go @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Elastic-2.0 package ab_testing +const ABTestingTableName = "ab_testing_logs" + type Request struct { Path string `json:"path"` IndexName string `json:"index_name"` diff --git a/quesma/jsondiff/elastic_response_diff.go b/quesma/jsondiff/elastic_response_diff.go index e43cc4cf4..83a27c57b 100644 --- a/quesma/jsondiff/elastic_response_diff.go +++ b/quesma/jsondiff/elastic_response_diff.go @@ -6,7 +6,7 @@ import "fmt" // NewElasticResponseJSONDiff creates a JSONDiff instance that is tailored to compare Elasticsearch response JSONs. func NewElasticResponseJSONDiff() (*JSONDiff, error) { - d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$") + d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$", ".*__quesma_total_count", ".*\\._id", "^_shards.*", ".*\\._score", ".*\\._source", ".*\\.__quesma_originalKey") if err != nil { return nil, fmt.Errorf("could not create JSONDiff: %v", err) diff --git a/quesma/jsondiff/jsondiff.go b/quesma/jsondiff/jsondiff.go index f9bd69a4c..1ce9160ca 100644 --- a/quesma/jsondiff/jsondiff.go +++ b/quesma/jsondiff/jsondiff.go @@ -25,6 +25,8 @@ func newType(code, message string) mismatchType { var ( invalidType = newType("invalid_type", "Types are not equal") invalidValue = newType("invalid_value", "Values are not equal") + invalidNumberValue = newType("invalid_number_value", "Numbers are not equal") + invalidDateValue = newType("invalid_date_value", "Dates are not equal") invalidArrayLength = newType("invalid_array_length", "Array lengths are not equal") invalidArrayLengthOffByOne = newType("invalid_array_length_off_by_one", "Array lengths are off by one.") objectDifference = newType("object_difference", "Objects are different") @@ -355,6 +357,8 @@ func (d *JSONDiff) asType(a any) string { return fmt.Sprintf("%T", a) } +var dateRx = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}`) + func (d *JSONDiff) compare(expected any, actual any) { if d.isIgnoredPath() { @@ -399,9 +403,9 @@ func (d *JSONDiff) compare(expected any, actual any) { case float64: // float operations are noisy, we need to compare them with desired precision - - epsilon := 1e-9 - relativeTolerance := 1e-9 + // this is lousy, but it works for now + epsilon := 1e-3 + relativeTolerance := 1e-3 aFloat := expected.(float64) bFloat := actual.(float64) @@ -411,8 +415,37 @@ func (d *JSONDiff) compare(expected any, actual any) { relativeDiff := absDiff / math.Max(math.Abs(aFloat), math.Abs(bFloat)) if relativeDiff > relativeTolerance { - d.addMismatch(invalidValue, d.asValue(expected), d.asValue(actual)) + d.addMismatch(invalidNumberValue, d.asValue(expected), d.asValue(actual)) + } + } + + default: + d.addMismatch(invalidType, d.asType(expected), d.asType(actual)) + } + + case string: + + switch actualString := actual.(type) { + case string: + + if dateRx.MatchString(aVal) && dateRx.MatchString(actualString) { + + // TODO add better date comparison here + // parse both date and compare them with desired precision + + // elastics returns date in formats + // "2024-10-24T00:00:00.000+02:00" + // "2024-10-24T00:00:00.000Z" + + // quesma returns + // 2024-10-23T22:00:00.000 + compareOnly := "2000-01-" + + if aVal[:len(compareOnly)] != actualString[:len(compareOnly)] { + d.addMismatch(invalidDateValue, d.asValue(expected), d.asValue(actual)) } + + return } default: diff --git a/quesma/jsondiff/jsondiff_test.go b/quesma/jsondiff/jsondiff_test.go index 6d40fe776..ce8ddcb56 100644 --- a/quesma/jsondiff/jsondiff_test.go +++ b/quesma/jsondiff/jsondiff_test.go @@ -39,7 +39,7 @@ func TestJSONDiff(t *testing.T) { name: "Test 2", expected: `{"a": 1, "b": 2, "c": 3}`, actual: `{"a": 1, "b": 3, "c": 3}`, - problems: []JSONMismatch{mismatch("b", invalidValue)}, + problems: []JSONMismatch{mismatch("b", invalidNumberValue)}, }, { @@ -67,7 +67,7 @@ func TestJSONDiff(t *testing.T) { name: "array element difference", expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`, actual: `{"a": [1, 2, 4], "b": 2, "c": 3}`, - problems: []JSONMismatch{mismatch("a.[2]", invalidValue)}, + problems: []JSONMismatch{mismatch("a.[2]", invalidNumberValue)}, }, { @@ -81,28 +81,28 @@ func TestJSONDiff(t *testing.T) { name: "object difference", expected: `{"a": {"b": 1}, "c": 3}`, actual: `{"a": {"b": 2}, "c": 3}`, - problems: []JSONMismatch{mismatch("a.b", invalidValue)}, + problems: []JSONMismatch{mismatch("a.b", invalidNumberValue)}, }, { name: "deep path difference", expected: `{"a": {"d": {"b": 1}}, "c": 3}`, actual: `{"a": {"d": {"b": 2}}, "c": 3}`, - problems: []JSONMismatch{mismatch("a.d.b", invalidValue)}, + problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)}, }, { name: "deep path difference", expected: `{"a": {"d": {"b": 1}}, "c": 3, "_ignore": 1}`, actual: `{"a": {"d": {"b": 2}}, "c": 3}`, - problems: []JSONMismatch{mismatch("a.d.b", invalidValue)}, + problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)}, }, { name: "array sort difference ", expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`, actual: `{"a": [1, 3, 2], "b": 2, "c": 3}`, - problems: []JSONMismatch{mismatch("a.[1]", invalidValue), mismatch("a.[2]", invalidValue)}, + problems: []JSONMismatch{mismatch("a.[1]", invalidNumberValue), mismatch("a.[2]", invalidNumberValue)}, }, { diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index 9e10e73c3..0ebe1c720 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -578,8 +578,9 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } } - if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) { - errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName)) + if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) || + (processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) { + errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName)) continue } @@ -676,10 +677,12 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } } - if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) { - errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName)) + if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) || + (processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) { + errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName)) continue } + if len(processedConfig.QueryTarget) == 2 { // Turn on A/B testing processedConfig.Optimizers = make(map[string]OptimizerConfiguration) diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index de7204654..ed22cff85 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -203,7 +203,7 @@ func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.Execu }() } -func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult) (responseBody []byte, err error) { +func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult, abTestingMainPlan bool) (responseBody []byte, err error) { contextValues := tracing.ExtractValues(ctx) id := contextValues.RequestId path := contextValues.RequestPath @@ -214,7 +214,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan sendMainPlanResult := func(responseBody []byte, err error) { if optComparePlansCh != nil { optComparePlansCh <- executionPlanResult{ - isMain: true, + isMain: abTestingMainPlan, plan: plan, err: err, responseBody: responseBody, @@ -300,16 +300,16 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin return nil, end_user_errors.ErrSearchCondition.New(fmt.Errorf("no connectors to use")) } - var clickhouseDecision *table_resolver.ConnectorDecisionClickhouse - var elasticDecision *table_resolver.ConnectorDecisionElastic + var clickhouseConnector *table_resolver.ConnectorDecisionClickhouse + for _, connector := range decision.UseConnectors { switch c := connector.(type) { case *table_resolver.ConnectorDecisionClickhouse: - clickhouseDecision = c + clickhouseConnector = c case *table_resolver.ConnectorDecisionElastic: - elasticDecision = c + // NOP default: return nil, fmt.Errorf("unknown connector type: %T", c) @@ -317,14 +317,10 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin } // it's impossible here to don't have a clickhouse decision - if clickhouseDecision == nil { + if clickhouseConnector == nil { return nil, fmt.Errorf("no clickhouse connector") } - if elasticDecision != nil { - fmt.Println("elastic", elasticDecision) - } - var responseBody []byte startTime := time.Now() @@ -343,7 +339,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin var table *clickhouse.Table // TODO we should use schema here only var currentSchema schema.Schema - resolvedIndexes := clickhouseDecision.ClickhouseTables + resolvedIndexes := clickhouseConnector.ClickhouseTables if len(resolvedIndexes) == 1 { indexName := resolvedIndexes[0] // we got exactly one table here because of the check above @@ -446,17 +442,12 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin plan.StartTime = startTime plan.Name = model.MainExecutionPlan - // Some flags may trigger alternative execution plans, this is primary for dev - - alternativePlan, alternativePlanExecutor := q.maybeCreateAlternativeExecutionPlan(ctx, resolvedIndexes, plan, queryTranslator, body, table, optAsync != nil) - - var optComparePlansCh chan<- executionPlanResult - - if alternativePlan != nil { - optComparePlansCh = q.runAlternativePlanAndComparison(ctx, alternativePlan, alternativePlanExecutor, body) + if decision.EnableABTesting { + return q.executeABTesting(ctx, plan, queryTranslator, table, body, optAsync, decision, indexPattern) } - return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, optComparePlansCh) + return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, nil, true) + } func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string, diff --git a/quesma/quesma/search_ab_testing.go b/quesma/quesma/search_ab_testing.go new file mode 100644 index 000000000..5bec6d5df --- /dev/null +++ b/quesma/quesma/search_ab_testing.go @@ -0,0 +1,358 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "quesma/ab_testing" + "quesma/clickhouse" + "quesma/elasticsearch" + "quesma/logger" + "quesma/model" + "quesma/queryparser" + "quesma/quesma/async_search_storage" + "quesma/quesma/recovery" + "quesma/quesma/types" + "quesma/quesma/ui" + "quesma/table_resolver" + "quesma/tracing" + "quesma/util" + "time" +) + +type executionPlanResult struct { + isMain bool + plan *model.ExecutionPlan + err error + responseBody []byte + endTime time.Time +} + +// runABTestingResultsCollector runs the alternative plan and comparison method in the background. It returns a channel to collect the main plan results. +func (q *QueryRunner) runABTestingResultsCollector(ctx context.Context, indexPattern string, body types.JSON) (chan<- executionPlanResult, context.Context) { + + contextValues := tracing.ExtractValues(ctx) + + backgroundContext, cancelFunc := context.WithCancel(tracing.NewContextWithRequest(ctx)) + + numberOfExpectedResults := len([]string{model.MainExecutionPlan, model.AlternativeExecutionPlan}) + + optComparePlansCh := make(chan executionPlanResult, numberOfExpectedResults) + + // collector + go func(optComparePlansCh <-chan executionPlanResult) { + defer recovery.LogPanic() + + var aResult *executionPlanResult + var bResult *executionPlanResult + + for aResult == nil || bResult == nil { + + select { + + case r := <-optComparePlansCh: + if r.isMain { + aResult = &r + } else { + bResult = &r + } + + case <-time.After(1 * time.Minute): + logger.ErrorWithCtx(ctx).Msgf("timeout waiting for A/B results. A result: %v, B result: %v", aResult, bResult) + // and cancel the context to stop the execution of the alternative plan + cancelFunc() + return + } + } + + bytes, err := body.Bytes() + if err != nil { + bytes = []byte("error converting body to bytes") + } + + errorToString := func(err error) string { + if err != nil { + return err.Error() + } + return "" + } + + abResult := ab_testing.Result{ + Request: ab_testing.Request{ + Path: contextValues.RequestPath, + IndexName: indexPattern, + Body: string(bytes), + }, + + A: ab_testing.Response{ + Name: aResult.plan.Name, + Body: string(aResult.responseBody), + Time: aResult.endTime.Sub(aResult.plan.StartTime).Seconds(), + Error: errorToString(aResult.err), + }, + + B: ab_testing.Response{ + Name: bResult.plan.Name, + Body: string(bResult.responseBody), + Time: bResult.endTime.Sub(bResult.plan.StartTime).Seconds(), + Error: errorToString(bResult.err), + }, + RequestID: contextValues.RequestId, + OpaqueID: contextValues.OpaqueId, + } + + q.ABResultsSender.Send(abResult) + + }(optComparePlansCh) + + return optComparePlansCh, backgroundContext +} + +func (q *QueryRunner) executeABTesting(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, decision *table_resolver.Decision, indexPattern string) ([]byte, error) { + + optComparePlansCh, backgroundContext := q.runABTestingResultsCollector(ctx, indexPattern, body) + + var planExecutors []func(ctx context.Context) ([]byte, error) + + for i, connector := range decision.UseConnectors { + + isMainPlan := i == 0 // the first plan is the main plan + + var planExecutor func(ctx context.Context) ([]byte, error) + + switch connector.(type) { + + case *table_resolver.ConnectorDecisionClickhouse: + planExecutor = func(ctx context.Context) ([]byte, error) { + plan.Name = "clickhouse" + return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, optComparePlansCh, isMainPlan) + } + + case *table_resolver.ConnectorDecisionElastic: + planExecutor = func(ctx context.Context) ([]byte, error) { + elasticPlan := &model.ExecutionPlan{ + IndexPattern: plan.IndexPattern, + QueryRowsTransformers: []model.QueryRowsTransformer{}, + Queries: []*model.Query{}, + StartTime: plan.StartTime, + Name: "elastic", + } + return q.executePlanElastic(ctx, elasticPlan, body, optAsync, optComparePlansCh, isMainPlan) + } + + default: + return nil, fmt.Errorf("unknown connector type: %T", connector) + } + planExecutors = append(planExecutors, planExecutor) + } + + if len(planExecutors) != 2 { + return nil, fmt.Errorf("expected 2 plans (A,B) to execute, but got %d", len(planExecutors)) + } + + // B plan aka alternative + go func() { + defer recovery.LogPanic() + _, _ = planExecutors[1](backgroundContext) // ignore the result + }() + + // A plan aka main plan + // run the first plan in the main thread + return planExecutors[0](ctx) +} + +type asyncElasticSearchWithError struct { + response types.JSON + translatedQueryBody []types.TranslatedSQLQuery + err error +} + +func (q *QueryRunner) executePlanElastic(ctx context.Context, plan *model.ExecutionPlan, requestBody types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult, abTestingMainPlan bool) (responseBody []byte, err error) { + + contextValues := tracing.ExtractValues(ctx) + id := contextValues.RequestId + path := contextValues.RequestPath + opaqueId := contextValues.OpaqueId + + doneCh := make(chan asyncElasticSearchWithError, 1) + + sendABResult := func(response []byte, err error) { + optComparePlansCh <- executionPlanResult{ + isMain: abTestingMainPlan, // TODO + plan: plan, + err: err, + responseBody: response, + endTime: time.Now(), + } + } + + go func() { + defer recovery.LogAndHandlePanic(ctx, func(err error) { + doneCh <- asyncElasticSearchWithError{err: err} + }) + + resp, err := q.callElastic(ctx, plan, requestBody, optAsync) + + doneCh <- asyncElasticSearchWithError{response: resp, translatedQueryBody: nil, err: err} + }() + + if optAsync == nil { + bodyAsBytes, _ := requestBody.Bytes() + response := <-doneCh + if response.err != nil { + err = response.err + sendABResult(nil, err) + return nil, err + } + + pushSecondaryInfo(q.quesmaManagementConsole, id, "", path, bodyAsBytes, response.translatedQueryBody, responseBody, plan.StartTime) + sendABResult(responseBody, err) + return responseBody, err + } else { + select { + + case <-ctx.Done(): + return nil, ctx.Err() + + case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond): + go func() { // Async search takes longer. Return partial results and wait for + recovery.LogPanicWithCtx(ctx) + res := <-doneCh + responseBody, err = q.storeAsyncSearchWithRaw(q.quesmaManagementConsole, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId) + sendABResult(responseBody, err) + }() + return q.handlePartialAsyncSearch(ctx, optAsync.asyncId) + case res := <-doneCh: + responseBody, err = q.storeAsyncSearchWithRaw(q.quesmaManagementConsole, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId) + sendABResult(responseBody, err) + return responseBody, err + } + } +} + +func (q *QueryRunner) callElastic(ctx context.Context, plan *model.ExecutionPlan, requestBody types.JSON, optAsync *AsyncQuery) (responseBody types.JSON, err error) { + + url := fmt.Sprintf("%s/_search", plan.IndexPattern) + + client := elasticsearch.NewSimpleClient(&q.cfg.Elasticsearch) + + requestBodyAsBytes, err := requestBody.Bytes() + if err != nil { + return nil, err + } + + resp, err := client.Request(ctx, "POST", url, requestBodyAsBytes) + + if err != nil { + return nil, err + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if err := resp.Body.Close(); err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + // here we try to parse the response as JSON, if it fails we return the plain text + responseBody, err = types.ParseJSON(string(data)) + if err != nil { + responseBody = types.JSON{"plainText": string(data)} + } + + return responseBody, fmt.Errorf("error calling elastic. got error code: %d", resp.StatusCode) + } + + contextValues := tracing.ExtractValues(ctx) + pushPrimaryInfo(q.quesmaManagementConsole, contextValues.RequestId, data, plan.StartTime) + + responseBody, err = types.ParseJSON(string(data)) + + if err != nil { + return nil, err + } + + return responseBody, nil +} + +// this is a copy of AsyncSearchEntireResp +type AsyncSearchElasticResp struct { + StartTimeInMillis uint64 `json:"start_time_in_millis"` + CompletionTimeInMillis uint64 `json:"completion_time_in_millis"` + ExpirationTimeInMillis uint64 `json:"expiration_time_in_millis"` + ID string `json:"id,omitempty"` + IsRunning bool `json:"is_running"` + IsPartial bool `json:"is_partial"` + // CompletionStatus If the async search completed, this field shows the status code of the + // search. + // For example, 200 indicates that the async search was successfully completed. + // 503 indicates that the async search was completed with an error. + CompletionStatus *int `json:"completion_status,omitempty"` + Response any `json:"response"` +} + +func WrapElasticResponseAsAsync(searchResponse any, asyncId string, isPartial bool, completionStatus *int) *AsyncSearchElasticResp { + + response := AsyncSearchElasticResp{ + Response: searchResponse, + ID: asyncId, + IsPartial: isPartial, + IsRunning: isPartial, + } + + response.CompletionStatus = completionStatus + return &response +} + +// TODO rename and change signature to use asyncElasticSearchWithError +func (q *QueryRunner) storeAsyncSearchWithRaw(qmc *ui.QuesmaManagementConsole, id, asyncId string, + startTime time.Time, path string, body types.JSON, resultJSON types.JSON, resultError error, translatedQueryBody []types.TranslatedSQLQuery, keep bool, opaqueId string) (responseBody []byte, err error) { + + took := time.Since(startTime) + + bodyAsBytes, err := body.Bytes() + if err != nil { + return nil, err + } + + if resultError == nil { + okStatus := 200 + asyncResponse := WrapElasticResponseAsAsync(resultJSON, asyncId, false, &okStatus) + responseBody, err = json.MarshalIndent(asyncResponse, "", " ") + } else { + responseBody, _ = queryparser.EmptyAsyncSearchResponse(asyncId, false, 503) + err = resultError + } + + qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{ + Id: id, + AsyncId: asyncId, + OpaqueId: opaqueId, + Path: path, + IncomingQueryBody: bodyAsBytes, + QueryBodyTranslated: translatedQueryBody, + QueryTranslatedResults: responseBody, + SecondaryTook: took, + }) + + if keep { + compressedBody := responseBody + isCompressed := false + if err == nil { + if compressed, compErr := util.Compress(responseBody); compErr == nil { + compressedBody = compressed + isCompressed = true + } + } + q.AsyncRequestStorage.Store(asyncId, async_search_storage.NewAsyncRequestResult(compressedBody, err, time.Now(), isCompressed)) + } + + return +} diff --git a/quesma/quesma/search_alternative.go b/quesma/quesma/search_alternative.go deleted file mode 100644 index 96f34a561..000000000 --- a/quesma/quesma/search_alternative.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright Quesma, licensed under the Elastic License 2.0. -// SPDX-License-Identifier: Elastic-2.0 -package quesma - -import ( - "bytes" - "context" - "fmt" - "io" - "net/http" - "quesma/ab_testing" - "quesma/clickhouse" - "quesma/elasticsearch" - "quesma/logger" - "quesma/model" - "quesma/quesma/config" - "quesma/quesma/recovery" - "quesma/quesma/types" - "quesma/tracing" - "time" -) - -type executionPlanResult struct { - isMain bool - plan *model.ExecutionPlan - err error - responseBody []byte - endTime time.Time -} - -type executionPlanExecutor func(ctx context.Context) ([]byte, error) - -// runAlternativePlanAndComparison runs the alternative plan and comparison method in the background. It returns a channel to collect the main plan results. -func (q *QueryRunner) runAlternativePlanAndComparison(ctx context.Context, plan *model.ExecutionPlan, alternativePlanExecutor executionPlanExecutor, body types.JSON) chan<- executionPlanResult { - - contextValues := tracing.ExtractValues(ctx) - - numberOfExpectedResults := len([]string{model.MainExecutionPlan, model.AlternativeExecutionPlan}) - - optComparePlansCh := make(chan executionPlanResult, numberOfExpectedResults) - - // run alternative plan in the background (generator) - go func(optComparePlansCh chan<- executionPlanResult) { - defer recovery.LogPanic() - - // results are passed via channel - newCtx := tracing.NewContextWithRequest(ctx) - body, err := alternativePlanExecutor(newCtx) - - optComparePlansCh <- executionPlanResult{ - isMain: false, - plan: plan, - err: err, - responseBody: body, - endTime: time.Now(), - } - - }(optComparePlansCh) - - // collector - go func(optComparePlansCh <-chan executionPlanResult) { - defer recovery.LogPanic() - var alternative executionPlanResult - var main executionPlanResult - - for range numberOfExpectedResults { - r := <-optComparePlansCh - logger.InfoWithCtx(ctx).Msgf("received results %s", r.plan.Name) - if r.isMain { - main = r - } else { - alternative = r - } - } - - bytes, err := body.Bytes() - if err != nil { - bytes = []byte("error converting body to bytes") - } - - errorToString := func(err error) string { - if err != nil { - return err.Error() - } - return "" - } - - abResult := ab_testing.Result{ - Request: ab_testing.Request{ - Path: contextValues.RequestPath, - IndexName: plan.IndexPattern, - Body: string(bytes), - }, - - A: ab_testing.Response{ - Name: main.plan.Name, - Body: string(main.responseBody), - Time: main.endTime.Sub(main.plan.StartTime).Seconds(), - Error: errorToString(main.err), - }, - - B: ab_testing.Response{ - Name: alternative.plan.Name, - Body: string(alternative.responseBody), - Time: alternative.endTime.Sub(alternative.plan.StartTime).Seconds(), - Error: errorToString(alternative.err), - }, - RequestID: contextValues.RequestId, - OpaqueID: contextValues.OpaqueId, - } - q.ABResultsSender.Send(abResult) - - }(optComparePlansCh) - - return optComparePlansCh -} - -func (q *QueryRunner) maybeCreateAlternativeExecutionPlan(ctx context.Context, indexes []string, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, body types.JSON, table *clickhouse.Table, isAsync bool) (*model.ExecutionPlan, executionPlanExecutor) { - - // TODO not sure how to check configure when we have multiple indexes - if len(indexes) != 1 { - return nil, nil - } - - resolvedTableName := indexes[0] - - // TODO is should be enabled in a different way. it's not an optimizer - cfg, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration(config.ElasticABOptimizerName) - if !disabled { - return q.askElasticAsAnAlternative(ctx, resolvedTableName, plan, queryTranslator, body, table, isAsync, cfg) - } - - return nil, nil -} - -func (q *QueryRunner) askElasticAsAnAlternative(ctx context.Context, resolvedTableName string, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, body types.JSON, table *clickhouse.Table, isAsync bool, props map[string]string) (*model.ExecutionPlan, executionPlanExecutor) { - - // the name of "B" responses - alternativeName := "elastic" - - // Here we should use backend connector - // - elasticUrl := q.cfg.Elasticsearch.Url.String() - user := q.cfg.Elasticsearch.User - pass := q.cfg.Elasticsearch.Password - - if url, ok := props["url"]; ok { - elasticUrl = url - } - - if u, ok := props["user"]; ok { - user = u - } - - if p, ok := props["password"]; ok { - pass = p - } - - if name, ok := props["name"]; ok { - alternativeName = name - } - - requestBody, err := body.Bytes() - if err != nil { - return nil, nil - } - - alternativePlan := &model.ExecutionPlan{ - IndexPattern: plan.IndexPattern, - QueryRowsTransformers: []model.QueryRowsTransformer{}, - Queries: []*model.Query{}, - StartTime: plan.StartTime, - Name: alternativeName, - } - - url := fmt.Sprintf("%s/%s/_search", elasticUrl, plan.IndexPattern) - - return alternativePlan, func(ctx context.Context) ([]byte, error) { - - client := &http.Client{} - - req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) - - if err != nil { - return nil, err - } - - elasticsearch.AddBasicAuthIfNeeded(req, user, pass) - - req.Header.Set("Content-Type", "application/json") - - resp, err := client.Do(req) - if err != nil { - return nil, err - } - - responseBody, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - if err := resp.Body.Close(); err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("error calling elastic. got error code: %d", resp.StatusCode) - } - - contextValues := tracing.ExtractValues(ctx) - pushPrimaryInfo(q.quesmaManagementConsole, contextValues.RequestId, responseBody, plan.StartTime) - - return responseBody, nil - } -} diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go index 3650198d9..3fd0a1ccd 100644 --- a/quesma/table_resolver/rules.go +++ b/quesma/table_resolver/rules.go @@ -169,6 +169,19 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi ClickhouseTables: []string{input.source}}, &ConnectorDecisionElastic{}}, } + } else if targets[0] == config.ElasticsearchTarget && targets[1] == config.ClickhouseTarget { + + return &Decision{ + Reason: "Enabled in the config. A/B testing.", + EnableABTesting: true, + UseConnectors: []ConnectorDecision{ + &ConnectorDecisionElastic{}, + &ConnectorDecisionClickhouse{ + ClickhouseTableName: input.source, + ClickhouseTables: []string{input.source}}, + }, + } + } default: