Commit 23e15f72 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

net: add special netFD mutex

The mutex, fdMutex, handles locking and lifetime of sysfd,
and serializes Read and Write methods.
This allows to strip 2 sync.Mutex.Lock calls,
2 sync.Mutex.Unlock calls, 1 defer and some amount
of misc overhead from every network operation.

On linux/amd64, Intel E5-2690:
benchmark                             old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                    9595         9454   -1.47%
BenchmarkTCP4Persistent-2                  8978         8772   -2.29%
BenchmarkTCP4ConcurrentReadWrite           4900         4625   -5.61%
BenchmarkTCP4ConcurrentReadWrite-2         2603         2500   -3.96%

In general it strips 70-500 ns from every network operation depending
on processor model. On my relatively new E5-2690 it accounts to ~5%
of network op cost.

Fixes #6074.

R=golang-dev, bradfitz, alex.brainman, iant, mikioh.mikioh
CC=golang-dev
https://golang.org/cl/12418043
parent 89b5c6c0
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import "sync/atomic"
// fdMutex is a specialized synchronization primitive
// that manages lifetime of an fd and serializes access
// to Read and Write methods on netFD.
type fdMutex struct {
state uint64
rsema uint32
wsema uint32
}
// fdMutex.state is organized as follows:
// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
// 1 bit - lock for read operations.
// 1 bit - lock for write operations.
// 20 bits - total number of references (read+write+misc).
// 20 bits - number of outstanding read waiters.
// 20 bits - number of outstanding write waiters.
const (
mutexClosed = 1 << 0
mutexRLock = 1 << 1
mutexWLock = 1 << 2
mutexRef = 1 << 3
mutexRefMask = (1<<20 - 1) << 3
mutexRWait = 1 << 23
mutexRMask = (1<<20 - 1) << 23
mutexWWait = 1 << 43
mutexWMask = (1<<20 - 1) << 43
)
// Read operations must do RWLock(true)/RWUnlock(true).
// Write operations must do RWLock(false)/RWUnlock(false).
// Misc operations must do Incref/Decref. Misc operations include functions like
// setsockopt and setDeadline. They need to use Incref/Decref to ensure that
// they operate on the correct fd in presence of a concurrent Close call
// (otherwise fd can be closed under their feet).
// Close operation must do IncrefAndClose/Decref.
// RWLock/Incref return whether fd is open.
// RWUnlock/Decref return whether fd is closed and there are no remaining references.
func (mu *fdMutex) Incref() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
new := old + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
return true
}
}
}
func (mu *fdMutex) IncrefAndClose() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
// Mark as closed and acquire a reference.
new := (old | mutexClosed) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
// Remove all read and write waiters.
new &^= mutexRMask | mutexWMask
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
// Wake all read and write waiters,
// they will observe closed flag after wakeup.
for old&mutexRMask != 0 {
old -= mutexRWait
runtime_Semrelease(&mu.rsema)
}
for old&mutexWMask != 0 {
old -= mutexWWait
runtime_Semrelease(&mu.wsema)
}
return true
}
}
}
func (mu *fdMutex) Decref() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
new := old - mutexRef
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
return new&(mutexClosed|mutexRef) == mutexClosed
}
}
}
func (mu *fdMutex) RWLock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
var new uint64
if old&mutexBit == 0 {
// Lock is free, acquire it.
new = (old | mutexBit) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
} else {
// Wait for lock.
new = old + mutexWait
if new&mutexMask == 0 {
panic("net: inconsistent fdMutex")
}
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexBit == 0 {
return true
}
runtime_Semacquire(mutexSema)
// The signaller has subtracted mutexWait.
}
}
}
func (mu *fdMutex) RWUnlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexBit == 0 || old&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
// Drop lock, drop reference and wake read waiter if present.
new := (old &^ mutexBit) - mutexRef
if old&mutexMask != 0 {
new -= mutexWait
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexMask != 0 {
runtime_Semrelease(mutexSema)
}
return new&(mutexClosed|mutexRef) == mutexClosed
}
}
}
// Implemented in runtime package.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import (
"math/rand"
"runtime"
"testing"
"time"
)
func TestMutexLock(t *testing.T) {
var mu fdMutex
if !mu.Incref() {
t.Fatal("broken")
}
if mu.Decref() {
t.Fatal("broken")
}
if !mu.RWLock(true) {
t.Fatal("broken")
}
if mu.RWUnlock(true) {
t.Fatal("broken")
}
if !mu.RWLock(false) {
t.Fatal("broken")
}
if mu.RWUnlock(false) {
t.Fatal("broken")
}
}
func TestMutexClose(t *testing.T) {
var mu fdMutex
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
if mu.Incref() {
t.Fatal("broken")
}
if mu.RWLock(true) {
t.Fatal("broken")
}
if mu.RWLock(false) {
t.Fatal("broken")
}
if mu.IncrefAndClose() {
t.Fatal("broken")
}
}
func TestMutexCloseUnblock(t *testing.T) {
c := make(chan bool)
var mu fdMutex
mu.RWLock(true)
for i := 0; i < 4; i++ {
go func() {
if mu.RWLock(true) {
t.Fatal("broken")
}
c <- true
}()
}
// Concurrent goroutines must not be able to read lock the mutex.
time.Sleep(time.Millisecond)
select {
case <-c:
t.Fatal("broken")
default:
}
mu.IncrefAndClose() // Must unblock the readers.
for i := 0; i < 4; i++ {
select {
case <-c:
case <-time.After(10 * time.Second):
t.Fatal("broken")
}
}
if mu.Decref() {
t.Fatal("broken")
}
if !mu.RWUnlock(true) {
t.Fatal("broken")
}
}
func TestMutexPanic(t *testing.T) {
ensurePanics := func(f func()) {
defer func() {
if recover() == nil {
t.Fatal("does not panic")
}
}()
f()
}
var mu fdMutex
ensurePanics(func() { mu.Decref() })
ensurePanics(func() { mu.RWUnlock(true) })
ensurePanics(func() { mu.RWUnlock(false) })
ensurePanics(func() { mu.Incref(); mu.Decref(); mu.Decref() })
ensurePanics(func() { mu.RWLock(true); mu.RWUnlock(true); mu.RWUnlock(true) })
ensurePanics(func() { mu.RWLock(false); mu.RWUnlock(false); mu.RWUnlock(false) })
// ensure that it's still not broken
mu.Incref()
mu.Decref()
mu.RWLock(true)
mu.RWUnlock(true)
mu.RWLock(false)
mu.RWUnlock(false)
}
func TestMutexStress(t *testing.T) {
P := 8
N := int(1e6)
if testing.Short() {
P = 4
N = 1e4
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
done := make(chan bool)
var mu fdMutex
var readState [2]uint64
var writeState [2]uint64
for p := 0; p < P; p++ {
go func() {
r := rand.New(rand.NewSource(rand.Int63()))
for i := 0; i < N; i++ {
switch r.Intn(3) {
case 0:
if !mu.Incref() {
t.Fatal("broken")
}
if mu.Decref() {
t.Fatal("broken")
}
case 1:
if !mu.RWLock(true) {
t.Fatal("broken")
}
// Ensure that it provides mutual exclusion for readers.
if readState[0] != readState[1] {
t.Fatal("broken")
}
readState[0]++
readState[1]++
if mu.RWUnlock(true) {
t.Fatal("broken")
}
case 2:
if !mu.RWLock(false) {
t.Fatal("broken")
}
// Ensure that it provides mutual exclusion for writers.
if writeState[0] != writeState[1] {
t.Fatal("broken")
}
writeState[0]++
writeState[1]++
if mu.RWUnlock(false) {
t.Fatal("broken")
}
}
}
done <- true
}()
}
for p := 0; p < P; p++ {
<-done
}
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
if !mu.Decref() {
t.Fatal("broken")
}
}
......@@ -132,7 +132,7 @@ func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
if t.IsZero() {
d = 0
}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
......
......@@ -10,7 +10,6 @@ import (
"io"
"os"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
......@@ -18,13 +17,8 @@ import (
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd
sysmu sync.Mutex
sysref int
// must lock both sysmu and pollDesc to write
// can lock either to read
closing bool
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd int
......@@ -35,9 +29,6 @@ type netFD struct {
laddr Addr
raddr Addr
// serialize access to Read and Write methods
rio, wio sync.Mutex
// wait server
pd pollDesc
}
......@@ -84,8 +75,9 @@ func (fd *netFD) name() string {
}
func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
fd.wio.Lock()
defer fd.wio.Unlock()
// Do not need to call fd.writeLock here,
// because fd is not yet accessible to user,
// so no concurrent operations are possible.
if err := fd.pd.PrepareWrite(); err != nil {
return err
}
......@@ -104,44 +96,69 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
return nil
}
func (fd *netFD) destroy() {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before closesocket.
fd.pd.Close()
closesocket(fd.sysfd)
fd.sysfd = -1
runtime.SetFinalizer(fd, nil)
}
// Add a reference to this fd.
// If closing==true, pollDesc must be locked; mark the fd as closing.
// Returns an error if the fd cannot be used.
func (fd *netFD) incref(closing bool) error {
fd.sysmu.Lock()
if fd.closing {
fd.sysmu.Unlock()
func (fd *netFD) incref() error {
if !fd.fdmu.Incref() {
return errClosing
}
fd.sysref++
if closing {
fd.closing = true
}
fd.sysmu.Unlock()
return nil
}
// Remove a reference to this FD and close if we've been asked to do so (and
// there are no references left.
// Remove a reference to this FD and close if we've been asked to do so
// (and there are no references left).
func (fd *netFD) decref() {
fd.sysmu.Lock()
fd.sysref--
if fd.closing && fd.sysref == 0 {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before closesocket.
fd.pd.Close()
closesocket(fd.sysfd)
fd.sysfd = -1
runtime.SetFinalizer(fd, nil)
}
fd.sysmu.Unlock()
if fd.fdmu.Decref() {
fd.destroy()
}
}
// Add a reference to this fd and lock for reading.
// Returns an error if the fd cannot be used.
func (fd *netFD) readLock() error {
if !fd.fdmu.RWLock(true) {
return errClosing
}
return nil
}
// Unlock for reading and remove a reference to this FD.
func (fd *netFD) readUnlock() {
if fd.fdmu.RWUnlock(true) {
fd.destroy()
}
}
// Add a reference to this fd and lock for writing.
// Returns an error if the fd cannot be used.
func (fd *netFD) writeLock() error {
if !fd.fdmu.RWLock(false) {
return errClosing
}
return nil
}
// Unlock for writing and remove a reference to this FD.
func (fd *netFD) writeUnlock() {
if fd.fdmu.RWUnlock(false) {
fd.destroy()
}
}
func (fd *netFD) Close() error {
fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
if err := fd.incref(true); err != nil {
if !fd.fdmu.IncrefAndClose() {
fd.pd.Unlock()
return err
return errClosing
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
......@@ -158,7 +175,7 @@ func (fd *netFD) Close() error {
}
func (fd *netFD) shutdown(how int) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -178,12 +195,10 @@ func (fd *netFD) CloseWrite() error {
}
func (fd *netFD) Read(p []byte) (n int, err error) {
fd.rio.Lock()
defer fd.rio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
......@@ -207,12 +222,10 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
}
func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
fd.rio.Lock()
defer fd.rio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return 0, nil, err
}
defer fd.decref()
defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
......@@ -236,12 +249,10 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
}
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
fd.rio.Lock()
defer fd.rio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return 0, 0, 0, nil, err
}
defer fd.decref()
defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
......@@ -272,12 +283,10 @@ func chkReadErr(n int, err error, fd *netFD) error {
}
func (fd *netFD) Write(p []byte) (nn int, err error) {
fd.wio.Lock()
defer fd.wio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
......@@ -311,12 +320,10 @@ func (fd *netFD) Write(p []byte) (nn int, err error) {
}
func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
fd.wio.Lock()
defer fd.wio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
......@@ -338,12 +345,10 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
}
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
fd.wio.Lock()
defer fd.wio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, 0, err
}
defer fd.decref()
defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
}
......@@ -366,12 +371,10 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
}
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
fd.rio.Lock()
defer fd.rio.Unlock()
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.decref()
defer fd.readUnlock()
var s int
var rsa syscall.Sockaddr
......
......@@ -105,7 +105,6 @@ type operation struct {
qty uint32
// fields used only by net package
mu sync.Mutex
fd *netFD
errc chan error
buf syscall.WSABuf
......@@ -246,10 +245,8 @@ func startServer() {
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd
sysmu sync.Mutex
sysref int
closing bool
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
sysfd syscall.Handle
......@@ -313,6 +310,9 @@ func (fd *netFD) setAddr(laddr, raddr Addr) {
}
func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
// Do not need to call fd.writeLock here,
// because fd is not yet accessible to user,
// so no concurrent operations are possible.
if !canUseConnectEx(fd.net) {
return syscall.Connect(fd.sysfd, ra)
}
......@@ -332,8 +332,6 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
}
// Call ConnectEx API.
o := &fd.wop
o.mu.Lock()
defer o.mu.Unlock()
o.sa = ra
_, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
......@@ -345,64 +343,80 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
}
func (fd *netFD) destroy() {
if fd.sysfd == syscall.InvalidHandle {
return
}
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before closesocket.
fd.pd.Close()
closesocket(fd.sysfd)
fd.sysfd = syscall.InvalidHandle
// no need for a finalizer anymore
runtime.SetFinalizer(fd, nil)
}
// Add a reference to this fd.
// If closing==true, mark the fd as closing.
// Returns an error if the fd cannot be used.
func (fd *netFD) incref(closing bool) error {
if fd == nil {
func (fd *netFD) incref() error {
if !fd.fdmu.Incref() {
return errClosing
}
fd.sysmu.Lock()
if fd.closing {
fd.sysmu.Unlock()
return errClosing
return nil
}
// Remove a reference to this FD and close if we've been asked to do so
// (and there are no references left).
func (fd *netFD) decref() {
if fd.fdmu.Decref() {
fd.destroy()
}
fd.sysref++
if closing {
fd.closing = true
}
// Add a reference to this fd and lock for reading.
// Returns an error if the fd cannot be used.
func (fd *netFD) readLock() error {
if !fd.fdmu.RWLock(true) {
return errClosing
}
closing = fd.closing
fd.sysmu.Unlock()
return nil
}
// Remove a reference to this FD and close if we've been asked to do so (and
// there are no references left.
func (fd *netFD) decref() {
if fd == nil {
return
// Unlock for reading and remove a reference to this FD.
func (fd *netFD) readUnlock() {
if fd.fdmu.RWUnlock(true) {
fd.destroy()
}
fd.sysmu.Lock()
fd.sysref--
if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before closesocket.
fd.pd.Close()
closesocket(fd.sysfd)
fd.sysfd = syscall.InvalidHandle
// no need for a finalizer anymore
runtime.SetFinalizer(fd, nil)
}
// Add a reference to this fd and lock for writing.
// Returns an error if the fd cannot be used.
func (fd *netFD) writeLock() error {
if !fd.fdmu.RWLock(false) {
return errClosing
}
return nil
}
// Unlock for writing and remove a reference to this FD.
func (fd *netFD) writeUnlock() {
if fd.fdmu.RWUnlock(false) {
fd.destroy()
}
fd.sysmu.Unlock()
}
func (fd *netFD) Close() error {
if err := fd.incref(true); err != nil {
return err
if !fd.fdmu.IncrefAndClose() {
return errClosing
}
defer fd.decref()
// unblock pending reader and writer
fd.pd.Evict()
// wait for both reader and writer to exit
fd.rop.mu.Lock()
fd.wop.mu.Lock()
fd.rop.mu.Unlock()
fd.wop.mu.Unlock()
fd.decref()
return nil
}
func (fd *netFD) shutdown(how int) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -422,13 +436,11 @@ func (fd *netFD) CloseWrite() error {
}
func (fd *netFD) Read(buf []byte) (int, error) {
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.readUnlock()
o := &fd.rop
o.mu.Lock()
defer o.mu.Unlock()
o.InitBuf(buf)
n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
......@@ -443,13 +455,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
if len(buf) == 0 {
return 0, nil, nil
}
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return 0, nil, err
}
defer fd.decref()
defer fd.readUnlock()
o := &fd.rop
o.mu.Lock()
defer o.mu.Unlock()
o.InitBuf(buf)
n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
if o.rsa == nil {
......@@ -466,13 +476,11 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
}
func (fd *netFD) Write(buf []byte) (int, error) {
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.writeUnlock()
o := &fd.wop
o.mu.Lock()
defer o.mu.Unlock()
o.InitBuf(buf)
return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
......@@ -483,13 +491,11 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
if len(buf) == 0 {
return 0, nil
}
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.decref()
defer fd.writeUnlock()
o := &fd.wop
o.mu.Lock()
defer o.mu.Unlock()
o.InitBuf(buf)
o.sa = sa
return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
......@@ -498,10 +504,10 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
}
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
if err := fd.incref(false); err != nil {
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.decref()
defer fd.readUnlock()
// Get new socket.
s, err := sysSocket(fd.family, fd.sotype, 0)
......@@ -522,8 +528,6 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
// Submit accept request.
o := &fd.rop
o.mu.Lock()
defer o.mu.Unlock()
o.handle = s
var rawsa [2]syscall.RawSockaddrAny
o.rsan = int32(unsafe.Sizeof(rawsa[0]))
......
......@@ -58,12 +58,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
return 0, err, false
}
c.wio.Lock()
defer c.wio.Unlock()
if err := c.incref(false); err != nil {
if err := c.writeLock(); err != nil {
return 0, err, true
}
defer c.decref()
defer c.writeUnlock()
dst := c.sysfd
src := int(f.Fd())
......
......@@ -36,12 +36,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
return 0, nil, false
}
c.wio.Lock()
defer c.wio.Unlock()
if err := c.incref(false); err != nil {
if err := c.writeLock(); err != nil {
return 0, err, true
}
defer c.decref()
defer c.writeUnlock()
dst := c.sysfd
src := int(f.Fd())
......
......@@ -34,13 +34,12 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
return 0, nil, false
}
if err := fd.incref(false); err != nil {
if err := fd.writeLock(); err != nil {
return 0, err, true
}
defer fd.decref()
defer fd.writeUnlock()
o := &fd.wop
o.mu.Lock()
defer o.mu.Unlock()
o.qty = uint32(n)
o.handle = syscall.Handle(f.Fd())
done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
......
......@@ -101,7 +101,7 @@ done:
}
func setReadBuffer(fd *netFD, bytes int) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -109,7 +109,7 @@ func setReadBuffer(fd *netFD, bytes int) error {
}
func setWriteBuffer(fd *netFD, bytes int) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -117,7 +117,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
}
func setKeepAlive(fd *netFD, keepalive bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -133,7 +133,7 @@ func setLinger(fd *netFD, sec int) error {
l.Onoff = 0
l.Linger = 0
}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -18,7 +18,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
}
var a [4]byte
copy(a[:], ip.To4())
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -26,7 +26,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -15,7 +15,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
v = int32(ifi.Index)
}
mreq := &syscall.IPMreqn{Ifindex: v}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -23,7 +23,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -16,7 +16,7 @@ func joinIPv4Group(fd *netFD, ifi *Interface, ip IP) error {
if err := setIPv4MreqToInterface(mreq, ifi); err != nil {
return err
}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -28,7 +28,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error {
if ifi != nil {
v = ifi.Index
}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -36,7 +36,7 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error {
}
func setIPv6MulticastLoopback(fd *netFD, v bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -49,7 +49,7 @@ func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error {
if ifi != nil {
mreq.Interface = uint32(ifi.Index)
}
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -17,7 +17,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
}
var a [4]byte
copy(a[:], ip.To4())
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......@@ -25,7 +25,7 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -14,7 +14,7 @@ import (
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -14,7 +14,7 @@ import (
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -12,7 +12,7 @@ import (
)
func setNoDelay(fd *netFD, noDelay bool) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -14,7 +14,7 @@ import (
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -11,7 +11,7 @@ import (
)
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
if err := fd.incref(false); err != nil {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
......
......@@ -2019,7 +2019,7 @@ runtime·gc(int32 force)
if(gcpercent < 0)
return;
runtime·semacquire(&runtime·worldsema);
runtime·semacquire(&runtime·worldsema, false);
if(!force && mstats.heap_alloc < mstats.next_gc) {
// typically threads which lost the race to grab
// worldsema exit here when gc is done.
......@@ -2218,7 +2218,7 @@ runtime·ReadMemStats(MStats *stats)
// because stoptheworld can only be used by
// one goroutine at a time, and there might be
// a pending garbage collection already calling it.
runtime·semacquire(&runtime·worldsema);
runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
updatememstats(nil);
......
......@@ -447,7 +447,7 @@ func Stack(b Slice, all bool) (n int) {
pc = (uintptr)runtime·getcallerpc(&b);
if(all) {
runtime·semacquire(&runtime·worldsema);
runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
}
......@@ -494,7 +494,7 @@ func GoroutineProfile(b Slice) (n int, ok bool) {
ok = false;
n = runtime·gcount();
if(n <= b.len) {
runtime·semacquire(&runtime·worldsema);
runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
......
......@@ -206,6 +206,14 @@ func runtime_pollUnblock(pd *PollDesc) {
runtime·ready(wg);
}
func runtime_Semacquire(addr *uint32) {
runtime·semacquire(addr, true);
}
func runtime_Semrelease(addr *uint32) {
runtime·semrelease(addr);
}
uintptr
runtime·netpollfd(PollDesc *pd)
{
......
......@@ -1836,7 +1836,7 @@ runtime·gomaxprocsfunc(int32 n)
}
runtime·unlock(&runtime·sched);
runtime·semacquire(&runtime·worldsema);
runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
newprocs = n;
......
......@@ -326,7 +326,7 @@ runtime·RaceReleaseMerge(void *addr)
void
runtime·RaceSemacquire(uint32 *s)
{
runtime·semacquire(s);
runtime·semacquire(s, false);
}
// func RaceSemrelease(s *uint32)
......
......@@ -1021,7 +1021,7 @@ bool runtime·isInf(float64 f, int32 sign);
bool runtime·isNaN(float64 f);
float64 runtime·ldexp(float64 d, int32 e);
float64 runtime·modf(float64 d, float64 *ip);
void runtime·semacquire(uint32*);
void runtime·semacquire(uint32*, bool);
void runtime·semrelease(uint32*);
int32 runtime·gomaxprocsfunc(int32 n);
void runtime·procyield(uint32);
......
......@@ -98,8 +98,8 @@ cansemacquire(uint32 *addr)
return 0;
}
static void
semacquireimpl(uint32 volatile *addr, int32 profile)
void
runtime·semacquire(uint32 volatile *addr, bool profile)
{
Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaRoot *root;
......@@ -144,12 +144,6 @@ semacquireimpl(uint32 volatile *addr, int32 profile)
}
}
void
runtime·semacquire(uint32 volatile *addr)
{
semacquireimpl(addr, 0);
}
void
runtime·semrelease(uint32 volatile *addr)
{
......@@ -189,7 +183,7 @@ runtime·semrelease(uint32 volatile *addr)
}
func runtime_Semacquire(addr *uint32) {
semacquireimpl(addr, 1);
runtime·semacquire(addr, true);
}
func runtime_Semrelease(addr *uint32) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment