Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add is_pushed columns to changes and analyses #407

Merged
merged 4 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package api

import (
"errors"
"fmt"
)

// Application error codes.
//
// These are meant to be generic and they map well to HTTP error codes.
const (
ECONFLICT = "conflict"
EFORBIDDEN = "forbidden"
EINTERNAL = "internal"
EINVALID = "invalid"
ENOTFOUND = "not_found"
ENOTIMPLEMENTED = "not_implemented"
EUNAUTHORIZED = "unauthorized"
)

// Error represents an application-specific error.
type Error struct {
// Machine-readable error code.
Code string

// Human-readable error message.
Message string

// DebugInfo contains low-level internal error details that should only be logged.
// End-users should never see this.
DebugInfo string
}

// Error implements the error interface. Not used by the application otherwise.
func (e *Error) Error() string {
return fmt.Sprintf("error: code=%s message=%s", e.Code, e.Message)
}

// WithDebugInfo wraps an application error with a debug message.
func (e *Error) WithDebugInfo(msg string, args ...any) *Error {
e.DebugInfo = fmt.Sprintf(msg, args...)
return e
}

// ErrorCode unwraps an application error and returns its code.
// Non-application errors always return EINTERNAL.
func ErrorCode(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.Code
}
return EINTERNAL
}

// ErrorMessage unwraps an application error and returns its message.
// Non-application errors always return "Internal error".
func ErrorMessage(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.Message
}
return "Internal error."
}

// ErrorDebugInfo unwraps an application error and returns its debug message.
func ErrorDebugInfo(err error) string {
var e *Error
if err == nil {
return ""
} else if errors.As(err, &e) {
return e.DebugInfo
}

return err.Error()
}

// Errorf is a helper function to return an Error with a given code and formatted message.
func Errorf(code string, format string, args ...any) *Error {
return &Error{
Code: code,
Message: fmt.Sprintf(format, args...),
}
}
48 changes: 48 additions & 0 deletions api/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package api

import (
"net/http"

"github.com/flanksource/commons/logger"
"github.com/labstack/echo/v4"
)

type HTTPError struct {
Error string `json:"error"`
Message string `json:"message,omitempty"`
}

type HTTPSuccess struct {
Message string `json:"message"`
Payload any `json:"payload,omitempty"`
}

func WriteError(c echo.Context, err error) error {
code, message := ErrorCode(err), ErrorMessage(err)

if debugInfo := ErrorDebugInfo(err); debugInfo != "" {
logger.WithValues("code", code, "error", message).Errorf(debugInfo)
}

return c.JSON(ErrorStatusCode(code), &HTTPError{Error: message})
}

// ErrorStatusCode returns the associated HTTP status code for an application error code.
func ErrorStatusCode(code string) int {
// lookup of application error codes to HTTP status codes.
var codes = map[string]int{
ECONFLICT: http.StatusConflict,
EINVALID: http.StatusBadRequest,
ENOTFOUND: http.StatusNotFound,
EFORBIDDEN: http.StatusForbidden,
ENOTIMPLEMENTED: http.StatusNotImplemented,
EUNAUTHORIZED: http.StatusUnauthorized,
EINTERNAL: http.StatusInternalServerError,
}

if v, ok := codes[code]; ok {
return v
}

return http.StatusInternalServerError
}
3 changes: 2 additions & 1 deletion job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

const (
ResourceTypeComponent = "components"
ResourceTypeCheckStatuses = "check_statuses"
ResourceTypeComponent = "components"
ResourceTypeUpstream = "upstream"
)

var RetentionHour = Retention{
Expand Down
4 changes: 4 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ type ConfigChange struct {
Diff string `gorm:"column:diff;default:null" json:"diff,omitempty"`
Details types.JSON `gorm:"column:details" json:"details,omitempty"`
CreatedAt *time.Time `gorm:"column:created_at" json:"created_at"`
// IsPushed when set to true indicates that the check status has been pushed to upstream.
IsPushed bool `json:"is_pushed,omitempty"`
}

func (c ConfigChange) TableName() string {
Expand Down Expand Up @@ -230,6 +232,8 @@ type ConfigAnalysis struct {
Source string `gorm:"column:source" json:"source,omitempty"`
FirstObserved *time.Time `gorm:"column:first_observed;<-:false" json:"first_observed"`
LastObserved *time.Time `gorm:"column:last_observed" json:"last_observed"`
// IsPushed when set to true indicates that the check status has been pushed to upstream.
IsPushed bool `json:"is_pushed,omitempty"`
}

func (a ConfigAnalysis) TableName() string {
Expand Down
53 changes: 53 additions & 0 deletions query/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package query

import (
"errors"
"fmt"
"strings"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"gorm.io/gorm"
)

func FindAgent(ctx context.Context, name string) (*models.Agent, error) {
var agent models.Agent
err := ctx.DB().Where("name = ?", name).First(&agent).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}

return nil, err
}

return &agent, nil
}

func GetAllResourceIDsOfAgent(ctx context.Context, table, from string, size int, agentID uuid.UUID) ([]string, error) {
var response []string
var err error

switch table {
case "check_statuses":
query := `
SELECT (check_id::TEXT || ',' || time::TEXT)
FROM check_statuses
LEFT JOIN checks ON checks.id = check_statuses.check_id
WHERE checks.agent_id = ? AND (check_statuses.check_id::TEXT, check_statuses.time::TEXT) > (?, ?)
ORDER BY check_statuses.check_id, check_statuses.time
LIMIT ?`
parts := strings.Split(from, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("%s is not a valid next cursor. It must consist of check_id and time separated by a comma", from)
}

err = ctx.DB().Raw(query, agentID, parts[0], parts[1], size).Scan(&response).Error
default:
query := fmt.Sprintf("SELECT id FROM %s WHERE agent_id = ? AND id::TEXT > ? ORDER BY id LIMIT ?", table)
err = ctx.DB().Raw(query, agentID, from, size).Scan(&response).Error
Dismissed Show dismissed Hide dismissed
}

return response, err
}
12 changes: 12 additions & 0 deletions schema/config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ table "config_analysis" {
null = true
type = timestamptz
}
column "is_pushed" {
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
null = false
default = false
type = bool
comment = "is_pushed when set to true indicates that the check status has been pushed to upstream."
}
primary_key {
columns = [column.id]
}
Expand Down Expand Up @@ -131,6 +137,12 @@ table "config_changes" {
type = timestamptz
default = sql("now()")
}
column "is_pushed" {
null = false
default = false
type = bool
comment = "is_pushed when set to true indicates that the check status has been pushed to upstream."
}
primary_key {
columns = [column.id]
}
Expand Down
137 changes: 137 additions & 0 deletions tests/upstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package tests

import (
"fmt"
"time"

"github.com/labstack/echo/v4"
ginkgo "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/patrickmn/go-cache"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/tests/setup"
"github.com/flanksource/duty/upstream"
)

var _ = ginkgo.Describe("Config Changes & Analyses sync test", ginkgo.Ordered, func() {
var upstreamCtx *context.Context
var echoCloser, drop func()
var upstreamConf upstream.UpstreamConfig
const agentName = "my-agent"

ginkgo.It("prepare upstream database", func() {
var err error
upstreamCtx, drop, err = setup.NewDB(DefaultContext, "upstream")
Expect(err).ToNot(HaveOccurred())

var changes int
err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(Equal(0))

var analyses int
err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(Equal(0))

agent := models.Agent{Name: agentName}
err = upstreamCtx.DB().Create(&agent).Error
Expect(err).ToNot(HaveOccurred())
})

ginkgo.It("should setup upstream echo server", func() {
var port int
e := echo.New()
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.SetRequest(c.Request().WithContext(upstreamCtx.Wrap(c.Request().Context())))
return next(c)
}
})

e.POST("/upstream/push", upstream.PushHandler(cache.New(time.Hour, time.Hour)))
e.GET("/upstream/pull/:agent_name", upstream.PullHandler([]string{"config_scrapers", "config_items"}))
e.GET("/upstream/status/:agent_name", upstream.StatusHandler([]string{"config_scrapers", "config_items"}))

port, echoCloser = setup.RunEcho(e)

upstreamConf = upstream.UpstreamConfig{
Host: fmt.Sprintf("http://localhost:%d", port),
AgentName: agentName,
}
})

ginkgo.It("should push config items first to satisfy foregin keys for changes & analyses", func() {
reconciler := upstream.NewUpstreamReconciler(upstreamConf, 100)

count, err := reconciler.Sync(DefaultContext, "config_items")
Expect(err).To(BeNil())
Expect(count).To(Not(BeZero()))
})

ginkgo.It("should sync config_changes to upstream", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.ConfigChange{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

var changes int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(BeZero())

count, err := upstream.SyncConfigChanges(DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigChange{}).Scan(&changes).Error
Expect(err).ToNot(HaveOccurred())
Expect(changes).To(Equal(count))

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigChange{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should sync config_analyses to upstream", func() {
{
var pushed int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = true").Model(&models.ConfigAnalysis{}).Scan(&pushed).Error
Expect(err).ToNot(HaveOccurred())
Expect(pushed).To(BeZero())
}

var analyses int
err := upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(BeZero())

count, err := upstream.SyncConfigAnalyses(DefaultContext, upstreamConf, 10)
Expect(err).ToNot(HaveOccurred())

err = upstreamCtx.DB().Select("COUNT(*)").Model(&models.ConfigAnalysis{}).Scan(&analyses).Error
Expect(err).ToNot(HaveOccurred())
Expect(analyses).To(Equal(count))

{
var pending int
err := DefaultContext.DB().Select("COUNT(*)").Where("is_pushed = false").Model(&models.ConfigAnalysis{}).Scan(&pending).Error
Expect(err).ToNot(HaveOccurred())
Expect(pending).To(BeZero())
}
})

ginkgo.It("should stop echo server ", func() {
echoCloser()
})

ginkgo.It("should drop upstream database ", func() {
drop()
})
})
Loading
Loading