Skip to content

Commit

Permalink
Rewrite _resolve endpoint to use the schema registry (#520)
Browse files Browse the repository at this point in the history
Before this change, the `_resolve` endpoint had several bugs. One of
them, which affected me, was a wrong result for an index present in
Elastic but missing from ClickHouse and not enabled in the config. In
that case Quesma should reach out to Elastic, but previously it only
looked at ClickHouse and immediately returned 404.

Since the previous logic was quite convoluted and used the older
`LogManager` instead of `schema.Registry`, this PR largely rewrites the
implementation to be simpler, better documented, and to use the
`schema.Registry`.

I extensively tested all possible scenarios (see the table in
`Test_combineSourcesFromElasticWithRegistry`), both as a unit test and
manually in Kibana.

The resolve code is now also moved to the `functionality/resolve` package.

To make it possible to use the `schema.Registry` in the new code,
`schema.Schema` had to be extended with a new boolean flag,
`ExistsInDataSource`. Before the addition of this flag, the schema
registry contained both schemas without a table created in ClickHouse
(only specified/enabled in the configuration) and schemas with a table
created in ClickHouse, with no easy way to differentiate between them.
  • Loading branch information
avelanarius authored Jul 15, 2024
1 parent 4871087 commit d77d241
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 105 deletions.
76 changes: 76 additions & 0 deletions quesma/quesma/functionality/resolve/resolve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package resolve

import (
"quesma/elasticsearch"
"quesma/quesma/config"
"quesma/schema"
"slices"
)

// HandleResolve computes the sources reported for the given index name or
// pattern by combining what schema.Registry knows with what Elasticsearch
// reports.
//
// For a concrete (non-pattern) name that is present in schema.Registry the
// answer is produced without querying Elasticsearch: a single data stream
// named after the index when the schema exists in the data source, or an
// empty Sources value when it is only configured. In every other case
// Elasticsearch is queried and its answer is merged with all registry schemas.
//
// A non-nil error is returned only when the Elasticsearch lookup fails; the
// returned Sources is then the zero value.
func HandleResolve(pattern string, sr schema.Registry, cfg config.QuesmaConfiguration) (elasticsearch.Sources, error) {
	normalizedPattern := elasticsearch.NormalizePattern(pattern)

	// Fast path: a concrete index name can often be answered from
	// schema.Registry alone, sparing a round trip to Elasticsearch.
	if isPattern := elasticsearch.IsIndexPattern(normalizedPattern); !isPattern {
		registered, known := sr.FindSchema(schema.TableName(normalizedPattern))
		switch {
		case known && registered.ExistsInDataSource:
			stream := elasticsearch.DataStream{
				Name:           normalizedPattern,
				BackingIndices: []string{normalizedPattern},
				TimestampField: `@timestamp`,
			}
			return elasticsearch.Sources{
				Indices:     []elasticsearch.Index{},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{stream},
			}, nil
		case known:
			// Configured by the user, but not present in the data source.
			return elasticsearch.Sources{}, nil
		}
		// Unknown to schema.Registry (the user did not configure it), yet it
		// may still exist in Elasticsearch - fall through to the lookup below.
	}

	// todo avoid creating new instances all the time
	resolver := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String())
	// The "found" flag returned by Resolve is intentionally discarded here.
	// NOTE(review): presumably an empty merged result conveys the same
	// information to the caller - confirm.
	merged, _, err := resolver.Resolve(normalizedPattern)
	if err != nil {
		return elasticsearch.Sources{}, err
	}

	combineSourcesFromElasticWithRegistry(&merged, sr.AllSchemas(), normalizedPattern)
	return merged, nil
}

// combineSourcesFromElasticWithRegistry merges, in place, the sources reported
// by Elasticsearch with the schemas known to schema.Registry.
//
// Every index or data stream in sourcesFromElastic whose name is a key of
// schemas is removed, whether or not that schema exists in the data source.
// Then, for every schema whose name matches normalizedPattern and whose
// ExistsInDataSource flag is set, a data stream named after the schema is
// appended. The appended data streams are ordered by name so that the result
// does not depend on Go's randomized map iteration order.
//
// sourcesFromElastic.Aliases is left untouched.
func combineSourcesFromElasticWithRegistry(sourcesFromElastic *elasticsearch.Sources, schemas map[schema.TableName]schema.Schema, normalizedPattern string) {
	// Drop everything Elasticsearch reported that is also present in the registry.
	sourcesFromElastic.Indices = slices.DeleteFunc(sourcesFromElastic.Indices, func(i elasticsearch.Index) bool {
		_, exists := schemas[schema.TableName(i.Name)]
		return exists
	})
	sourcesFromElastic.DataStreams = slices.DeleteFunc(sourcesFromElastic.DataStreams, func(i elasticsearch.DataStream) bool {
		_, exists := schemas[schema.TableName(i.Name)]
		return exists
	})

	// Collect the registry-backed names first and sort them: ranging over a
	// map yields keys in an unspecified order, which would otherwise leak into
	// the order of the returned data streams.
	matching := make([]string, 0, len(schemas))
	for name, currentSchema := range schemas {
		indexName := name.AsString()
		if config.MatchName(normalizedPattern, indexName) && currentSchema.ExistsInDataSource {
			matching = append(matching, indexName)
		}
	}
	slices.Sort(matching)

	for _, indexName := range matching {
		sourcesFromElastic.DataStreams = append(sourcesFromElastic.DataStreams, elasticsearch.DataStream{
			Name:           indexName,
			BackingIndices: []string{indexName},
			TimestampField: `@timestamp`,
		})
	}
}
104 changes: 104 additions & 0 deletions quesma/quesma/functionality/resolve/resolve_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package resolve

import (
"github.com/stretchr/testify/assert"
"quesma/elasticsearch"
"quesma/schema"
"testing"
)

// Test_combineSourcesFromElasticWithRegistry checks every combination of
// "present in Elastic", "exists in the data source" and "enabled in the
// config" against the merged result produced by
// combineSourcesFromElasticWithRegistry.
func Test_combineSourcesFromElasticWithRegistry(t *testing.T) {
	// Expected behavior:
	//
	// # | In Elastic? | Exists in data source? | Enabled in the config (= present in schema.Registry)? | Quesma response
	// 1 | NO          | NO                     | NO                                                    | Does not exist
	// 2 | NO          | NO                     | YES                                                   | Does not exist
	// 3 | YES         | NO                     | NO                                                    | Exists
	// 4 | YES         | NO                     | YES                                                   | Does not exist
	// 5 | NO          | YES                    | NO                                                    | Does not exist
	// 6 | NO          | YES                    | YES                                                   | Exists
	// 7 | YES         | YES                    | NO                                                    | Exists
	// 8 | YES         | YES                    | YES                                                   | Exists

	type scenario struct {
		name               string
		sourcesFromElastic elasticsearch.Sources
		schemas            map[schema.TableName]schema.Schema
		normalizedPattern  string
		expectedResult     elasticsearch.Sources
	}

	// registryStream builds the data stream entry expected for an index that
	// is served from the registry: it is backed by a single index of the same
	// name and uses the "@timestamp" field.
	registryStream := func(indexName string) elasticsearch.DataStream {
		return elasticsearch.DataStream{
			Name:           indexName,
			BackingIndices: []string{indexName},
			TimestampField: "@timestamp",
		}
	}

	scenarios := []scenario{
		// Cases 1, 3 (index1), 5, 7 (index1):
		{
			name: "index not enabled in config, some unrelated index in Elastic",
			sourcesFromElastic: elasticsearch.Sources{
				Indices:     []elasticsearch.Index{{Name: "index1"}},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{},
			},
			// schema.Registry won't contain disabled indexes, even if they
			// exist in the data source (manually created by the user).
			schemas:           map[schema.TableName]schema.Schema{},
			normalizedPattern: "index*",
			expectedResult: elasticsearch.Sources{
				Indices:     []elasticsearch.Index{{Name: "index1"}},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{},
			},
		},
		// Cases 2 (index2), 4 (index1):
		{
			name: "index enabled in config, not present in the data source; decoy index in Elastic",
			sourcesFromElastic: elasticsearch.Sources{
				Indices: []elasticsearch.Index{
					{Name: "index1"}, // decoy
					{Name: "index3"},
				},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{},
			},
			schemas: map[schema.TableName]schema.Schema{
				"index1": {ExistsInDataSource: false},
				"index2": {ExistsInDataSource: false},
				"quesma": {ExistsInDataSource: true},
			},
			normalizedPattern: "index*",
			expectedResult: elasticsearch.Sources{
				Indices:     []elasticsearch.Index{{Name: "index3"}},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{},
			},
		},
		// Cases 6 (index2), 8 (index1, index3):
		{
			name: "index enabled in config, present in the data source",
			sourcesFromElastic: elasticsearch.Sources{
				Indices:     []elasticsearch.Index{{Name: "index1"}, {Name: "index4"}},
				Aliases:     []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{{Name: "index3"}, {Name: "index5"}},
			},
			schemas: map[schema.TableName]schema.Schema{
				"index1": {ExistsInDataSource: true},
				"index2": {ExistsInDataSource: true},
				"index3": {ExistsInDataSource: true},
				"quesma": {ExistsInDataSource: true},
			},
			normalizedPattern: "index*",
			expectedResult: elasticsearch.Sources{
				Indices: []elasticsearch.Index{{Name: "index4"}},
				Aliases: []elasticsearch.Alias{},
				DataStreams: []elasticsearch.DataStream{
					{Name: "index5"},
					registryStream("index1"),
					registryStream("index2"),
					registryStream("index3"),
				},
			},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// The function mutates its first argument in place.
			actual := &sc.sourcesFromElastic
			combineSourcesFromElasticWithRegistry(actual, sc.schemas, sc.normalizedPattern)

			want := sc.expectedResult
			// Order is irrelevant: only the set of returned elements is compared.
			assert.ElementsMatchf(t, actual.Aliases, want.Aliases, "Aliases don't match")
			assert.ElementsMatchf(t, actual.Indices, want.Indices, "Indices don't match")
			assert.ElementsMatchf(t, actual.DataStreams, want.DataStreams, "DataStreams don't match")
		})
	}
}
78 changes: 9 additions & 69 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"quesma/quesma/functionality/bulk"
"quesma/quesma/functionality/doc"
"quesma/quesma/functionality/field_capabilities"
"quesma/quesma/functionality/resolve"
"quesma/quesma/functionality/terms_enum"
"quesma/quesma/mux"
"quesma/quesma/routes"
Expand All @@ -24,7 +25,6 @@ import (
"quesma/telemetry"
"quesma/tracing"
"regexp"
"slices"
"strings"
"time"
)
Expand Down Expand Up @@ -83,75 +83,11 @@ func configureRouter(cfg config.QuesmaConfiguration, sr schema.Registry, lm *cli
})

router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
pattern := elasticsearch.NormalizePattern(req.Params["index"])
if elasticsearch.IsIndexPattern(pattern) {
// todo avoid creating new instances all the time
sources, found, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()).Resolve(pattern)
if err != nil {
return nil, err
}
if !found {
return &mux.Result{StatusCode: 404}, nil
}

definitions, err := lm.GetTableDefinitions()
if err != nil {
return nil, err
}
sources.Indices = slices.DeleteFunc(sources.Indices, func(i elasticsearch.Index) bool {
return definitions.Has(i.Name)
})
sources.DataStreams = slices.DeleteFunc(sources.DataStreams, func(i elasticsearch.DataStream) bool {
return definitions.Has(i.Name)
})
definitions.Range(
func(name string, table *clickhouse.Table) bool {
if config.MatchName(elasticsearch.NormalizePattern(pattern), name) {
sources.DataStreams = append(sources.DataStreams, elasticsearch.DataStream{
Name: name,
BackingIndices: []string{name},
TimestampField: `@timestamp`,
})
}

return true
})

return resolveIndexResult(sources), nil
} else {
if config.MatchName(elasticsearch.NormalizePattern(pattern), pattern) {
definitions, err := lm.GetTableDefinitions()
if err != nil {
return nil, err
}

if definitions.Has(pattern) {
return resolveIndexResult(elasticsearch.Sources{
Indices: []elasticsearch.Index{},
Aliases: []elasticsearch.Alias{},
DataStreams: []elasticsearch.DataStream{
{
Name: pattern,
BackingIndices: []string{pattern},
TimestampField: `@timestamp`,
},
},
}), nil
} else {
return &mux.Result{StatusCode: 404}, nil
}
} else {
sources, found, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()).Resolve(pattern)
if err != nil {
return nil, err
}
if !found {
return &mux.Result{StatusCode: 404}, nil
}

return resolveIndexResult(sources), nil
}
sources, err := resolve.HandleResolve(req.Params["index"], sr, cfg)
if err != nil {
return nil, err
}
return resolveIndexResult(sources), nil
})

router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand Down Expand Up @@ -401,6 +337,10 @@ func elasticsearchInsertResult(body string, statusCode int) *mux.Result {
}

func resolveIndexResult(sources elasticsearch.Sources) *mux.Result {
if len(sources.Aliases) == 0 && len(sources.DataStreams) == 0 && len(sources.Indices) == 0 {
return &mux.Result{StatusCode: 404}
}

body, err := json.Marshal(sources)
if err != nil {
panic(err)
Expand Down
10 changes: 6 additions & 4 deletions quesma/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func (s *schemaRegistry) loadSchemas() (map[TableName]Schema, error) {
aliases := make(map[FieldName]FieldName)

s.populateSchemaFromStaticConfiguration(indexConfiguration, fields)
s.populateSchemaFromTableDefinition(definitions, indexName, fields)
existsInDataSource := s.populateSchemaFromTableDefinition(definitions, indexName, fields)
s.populateAliases(indexConfiguration, fields, aliases)
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases)
schemas[TableName(indexName)] = NewSchemaWithAliases(fields, aliases, existsInDataSource)
}

return schemas, nil
Expand Down Expand Up @@ -136,8 +136,9 @@ func (s *schemaRegistry) populateAliases(indexConfiguration config.IndexConfigur
}
}

func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[string]Table, indexName string, fields map[FieldName]Field) {
if tableDefinition, found := definitions[indexName]; found {
func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[string]Table, indexName string, fields map[FieldName]Field) (existsInDataSource bool) {
tableDefinition, found := definitions[indexName]
if found {
logger.Debug().Msgf("loading schema for table %s", indexName)

for _, column := range tableDefinition.Columns {
Expand All @@ -154,4 +155,5 @@ func (s *schemaRegistry) populateSchemaFromTableDefinition(definitions map[strin
}
}
}
return found
}
Loading

0 comments on commit d77d241

Please sign in to comment.