Skip to content

Commit a8f8a9b

Browse files
committed
Clean up locks and add comments
1 parent 571fde9 commit a8f8a9b

2 files changed

Lines changed: 36 additions & 20 deletions

File tree

pkg/rtpfb/history.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package rtpfb
55

66
import (
7+
"sync"
78
"time"
89

910
"github.com/pion/rtcp"
@@ -22,6 +23,7 @@ type ssrcSequenceNumber struct {
2223
// report. buildReport can be used to create a new report including all packets
2324
// from nextReport to highestAcked.
2425
type history struct {
26+
lock sync.RWMutex
2527
counter uint64
2628
twccToCounter map[uint16]uint64
2729
ssrcSeqNrToCounter map[ssrcSequenceNumber]uint64
@@ -35,12 +37,14 @@ type history struct {
3537

3638
func newHistory() *history {
3739
return &history{
40+
lock: sync.RWMutex{},
3841
counter: 0,
3942
twccToCounter: map[uint16]uint64{},
4043
ssrcSeqNrToCounter: map[ssrcSequenceNumber]uint64{},
4144
packets: make(map[uint64]*PacketReport),
4245
highestAcked: 0,
4346
nextReport: 0,
47+
cleanUntil: 0,
4448
}
4549
}
4650

@@ -52,21 +56,20 @@ func (h *history) addOutgoing(
5256
size int,
5357
departure time.Time,
5458
) {
55-
count := h.counter
56-
h.counter++
59+
h.lock.Lock()
60+
defer h.lock.Unlock()
5761

5862
if isTWCC {
59-
h.twccToCounter[twccSequenceNumber] = count
63+
h.twccToCounter[twccSequenceNumber] = h.counter
6064
} else {
6165
h.ssrcSeqNrToCounter[ssrcSequenceNumber{
6266
ssrc: ssrc,
6367
sequenceNumber: rtpSequenceNumber,
64-
}] = count
68+
}] = h.counter
6569
}
66-
67-
h.packets[count] = &PacketReport{
70+
h.packets[h.counter] = &PacketReport{
6871
SSRC: ssrc,
69-
SequenceNumber: count,
72+
SequenceNumber: h.counter,
7073
RTPSequenceNumber: rtpSequenceNumber,
7174
TWCCSequenceNumber: twccSequenceNumber,
7275
Size: size,
@@ -75,15 +78,22 @@ func (h *history) addOutgoing(
7578
Arrival: time.Time{},
7679
ECN: rtcp.ECNNonECT,
7780
}
81+
h.counter++
7882
}
7983

84+
// onFeedback maps an incoming ack for counter to the PacketReport stored when
85+
// the packet was sent. If the packet cannot be found, the ack is ignored.
86+
//
87+
// onFeedback must be called while holding the lock for reading.
88+
// onFeedback returns the time between ts and the time the packet was sent.
8089
func (h *history) onFeedback(ts time.Time, counter uint64, ack acknowledgement) (time.Duration, bool) {
8190
p, ok := h.packets[counter]
8291
if !ok {
92+
// ignore ack for unknown packet
8393
return 0, false
8494
}
8595
p.Arrived = ack.arrived
86-
if p.Arrived {
96+
if p.Arrived && h.highestAcked < p.SequenceNumber {
8797
h.highestAcked = p.SequenceNumber
8898
}
8999
p.Arrival = ack.arrival
@@ -92,7 +102,12 @@ func (h *history) onFeedback(ts time.Time, counter uint64, ack acknowledgement)
92102
return ts.Sub(p.Departure), true
93103
}
94104

105+
// onTWCCFeedback maps an acknowledgement to the counter by TWCC sequence number
106+
// and then calls onFeedback.
95107
func (h *history) onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Duration, bool) {
108+
h.lock.RLock()
109+
defer h.lock.RUnlock()
110+
96111
counter, ok := h.twccToCounter[ack.sequenceNumber]
97112
if !ok {
98113
// ignore ack for unknown packet
@@ -102,7 +117,12 @@ func (h *history) onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Durati
102117
return h.onFeedback(ts, counter, ack)
103118
}
104119

120+
// onCCFBFeedback maps an acknowledgement to the counter by ssrc and sequence
121+
// number and then calls onFeedback.
105122
func (h *history) onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement) (time.Duration, bool) {
123+
h.lock.RLock()
124+
defer h.lock.RUnlock()
125+
106126
counter, ok := h.ssrcSeqNrToCounter[ssrcSequenceNumber{
107127
ssrc: ssrc,
108128
sequenceNumber: ack.sequenceNumber,
@@ -124,6 +144,9 @@ func (h *history) onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement)
124144
//
125145
//nolint:godox
126146
func (h *history) buildReport() []PacketReport {
147+
h.lock.Lock()
148+
defer h.lock.Unlock()
149+
127150
if h.nextReport > h.highestAcked {
128151
return nil
129152
}
@@ -145,6 +168,8 @@ func (h *history) buildReport() []PacketReport {
145168
return res
146169
}
147170

171+
// delete removes p from the history. It must be called while holding the lock
172+
// for writing.
148173
func (h *history) delete(p *PacketReport) {
149174
if p.IsTWCC {
150175
delete(h.twccToCounter, p.TWCCSequenceNumber)
@@ -155,6 +180,9 @@ func (h *history) delete(p *PacketReport) {
155180
})
156181
}
157182

183+
// cleanBefore removes all entries in the interval [h.cleanBefore, counter).
184+
// cleanBefore must be called while holding the lock for writing, because it
185+
// calls out to delete.
158186
func (h *history) cleanBefore(counter uint64) {
159187
for i := h.cleanUntil; i < counter; i++ {
160188
if p, ok := h.packets[i]; ok {

pkg/rtpfb/interceptor.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package rtpfb
66

77
import (
88
"math"
9-
"sync"
109
"time"
1110

1211
"github.com/pion/interceptor"
@@ -72,7 +71,6 @@ func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
7271
func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
7372
in := &Interceptor{
7473
NoOp: interceptor.NoOp{},
75-
lock: sync.RWMutex{},
7674
log: logging.NewDefaultLoggerFactory().NewLogger("ccfb_interceptor"),
7775
timestamp: time.Now,
7876
history: newHistory(),
@@ -94,7 +92,6 @@ func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
9492
// report, a PacketReport will be added to the ccfb.Report.
9593
type Interceptor struct {
9694
interceptor.NoOp
97-
lock sync.RWMutex
9895
log logging.LeveledLogger
9996
timestamp func() time.Time
10097

@@ -109,9 +106,6 @@ func (i *Interceptor) bindTWCCStream(twccHdrExtID uint8, writer interceptor.RTPW
109106
) (int, error) {
110107
ts := i.timestamp()
111108

112-
i.lock.Lock()
113-
defer i.lock.Unlock()
114-
115109
var twccHdrExt rtp.TransportCCExtension
116110
if err := twccHdrExt.Unmarshal(header.GetExtension(twccHdrExtID)); err != nil {
117111
i.log.Warnf(
@@ -145,9 +139,6 @@ func (i *Interceptor) bindCCFBStream(writer interceptor.RTPWriter) interceptor.R
145139
) (int, error) {
146140
ts := i.timestamp()
147141

148-
i.lock.Lock()
149-
defer i.lock.Unlock()
150-
151142
i.history.addOutgoing(
152143
header.SSRC,
153144
header.SequenceNumber,
@@ -217,9 +208,6 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
217208

218209
//nolint:cyclop
219210
func (i *Interceptor) processFeedback(ts time.Time, pkts []rtcp.Packet) (time.Duration, []PacketReport) {
220-
i.lock.Lock()
221-
defer i.lock.Unlock()
222-
223211
shortestRTT := time.Duration(math.MaxInt64)
224212
var ackDelay time.Duration
225213

0 commit comments

Comments
 (0)