Commit 448bb037 authored by Juliusz Chroboczek's avatar Juliusz Chroboczek

Use pushConn to delete connections.

The previous mechanism (going through up.local) was racy and complicated.
parent d3655b89
...@@ -34,7 +34,6 @@ type upTrack interface { ...@@ -34,7 +34,6 @@ type upTrack interface {
} }
type downConnection interface { type downConnection interface {
Close() error
} }
type downTrack interface { type downTrack interface {
......
...@@ -20,7 +20,7 @@ type diskClient struct { ...@@ -20,7 +20,7 @@ type diskClient struct {
id string id string
mu sync.Mutex mu sync.Mutex
down []*diskConn down map[string]*diskConn
closed bool closed bool
} }
...@@ -52,7 +52,7 @@ func (client *diskClient) Close() error { ...@@ -52,7 +52,7 @@ func (client *diskClient) Close() error {
return nil return nil
} }
func (client *diskClient) pushConn(conn upConnection, tracks []upTrack, label string) error { func (client *diskClient) pushConn(id string, conn upConnection, tracks []upTrack, label string) error {
client.mu.Lock() client.mu.Lock()
defer client.mu.Unlock() defer client.mu.Unlock()
...@@ -60,18 +60,32 @@ func (client *diskClient) pushConn(conn upConnection, tracks []upTrack, label st ...@@ -60,18 +60,32 @@ func (client *diskClient) pushConn(conn upConnection, tracks []upTrack, label st
return errors.New("disk client is closed") return errors.New("disk client is closed")
} }
old := client.down[id]
if old != nil {
old.Close()
delete(client.down, id)
}
if conn == nil {
return nil
}
directory := filepath.Join(recordingsDir, client.group.name) directory := filepath.Join(recordingsDir, client.group.name)
err := os.MkdirAll(directory, 0700) err := os.MkdirAll(directory, 0700)
if err != nil { if err != nil {
return err return err
} }
if client.down == nil {
client.down = make(map[string]*diskConn)
}
down, err := newDiskConn(directory, label, conn, tracks) down, err := newDiskConn(directory, label, conn, tracks)
if err != nil { if err != nil {
return err return err
} }
client.down = append(client.down, down) client.down[conn.Id()] = down
return nil return nil
} }
......
...@@ -25,7 +25,7 @@ type client interface { ...@@ -25,7 +25,7 @@ type client interface {
Group() *group Group() *group
Id() string Id() string
Username() string Username() string
pushConn(conn upConnection, tracks []upTrack, label string) error pushConn(id string, conn upConnection, tracks []upTrack, label string) error
pushClient(id, username string, add bool) error pushClient(id, username string, add bool) error
} }
...@@ -53,11 +53,8 @@ type group struct { ...@@ -53,11 +53,8 @@ type group struct {
history []chatHistoryEntry history []chatHistoryEntry
} }
type delConnAction struct { type pushConnAction struct {
id string id string
}
type addConnAction struct {
conn upConnection conn upConnection
tracks []upTrack tracks []upTrack
} }
......
...@@ -111,7 +111,6 @@ type rtpDownConnection struct { ...@@ -111,7 +111,6 @@ type rtpDownConnection struct {
remote upConnection remote upConnection
tracks []*rtpDownTrack tracks []*rtpDownTrack
iceCandidates []*webrtc.ICECandidateInit iceCandidates []*webrtc.ICECandidateInit
close func() error
} }
func newDownConn(id string, remote upConnection) (*rtpDownConnection, error) { func newDownConn(id string, remote upConnection) (*rtpDownConnection, error) {
...@@ -133,10 +132,6 @@ func newDownConn(id string, remote upConnection) (*rtpDownConnection, error) { ...@@ -133,10 +132,6 @@ func newDownConn(id string, remote upConnection) (*rtpDownConnection, error) {
return conn, nil return conn, nil
} }
func (down *rtpDownConnection) Close() error {
return down.close()
}
func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { func (down *rtpDownConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
if down.pc.RemoteDescription() != nil { if down.pc.RemoteDescription() != nil {
return down.pc.AddICECandidate(*candidate) return down.pc.AddICECandidate(*candidate)
...@@ -279,7 +274,6 @@ type rtpUpConnection struct { ...@@ -279,7 +274,6 @@ type rtpUpConnection struct {
iceCandidates []*webrtc.ICECandidateInit iceCandidates []*webrtc.ICECandidateInit
mu sync.Mutex mu sync.Mutex
closed bool
tracks []*rtpUpTrack tracks []*rtpUpTrack
local []downConnection local []downConnection
} }
...@@ -303,9 +297,6 @@ func (up *rtpUpConnection) Label() string { ...@@ -303,9 +297,6 @@ func (up *rtpUpConnection) Label() string {
func (up *rtpUpConnection) addLocal(local downConnection) error { func (up *rtpUpConnection) addLocal(local downConnection) error {
up.mu.Lock() up.mu.Lock()
defer up.mu.Unlock() defer up.mu.Unlock()
if up.closed {
return ErrConnectionClosed
}
for _, t := range up.local { for _, t := range up.local {
if t == local { if t == local {
return nil return nil
...@@ -335,21 +326,6 @@ func (up *rtpUpConnection) getLocal() []downConnection { ...@@ -335,21 +326,6 @@ func (up *rtpUpConnection) getLocal() []downConnection {
return local return local
} }
func (up *rtpUpConnection) Close() error {
up.mu.Lock()
defer up.mu.Unlock()
go func(local []downConnection) {
for _, l := range local {
l.Close()
}
}(up.local)
up.local = nil
up.closed = true
return up.pc.Close()
}
func (up *rtpUpConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error { func (up *rtpUpConnection) addICECandidate(candidate *webrtc.ICECandidateInit) error {
if up.pc.RemoteDescription() != nil { if up.pc.RemoteDescription() != nil {
return up.pc.AddICECandidate(*candidate) return up.pc.AddICECandidate(*candidate)
...@@ -468,7 +444,7 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) { ...@@ -468,7 +444,7 @@ func newUpConn(c client, id string) (*rtpUpConnection, error) {
} }
clients := c.Group().getClients(c) clients := c.Group().getClients(c)
for _, cc := range clients { for _, cc := range clients {
cc.pushConn(conn, tracks, conn.label) cc.pushConn(conn.id, conn, tracks, conn.label)
} }
go rtcpUpSender(conn) go rtcpUpSender(conn)
} }
......
...@@ -261,7 +261,14 @@ func delUpConn(c *webClient, id string) bool { ...@@ -261,7 +261,14 @@ func delUpConn(c *webClient, id string) bool {
} }
conn.mu.Unlock() conn.mu.Unlock()
conn.Close() go func(clients []client) {
for _, c := range clients {
c.pushConn(conn.id, nil, nil, "")
}
}(c.Group().getClients(c))
conn.pc.Close()
return true return true
} }
...@@ -320,10 +327,6 @@ func addDownConn(c *webClient, id string, remote upConnection) (*rtpDownConnecti ...@@ -320,10 +327,6 @@ func addDownConn(c *webClient, id string, remote upConnection) (*rtpDownConnecti
} }
c.down[id] = conn c.down[id] = conn
conn.close = func() error {
return c.action(delConnAction{conn.id})
}
return conn, nil return conn, nil
} }
...@@ -571,12 +574,12 @@ func addDownConnTracks(c *webClient, remote upConnection, tracks []upTrack) (*rt ...@@ -571,12 +574,12 @@ func addDownConnTracks(c *webClient, remote upConnection, tracks []upTrack) (*rt
return down, nil return down, nil
} }
func (c *webClient) pushConn(conn upConnection, tracks []upTrack, label string) error { func (c *webClient) pushConn(id string, conn upConnection, tracks []upTrack, label string) error {
err := c.action(addConnAction{conn, tracks}) err := c.action(pushConnAction{id, conn, tracks})
if err != nil { if err != nil {
return err return err
} }
if label != "" { if conn != nil && label != "" {
err := c.action(addLabelAction{conn.Id(), conn.Label()}) err := c.action(addLabelAction{conn.Id(), conn.Label()})
if err != nil { if err != nil {
return err return err
...@@ -714,7 +717,17 @@ func clientLoop(c *webClient, conn *websocket.Conn) error { ...@@ -714,7 +717,17 @@ func clientLoop(c *webClient, conn *websocket.Conn) error {
} }
case a := <-c.actionCh: case a := <-c.actionCh:
switch a := a.(type) { switch a := a.(type) {
case addConnAction: case pushConnAction:
found := delDownConn(c, a.id)
if a.conn == nil {
if found {
c.write(clientMessage{
Type: "close",
Id: a.id,
})
}
continue
}
down, err := addDownConnTracks( down, err := addDownConnTracks(
c, a.conn, a.tracks, c, a.conn, a.tracks,
) )
...@@ -736,14 +749,6 @@ func clientLoop(c *webClient, conn *websocket.Conn) error { ...@@ -736,14 +749,6 @@ func clientLoop(c *webClient, conn *websocket.Conn) error {
continue continue
} }
} }
case delConnAction:
found := delDownConn(c, a.id)
if found {
c.write(clientMessage{
Type: "close",
Id: a.id,
})
}
case addLabelAction: case addLabelAction:
c.write(clientMessage{ c.write(clientMessage{
Type: "label", Type: "label",
...@@ -757,7 +762,7 @@ func clientLoop(c *webClient, conn *websocket.Conn) error { ...@@ -757,7 +762,7 @@ func clientLoop(c *webClient, conn *websocket.Conn) error {
for i, t := range tracks { for i, t := range tracks {
ts[i] = t ts[i] = t
} }
go a.c.pushConn(u, ts, u.label) go a.c.pushConn(u.id, u, ts, u.label)
} }
case connectionFailedAction: case connectionFailedAction:
found := delUpConn(c, a.id) found := delUpConn(c, a.id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment