-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchainipc.go
134 lines (116 loc) · 3.31 KB
/
chainipc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package ipcs
import (
"fmt"
"path/filepath"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/wrappers"
)
const (
// DefaultBaseURL can be used as a reasonable default value for the base URL
DefaultBaseURL = "/tmp"
ipcIdentifierPrefix = "ipc"
ipcConsensusIdentifier = "consensus"
ipcDecisionsIdentifier = "decisions"
)
type context struct {
log logging.Logger
networkID uint32
path string
}
// ChainIPCs maintains IPCs for a set of chains
type ChainIPCs struct {
context
chains map[ids.ID]*EventSockets
blockAcceptorGroup snow.AcceptorGroup
txAcceptorGroup snow.AcceptorGroup
vertexAcceptorGroup snow.AcceptorGroup
}
// NewChainIPCs creates a new *ChainIPCs that writes consensus and decision
// events to IPC sockets
func NewChainIPCs(
log logging.Logger,
path string,
networkID uint32,
blockAcceptorGroup snow.AcceptorGroup,
txAcceptorGroup snow.AcceptorGroup,
vertexAcceptorGroup snow.AcceptorGroup,
defaultChainIDs []ids.ID,
) (*ChainIPCs, error) {
cipcs := &ChainIPCs{
context: context{
log: log,
networkID: networkID,
path: path,
},
chains: make(map[ids.ID]*EventSockets),
blockAcceptorGroup: blockAcceptorGroup,
txAcceptorGroup: txAcceptorGroup,
vertexAcceptorGroup: vertexAcceptorGroup,
}
for _, chainID := range defaultChainIDs {
if _, err := cipcs.Publish(chainID); err != nil {
return nil, err
}
}
return cipcs, nil
}
// Publish creates a set of eventSockets for the given chainID
func (cipcs *ChainIPCs) Publish(chainID ids.ID) (*EventSockets, error) {
if es, ok := cipcs.chains[chainID]; ok {
cipcs.log.Info("returning existing event sockets",
zap.Stringer("blockchainID", chainID),
)
return es, nil
}
es, err := newEventSockets(
cipcs.context,
chainID,
cipcs.blockAcceptorGroup,
cipcs.txAcceptorGroup,
cipcs.vertexAcceptorGroup,
)
if err != nil {
cipcs.log.Error("can't create ipcs",
zap.Error(err),
)
return nil, err
}
cipcs.chains[chainID] = es
cipcs.log.Info("created IPC sockets",
zap.Stringer("blockchainID", chainID),
zap.String("consensusURL", es.ConsensusURL()),
zap.String("decisionsURL", es.DecisionsURL()),
)
return es, nil
}
// Unpublish stops the eventSocket for the given chain if it exists. It returns
// whether or not the socket existed and errors when trying to close it
func (cipcs *ChainIPCs) Unpublish(chainID ids.ID) (bool, error) {
chainIPCs, ok := cipcs.chains[chainID]
if !ok {
return false, nil
}
delete(cipcs.chains, chainID)
return true, chainIPCs.stop()
}
// GetPublishedBlockchains returns the chains that are currently being published
func (cipcs *ChainIPCs) GetPublishedBlockchains() []ids.ID {
return maps.Keys(cipcs.chains)
}
func (cipcs *ChainIPCs) Shutdown() error {
cipcs.log.Info("shutting down chain IPCs")
errs := wrappers.Errs{}
for _, ch := range cipcs.chains {
errs.Add(ch.stop())
}
return errs.Err
}
func ipcURL(ctx context, chainID ids.ID, eventType string) string {
return filepath.Join(ctx.path, fmt.Sprintf("%d-%s-%s", ctx.networkID, chainID.String(), eventType))
}