Commit 5a20b4a6 authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

sync: faster Cond

The new version does not require any memory allocations and is 30-50% faster.
Also detect and painc if Cond is copied after first.

benchmark            old ns/op    new ns/op    delta
BenchmarkCond1             317          195  -38.49%
BenchmarkCond1-2           875          607  -30.63%
BenchmarkCond1-4          1116          548  -50.90%
BenchmarkCond1-8          1013          613  -39.49%
BenchmarkCond1-16          983          450  -54.22%
BenchmarkCond2             559          352  -37.03%
BenchmarkCond2-2          1916         1378  -28.08%
BenchmarkCond2-4          1518         1322  -12.91%
BenchmarkCond2-8          2313         1291  -44.19%
BenchmarkCond2-16         1885         1078  -42.81%
BenchmarkCond4            1070          614  -42.62%
BenchmarkCond4-2          4899         3047  -37.80%
BenchmarkCond4-4          3813         3006  -21.16%
BenchmarkCond4-8          3605         3045  -15.53%
BenchmarkCond4-16         4148         2637  -36.43%
BenchmarkCond8            2086         1264  -39.41%
BenchmarkCond8-2          9961         6736  -32.38%
BenchmarkCond8-4          8135         7689   -5.48%
BenchmarkCond8-8          9623         7517  -21.89%
BenchmarkCond8-16        11661         8093  -30.60%

