Skip to content

Commit

Permalink
Multi-index search for Clickhouse-only indices
Browse files: browse the repository at this point in the history
  • Loading branch information
pivovarit committed May 2, 2024
1 parent 60ffaa7 commit 6adcd1e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 41 deletions.
46 changes: 20 additions & 26 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"mitmproxy/quesma/quesma/routes"
"mitmproxy/quesma/quesma/termsenum"
"mitmproxy/quesma/quesma/ui"
"mitmproxy/quesma/stats/errorstats"
"mitmproxy/quesma/telemetry"
"mitmproxy/quesma/tracing"
"regexp"
Expand Down Expand Up @@ -166,36 +165,31 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})
router.RegisterPathMatcher(routes.IndexAsyncSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
if strings.Contains(params["index"], ",") {
errorstats.GlobalErrorStatistics.RecordKnownError("Multi index search is not supported", nil,
"Multi index search is not yet supported: "+params["index"])
return nil, errors.New("multi index search is not yet supported")
} else {
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
waitForResultsMs = int(w.Milliseconds())
} else {
logger.Warn().Msgf("Can't parse wait_for_completion_timeout value: %s", v)
}
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
waitForResultsMs = int(w.Milliseconds())
} else {
logger.Warn().Msgf("Can't parse wait_for_completion_timeout value: %s", v)
}
keepOnCompletion := false
if v, ok := params["keep_on_completion"]; ok {
if v == "true" {
keepOnCompletion = true
}
}
keepOnCompletion := false
if v, ok := params["keep_on_completion"]; ok {
if v == "true" {
keepOnCompletion = true
}
responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), lm, im, console, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
} else {
return nil, err
}
}
responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), lm, im, console, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
} else {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, "GET", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"])
Expand Down
36 changes: 29 additions & 7 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
im elasticsearch.IndexManagement,
qmc *ui.QuesmaManagementConsole, optAsync *AsyncQuery) ([]byte, error) {

logger.Debug().Msgf("resolved sources for index pattern %s -> %s", indexPattern, ResolveSources(indexPattern, cfg, im, lm))
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, lm)

resolved := lm.ResolveIndexes(ctx, indexPattern)
if len(resolved) == 0 {
switch sources {
case sourceBoth:
logger.Error().Msgf("index pattern [%s] resolved to both elasticsearch indices: [%s] and clickhouse tables: [%s]", indexPattern, sourcesElastic, sourcesClickhouse)
return nil, errors.New("querying data in elasticsearch and clickhouse is not supported at the moment")
case sourceNone:
if elasticsearch.IsIndexPattern(indexPattern) {
if optAsync != nil {
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncRequestIdStr, false, 200)
Expand All @@ -132,9 +135,28 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
logger.WarnWithCtx(ctx).Msgf("could not resolve any table name for [%s]", indexPattern)
return nil, errIndexNotExists
}
} else if len(resolved) > 1 { // async search never worked for multiple indexes, TODO fix
logger.WarnWithCtx(ctx).Msgf("could not resolve multiple table names for [%s]", indexPattern)
resolved = resolved[1:2]
case sourceClickhouse:
logger.Info().Msgf("index pattern [%s] resolved to clickhouse tables: [%s]", indexPattern, sourcesClickhouse)
case sourceElasticsearch:
logger.Error().Msgf("index pattern [%s] resolved to elasticsearch indices: [%s]", indexPattern, sourcesElastic)
panic("elasticsearch-only indexes should not be routed here at all")
}
logger.Debug().Msgf("resolved sources for index pattern %s -> %s", indexPattern, sources)

if len(sourcesClickhouse) == 0 {
if elasticsearch.IsIndexPattern(indexPattern) {
if optAsync != nil {
return queryparser.EmptyAsyncSearchResponse(optAsync.asyncRequestIdStr, false, 200)
} else {
return queryparser.EmptySearchResponse(ctx), nil
}
} else {
logger.WarnWithCtx(ctx).Msgf("could not resolve any table name for [%s]", indexPattern)
return nil, errIndexNotExists
}
} else if len(sourcesClickhouse) > 1 { // async search never worked for multiple indexes, TODO fix
logger.WarnWithCtx(ctx).Msgf("could not resolve multiple table names for [%s], truncating", indexPattern)
sourcesClickhouse = sourcesClickhouse[1:2]
}

var responseBody, translatedQueryBody []byte
Expand Down Expand Up @@ -167,7 +189,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
var queryInfo model.SearchQueryInfo
var count int

for _, resolvedTableName := range resolved {
for _, resolvedTableName := range sourcesClickhouse {
table, _ := tables.Load(resolvedTableName)
queryTranslator = &queryparser.ClickhouseQueryTranslator{ClickhouseLM: lm, Table: table, Ctx: ctx}
var simpleQuery queryparser.SimpleQuery
Expand Down
16 changes: 8 additions & 8 deletions quesma/quesma/source_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
sourceNone = "none"
)

func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, lm *clickhouse.LogManager) string {
func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, lm *clickhouse.LogManager) (string, []string, []string) {
if elasticsearch.IsIndexPattern(indexPattern) {
matchesElastic := []string{}
matchesClickhouse := []string{}
Expand All @@ -43,23 +43,23 @@ func ResolveSources(indexPattern string, cfg config.QuesmaConfiguration, im elas

switch {
case len(matchesElastic) > 0 && len(matchesClickhouse) > 0:
return sourceBoth
return sourceBoth, matchesElastic, matchesClickhouse
case len(matchesElastic) > 0:
return sourceElasticsearch
return sourceElasticsearch, matchesElastic, matchesClickhouse
case len(matchesClickhouse) > 0:
return sourceClickhouse
return sourceClickhouse, matchesElastic, matchesClickhouse
default:
return sourceNone
return sourceNone, matchesElastic, matchesClickhouse
}
} else {
if c, exists := cfg.IndexConfig[indexPattern]; exists {
if c.Enabled {
return sourceClickhouse
return sourceClickhouse, []string{}, []string{indexPattern}
} else {
return sourceElasticsearch
return sourceElasticsearch, []string{indexPattern}, []string{}
}
} else {
return sourceElasticsearch
return sourceElasticsearch, []string{indexPattern}, []string{}
}
}
}

0 comments on commit 6adcd1e

Please sign in to comment.