Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline refactor #8

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 5 additions & 40 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,53 +30,18 @@ var (
//go:embed config.schema.json
var jsonSchemaFile embed.FS

type Writer struct {
Type string `json:"type"`

Raw []byte `json:"-"`
}

func (w *Writer) UnmarshalJSON(data []byte) error {
var writerConfig struct {
Type string `json:"type"`
}

if err := json.Unmarshal(data, &writerConfig); err != nil {
return err
}

w.Type = writerConfig.Type
w.Raw = data

return nil
}

type WriterConfigValidator interface {
Validate() error
}

func WriterConfig[T WriterConfigValidator](config Writer) (T, error) {
var cfg T
if err := json.Unmarshal(config.Raw, &cfg); err != nil {
return cfg, err
}

if err := cfg.Validate(); err != nil {
return cfg, fmt.Errorf("%w: %s", ErrConfigNotValid, err)
}

return cfg, nil
}

type Authentication struct {
Secret SecretSource `json:"secret"`
}

type Processors []GenericConfig
type Sinks []GenericConfig

type Integration struct {
Type string `json:"type"`
Authentication Authentication `json:"authentication"`
Writers []Writer `json:"writers"`
EventIDPath string `json:"eventIdPath"`
Processors Processors `json:"processors"`
Sinks Sinks `json:"sinks"`
}

type Configuration struct {
Expand Down
41 changes: 26 additions & 15 deletions internal/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,28 @@
}
}
},
"eventIdPath": {
"type": "string"
"processors": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": [
"mapper"
]
},
"outputEvent": {
"type": "object"
}
},
"required": [
"type",
"outputEvent"
]
}
},
"writers": {
"sinks": {
"type": "array",
"items": {
"oneOf": [
Expand All @@ -40,20 +58,12 @@
},
"collection": {
"type": "string"
},
"outputEvent": {
"type": "object"
},
"idField": {
"type": "string"
}
},
"required": [
"type",
"url",
"outputEvent",
"collection",
"idField"
"collection"
]
}
]
Expand All @@ -62,16 +72,17 @@
},
"required": [
"type",
"writers",
"eventIdPath"
]
"sinks"
],
"additionalProperties": false
},
"minItems": 1
}
},
"required": [
"integrations"
],
"additionalProperties": false,
"definitions": {
"secret": {
"type": "object",
Expand Down
57 changes: 38 additions & 19 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ func TestLoadServiceConfiguration(t *testing.T) {
t.Setenv("TEST_LOAD_SERVICE_MONGO_URL", "mongodb://localhost:27017")

tests := map[string]struct {
path string
expectedError string
expectedContent *Configuration
expectedWriterConfig string
path string

expectedError string
expectedContent *Configuration
expectedSinkConfig string
expectedProcessorConfig string
}{
"invalid configuration not match schema": {
path: "./testdata/invalid-config.json",
Expand All @@ -51,16 +53,21 @@ func TestLoadServiceConfiguration(t *testing.T) {
Authentication: Authentication{
Secret: SecretSource("MY_SECRET"),
},
EventIDPath: "issue.id",
Writers: []Writer{
Processors: Processors{
{
Type: "mapper",
},
},
Sinks: Sinks{
{
Type: "mongo",
},
},
},
},
},
expectedWriterConfig: getExpectedWriterConfig(t),
expectedSinkConfig: getExpectedSinkConfig(t),
expectedProcessorConfig: getExpectedProcessorConfig(t),
},
"invalid config if integrations is empty": {
path: "./testdata/empty-integrations.json",
Expand All @@ -76,10 +83,15 @@ func TestLoadServiceConfiguration(t *testing.T) {
require.Nil(t, config)
} else {
require.NoError(t, err)
rawConfig := config.Integrations[0].Writers[0].Raw
config.Integrations[0].Writers[0].Raw = nil
rawSinkConfig := config.Integrations[0].Sinks[0].Raw
config.Integrations[0].Sinks[0].Raw = nil

rawProcessorConfig := config.Integrations[0].Processors[0].Raw
config.Integrations[0].Processors[0].Raw = nil

require.Equal(t, test.expectedContent, config)
require.JSONEq(t, test.expectedWriterConfig, string(rawConfig))
require.JSONEq(t, test.expectedSinkConfig, string(rawSinkConfig))
require.JSONEq(t, test.expectedProcessorConfig, string(rawProcessorConfig))
}
})
}
Expand All @@ -89,7 +101,7 @@ func TestWriterConfig(t *testing.T) {
t.Setenv("TEST_LOAD_SERVICE_MONGO_URL", "mongodb://localhost:27017")

t.Run("config is parsed correctly", func(t *testing.T) {
config := Writer{
config := GenericConfig{
Type: "mongo",
Raw: []byte(`{
"type": "mongo",
Expand All @@ -106,9 +118,9 @@ func TestWriterConfig(t *testing.T) {
}`),
}

cfg, err := WriterConfig[Config](config)
cfg, err := GetConfig[testConfig](config)
require.NoError(t, err)
require.Equal(t, Config{
require.Equal(t, testConfig{
URL: "mongodb://localhost:27017",
Collection: "my-collection",
OutputEvent: map[string]any{
Expand All @@ -121,31 +133,38 @@ func TestWriterConfig(t *testing.T) {
})
}

type Config struct {
type testConfig struct {
URL SecretSource `json:"url"`
Collection string `json:"collection"`
OutputEvent map[string]any
}

func (c Config) Validate() error {
func (c testConfig) Validate() error {
return nil
}

func getExpectedWriterConfig(t *testing.T) string {
func getExpectedSinkConfig(t *testing.T) string {
t.Helper()

return `{
"type": "mongo",
"url": {
"fromEnv": "TEST_LOAD_SERVICE_MONGO_URL"
},
"collection": "my-collection",
"collection": "my-collection"
}`
}

func getExpectedProcessorConfig(t *testing.T) string {
t.Helper()

return `{
"type": "mapper",
"outputEvent": {
"key": "{{ issue.key }}",
"summary": "{{ issue.fields.summary }}",
"createdAt": "{{ issue.fields.created }}",
"description": "{{ issue.fields.description }}"
},
"idField": "key"
}
}`
}
40 changes: 27 additions & 13 deletions internal/config/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"testing"

"github.com/mia-platform/integration-connector-agent/internal/config"
"github.com/mia-platform/integration-connector-agent/internal/writer/mongo"
"github.com/mia-platform/integration-connector-agent/internal/processors/mapper"
"github.com/mia-platform/integration-connector-agent/internal/sinks/mongo"

"github.com/stretchr/testify/require"
)
Expand All @@ -28,33 +29,46 @@ func TestWriterConfig(t *testing.T) {
cfg, err := config.LoadServiceConfiguration("testdata/all-writer-config.json")
require.NoError(t, err)

writers := cfg.Integrations[0].Writers
writers := cfg.Integrations[0].Sinks
require.NotNil(t, writers)

mappedWriters := map[string]config.Writer{}
processors := cfg.Integrations[0].Processors
require.NotNil(t, processors)

mappedSinks := map[string]config.GenericConfig{}
for _, writer := range writers {
mappedWriters[writer.Type] = writer
mappedSinks[writer.Type] = writer
}

mappedProcessors := map[string]config.GenericConfig{}
for _, p := range processors {
mappedProcessors[p.Type] = p
}

secretValue := "my-secret-env"
t.Setenv("TEST_SECRET_ENV", secretValue)

t.Run("mongo", func(t *testing.T) {
mappedWriters, ok := mappedWriters["mongo"]
sinkConfig, ok := mappedSinks["mongo"]
require.True(t, ok)

mongoConfig, err := config.WriterConfig[*mongo.Config](mappedWriters)
mongoConfig, err := config.GetConfig[*mongo.Config](sinkConfig)
require.NoError(t, err)
require.Equal(t, &mongo.Config{
URL: config.SecretSource(secretValue),
Collection: "my-collection",
OutputEvent: map[string]any{
"key": "{{ issue.key }}",
"summary": "{{ issue.fields.summary }}",
"createdAt": "{{ issue.fields.created }}",
"description": "{{ issue.fields.description }}",
},
IDField: "key",
}, mongoConfig)
})

t.Run("mapper", func(t *testing.T) {
processorCfg, ok := mappedProcessors["mapper"]
require.True(t, ok)

mapperConfig, err := config.GetConfig[mapper.Config](processorCfg)
require.NoError(t, err)
require.JSONEq(t,
`{"key": "{{ issue.key }}","summary": "{{ issue.fields.summary }}","createdAt": "{{ issue.fields.created }}","description": "{{ issue.fields.description }}"}`,
string(mapperConfig.OutputEvent),
)
})
}
19 changes: 11 additions & 8 deletions internal/config/testdata/all-writer-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
"fromFile": "testdata/secret"
}
},
"eventIdPath": "issue.id",
"writers": [
"processors": [
{
"type": "mongo",
"url": {
"fromEnv": "TEST_SECRET_ENV"
},
"collection": "my-collection",
"type": "mapper",
"outputEvent": {
"key": "{{ issue.key }}",
"summary": "{{ issue.fields.summary }}",
"createdAt": "{{ issue.fields.created }}",
"description": "{{ issue.fields.description }}"
}
}
],
"sinks": [
{
"type": "mongo",
"url": {
"fromEnv": "TEST_SECRET_ENV"
},
"idField": "key"
"collection": "my-collection"
}
]
}
Expand Down
19 changes: 11 additions & 8 deletions internal/config/testdata/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
"fromFile": "testdata/secret"
}
},
"eventIdPath": "issue.id",
"writers": [
"processors": [
{
"type": "mongo",
"url": {
"fromEnv": "TEST_LOAD_SERVICE_MONGO_URL"
},
"collection": "my-collection",
"type": "mapper",
"outputEvent": {
"key": "{{ issue.key }}",
"summary": "{{ issue.fields.summary }}",
"createdAt": "{{ issue.fields.created }}",
"description": "{{ issue.fields.description }}"
}
}
],
"sinks": [
{
"type": "mongo",
"url": {
"fromEnv": "TEST_LOAD_SERVICE_MONGO_URL"
},
"idField": "key"
"collection": "my-collection"
}
]
}
Expand Down
Loading