Commit 2f63afdc authored by Mikio Hara's avatar Mikio Hara

net: ListenMulticastUDP to listen concurrently across multiple listeners

This CL introduces new function ListenMulticastUDP to fix
multicast UDP listening across multiple listeners issue,
to replace old multicast methods JoinGroup and LeaveGroup
on UDPConn.

This CL also enables multicast testing by default.

Fixes #2730.

R=rsc, paul.a.lalonde, fullung, devon.odell
CC=golang-dev
https://golang.org/cl/5562048
parent c86e0397
......@@ -1215,8 +1215,13 @@ reads and writes will time out and no longer block.
</p>
<p>
There is also a new <a href="/pkg/net/#DialTimeout"><code>net.DialTimeout</code></a> method to simplify
timing out dialing a network address.
There are also new functions
<a href="/pkg/net/#DialTimeout"><code>net.DialTimeout</code></a>
to simplify timing out dialing a network address and
<a href="/pkg/net/#ListenMulticastUDP"><code>net.ListenMulticastUDP</code></a>
to allow multicast UDP to listen concurrently across multiple listeners.
The <code>net.ListenMulticastUDP</code> function replaces the old
<code>JoinGroup</code> and <code>LeaveGroup</code> methods.
</p>
<p>
......
......@@ -1118,8 +1118,13 @@ reads and writes will time out and no longer block.
</p>
<p>
There is also a new <a href="/pkg/net/#DialTimeout"><code>net.DialTimeout</code></a> method to simplify
timing out dialing a network address.
There are also new functions
<a href="/pkg/net/#DialTimeout"><code>net.DialTimeout</code></a>
to simplify timing out dialing a network address and
<a href="/pkg/net/#ListenMulticastUDP"><code>net.ListenMulticastUDP</code></a>
to allow multicast UDP to listen concurrently across multiple listeners.
The <code>net.ListenMulticastUDP</code> function replaces the old
<code>JoinGroup</code> and <code>LeaveGroup</code> methods.
</p>
<p>
......
......@@ -5,43 +5,36 @@
package net
import (
"flag"
"os"
"runtime"
"testing"
)
var multicast = flag.Bool("multicast", false, "enable multicast tests")
var multicastUDPTests = []struct {
var listenMulticastUDPTests = []struct {
net string
laddr IP
gaddr IP
gaddr *UDPAddr
flags Flags
ipv6 bool
}{
// cf. RFC 4727: Experimental Values in IPv4, IPv6, ICMPv4, ICMPv6, UDP, and TCP Headers
{"udp", IPv4zero, IPv4(224, 0, 0, 254), (FlagUp | FlagLoopback), false},
{"udp4", IPv4zero, IPv4(224, 0, 0, 254), (FlagUp | FlagLoopback), false},
{"udp", IPv6unspecified, ParseIP("ff0e::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff01::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff02::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff04::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff05::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff08::114"), (FlagUp | FlagLoopback), true},
{"udp6", IPv6unspecified, ParseIP("ff0e::114"), (FlagUp | FlagLoopback), true},
{"udp", &UDPAddr{IPv4(224, 0, 0, 254), 12345}, FlagUp | FlagLoopback, false},
{"udp4", &UDPAddr{IPv4(224, 0, 0, 254), 12345}, FlagUp | FlagLoopback, false},
{"udp", &UDPAddr{ParseIP("ff0e::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff01::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff02::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff04::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff05::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff08::114"), 12345}, FlagUp | FlagLoopback, true},
{"udp6", &UDPAddr{ParseIP("ff0e::114"), 12345}, FlagUp | FlagLoopback, true},
}
func TestMulticastUDP(t *testing.T) {
if runtime.GOOS == "plan9" || runtime.GOOS == "windows" {
return
}
if !*multicast {
t.Logf("test disabled; use --multicast to enable")
func TestListenMulticastUDP(t *testing.T) {
switch runtime.GOOS {
case "netbsd", "openbsd", "plan9", "windows":
return
}
for _, tt := range multicastUDPTests {
for _, tt := range listenMulticastUDPTests {
if tt.ipv6 && (!supportsIPv6 || os.Getuid() != 0) {
continue
}
......@@ -60,14 +53,11 @@ func TestMulticastUDP(t *testing.T) {
t.Logf("an appropriate multicast interface not found")
return
}
c, err := ListenUDP(tt.net, &UDPAddr{IP: tt.laddr})
c, err := ListenMulticastUDP(tt.net, ifi, tt.gaddr)
if err != nil {
t.Fatalf("ListenUDP failed: %v", err)
}
defer c.Close()
if err := c.JoinGroup(ifi, tt.gaddr); err != nil {
t.Fatalf("JoinGroup failed: %v", err)
t.Fatalf("ListenMulticastUDP failed: %v", err)
}
defer c.Close() // test to listen concurrently across multiple listeners
if !tt.ipv6 {
testIPv4MulticastSocketOptions(t, c.fd, ifi)
} else {
......@@ -79,7 +69,7 @@ func TestMulticastUDP(t *testing.T) {
}
var found bool
for _, ifma := range ifmat {
if ifma.(*IPAddr).IP.Equal(tt.gaddr) {
if ifma.(*IPAddr).IP.Equal(tt.gaddr.IP) {
found = true
break
}
......@@ -87,23 +77,16 @@ func TestMulticastUDP(t *testing.T) {
if !found {
t.Fatalf("%q not found in RIB", tt.gaddr.String())
}
if err := c.LeaveGroup(ifi, tt.gaddr); err != nil {
t.Fatalf("LeaveGroup failed: %v", err)
}
}
}
func TestSimpleMulticastUDP(t *testing.T) {
if runtime.GOOS == "plan9" {
return
}
if !*multicast {
t.Logf("test disabled; use --multicast to enable")
func TestSimpleListenMulticastUDP(t *testing.T) {
switch runtime.GOOS {
case "plan9":
return
}
for _, tt := range multicastUDPTests {
var ifi *Interface
for _, tt := range listenMulticastUDPTests {
if tt.ipv6 {
continue
}
......@@ -112,6 +95,7 @@ func TestSimpleMulticastUDP(t *testing.T) {
if err != nil {
t.Fatalf("Interfaces failed: %v", err)
}
var ifi *Interface
for _, x := range ift {
if x.Flags&tt.flags == tt.flags {
ifi = &x
......@@ -122,17 +106,11 @@ func TestSimpleMulticastUDP(t *testing.T) {
t.Logf("an appropriate multicast interface not found")
return
}
c, err := ListenUDP(tt.net, &UDPAddr{IP: tt.laddr})
c, err := ListenMulticastUDP(tt.net, ifi, tt.gaddr)
if err != nil {
t.Fatalf("ListenUDP failed: %v", err)
}
defer c.Close()
if err := c.JoinGroup(ifi, tt.gaddr); err != nil {
t.Fatalf("JoinGroup failed: %v", err)
}
if err := c.LeaveGroup(ifi, tt.gaddr); err != nil {
t.Fatalf("LeaveGroup failed: %v", err)
t.Fatalf("ListenMulticastUDP failed: %v", err)
}
c.Close()
}
}
......
......@@ -28,9 +28,18 @@ func socket(net string, f, t, p int, la, ra syscall.Sockaddr, toAddr func(syscal
syscall.CloseOnExec(s)
syscall.ForkLock.RUnlock()
setDefaultSockopts(s, f, t)
err = setDefaultSockopts(s, f, t)
if err != nil {
closesocket(s)
return nil, err
}
if la != nil {
la, err = listenerSockaddr(s, f, la, toAddr)
if err != nil {
closesocket(s)
return nil, err
}
err = syscall.Bind(s, la)
if err != nil {
closesocket(s)
......
......@@ -31,3 +31,27 @@ func maxListenerBacklog() int {
}
return int(n)
}
func listenerSockaddr(s, f int, la syscall.Sockaddr, toAddr func(syscall.Sockaddr) Addr) (syscall.Sockaddr, error) {
a := toAddr(la)
if a == nil {
return la, nil
}
switch v := a.(type) {
case *UDPAddr:
if v.IP.IsMulticast() {
err := setDefaultMulticastSockopts(s)
if err != nil {
return nil, err
}
switch f {
case syscall.AF_INET:
v.IP = IPv4zero
case syscall.AF_INET6:
v.IP = IPv6unspecified
}
return v.sockaddr(f)
}
}
return la, nil
}
......@@ -25,3 +25,27 @@ func maxListenerBacklog() int {
}
return n
}
func listenerSockaddr(s, f int, la syscall.Sockaddr, toAddr func(syscall.Sockaddr) Addr) (syscall.Sockaddr, error) {
a := toAddr(la)
if a == nil {
return la, nil
}
switch v := a.(type) {
case *UDPAddr:
if v.IP.IsMulticast() {
err := setDefaultMulticastSockopts(s)
if err != nil {
return nil, err
}
switch f {
case syscall.AF_INET:
v.IP = IPv4zero
case syscall.AF_INET6:
v.IP = IPv6unspecified
}
return v.sockaddr(f)
}
}
return la, nil
}
......@@ -12,3 +12,27 @@ func maxListenerBacklog() int {
// TODO: Implement this
return syscall.SOMAXCONN
}
func listenerSockaddr(s syscall.Handle, f int, la syscall.Sockaddr, toAddr func(syscall.Sockaddr) Addr) (syscall.Sockaddr, error) {
a := toAddr(la)
if a == nil {
return la, nil
}
switch v := a.(type) {
case *UDPAddr:
if v.IP.IsMulticast() {
err := setDefaultMulticastSockopts(s)
if err != nil {
return nil, err
}
switch f {
case syscall.AF_INET:
v.IP = IPv4zero
case syscall.AF_INET6:
v.IP = IPv6unspecified
}
return v.sockaddr(f)
}
}
return la, nil
}
......@@ -9,37 +9,55 @@
package net
import (
"os"
"syscall"
)
func setDefaultSockopts(s, f, t int) {
func setDefaultSockopts(s, f, t int) error {
switch f {
case syscall.AF_INET6:
// Allow both IP versions even if the OS default is otherwise.
// Note that some operating systems never admit this option.
syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, 0)
}
if f == syscall.AF_UNIX ||
(f == syscall.AF_INET || f == syscall.AF_INET6) && t == syscall.SOCK_STREAM {
// Allow reuse of recently-used addresses.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
// Allow reuse of recently-used ports.
// This option is supported only in descendants of 4.4BSD,
// to make an effective multicast application and an application
// that requires quick draw possible.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
err = syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
}
// Allow broadcast.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
func setDefaultMulticastSockopts(fd *netFD) {
fd.incref()
defer fd.decref()
func setDefaultMulticastSockopts(s int) error {
// Allow multicast UDP and raw IP datagram sockets to listen
// concurrently across multiple listeners.
syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
err = syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEPORT, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
......@@ -7,31 +7,43 @@
package net
import (
"os"
"syscall"
)
func setDefaultSockopts(s, f, t int) {
func setDefaultSockopts(s, f, t int) error {
switch f {
case syscall.AF_INET6:
// Allow both IP versions even if the OS default is otherwise.
// Note that some operating systems never admit this option.
syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, 0)
}
if f == syscall.AF_UNIX ||
(f == syscall.AF_INET || f == syscall.AF_INET6) && t == syscall.SOCK_STREAM {
// Allow reuse of recently-used addresses.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
}
// Allow broadcast.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
func setDefaultMulticastSockopts(fd *netFD) {
fd.incref()
defer fd.decref()
func setDefaultMulticastSockopts(s int) error {
// Allow multicast UDP and raw IP datagram sockets to listen
// concurrently across multiple listeners.
syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
......@@ -7,13 +7,15 @@
package net
import (
"os"
"syscall"
)
func setDefaultSockopts(s syscall.Handle, f, t int) {
func setDefaultSockopts(s syscall.Handle, f, t int) error {
switch f {
case syscall.AF_INET6:
// Allow both IP versions even if the OS default is otherwise.
// Note that some operating systems never admit this option.
syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, 0)
}
......@@ -25,14 +27,20 @@ func setDefaultSockopts(s syscall.Handle, f, t int) {
// to be handled by the correct socket.
// Allow broadcast.
syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_BROADCAST, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
func setDefaultMulticastSockopts(fd *netFD) {
fd.incref()
defer fd.decref()
func setDefaultMulticastSockopts(s syscall.Handle) error {
// Allow multicast UDP and raw IP datagram sockets to listen
// concurrently across multiple listeners.
syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
err := syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if err != nil {
return os.NewSyscallError("setsockopt", err)
}
return nil
}
......@@ -186,20 +186,10 @@ func ListenUDP(net string, laddr *UDPAddr) (c *UDPConn, err error) {
return &UDPConn{*l.plan9Conn()}, nil
}
// JoinGroup joins the IP multicast group named by addr on ifi,
// which specifies the interface to join. JoinGroup uses the
// default multicast interface if ifi is nil.
func (c *UDPConn) JoinGroup(ifi *Interface, addr IP) error {
if !c.ok() {
return os.EINVAL
}
return os.EPLAN9
}
// LeaveGroup exits the IP multicast group named by addr on ifi.
func (c *UDPConn) LeaveGroup(ifi *Interface, addr IP) error {
if !c.ok() {
return os.EINVAL
}
return os.EPLAN9
// ListenMulticastUDP listens for incoming multicast UDP packets
// addressed to the group address gaddr on ifi, which specifies
// the interface to join. ListenMulticastUDP uses default
// multicast interface if ifi is nil.
func ListenMulticastUDP(net string, ifi *Interface, gaddr *UDPAddr) (*UDPConn, error) {
return nil, os.EPLAN9
}
......@@ -61,7 +61,7 @@ func (c *UDPConn) ok() bool { return c != nil && c.fd != nil }
// Implementation of the Conn interface - see Conn for documentation.
// Read implements the Conn Read method.
func (c *UDPConn) Read(b []byte) (n int, err error) {
func (c *UDPConn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, os.EINVAL
}
......@@ -69,7 +69,7 @@ func (c *UDPConn) Read(b []byte) (n int, err error) {
}
// Write implements the Conn Write method.
func (c *UDPConn) Write(b []byte) (n int, err error) {
func (c *UDPConn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, os.EINVAL
}
......@@ -167,7 +167,7 @@ func (c *UDPConn) ReadFromUDP(b []byte) (n int, addr *UDPAddr, err error) {
}
// ReadFrom implements the PacketConn ReadFrom method.
func (c *UDPConn) ReadFrom(b []byte) (n int, addr Addr, err error) {
func (c *UDPConn) ReadFrom(b []byte) (int, Addr, error) {
if !c.ok() {
return 0, nil, os.EINVAL
}
......@@ -207,6 +207,11 @@ func (c *UDPConn) WriteTo(b []byte, addr Addr) (int, error) {
return c.WriteToUDP(b, a)
}
// File returns a copy of the underlying os.File, set to blocking mode.
// It is the caller's responsibility to close f when finished.
// Closing c does not affect f, and closing f does not affect c.
func (c *UDPConn) File() (f *os.File, err error) { return c.fd.dup() }
// DialUDP connects to the remote address raddr on the network net,
// which must be "udp", "udp4", or "udp6". If laddr is not nil, it is used
// as the local address for the connection.
......@@ -246,36 +251,75 @@ func ListenUDP(net string, laddr *UDPAddr) (*UDPConn, error) {
return newUDPConn(fd), nil
}
// File returns a copy of the underlying os.File, set to blocking mode.
// It is the caller's responsibility to close f when finished.
// Closing c does not affect f, and closing f does not affect c.
func (c *UDPConn) File() (f *os.File, err error) { return c.fd.dup() }
// ListenMulticastUDP listens for incoming multicast UDP packets
// addressed to the group address gaddr on ifi, which specifies
// the interface to join. ListenMulticastUDP uses default
// multicast interface if ifi is nil.
func ListenMulticastUDP(net string, ifi *Interface, gaddr *UDPAddr) (*UDPConn, error) {
switch net {
case "udp", "udp4", "udp6":
default:
return nil, UnknownNetworkError(net)
}
if gaddr == nil || gaddr.IP == nil {
return nil, &OpError{"listenmulticastudp", "udp", nil, errMissingAddress}
}
fd, err := internetSocket(net, gaddr.toAddr(), nil, syscall.SOCK_DGRAM, 0, "listen", sockaddrToUDP)
if err != nil {
return nil, err
}
c := newUDPConn(fd)
ip4 := gaddr.IP.To4()
if ip4 != nil {
err := listenIPv4MulticastUDP(c, ifi, ip4)
if err != nil {
c.Close()
return nil, err
}
} else {
err := listenIPv6MulticastUDP(c, ifi, gaddr.IP)
if err != nil {
c.Close()
return nil, err
}
}
return c, nil
}
// JoinGroup joins the IP multicast group named by addr on ifi,
// which specifies the interface to join. JoinGroup uses the
// default multicast interface if ifi is nil.
func (c *UDPConn) JoinGroup(ifi *Interface, addr IP) error {
if !c.ok() {
return os.EINVAL
func listenIPv4MulticastUDP(c *UDPConn, ifi *Interface, ip IP) error {
if ifi != nil {
err := setIPv4MulticastInterface(c.fd, ifi)
if err != nil {
return err
}
}
err := setIPv4MulticastLoopback(c.fd, false)
if err != nil {
return err
}
setDefaultMulticastSockopts(c.fd)
ip := addr.To4()
if ip != nil {
return joinIPv4GroupUDP(c, ifi, ip)
err = joinIPv4GroupUDP(c, ifi, ip)
if err != nil {
return err
}
return joinIPv6GroupUDP(c, ifi, addr)
return nil
}
// LeaveGroup exits the IP multicast group named by addr on ifi.
func (c *UDPConn) LeaveGroup(ifi *Interface, addr IP) error {
if !c.ok() {
return os.EINVAL
func listenIPv6MulticastUDP(c *UDPConn, ifi *Interface, ip IP) error {
if ifi != nil {
err := setIPv6MulticastInterface(c.fd, ifi)
if err != nil {
return err
}
}
ip := addr.To4()
if ip != nil {
return leaveIPv4GroupUDP(c, ifi, ip)
err := setIPv6MulticastLoopback(c.fd, false)
if err != nil {
return err
}
return leaveIPv6GroupUDP(c, ifi, addr)
err = joinIPv6GroupUDP(c, ifi, ip)
if err != nil {
return err
}
return nil
}
func joinIPv4GroupUDP(c *UDPConn, ifi *Interface, ip IP) error {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment