Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group indexes by prefix (common table) #1175

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ func (lm *LogManager) Start() {

lm.tableDiscovery.ReloadTableDefinitions()

logger.Info().Msgf("schemas loaded: %s", lm.tableDiscovery.TableDefinitions().Keys())
schemaNames := lm.tableDiscovery.TableDefinitions().Keys()
if len(schemaNames) < 20 {
logger.Info().Msgf("schemas loaded: %s", lm.tableDiscovery.TableDefinitions().Keys())
} else {
logger.Info().Msgf("total schemas loaded: %d", len(schemaNames))
}

const reloadInterval = 1 * time.Minute
forceReloadCh := lm.tableDiscovery.ForceReloadCh()

Expand Down
6 changes: 3 additions & 3 deletions quesma/comment_metadata/comment_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func (c *CommentMetadata) Marshall() string {
return metadataPrefix + "V" + metadataVersion + ":" + params.Encode()
}

func UnmarshallCommentMetadata(s string) (*CommentMetadata, error) {
var metadataRegexp = regexp.MustCompile(metadataPrefix + `V([0-9+]):([^\s]+)`)

rx := regexp.MustCompile(metadataPrefix + `V([0-9+]):([^\s]+)`)
func UnmarshallCommentMetadata(s string) (*CommentMetadata, error) {

groups := rx.FindStringSubmatch(s)
groups := metadataRegexp.FindStringSubmatch(s)

if len(groups) == 0 {
return nil, nil // no metadata, we return nil here, that's not an error
Expand Down
1 change: 1 addition & 0 deletions quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type QuesmaConfiguration struct {
DefaultIngestTarget []string
DefaultQueryTarget []string
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
}

func (c *QuesmaConfiguration) AliasFields(indexName string) map[string]string {
Expand Down
14 changes: 13 additions & 1 deletion quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,13 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
conf.DefaultIngestTarget = []string{}
conf.DefaultQueryTarget = defaultConfig.QueryTarget
conf.AutodiscoveryEnabled = slices.Contains(conf.DefaultQueryTarget, ClickhouseTarget)

if defaultQueryConfig, ok := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]; ok {
conf.DefaultQueryOptimizers = defaultQueryConfig.Optimizers
} else {
conf.DefaultQueryOptimizers = nil
}

delete(queryProcessor.Config.IndexConfig, DefaultWildcardIndexName)

for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
Expand Down Expand Up @@ -727,6 +734,12 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
conf.DefaultQueryTarget = defaultConfig.QueryTarget
conf.AutodiscoveryEnabled = slices.Contains(conf.DefaultQueryTarget, ClickhouseTarget)

if defaultQueryConfig, ok := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]; ok {
conf.DefaultQueryOptimizers = defaultQueryConfig.Optimizers
} else {
conf.DefaultQueryOptimizers = nil
}

delete(queryProcessor.Config.IndexConfig, DefaultWildcardIndexName)

for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
Expand Down Expand Up @@ -831,7 +844,6 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}

END:

for _, idxCfg := range conf.IndexConfig {
if idxCfg.UseCommonTable {
conf.CreateCommonTable = true
Expand Down
53 changes: 51 additions & 2 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,25 @@ func (s *SchemaCheckPass) applyMapTransformations(indexSchema schema.Schema, que
return query, nil
}

func (s *SchemaCheckPass) computeListIndexPrefixesToGroup() []string {

const groupByCommonTableIndexes = "group_common_table_indexes"

var groupIndexesPrefix []string
if s.cfg.DefaultQueryOptimizers != nil {
if opt, ok := s.cfg.DefaultQueryOptimizers[groupByCommonTableIndexes]; ok {
if !opt.Disabled {
for k, v := range opt.Properties {
if v != "false" {
groupIndexesPrefix = append(groupIndexesPrefix, k)
}
}
}
}
}
return groupIndexesPrefix
}

func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schema, query *model.Query) (*model.Query, error) {

if query.TableName == model.SingleTableNamePlaceHolder {
Expand Down Expand Up @@ -423,10 +442,40 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem
// add filter for common table, if needed
if useCommonTable && from == physicalFromExpression {

var indexWhere []model.Expr
orExpression := make(map[string]model.Expr)

groupIndexesPrefix := s.computeListIndexPrefixesToGroup()

for _, indexName := range query.Indexes {
indexWhere = append(indexWhere, model.NewInfixExpr(model.NewColumnRef(common_table.IndexNameColumn), "=", model.NewLiteral(fmt.Sprintf("'%s'", indexName))))
var added bool

// apply optimization here
if len(groupIndexesPrefix) > 0 {
for _, prefix := range groupIndexesPrefix {
if strings.HasPrefix(indexName, prefix) {
added = true
if _, ok := orExpression[prefix]; !ok {
orExpression[prefix] = model.NewFunction("startsWith", model.NewColumnRef(common_table.IndexNameColumn), model.NewLiteral(fmt.Sprintf("'%s'", prefix)))
}
}
}
}

if !added {
orExpression[indexName] = model.NewInfixExpr(model.NewColumnRef(common_table.IndexNameColumn), "=", model.NewLiteral(fmt.Sprintf("'%s'", indexName)))
}
}

// keep in the order
var orExpressionOrder []string
for k := range orExpression {
orExpressionOrder = append(orExpressionOrder, k)
}
sort.Strings(orExpressionOrder)

var indexWhere []model.Expr
for _, name := range orExpressionOrder {
indexWhere = append(indexWhere, orExpression[name])
}

indicesWhere := model.Or(indexWhere)
Expand Down
102 changes: 89 additions & 13 deletions quesma/quesma/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package quesma

import (
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/common_table"
"github.com/QuesmaOrg/quesma/quesma/model"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/schema"
Expand Down Expand Up @@ -670,11 +671,18 @@ func TestApplyWildCard(t *testing.T) {
func TestApplyPhysicalFromExpression(t *testing.T) {

indexConfig := map[string]config.IndexConfiguration{
"test": {},
"test": {},
"test2": {UseCommonTable: true},
"test3": {UseCommonTable: true},
}
cfg := config.QuesmaConfiguration{
IndexConfig: indexConfig,
}
DefaultQueryOptimizers: map[string]config.OptimizerConfiguration{
"group_common_table_indexes": {
Disabled: false,
Properties: map[string]string{
"daily-": "true",
}}}}

lm := clickhouse.NewLogManagerEmpty()

Expand Down Expand Up @@ -704,23 +712,24 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
td.Store(tableDefinition.Name, &tableDefinition)

s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, nil, defaultSearchAfterStrategy)
transform := NewSchemaCheckPass(&cfg, nil, defaultSearchAfterStrategy)

tests := []struct {
name string
indexes []string // default is []string{"test"}
input model.SelectCommand
expected model.SelectCommand
}{
{
"single table",
model.SelectCommand{
name: "single table",
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
},
model.SelectCommand{
expected: model.SelectCommand{
FromClause: model.NewTableRef("test"),
Columns: []model.Expr{
model.NewColumnRef("a"),
Expand All @@ -730,8 +739,69 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
},

{
"cte with fixed table name",
model.SelectCommand{
name: "single table with common table",
indexes: []string{"test2"},
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
},
expected: model.SelectCommand{
FromClause: model.NewTableRef(common_table.TableName),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
WhereClause: model.NewInfixExpr(model.NewColumnRef(common_table.IndexNameColumn), "=", model.NewLiteral("'test2'")),
},
},

{
name: "two tables with common table",
indexes: []string{"test2", "test3"},
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
},
expected: model.SelectCommand{
FromClause: model.NewTableRef(common_table.TableName),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
WhereClause: model.Or([]model.Expr{model.NewInfixExpr(model.NewColumnRef(common_table.IndexNameColumn), "=", model.NewLiteral("'test2'")),
model.NewInfixExpr(model.NewColumnRef(common_table.IndexNameColumn), "=", model.NewLiteral("'test3'"))}),
},
},

{
name: "two daily tables tables with common table (group_common_table_indexes optimizer)",
indexes: []string{"daily-1", "daily-2"},
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
},
expected: model.SelectCommand{
FromClause: model.NewTableRef(common_table.TableName),
Columns: []model.Expr{
model.NewColumnRef("a"),
model.NewCountFunc(),
},
WhereClause: model.NewFunction("startsWith", model.NewColumnRef(common_table.IndexNameColumn), model.NewLiteral("'daily-'")),
},
},

{
name: "cte with fixed table name",
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("a"),
Expand All @@ -749,7 +819,7 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
},
},
},
model.SelectCommand{
expected: model.SelectCommand{
FromClause: model.NewTableRef("test"),
Columns: []model.Expr{
model.NewColumnRef("a"),
Expand All @@ -770,8 +840,8 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
},

{
"cte with table name",
model.SelectCommand{
name: "cte with table name",
input: model.SelectCommand{
FromClause: model.NewTableRef(model.SingleTableNamePlaceHolder),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
Expand All @@ -789,7 +859,7 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
},
},
},
model.SelectCommand{
expected: model.SelectCommand{
FromClause: model.NewTableRef("test"),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
Expand Down Expand Up @@ -817,11 +887,17 @@ func TestApplyPhysicalFromExpression(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

indexes := tt.indexes
if len(indexes) == 0 {
indexes = []string{"test"}
}

query := &model.Query{
TableName: "test",
SelectCommand: tt.input,
Schema: indexSchema,
Indexes: []string{"test"},
Indexes: indexes,
}

expectedAsString := model.AsString(tt.expected)
Expand Down
Loading
Loading