Skip to content

Commit

Permalink
Update smoke test to check logs after triggering async queries (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski authored May 9, 2024
1 parent bd4da06 commit ee7fc1d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ jobs:

- name: Verify if data is flowing
working-directory: smoke-test
env:
GITHUB_ACTIONS: true
run: go run main.go

- name: Print docker status
Expand Down
1 change: 1 addition & 0 deletions docker/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
- QUESMA_port=8080
- QUESMA_logging_path=/var/quesma/logs
- QUESMA_clickhouse_url=clickhouse://clickhouse:9000
- QUESMA_logging_fileLogging=true
depends_on:
clickhouse:
condition: service_healthy
Expand Down
1 change: 1 addition & 0 deletions docker/local-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
- QUESMA_logging_path=/var/quesma/logs
- QUESMA_mode=dual-write-query-clickhouse
- QUESMA_CONFIG_FILE=/config/local-dev.yaml
- QUESMA_logging_fileLogging=true
depends_on:
clean-clickhouse:
condition: service_completed_successfully
Expand Down
132 changes: 104 additions & 28 deletions smoke-test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log"
"net/http"
"os"
"slices"
"strings"
"time"
Expand All @@ -36,7 +37,13 @@ const (
printInterval = 5 * time.Second
)

const query = `
// Locations of Quesma's log file, scanned by checkLogs after the async
// queries have run. The path depends on whether the smoke test runs
// locally or on a GitHub Actions runner.
const (
// localLogPath is the log location for local runs, relative to the
// smoke-test working directory.
localLogPath = "../docker/quesma/logs/quesma.log"
// ciLogPath is the absolute log location inside the GitHub Actions
// runner's checkout (repo checked out under /home/runner/work).
ciLogPath = "/home/runner/work/quesma/quesma/docker/quesma/logs/quesma.log"
// ciEnvVar is the environment variable whose presence signals that the
// test is running in CI (set in .github/workflows/pipeline.yml).
ciEnvVar = "GITHUB_ACTIONS"
)

var queries = []string{`
{
"_source": false,
"fields": [
Expand Down Expand Up @@ -75,8 +82,8 @@ const query = `
"range": {
"@timestamp": {
"format": "strict_date_optional_time",
"gte": "2024-01-23T14:43:19.481Z",
"lte": "2024-01-23T14:58:19.481Z"
"gte": "now-1d",
"lte": "now-1s"
}
}
}
Expand Down Expand Up @@ -110,7 +117,53 @@ const query = `
"track_total_hits": false,
"version": true
}
`
`,
`{
"_source": {
"excludes": []
},
"aggs": {
"0": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "30s",
"min_doc_count": 1,
"time_zone": "Europe/Warsaw"
}
}
},
"fields": [
{
"field": "@timestamp",
"format": "date_time"
}
],
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"format": "strict_date_optional_time",
"gte": "now-1d",
"lte": "now-1s"
}
}
}
],
"must": [],
"must_not": [],
"should": []
}
},
"runtime_mappings": {},
"script_fields": {},
"size": 0,
"stored_fields": [
"*"
],
"track_total_hits": true
}`}

const kibanaInternalLog = `
{
Expand Down Expand Up @@ -182,7 +235,7 @@ func main() {
reportUri := waitForScheduleReportGeneration()
waitForLogsInClickhouse("logs-generic-default", time.Minute)
println(" Logs in Clickhouse: OK")
waitForAsyncQuery(time.Minute)
waitForAsyncQuery(time.Minute, queries)
println(" AsyncQuery: OK")
waitForKibanaLogExplorer("kibana LogExplorer", time.Minute)
println(" Kibana LogExplorer: OK")
Expand Down Expand Up @@ -424,37 +477,60 @@ func waitForLogsInElasticsearchRaw(serviceName, url string, quesmaSource bool, t
}
}

func waitForAsyncQuery(timeout time.Duration) {
serviceName := "async query"
res := waitFor(serviceName, func() bool {
resp, err := http.Post(asyncQueryUrl, "application/json", bytes.NewBuffer([]byte(query)))
func checkLogs() {
value := os.Getenv(ciEnvVar)
logPath := localLogPath
if value != "" {
logPath = ciLogPath
}
content, err := os.ReadFile(logPath)
if err != nil {
panic("Error reading file:" + err.Error())
return
}

if err == nil {
defer resp.Body.Close()
if resp.StatusCode == 200 {
body, err := io.ReadAll(resp.Body)
if err == nil {
var response map[string]interface{}
_ = json.Unmarshal(body, &response)
fileContent := string(content)
searchString := "Panic recovered:"

if response["completion_time_in_millis"] != nil {
if sourceClickhouse(resp) {
return true
} else {
panic("invalid X-Quesma-Source header value")
if bytes.Contains([]byte(fileContent), []byte(searchString)) {
panic("Panic recovered in quesma.log")
}
}

func waitForAsyncQuery(timeout time.Duration, queries []string) {
serviceName := "async query"
for _, query := range queries {
res := waitFor(serviceName, func() bool {
resp, err := http.Post(asyncQueryUrl, "application/json", bytes.NewBuffer([]byte(query)))

if err == nil {
defer resp.Body.Close()
if resp.StatusCode == 200 {
body, err := io.ReadAll(resp.Body)
if err == nil {
var response map[string]interface{}
_ = json.Unmarshal(body, &response)

if response["completion_time_in_millis"] != nil {
if sourceClickhouse(resp) {
return true
} else {
panic("invalid X-Quesma-Source header value")
}
}
} else {
log.Println(err)
}
} else {
log.Println(err)
}
}
}
return false
}, timeout)
return false
}, timeout)

if !res {
panic(serviceName + " is not alive or is not receiving logs")
if !res {
panic(serviceName + " is not alive or is not receiving logs")
}
}
checkLogs()
}

func headerExists(headers http.Header, key string, value string) bool {
Expand Down

0 comments on commit ee7fc1d

Please sign in to comment.