Skip to content

Commit

Permalink
On Read Retransmit send FSM to SENDING
Browse files Browse the repository at this point in the history
RFC6347 Section-4.2.4 states

```
The implementation reads a retransmitted flight from the peer: the
implementation transitions to the SENDING state, where it
retransmits the flight, resets the retransmit timer, and returns
to the WAITING state.  The rationale here is that the receipt of a
duplicate message is the likely result of timer expiry on the peer
and therefore suggests that part of one's previous flight was
lost.
```

Resolves #478
  • Loading branch information
Sean-Der committed Jul 11, 2024
1 parent a6d9640 commit 555670f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
25 changes: 18 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,21 +878,32 @@ func (c *Conn) handleIncomingPacket(ctx context.Context, buf []byte, rAddr net.A
}
}

isHandshake, err := c.fragmentBuffer.push(append([]byte{}, buf...))
isHandshake, isRetransmit, err := c.fragmentBuffer.push(append([]byte{}, buf...))
if err != nil {
// Decode error must be silently discarded
// [RFC6347 Section-4.1.2.7]
c.log.Debugf("defragment failed: %s", err)
return false, nil, nil
} else if isHandshake {
markPacketAsValid()
for out, epoch := c.fragmentBuffer.pop(); out != nil; out, epoch = c.fragmentBuffer.pop() {
header := &handshake.Header{}
if err := header.Unmarshal(out); err != nil {
c.log.Debugf("%s: handshake parse failed: %s", srvCliStr(c.state.isClient), err)
continue

if isRetransmit {
// The implementation reads a retransmitted flight from the peer: the
// implementation transitions to the SENDING state
// [RFC6347 Section-4.2.4]
select {
case c.fsm.readRetransmit <- struct{}{}:
default:

Check warning on line 896 in conn.go

View check run for this annotation

Codecov / codecov/patch

conn.go#L894-L896

Added lines #L894 - L896 were not covered by tests
}
} else {
for out, epoch := c.fragmentBuffer.pop(); out != nil; out, epoch = c.fragmentBuffer.pop() {
header := &handshake.Header{}
if err := header.Unmarshal(out); err != nil {
c.log.Debugf("%s: handshake parse failed: %s", srvCliStr(c.state.isClient), err)
continue

Check warning on line 903 in conn.go

View check run for this annotation

Codecov / codecov/patch

conn.go#L902-L903

Added lines #L902 - L903 were not covered by tests
}
c.handshakeCache.push(out, epoch, header.MessageSequence, header.Type, !c.state.isClient)
}
c.handshakeCache.push(out, epoch, header.MessageSequence, header.Type, !c.state.isClient)
}

return true, nil, nil
Expand Down
17 changes: 11 additions & 6 deletions fragment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,29 @@ func (f *fragmentBuffer) size() int {
// Attempts to push a DTLS packet to the fragmentBuffer
// when it returns true it means the fragmentBuffer has inserted and the buffer shouldn't be handled
// when an error returns it is fatal, and the DTLS connection should be stopped
func (f *fragmentBuffer) push(buf []byte) (bool, error) {
func (f *fragmentBuffer) push(buf []byte) (isHandshake, isRetransmit bool, err error) {
if f.size()+len(buf) >= fragmentBufferMaxSize {
return false, errFragmentBufferOverflow
return false, false, errFragmentBufferOverflow
}

frag := new(fragment)
if err := frag.recordLayerHeader.Unmarshal(buf); err != nil {
return false, err
return false, false, err

Check warning on line 53 in fragment_buffer.go

View check run for this annotation

Codecov / codecov/patch

fragment_buffer.go#L53

Added line #L53 was not covered by tests
}

// fragment isn't a handshake, we don't need to handle it
if frag.recordLayerHeader.ContentType != protocol.ContentTypeHandshake {
return false, nil
return false, false, nil
}

for buf = buf[recordlayer.FixedHeaderSize:]; len(buf) != 0; frag = new(fragment) {
if err := frag.handshakeHeader.Unmarshal(buf); err != nil {
return false, err
return false, false, err

Check warning on line 63 in fragment_buffer.go

View check run for this annotation

Codecov / codecov/patch

fragment_buffer.go#L63

Added line #L63 was not covered by tests
}

// Fragment is a retransmission, we have already assembled it so ignoring
if frag.handshakeHeader.MessageSequence < f.currentMessageSequenceNumber {
return true, true, nil

Check warning on line 68 in fragment_buffer.go

View check run for this annotation

Codecov / codecov/patch

fragment_buffer.go#L68

Added line #L68 was not covered by tests
}

if _, ok := f.cache[frag.handshakeHeader.MessageSequence]; !ok {
Expand All @@ -80,7 +85,7 @@ func (f *fragmentBuffer) push(buf []byte) (bool, error) {
buf = buf[end:]
}

return true, nil
return true, false, nil
}

func (f *fragmentBuffer) pop() (content []byte, epoch uint16) {
Expand Down
6 changes: 3 additions & 3 deletions fragment_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestFragmentBuffer(t *testing.T) {
} {
fragmentBuffer := newFragmentBuffer()
for _, frag := range test.In {
status, err := fragmentBuffer.push(frag)
status, _, err := fragmentBuffer.push(frag)
if err != nil {
t.Error(err)
} else if !status {
Expand Down Expand Up @@ -122,13 +122,13 @@ func TestFragmentBuffer_Overflow(t *testing.T) {
fragmentBuffer := newFragmentBuffer()

// Push a buffer that doesn't exceed size limits
if _, err := fragmentBuffer.push([]byte{0x16, 0xfe, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0F, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xfe, 0xff, 0x00}); err != nil {
if _, _, err := fragmentBuffer.push([]byte{0x16, 0xfe, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0F, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xfe, 0xff, 0x00}); err != nil {
t.Fatal(err)
}

// Allocate a buffer that exceeds cache size
largeBuffer := make([]byte, fragmentBufferMaxSize)
if _, err := fragmentBuffer.push(largeBuffer); !errors.Is(err, errFragmentBufferOverflow) {
if _, _, err := fragmentBuffer.push(largeBuffer); !errors.Is(err, errFragmentBufferOverflow) {
t.Fatalf("Pushing a large buffer returned (%s) expected(%s)", err, errFragmentBufferOverflow)
}
}
5 changes: 4 additions & 1 deletion handshaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type handshakeFSM struct {
cache *handshakeCache
cfg *handshakeConfig
closed chan struct{}
readRetransmit chan struct{}
}

type handshakeConfig struct {
Expand Down Expand Up @@ -173,6 +174,7 @@ func newHandshakeFSM(
cfg: cfg,
retransmitInterval: cfg.initialRetransmitInterval,
closed: make(chan struct{}),
readRetransmit: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -303,7 +305,6 @@ func (s *handshakeFSM) wait(ctx context.Context, c flightConn) (handshakeState,
}
s.currentFlight = nextFlight
return handshakePreparing, nil

case <-retransmitTimer.C:
if !s.retransmit {
return handshakeWaiting, nil
Expand All @@ -319,6 +320,8 @@ func (s *handshakeFSM) wait(ctx context.Context, c flightConn) (handshakeState,
s.retransmitInterval = time.Second * 60
}
return handshakeSending, nil
case <-s.readRetransmit:
return handshakeSending, nil

Check warning on line 324 in handshaker.go

View check run for this annotation

Codecov / codecov/patch

handshaker.go#L323-L324

Added lines #L323 - L324 were not covered by tests
case <-ctx.Done():
s.retransmitInterval = s.cfg.initialRetransmitInterval
return handshakeErrored, ctx.Err()
Expand Down

0 comments on commit 555670f

Please sign in to comment.