Skip to content

Commit

Permalink
One line new channel registration
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiraux committed Feb 5, 2019
1 parent bff07c1 commit 09d7088
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 91 deletions.
3 changes: 1 addition & 2 deletions agents/ack_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (a *AckAgent) Run(conn *Connection) {
a.DisableAcks = make(map[PNSpace]bool)
a.TotalDataAcked = make(map[PNSpace]uint64)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)

go func() {
defer a.Logger.Println("Agent terminated")
Expand Down
6 changes: 2 additions & 4 deletions agents/buffer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ type BufferAgent struct {
func (a *BufferAgent) Run(conn *Connection) {
a.Init("BufferAgent", conn.OriginalDestinationCID)

uPChan := make(chan interface{}, 1000)
conn.UnprocessedPayloads.Register(uPChan)
eLChan := make(chan interface{}, 1000)
conn.EncryptionLevelsAvailable.Register(eLChan)
uPChan := conn.UnprocessedPayloads.RegisterNewChan(1000)
eLChan := conn.EncryptionLevelsAvailable.RegisterNewChan(1000)

unprocessedPayloads := make(map[EncryptionLevel][][]byte)
encryptionLevelsAvailable := make(map[EncryptionLevel]bool)
Expand Down
3 changes: 1 addition & 2 deletions agents/closing_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ type ClosingAgent struct {
func (a *ClosingAgent) Run (conn *Connection) {
a.Init("ClosingAgent", conn.OriginalDestinationCID)

outgoingPackets := make(chan interface{}, 1000)
conn.OutgoingPackets.Register(outgoingPackets)
outgoingPackets := conn.OutgoingPackets.RegisterNewChan(1000)

go func() {
defer a.Logger.Println("Agent terminated")
Expand Down
17 changes: 6 additions & 11 deletions agents/handshake_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
. "github.com/QUIC-Tracker/quic-tracker"
"github.com/dustin/go-broadcast"
"strings"
)

Expand All @@ -27,25 +26,22 @@ type HandshakeAgent struct {
BaseAgent
TLSAgent *TLSAgent
SocketAgent *SocketAgent
HandshakeStatus broadcast.Broadcaster //type: HandshakeStatus
HandshakeStatus Broadcaster //type: HandshakeStatus
IgnoreRetry bool
sendInitial chan bool
receivedRetry bool
}

func (a *HandshakeAgent) Run(conn *Connection) {
a.Init("HandshakeAgent", conn.OriginalDestinationCID)
a.HandshakeStatus = broadcast.NewBroadcaster(10)
a.HandshakeStatus = NewBroadcaster(10)
a.sendInitial = make(chan bool, 1)

incPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incPackets)
incPackets := conn.IncomingPackets.RegisterNewChan(1000)

outPackets := make(chan interface{}, 1000)
conn.OutgoingPackets.Register(outPackets)
outPackets := conn.OutgoingPackets.RegisterNewChan(1000)

tlsStatus := make(chan interface{}, 10)
a.TLSAgent.TLSStatus.Register(tlsStatus)
tlsStatus := a.TLSAgent.TLSStatus.RegisterNewChan(10)

socketStatus := make(chan interface{}, 10)
a.SocketAgent.SocketStatus.Register(socketStatus)
Expand Down Expand Up @@ -132,8 +128,7 @@ func (a *HandshakeAgent) Run(conn *Connection) {
}
}()

status := make(chan interface{}, 1)
a.HandshakeStatus.Register(status)
status := a.HandshakeStatus.RegisterNewChan(1)

go func() {
for {
Expand Down
28 changes: 11 additions & 17 deletions agents/http_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package agents

import (
"bytes"
"errors"
. "github.com/QUIC-Tracker/quic-tracker"
"github.com/QUIC-Tracker/quic-tracker/http3"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-broadcast"
"math"
"errors"
)

type HTTPResponse struct {
Expand Down Expand Up @@ -41,8 +40,8 @@ type HTTPAgent struct {
conn *Connection
QPACK QPACKAgent
QPACKEncoderOpts uint32
HTTPResponseReceived broadcast.Broadcaster //type: HTTPResponse
FrameReceived broadcast.Broadcaster //type: HTTPFrameReceived
HTTPResponseReceived Broadcaster //type: HTTPResponse
FrameReceived Broadcaster //type: HTTPFrameReceived
streamData chan streamData
streamDataBuffer map[uint64]*bytes.Buffer
responseBuffer map[uint64]*HTTPResponse
Expand All @@ -61,19 +60,14 @@ func (a *HTTPAgent) Run(conn *Connection) {
a.QPACK = QPACKAgent{EncoderStreamID: 6, DecoderStreamID: 10}
a.QPACK.Run(conn)

a.HTTPResponseReceived = broadcast.NewBroadcaster(1000)
a.FrameReceived = broadcast.NewBroadcaster(1000)

frameReceived := make(chan interface{}, 1000)
a.FrameReceived.Register(frameReceived)
a.HTTPResponseReceived = NewBroadcaster(1000)
a.FrameReceived = NewBroadcaster(1000)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)
frameReceived := a.FrameReceived.RegisterNewChan(1000)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)

encodedHeaders := make(chan interface{}, 1000)
a.QPACK.EncodedHeaders.Register(encodedHeaders)
decodedHeaders := make(chan interface{}, 1000)
a.QPACK.DecodedHeaders.Register(decodedHeaders)
encodedHeaders := a.QPACK.EncodedHeaders.RegisterNewChan(1000)
decodedHeaders := a.QPACK.DecodedHeaders.RegisterNewChan(1000)

a.controlStreamID = uint64(2)
a.peerControlStreamID = HTTPNoStream
Expand Down Expand Up @@ -229,8 +223,7 @@ func (a *HTTPAgent) SendRequest(path, method, authority string, headers map[stri

streamID := a.nextRequestStream
stream := a.conn.Streams.Get(streamID)
streamChan := make(chan interface{}, 1000)
stream.ReadChan.Register(streamChan)
streamChan := stream.ReadChan.RegisterNewChan(1000)
a.streamDataBuffer[streamID] = new(bytes.Buffer)
response := &HTTPResponse{StreamID: streamID}
a.responseBuffer[streamID] = response
Expand All @@ -254,6 +247,7 @@ func (a *HTTPAgent) SendRequest(path, method, authority string, headers map[stri
}

}
stream.ReadChan.Unregister(streamChan)
}()

a.QPACK.EncodeHeaders <- DecodedHeaders{streamID, hdrs}
Expand Down
3 changes: 1 addition & 2 deletions agents/parse_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (a *ParsingAgent) Run(conn *Connection) {
a.conn = conn
a.Init("ParsingAgent", conn.OriginalDestinationCID)

incomingPayloads := make(chan interface{})
a.conn.IncomingPayloads.Register(incomingPayloads)
incomingPayloads := a.conn.IncomingPayloads.RegisterNewChan(1000)

go func() {
defer a.Logger.Println("Agent terminated")
Expand Down
12 changes: 5 additions & 7 deletions agents/qpack_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package agents

import (
. "github.com/QUIC-Tracker/quic-tracker"
"github.com/dustin/go-broadcast"
"github.com/mpiraux/ls-qpack-go"
"math"
)
Expand All @@ -27,9 +26,9 @@ type QPACKAgent struct {
EncoderStreamID uint64
DecoderStreamID uint64
DecodeHeaders chan EncodedHeaders
DecodedHeaders broadcast.Broadcaster //type: DecodedHeaders
DecodedHeaders Broadcaster //type: DecodedHeaders
EncodeHeaders chan DecodedHeaders
EncodedHeaders broadcast.Broadcaster //type: EncodedHeaders
EncodedHeaders Broadcaster //type: EncodedHeaders
encoder *ls_qpack_go.QPackEncoder
decoder *ls_qpack_go.QPackDecoder
}
Expand All @@ -40,13 +39,12 @@ const (

func (a *QPACKAgent) Run(conn *Connection) {
a.Init("QPACKAgent", conn.OriginalDestinationCID)
a.DecodedHeaders = broadcast.NewBroadcaster(1000)
a.EncodedHeaders = broadcast.NewBroadcaster(1000)
a.DecodedHeaders = NewBroadcaster(1000)
a.EncodedHeaders = NewBroadcaster(1000)
a.DecodeHeaders = make(chan EncodedHeaders, 1000)
a.EncodeHeaders = make(chan DecodedHeaders, 1000)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)

a.encoder = ls_qpack_go.NewQPackEncoder(false)
a.decoder = ls_qpack_go.NewQPackDecoder(1024, 100)
Expand Down
11 changes: 3 additions & 8 deletions agents/recovery_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@ func (a *RecoveryAgent) Run(conn *Connection) {
}
retransmissionTicker := time.NewTicker(100 * time.Millisecond)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)

outgoingPackets := make(chan interface{}, 1000)
conn.OutgoingPackets.Register(outgoingPackets)

eLAvailable := make(chan interface{}, 1000)
conn.EncryptionLevelsAvailable.Register(eLAvailable)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)
outgoingPackets := conn.OutgoingPackets.RegisterNewChan(1000)
eLAvailable := conn.EncryptionLevelsAvailable.RegisterNewChan(10)

go func() {
defer a.Logger.Println("Agent terminated")
Expand Down
7 changes: 2 additions & 5 deletions agents/rtt_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ func (a *RTTAgent) Run(conn *Connection) {

a.LargestSentPackets = make(map[PNSpace]PacketNumber)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)

outgoingPackets := make(chan interface{}, 1000)
conn.OutgoingPackets.Register(outgoingPackets)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)
outgoingPackets := conn.OutgoingPackets.RegisterNewChan(1000)

go func() { // TODO: Support ACK_ECN
defer a.Logger.Println("Agent terminated")
Expand Down
6 changes: 2 additions & 4 deletions agents/send_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ type SendingAgent struct {
func (a *SendingAgent) Run(conn *Connection) {
a.Init("SendingAgent", conn.OriginalDestinationCID)

frameQueue := make(chan interface{}, 1000)
conn.FrameQueue.Register(frameQueue)
newEncryptionLevelAvailable := make(chan interface{}, 10)
conn.EncryptionLevelsAvailable.Register(newEncryptionLevelAvailable)
frameQueue := conn.FrameQueue.RegisterNewChan(1000)
newEncryptionLevelAvailable := conn.EncryptionLevelsAvailable.RegisterNewChan(10)

encryptionLevels := []EncryptionLevel{EncryptionLevelInitial, EncryptionLevel0RTT, EncryptionLevelHandshake, EncryptionLevel1RTT, EncryptionLevelBest, EncryptionLevelBestAppData}
encryptionLevelsAvailable := map[DirectionalEncryptionLevel]bool {
Expand Down
9 changes: 4 additions & 5 deletions agents/socket_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
. "github.com/QUIC-Tracker/quic-tracker"
"github.com/QUIC-Tracker/quic-tracker/compat"
"github.com/dustin/go-broadcast"
"syscall"
"unsafe"
)
Expand All @@ -27,15 +26,15 @@ type SocketAgent struct {
ecn bool
TotalDataReceived int
DatagramsReceived int
SocketStatus broadcast.Broadcaster //type: err
ECNStatus broadcast.Broadcaster //type: ECNStatus
SocketStatus Broadcaster //type: err
ECNStatus Broadcaster //type: ECNStatus
}

func (a *SocketAgent) Run(conn *Connection) {
a.Init("SocketAgent", conn.OriginalDestinationCID)
a.conn = conn
a.SocketStatus = broadcast.NewBroadcaster(10)
a.ECNStatus = broadcast.NewBroadcaster(1000)
a.SocketStatus = NewBroadcaster(10)
a.ECNStatus = NewBroadcaster(1000)
recChan := make(chan []byte)

go func() {
Expand Down
14 changes: 6 additions & 8 deletions agents/tls_agent.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package agents

import (
. "github.com/QUIC-Tracker/quic-tracker"
"encoding/hex"
"github.com/dustin/go-broadcast"
. "github.com/QUIC-Tracker/quic-tracker"
)

type TLSStatus struct {
Expand All @@ -17,21 +16,20 @@ type TLSStatus struct {
// DisableFrameSending. The TLSAgent will broadcast when new encryption or decryption levels are available.
type TLSAgent struct {
BaseAgent
TLSStatus broadcast.Broadcaster //type: TLSStatus
ResumptionTicket broadcast.Broadcaster //type: []byte
TLSStatus Broadcaster //type: TLSStatus
ResumptionTicket Broadcaster //type: []byte
DisableFrameSending bool
}

func (a *TLSAgent) Run(conn *Connection) {
a.Init("TLSAgent", conn.OriginalDestinationCID)
a.TLSStatus = broadcast.NewBroadcaster(10)
a.ResumptionTicket = broadcast.NewBroadcaster(10)
a.TLSStatus = NewBroadcaster(10)
a.ResumptionTicket = NewBroadcaster(10)

encryptionLevels := []DirectionalEncryptionLevel{{EncryptionLevelHandshake, false}, {EncryptionLevelHandshake, true}, {EncryptionLevel1RTT, false}, {EncryptionLevel1RTT, true}}
encryptionLevelsAvailable := make(map[DirectionalEncryptionLevel]bool)

incomingPackets := make(chan interface{}, 1000)
conn.IncomingPackets.Register(incomingPackets)
incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)

cryptoChans := map[PNSpace]chan interface{}{
PNSpaceInitial: make(chan interface{}, 1000),
Expand Down
25 changes: 12 additions & 13 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
. "github.com/QUIC-Tracker/quic-tracker/lib"
"github.com/dustin/go-broadcast"
"github.com/mpiraux/pigotls"
"log"
"net"
Expand Down Expand Up @@ -36,12 +35,12 @@ type Connection struct {
CryptoStreams CryptoStreams // TODO: It should be a parent class without closing states
Streams Streams

IncomingPackets broadcast.Broadcaster //type: Packet
OutgoingPackets broadcast.Broadcaster //type: Packet
IncomingPayloads broadcast.Broadcaster //type: []byte
UnprocessedPayloads broadcast.Broadcaster //type: UnprocessedPayload
EncryptionLevelsAvailable broadcast.Broadcaster //type: DirectionalEncryptionLevel
FrameQueue broadcast.Broadcaster //type: QueuedFrame
IncomingPackets Broadcaster //type: Packet
OutgoingPackets Broadcaster //type: Packet
IncomingPayloads Broadcaster //type: []byte
UnprocessedPayloads Broadcaster //type: UnprocessedPayload
EncryptionLevelsAvailable Broadcaster //type: DirectionalEncryptionLevel
FrameQueue Broadcaster //type: QueuedFrame

OriginalDestinationCID ConnectionID
SourceCID ConnectionID
Expand Down Expand Up @@ -277,12 +276,12 @@ func NewConnection(serverName string, version uint32, ALPN string, SCID []byte,

c.ResumptionTicket = resumptionTicket

c.IncomingPackets = broadcast.NewBroadcaster(1000)
c.OutgoingPackets = broadcast.NewBroadcaster(1000)
c.IncomingPayloads = broadcast.NewBroadcaster(1000)
c.UnprocessedPayloads = broadcast.NewBroadcaster(1000)
c.EncryptionLevelsAvailable = broadcast.NewBroadcaster(10)
c.FrameQueue = broadcast.NewBroadcaster(1000)
c.IncomingPackets = NewBroadcaster(1000)
c.OutgoingPackets = NewBroadcaster(1000)
c.IncomingPayloads = NewBroadcaster(1000)
c.UnprocessedPayloads = NewBroadcaster(1000)
c.EncryptionLevelsAvailable = NewBroadcaster(10)
c.FrameQueue = NewBroadcaster(1000)

c.Logger = log.New(os.Stderr, fmt.Sprintf("[CID %s] ", hex.EncodeToString(c.OriginalDestinationCID)), log.Lshortfile)

Expand Down
5 changes: 2 additions & 3 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package quictracker

import (
"fmt"
"github.com/dustin/go-broadcast"
"math"
)

Expand Down Expand Up @@ -48,7 +47,7 @@ type Stream struct {
ReadData []byte
WriteData []byte

ReadChan broadcast.Broadcaster
ReadChan Broadcaster
MaxReadReceived uint64
gaps *byteIntervalList

Expand All @@ -63,7 +62,7 @@ type Stream struct {

func NewStream() *Stream {
s := new(Stream)
s.ReadChan = broadcast.NewBroadcaster(1024)
s.ReadChan = NewBroadcaster(1024)
s.readFeedback = make(chan interface{}, 1)
s.ReadChan.Register(s.readFeedback)
s.gaps = NewbyteIntervalList().Init()
Expand Down

0 comments on commit 09d7088

Please sign in to comment.