Commit 7014bc64 authored by Sébastien Paolacci's avatar Sébastien Paolacci Committed by Russ Cox

net: spread fd over several pollservers.

Lighten contention without preventing further improvements on pollservers.
Connections are spread over Min(GOMAXPROCS, NumCPU, 8) pollserver instances.

Median of 10 runs, 4 cores @ 3.4GHz, amd/linux-3.2:

BenchmarkTCPOneShot                171917 ns/op   175194 ns/op      1.91%
BenchmarkTCPOneShot-2              101413 ns/op   109462 ns/op      7.94%
BenchmarkTCPOneShot-4               91796 ns/op    35712 ns/op    -61.10%
BenchmarkTCPOneShot-6               90938 ns/op    30607 ns/op    -66.34%
BenchmarkTCPOneShot-8               90374 ns/op    29150 ns/op    -67.75%
BenchmarkTCPOneShot-16             101089 ns/op   111526 ns/op     10.32%

BenchmarkTCPOneShotTimeout         174986 ns/op   178606 ns/op      2.07%
BenchmarkTCPOneShotTimeout-2       101585 ns/op   110678 ns/op      8.95%
BenchmarkTCPOneShotTimeout-4        91547 ns/op    35931 ns/op    -60.75%
BenchmarkTCPOneShotTimeout-6        91496 ns/op    31019 ns/op    -66.10%
BenchmarkTCPOneShotTimeout-8        90670 ns/op    29531 ns/op    -67.43%
BenchmarkTCPOneShotTimeout-16      101013 ns/op   106026 ns/op      4.96%

BenchmarkTCPPersistent              51731 ns/op    53324 ns/op      3.08%
BenchmarkTCPPersistent-2            32888 ns/op    30678 ns/op     -6.72%
BenchmarkTCPPersistent-4            25751 ns/op    15595 ns/op    -39.44%
BenchmarkTCPPersistent-6            26737 ns/op     9805 ns/op    -63.33%
BenchmarkTCPPersistent-8            26850 ns/op     9730 ns/op    -63.76%
BenchmarkTCPPersistent-16          104449 ns/op   102838 ns/op     -1.54%

BenchmarkTCPPersistentTimeout       51806 ns/op    53281 ns/op      2.85%
BenchmarkTCPPersistentTimeout-2     32956 ns/op    30895 ns/op     -6.25%
BenchmarkTCPPersistentTimeout-4     25994 ns/op    18111 ns/op    -30.33%
BenchmarkTCPPersistentTimeout-6     26679 ns/op     9846 ns/op    -63.09%
BenchmarkTCPPersistentTimeout-8     26810 ns/op     9727 ns/op    -63.72%
BenchmarkTCPPersistentTimeout-16   101652 ns/op   104410 ns/op      2.71%

R=rsc, dvyukov, dave, mikioh.mikioh, bradfitz, remyoudompheng
CC=golang-dev
https://golang.org/cl/6496054
parent 6feb6132
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"errors" "errors"
"io" "io"
"os" "os"
"runtime"
"sync" "sync"
"syscall" "syscall"
"time" "time"
...@@ -45,6 +46,9 @@ type netFD struct { ...@@ -45,6 +46,9 @@ type netFD struct {
// owned by fd wait server // owned by fd wait server
ncr, ncw int ncr, ncw int
// wait server
pollServer *pollServer
} }
// A pollServer helps FDs determine when to retry a non-blocking // A pollServer helps FDs determine when to retry a non-blocking
...@@ -255,21 +259,45 @@ func (s *pollServer) WaitWrite(fd *netFD) error { ...@@ -255,21 +259,45 @@ func (s *pollServer) WaitWrite(fd *netFD) error {
} }
// Network FD methods. // Network FD methods.
// All the network FDs use a single pollServer. // Spread network FDs over several pollServers.
var pollMaxN int
var pollservers []*pollServer
var startServersOnce []func()
var pollserver *pollServer func init() {
var onceStartServer sync.Once pollMaxN = runtime.NumCPU()
if pollMaxN > 8 {
pollMaxN = 8 // No improvement then.
}
pollservers = make([]*pollServer, pollMaxN)
startServersOnce = make([]func(), pollMaxN)
for i := 0; i < pollMaxN; i++ {
k := i
once := new(sync.Once)
startServersOnce[i] = func() { once.Do(func() { startServer(k) }) }
}
}
func startServer() { func startServer(k int) {
p, err := newPollServer() p, err := newPollServer()
if err != nil { if err != nil {
print("Start pollServer: ", err.Error(), "\n") panic(err)
}
pollservers[k] = p
}
func server(fd int) *pollServer {
pollN := runtime.GOMAXPROCS(0)
if pollN > pollMaxN {
pollN = pollMaxN
} }
pollserver = p k := fd % pollN
startServersOnce[k]()
return pollservers[k]
} }
func newFD(fd, family, sotype int, net string) (*netFD, error) { func newFD(fd, family, sotype int, net string) (*netFD, error) {
onceStartServer.Do(startServer)
if err := syscall.SetNonblock(fd, true); err != nil { if err := syscall.SetNonblock(fd, true); err != nil {
return nil, err return nil, err
} }
...@@ -281,6 +309,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) { ...@@ -281,6 +309,7 @@ func newFD(fd, family, sotype int, net string) (*netFD, error) {
} }
netfd.cr = make(chan error, 1) netfd.cr = make(chan error, 1)
netfd.cw = make(chan error, 1) netfd.cw = make(chan error, 1)
netfd.pollServer = server(fd)
return netfd, nil return netfd, nil
} }
...@@ -300,7 +329,7 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { ...@@ -300,7 +329,7 @@ func (fd *netFD) setAddr(laddr, raddr Addr) {
func (fd *netFD) connect(ra syscall.Sockaddr) error { func (fd *netFD) connect(ra syscall.Sockaddr) error {
err := syscall.Connect(fd.sysfd, ra) err := syscall.Connect(fd.sysfd, ra)
if err == syscall.EINPROGRESS { if err == syscall.EINPROGRESS {
if err = pollserver.WaitWrite(fd); err != nil { if err = fd.pollServer.WaitWrite(fd); err != nil {
return err return err
} }
var e int var e int
...@@ -354,8 +383,8 @@ func (fd *netFD) decref() { ...@@ -354,8 +383,8 @@ func (fd *netFD) decref() {
} }
func (fd *netFD) Close() error { func (fd *netFD) Close() error {
pollserver.Lock() // needed for both fd.incref(true) and pollserver.Evict fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict
defer pollserver.Unlock() defer fd.pollServer.Unlock()
if err := fd.incref(true); err != nil { if err := fd.incref(true); err != nil {
return err return err
} }
...@@ -364,7 +393,7 @@ func (fd *netFD) Close() error { ...@@ -364,7 +393,7 @@ func (fd *netFD) Close() error {
// the final decref will close fd.sysfd. This should happen // the final decref will close fd.sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any // fairly quickly, since all the I/O is non-blocking, and any
// attempts to block in the pollserver will return errClosing. // attempts to block in the pollserver will return errClosing.
pollserver.Evict(fd) fd.pollServer.Evict(fd)
fd.decref() fd.decref()
return nil return nil
} }
...@@ -401,7 +430,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) { ...@@ -401,7 +430,7 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.rdeadline >= 0 { if fd.rdeadline >= 0 {
if err = pollserver.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
...@@ -431,7 +460,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -431,7 +460,7 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.rdeadline >= 0 { if fd.rdeadline >= 0 {
if err = pollserver.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
...@@ -459,7 +488,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S ...@@ -459,7 +488,7 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.rdeadline >= 0 { if fd.rdeadline >= 0 {
if err = pollserver.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
...@@ -501,7 +530,7 @@ func (fd *netFD) Write(p []byte) (int, error) { ...@@ -501,7 +530,7 @@ func (fd *netFD) Write(p []byte) (int, error) {
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.wdeadline >= 0 { if fd.wdeadline >= 0 {
if err = pollserver.WaitWrite(fd); err == nil { if err = fd.pollServer.WaitWrite(fd); err == nil {
continue continue
} }
} }
...@@ -533,7 +562,7 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { ...@@ -533,7 +562,7 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.wdeadline >= 0 { if fd.wdeadline >= 0 {
if err = pollserver.WaitWrite(fd); err == nil { if err = fd.pollServer.WaitWrite(fd); err == nil {
continue continue
} }
} }
...@@ -560,7 +589,7 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob ...@@ -560,7 +589,7 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.wdeadline >= 0 { if fd.wdeadline >= 0 {
if err = pollserver.WaitWrite(fd); err == nil { if err = fd.pollServer.WaitWrite(fd); err == nil {
continue continue
} }
} }
...@@ -595,7 +624,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e ...@@ -595,7 +624,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err e
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
err = errTimeout err = errTimeout
if fd.rdeadline >= 0 { if fd.rdeadline >= 0 {
if err = pollserver.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
} }
} }
......
...@@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -83,7 +83,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
break break
} }
if err1 == syscall.EAGAIN && c.wdeadline >= 0 { if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
if err1 = pollserver.WaitWrite(c); err1 == nil { if err1 = c.pollServer.WaitWrite(c); err1 == nil {
continue continue
} }
} }
......
...@@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -59,7 +59,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
break break
} }
if err1 == syscall.EAGAIN && c.wdeadline >= 0 { if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
if err1 = pollserver.WaitWrite(c); err1 == nil { if err1 = c.pollServer.WaitWrite(c); err1 == nil {
continue continue
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment