44package rtpfb
55
66import (
7+ "sync"
78 "time"
89
910 "github.com/pion/rtcp"
@@ -22,6 +23,7 @@ type ssrcSequenceNumber struct {
2223// report. buildReport can be used to create a new report including all packets
2324// from nextReport to highestAcked.
2425type history struct {
26+ lock sync.RWMutex
2527 counter uint64
2628 twccToCounter map [uint16 ]uint64
2729 ssrcSeqNrToCounter map [ssrcSequenceNumber ]uint64
@@ -35,12 +37,14 @@ type history struct {
3537
3638func newHistory () * history {
3739 return & history {
40+ lock : sync.RWMutex {},
3841 counter : 0 ,
3942 twccToCounter : map [uint16 ]uint64 {},
4043 ssrcSeqNrToCounter : map [ssrcSequenceNumber ]uint64 {},
4144 packets : make (map [uint64 ]* PacketReport ),
4245 highestAcked : 0 ,
4346 nextReport : 0 ,
47+ cleanUntil : 0 ,
4448 }
4549}
4650
@@ -52,21 +56,20 @@ func (h *history) addOutgoing(
5256 size int ,
5357 departure time.Time ,
5458) {
55- count := h . counter
56- h . counter ++
59+ h . lock . Lock ()
60+ defer h . lock . Unlock ()
5761
5862 if isTWCC {
59- h .twccToCounter [twccSequenceNumber ] = count
63+ h .twccToCounter [twccSequenceNumber ] = h . counter
6064 } else {
6165 h .ssrcSeqNrToCounter [ssrcSequenceNumber {
6266 ssrc : ssrc ,
6367 sequenceNumber : rtpSequenceNumber ,
64- }] = count
68+ }] = h . counter
6569 }
66-
67- h .packets [count ] = & PacketReport {
70+ h .packets [h .counter ] = & PacketReport {
6871 SSRC : ssrc ,
69- SequenceNumber : count ,
72+ SequenceNumber : h . counter ,
7073 RTPSequenceNumber : rtpSequenceNumber ,
7174 TWCCSequenceNumber : twccSequenceNumber ,
7275 Size : size ,
@@ -75,15 +78,22 @@ func (h *history) addOutgoing(
7578 Arrival : time.Time {},
7679 ECN : rtcp .ECNNonECT ,
7780 }
81+ h .counter ++
7882}
7983
84+ // onFeedback maps an incoming ack for counter to the PacketReport stored when
85+ // the packet was sent. If the packet cannot be found, the ack is ignored.
86+ //
87+ // onFeedback must be called while holding the lock for reading.
88+ // onFeedback returns the time between ts and the time the packet was sent.
8089func (h * history ) onFeedback (ts time.Time , counter uint64 , ack acknowledgement ) (time.Duration , bool ) {
8190 p , ok := h .packets [counter ]
8291 if ! ok {
92+ // ignore ack for unknown packet
8393 return 0 , false
8494 }
8595 p .Arrived = ack .arrived
86- if p .Arrived {
96+ if p .Arrived && h . highestAcked < p . SequenceNumber {
8797 h .highestAcked = p .SequenceNumber
8898 }
8999 p .Arrival = ack .arrival
@@ -92,7 +102,12 @@ func (h *history) onFeedback(ts time.Time, counter uint64, ack acknowledgement)
92102 return ts .Sub (p .Departure ), true
93103}
94104
105+ // onTWCCFeedback maps an acknowledgement to the counter by TWCC sequence number
106+ // and then calls onFeedback.
95107func (h * history ) onTWCCFeedback (ts time.Time , ack acknowledgement ) (time.Duration , bool ) {
108+ h .lock .RLock ()
109+ defer h .lock .RUnlock ()
110+
96111 counter , ok := h .twccToCounter [ack .sequenceNumber ]
97112 if ! ok {
98113 // ignore ack for unknown packet
@@ -102,7 +117,12 @@ func (h *history) onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Durati
102117 return h .onFeedback (ts , counter , ack )
103118}
104119
120+ // onCCFBFeedback maps an acknowledgement to the counter by ssrc and sequence
121+ // number and then calls onFeedback.
105122func (h * history ) onCCFBFeedback (ts time.Time , ssrc uint32 , ack acknowledgement ) (time.Duration , bool ) {
123+ h .lock .RLock ()
124+ defer h .lock .RUnlock ()
125+
106126 counter , ok := h .ssrcSeqNrToCounter [ssrcSequenceNumber {
107127 ssrc : ssrc ,
108128 sequenceNumber : ack .sequenceNumber ,
@@ -124,6 +144,9 @@ func (h *history) onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement)
124144//
125145//nolint:godox
126146func (h * history ) buildReport () []PacketReport {
147+ h .lock .Lock ()
148+ defer h .lock .Unlock ()
149+
127150 if h .nextReport > h .highestAcked {
128151 return nil
129152 }
@@ -145,6 +168,8 @@ func (h *history) buildReport() []PacketReport {
145168 return res
146169}
147170
171+ // delete removes p from the history. It must be called while holding the lock
172+ // for writing.
148173func (h * history ) delete (p * PacketReport ) {
149174 if p .IsTWCC {
150175 delete (h .twccToCounter , p .TWCCSequenceNumber )
@@ -155,6 +180,9 @@ func (h *history) delete(p *PacketReport) {
155180 })
156181}
157182
183+ // cleanBefore removes all entries in the interval [h.cleanBefore, counter).
184+ // cleanBefore must be called while holding the lock for writing, because it
185+ // calls out to delete.
158186func (h * history ) cleanBefore (counter uint64 ) {
159187 for i := h .cleanUntil ; i < counter ; i ++ {
160188 if p , ok := h .packets [i ]; ok {
0 commit comments