Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving v2 (new quesma) tests #1167

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 0 additions & 368 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,378 +4,10 @@
package main

import (
"context"
"github.com/stretchr/testify/assert"
"net/http"
"os"
"os/signal"
"quesma/backend_connectors"
"quesma/frontend_connectors"
"quesma/processors"
"quesma/quesma/config"
quesma_api "quesma_v2/core"
"sync/atomic"
"syscall"
"testing"
"time"
)

// just to make sure that the buildIngestOnlyQuesma is used
func Test_Main(m *testing.T) {
_ = buildIngestOnlyQuesma()
}

func emitRequests(stop chan os.Signal, t *testing.T, testData []struct {
url string
expectedResponse string
}) {
go func() {
time.Sleep(1 * time.Second)
requestBody := []byte(`{"query": {"match_all": {}}}`)
var resp string
var err error
for _, test := range testData {
resp, err = sendRequest(test.url, requestBody)
assert.NoError(t, err)
assert.Contains(t, test.expectedResponse, resp)
}
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
close(stop)
}()
}

func Test_backendConnectorValidation(t *testing.T) {
var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor()
var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
postgressPipeline.AddProcessor(tcpProcessor)
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

const endpoint = "root:password@tcp(127.0.0.1:3306)/test"
var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{
Endpoint: endpoint,
}
postgressPipeline.AddBackendConnector(mySqlBackendConnector)
quesmaBuilder.AddPipeline(postgressPipeline)
_, err := quesmaBuilder.Build()
assert.NoError(t, err)
}

var fallbackCalled int32 = 0

func fallback(_ context.Context, _ *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
metadata := quesma_api.MakeNewMetadata()
atomic.AddInt32(&fallbackCalled, 1)
resp := []byte("unknown\n")
return &quesma_api.Result{Meta: metadata, GenericResult: resp}, nil
}

func ab_testing_scenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}

ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
ingestHTTPRouter := quesma_api.NewPathRouter()
ingestHTTPRouter.AddRoute("/_bulk", bulk)
ingestHTTPRouter.AddRoute("/_doc", doc)
ingestFrontendConnector.AddRouter(ingestHTTPRouter)
var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
ingestPipeline.AddFrontendConnector(ingestFrontendConnector)
var abIngestTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABIngestTestProcessor", false)

var ingestProcessor quesma_api.Processor = NewIngestProcessor()
var innerIngestProcessor1 quesma_api.Processor = NewInnerIngestProcessor1()
ingestProcessor.AddProcessor(innerIngestProcessor1)
var innerIngestProcessor2 quesma_api.Processor = NewInnerIngestProcessor2()
ingestProcessor.AddProcessor(innerIngestProcessor2)

ingestPipeline.AddProcessor(ingestProcessor)
ingestPipeline.AddProcessor(abIngestTestProcessor)

queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
queryHTTPRouter := quesma_api.NewPathRouter()
queryHTTPRouter.AddRoute("/_search", search)
queryFrontendConnector.AddRouter(queryHTTPRouter)
var queryPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
queryPipeline.AddFrontendConnector(queryFrontendConnector)
var queryProcessor quesma_api.Processor = NewQueryProcessor()
var innerQueryProcessor1 quesma_api.Processor = NewInnerQueryProcessor1()
queryProcessor.AddProcessor(innerQueryProcessor1)
var innerQueryProcessor2 quesma_api.Processor = NewInnerQueryProcessor2()
queryProcessor.AddProcessor(innerQueryProcessor2)
var abQueryTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABQueryTestProcessor", true)

queryPipeline.AddProcessor(queryProcessor)
queryPipeline.AddProcessor(abQueryTestProcessor)
quesmaBuilder.AddPipeline(ingestPipeline)
quesmaBuilder.AddPipeline(queryPipeline)

quesma, _ := quesmaBuilder.Build()
return quesma
}

func fallbackScenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}
ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)

ingestHTTPRouter := quesma_api.NewPathRouter()
var fallback quesma_api.HTTPFrontendHandler = fallback
ingestHTTPRouter.AddFallbackHandler(fallback)
ingestFrontendConnector.AddRouter(ingestHTTPRouter)
var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
ingestPipeline.AddFrontendConnector(ingestFrontendConnector)
quesmaBuilder.AddPipeline(ingestPipeline)

return quesmaBuilder
}

func Test_fallbackScenario(t *testing.T) {
qBuilder := fallbackScenario()
q1, _ := qBuilder.Build()
q1.Start()
stop := make(chan os.Signal, 1)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "unknown\n"},
{"http://localhost:8888/_doc", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
{"http://localhost:8888/_search", "unknown\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
atomic.LoadInt32(&fallbackCalled)
assert.Equal(t, int32(4), fallbackCalled)
}

func Test_scenario1(t *testing.T) {
q1 := ab_testing_scenario()
q1.Start()
stop := make(chan os.Signal, 1)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", `bulk->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
bulk->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_doc", `doc->IngestProcessor->InnerIngestProcessor1->0ABIngestTestProcessor
doc->IngestProcessor->InnerIngestProcessor2->0ABIngestTestProcessor
`},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are equal\n"},
{"http://localhost:8888/_search", "ABTestProcessor processor: Responses are not equal\n"},
}
emitRequests(stop, t, testData)
<-stop
q1.Stop(context.Background())
}

var middlewareCallCount int32 = 0

type Middleware struct {
emitError bool
}

func (m *Middleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&middlewareCallCount, 1)
if m.emitError {
http.Error(w, "middleware", http.StatusInternalServerError)
}
}

type Middleware2 struct {
}

func (m *Middleware2) ServeHTTP(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&middlewareCallCount, 1)
w.WriteHeader(200)
}

func createMiddleWareScenario(emitError bool, cfg *config.QuesmaConfiguration) quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())

frontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
HTTPRouter := quesma_api.NewPathRouter()
var fallback quesma_api.HTTPFrontendHandler = fallback
HTTPRouter.AddFallbackHandler(fallback)
frontendConnector.AddRouter(HTTPRouter)
frontendConnector.AddMiddleware(&Middleware{emitError: emitError})
frontendConnector.AddMiddleware(&Middleware2{})

var pipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
pipeline.AddFrontendConnector(frontendConnector)
var ingestProcessor quesma_api.Processor = NewIngestProcessor()
pipeline.AddProcessor(ingestProcessor)
quesmaBuilder.AddPipeline(pipeline)
return quesmaBuilder
}

func Test_middleware(t *testing.T) {

cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}
{
quesmaBuilder := createMiddleWareScenario(true, cfg)
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)

<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
assert.Equal(t, int32(4), middlewareCallCount)
}
atomic.StoreInt32(&middlewareCallCount, 0)
{
quesmaBuilder := createMiddleWareScenario(false, cfg)
quesmaBuilder.Build()
quesmaBuilder.Start()
stop := make(chan os.Signal, 1)
testData := []struct {
url string
expectedResponse string
}{
{"http://localhost:8888/_bulk", "middleware\n"},
{"http://localhost:8888/_doc", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
{"http://localhost:8888/_search", "middleware\n"},
}
emitRequests(stop, t, testData)
<-stop
quesmaBuilder.Stop(context.Background())
atomic.LoadInt32(&middlewareCallCount)
assert.Equal(t, int32(8), middlewareCallCount)
}
}

func Test_QuesmaBuild(t *testing.T) {
cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}
{
// Two pipelines with different endpoints
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
firstHTTPRouter := quesma_api.NewPathRouter()
firstHTTPRouter.AddRoute("/_bulk", bulk)
firstFrontendConnector.AddRouter(firstHTTPRouter)
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
firstPipeline.AddFrontendConnector(firstFrontendConnector)

secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8889", cfg)
secondHTTPRouter := quesma_api.NewPathRouter()
secondHTTPRouter.AddRoute("/_search", search)
secondFrontendConnector.AddRouter(secondHTTPRouter)
var secondPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
secondPipeline.AddFrontendConnector(secondFrontendConnector)

quesmaBuilder.AddPipeline(firstPipeline)
quesmaBuilder.AddPipeline(secondPipeline)
quesma, err := quesmaBuilder.Build()
assert.NotNil(t, quesma)
assert.Equal(t, 2, len(quesma.GetPipelines()))
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()))
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.NotEqual(t, quesma.GetPipelines()[1].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[0])

assert.NoError(t, err)

}
{
// Two pipelines with the same endpoint
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
firstHTTPRouter := quesma_api.NewPathRouter()
firstHTTPRouter.AddRoute("/_bulk", bulk)
firstFrontendConnector.AddRouter(firstHTTPRouter)
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
firstPipeline.AddFrontendConnector(firstFrontendConnector)

secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
secondHTTPRouter := quesma_api.NewPathRouter()
secondHTTPRouter.AddRoute("/_search", search)
secondFrontendConnector.AddRouter(secondHTTPRouter)
var secondPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
secondPipeline.AddFrontendConnector(secondFrontendConnector)

quesmaBuilder.AddPipeline(firstPipeline)
quesmaBuilder.AddPipeline(secondPipeline)
quesma, err := quesmaBuilder.Build()
assert.NotNil(t, quesma)
assert.Equal(t, 2, len(quesma.GetPipelines()))
assert.Equal(t, 1, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
assert.Equal(t, 1, len(quesma.GetPipelines()[1].GetFrontendConnectors()))
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.Equal(t, 2, len(quesma.GetPipelines()[1].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.Equal(t, quesma.GetPipelines()[1].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[0])
assert.NoError(t, err)
}
{
// One pipeline with the same endpoint
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma(quesma_api.EmptyDependencies())
firstFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
firstHTTPRouter := quesma_api.NewPathRouter()
firstHTTPRouter.AddRoute("/_bulk", bulk)
firstFrontendConnector.AddRouter(firstHTTPRouter)
var firstPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
firstPipeline.AddFrontendConnector(firstFrontendConnector)

secondFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
secondHTTPRouter := quesma_api.NewPathRouter()
secondHTTPRouter.AddRoute("/_search", search)
secondFrontendConnector.AddRouter(secondHTTPRouter)
firstPipeline.AddFrontendConnector(secondFrontendConnector)

quesmaBuilder.AddPipeline(firstPipeline)
quesma, err := quesmaBuilder.Build()
assert.NotNil(t, quesma)
assert.Equal(t, 1, len(quesma.GetPipelines()))
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()))
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[0].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.Equal(t, 2, len(quesma.GetPipelines()[0].GetFrontendConnectors()[1].(quesma_api.HTTPFrontendConnector).GetRouter().GetHandlers()))
assert.Equal(t, quesma.GetPipelines()[0].GetFrontendConnectors()[0], quesma.GetPipelines()[0].GetFrontendConnectors()[1])

assert.NoError(t, err)
}
}
2 changes: 1 addition & 1 deletion quesma/test_utils.go → quesma/quesma/test_utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package main
package quesma

import (
"bytes"
Expand Down
Loading
Loading