diff --git a/quesma/elasticsearch/index_management.go b/quesma/elasticsearch/index_management.go index 99941a6c7..57570a2f8 100644 --- a/quesma/elasticsearch/index_management.go +++ b/quesma/elasticsearch/index_management.go @@ -34,10 +34,10 @@ type ( } ) -func NewIndexManagement(elasticsearchUrl string) IndexManagement { +func NewIndexManagement(elasticsearch config.ElasticsearchConfiguration) IndexManagement { return &indexManagement{ - ElasticsearchUrl: elasticsearchUrl, - indexResolver: NewIndexResolver(elasticsearchUrl), + ElasticsearchUrl: elasticsearch.Url.String(), + indexResolver: NewIndexResolver(elasticsearch), } } diff --git a/quesma/elasticsearch/index_resolver.go b/quesma/elasticsearch/index_resolver.go index 8ae2166b0..dd70ed1bf 100644 --- a/quesma/elasticsearch/index_resolver.go +++ b/quesma/elasticsearch/index_resolver.go @@ -3,10 +3,11 @@ package elasticsearch import ( - "bytes" + "context" "encoding/json" "io" "net/http" + "quesma/quesma/config" ) type ( @@ -15,7 +16,7 @@ type ( } indexResolver struct { Url string - httpClient *http.Client + httpClient *SimpleClient } Sources struct { Indices []Index `json:"indices"` @@ -37,10 +38,10 @@ type ( } ) -func NewIndexResolver(elasticsearchUrl string) IndexResolver { +func NewIndexResolver(elasticsearch config.ElasticsearchConfiguration) IndexResolver { return &indexResolver{ - Url: elasticsearchUrl, - httpClient: &http.Client{}, + Url: elasticsearch.Url.String(), + httpClient: NewSimpleClient(&elasticsearch), } } @@ -52,11 +53,7 @@ func NormalizePattern(p string) string { } func (im *indexResolver) Resolve(indexPattern string) (Sources, bool, error) { - req, err := http.NewRequest("GET", im.Url+"/_resolve/index/"+indexPattern+"?expand_wildcards=open", bytes.NewBuffer([]byte{})) - if err != nil { - return Sources{}, false, err - } - response, err := im.httpClient.Do(req) + response, err := im.httpClient.Request(context.Background(), "GET", "_resolve/index/"+indexPattern+"?expand_wildcards=open", []byte{}) if err != nil { return Sources{}, false, err } diff --git a/quesma/main.go b/quesma/main.go index 1ca692155..0a5889e4c 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -90,7 +90,7 @@ func main() { tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{}) - im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String()) + im := elasticsearch.NewIndexManagement(cfg.Elasticsearch) connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco) lm := connManager.GetConnector() diff --git a/quesma/quesma/functionality/resolve/resolve.go b/quesma/quesma/functionality/resolve/resolve.go index 7ff6e804c..38cc4ccb0 100644 --- a/quesma/quesma/functionality/resolve/resolve.go +++ b/quesma/quesma/functionality/resolve/resolve.go @@ -42,7 +42,7 @@ func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfigu // Combine results from both schema.Registry and Elasticsearch: // todo avoid creating new instances all the time - sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()).Resolve(normalizedPattern) + sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern) if err != nil { return elasticsearch.Sources{}, err } diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go index d174f340b..45fa936c9 100644 --- a/quesma/table_resolver/rules.go +++ b/quesma/table_resolver/rules.go @@ -197,15 +197,12 @@ func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors(pipeline stri } // but maybe we should also check against the actual indexes ?? - - if r.conf.AutodiscoveryEnabled { - - for indexName := range r.elasticIndexes { - if util.IndexPatternMatches(pattern, indexName) { - matchedElastic = append(matchedElastic, indexName) - } + for indexName := range r.elasticIndexes { + if util.IndexPatternMatches(pattern, indexName) { + matchedElastic = append(matchedElastic, indexName) } - + } + if r.conf.AutodiscoveryEnabled { for tableName := range r.clickhouseIndexes { if util.IndexPatternMatches(pattern, tableName) { matchedClickhouse = append(matchedClickhouse, tableName) diff --git a/quesma/table_resolver/table_resolver.go b/quesma/table_resolver/table_resolver.go index 0972e13f8..05bf46914 100644 --- a/quesma/table_resolver/table_resolver.go +++ b/quesma/table_resolver/table_resolver.go @@ -171,6 +171,11 @@ func (r *tableRegistryImpl) updateIndexes() { name: index.Name, } } + for _, index := range sources.DataStreams { + elasticIndexes[index.Name] = table{ + name: index.Name, + } + } logger.Info().Msgf("Elastic tables updated: %v", elasticIndexes) r.elasticIndexes = elasticIndexes