diff --git a/quesma/clickhouse/table_discovery.go b/quesma/clickhouse/table_discovery.go index 6a73c3b4f..b77792fc7 100644 --- a/quesma/clickhouse/table_discovery.go +++ b/quesma/clickhouse/table_discovery.go @@ -11,11 +11,13 @@ import ( "github.com/QuesmaOrg/quesma/quesma/logger" "github.com/QuesmaOrg/quesma/quesma/persistence" "github.com/QuesmaOrg/quesma/quesma/quesma/config" + "github.com/QuesmaOrg/quesma/quesma/quesma/types" "github.com/QuesmaOrg/quesma/quesma/schema" "github.com/QuesmaOrg/quesma/quesma/util" quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core" "github.com/goccy/go-json" "strings" + "sync" "sync/atomic" "time" ) @@ -34,12 +36,15 @@ func (d DbKind) String() string { type TableDiscovery interface { ReloadTableDefinitions() TableDefinitions() *TableMap + AddTable(tableName string, table *Table) TableDefinitionsFetchError() error LastAccessTime() time.Time LastReloadTime() time.Time ForceReloadCh() <-chan chan<- struct{} AutodiscoveryEnabled() bool + + RegisterTablesReloadListener(ch chan<- types.ReloadMessage) } type tableDiscovery struct { @@ -51,6 +56,9 @@ type tableDiscovery struct { forceReloadCh chan chan<- struct{} ReloadTablesError error virtualTableStorage persistence.JSONDatabase + + reloadObserversMutex sync.Mutex + reloadObservers []chan<- types.ReloadMessage } type columnMetadata struct { @@ -79,6 +87,10 @@ type TableDiscoveryTableProviderAdapter struct { TableDiscovery } +func (t TableDiscoveryTableProviderAdapter) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) { + t.TableDiscovery.RegisterTablesReloadListener(ch) +} + func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema.Table { // here we filter out our internal columns @@ -125,6 +137,31 @@ func NewTableDiscoveryWith(cfg *config.QuesmaConfiguration, dbConnPool quesma_ap return result } +func (td *tableDiscovery) AddTable(tableName string, table *Table) { + td.tableDefinitions.Load().Store(tableName, table) + td.notifyObservers() +} + +func (td *tableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) { + td.reloadObserversMutex.Lock() + defer td.reloadObserversMutex.Unlock() + td.reloadObservers = append(td.reloadObservers, ch) +} + +func (td *tableDiscovery) notifyObservers() { + + td.reloadObserversMutex.Lock() + defer td.reloadObserversMutex.Unlock() + + msg := types.ReloadMessage{Timestamp: time.Now()} + for _, observer := range td.reloadObservers { + fmt.Println("Sending message to observer", observer) + go func() { + observer <- msg + }() + } +} + func (td *tableDiscovery) TableDefinitionsFetchError() error { return td.ReloadTablesError } @@ -178,6 +215,8 @@ func (td *tableDiscovery) ReloadTableDefinitions() { td.ReloadTablesError = nil td.populateTableDefinitions(configuredTables, databaseName, td.cfg) + + td.notifyObservers() } func (td *tableDiscovery) readVirtualTables(configuredTables map[string]discoveredTable) map[string]discoveredTable { @@ -634,6 +673,9 @@ func NewEmptyTableDiscovery() *EmptyTableDiscovery { } } +func (td *EmptyTableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) { +} + func (td *EmptyTableDiscovery) ReloadTableDefinitions() { } @@ -660,3 +702,7 @@ func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} { func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool { return td.Autodiscovery } + +func (td *EmptyTableDiscovery) AddTable(tableName string, table *Table) { + td.TableMap.Store(tableName, table) +} diff --git a/quesma/ingest/common_table_test.go b/quesma/ingest/common_table_test.go index 6c0573127..6357703b5 100644 --- a/quesma/ingest/common_table_test.go +++ b/quesma/ingest/common_table_test.go @@ -191,6 +191,8 @@ func TestIngestToCommonTable(t *testing.T) { tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{}) + schemaRegistry.Start() + defer schemaRegistry.Stop() resolver := table_resolver.NewEmptyTableResolver() diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index d974d8664..243b86a68 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -962,7 +962,7 @@ func (ip *IngestProcessor) AddTableIfDoesntExist(table *chLib.Table) bool { logger.Error().Msgf("error storing virtual table: %v", err) } } - ip.tableDiscovery.TableDefinitions().Store(table.Name, table) + ip.tableDiscovery.AddTable(table.Name, table) return true } wasntCreated := !t.Created diff --git a/quesma/main.go b/quesma/main.go index eb730c84a..c940110a7 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -107,6 +107,7 @@ func main() { virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName) tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{}) + schemaRegistry.Start() im := elasticsearch.NewIndexManagement(cfg.Elasticsearch) @@ -146,6 +147,7 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() + schemaRegistry.Stop() feature.NotSupportedLogger.Stop() phoneHomeAgent.Stop(ctx) lm.Stop() diff --git a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go index 84a7a9402..f13eb8ed8 100644 --- a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go +++ b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go @@ -104,6 +104,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcess virtualTableStorage := persistence.NewElasticJSONDatabase(esBackendConn.GetConfig(), common_table.VirtualTableElasticIndexName) tableDisco := clickhouse.NewTableDiscovery(oldQuesmaConfig, connectionPool, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{}) + schemaRegistry.Start() dummyTableResolver := table_resolver.NewDummyTableResolver() diff --git a/quesma/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go b/quesma/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go index 73ac31e9b..c7d50567a 100644 --- a/quesma/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go +++ b/quesma/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go @@ -98,6 +98,7 @@ func (p *ElasticsearchToClickHouseQueryProcessor) prepareTemporaryQueryProcessor virtualTableStorage := persistence.NewElasticJSONDatabase(esBackendConn.GetConfig(), common_table.VirtualTableElasticIndexName) tableDisco := clickhouse.NewTableDiscovery(oldQuesmaConfig, connectionPool, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{}) + schemaRegistry.Start() logManager := clickhouse.NewEmptyLogManager(oldQuesmaConfig, connectionPool, phoneHomeAgent, tableDisco) logManager.Start() diff --git a/quesma/quesma/router_test.go b/quesma/quesma/router_test.go index 1ae229028..ce45251cb 100644 --- a/quesma/quesma/router_test.go +++ b/quesma/quesma/router_test.go @@ -280,7 +280,12 @@ func TestConfigureRouter(t *testing.T) { }, } tr := TestTableResolver{} - testRouter := ConfigureRouter(cfg, schema.NewSchemaRegistry(fixedTableProvider{}, cfg, clickhouse.SchemaTypeAdapter{}), &clickhouse.LogManager{}, &ingest.IngestProcessor{}, &ui.QuesmaManagementConsole{}, telemetry.NewPhoneHomeAgent(cfg, nil, ""), &QueryRunner{}, tr, nil) + + schemaRegistry := schema.NewSchemaRegistry(fixedTableProvider{}, cfg, clickhouse.SchemaTypeAdapter{}) + schemaRegistry.Start() + defer schemaRegistry.Stop() + + testRouter := ConfigureRouter(cfg, schemaRegistry, &clickhouse.LogManager{}, &ingest.IngestProcessor{}, &ui.QuesmaManagementConsole{}, telemetry.NewPhoneHomeAgent(cfg, nil, ""), &QueryRunner{}, tr, nil) tests := []struct { path string diff --git a/quesma/quesma/schema_transformer_test.go b/quesma/quesma/schema_transformer_test.go index 2ecff921f..bf2eace13 100644 --- a/quesma/quesma/schema_transformer_test.go +++ b/quesma/quesma/schema_transformer_test.go @@ -6,6 +6,7 @@ import ( "github.com/QuesmaOrg/quesma/quesma/clickhouse" "github.com/QuesmaOrg/quesma/quesma/model" "github.com/QuesmaOrg/quesma/quesma/quesma/config" + "github.com/QuesmaOrg/quesma/quesma/quesma/types" "github.com/QuesmaOrg/quesma/quesma/schema" "github.com/stretchr/testify/assert" "strconv" @@ -19,8 +20,8 @@ type fixedTableProvider struct { func (f fixedTableProvider) TableDefinitions() map[string]schema.Table { return f.tables } - -func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false } +func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false } +func (f fixedTableProvider) RegisterTablesReloadListener(chan<- types.ReloadMessage) {} func Test_ipRangeTransform(t *testing.T) { const isIPAddressInRangePrimitive = "isIPAddressInRange" @@ -86,6 +87,8 @@ func Test_ipRangeTransform(t *testing.T) { TableName: "kibana_sample_data_logs_nested", FieldName: "nested.clientip"}: "nested_clientip", } s := schema.NewSchemaRegistry(tableProvider, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() transform := NewSchemaCheckPass(&cfg, tableDiscovery, defaultSearchAfterStrategy) s.UpdateFieldEncodings(fieldEncodings) @@ -704,6 +707,8 @@ func TestApplyPhysicalFromExpression(t *testing.T) { td.Store(tableDefinition.Name, &tableDefinition) s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, nil, defaultSearchAfterStrategy) tests := []struct { @@ -964,6 +969,8 @@ func TestFullTextFields(t *testing.T) { } s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, nil, defaultSearchAfterStrategy) indexSchema, ok := s.FindSchema("test") @@ -1071,6 +1078,9 @@ func Test_applyMatchOperator(t *testing.T) { } s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() + transform := NewSchemaCheckPass(&cfg, nil, defaultSearchAfterStrategy) indexSchema, ok := s.FindSchema("test") @@ -1171,6 +1181,8 @@ func Test_checkAggOverUnsupportedType(t *testing.T) { } s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() transform := NewSchemaCheckPass(&cfg, nil, defaultSearchAfterStrategy) indexSchema, ok := s.FindSchema("test") diff --git a/quesma/quesma/types/messages.go b/quesma/quesma/types/messages.go new file mode 100644 index 000000000..e7e042cd3 --- /dev/null +++ b/quesma/quesma/types/messages.go @@ -0,0 +1,10 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package types + +import "time" + +type ReloadMessage struct { + Timestamp time.Time +} diff --git a/quesma/schema/registry.go b/quesma/schema/registry.go index cc8a9c02a..a4fb4a3e8 100644 --- a/quesma/schema/registry.go +++ b/quesma/schema/registry.go @@ -6,14 +6,20 @@ import ( "github.com/QuesmaOrg/quesma/quesma/comment_metadata" "github.com/QuesmaOrg/quesma/quesma/logger" "github.com/QuesmaOrg/quesma/quesma/quesma/config" + "github.com/QuesmaOrg/quesma/quesma/quesma/recovery" + "github.com/QuesmaOrg/quesma/quesma/quesma/types" "github.com/QuesmaOrg/quesma/quesma/util" "sync" + "time" ) // TODO we should rethink naming and types used in this package type ( Registry interface { + Start() + Stop() + AllSchemas() map[IndexName]Schema FindSchema(name IndexName) (Schema, bool) UpdateFieldsOrigins(name IndexName, fields map[FieldName]FieldSource) @@ -28,20 +34,26 @@ type ( } EncodedFieldName string schemaRegistry struct { + sync.RWMutex // this lock is used to protect all the fields below + // locking is done in public methods only to avoid deadlocks + // index configuration overrides always take precedence indexConfiguration *map[string]config.IndexConfiguration dataSourceTableProvider TableProvider dataSourceTypeAdapter typeAdapter dynamicConfiguration map[string]Table - fieldEncodingsLock sync.RWMutex fieldEncodings map[FieldEncodingKey]EncodedFieldName - fieldOriginsLock sync.RWMutex fieldOrigins map[IndexName]map[FieldName]FieldSource + + cachedSchemas map[IndexName]Schema + + doneCh chan struct{} } typeAdapter interface { Convert(string) (QuesmaType, bool) } TableProvider interface { + RegisterTablesReloadListener(chan<- types.ReloadMessage) TableDefinitions() map[string]Table AutodiscoveryEnabled() bool } @@ -58,13 +70,13 @@ type ( func (s *schemaRegistry) getInternalToPublicFieldEncodings(tableName string) map[EncodedFieldName]string { fieldsEncodingsPerIndex := make(map[string]EncodedFieldName) - s.fieldEncodingsLock.RLock() + for key, value := range s.fieldEncodings { if key.TableName == tableName { fieldsEncodingsPerIndex[key.FieldName] = EncodedFieldName(value) } } - s.fieldEncodingsLock.RUnlock() + internalToPublicFieldsEncodings := make(map[EncodedFieldName]string) for key, value := range fieldsEncodingsPerIndex { @@ -74,6 +86,61 @@ func (s *schemaRegistry) getInternalToPublicFieldEncodings(tableName string) map return internalToPublicFieldsEncodings } +func (s *schemaRegistry) invalidateCache() { + s.cachedSchemas = nil +} + +func (s *schemaRegistry) Start() { + + notificationChannel := make(chan types.ReloadMessage, 1) + + s.dataSourceTableProvider.RegisterTablesReloadListener(notificationChannel) + + protectedReload := func() { + defer recovery.LogPanic() + s.Lock() + defer s.Unlock() + + s.invalidateCache() + } + + go func() { + // reload schemas every 5 minutes + // table_discovery can be disabled, so we need to reload schemas periodically just in case + ticker := time.NewTicker(5 * time.Minute) + for { + select { + case <-notificationChannel: + protectedReload() + + case <-ticker.C: + protectedReload() + + case <-s.doneCh: + return + } + } + }() +} + +func (s *schemaRegistry) Stop() { + s.doneCh <- struct{}{} +} + +func (s *schemaRegistry) loadOrGetSchemas() map[IndexName]Schema { + + if s.cachedSchemas == nil { + schema, err := s.loadSchemas() + if err != nil { + logger.Error().Err(err).Msg("error loading schema") + return make(map[IndexName]Schema) + } + s.cachedSchemas = schema + } + + return s.cachedSchemas +} + func (s *schemaRegistry) loadSchemas() (map[IndexName]Schema, error) { definitions := s.dataSourceTableProvider.TableDefinitions() schemas := make(map[IndexName]Schema) @@ -126,25 +193,27 @@ func (s *schemaRegistry) populateSchemaFromDynamicConfiguration(indexName string } func (s *schemaRegistry) AllSchemas() map[IndexName]Schema { - if schemas, err := s.loadSchemas(); err != nil { - logger.Error().Msgf("error loading schemas: %v", err) - return make(map[IndexName]Schema) - } else { - return schemas - } + s.Lock() + defer s.Unlock() + + return s.loadOrGetSchemas() } func (s *schemaRegistry) FindSchema(name IndexName) (Schema, bool) { - if schemas, err := s.loadSchemas(); err != nil { - logger.Error().Msgf("error loading schemas: %v", err) - return Schema{}, false - } else { - schema, found := schemas[name] - return schema, found - } + s.Lock() + defer s.Unlock() + + schemas := s.loadOrGetSchemas() + + schema, found := schemas[name] + return schema, found + } func (s *schemaRegistry) UpdateDynamicConfiguration(name IndexName, table Table) { + s.Lock() + defer s.Unlock() + s.dynamicConfiguration[name.AsString()] = table dynamicEncodings := make(map[FieldEncodingKey]EncodedFieldName) for _, column := range table.Columns { @@ -152,20 +221,29 @@ func (s *schemaRegistry) UpdateDynamicConfiguration(name IndexName, table Table) // Otherwise, they will be populated only based on ingested data which might not contain all the fields dynamicEncodings[FieldEncodingKey{TableName: name.AsString(), FieldName: column.Name}] = EncodedFieldName(util.FieldToColumnEncoder(column.Name)) } - s.UpdateFieldEncodings(dynamicEncodings) + s.updateFieldEncodingsInternal(dynamicEncodings) + s.invalidateCache() } -func (s *schemaRegistry) UpdateFieldEncodings(encodings map[FieldEncodingKey]EncodedFieldName) { - s.fieldEncodingsLock.Lock() - defer s.fieldEncodingsLock.Unlock() +func (s *schemaRegistry) updateFieldEncodingsInternal(encodings map[FieldEncodingKey]EncodedFieldName) { + for key, value := range encodings { s.fieldEncodings[key] = EncodedFieldName(value) } } +func (s *schemaRegistry) UpdateFieldEncodings(encodings map[FieldEncodingKey]EncodedFieldName) { + s.Lock() + defer s.Unlock() + + s.updateFieldEncodingsInternal(encodings) + s.invalidateCache() +} + func (s *schemaRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldName { - s.fieldEncodingsLock.RLock() - defer s.fieldEncodingsLock.RUnlock() + s.RLock() + defer s.RUnlock() + fieldEncodings := make(map[FieldEncodingKey]EncodedFieldName) for key, value := range s.fieldEncodings { fieldEncodings[key] = EncodedFieldName(value) @@ -175,13 +253,16 @@ func (s *schemaRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldNa } func NewSchemaRegistry(tableProvider TableProvider, configuration *config.QuesmaConfiguration, dataSourceTypeAdapter typeAdapter) Registry { - return &schemaRegistry{ + res := &schemaRegistry{ indexConfiguration: &configuration.IndexConfig, dataSourceTableProvider: tableProvider, dataSourceTypeAdapter: dataSourceTypeAdapter, dynamicConfiguration: make(map[string]Table), + cachedSchemas: nil, fieldEncodings: make(map[FieldEncodingKey]EncodedFieldName), + doneCh: make(chan struct{}), } + return res } func (s *schemaRegistry) populateSchemaFromStaticConfiguration(indexConfiguration config.IndexConfiguration, fields map[FieldName]Field) { @@ -283,7 +364,7 @@ func (s *schemaRegistry) removeGeoPhysicalFields(fields map[FieldName]Field) { } func (s *schemaRegistry) populateFieldsOrigins(indexName string, fields map[FieldName]Field) { - s.fieldOriginsLock.RLock() + if fieldOrigins, ok := s.fieldOrigins[IndexName(indexName)]; ok { for fieldName, field := range fields { if origin, ok := fieldOrigins[field.InternalPropertyName]; ok { @@ -292,12 +373,13 @@ func (s *schemaRegistry) populateFieldsOrigins(indexName string, fields map[Fiel } } } - s.fieldOriginsLock.RUnlock() + } func (s *schemaRegistry) UpdateFieldsOrigins(name IndexName, fields map[FieldName]FieldSource) { - s.fieldOriginsLock.Lock() - defer s.fieldOriginsLock.Unlock() + s.Lock() + defer s.Unlock() + if s.fieldOrigins == nil { s.fieldOrigins = make(map[IndexName]map[FieldName]FieldSource) } diff --git a/quesma/schema/registry_test.go b/quesma/schema/registry_test.go index 684b765ab..3970e115c 100644 --- a/quesma/schema/registry_test.go +++ b/quesma/schema/registry_test.go @@ -5,6 +5,7 @@ package schema_test import ( "github.com/QuesmaOrg/quesma/quesma/clickhouse" "github.com/QuesmaOrg/quesma/quesma/quesma/config" + "github.com/QuesmaOrg/quesma/quesma/quesma/types" "github.com/QuesmaOrg/quesma/quesma/schema" "github.com/k0kubun/pp" "github.com/stretchr/testify/assert" @@ -264,6 +265,9 @@ func Test_schemaRegistry_FindSchema(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := schema.NewSchemaRegistry(tt.tableDiscovery, &tt.cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() + resultSchema, resultFound := s.FindSchema(tt.tableName) if resultFound != tt.found { t.Errorf("FindSchema() got1 = %v, want %v", resultFound, tt.found) @@ -300,6 +304,8 @@ func Test_schemaRegistry_UpdateDynamicConfiguration(t *testing.T) { }} s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{}) + s.Start() + defer s.Stop() expectedSchema := schema.NewSchema(map[schema.FieldName]schema.Field{ "message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeKeyword, InternalPropertyType: "String"}, @@ -341,5 +347,6 @@ type fixedTableProvider struct { tables map[string]schema.Table } -func (f fixedTableProvider) TableDefinitions() map[string]schema.Table { return f.tables } -func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false } +func (f fixedTableProvider) TableDefinitions() map[string]schema.Table { return f.tables } +func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false } +func (f fixedTableProvider) RegisterTablesReloadListener(chan<- types.ReloadMessage) {} diff --git a/quesma/schema/static_registry.go b/quesma/schema/static_registry.go index 061db58f6..a5488cfa2 100644 --- a/quesma/schema/static_registry.go +++ b/quesma/schema/static_registry.go @@ -58,3 +58,6 @@ func (e *StaticRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldNa func (e *StaticRegistry) UpdateFieldsOrigins(name IndexName, fields map[FieldName]FieldSource) { } + +func (e *StaticRegistry) Start() {} +func (e *StaticRegistry) Stop() {} diff --git a/quesma/table_resolver/table_resolver.go b/quesma/table_resolver/table_resolver.go index fa75b68c2..a21408022 100644 --- a/quesma/table_resolver/table_resolver.go +++ b/quesma/table_resolver/table_resolver.go @@ -10,10 +10,10 @@ import ( "github.com/QuesmaOrg/quesma/quesma/logger" "github.com/QuesmaOrg/quesma/quesma/quesma/config" "github.com/QuesmaOrg/quesma/quesma/quesma/recovery" + "github.com/QuesmaOrg/quesma/quesma/quesma/types" "github.com/QuesmaOrg/quesma/quesma/v2/core" "sort" "sync" - "time" ) type tableResolver interface { @@ -150,12 +150,6 @@ func (r *tableRegistryImpl) updateIndexes() { logger.Info().Msgf("Index registry updating state.") - // TODO how to interact with the table discovery ? - // right now we enforce the reload of the table definitions - // schema registry is doing the same - // we should inject list of tables into the resolver - r.tableDiscovery.ReloadTableDefinitions() - tableMap := r.tableDiscovery.TableDefinitions() clickhouseIndexes := make(map[string]table) @@ -209,6 +203,10 @@ func (r *tableRegistryImpl) Stop() { } func (r *tableRegistryImpl) Start() { + + notificationChannel := make(chan types.ReloadMessage, 1) + r.tableDiscovery.RegisterTablesReloadListener(notificationChannel) + go func() { defer recovery.LogPanic() logger.Info().Msg("Table resolve started.") @@ -217,7 +215,7 @@ func (r *tableRegistryImpl) Start() { select { case <-r.ctx.Done(): return - case <-time.After(1 * time.Minute): + case <-notificationChannel: r.updateState() } }