From f53610fd7fb1902121d0b73cff79cece7abac152 Mon Sep 17 00:00:00 2001
From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com>
Date: Tue, 22 Oct 2024 15:27:20 +0200
Subject: [PATCH] Fixing elastic fallback in security mode (#902)
This PR fixes some regressions with elastic fallback in security mode.
before:
after:
---
quesma/elasticsearch/index_management.go | 6 +++---
quesma/elasticsearch/index_resolver.go | 17 +++++++----------
quesma/main.go | 2 +-
quesma/quesma/functionality/resolve/resolve.go | 2 +-
quesma/table_resolver/rules.go | 13 +++++--------
quesma/table_resolver/table_resolver.go | 5 +++++
6 files changed, 22 insertions(+), 23 deletions(-)
diff --git a/quesma/elasticsearch/index_management.go b/quesma/elasticsearch/index_management.go
index 99941a6c7..57570a2f8 100644
--- a/quesma/elasticsearch/index_management.go
+++ b/quesma/elasticsearch/index_management.go
@@ -34,10 +34,10 @@ type (
}
)
-func NewIndexManagement(elasticsearchUrl string) IndexManagement {
+func NewIndexManagement(elasticsearch config.ElasticsearchConfiguration) IndexManagement {
return &indexManagement{
- ElasticsearchUrl: elasticsearchUrl,
- indexResolver: NewIndexResolver(elasticsearchUrl),
+ ElasticsearchUrl: elasticsearch.Url.String(),
+ indexResolver: NewIndexResolver(elasticsearch),
}
}
diff --git a/quesma/elasticsearch/index_resolver.go b/quesma/elasticsearch/index_resolver.go
index 8ae2166b0..dd70ed1bf 100644
--- a/quesma/elasticsearch/index_resolver.go
+++ b/quesma/elasticsearch/index_resolver.go
@@ -3,10 +3,11 @@
package elasticsearch
import (
- "bytes"
+ "context"
"encoding/json"
"io"
"net/http"
+ "quesma/quesma/config"
)
type (
@@ -15,7 +16,7 @@ type (
}
indexResolver struct {
Url string
- httpClient *http.Client
+ httpClient *SimpleClient
}
Sources struct {
Indices []Index `json:"indices"`
@@ -37,10 +38,10 @@ type (
}
)
-func NewIndexResolver(elasticsearchUrl string) IndexResolver {
+func NewIndexResolver(elasticsearch config.ElasticsearchConfiguration) IndexResolver {
return &indexResolver{
- Url: elasticsearchUrl,
- httpClient: &http.Client{},
+ Url: elasticsearch.Url.String(),
+ httpClient: NewSimpleClient(&elasticsearch),
}
}
@@ -52,11 +53,7 @@ func NormalizePattern(p string) string {
}
func (im *indexResolver) Resolve(indexPattern string) (Sources, bool, error) {
- req, err := http.NewRequest("GET", im.Url+"/_resolve/index/"+indexPattern+"?expand_wildcards=open", bytes.NewBuffer([]byte{}))
- if err != nil {
- return Sources{}, false, err
- }
- response, err := im.httpClient.Do(req)
+ response, err := im.httpClient.Request(context.Background(), "GET", "_resolve/index/"+indexPattern+"?expand_wildcards=open", []byte{})
if err != nil {
return Sources{}, false, err
}
diff --git a/quesma/main.go b/quesma/main.go
index 1ca692155..0a5889e4c 100644
--- a/quesma/main.go
+++ b/quesma/main.go
@@ -90,7 +90,7 @@ func main() {
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
- im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
+ im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco)
lm := connManager.GetConnector()
diff --git a/quesma/quesma/functionality/resolve/resolve.go b/quesma/quesma/functionality/resolve/resolve.go
index 7ff6e804c..38cc4ccb0 100644
--- a/quesma/quesma/functionality/resolve/resolve.go
+++ b/quesma/quesma/functionality/resolve/resolve.go
@@ -42,7 +42,7 @@ func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfigu
// Combine results from both schema.Registry and Elasticsearch:
// todo avoid creating new instances all the time
- sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()).Resolve(normalizedPattern)
+ sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
if err != nil {
return elasticsearch.Sources{}, err
}
diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go
index d174f340b..45fa936c9 100644
--- a/quesma/table_resolver/rules.go
+++ b/quesma/table_resolver/rules.go
@@ -197,15 +197,12 @@ func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors(pipeline stri
}
// but maybe we should also check against the actual indexes ??
-
- if r.conf.AutodiscoveryEnabled {
-
- for indexName := range r.elasticIndexes {
- if util.IndexPatternMatches(pattern, indexName) {
- matchedElastic = append(matchedElastic, indexName)
- }
+ for indexName := range r.elasticIndexes {
+ if util.IndexPatternMatches(pattern, indexName) {
+ matchedElastic = append(matchedElastic, indexName)
}
-
+ }
+ if r.conf.AutodiscoveryEnabled {
for tableName := range r.clickhouseIndexes {
if util.IndexPatternMatches(pattern, tableName) {
matchedClickhouse = append(matchedClickhouse, tableName)
diff --git a/quesma/table_resolver/table_resolver.go b/quesma/table_resolver/table_resolver.go
index 0972e13f8..05bf46914 100644
--- a/quesma/table_resolver/table_resolver.go
+++ b/quesma/table_resolver/table_resolver.go
@@ -171,6 +171,11 @@ func (r *tableRegistryImpl) updateIndexes() {
name: index.Name,
}
}
+ for _, index := range sources.DataStreams {
+ elasticIndexes[index.Name] = table{
+ name: index.Name,
+ }
+ }
logger.Info().Msgf("Elastic tables updated: %v", elasticIndexes)
r.elasticIndexes = elasticIndexes