R=sougou, rsc, bradfitz, r
CC=golang-dev
https://golang.org/cl/11573043
parent d1b66439
......@@ -21,22 +21,23 @@ package sync
#include "runtime.h"
#include "arch_GOARCH.h"
typedef struct Sema Sema;
struct Sema
typedef struct SemaWaiter SemaWaiter;
struct SemaWaiter
{
uint32 volatile* addr;
G* g;
int64 releasetime;
Sema* prev;
Sema* next;
int32 nrelease; // -1 for acquire
SemaWaiter* prev;
SemaWaiter* next;
};
typedef struct SemaRoot SemaRoot;
struct SemaRoot
{
Lock;
Sema* head;
Sema* tail;
SemaWaiter* head;
SemaWaiter* tail;
// Number of waiters. Read w/o the lock.
uint32 volatile nwait;
};
......@@ -59,7 +60,7 @@ semroot(uint32 *addr)
}
static void
semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s)
semqueue(SemaRoot *root, uint32 volatile *addr, SemaWaiter *s)
{
s->g = g;
s->addr = addr;
......@@ -73,7 +74,7 @@ semqueue(SemaRoot *root, uint32 volatile *addr, Sema *s)
}
static void
semdequeue(SemaRoot *root, Sema *s)
semdequeue(SemaRoot *root, SemaWaiter *s)
{
if(s->next)
s->next->prev = s->prev;
......@@ -101,7 +102,7 @@ cansemacquire(uint32 *addr)
void
runtime·semacquire(uint32 volatile *addr, bool profile)
{
Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaWaiter s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaRoot *root;
int64 t0;
......@@ -147,7 +148,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile)
void
runtime·semrelease(uint32 volatile *addr)
{
Sema *s;
SemaWaiter *s;
SemaRoot *root;
root = semroot(addr);
......@@ -200,3 +201,93 @@ func runtime_Semacquire(addr *uint32) {
func runtime_Semrelease(addr *uint32) {
runtime·semrelease(addr);
}
typedef struct SyncSema SyncSema;
struct SyncSema
{
Lock;
SemaWaiter* head;
SemaWaiter* tail;
};
func runtime_Syncsemcheck(size uintptr) {
if(size != sizeof(SyncSema)) {
runtime·printf("bad SyncSema size: sync:%D runtime:%D\n", (int64)size, (int64)sizeof(SyncSema));
runtime·throw("bad SyncSema size");
}
}
// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
func runtime_Syncsemacquire(s *SyncSema) {
SemaWaiter w, *wake;
int64 t0;
w.g = g;
w.nrelease = -1;
w.next = nil;
w.releasetime = 0;
t0 = 0;
if(runtime·blockprofilerate > 0) {
t0 = runtime·cputicks();
w.releasetime = -1;
}
runtime·lock(s);
if(s->head && s->head->nrelease > 0) {
// have pending release, consume it
wake = nil;
s->head->nrelease--;
if(s->head->nrelease == 0) {
wake = s->head;
s->head = wake->next;
if(s->head == nil)
s->tail = nil;
}
runtime·unlock(s);
if(wake)
runtime·ready(wake->g);
} else {
// enqueue itself
if(s->tail == nil)
s->head = &w;
else
s->tail->next = &w;
s->tail = &w;
runtime·park(runtime·unlock, s, "semacquire");
if(t0)
runtime·blockevent(w.releasetime - t0, 2);
}
}
// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
func runtime_Syncsemrelease(s *SyncSema, n uint32) {
SemaWaiter w, *wake;
w.g = g;
w.nrelease = (int32)n;
w.next = nil;
w.releasetime = 0;
runtime·lock(s);
while(w.nrelease > 0 && s->head && s->head->nrelease < 0) {
// have pending acquire, satisfy it
wake = s->head;
s->head = wake->next;
if(s->head == nil)
s->tail = nil;
if(wake->releasetime)
wake->releasetime = runtime·cputicks();
runtime·ready(wake->g);
w.nrelease--;
}
if(w.nrelease > 0) {
// enqueue itself
if(s->tail == nil)
s->head = &w;
else
s->tail->next = &w;
s->tail = &w;
runtime·park(runtime·unlock, s, "semarelease");
} else
runtime·unlock(s);
}
......@@ -4,6 +4,11 @@
package sync
import (
"sync/atomic"
"unsafe"
)
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
......@@ -11,27 +16,16 @@ package sync
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond can be created as part of other structures.
// A Cond must not be copied after first use.
type Cond struct {
L Locker // held while observing or changing the condition
m Mutex // held to avoid internal races
// We must be careful to make sure that when Signal
// releases a semaphore, the corresponding acquire is
// executed by a goroutine that was already waiting at
// the time of the call to Signal, not one that arrived later.
// To ensure this, we segment waiting goroutines into
// generations punctuated by calls to Signal. Each call to
// Signal begins another generation if there are no goroutines
// left in older generations for it to wake. Because of this
// optimization (only begin another generation if there
// are no older goroutines left), we only need to keep track
// of the two most recent generations, which we call old
// and new.
oldWaiters int // number of waiters in old generation...
oldSema *uint32 // ... waiting on this semaphore
// L is held while observing or changing the condition
L Locker
newWaiters int // number of waiters in new generation...
newSema *uint32 // ... waiting on this semaphore
sema syncSema
waiters uint32 // number of waiters
checker copyChecker
}
// NewCond returns a new Cond with Locker l.
......@@ -56,22 +50,16 @@ func NewCond(l Locker) *Cond {
// c.L.Unlock()
//
func (c *Cond) Wait() {
c.checker.check()
if raceenabled {
_ = c.m.state
raceDisable()
}
c.m.Lock()
if c.newSema == nil {
c.newSema = new(uint32)
}
s := c.newSema
c.newWaiters++
c.m.Unlock()
atomic.AddUint32(&c.waiters, 1)
if raceenabled {
raceEnable()
}
c.L.Unlock()
runtime_Semacquire(s)
runtime_Syncsemacquire(&c.sema)
c.L.Lock()
}
......@@ -80,26 +68,7 @@ func (c *Cond) Wait() {
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
if raceenabled {
_ = c.m.state
raceDisable()
}
c.m.Lock()
if c.oldWaiters == 0 && c.newWaiters > 0 {
// Retire old generation; rename new to old.
c.oldWaiters = c.newWaiters
c.oldSema = c.newSema
c.newWaiters = 0
c.newSema = nil
}
if c.oldWaiters > 0 {
c.oldWaiters--
runtime_Semrelease(c.oldSema)
}
c.m.Unlock()
if raceenabled {
raceEnable()
}
c.signalImpl(false)
}
// Broadcast wakes all goroutines waiting on c.
......@@ -107,27 +76,43 @@ func (c *Cond) Signal() {
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
c.signalImpl(true)
}
func (c *Cond) signalImpl(all bool) {
c.checker.check()
if raceenabled {
_ = c.m.state
raceDisable()
}
c.m.Lock()
// Wake both generations.
if c.oldWaiters > 0 {
for i := 0; i < c.oldWaiters; i++ {
runtime_Semrelease(c.oldSema)
}
c.oldWaiters = 0
for {
old := atomic.LoadUint32(&c.waiters)
if old == 0 {
if raceenabled {
raceEnable()
}
if c.newWaiters > 0 {
for i := 0; i < c.newWaiters; i++ {
runtime_Semrelease(c.newSema)
return
}
c.newWaiters = 0
c.newSema = nil
new := old - 1
if all {
new = 0
}
c.m.Unlock()
if atomic.CompareAndSwapUint32(&c.waiters, old, new) {
if raceenabled {
raceEnable()
}
runtime_Syncsemrelease(&c.sema, old-new)
return
}
}
}
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
......@@ -5,6 +5,8 @@ package sync_test
import (
. "sync"
"runtime"
"testing"
)
......@@ -124,3 +126,130 @@ func TestCondBroadcast(t *testing.T) {
}
c.Broadcast()
}
func TestRace(t *testing.T) {
x := 0
c := NewCond(&Mutex{})
done := make(chan bool)
go func() {
c.L.Lock()
x = 1
c.Wait()
if x != 2 {
t.Fatal("want 2")
}
x = 3
c.Signal()
c.L.Unlock()
done <- true
}()
go func() {
c.L.Lock()
for {
if x == 1 {
x = 2
c.Signal()
break
}
c.L.Unlock()
runtime.Gosched()
c.L.Lock()
}
c.L.Unlock()
done <- true
}()
go func() {
c.L.Lock()
for {
if x == 2 {
c.Wait()
if x != 3 {
t.Fatal("want 3")
}
break
}
if x == 3 {
break
}
c.L.Unlock()
runtime.Gosched()
c.L.Lock()
}
c.L.Unlock()
done <- true
}()
<-done
<-done
<-done
}
func TestCondCopy(t *testing.T) {
defer func() {
err := recover()
if err == nil || err.(string) != "sync.Cond is copied" {
t.Fatalf("got %v, expect sync.Cond is copied", err)
}
}()
c := Cond{L: &Mutex{}}
c.Signal()
c2 := c
c2.Signal()
}
func BenchmarkCond1(b *testing.B) {
benchmarkCond(b, 1)
}
func BenchmarkCond2(b *testing.B) {
benchmarkCond(b, 2)
}
func BenchmarkCond4(b *testing.B) {
benchmarkCond(b, 4)
}
func BenchmarkCond8(b *testing.B) {
benchmarkCond(b, 8)
}
func BenchmarkCond16(b *testing.B) {
benchmarkCond(b, 16)
}
func BenchmarkCond32(b *testing.B) {
benchmarkCond(b, 32)
}
func benchmarkCond(b *testing.B, waiters int) {
c := NewCond(&Mutex{})
done := make(chan bool)
id := 0
for routine := 0; routine < waiters+1; routine++ {
go func() {
for i := 0; i < b.N; i++ {
c.L.Lock()
if id == -1 {
c.L.Unlock()
break
}
id++
if id == waiters+1 {
id = 0
c.Broadcast()
} else {
c.Wait()
}
c.L.Unlock()
}
c.L.Lock()
id = -1
c.Broadcast()
c.L.Unlock()
done <- true
}()
}
for routine := 0; routine < waiters+1; routine++ {
<-done
}
}
......@@ -4,6 +4,8 @@
package sync
import "unsafe"
// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
......@@ -16,3 +18,19 @@ func runtime_Semacquire(s *uint32)
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semrelease(s *uint32)
// Opaque representation of SyncSema in runtime/sema.goc.
type syncSema [3]uintptr
// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
func runtime_Syncsemacquire(s *syncSema)
// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
func runtime_Syncsemrelease(s *syncSema, n uint32)
// Ensure that sync and runtime agree on size of syncSema.
func runtime_Syncsemcheck(size uintptr)
func init() {
var s syncSema
runtime_Syncsemcheck(unsafe.Sizeof(s))
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment