Commit 9a1e142b authored by Keith Randall's avatar Keith Randall Committed by Dmitriy Vyukov

runtime: convert channel operations to Go, part 1 (chansend1).

LGTM=dvyukov
R=dvyukov, khr
CC=golang-codereviews
https://golang.org/cl/127460044
parent 7f223e3b
......@@ -383,6 +383,7 @@ func (w *Walker) parseFile(dir, file string) (*ast.File, error) {
" mspan struct{}; m struct{}; lock struct{}; slicetype struct{};" +
" iface struct{}; eface struct{}; interfacetype struct{}; itab struct{};" +
" mcache struct{}; bucket struct{}; sudog struct{}; g struct{};" +
" hchan struct{}; chantype struct{}; waitq struct{};" +
" note struct{};" +
")"
f, err = parser.ParseFile(fset, filename, src, 0)
......
......@@ -340,6 +340,7 @@ selecttype(int32 size)
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("elem")), typenod(ptrto(types[TUINT8]))));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("releasetime")), typenod(types[TUINT64])));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("nrelease")), typenod(types[TINT32])));
sudog->list = list(sudog->list, nod(ODCLFIELD, newname(lookup("waitlink")), typenod(ptrto(types[TUINT8]))));
typecheck(&sudog, Etype);
sudog->type->noalg = 1;
sudog->type->local = 1;
......
......@@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0
JMP runtime·newobject(SB)
TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0
JMP runtime·newarray(SB)
TEXT ·makechan(SB),NOSPLIT,$0-0
JMP runtime·makechan(SB)
......@@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0
JMP runtime·newobject(SB)
TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0
JMP runtime·newarray(SB)
TEXT ·makechan(SB),NOSPLIT,$0-0
JMP runtime·makechan(SB)
......@@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0
JMP runtime·newobject(SB)
TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0
JMP runtime·newarray(SB)
TEXT ·makechan(SB),NOSPLIT,$0-0
JMP runtime·makechan(SB)
......@@ -52,3 +52,5 @@ TEXT ·unsafe_New(SB),NOSPLIT,$0-0
B runtime·newobject(SB)
TEXT ·unsafe_NewArray(SB),NOSPLIT,$0-0
B runtime·newarray(SB)
TEXT ·makechan(SB),NOSPLIT,$0-0
B runtime·makechan(SB)
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
// This file contains the implementation of Go channels
// and select statements.
import "unsafe"
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{})
debugChan = false
)
// TODO: make hchan.buf an unsafe.Pointer, not a *uint8
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
gothrow("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
gothrow("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (maxMem-hchanSize)/uintptr(elem.size)) {
panic("makechan: size out of range")
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// allocate memory in one call
c = (*hchan)(gomallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
if size > 0 && elem.size != 0 {
c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize))
} else {
c.buf = (*uint8)(unsafe.Pointer(c)) // race detector uses this location for synchronization
}
} else {
c = new(hchan)
c.buf = (*uint8)(newarray(elem, uintptr(size)))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
println("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size)
}
return c
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(unsafe.Pointer(c.buf), uintptr(i)*uintptr(c.elemsize))
}
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
chansend(t, c, elem, true, gogetcallerpc(unsafe.Pointer(&t)))
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if raceenabled {
fn := chansend
pc := **(**uintptr)(unsafe.Pointer(&fn))
raceReadObjectPC(t.elem, ep, callerpc, pc)
}
if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)")
return false // not reached
}
if debugChan {
println("chansend: chan=", c)
}
var t0 int64
if blockprofilerate > 0 {
t0 = gocputicks()
}
golock(&c.lock)
if raceenabled {
fn := chansend
pc := **(**uintptr)(unsafe.Pointer(&fn))
racereadpc(unsafe.Pointer(c), pc, callerpc)
}
if c.closed != 0 {
gounlock(&c.lock)
panic("send on closed channel")
}
if c.dataqsiz == 0 { // synchronous channel
sg := c.recvq.dequeue()
if sg != nil { // found a waiting receiver
if raceenabled {
racesync(c, sg)
}
gounlock(&c.lock)
recvg := sg.g
recvg.param = unsafe.Pointer(sg)
if sg.elem != nil {
memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize))
}
if sg.releasetime != 0 {
// Yes, this is ugly. On 64-bit sg.releasetime has type
// int. On 32-bit it has type int64. There's no easy way
// to assign to both types in Go. At some point we'll
// write the Go types directly instead of generating them
// via the C types. At that point, this nastiness goes away.
*(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks()
}
goready(recvg)
return true
}
if !block {
gounlock(&c.lock)
return false
}
// no receiver available: block on this channel.
gp := getg()
mysg := acquireSudog()
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = (*uint8)(ep)
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// someone woke us up.
if gp.param == nil {
if c.closed == 0 {
gothrow("chansend: spurious wakeup")
}
panic("send on closed channel")
}
if mysg.releasetime > 0 {
goblockevent(int64(mysg.releasetime)-t0, 3)
}
if mysg != gp.waiting {
gothrow("G waiting list is corrupted!")
}
gp.waiting = nil
releaseSudog(mysg)
return true
}
// asynchronous channel
// wait for some space to write our data
var t1 int64
for c.qcount >= c.dataqsiz {
if !block {
gounlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
if t0 != 0 {
mysg.releasetime = -1
}
mysg.g = gp
mysg.elem = nil
mysg.selectdone = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// someone woke us up - try again
if mysg.releasetime != 0 {
t1 = int64(mysg.releasetime)
}
releaseSudog(mysg)
golock(&c.lock)
if c.closed != 0 {
gounlock(&c.lock)
panic("send on closed channel")
}
}
// write our data into the channel buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
}
memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// wake up a waiting receiver
sg := c.recvq.dequeue()
if sg != nil {
recvg := sg.g
gounlock(&c.lock)
if sg.releasetime != 0 {
*(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks()
}
goready(recvg)
} else {
gounlock(&c.lock)
}
if t1 > 0 {
goblockevent(t1-t0, 3)
}
return true
}
func (q *waitq) enqueue(sgp *sudog) {
sgp.link = nil
if q.first == nil {
q.first = sgp
q.last = sgp
return
}
q.last.link = sgp
q.last = sgp
}
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
q.first = sgp.link
if q.last == sgp {
q.last = nil
}
// if sgp participates in a select and is already signaled, ignore it
if sgp.selectdone != nil {
// claim the right to signal
if *sgp.selectdone != 0 || !gocas(sgp.selectdone, 0, 1) {
continue
}
}
return sgp
}
}
func racesync(c *hchan, sg *sudog) {
racerelease(chanbuf(c, 0))
raceacquireg(sg.g, chanbuf(c, 0))
racereleaseg(sg.g, chanbuf(c, 0))
raceacquire(chanbuf(c, 0))
}
......@@ -18,78 +18,6 @@ static SudoG* dequeue(WaitQ*);
static void enqueue(WaitQ*, SudoG*);
static void racesync(Hchan*, SudoG*);
static Type hchanType;
static String hchanStr;
void
runtime·chaninit(void)
{
int32 i, off;
byte *mask;
// Generate (bare minimum) type descriptor for Hchan.
hchanType.size = sizeof(Hchan);
hchanStr = runtime·gostringnocopy((byte*)"chan");
hchanType.string = &hchanStr;
// Hchan has only one interesting pointer -- buf.
off = offsetof(Hchan, buf)/PtrSize*gcBits;
if(off%8)
runtime·throw("makechan: unaligned buffer");
if(off+8 >= sizeof(hchanType.gc)*8)
runtime·throw("makechan: gc mask does not fit");
mask = (byte*)hchanType.gc;
for(i = 0; i < off/8; i++)
mask[i] = (BitsScalar<<2) | (BitsScalar<<6);
mask[off/8] = (BitsPointer<<2) | (BitsDead<<6);
}
static Hchan*
makechan(ChanType *t, int64 hint)
{
Hchan *c;
Type *elem;
elem = t->elem;
// compiler checks this but be safe.
if(elem->size >= (1<<16))
runtime·throw("makechan: invalid channel element type");
if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN)
runtime·throw("makechan: bad alignment");
if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem - sizeof(*c)) / elem->size))
runtime·panicstring("makechan: size out of range");
if((elem->kind&KindNoPointers) || hint == 0) {
// allocate memory in one call
c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, nil, FlagNoScan);
if(hint > 0 && elem->size != 0)
c->buf = (byte*)(c+1);
else
c->buf = (byte*)c; // race detector uses this location for synchronization
} else {
c = (Hchan*)runtime·cnew(&hchanType);
c->buf = runtime·cnewarray(elem, hint);
}
c->elemsize = elem->size;
c->elemtype = elem;
c->dataqsiz = hint;
if(debug)
runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; dataqsiz=%D\n",
c, (int64)elem->size, elem->alg, (int64)c->dataqsiz);
return c;
}
func reflect·makechan(t *ChanType, size uint64) (c *Hchan) {
c = makechan(t, size);
}
func makechan(t *ChanType, size int64) (c *Hchan) {
c = makechan(t, size);
}
/*
* generic single channel send/recv
* if the bool pointer is nil,
......@@ -375,11 +303,6 @@ closed:
return true;
}
#pragma textflag NOSPLIT
func chansend1(t *ChanType, c *Hchan, elem *byte) {
chansend(t, c, elem, true, runtime·getcallerpc(&t));
}
#pragma textflag NOSPLIT
func chanrecv1(t *ChanType, c *Hchan, elem *byte) {
chanrecv(t, c, elem, true, nil);
......
......@@ -5,22 +5,9 @@
#define MAXALIGN 8
typedef struct WaitQ WaitQ;
typedef struct SudoG SudoG;
typedef struct Select Select;
typedef struct Scase Scase;
// Known to compiler.
// Changes here must also be made in src/cmd/gc/select.c's selecttype.
struct SudoG
{
G* g;
uint32* selectdone;
SudoG* link;
byte* elem; // data element
int64 releasetime;
int32 nrelease; // -1 for acquire
};
struct WaitQ
{
SudoG* first;
......
......@@ -430,6 +430,38 @@ func TestMultiConsumer(t *testing.T) {
}
}
func TestShrinkStackDuringBlockedSend(t *testing.T) {
// make sure that channel operations still work when we are
// blocked on a channel send and we shrink the stack.
// NOTE: this test probably won't fail unless stack.c:StackDebug
// is set to >= 1.
const n = 10
c := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < n; i++ {
c <- i
// use lots of stack, briefly.
stackGrowthRecursive(20)
}
done <- struct{}{}
}()
for i := 0; i < n; i++ {
x := <-c
if x != i {
t.Errorf("bad channel read: want %d, got %d", i, x)
}
// Waste some time so sender can finish using lots of stack
// and block in channel send.
time.Sleep(1 * time.Millisecond)
// trigger GC which will shrink the stack of the sender.
runtime.GC()
}
<-done
}
func BenchmarkChanNonblocking(b *testing.B) {
myc := make(chan int)
b.RunParallel(func(pb *testing.PB) {
......
......@@ -218,7 +218,10 @@ runtime·blockevent(int64 cycles, int32 skip)
if(rate <= 0 || (rate > cycles && runtime·fastrand1()%rate > cycles))
return;
nstk = runtime·callers(skip, stk, nelem(stk));
if(g->m->curg == nil || g->m->curg == g)
nstk = runtime·callers(skip, stk, nelem(stk));
else
nstk = runtime·gcallers(g->m->curg, skip, stk, nelem(stk));
runtime·lock(&runtime·proflock);
b = stkbucket(BProf, 0, stk, nstk, true);
b->data.bp.count++;
......@@ -226,6 +229,12 @@ runtime·blockevent(int64 cycles, int32 skip)
runtime·unlock(&runtime·proflock);
}
void
runtime·blockevent_m(void)
{
runtime·blockevent(g->m->scalararg[0] + ((int64)g->m->scalararg[1]<<32), g->m->scalararg[2]);
}
void
runtime·iterate_memprof(void (*callback)(Bucket*, uintptr, uintptr*, uintptr, uintptr, uintptr))
{
......
......@@ -287,7 +287,7 @@ func TestBlockProfile(t *testing.T) {
`},
{"chan send", blockChanSend, `
[0-9]+ [0-9]+ @ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+
# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.goc:[0-9]+
# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.go:[0-9]+
# 0x[0-9,a-f]+ runtime/pprof_test\.blockChanSend\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+
# 0x[0-9,a-f]+ runtime/pprof_test\.TestBlockProfile\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+
`},
......
......@@ -158,7 +158,6 @@ runtime·schedinit(void)
runtime·symtabinit();
runtime·stackinit();
runtime·mallocinit();
runtime·chaninit();
mcommoninit(g->m);
// Initialize the itable value for newErrorCString,
......
......@@ -63,6 +63,7 @@ typedef uint8 byte;
typedef struct Func Func;
typedef struct G G;
typedef struct Gobuf Gobuf;
typedef struct SudoG SudoG;
typedef struct Lock Lock;
typedef struct M M;
typedef struct P P;
......@@ -217,6 +218,18 @@ struct Gobuf
uintreg ret;
uintptr lr;
};
// Known to compiler.
// Changes here must also be made in src/cmd/gc/select.c's selecttype.
struct SudoG
{
G* g;
uint32* selectdone;
SudoG* link;
byte* elem; // data element
int64 releasetime;
int32 nrelease; // -1 for acquire
SudoG* waitlink; // G.waiting list
};
struct GCStats
{
// the struct must consist of only uint64's,
......@@ -285,6 +298,7 @@ struct G
uintptr sigpc;
uintptr gopc; // pc of go statement that created this goroutine
uintptr racectx;
SudoG *waiting; // sudog structures this G is waiting on (that have a valid elem ptr)
uintptr end[];
};
......@@ -884,7 +898,6 @@ MCache* runtime·allocmcache(void);
void runtime·freemcache(MCache*);
void runtime·mallocinit(void);
void runtime·gcinit(void);
void runtime·chaninit(void);
void* runtime·mallocgc(uintptr size, Type* typ, uint32 flag);
void runtime·runpanic(Panic*);
uintptr runtime·getcallersp(void*);
......
......@@ -268,8 +268,10 @@ runtime·stackfree(G *gp, void *v, Stktop *top)
n = (uintptr)(top+1) - (uintptr)v;
if(n & (n-1))
runtime·throw("stack not a power of 2");
if(StackDebug >= 1)
if(StackDebug >= 1) {
runtime·printf("stackfree %p %d\n", v, (int32)n);
runtime·memclr(v, n); // for testing, clobber stack data
}
gp->stacksize -= n;
if(runtime·debug.efence || StackFromSystem) {
if(runtime·debug.efence || StackFaultOnFree)
......@@ -753,6 +755,21 @@ adjustdefers(G *gp, AdjustInfo *adjinfo)
}
}
static void
adjustsudogs(G *gp, AdjustInfo *adjinfo)
{
SudoG *s;
byte *e;
// the data elements pointed to by a SudoG structure
// might be in the stack.
for(s = gp->waiting; s != nil; s = s->waitlink) {
e = s->elem;
if(adjinfo->oldstk <= e && e < adjinfo->oldbase)
s->elem = e + adjinfo->delta;
}
}
// Copies the top stack segment of gp to a new stack segment of a
// different size. The top segment must contain nframes frames.
static void
......@@ -791,6 +808,7 @@ copystack(G *gp, uintptr nframes, uintptr newsize)
// adjust other miscellaneous things that have pointers into stacks.
adjustctxt(gp, &adjinfo);
adjustdefers(gp, &adjinfo);
adjustsudogs(gp, &adjinfo);
// copy the stack (including Stktop) to the new location
runtime·memmove(newbase - used, oldbase - used, used);
......@@ -1069,6 +1087,8 @@ runtime·shrinkstack(G *gp)
if(gp->m != nil && gp->m->libcallsp != 0)
return;
#endif
if(StackDebug > 0)
runtime·printf("shrinking stack %D->%D\n", (uint64)oldsize, (uint64)newsize);
nframes = copyabletopsegment(gp);
if(nframes == -1)
return;
......
......@@ -30,6 +30,18 @@ func racereadrangepc(addr unsafe.Pointer, len int, callpc, pc uintptr)
//go:noescape
func racewriterangepc(addr unsafe.Pointer, len int, callpc, pc uintptr)
//go:noescape
func raceacquire(addr unsafe.Pointer)
//go:noescape
func racerelease(addr unsafe.Pointer)
//go:noescape
func raceacquireg(gp *g, addr unsafe.Pointer)
//go:noescape
func racereleaseg(gp *g, addr unsafe.Pointer)
// Should be a built-in for unsafe.Pointer?
func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
return unsafe.Pointer(uintptr(p) + x)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment