Skip to content

Commit

Permalink
On Read Retransmit send FSM to SENDING
Browse files Browse the repository at this point in the history
RFC6347 Section-4.2.4 states

```
The implementation reads a retransmitted flight from the peer: the
implementation transitions to the SENDING state, where it
retransmits the flight, resets the retransmit timer, and returns
to the WAITING state.  The rationale here is that the receipt of a
duplicate message is the likely result of timer expiry on the peer
and therefore suggests that part of one's previous flight was
lost.
```

Resolves #478
  • Loading branch information
Sean-Der committed Jul 11, 2024
1 parent a6d9640 commit 2356231
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
25 changes: 18 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,21 +878,32 @@ func (c *Conn) handleIncomingPacket(ctx context.Context, buf []byte, rAddr net.A
}
}

isHandshake, err := c.fragmentBuffer.push(append([]byte{}, buf...))
isHandshake, isRetransmit, err := c.fragmentBuffer.push(append([]byte{}, buf...))
if err != nil {
// Decode error must be silently discarded
// [RFC6347 Section-4.1.2.7]
c.log.Debugf("defragment failed: %s", err)
return false, nil, nil
} else if isHandshake {
markPacketAsValid()
for out, epoch := c.fragmentBuffer.pop(); out != nil; out, epoch = c.fragmentBuffer.pop() {
header := &handshake.Header{}
if err := header.Unmarshal(out); err != nil {
c.log.Debugf("%s: handshake parse failed: %s", srvCliStr(c.state.isClient), err)
continue

if isRetransmit {
// The implementation reads a retransmitted flight from the peer: the
// implementation transitions to the SENDING state
// [RFC6347 Section-4.2.4]
select {
case c.fsm.readRetransmit <- struct{}{}:
default:
}
} else {
for out, epoch := c.fragmentBuffer.pop(); out != nil; out, epoch = c.fragmentBuffer.pop() {
header := &handshake.Header{}
if err := header.Unmarshal(out); err != nil {
c.log.Debugf("%s: handshake parse failed: %s", srvCliStr(c.state.isClient), err)
continue
}
c.handshakeCache.push(out, epoch, header.MessageSequence, header.Type, !c.state.isClient)
}
c.handshakeCache.push(out, epoch, header.MessageSequence, header.Type, !c.state.isClient)
}

return true, nil, nil
Expand Down
17 changes: 11 additions & 6 deletions fragment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,29 @@ func (f *fragmentBuffer) size() int {
// Attempts to push a DTLS packet to the fragmentBuffer
// when it returns true it means the fragmentBuffer has inserted and the buffer shouldn't be handled
// when an error returns it is fatal, and the DTLS connection should be stopped
func (f *fragmentBuffer) push(buf []byte) (bool, error) {
func (f *fragmentBuffer) push(buf []byte) (isHandshake, isRetransmit bool, err error) {
if f.size()+len(buf) >= fragmentBufferMaxSize {
return false, errFragmentBufferOverflow
return false, false, errFragmentBufferOverflow
}

frag := new(fragment)
if err := frag.recordLayerHeader.Unmarshal(buf); err != nil {
return false, err
return false, false, err
}

// fragment isn't a handshake, we don't need to handle it
if frag.recordLayerHeader.ContentType != protocol.ContentTypeHandshake {
return false, nil
return false, false, nil
}

for buf = buf[recordlayer.FixedHeaderSize:]; len(buf) != 0; frag = new(fragment) {
if err := frag.handshakeHeader.Unmarshal(buf); err != nil {
return false, err
return false, false, err
}

// Fragment is a retransmission, we have already assembled it so ignoring
if frag.handshakeHeader.MessageSequence < f.currentMessageSequenceNumber {
return true, true, nil
}

if _, ok := f.cache[frag.handshakeHeader.MessageSequence]; !ok {
Expand All @@ -80,7 +85,7 @@ func (f *fragmentBuffer) push(buf []byte) (bool, error) {
buf = buf[end:]
}

return true, nil
return true, false, nil
}

func (f *fragmentBuffer) pop() (content []byte, epoch uint16) {
Expand Down
5 changes: 4 additions & 1 deletion handshaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type handshakeFSM struct {
cache *handshakeCache
cfg *handshakeConfig
closed chan struct{}
readRetransmit chan struct{}
}

type handshakeConfig struct {
Expand Down Expand Up @@ -173,6 +174,7 @@ func newHandshakeFSM(
cfg: cfg,
retransmitInterval: cfg.initialRetransmitInterval,
closed: make(chan struct{}),
readRetransmit: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -303,7 +305,6 @@ func (s *handshakeFSM) wait(ctx context.Context, c flightConn) (handshakeState,
}
s.currentFlight = nextFlight
return handshakePreparing, nil

case <-retransmitTimer.C:
if !s.retransmit {
return handshakeWaiting, nil
Expand All @@ -319,6 +320,8 @@ func (s *handshakeFSM) wait(ctx context.Context, c flightConn) (handshakeState,
s.retransmitInterval = time.Second * 60
}
return handshakeSending, nil
case <-s.readRetransmit:
return handshakeSending, nil
case <-ctx.Done():
s.retransmitInterval = s.cfg.initialRetransmitInterval
return handshakeErrored, ctx.Err()
Expand Down

0 comments on commit 2356231

Please sign in to comment.