Commit 7393ca84 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Make all communication between client threads asynchronous.

We used to deadlock with large numbers of tracks.  This should fix that.
parent b8dedcf0
...@@ -497,9 +497,11 @@ func delUpConn(c *client, id string) bool { ...@@ -497,9 +497,11 @@ func delUpConn(c *client, id string) bool {
cc.mu.Unlock() cc.mu.Unlock()
} }
go func(cids []clientId) {
for _, cid := range cids { for _, cid := range cids {
cid.client.action(delConnAction{cid.id}) cid.client.action(delConnAction{cid.id})
} }
}(cids)
conn.pc.Close() conn.pc.Close()
delete(c.up, id) delete(c.up, id)
...@@ -953,9 +955,12 @@ func (c *client) setRequested(audio, video bool) error { ...@@ -953,9 +955,12 @@ func (c *client) setRequested(audio, video bool) error {
c.requestedAudio = audio c.requestedAudio = audio
c.requestedVideo = video c.requestedVideo = video
for _, cc := range c.group.getClients(c) { go func() {
clients := c.group.getClients(c)
for _, cc := range clients {
cc.action(pushTracksAction{c}) cc.action(pushTracksAction{c})
} }
}()
return nil return nil
} }
...@@ -971,6 +976,17 @@ func (c *client) requested(kind webrtc.RTPCodecType) bool { ...@@ -971,6 +976,17 @@ func (c *client) requested(kind webrtc.RTPCodecType) bool {
} }
} }
func pushTracks(c *client, conn *upConnection, tracks []*upTrack, done bool, label string) {
for i, t := range tracks {
c.action(addTrackAction{t, conn, done && i == len(tracks)-1})
}
if done && label != "" {
c.action(addLabelAction{conn.id, conn.label})
}
}
func clientLoop(c *client, conn *websocket.Conn) error { func clientLoop(c *client, conn *websocket.Conn) error {
read := make(chan interface{}, 1) read := make(chan interface{}, 1)
go clientReader(conn, read, c.done) go clientReader(conn, read, c.done)
...@@ -1063,19 +1079,13 @@ func clientLoop(c *client, conn *websocket.Conn) error { ...@@ -1063,19 +1079,13 @@ func clientLoop(c *client, conn *websocket.Conn) error {
}) })
case pushTracksAction: case pushTracksAction:
for _, u := range c.up { for _, u := range c.up {
var done bool tracks := make([]*upTrack, len(u.tracks))
for i, t := range u.tracks { copy(tracks, u.tracks)
done = i >= u.trackCount-1 go pushTracks(
a.c.action(addTrackAction{ a.c, u, tracks,
t, u, done, len(tracks) >= u.trackCount-1,
}) u.label,
} )
if done && u.label != "" {
a.c.action(addLabelAction{
u.id, u.label,
})
}
} }
case connectionFailedAction: case connectionFailedAction:
found := delUpConn(c, a.id) found := delUpConn(c, a.id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment