Skip to content

Commit

Permalink
feat: add tests for changes & analyses syncer
Browse files Browse the repository at this point in the history
* migrated errors/http statuses from mission-control
* migrated upstream handlers from mission-control
  • Loading branch information
adityathebe committed Jan 4, 2024
1 parent 69f5472 commit 72675d2
Show file tree
Hide file tree
Showing 6 changed files with 577 additions and 0 deletions.
87 changes: 87 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package api

import (
"errors"
"fmt"
)

// Application error codes.
//
// These are meant to be generic and they map well to HTTP error codes.
const (
ECONFLICT = "conflict"
EFORBIDDEN = "forbidden"
EINTERNAL = "internal"
EINVALID = "invalid"
ENOTFOUND = "not_found"
ENOTIMPLEMENTED = "not_implemented"
EUNAUTHORIZED = "unauthorized"
)

// Error represents an application-specific error.
type Error struct {
// Machine-readable error code.
Code string

// Human-readable error message.
Message string

// DebugInfo contains low-level internal error details that should only be logged.
// End-users should never see this.
DebugInfo string
}

// Error implements the error interface. Not used by the application otherwise.
func (e *Error) Error() string {
return fmt.Sprintf("error: code=%s message=%s", e.Code, e.Message)
}

// WithDebugInfo wraps an application error with a debug message.
func (e *Error) WithDebugInfo(msg string, args ...any) *Error {
e.DebugInfo = fmt.Sprintf(msg, args...)
return e
}

// ErrorCode unwraps an application error and returns its code.
// Non-application errors always return EINTERNAL.
func ErrorCode(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.Code
}
return EINTERNAL
}

// ErrorMessage unwraps an application error and returns its message.
// Non-application errors always return "Internal error".
func ErrorMessage(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.Message
}
return "Internal error."
}

// ErrorDebugInfo unwraps an application error and returns its debug message.
func ErrorDebugInfo(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.DebugInfo
}

return err.Error()
}

// Errorf is a helper function to return an Error with a given code and formatted message.
func Errorf(code string, format string, args ...any) *Error {
return &Error{
Code: code,
Message: fmt.Sprintf(format, args...),
}
}
48 changes: 48 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package api

import (
"net/http"

"github.com/flanksource/commons/logger"
"github.com/labstack/echo/v4"
)

type HTTPError struct {
Error string `json:"error"`
Message string `json:"message,omitempty"`
}

type HTTPSuccess struct {
Message string `json:"message"`
Payload any `json:"payload,omitempty"`
}

func WriteError(c echo.Context, err error) error {
code, message := ErrorCode(err), ErrorMessage(err)

if debugInfo := ErrorDebugInfo(err); debugInfo != "" {
logger.WithValues("code", code, "error", message).Errorf(debugInfo)
}

return c.JSON(ErrorStatusCode(code), &HTTPError{Error: message})
}

// ErrorStatusCode returns the associated HTTP status code for an application error code.
func ErrorStatusCode(code string) int {
// lookup of application error codes to HTTP status codes.
var codes = map[string]int{
ECONFLICT: http.StatusConflict,
EINVALID: http.StatusBadRequest,
ENOTFOUND: http.StatusNotFound,
EFORBIDDEN: http.StatusForbidden,
ENOTIMPLEMENTED: http.StatusNotImplemented,
EUNAUTHORIZED: http.StatusUnauthorized,
EINTERNAL: http.StatusInternalServerError,
}

if v, ok := codes[code]; ok {
return v
}

return http.StatusInternalServerError
}
53 changes: 53 additions & 0 deletions query/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package query

import (
"errors"
"fmt"
"strings"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"gorm.io/gorm"
)

func FindAgent(ctx context.Context, name string) (*models.Agent, error) {
var agent models.Agent
err := ctx.DB().Where("name = ?", name).First(&agent).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

return nil, err
}

return &agent, nil
}

func GetAllResourceIDsOfAgent(ctx context.Context, table, from string, size int, agentID uuid.UUID) ([]string, error) {
var response []string
var err error

switch table {
case "check_statuses":
query := `
SELECT (check_id::TEXT || ',' || time::TEXT)
FROM check_statuses
LEFT JOIN checks ON checks.id = check_statuses.check_id
WHERE checks.agent_id = ? AND (check_statuses.check_id::TEXT, check_statuses.time::TEXT) > (?, ?)
ORDER BY check_statuses.check_id, check_statuses.time
LIMIT ?`
parts := strings.Split(from, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("%s is not a valid next cursor. It must consist of check_id and time separated by a comma", from)
}

err = ctx.DB().Raw(query, agentID, parts[0], parts[1], size).Scan(&response).Error
default:
query := fmt.Sprintf("SELECT id FROM %s WHERE agent_id = ? AND id::TEXT > ? ORDER BY id LIMIT ?", table)
err = ctx.DB().Raw(query, agentID, from, size).Scan(&response).Error

Check failure

Code scanning / CodeQL

Database query built from user-controlled sources High

This query depends on a
user-provided value
.
}

return response, err
}
137 changes: 137 additions & 0 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package tests

import (
"fmt"
"time"

"github.com/labstack/echo/v4"
ginkgo "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/patrickmn/go-cache"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/tests/setup"
"github.com/flanksource/duty/upstream"
)

var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, func() {
var upstreamCtx *context.Context
var echoCloser, drop func()
var upstreamConf upstream.UpstreamConfig
const agentName = "my-agent"

ginkgo.It("prepare upstream database", func() {
var err error
upstreamCtx, drop, err = setup.NewDB(DefaultContext, "upstream")
Expect(err).ToNot(HaveOccurred())

var changes int
err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(Equal(0))

var analyses int
err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(Equal(0))

agent := models.Agent{Name: agentName}
err = upstreamCtx.DB().Create(&agent).Error
Expect(err).ToNot(HaveOccurred())
})

ginkgo.It("should setup upstream echo server", func() {
var port int
e := echo.New()
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.SetRequest(c.Request().WithContext(upstreamCtx.Wrap(c.Request().Context())))
return next(c)
}
})

e.POST("/upstream/push", upstream.PushHandler(cache.New(time.Hour, time.Hour)))
e.GET("/upstream/pull/:agent_name", upstream.PullHandler([]string{"config_scrapers", "config_items"}))
e.GET("/upstream/status/:agent_name", upstream.StatusHandler([]string{"config_scrapers", "config_items"}))

port, echoCloser = setup.RunEcho(e)

upstreamConf = upstream.UpstreamConfig{
Host: fmt.Sprintf("http://localhost:%d", port),
AgentName: agentName,
}
})

ginkgo.It("should push config items first to satisfy foregin keys for changes & analyses", func() {
reconciler := upstream.NewUpstreamReconciler(upstreamConf, 100)

count, err := reconciler.Sync(DefaultContext, "config_items")
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
})

ginkgo.It("should sync config_changes to upstream", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.ConfigChange{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

var changes int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(BeZero())

count, err := upstream.SyncConfigChanges(DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(Equal(count))

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigChange{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should sync config_analyses to upstream", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.ConfigAnalysis{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

var analyses int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(BeZero())

count, err := upstream.SyncConfigAnalyses(DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(Equal(count))

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigAnalysis{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should stop echo server ", func() {
echoCloser()
})

ginkgo.It("should drop upstream database ", func() {
drop()
})
})
Loading

0 comments on commit 72675d2

Please sign in to comment.