Skip to content

Commit

Permalink
fix example
Browse files Browse the repository at this point in the history
  • Loading branch information
ryota0624 committed Jan 11, 2025
1 parent 50a0528 commit ea2780f
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 83 deletions.
248 changes: 195 additions & 53 deletions examples/persistence-psql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/asynkron/protoactor-go/persistence/postgresql"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"google.golang.org/protobuf/reflect/protoregistry"
"log"
"time"

Expand All @@ -21,101 +22,153 @@ import (
"google.golang.org/protobuf/types/descriptorpb"
)

type protoMsg struct {
type Message struct {
state string
set bool
value string
}

func (p *protoMsg) Reset() {}
func (p *protoMsg) String() string { return p.state }
func (p *protoMsg) ProtoMessage() {}
func (m *Message) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, m.state)), nil
}

type (
Message struct{ protoMsg }
Snapshot struct{ protoMsg }
)
func (m *Message) UnmarshalJSON(bytes []byte) error {
m.state = string(bytes)
m.set = true
return nil
}

func (m *Message) Zero() protoreflect.Message {
return &Message{}
}

func (m *Message) Reset() {}
func (m *Message) String() string { return m.state }
func (m *Message) ProtoMessage() {}

func (m *protoMsg) ProtoReflect() protoreflect.Message { return (*message)(m) }
type Snapshot struct {
state string
set bool
value string
}

type message protoMsg
func (s *Snapshot) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, s.state)), nil
}

func (s *Snapshot) UnmarshalJSON(bytes []byte) error {
s.state = string(bytes)
s.set = true
return nil
}

func (s *Snapshot) Zero() protoreflect.Message {
return &Snapshot{}
}

func (s *Snapshot) Reset() {}
func (s *Snapshot) String() string { return s.state }
func (s *Snapshot) ProtoMessage() {}

func (m *Message) ProtoReflect() protoreflect.Message { return m }
func (s *Snapshot) ProtoReflect() protoreflect.Message { return s }

type messageType struct{}

func (messageType) New() protoreflect.Message { return &message{} }
func (messageType) Zero() protoreflect.Message { return (*message)(nil) }
func (messageType) Descriptor() protoreflect.MessageDescriptor { return fileDesc.Messages().Get(0) }
func (messageType) New() protoreflect.Message { return &Message{} }
func (messageType) Zero() protoreflect.Message { return (*Message)(nil) }
func (messageType) Descriptor() protoreflect.MessageDescriptor {
return fileDescForMessage.Messages().Get(0)
}

func (m *message) New() protoreflect.Message { return &message{} }
func (m *message) Descriptor() protoreflect.MessageDescriptor { return fileDesc.Messages().Get(0) }
func (m *message) Type() protoreflect.MessageType { return messageType{} }
func (m *message) Interface() protoreflect.ProtoMessage { return (*protoMsg)(m) }
func (m *message) ProtoMethods() *protoiface.Methods { return nil }
type snapshotType struct{}

var fieldDescS = fileDesc.Messages().Get(0).Fields().Get(0)
func (snapshotType) New() protoreflect.Message { return &Snapshot{} }
func (snapshotType) Zero() protoreflect.Message { return (*Snapshot)(nil) }
func (snapshotType) Descriptor() protoreflect.MessageDescriptor {
return fileDescForSnapshot.Messages().Get(0)
}

func (m *Message) New() protoreflect.Message { return &Message{} }
func (m *Message) Descriptor() protoreflect.MessageDescriptor {
return fileDescForMessage.Messages().Get(0)
}
func (m *Message) Type() protoreflect.MessageType { return messageType{} }
func (m *Message) Interface() protoreflect.ProtoMessage { return m }
func (m *Message) ProtoMethods() *protoiface.Methods { return nil }

func (s *Snapshot) New() protoreflect.Message { return &Snapshot{} }
func (s *Snapshot) Descriptor() protoreflect.MessageDescriptor {
return fileDescForSnapshot.Messages().Get(0)
}
func (s *Snapshot) Type() protoreflect.MessageType { return snapshotType{} }
func (s *Snapshot) Interface() protoreflect.ProtoMessage { return s }
func (s *Snapshot) ProtoMethods() *protoiface.Methods { return nil }

func (m *message) Range(f func(protoreflect.FieldDescriptor, protoreflect.Value) bool) {
var fieldDescSForMessage = fileDescForMessage.Messages().Get(0).Fields().Get(0)

func (m *Message) Range(f func(protoreflect.FieldDescriptor, protoreflect.Value) bool) {
if m.set {
f(fieldDescS, protoreflect.ValueOf(m.value))
f(fieldDescSForMessage, protoreflect.ValueOf(m.value))
}
}

func (m *message) Has(fd protoreflect.FieldDescriptor) bool {
if fd == fieldDescS {
func (m *Message) Has(fd protoreflect.FieldDescriptor) bool {
if fd == fieldDescSForMessage {
return m.set
}
panic("invalid field descriptor")
}

func (m *message) Clear(fd protoreflect.FieldDescriptor) {
if fd == fieldDescS {
func (m *Message) Clear(fd protoreflect.FieldDescriptor) {
if fd == fieldDescSForMessage {
m.value = ""
m.set = false
return
}
panic("invalid field descriptor")
}

func (m *message) Get(fd protoreflect.FieldDescriptor) protoreflect.Value {
if fd == fieldDescS {
func (m *Message) Get(fd protoreflect.FieldDescriptor) protoreflect.Value {
if fd == fieldDescSForMessage {
return protoreflect.ValueOf(m.value)
}
panic("invalid field descriptor")
}

func (m *message) Set(fd protoreflect.FieldDescriptor, v protoreflect.Value) {
if fd == fieldDescS {
func (m *Message) Set(fd protoreflect.FieldDescriptor, v protoreflect.Value) {
if fd == fieldDescSForMessage {
m.value = v.String()
m.set = true
return
}
panic("invalid field descriptor")
}

func (m *message) Mutable(protoreflect.FieldDescriptor) protoreflect.Value {
func (m *Message) Mutable(protoreflect.FieldDescriptor) protoreflect.Value {
panic("invalid field descriptor")
}

func (m *message) NewField(protoreflect.FieldDescriptor) protoreflect.Value {
func (m *Message) NewField(protoreflect.FieldDescriptor) protoreflect.Value {
panic("invalid field descriptor")
}

func (m *message) WhichOneof(protoreflect.OneofDescriptor) protoreflect.FieldDescriptor {
func (m *Message) WhichOneof(protoreflect.OneofDescriptor) protoreflect.FieldDescriptor {
panic("invalid oneof descriptor")
}

func (m *message) GetUnknown() protoreflect.RawFields { return nil }
func (m *Message) GetUnknown() protoreflect.RawFields { return nil }

// func (m *message) SetUnknown(protoreflect.RawFields) { return }
func (m *message) SetUnknown(protoreflect.RawFields) {}
func (m *Message) SetUnknown(protoreflect.RawFields) {}

func (m *message) IsValid() bool {
func (m *Message) IsValid() bool {
return m != nil
}

var fileDesc = func() protoreflect.FileDescriptor {
var fileDescForMessage = func() protoreflect.FileDescriptor {
p := &descriptorpb.FileDescriptorProto{}
if err := prototext.Unmarshal([]byte(descriptorText), p); err != nil {
if err := prototext.Unmarshal([]byte(descriptorTextForMessage), p); err != nil {
panic(err)
}
file, err := protodesc.NewFile(p, nil)
Expand All @@ -125,11 +178,11 @@ var fileDesc = func() protoreflect.FileDescriptor {
return file
}()

const descriptorText = `
const descriptorTextForMessage = `
name: "internal/testprotos/irregular/irregular.proto"
package: "goproto.proto.thirdparty"
message_type {
name: "IrregularMessage"
name: "Message"
field {
name: "s"
number: 1
Expand All @@ -143,13 +196,96 @@ const descriptorText = `
}
`

type AberrantMessage int
var fieldDescSForSnapshot = fileDescForSnapshot.Messages().Get(0).Fields().Get(0)

func (s *Snapshot) Range(f func(protoreflect.FieldDescriptor, protoreflect.Value) bool) {
if s.set {
f(fieldDescSForSnapshot, protoreflect.ValueOf(s.value))
}
}

func (s *Snapshot) Has(fd protoreflect.FieldDescriptor) bool {
if fd == fieldDescSForSnapshot {
return s.set
}
panic("invalid field descriptor")
}

func (s *Snapshot) Clear(fd protoreflect.FieldDescriptor) {
if fd == fieldDescSForSnapshot {
s.value = ""
s.set = false
return
}
panic("invalid field descriptor")
}

func (s *Snapshot) Get(fd protoreflect.FieldDescriptor) protoreflect.Value {
if fd == fieldDescSForSnapshot {
return protoreflect.ValueOf(s.value)
}
panic("invalid field descriptor")
}

func (m AberrantMessage) ProtoMessage() {}
func (m AberrantMessage) Reset() {}
func (m AberrantMessage) String() string { return "" }
func (m AberrantMessage) Marshal() ([]byte, error) { return nil, nil }
func (m AberrantMessage) Unmarshal([]byte) error { return nil }
func (s *Snapshot) Set(fd protoreflect.FieldDescriptor, v protoreflect.Value) {
if fd == fieldDescSForSnapshot {
s.value = v.String()
s.set = true
return
}
panic("invalid field descriptor")
}

func (s *Snapshot) Mutable(protoreflect.FieldDescriptor) protoreflect.Value {
panic("invalid field descriptor")
}

func (s *Snapshot) NewField(protoreflect.FieldDescriptor) protoreflect.Value {
panic("invalid field descriptor")
}

func (s *Snapshot) WhichOneof(protoreflect.OneofDescriptor) protoreflect.FieldDescriptor {
panic("invalid oneof descriptor")
}

func (s *Snapshot) GetUnknown() protoreflect.RawFields { return nil }

// func (m *message) SetUnknown(protoreflect.RawFields) { return }
func (s *Snapshot) SetUnknown(protoreflect.RawFields) {}

func (s *Snapshot) IsValid() bool {
return s != nil
}

var fileDescForSnapshot = func() protoreflect.FileDescriptor {
p := &descriptorpb.FileDescriptorProto{}
if err := prototext.Unmarshal([]byte(descriptorTextForSnapshot), p); err != nil {
panic(err)
}
file, err := protodesc.NewFile(p, nil)
if err != nil {
panic(err)
}
return file
}()

const descriptorTextForSnapshot = `
name: "internal/testprotos/irregular/irregular.proto"
package: "goproto.proto.thirdparty"
message_type {
name: "Snapshot"
field {
name: "s"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "s"
}
}
options {
go_package: "google.golang.org/protobuf/internal/testprotos/irregular"
}
`

type Actor struct {
persistence.Mixin
Expand All @@ -162,7 +298,7 @@ func (a *Actor) Receive(ctx actor.Context) {
log.Println("actor started")
case *persistence.RequestSnapshot:
log.Printf("snapshot internal state '%v'", a.state)
a.PersistSnapshot(&Snapshot{protoMsg: protoMsg{state: a.state}})
a.PersistSnapshot(&Snapshot{state: a.state})
case *Snapshot:
a.state = msg.state
log.Printf("recovered from snapshot, internal state changed to '%v'", a.state)
Expand All @@ -180,6 +316,12 @@ func (a *Actor) Receive(ctx actor.Context) {
}

func main() {
if err := protoregistry.GlobalTypes.RegisterMessage(&Message{}); err != nil {
panic(err)
}
if err := protoregistry.GlobalTypes.RegisterMessage(&Snapshot{}); err != nil {
panic(err)
}
ctx := context2.Background()
dbName := "users"
dbUser := "user"
Expand Down Expand Up @@ -227,8 +369,8 @@ func main() {
props := actor.PropsFromProducer(func() actor.Actor { return &Actor{} },
actor.WithReceiverMiddleware(persistence.Using(provider)))
pid, _ := rootContext.SpawnNamed(props, "persistent")
rootContext.Send(pid, &Message{protoMsg: protoMsg{state: "state4"}})
rootContext.Send(pid, &Message{protoMsg: protoMsg{state: "state5"}})
rootContext.Send(pid, &Message{state: "state4"})
rootContext.Send(pid, &Message{state: "state5"})

rootContext.PoisonFuture(pid).Wait()
fmt.Printf("*** restart ***\n")
Expand Down Expand Up @@ -260,11 +402,11 @@ create table if not exists event_journals
_, err = pool.Exec(context2.Background(), `
create table if not exists snapshots
(
actor_name varchar(255) not null,
snapshot_index bigint not null,
snapshot json not null,
message_type varchar(255) not null,
primary key (id)
actor_name varchar(255) not null,
snapshot_index bigint not null,
snapshot json not null,
message_type varchar(255) not null,
constraint snapshots_pk primary key (actor_name)
);
`)
return err
Expand Down
Loading

0 comments on commit ea2780f

Please sign in to comment.