Skip to content

Commit

Permalink
Fixing elastic fallback in security mode (#902)
Browse files Browse the repository at this point in the history
This PR fixes some regressions with elastic fallback in security mode.
before:
<img width="1620" alt="image"
src="https://github.com/user-attachments/assets/da408780-5a51-46fb-8a52-64d152f3cd75">
after:
<img width="1721" alt="image"
src="https://github.com/user-attachments/assets/82df7324-2a5b-470a-9b99-b122a5c07804">
  • Loading branch information
pdelewski authored Oct 22, 2024
1 parent 43a1eed commit f53610f
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 23 deletions.
6 changes: 3 additions & 3 deletions quesma/elasticsearch/index_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type (
}
)

func NewIndexManagement(elasticsearchUrl string) IndexManagement {
func NewIndexManagement(elasticsearch config.ElasticsearchConfiguration) IndexManagement {
return &indexManagement{
ElasticsearchUrl: elasticsearchUrl,
indexResolver: NewIndexResolver(elasticsearchUrl),
ElasticsearchUrl: elasticsearch.Url.String(),
indexResolver: NewIndexResolver(elasticsearch),
}
}

Expand Down
17 changes: 7 additions & 10 deletions quesma/elasticsearch/index_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
package elasticsearch

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"quesma/quesma/config"
)

type (
Expand All @@ -15,7 +16,7 @@ type (
}
indexResolver struct {
Url string
httpClient *http.Client
httpClient *SimpleClient
}
Sources struct {
Indices []Index `json:"indices"`
Expand All @@ -37,10 +38,10 @@ type (
}
)

func NewIndexResolver(elasticsearchUrl string) IndexResolver {
func NewIndexResolver(elasticsearch config.ElasticsearchConfiguration) IndexResolver {
return &indexResolver{
Url: elasticsearchUrl,
httpClient: &http.Client{},
Url: elasticsearch.Url.String(),
httpClient: NewSimpleClient(&elasticsearch),
}
}

Expand All @@ -52,11 +53,7 @@ func NormalizePattern(p string) string {
}

func (im *indexResolver) Resolve(indexPattern string) (Sources, bool, error) {
req, err := http.NewRequest("GET", im.Url+"/_resolve/index/"+indexPattern+"?expand_wildcards=open", bytes.NewBuffer([]byte{}))
if err != nil {
return Sources{}, false, err
}
response, err := im.httpClient.Do(req)
response, err := im.httpClient.Request(context.Background(), "GET", "_resolve/index/"+indexPattern+"?expand_wildcards=open", []byte{})
if err != nil {
return Sources{}, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func main() {
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)

connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco)
lm := connManager.GetConnector()
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/functionality/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfigu
// Combine results from both schema.Registry and Elasticsearch:

// todo avoid creating new instances all the time
sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()).Resolve(normalizedPattern)
sourcesFromElastic, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
if err != nil {
return elasticsearch.Sources{}, err
}
Expand Down
13 changes: 5 additions & 8 deletions quesma/table_resolver/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,12 @@ func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors(pipeline stri
}

// but maybe we should also check against the actual indexes ??

if r.conf.AutodiscoveryEnabled {

for indexName := range r.elasticIndexes {
if util.IndexPatternMatches(pattern, indexName) {
matchedElastic = append(matchedElastic, indexName)
}
for indexName := range r.elasticIndexes {
if util.IndexPatternMatches(pattern, indexName) {
matchedElastic = append(matchedElastic, indexName)
}

}
if r.conf.AutodiscoveryEnabled {
for tableName := range r.clickhouseIndexes {
if util.IndexPatternMatches(pattern, tableName) {
matchedClickhouse = append(matchedClickhouse, tableName)
Expand Down
5 changes: 5 additions & 0 deletions quesma/table_resolver/table_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func (r *tableRegistryImpl) updateIndexes() {
name: index.Name,
}
}
for _, index := range sources.DataStreams {
elasticIndexes[index.Name] = table{
name: index.Name,
}
}

logger.Info().Msgf("Elastic tables updated: %v", elasticIndexes)
r.elasticIndexes = elasticIndexes
Expand Down

0 comments on commit f53610f

Please sign in to comment.