Commit 11320fa5 authored by Alex Brainman's avatar Alex Brainman

net: have separate read and write processing threads on windows

Fixes #4195

R=golang-dev, mikioh.mikioh
CC=golang-dev
https://golang.org/cl/12960046
parent c0148303
...@@ -18,8 +18,6 @@ type netFD struct { ...@@ -18,8 +18,6 @@ type netFD struct {
laddr, raddr Addr laddr, raddr Addr
} }
var canCancelIO = true // used for testing current package
func sysInit() { func sysInit() {
} }
......
...@@ -33,8 +33,6 @@ type netFD struct { ...@@ -33,8 +33,6 @@ type netFD struct {
pd pollDesc pd pollDesc
} }
var canCancelIO = true // used for testing current package
func sysInit() { func sysInit() {
} }
......
...@@ -234,16 +234,20 @@ func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) erro ...@@ -234,16 +234,20 @@ func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) erro
} }
// Start helper goroutines. // Start helper goroutines.
var iosrv *ioSrv var rsrv, wsrv *ioSrv
var onceStartServer sync.Once var onceStartServer sync.Once
func startServer() { func startServer() {
iosrv = new(ioSrv) rsrv = new(ioSrv)
wsrv = new(ioSrv)
if !canCancelIO { if !canCancelIO {
// Only CancelIo API is available. Lets start special goroutine // Only CancelIo API is available. Lets start two special goroutines
// locked to an OS thread, that both starts and cancels IO. // locked to an OS thread, that both starts and cancels IO. One will
iosrv.req = make(chan ioSrvReq) // process read requests, while other will do writes.
go iosrv.ProcessRemoteIO() rsrv.req = make(chan ioSrvReq)
go rsrv.ProcessRemoteIO()
wsrv.req = make(chan ioSrvReq)
go wsrv.ProcessRemoteIO()
} }
} }
...@@ -337,7 +341,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { ...@@ -337,7 +341,7 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
// Call ConnectEx API. // Call ConnectEx API.
o := &fd.wop o := &fd.wop
o.sa = ra o.sa = ra
_, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error { _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error {
return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
}) })
if err != nil { if err != nil {
...@@ -446,7 +450,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { ...@@ -446,7 +450,7 @@ func (fd *netFD) Read(buf []byte) (int, error) {
defer fd.readUnlock() defer fd.readUnlock()
o := &fd.rop o := &fd.rop
o.InitBuf(buf) o.InitBuf(buf)
n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error { n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error {
return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
}) })
if err == nil && n == 0 { if err == nil && n == 0 {
...@@ -468,7 +472,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -468,7 +472,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
defer fd.readUnlock() defer fd.readUnlock()
o := &fd.rop o := &fd.rop
o.InitBuf(buf) o.InitBuf(buf)
n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { n, err = rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
if o.rsa == nil { if o.rsa == nil {
o.rsa = new(syscall.RawSockaddrAny) o.rsa = new(syscall.RawSockaddrAny)
} }
...@@ -492,7 +496,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { ...@@ -492,7 +496,7 @@ func (fd *netFD) Write(buf []byte) (int, error) {
} }
o := &fd.wop o := &fd.wop
o.InitBuf(buf) o.InitBuf(buf)
return iosrv.ExecIO(o, "WSASend", func(o *operation) error { return wsrv.ExecIO(o, "WSASend", func(o *operation) error {
return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
}) })
} }
...@@ -508,7 +512,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { ...@@ -508,7 +512,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
o := &fd.wop o := &fd.wop
o.InitBuf(buf) o.InitBuf(buf)
o.sa = sa o.sa = sa
return iosrv.ExecIO(o, "WSASendto", func(o *operation) error { return wsrv.ExecIO(o, "WSASendto", func(o *operation) error {
return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
}) })
} }
...@@ -541,7 +545,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -541,7 +545,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
o.handle = s o.handle = s
var rawsa [2]syscall.RawSockaddrAny var rawsa [2]syscall.RawSockaddrAny
o.rsan = int32(unsafe.Sizeof(rawsa[0])) o.rsan = int32(unsafe.Sizeof(rawsa[0]))
_, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error { _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error {
return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
}) })
if err != nil { if err != nil {
......
...@@ -830,7 +830,7 @@ func TestTransportPersistConnLeakShortBody(t *testing.T) { ...@@ -830,7 +830,7 @@ func TestTransportPersistConnLeakShortBody(t *testing.T) {
} }
nhigh := runtime.NumGoroutine() nhigh := runtime.NumGoroutine()
tr.CloseIdleConnections() tr.CloseIdleConnections()
time.Sleep(50 * time.Millisecond) time.Sleep(400 * time.Millisecond)
runtime.GC() runtime.GC()
nfinal := runtime.NumGoroutine() nfinal := runtime.NumGoroutine()
......
...@@ -42,7 +42,7 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -42,7 +42,7 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
o := &fd.wop o := &fd.wop
o.qty = uint32(n) o.qty = uint32(n)
o.handle = syscall.Handle(f.Fd()) o.handle = syscall.Handle(f.Fd())
done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error { done, err := wsrv.ExecIO(o, "TransmitFile", func(o *operation) error {
return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
}) })
if err != nil { if err != nil {
......
...@@ -325,9 +325,6 @@ func TestReadWriteDeadline(t *testing.T) { ...@@ -325,9 +325,6 @@ func TestReadWriteDeadline(t *testing.T) {
t.Skipf("skipping test on %q", runtime.GOOS) t.Skipf("skipping test on %q", runtime.GOOS)
} }
if !canCancelIO {
t.Skip("skipping test on this system")
}
const ( const (
readTimeout = 50 * time.Millisecond readTimeout = 50 * time.Millisecond
writeTimeout = 250 * time.Millisecond writeTimeout = 250 * time.Millisecond
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment