Commit b042bed9 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Maintain reception statistics, send receiver reports.

parent e2d89c7c
...@@ -277,6 +277,8 @@ func addUpConn(c *client, id string) (*upConnection, error) { ...@@ -277,6 +277,8 @@ func addUpConn(c *client, id string) (*upConnection, error) {
sendICE(c, id, candidate) sendICE(c, id, candidate)
}) })
go rtcpUpSender(c, conn)
pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) { pc.OnTrack(func(remote *webrtc.Track, receiver *webrtc.RTPReceiver) {
c.mu.Lock() c.mu.Lock()
u, ok := c.up[id] u, ok := c.up[id]
...@@ -303,6 +305,8 @@ func addUpConn(c *client, id string) (*upConnection, error) { ...@@ -303,6 +305,8 @@ func addUpConn(c *client, id string) (*upConnection, error) {
} }
go upLoop(conn, track) go upLoop(conn, track)
go rtcpUpListener(conn, track, receiver)
}) })
return conn, nil return conn, nil
...@@ -357,6 +361,78 @@ func upLoop(conn *upConnection, track *upTrack) { ...@@ -357,6 +361,78 @@ func upLoop(conn *upConnection, track *upTrack) {
} }
} }
func rtcpUpListener(conn *upConnection, track *upTrack, r *webrtc.RTPReceiver) {
for {
ps, err := r.ReadRTCP()
if err != nil {
if err != io.EOF {
log.Printf("ReadRTCP: %v", err)
}
return
}
for _, p := range ps {
switch p := p.(type) {
case *rtcp.SenderReport:
atomic.StoreUint32(&track.lastSenderReport,
uint32(p.NTPTime>>16))
case *rtcp.SourceDescription:
default:
log.Printf("RTCP: %T", p)
}
}
}
}
func sendRR(c *client, conn *upConnection) error {
c.mu.Lock()
if len(conn.tracks) == 0 {
c.mu.Unlock()
return nil
}
ssrc := conn.tracks[0].track.SSRC()
reports := make([]rtcp.ReceptionReport, 0, len(conn.tracks))
for _, t := range conn.tracks {
expected, lost, eseqno := t.cache.GetStats(true)
if expected == 0 {
expected = 1
}
if lost >= expected {
lost = expected - 1
}
reports = append(reports, rtcp.ReceptionReport{
SSRC: t.track.SSRC(),
LastSenderReport: atomic.LoadUint32(&t.lastSenderReport),
FractionLost: uint8((lost * 256) / expected),
TotalLost: lost,
LastSequenceNumber: eseqno,
})
}
c.mu.Unlock()
return conn.pc.WriteRTCP([]rtcp.Packet{
&rtcp.ReceiverReport{
SSRC: ssrc,
Reports: reports,
},
})
}
func rtcpUpSender(c *client, conn *upConnection) {
for {
time.Sleep(time.Second)
err := sendRR(c, conn)
if err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
log.Printf("WriteRTCP: %v", err)
}
}
}
func delUpConn(c *client, id string) { func delUpConn(c *client, id string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
...@@ -498,7 +574,7 @@ func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConn ...@@ -498,7 +574,7 @@ func addDownTrack(c *client, id string, remoteTrack *upTrack, remoteConn *upConn
conn.tracks = append(conn.tracks, track) conn.tracks = append(conn.tracks, track)
remoteTrack.addLocal(track) remoteTrack.addLocal(track)
go rtcpListener(c.group, conn, track, s) go rtcpDownListener(c.group, conn, track, s)
return conn, s, nil return conn, s, nil
} }
...@@ -509,7 +585,7 @@ func msSinceEpoch() uint64 { ...@@ -509,7 +585,7 @@ func msSinceEpoch() uint64 {
return uint64(time.Since(epoch) / time.Millisecond) return uint64(time.Since(epoch) / time.Millisecond)
} }
func rtcpListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) { func rtcpDownListener(g *group, conn *downConnection, track *downTrack, s *webrtc.RTPSender) {
for { for {
ps, err := s.ReadRTCP() ps, err := s.ReadRTCP()
if err != nil { if err != nil {
......
...@@ -21,10 +21,11 @@ import ( ...@@ -21,10 +21,11 @@ import (
) )
type upTrack struct { type upTrack struct {
track *webrtc.Track track *webrtc.Track
cache *packetcache.Cache cache *packetcache.Cache
maxBitrate uint64 maxBitrate uint64
lastPLI uint64 lastPLI uint64
lastSenderReport uint32
mu sync.Mutex mu sync.Mutex
local []*downTrack local []*downTrack
......
...@@ -13,10 +13,18 @@ type entry struct { ...@@ -13,10 +13,18 @@ type entry struct {
} }
type Cache struct { type Cache struct {
mu sync.Mutex mu sync.Mutex
first uint16 // the first seqno //stats
bitmap uint32 last uint16
tail int // the next entry to be rewritten cycle uint16
lastValid bool
expected uint32
lost uint32
// bitmap
first uint16
bitmap uint32
// packet cache
tail int
entries []entry entries []entry
} }
...@@ -26,9 +34,21 @@ func New(capacity int) *Cache { ...@@ -26,9 +34,21 @@ func New(capacity int) *Cache {
} }
} }
func seqnoInvalid(seqno, reference uint16) bool {
if ((seqno - reference) & 0x8000) == 0 {
return false
}
if reference - seqno > 0x100 {
return true
}
return false
}
// Set a bit in the bitmap, shifting first if necessary. // Set a bit in the bitmap, shifting first if necessary.
func (cache *Cache) set(seqno uint16) { func (cache *Cache) set(seqno uint16) {
if cache.bitmap == 0 { if cache.bitmap == 0 || seqnoInvalid(seqno, cache.first) {
cache.first = seqno cache.first = seqno
cache.bitmap = 1 cache.bitmap = 1
return return
...@@ -65,6 +85,25 @@ func (cache *Cache) Store(seqno uint16, buf []byte) uint16 { ...@@ -65,6 +85,25 @@ func (cache *Cache) Store(seqno uint16, buf []byte) uint16 {
cache.mu.Lock() cache.mu.Lock()
defer cache.mu.Unlock() defer cache.mu.Unlock()
if !cache.lastValid || seqnoInvalid(seqno, cache.last) {
cache.last = seqno
cache.lastValid = true
cache.expected++
} else {
if ((cache.last - seqno) & 0x8000) != 0 {
cache.expected += uint32(seqno - cache.last)
cache.lost += uint32(seqno - cache.last - 1)
if seqno < cache.last {
cache.cycle++
}
cache.last = seqno
} else {
if cache.lost > 0 {
cache.lost--
}
}
}
cache.set(seqno) cache.set(seqno)
cache.entries[cache.tail].seqno = seqno cache.entries[cache.tail].seqno = seqno
...@@ -102,3 +141,18 @@ func (cache *Cache) BitmapGet() (uint16, uint16) { ...@@ -102,3 +141,18 @@ func (cache *Cache) BitmapGet() (uint16, uint16) {
cache.first += 17 cache.first += 17
return first, bitmap return first, bitmap
} }
func (cache *Cache) GetStats(reset bool) (uint32, uint32, uint32) {
cache.mu.Lock()
defer cache.mu.Unlock()
expected := cache.expected
lost := cache.lost
eseqno := uint32(cache.cycle)<<16 | uint32(cache.last)
if reset {
cache.expected = 0
cache.lost = 0
}
return expected, lost, eseqno
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment