Commit 9cbd2fb1 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

runtime: remove locks from netpoll hotpaths

Introduces two-phase goroutine parking mechanism -- prepare to park, commit park.
This mechanism does not require backing mutex to protect wait predicate.
Use it in netpoll. See comment in netpoll.goc for details.
This slightly reduces contention between reader, writer and read/write io notifications;
and just eliminates a bunch of mutex operations from hotpaths, thus making then faster.

benchmark                             old ns/op    new ns/op    delta
BenchmarkTCP4ConcurrentReadWrite           2109         1945   -7.78%
BenchmarkTCP4ConcurrentReadWrite-2         1162         1113   -4.22%
BenchmarkTCP4ConcurrentReadWrite-4          798          755   -5.39%
BenchmarkTCP4ConcurrentReadWrite-8          803          748   -6.85%
BenchmarkTCP4Persistent                    9411         9240   -1.82%
BenchmarkTCP4Persistent-2                  5888         5813   -1.27%
BenchmarkTCP4Persistent-4                  4016         3968   -1.20%
BenchmarkTCP4Persistent-8                  3943         3857   -2.18%

R=golang-codereviews, mikioh.mikioh, gobot, iant, rsc
CC=golang-codereviews, khr
https://golang.org/cl/45700043
parent cb86d867
......@@ -483,6 +483,12 @@ TEXT runtime·xchg(SB), NOSPLIT, $0-8
XCHGL AX, 0(BX)
RET
TEXT runtime·xchgp(SB), NOSPLIT, $0-8
MOVL 4(SP), BX
MOVL 8(SP), AX
XCHGL AX, 0(BX)
RET
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL 4(SP), AX
again:
......
......@@ -549,6 +549,12 @@ TEXT runtime·xchg64(SB), NOSPLIT, $0-16
XCHGQ AX, 0(BX)
RET
TEXT runtime·xchgp(SB), NOSPLIT, $0-16
MOVQ 8(SP), BX
MOVQ 16(SP), AX
XCHGQ AX, 0(BX)
RET
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL 8(SP), AX
again:
......
......@@ -41,6 +41,19 @@ runtime·xchg(uint32 volatile* addr, uint32 v)
}
}
#pragma textflag NOSPLIT
void*
runtime·xchgp(void* volatile* addr, void* v)
{
void *old;
for(;;) {
old = *addr;
if(runtime·cas(addr, old, v))
return old;
}
}
#pragma textflag NOSPLIT
void
runtime·procyield(uint32 cnt)
......
......@@ -224,7 +224,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->sendq, &mysg);
runtime·park(runtime·unlock, c, "chan send");
runtime·parkunlock(c, "chan send");
if(g->param == nil) {
runtime·lock(c);
......@@ -252,7 +252,7 @@ asynch:
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->sendq, &mysg);
runtime·park(runtime·unlock, c, "chan send");
runtime·parkunlock(c, "chan send");
runtime·lock(c);
goto asynch;
......@@ -356,7 +356,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive
mysg.selgen = NOSELGEN;
g->param = nil;
enqueue(&c->recvq, &mysg);
runtime·park(runtime·unlock, c, "chan receive");
runtime·parkunlock(c, "chan receive");
if(g->param == nil) {
runtime·lock(c);
......@@ -387,7 +387,7 @@ asynch:
mysg.elem = nil;
mysg.selgen = NOSELGEN;
enqueue(&c->recvq, &mysg);
runtime·park(runtime·unlock, c, "chan receive");
runtime·parkunlock(c, "chan receive");
runtime·lock(c);
goto asynch;
......@@ -799,6 +799,14 @@ selunlock(Select *sel)
}
}
static bool
selparkcommit(G *gp, void *sel)
{
USED(gp);
selunlock(sel);
return true;
}
void
runtime·block(void)
{
......@@ -971,7 +979,7 @@ loop:
}
g->param = nil;
runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
runtime·park(selparkcommit, sel, "select");
sellock(sel);
sg = g->param;
......
......@@ -2307,7 +2307,7 @@ runfinq(void)
finq = nil;
if(fb == nil) {
fingwait = 1;
runtime·park(runtime·unlock, &finlock, "finalizer wait");
runtime·parkunlock(&finlock, "finalizer wait");
continue;
}
runtime·unlock(&finlock);
......
......@@ -19,21 +19,40 @@ package net
// An implementation must call the following function to denote that the pd is ready.
// void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode);
// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// READY - io readiness notification is pending;
// a goroutine consumes the notification by changing the state to nil.
// WAIT - a goroutine prepares to park on the semaphore, but not yet parked;
// the goroutine commits to park by changing the state to G pointer,
// or, alternatively, concurrent io notification changes the state to READY,
// or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
// io notification or timeout/close changes the state to READY or nil respectively
// and unparks the goroutine.
// nil - nothing of the above.
#define READY ((G*)1)
#define WAIT ((G*)2)
struct PollDesc
{
PollDesc* link; // in pollcache, protected by pollcache.Lock
// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification)
// proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated
// in a lock-free way by all operations.
Lock; // protectes the following fields
uintptr fd;
bool closing;
uintptr seq; // protects from stale timers and ready notifications
G* rg; // G waiting for read or READY (binary semaphore)
G* rg; // READY, WAIT, G waiting for read or nil
Timer rt; // read deadline timer (set if rt.fv != nil)
int64 rd; // read deadline
G* wg; // the same for writes
Timer wt;
int64 wd;
G* wg; // READY, WAIT, G waiting for write or nil
Timer wt; // write deadline timer
int64 wd; // write deadline
};
static struct
......@@ -47,7 +66,7 @@ static struct
// seq is incremented when deadlines are changed or descriptor is reused.
} pollcache;
static bool netpollblock(PollDesc*, int32);
static bool netpollblock(PollDesc*, int32, bool);
static G* netpollunblock(PollDesc*, int32, bool);
static void deadline(int64, Eface);
static void readDeadline(int64, Eface);
......@@ -97,7 +116,6 @@ func runtime_pollClose(pd *PollDesc) {
}
func runtime_pollReset(pd *PollDesc, mode int) (err int) {
runtime·lock(pd);
err = checkerr(pd, mode);
if(err)
goto ret;
......@@ -106,11 +124,9 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) {
else if(mode == 'w')
pd->wg = nil;
ret:
runtime·unlock(pd);
}
func runtime_pollWait(pd *PollDesc, mode int) (err int) {
runtime·lock(pd);
err = checkerr(pd, mode);
if(err == 0) {
#ifdef GOOS_solaris
......@@ -119,7 +135,7 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
else if(mode == 'w')
runtime·netpollarmwrite(pd->fd);
#endif
while(!netpollblock(pd, mode)) {
while(!netpollblock(pd, mode, false)) {
err = checkerr(pd, mode);
if(err != 0)
break;
......@@ -128,11 +144,9 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) {
// Pretend it has not happened and retry.
}
}
runtime·unlock(pd);
}
func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
runtime·lock(pd);
#ifdef GOOS_solaris
if(mode == 'r')
runtime·netpollarmread(pd->fd);
......@@ -140,9 +154,8 @@ func runtime_pollWaitCanceled(pd *PollDesc, mode int) {
runtime·netpollarmwrite(pd->fd);
#endif
// wait for ioready, ignore closing or timeouts.
while(!netpollblock(pd, mode))
while(!netpollblock(pd, mode, true))
;
runtime·unlock(pd);
}
func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
......@@ -197,7 +210,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) {
}
// If we set the new deadline in the past, unblock currently pending IO if any.
rg = nil;
wg = nil;
runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
if(pd->rd < 0)
rg = netpollunblock(pd, 'r', false);
if(pd->wd < 0)
......@@ -217,6 +230,7 @@ func runtime_pollUnblock(pd *PollDesc) {
runtime·throw("runtime_pollUnblock: already closing");
pd->closing = true;
pd->seq++;
runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock
rg = netpollunblock(pd, 'r', false);
wg = netpollunblock(pd, 'w', false);
if(pd->rt.fv) {
......@@ -247,12 +261,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode)
G *rg, *wg;
rg = wg = nil;
runtime·lock(pd);
if(mode == 'r' || mode == 'r'+'w')
rg = netpollunblock(pd, 'r', true);
if(mode == 'w' || mode == 'r'+'w')
wg = netpollunblock(pd, 'w', true);
runtime·unlock(pd);
if(rg) {
rg->schedlink = *gpp;
*gpp = rg;
......@@ -273,51 +285,75 @@ checkerr(PollDesc *pd, int32 mode)
return 0;
}
static bool
blockcommit(G *gp, G **gpp)
{
return runtime·casp(gpp, WAIT, gp);
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
static bool
netpollblock(PollDesc *pd, int32 mode)
netpollblock(PollDesc *pd, int32 mode, bool waitio)
{
G **gpp;
G **gpp, *old;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
if(*gpp == READY) {
*gpp = nil;
return true;
// set the gpp semaphore to WAIT
for(;;) {
old = *gpp;
if(old == READY) {
*gpp = nil;
return true;
}
if(old != nil)
runtime·throw("netpollblock: double wait");
if(runtime·casp(gpp, nil, WAIT))
break;
}
if(*gpp != nil)
runtime·throw("netpollblock: double wait");
*gpp = g;
runtime·park(runtime·unlock, &pd->Lock, "IO wait");
runtime·lock(pd);
if(g->param)
return true;
return false;
// need to recheck error states after setting gpp to WAIT
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
if(waitio || checkerr(pd, mode) == 0)
runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait");
// be careful to not lose concurrent READY notification
old = runtime·xchgp(gpp, nil);
if(old > WAIT)
runtime·throw("netpollblock: corrupted state");
return old == READY;
}
static G*
netpollunblock(PollDesc *pd, int32 mode, bool ioready)
{
G **gpp, *old;
G **gpp, *old, *new;
gpp = &pd->rg;
if(mode == 'w')
gpp = &pd->wg;
if(*gpp == READY)
return nil;
if(*gpp == nil) {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
for(;;) {
old = *gpp;
if(old == READY)
return nil;
if(old == nil && !ioready) {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil;
}
new = nil;
if(ioready)
*gpp = READY;
return nil;
new = READY;
if(runtime·casp(gpp, old, new))
break;
}
old = *gpp;
// pass unblock reason onto blocked g
old->param = (void*)ioready;
*gpp = nil;
return old;
if(old > WAIT)
return old; // must be G*
return nil;
}
static void
......@@ -343,14 +379,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write)
if(pd->rd <= 0 || pd->rt.fv == nil)
runtime·throw("deadlineimpl: inconsistent read deadline");
pd->rd = -1;
pd->rt.fv = nil;
runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock
rg = netpollunblock(pd, 'r', false);
}
if(write) {
if(pd->wd <= 0 || (pd->wt.fv == nil && !read))
runtime·throw("deadlineimpl: inconsistent write deadline");
pd->wd = -1;
pd->wt.fv = nil;
runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock
wg = netpollunblock(pd, 'w', false);
}
runtime·unlock(pd);
......
......@@ -1353,10 +1353,10 @@ top:
execute(gp);
}
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling runtime·ready(gp).
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
void
runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason)
runtime·park(bool(*unlockf)(G*, void*), void *lock, int8 *reason)
{
m->waitlock = lock;
m->waitunlockf = unlockf;
......@@ -1364,17 +1364,39 @@ runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason)
runtime·mcall(park0);
}
static bool
parkunlock(G *gp, void *lock)
{
USED(gp);
runtime·unlock(lock);
return true;
}
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling runtime·ready(gp).
void
runtime·parkunlock(Lock *lock, int8 *reason)
{
runtime·park(parkunlock, lock, reason);
}
// runtime·park continuation on g0.
static void
park0(G *gp)
{
bool ok;
gp->status = Gwaiting;
gp->m = nil;
m->curg = nil;
if(m->waitunlockf) {
m->waitunlockf(m->waitlock);
ok = m->waitunlockf(gp, m->waitlock);
m->waitunlockf = nil;
m->waitlock = nil;
if(!ok) {
gp->status = Grunnable;
execute(gp); // Schedule it back, never returns.
}
}
if(m->lockedg) {
stoplockedm();
......
......@@ -339,7 +339,7 @@ struct M
GCStats gcstats;
bool racecall;
bool needextram;
void (*waitunlockf)(Lock*);
bool (*waitunlockf)(G*, void*);
void* waitlock;
uintptr settype_buf[1024];
......@@ -790,21 +790,6 @@ int32 runtime·read(int32, void*, int32);
int32 runtime·write(int32, void*, int32);
int32 runtime·close(int32);
int32 runtime·mincore(void*, uintptr, byte*);
bool runtime·cas(uint32*, uint32, uint32);
bool runtime·cas64(uint64*, uint64, uint64);
bool runtime·casp(void**, void*, void*);
// Don't confuse with XADD x86 instruction,
// this one is actually 'addx', that is, add-and-fetch.
uint32 runtime·xadd(uint32 volatile*, int32);
uint64 runtime·xadd64(uint64 volatile*, int64);
uint32 runtime·xchg(uint32 volatile*, uint32);
uint64 runtime·xchg64(uint64 volatile*, uint64);
uint32 runtime·atomicload(uint32 volatile*);
void runtime·atomicstore(uint32 volatile*, uint32);
void runtime·atomicstore64(uint64 volatile*, uint64);
uint64 runtime·atomicload64(uint64 volatile*);
void* runtime·atomicloadp(void* volatile*);
void runtime·atomicstorep(void* volatile*, void*);
void runtime·jmpdefer(FuncVal*, void*);
void runtime·exit1(int32);
void runtime·ready(G*);
......@@ -845,14 +830,33 @@ uint32 runtime·fastrand1(void);
void runtime·rewindmorestack(Gobuf*);
int32 runtime·timediv(int64, int32, int32*);
void runtime·setmg(M*, G*);
void runtime·newextram(void);
// atomic operations
bool runtime·cas(uint32*, uint32, uint32);
bool runtime·cas64(uint64*, uint64, uint64);
bool runtime·casp(void**, void*, void*);
// Don't confuse with XADD x86 instruction,
// this one is actually 'addx', that is, add-and-fetch.
uint32 runtime·xadd(uint32 volatile*, int32);
uint64 runtime·xadd64(uint64 volatile*, int64);
uint32 runtime·xchg(uint32 volatile*, uint32);
uint64 runtime·xchg64(uint64 volatile*, uint64);
void* runtime·xchgp(void* volatile*, void*);
uint32 runtime·atomicload(uint32 volatile*);
void runtime·atomicstore(uint32 volatile*, uint32);
void runtime·atomicstore64(uint64 volatile*, uint64);
uint64 runtime·atomicload64(uint64 volatile*);
void* runtime·atomicloadp(void* volatile*);
void runtime·atomicstorep(void* volatile*, void*);
void runtime·setmg(M*, G*);
void runtime·newextram(void);
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
void runtime·gosched0(G*);
void runtime·schedtrace(bool);
void runtime·park(void(*)(Lock*), Lock*, int8*);
void runtime·park(bool(*)(G*, void*), void*, int8*);
void runtime·parkunlock(Lock*, int8*);
void runtime·tsleep(int64, int8*);
M* runtime·newm(void);
void runtime·goexit(void);
......
......@@ -137,7 +137,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile)
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
semqueue(root, addr, &s);
runtime·park(runtime·unlock, root, "semacquire");
runtime·parkunlock(root, "semacquire");
if(cansemacquire(addr)) {
if(t0)
runtime·blockevent(s.releasetime - t0, 3);
......@@ -254,7 +254,7 @@ func runtime_Syncsemacquire(s *SyncSema) {
else
s->tail->next = &w;
s->tail = &w;
runtime·park(runtime·unlock, s, "semacquire");
runtime·parkunlock(s, "semacquire");
if(t0)
runtime·blockevent(w.releasetime - t0, 2);
}
......@@ -288,7 +288,7 @@ func runtime_Syncsemrelease(s *SyncSema, n uint32) {
else
s->tail->next = &w;
s->tail = &w;
runtime·park(runtime·unlock, s, "semarelease");
runtime·parkunlock(s, "semarelease");
} else
runtime·unlock(s);
}
......@@ -76,7 +76,7 @@ runtime·tsleep(int64 ns, int8 *reason)
t.arg.data = g;
runtime·lock(&timers);
addtimer(&t);
runtime·park(runtime·unlock, &timers, reason);
runtime·parkunlock(&timers, reason);
}
static FuncVal timerprocv = {timerproc};
......@@ -222,7 +222,7 @@ timerproc(void)
if(delta < 0) {
// No timers left - put goroutine to sleep.
timers.rescheduling = true;
runtime·park(runtime·unlock, &timers, "timer goroutine (idle)");
runtime·parkunlock(&timers, "timer goroutine (idle)");
continue;
}
// At least one timer pending. Sleep until then.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment