Skip to content

Commit

Permalink
Adding required event type to db and adjusting methods
Browse files Browse the repository at this point in the history
  • Loading branch information
L4B0MB4 committed Sep 18, 2024
1 parent 4297463 commit be5f5bf
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 48 deletions.
18 changes: 18 additions & 0 deletions pkg/client/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ func NewEventSourcingHttpClient(urlStr string) (*EventSourcingHttpClient, error)
}

func (client *EventSourcingHttpClient) AddEvents(aggregateId string, events []models.Event) error {
if len(aggregateId) <= 0 {
return fmt.Errorf("AGGREGATEID EMPTY")
}
for _, event := range events {
if len(event.AggregateType) <= 0 {
return fmt.Errorf("AGGREGATETYPE EMPTY")
}
if len(event.Data) == 0 {
return fmt.Errorf("DATA EMPTY")
}
if len(event.Name) == 0 {
return fmt.Errorf("NAME EMPTY")
}
}
return client.AddEventsWithoutValidation(aggregateId, events)
}

func (client *EventSourcingHttpClient) AddEventsWithoutValidation(aggregateId string, events []models.Event) error {
bodyBytes, err := json.Marshal(events)
if err != nil {
log.Info().Err(err).Msg("Could not marshal events")
Expand Down
4 changes: 2 additions & 2 deletions pkg/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func TestClientAddingEventsAndRetrievingThemFromServer(t *testing.T) {
client, httpHandler, db := setup()
defer teardown(httpHandler, db)

err := client.AddEvents("myaggregate4444", []models.Event{{Version: 1, Name: "asdasd", Data: []byte{0, 1, 2}}, {Version: 2, Name: "asdasd2", Data: []byte{1, 2, 3}}})
err := client.AddEventsWithoutValidation("myaggregate4444", []models.Event{{Version: 1, Name: "asdasd", Data: []byte{0, 1, 2}, AggregateType: "mytype"}, {Version: 2, Name: "asdasd2", Data: []byte{1, 2, 3}, AggregateType: "mytype"}})
if err != nil {
log.Error().Err(err).Msg("ERROR ADDING EVENTS")
t.Fail()
}
err = client.AddEvents("differentaggregate", []models.Event{{Version: 7, Name: "asdasd", Data: []byte{0, 1, 2}}, {Version: 8, Name: "asdasd2", Data: []byte{1, 2, 3}}})
err = client.AddEventsWithoutValidation("differentaggregate", []models.Event{{Version: 7, Name: "asdasd", Data: []byte{0, 1, 2}, AggregateType: "mytype"}, {Version: 8, Name: "asdasd2", Data: []byte{1, 2, 3}, AggregateType: "mytype"}})
if err != nil {
log.Error().Err(err).Msg("ERROR ADDING EVENTS FOR SECOND AGGREGATE")
t.Fail()
Expand Down
9 changes: 5 additions & 4 deletions pkg/models/event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package models

type Event struct {
Version int64 `json:"version" binding:"required"`
Name string `json:"name" binding:"required"`
Data []byte `json:"data" binding:"required"`
AggregateId string `json:"-"`
Version int64 `json:"version" binding:"required"`
Name string `json:"name" binding:"required"`
Data []byte `json:"data" binding:"required"`
AggregateId string `json:"-"`
AggregateType string `json:"aggregateType" binding:"required"`
}
14 changes: 7 additions & 7 deletions pkg/store/event_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,28 +81,28 @@ func (e *EventRepository) addEvent(tx *sql.Tx, event *eventEntity) error {
}

stmtAgg, err := e.store.Prepare(`
INSERT INTO aggregate_state(id,version_0, version_1)
VALUES (?,?,?)
INSERT INTO aggregate_state(id,type, version_0, version_1)
VALUES (?,?,?,?)
`)
if err != nil {
log.Info().Err(err).Msg("Preparing insert statement for events table")
return err
}
defer stmt.Close()

_, err = tx.Stmt(stmtAgg).Exec(event.AggregateId, v0, v1)
_, err = tx.Stmt(stmtAgg).Exec(event.AggregateId, event.AggregateType, v0, v1)
if err != nil {
return err
}

return nil
}

func (e *EventRepository) GetEventsForAggregate(aggregateType string) ([]models.Event, error) {
func (e *EventRepository) GetEventsForAggregate(aggregateId string) ([]models.Event, error) {

// Prepare the SQL query
query := `
SELECT events.Name, events.version_0, events.version_1, events.data,events.aggregateId
SELECT events.Name, events.version_0, events.version_1, events.data,events.aggregateId,aggregate_state.type
FROM events
JOIN aggregate_state
ON events.aggregateId = aggregate_state.id
Expand All @@ -120,7 +120,7 @@ func (e *EventRepository) GetEventsForAggregate(aggregateType string) ([]models.
defer stmt.Close()

// Execute the query
rows, err := stmt.Query(aggregateType)
rows, err := stmt.Query(aggregateId)
if err != nil {
log.Info().Err(err).Msg("Error running query statement")
return nil, errors.New("COULD NOT QUERY EVENTS")
Expand All @@ -136,7 +136,7 @@ func (e *EventRepository) GetEventsForAggregate(aggregateType string) ([]models.

var v0 int32
var v1 int32
err = rows.Scan(&event.Name, &v0, &v1, &event.Data, &event.AggregateId)
err = rows.Scan(&event.Name, &v0, &v1, &event.Data, &event.AggregateId, &event.AggregateType)
if err != nil {
log.Info().Err(err).Msg("Error scanning rows")
return nil, errors.New("COULD NOT RETRIEVE EVENT")
Expand Down
67 changes: 37 additions & 30 deletions pkg/store/event_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ func TestAddEventSuccessful(t *testing.T) {
}
r := store.NewEventRepository(conn)
ev := models.Event{
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
err = r.AddEvents([]models.Event{ev})
if err != nil {
Expand Down Expand Up @@ -77,10 +78,11 @@ func TestAddEventDuplicate(t *testing.T) {
}
r := store.NewEventRepository(conn)
ev := models.Event{
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
err = r.AddEvents([]models.Event{ev})
if err != nil {
Expand All @@ -104,10 +106,11 @@ func TestAddTwoFollowingEvents(t *testing.T) {
}
r := store.NewEventRepository(conn)
ev := models.Event{
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
err = r.AddEvents([]models.Event{ev})
if err != nil {
Expand All @@ -130,7 +133,7 @@ func TestAddThreeEventsOfTwoAggregates(t *testing.T) {
t.Fail()
}
r := store.NewEventRepository(conn)
oldAggType := "anyaggregatetype"
oldAggType := "anyaggregateId"
ev := models.Event{
Version: 1,
Name: "testevent",
Expand All @@ -141,7 +144,7 @@ func TestAddThreeEventsOfTwoAggregates(t *testing.T) {
ev.Version++
r.AddEvents([]models.Event{ev})
ev.Version = 0
newAggType := "aggregatetype2"
newAggType := "aggregateId2"
ev.AggregateId = newAggType
r.AddEvents([]models.Event{ev})

Expand All @@ -167,16 +170,18 @@ func TestAddTwoFollowingEventsInOneArray(t *testing.T) {
}
r := store.NewEventRepository(conn)
ev := models.Event{
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
ev1 := models.Event{
Version: 2,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 2,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
err = r.AddEvents([]models.Event{ev, ev1})
if err != nil {
Expand Down Expand Up @@ -212,16 +217,18 @@ func TestAddTwoEventsWithSameVersionInOneArray(t *testing.T) {
}
r := store.NewEventRepository(conn)
ev := models.Event{
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "testevent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
ev1 := models.Event{
Version: 1,
Name: "othervent",
Data: []byte{0, 1},
AggregateId: "anyaggregatetype",
Version: 1,
Name: "othervent",
Data: []byte{0, 1},
AggregateId: "anyaggregateId",
AggregateType: "aggregateType",
}
err = r.AddEvents([]models.Event{ev, ev1})
if err == nil {
Expand Down
27 changes: 22 additions & 5 deletions pkg/store/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func (d *DatabaseConnection) SetUp() {
if createAggregateStateTable(db) != nil {
return
}
if createAggregateTableIndex(db) != nil {
if createAggregateTableIdIndex(db) != nil {
return
}
if createAggregateSnapshotTable(db) != nil {
if createAggregateTableTypeIndex(db) != nil {
return
}
d.db = db
Expand Down Expand Up @@ -93,7 +93,7 @@ func createEventTableIndex(db *sql.DB) error {

func createAggregateStateTable(db *sql.DB) error {
//type = name of the aggregate
stmt, err := db.Prepare("CREATE TABLE IF NOT EXISTS aggregate_state (id TEXT,version_0 INTEGER,version_1 INTEGER,UNIQUE(id,version_0, version_1) ON CONFLICT FAIL )")
stmt, err := db.Prepare("CREATE TABLE IF NOT EXISTS aggregate_state (id TEXT,type TEXT,version_0 INTEGER,version_1 INTEGER,UNIQUE(id,version_0, version_1) ON CONFLICT FAIL )")
if err != nil {

log.Info().Err(err).Msg("Preparing statement for aggregate_state table")
Expand All @@ -108,7 +108,7 @@ func createAggregateStateTable(db *sql.DB) error {
return nil
}

func createAggregateTableIndex(db *sql.DB) error {
func createAggregateTableIdIndex(db *sql.DB) error {

stmt, err := db.Prepare("CREATE INDEX IF NOT EXISTS IX_aggregate_state__id ON aggregate_state(id);")
if err != nil {
Expand All @@ -124,7 +124,24 @@ func createAggregateTableIndex(db *sql.DB) error {
}
return nil
}
func createAggregateTableTypeIndex(db *sql.DB) error {

stmt, err := db.Prepare("CREATE INDEX IF NOT EXISTS IX_aggregate_state__typr ON aggregate_state(type);")
if err != nil {

log.Info().Err(err).Msg("Preparing statement for aggregate_state table")
return err
}
_, err = stmt.Exec()
if err != nil {

log.Info().Err(err).Msg("Creating index on aggregate_state table")
return err
}
return nil
}

/*
func createAggregateSnapshotTable(db *sql.DB) error {
stmt, err := db.Prepare("CREATE TABLE IF NOT EXISTS aggregate_snapshots (id TEXT PRIMARY KEY, name TEXT,version_0 INTEGER,version_1 INTEGER,UNIQUE(version_0, version_1) ON CONFLICT FAIL )")
if err != nil {
Expand All @@ -139,7 +156,7 @@ func createAggregateSnapshotTable(db *sql.DB) error {
return err
}
return nil
}
}*/

func (d *DatabaseConnection) GetDbConnection() (*sql.DB, error) {
if !d.initialized {
Expand Down

0 comments on commit be5f5bf

Please sign in to comment.