Skip to content

Commit

Permalink
Implements agents restarting and renables draft-17 negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiraux committed Mar 21, 2019
1 parent c575da9 commit ccb2842
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 30 deletions.
4 changes: 3 additions & 1 deletion agents/ack_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ type AckAgent struct {
func (a *AckAgent) Run(conn *Connection) {
a.BaseAgent.Init("AckAgent", conn.OriginalDestinationCID)
a.FrameProducingAgent.InitFPA(conn)
a.DisableAcks = make(map[PNSpace]bool)
if a.DisableAcks == nil {
a.DisableAcks = make(map[PNSpace]bool)
}
a.TotalDataAcked = make(map[PNSpace]uint64)

incomingPackets := conn.IncomingPackets.RegisterNewChan(1000)
Expand Down
38 changes: 33 additions & 5 deletions agents/base_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,27 @@ type Agent interface {
Init(name string, SCID ConnectionID)
Run(conn *Connection)
Stop()
Restart()
Join()
}

type RequestFrameArgs struct {
availableSpace int
level EncryptionLevel
number PacketNumber
level EncryptionLevel
number PacketNumber
}

// All agents should embed this structure
type BaseAgent struct {
name string
Logger *log.Logger
close chan bool
close chan bool // true if should restart, false otherwise
closed chan bool
}

func (a *BaseAgent) Name() string { return a.name }

// All agents that embed this structure must call InitFPA() as soon as their Run() method is called
// All agents that embed this structure must call Init() as soon as their Run() method is called
func (a *BaseAgent) Init(name string, ODCID ConnectionID) {
a.name = name
a.Logger = log.New(os.Stderr, fmt.Sprintf("[%s/%s] ", hex.EncodeToString(ODCID), a.Name()), log.Lshortfile)
Expand All @@ -57,6 +58,14 @@ func (a *BaseAgent) Stop() {
}
}

func (a *BaseAgent) Restart() {
select {
case <-a.close:
default:
a.close <- true
}
}

func (a *BaseAgent) Join() {
<-a.closed
}
Expand Down Expand Up @@ -105,6 +114,25 @@ func AttachAgentsToConnection(conn *Connection, agents ...Agent) *ConnectionAgen
c.Add(a)
}

go func() {
for {
select {
case <-conn.ConnectionRestart:
conn.Logger.Printf("Restarting all agents\n")
for _, a := range agents {
a.Restart()
a.Join()
a.Run(conn)
}
conn.ConnectionRestart = make(chan bool, 1)
close(conn.ConnectionRestarted)
conn.Logger.Printf("Restarting all agents: done\n")
case <-conn.ConnectionClosed:
return
}
}
}()

return &c
}

Expand Down Expand Up @@ -132,7 +160,7 @@ func (c *ConnectionAgents) GetFrameProducingAgents() []FrameProducer {
return agents
}

func (c *ConnectionAgents) Stop(names... string) {
func (c *ConnectionAgents) Stop(names ...string) {
for _, n := range names {
c.Get(n).Stop()
c.Get(n).Join()
Expand Down
10 changes: 7 additions & 3 deletions agents/closing_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ClosingAgent struct {
IdleTimeout *time.Timer
}

func (a *ClosingAgent) Run(conn *Connection) {
func (a *ClosingAgent) Run(conn *Connection) { // TODO: Observe incoming CC and AC
a.Init("ClosingAgent", conn.OriginalDestinationCID)
a.conn = conn
a.IdleDuration = time.Duration(a.conn.TLSTPHandler.IdleTimeout) * time.Second
Expand All @@ -27,7 +27,6 @@ func (a *ClosingAgent) Run(conn *Connection) {
go func() {
defer a.Logger.Println("Agent terminated")
defer close(a.closed)
defer close(a.conn.ConnectionClosed)

for {
select {
Expand All @@ -37,6 +36,7 @@ func (a *ClosingAgent) Run(conn *Connection) {
switch p := i.(type) {
case Framer:
if a.closing && (p.Contains(ConnectionCloseType) || p.Contains(ApplicationCloseType)) {
close(a.conn.ConnectionClosed)
return
}
}
Expand All @@ -46,8 +46,12 @@ func (a *ClosingAgent) Run(conn *Connection) {
case <-a.IdleTimeout.C:
a.closing = true
a.Logger.Printf("Idle timeout of %v reached, closing\n", a.IdleDuration.String())
close(a.conn.ConnectionClosed)
return
case <-a.close:
case shouldRestart := <-a.close:
if !shouldRestart {
close(a.conn.ConnectionClosed)
}
return
}
}
Expand Down
27 changes: 14 additions & 13 deletions agents/handshake_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
. "github.com/QUIC-Tracker/quic-tracker"
"github.com/davecgh/go-spew/spew"
"strings"
)

Expand Down Expand Up @@ -38,13 +39,9 @@ func (a *HandshakeAgent) Run(conn *Connection) {
a.sendInitial = make(chan bool, 1)

incPackets := conn.IncomingPackets.RegisterNewChan(1000)

outPackets := conn.OutgoingPackets.RegisterNewChan(1000)

tlsStatus := a.TLSAgent.TLSStatus.RegisterNewChan(10)

socketStatus := make(chan interface{}, 10)
a.SocketAgent.SocketStatus.Register(socketStatus)
socketStatus := a.SocketAgent.SocketStatus.RegisterNewChan(10)

firstInitialReceived := false
tlsCompleted := false
Expand All @@ -66,20 +63,17 @@ func (a *HandshakeAgent) Run(conn *Connection) {
a.HandshakeStatus.Submit(HandshakeStatus{false, p, err})
return
}
conn.SendPacket(conn.GetInitialPacket(), EncryptionLevelInitial)
close(conn.ConnectionRestart)
case *RetryPacket:
if !a.IgnoreRetry && bytes.Equal(conn.DestinationCID, p.OriginalDestinationCID) && !a.receivedRetry { // TODO: Check the original_connection_id TP too
a.receivedRetry = true
conn.DestinationCID = p.Header().(*LongHeader).SourceCID
tlsTP := conn.TLSTPHandler
conn.TransitionTo(QuicVersion, QuicALPNToken)
tlsTP, alpn := conn.TLSTPHandler, conn.ALPN
spew.Dump(tlsTP)
conn.TransitionTo(QuicVersion, alpn)
conn.TLSTPHandler = tlsTP
conn.Token = p.RetryToken
a.TLSAgent.Stop()
a.TLSAgent.Join()
a.TLSAgent.Run(conn)
a.TLSAgent.TLSStatus.Register(tlsStatus)
conn.SendPacket(conn.GetInitialPacket(), EncryptionLevelInitial)
close(conn.ConnectionRestart)
}
case Framer:
if p.Contains(ConnectionCloseType) || p.Contains(ApplicationCloseType) {
Expand Down Expand Up @@ -122,6 +116,13 @@ func (a *HandshakeAgent) Run(conn *Connection) {
if strings.Contains(i.(error).Error(), "connection refused") {
a.HandshakeStatus.Submit(HandshakeStatus{false, nil , i.(error)})
}
case <-conn.ConnectionRestarted:
incPackets = conn.IncomingPackets.RegisterNewChan(1000)
outPackets = conn.OutgoingPackets.RegisterNewChan(1000)
tlsStatus = a.TLSAgent.TLSStatus.RegisterNewChan(10)
socketStatus = a.SocketAgent.SocketStatus.RegisterNewChan(10)
conn.ConnectionRestarted = make(chan bool, 1)
conn.SendPacket(conn.GetInitialPacket(), EncryptionLevelInitial)
case <-a.close:
return
}
Expand Down
8 changes: 4 additions & 4 deletions agents/http_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,10 @@ func (a *HTTPAgent) SendRequest(path, method, authority string, headers map[stri
}

hdrs := []HTTPHeader{
HTTPHeader{":method", method},
HTTPHeader{":scheme", "https"},
HTTPHeader{":authority", authority},
HTTPHeader{":path", path},
{":method", method},
{":scheme", "https"},
{":authority", authority},
{":path", path},
}
for k, v := range headers {
hdrs = append(hdrs, HTTPHeader{k, v})
Expand Down
23 changes: 20 additions & 3 deletions agents/socket_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (a *SocketAgent) Run(conn *Connection) {

if err != nil {
a.Logger.Println("Closing UDP socket because of error", err.Error())
select {
case <-recChan:
return
default:
}
close(recChan)
a.SocketStatus.Submit(err)
break
Expand All @@ -65,6 +70,11 @@ func (a *SocketAgent) Run(conn *Connection) {
a.Logger.Printf("Received %d bytes from UDP socket\n", i)
payload := make([]byte, i)
copy(payload, recBuf[:i])
select {
case <-recChan:
return
default:
}
recChan <- payload
}
}()
Expand All @@ -80,9 +90,16 @@ func (a *SocketAgent) Run(conn *Connection) {
}

conn.IncomingPayloads.Submit(p)
case <-a.close:
conn.UdpConnection.Close()
// TODO: Close this agent gracefully
case shouldRestart := <-a.close:
if !shouldRestart {
conn.UdpConnection.Close()
}
select {
case <-recChan:
return
default:
}
close(recChan)
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
MinimumInitialLengthv6 = 1232
MaxUDPPayloadSize = 65507
MaximumVersion = 0xff000012
MinimumVersion = 0xff000012
MinimumVersion = 0xff000011
)

// errors
Expand Down
4 changes: 4 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Connection struct {
StreamInput Broadcaster //type: StreamInput

ConnectionClosed chan bool
ConnectionRestart chan bool // Triggered when receiving a Retry or a VN packet
ConnectionRestarted chan bool

OriginalDestinationCID ConnectionID
SourceCID ConnectionID
Expand Down Expand Up @@ -290,6 +292,8 @@ func NewConnection(serverName string, version uint32, ALPN string, SCID []byte,
c.FrameQueue = NewBroadcaster(1000)
c.TransportParameters = NewBroadcaster(10)
c.ConnectionClosed = make(chan bool, 1)
c.ConnectionRestart = make(chan bool, 1)
c.ConnectionRestarted = make(chan bool, 1)
c.PreparePacket = NewBroadcaster(1000)
c.StreamInput = NewBroadcaster(1000)

Expand Down

0 comments on commit ccb2842

Please sign in to comment.