Commit 4b3ce50d authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Use explicit add/remove messages in writerLoop.

parent e7f9a8f3
...@@ -366,7 +366,7 @@ func addUpConn(c *client, id string) (*upConnection, error) { ...@@ -366,7 +366,7 @@ func addUpConn(c *client, id string) (*upConnection, error) {
rate: estimator.New(time.Second), rate: estimator.New(time.Second),
jitter: jitter.New(remote.Codec().ClockRate), jitter: jitter.New(remote.Codec().ClockRate),
maxBitrate: ^uint64(0), maxBitrate: ^uint64(0),
localCh: make(chan struct{}, 2), localCh: make(chan localTrackAction, 2),
writerDone: make(chan struct{}), writerDone: make(chan struct{}),
} }
u.tracks = append(u.tracks, track) u.tracks = append(u.tracks, track)
...@@ -380,16 +380,16 @@ func addUpConn(c *client, id string) (*upConnection, error) { ...@@ -380,16 +380,16 @@ func addUpConn(c *client, id string) (*upConnection, error) {
} }
c.mu.Unlock() c.mu.Unlock()
go readLoop(conn, track)
go rtcpUpListener(conn, track, receiver)
if tracks != nil { if tracks != nil {
clients := c.group.getClients(c) clients := c.group.getClients(c)
for _, cc := range clients { for _, cc := range clients {
pushConn(cc, u, tracks, u.label) pushConn(cc, u, tracks, u.label)
} }
} }
go readLoop(conn, track)
go rtcpUpListener(conn, track, receiver)
}) })
return conn, nil return conn, nil
...@@ -469,12 +469,26 @@ func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) { ...@@ -469,12 +469,26 @@ func writeLoop(conn *upConnection, track *upTrack, ch <-chan packetIndex) {
buf := make([]byte, packetcache.BufSize) buf := make([]byte, packetcache.BufSize)
var packet rtp.Packet var packet rtp.Packet
local := track.getLocal() local := make([]downTrack, 0)
for { for {
select { select {
case <-track.localCh: case action := <-track.localCh:
local = track.getLocal() if action.add {
local = append(local, action.track)
} else {
found := false
for i, t := range local {
if t == action.track {
local = append(local[:i], local[i+1:]...)
found = true
break
}
}
if !found {
log.Printf("Deleting unknown track!")
}
}
case pi, ok := <-ch: case pi, ok := <-ch:
if !ok { if !ok {
return return
......
...@@ -18,6 +18,11 @@ import ( ...@@ -18,6 +18,11 @@ import (
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
) )
type localTrackAction struct {
add bool
track downTrack
}
type upTrack struct { type upTrack struct {
track *webrtc.Track track *webrtc.Track
label string label string
...@@ -29,17 +34,16 @@ type upTrack struct { ...@@ -29,17 +34,16 @@ type upTrack struct {
lastSenderReport uint32 lastSenderReport uint32
lastSenderReportTime uint32 lastSenderReportTime uint32
localCh chan struct{} // signals that local has changed localCh chan localTrackAction // signals that local has changed
writerDone chan struct{} // closed when the loop dies writerDone chan struct{} // closed when the loop dies
mu sync.Mutex mu sync.Mutex
local []downTrack local []downTrack
} }
func (up *upTrack) notifyLocal() { func (up *upTrack) notifyLocal(add bool, track downTrack) {
var s struct{}
select { select {
case up.localCh <- s: case up.localCh <- localTrackAction{add, track}:
case <-up.writerDone: case <-up.writerDone:
} }
} }
...@@ -54,7 +58,7 @@ func (up *upTrack) addLocal(local downTrack) { ...@@ -54,7 +58,7 @@ func (up *upTrack) addLocal(local downTrack) {
} }
up.local = append(up.local, local) up.local = append(up.local, local)
up.mu.Unlock() up.mu.Unlock()
up.notifyLocal() up.notifyLocal(true, local)
} }
func (up *upTrack) delLocal(local downTrack) bool { func (up *upTrack) delLocal(local downTrack) bool {
...@@ -63,7 +67,7 @@ func (up *upTrack) delLocal(local downTrack) bool { ...@@ -63,7 +67,7 @@ func (up *upTrack) delLocal(local downTrack) bool {
if l == local { if l == local {
up.local = append(up.local[:i], up.local[i+1:]...) up.local = append(up.local[:i], up.local[i+1:]...)
up.mu.Unlock() up.mu.Unlock()
up.notifyLocal() up.notifyLocal(false, l)
return true return true
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment