Commit 04b1cfa9 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

net: reduce number of memory allocations during IO operations

Embed all data necessary for read/write operations directly into netFD.

benchmark                    old ns/op    new ns/op    delta
BenchmarkTCP4Persistent          27669        23341  -15.64%
BenchmarkTCP4Persistent-2        18173        12558  -30.90%
BenchmarkTCP4Persistent-4        10390         7319  -29.56%

This change will intentionally break all builders to see
how many allocations they do per read/write.
This will be fixed soon afterwards.

R=golang-dev, alex.brainman
CC=golang-dev
https://golang.org/cl/12413043
parent 9c0500b4
...@@ -67,16 +67,8 @@ func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn, ...@@ -67,16 +67,8 @@ func resolveAndDial(net, addr string, localAddr Addr, deadline time.Time) (Conn,
return dial(net, addr, localAddr, ra, deadline) return dial(net, addr, localAddr, ra, deadline)
} }
// Interface for all IO operations. // operation contains superset of data necessary to perform all async IO.
type anOpIface interface { type operation struct {
Op() *anOp
Name() string
Submit() error
}
// anOp implements functionality common to all IO operations.
// Its beginning must be the same as runtime.net_anOp. Keep these in sync.
type anOp struct {
// Used by IOCP interface, it must be first field // Used by IOCP interface, it must be first field
// of the struct, as our code rely on it. // of the struct, as our code rely on it.
o syscall.Overlapped o syscall.Overlapped
...@@ -87,53 +79,34 @@ type anOp struct { ...@@ -87,53 +79,34 @@ type anOp struct {
errno int32 errno int32
qty uint32 qty uint32
errnoc chan error // fields used only by net package
mu sync.Mutex
fd *netFD fd *netFD
} errc chan error
func (o *anOp) Init(fd *netFD, mode int32) {
o.fd = fd
o.mode = mode
o.runtimeCtx = fd.pd.runtimeCtx
if !canCancelIO {
var i int
if mode == 'r' {
i = 0
} else {
i = 1
}
if fd.errnoc[i] == nil {
fd.errnoc[i] = make(chan error)
}
o.errnoc = fd.errnoc[i]
}
}
func (o *anOp) Op() *anOp {
return o
}
// bufOp is used by IO operations that read / write
// data from / to client buffer.
type bufOp struct {
anOp
buf syscall.WSABuf buf syscall.WSABuf
sa syscall.Sockaddr
rsa *syscall.RawSockaddrAny
rsan int32
handle syscall.Handle
flags uint32
} }
func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) { func (o *operation) InitBuf(buf []byte) {
o.anOp.Init(fd, mode)
o.buf.Len = uint32(len(buf)) o.buf.Len = uint32(len(buf))
if len(buf) == 0 {
o.buf.Buf = nil o.buf.Buf = nil
} else { if len(buf) != 0 {
o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0])) o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
} }
} }
// ioSrv executes net IO requests. // ioSrv executes net IO requests.
type ioSrv struct { type ioSrv struct {
submchan chan anOpIface // submit IO requests req chan ioSrvReq
canchan chan anOpIface // cancel IO requests }
type ioSrvReq struct {
o *operation
submit func(o *operation) error // if nil, cancel the operation
} }
// ProcessRemoteIO will execute submit IO requests on behalf // ProcessRemoteIO will execute submit IO requests on behalf
...@@ -144,36 +117,34 @@ type ioSrv struct { ...@@ -144,36 +117,34 @@ type ioSrv struct {
func (s *ioSrv) ProcessRemoteIO() { func (s *ioSrv) ProcessRemoteIO() {
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
for { for r := range s.req {
select { if r.submit != nil {
case o := <-s.submchan: r.o.errc <- r.submit(r.o)
o.Op().errnoc <- o.Submit() } else {
case o := <-s.canchan: r.o.errc <- syscall.CancelIo(r.o.fd.sysfd)
o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
} }
} }
} }
// ExecIO executes a single IO operation oi. It submits and cancels // ExecIO executes a single IO operation o. It submits and cancels
// IO in the current thread for systems where Windows CancelIoEx API // IO in the current thread for systems where Windows CancelIoEx API
// is available. Alternatively, it passes the request onto // is available. Alternatively, it passes the request onto
// runtime netpoll and waits for completion or cancels request. // runtime netpoll and waits for completion or cancels request.
func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) {
var err error fd := o.fd
o := oi.Op()
// Notify runtime netpoll about starting IO. // Notify runtime netpoll about starting IO.
err = o.fd.pd.Prepare(int(o.mode)) err := fd.pd.Prepare(int(o.mode))
if err != nil { if err != nil {
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} return 0, &OpError{name, fd.net, fd.laddr, err}
} }
// Start IO. // Start IO.
if canCancelIO { if canCancelIO {
err = oi.Submit() err = submit(o)
} else { } else {
// Send request to a special dedicated thread, // Send request to a special dedicated thread,
// so it can stop the IO with CancelIO later. // so it can stop the IO with CancelIO later.
s.submchan <- oi s.req <- ioSrvReq{o, submit}
err = <-o.errnoc err = <-o.errc
} }
switch err { switch err {
case nil: case nil:
...@@ -182,15 +153,15 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { ...@@ -182,15 +153,15 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
// IO started, and we have to wait for its completion. // IO started, and we have to wait for its completion.
err = nil err = nil
default: default:
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} return 0, &OpError{name, fd.net, fd.laddr, err}
} }
// Wait for our request to complete. // Wait for our request to complete.
err = o.fd.pd.Wait(int(o.mode)) err = fd.pd.Wait(int(o.mode))
if err == nil { if err == nil {
// All is good. Extract our IO results and return. // All is good. Extract our IO results and return.
if o.errno != 0 { if o.errno != 0 {
err = syscall.Errno(o.errno) err = syscall.Errno(o.errno)
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} return 0, &OpError{name, fd.net, fd.laddr, err}
} }
return int(o.qty), nil return int(o.qty), nil
} }
...@@ -204,24 +175,24 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) { ...@@ -204,24 +175,24 @@ func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
} }
// Cancel our request. // Cancel our request.
if canCancelIO { if canCancelIO {
err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o) err := syscall.CancelIoEx(fd.sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed. // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND { if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic. // TODO(brainman): maybe do something else, but panic.
panic(err) panic(err)
} }
} else { } else {
s.canchan <- oi s.req <- ioSrvReq{o, nil}
<-o.errnoc <-o.errc
} }
// Wait for cancellation to complete. // Wait for cancellation to complete.
o.fd.pd.WaitCanceled(int(o.mode)) fd.pd.WaitCanceled(int(o.mode))
if o.errno != 0 { if o.errno != 0 {
err = syscall.Errno(o.errno) err = syscall.Errno(o.errno)
if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
err = netpollErr err = netpollErr
} }
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err} return 0, &OpError{name, fd.net, fd.laddr, err}
} }
// We issued cancellation request. But, it seems, IO operation succeeded // We issued cancellation request. But, it seems, IO operation succeeded
// before cancellation request run. We need to treat IO operation as // before cancellation request run. We need to treat IO operation as
...@@ -238,8 +209,7 @@ func startServer() { ...@@ -238,8 +209,7 @@ func startServer() {
if !canCancelIO { if !canCancelIO {
// Only CancelIo API is available. Lets start special goroutine // Only CancelIo API is available. Lets start special goroutine
// locked to an OS thread, that both starts and cancels IO. // locked to an OS thread, that both starts and cancels IO.
iosrv.submchan = make(chan anOpIface) iosrv.req = make(chan ioSrvReq)
iosrv.canchan = make(chan anOpIface)
go iosrv.ProcessRemoteIO() go iosrv.ProcessRemoteIO()
} }
} }
...@@ -259,30 +229,39 @@ type netFD struct { ...@@ -259,30 +229,39 @@ type netFD struct {
net string net string
laddr Addr laddr Addr
raddr Addr raddr Addr
errnoc [2]chan error // read/write submit or cancel operation errors
// serialize access to Read and Write methods rop operation // read operation
rio, wio sync.Mutex wop operation // write operation
// wait server // wait server
pd pollDesc pd pollDesc
} }
func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) { func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
if initErr != nil { if initErr != nil {
return nil, initErr return nil, initErr
} }
onceStartServer.Do(startServer) onceStartServer.Do(startServer)
netfd := &netFD{ fd := &netFD{
sysfd: fd, sysfd: sysfd,
family: family, family: family,
sotype: sotype, sotype: sotype,
net: net, net: net,
} }
if err := netfd.pd.Init(netfd); err != nil { if err := fd.pd.Init(fd); err != nil {
return nil, err return nil, err
} }
return netfd, nil fd.rop.mode = 'r'
fd.wop.mode = 'w'
fd.rop.fd = fd
fd.wop.fd = fd
fd.rop.runtimeCtx = fd.pd.runtimeCtx
fd.wop.runtimeCtx = fd.pd.runtimeCtx
if !canCancelIO {
fd.rop.errc = make(chan error)
fd.rop.errc = make(chan error)
}
return fd, nil
} }
func (fd *netFD) setAddr(laddr, raddr Addr) { func (fd *netFD) setAddr(laddr, raddr Addr) {
...@@ -291,21 +270,6 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { ...@@ -291,21 +270,6 @@ func (fd *netFD) setAddr(laddr, raddr Addr) {
runtime.SetFinalizer(fd, (*netFD).Close) runtime.SetFinalizer(fd, (*netFD).Close)
} }
// Make new connection.
type connectOp struct {
anOp
ra syscall.Sockaddr
}
func (o *connectOp) Submit() error {
return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o)
}
func (o *connectOp) Name() string {
return "ConnectEx"
}
func (fd *netFD) connect(la, ra syscall.Sockaddr) error { func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
if !canUseConnectEx(fd.net) { if !canUseConnectEx(fd.net) {
return syscall.Connect(fd.sysfd, ra) return syscall.Connect(fd.sysfd, ra)
...@@ -325,10 +289,13 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error { ...@@ -325,10 +289,13 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
} }
} }
// Call ConnectEx API. // Call ConnectEx API.
var o connectOp o := &fd.wop
o.Init(fd, 'w') o.mu.Lock()
o.ra = ra defer o.mu.Unlock()
_, err := iosrv.ExecIO(&o) o.sa = ra
_, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
})
if err != nil { if err != nil {
return err return err
} }
...@@ -385,10 +352,10 @@ func (fd *netFD) Close() error { ...@@ -385,10 +352,10 @@ func (fd *netFD) Close() error {
// unblock pending reader and writer // unblock pending reader and writer
fd.pd.Evict() fd.pd.Evict()
// wait for both reader and writer to exit // wait for both reader and writer to exit
fd.rio.Lock() fd.rop.mu.Lock()
defer fd.rio.Unlock() fd.wop.mu.Lock()
fd.wio.Lock() fd.rop.mu.Unlock()
defer fd.wio.Unlock() fd.wop.mu.Unlock()
return nil return nil
} }
...@@ -412,54 +379,24 @@ func (fd *netFD) CloseWrite() error { ...@@ -412,54 +379,24 @@ func (fd *netFD) CloseWrite() error {
return fd.shutdown(syscall.SHUT_WR) return fd.shutdown(syscall.SHUT_WR)
} }
// Read from network.
type readOp struct {
bufOp
}
func (o *readOp) Submit() error {
var d, f uint32
return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
}
func (o *readOp) Name() string {
return "WSARecv"
}
func (fd *netFD) Read(buf []byte) (int, error) { func (fd *netFD) Read(buf []byte) (int, error) {
if err := fd.incref(false); err != nil { if err := fd.incref(false); err != nil {
return 0, err return 0, err
} }
defer fd.decref() defer fd.decref()
fd.rio.Lock() o := &fd.rop
defer fd.rio.Unlock() o.mu.Lock()
var o readOp defer o.mu.Unlock()
o.Init(fd, buf, 'r') o.InitBuf(buf)
n, err := iosrv.ExecIO(&o) n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if err == nil && n == 0 { if err == nil && n == 0 {
err = io.EOF err = io.EOF
} }
return n, err return n, err
} }
// ReadFrom from network.
type readFromOp struct {
bufOp
rsa syscall.RawSockaddrAny
rsan int32
}
func (o *readFromOp) Submit() error {
var d, f uint32
return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
}
func (o *readFromOp) Name() string {
return "WSARecvFrom"
}
func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
if len(buf) == 0 { if len(buf) == 0 {
return 0, nil, nil return 0, nil, nil
...@@ -468,12 +405,17 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -468,12 +405,17 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
return 0, nil, err return 0, nil, err
} }
defer fd.decref() defer fd.decref()
fd.rio.Lock() o := &fd.rop
defer fd.rio.Unlock() o.mu.Lock()
var o readFromOp defer o.mu.Unlock()
o.Init(fd, buf, 'r') o.InitBuf(buf)
o.rsan = int32(unsafe.Sizeof(o.rsa)) n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
n, err = iosrv.ExecIO(&o) if o.rsa == nil {
o.rsa = new(syscall.RawSockaddrAny)
}
o.rsan = int32(unsafe.Sizeof(*o.rsa))
return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
})
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
...@@ -481,47 +423,18 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -481,47 +423,18 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
return return
} }
// Write to network.
type writeOp struct {
bufOp
}
func (o *writeOp) Submit() error {
var d uint32
return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
}
func (o *writeOp) Name() string {
return "WSASend"
}
func (fd *netFD) Write(buf []byte) (int, error) { func (fd *netFD) Write(buf []byte) (int, error) {
if err := fd.incref(false); err != nil { if err := fd.incref(false); err != nil {
return 0, err return 0, err
} }
defer fd.decref() defer fd.decref()
fd.wio.Lock() o := &fd.wop
defer fd.wio.Unlock() o.mu.Lock()
var o writeOp defer o.mu.Unlock()
o.Init(fd, buf, 'w') o.InitBuf(buf)
return iosrv.ExecIO(&o) return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
} return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
})
// WriteTo to network.
type writeToOp struct {
bufOp
sa syscall.Sockaddr
}
func (o *writeToOp) Submit() error {
var d uint32
return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
}
func (o *writeToOp) Name() string {
return "WSASendto"
} }
func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
...@@ -532,31 +445,14 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { ...@@ -532,31 +445,14 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
return 0, err return 0, err
} }
defer fd.decref() defer fd.decref()
fd.wio.Lock() o := &fd.wop
defer fd.wio.Unlock() o.mu.Lock()
var o writeToOp defer o.mu.Unlock()
o.Init(fd, buf, 'w') o.InitBuf(buf)
o.sa = sa o.sa = sa
return iosrv.ExecIO(&o) return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
} return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
})
// Accept new network connections.
type acceptOp struct {
anOp
newsock syscall.Handle
attrs [2]syscall.RawSockaddrAny // space for local and remote address only
}
func (o *acceptOp) Submit() error {
var d uint32
l := uint32(unsafe.Sizeof(o.attrs[0]))
return syscall.AcceptEx(o.fd.sysfd, o.newsock,
(*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
}
func (o *acceptOp) Name() string {
return "AcceptEx"
} }
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
...@@ -579,12 +475,15 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -579,12 +475,15 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
} }
// Submit accept request. // Submit accept request.
fd.rio.Lock() o := &fd.rop
defer fd.rio.Unlock() o.mu.Lock()
var o acceptOp defer o.mu.Unlock()
o.Init(fd, 'r') o.handle = s
o.newsock = s var rawsa [2]syscall.RawSockaddrAny
_, err = iosrv.ExecIO(&o) o.rsan = int32(unsafe.Sizeof(rawsa[0]))
_, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error {
return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
})
if err != nil { if err != nil {
netfd.Close() netfd.Close()
return nil, err return nil, err
...@@ -600,9 +499,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -600,9 +499,8 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
// Get local and peer addr out of AcceptEx buffer. // Get local and peer addr out of AcceptEx buffer.
var lrsa, rrsa *syscall.RawSockaddrAny var lrsa, rrsa *syscall.RawSockaddrAny
var llen, rlen int32 var llen, rlen int32
l := uint32(unsafe.Sizeof(*lrsa)) syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])),
syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])), 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen)
0, l, l, &lrsa, &llen, &rrsa, &rlen)
lsa, _ := lrsa.Sockaddr() lsa, _ := lrsa.Sockaddr()
rsa, _ := rrsa.Sockaddr() rsa, _ := rrsa.Sockaddr()
......
...@@ -10,20 +10,6 @@ import ( ...@@ -10,20 +10,6 @@ import (
"syscall" "syscall"
) )
type sendfileOp struct {
anOp
src syscall.Handle // source
n uint32
}
func (o *sendfileOp) Submit() (err error) {
return syscall.TransmitFile(o.fd.sysfd, o.src, o.n, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
}
func (o *sendfileOp) Name() string {
return "TransmitFile"
}
// sendFile copies the contents of r to c using the TransmitFile // sendFile copies the contents of r to c using the TransmitFile
// system call to minimize copies. // system call to minimize copies.
// //
...@@ -33,7 +19,7 @@ func (o *sendfileOp) Name() string { ...@@ -33,7 +19,7 @@ func (o *sendfileOp) Name() string {
// if handled == false, sendFile performed no work. // if handled == false, sendFile performed no work.
// //
// Note that sendfile for windows does not suppport >2GB file. // Note that sendfile for windows does not suppport >2GB file.
func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
var n int64 = 0 // by default, copy until EOF var n int64 = 0 // by default, copy until EOF
lr, ok := r.(*io.LimitedReader) lr, ok := r.(*io.LimitedReader)
...@@ -48,18 +34,18 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -48,18 +34,18 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
return 0, nil, false return 0, nil, false
} }
if err := c.incref(false); err != nil { if err := fd.incref(false); err != nil {
return 0, err, true return 0, err, true
} }
defer c.decref() defer fd.decref()
c.wio.Lock() o := &fd.wop
defer c.wio.Unlock() o.mu.Lock()
defer o.mu.Unlock()
var o sendfileOp o.qty = uint32(n)
o.Init(c, 'w') o.handle = syscall.Handle(f.Fd())
o.n = uint32(n) done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
o.src = syscall.Handle(f.Fd()) return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
done, err := iosrv.ExecIO(&o) })
if err != nil { if err != nil {
return 0, err, false return 0, err, false
} }
......
...@@ -6,6 +6,7 @@ package net ...@@ -6,6 +6,7 @@ package net
import ( import (
"fmt" "fmt"
"io"
"reflect" "reflect"
"runtime" "runtime"
"sync" "sync"
...@@ -327,3 +328,46 @@ func TestTCPConcurrentAccept(t *testing.T) { ...@@ -327,3 +328,46 @@ func TestTCPConcurrentAccept(t *testing.T) {
ln.Close() ln.Close()
wg.Wait() wg.Wait()
} }
func TestTCPReadWriteMallocs(t *testing.T) {
maxMallocs := 0
switch runtime.GOOS {
// Add other OSes if you know how many mallocs they do.
case "windows":
maxMallocs = 0
}
ln, err := Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("Listen failed: %v", err)
}
defer ln.Close()
var server Conn
errc := make(chan error)
go func() {
var err error
server, err = ln.Accept()
errc <- err
}()
client, err := Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatalf("Dial failed: %v", err)
}
if err := <-errc; err != nil {
t.Fatalf("Accept failed: %v", err)
}
defer server.Close()
var buf [128]byte
mallocs := testing.AllocsPerRun(1000, func() {
_, err := server.Write(buf[:])
if err != nil {
t.Fatalf("Write failed: %v", err)
}
_, err = io.ReadFull(client, buf[:])
if err != nil {
t.Fatalf("Read failed: %v", err)
}
})
if int(mallocs) > maxMallocs {
t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs)
}
}
...@@ -16,9 +16,9 @@ extern void *runtime·GetQueuedCompletionStatus; ...@@ -16,9 +16,9 @@ extern void *runtime·GetQueuedCompletionStatus;
#define INVALID_HANDLE_VALUE ((uintptr)-1) #define INVALID_HANDLE_VALUE ((uintptr)-1)
// net_anOp must be the same as beginning of net.anOp. Keep these in sync. // net_op must be the same as beginning of net.operation. Keep these in sync.
typedef struct net_anOp net_anOp; typedef struct net_op net_op;
struct net_anOp struct net_op
{ {
// used by windows // used by windows
Overlapped o; Overlapped o;
...@@ -66,7 +66,7 @@ runtime·netpoll(bool block) ...@@ -66,7 +66,7 @@ runtime·netpoll(bool block)
{ {
uint32 wait, qty, key; uint32 wait, qty, key;
int32 mode, errno; int32 mode, errno;
net_anOp *o; net_op *o;
G *gp; G *gp;
if(iocphandle == INVALID_HANDLE_VALUE) if(iocphandle == INVALID_HANDLE_VALUE)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment