Commit c2118847 authored by Dmitriy Vyukov

runtime: add network polling support into scheduler

This is a part of the bigger change that moves network poller into runtime:
https://golang.org/cl/7326051/

R=golang-dev, bradfitz, mikioh.mikioh, rsc
CC=golang-dev
https://golang.org/cl/7448048
parent 4dd3e1e8
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build windows

#include "runtime.h"

// Polls for ready network connections.
// Returns list of goroutines that become runnable.
G*
runtime·netpoll(bool block)
{
	// Implementation for platforms that do not support
	// integrated network poller.
	USED(block);
	return nil;
}
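The stub above only fixes the contract: runtime·netpoll returns a list of goroutines whose network I/O is ready, and it blocks only when block is true. As a rough, self-contained illustration of the mechanism a real platform poller is built on (this is not the code from CL 7326051; poll_ready and the demo main are invented names, and the sketch is Linux/epoll-only):

/* Illustrative sketch only: the mechanism behind a platform netpoll,
 * using Linux epoll directly. Not the runtime's code. */
#include <stdio.h>
#include <sys/epoll.h>

enum { MAXEVENTS = 128 };

/* Wait for ready fds. If block is nonzero, wait indefinitely
 * (like netpoll(true)); otherwise return immediately (netpoll(false)). */
static int
poll_ready(int epfd, struct epoll_event *evs, int block)
{
	return epoll_wait(epfd, evs, MAXEVENTS, block ? -1 : 0);
}

int
main(void)
{
	struct epoll_event evs[MAXEVENTS];
	int epfd, n, i;

	epfd = epoll_create1(0);
	if(epfd < 0)
		return 1;
	/* The runtime registers each network fd here and keeps per-fd poll
	 * state in the event data, so each ready event can be mapped back
	 * to the goroutines parked on that fd and chained into the list
	 * that netpoll returns. */
	n = poll_ready(epfd, evs, 0);	/* non-blocking pass, like netpoll(false) */
	for(i = 0; i < n; i++)
		printf("ready fd: %d\n", evs[i].data.fd);
	return 0;
}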
@@ -49,6 +49,7 @@ struct Sched {
	Note	stopnote;
	uint32	sysmonwait;
	Note	sysmonnote;
	uint64	lastpoll;

	int32	profilehz;	// cpu profiling rate
};
@@ -107,6 +108,7 @@ static void globrunqput(G*);
static G* globrunqget(P*);
static P* pidleget(void);
static void pidleput(P*);
static void injectglist(G*);
// The bootstrap sequence is:
//
@@ -135,6 +137,7 @@ runtime·schedinit(void)
	// so that we don't need to call malloc when we crash.
	// runtime·findfunc(0);

	runtime·sched.lastpoll = runtime·nanotime();
	procs = 1;
	p = runtime·getenv("GOMAXPROCS");
	if(p != nil && (n = runtime·atoi(p)) > 0) {
@@ -391,8 +394,11 @@ runtime·starttheworld(void)
{
	P *p, *p1;
	M *mp;
	G *gp;
	bool add;

	gp = runtime·netpoll(false);  // non-blocking
	injectglist(gp);
	add = needaddgcproc();
	runtime·lock(&runtime·sched);
	if(newprocs) {
@@ -976,7 +982,7 @@ execute(G *gp)
}

// Finds a runnable goroutine to execute.
-// Tries to steal from other P's and get g from global queue.
+// Tries to steal from other P's, get g from global queue, poll network.
static G*
findrunnable(void)
{
@@ -1001,6 +1007,13 @@ top:
		if(gp)
			return gp;
	}
	// poll network
	gp = runtime·netpoll(false);  // non-blocking
	if(gp) {
		injectglist(gp->schedlink);
		gp->status = Grunnable;
		return gp;
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
@@ -1055,10 +1068,54 @@ stop:
			break;
		}
	}

	// poll network
	if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
		if(m->p)
			runtime·throw("findrunnable: netpoll with p");
		if(m->spinning)
			runtime·throw("findrunnable: netpoll with spinning");
		gp = runtime·netpoll(true);  // block until new work is available
		runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
		if(gp) {
			runtime·lock(&runtime·sched);
			p = pidleget();
			runtime·unlock(&runtime·sched);
			if(p) {
				acquirep(p);
				injectglist(gp->schedlink);
				gp->status = Grunnable;
				return gp;
			}
			injectglist(gp);
		}
	}
	stopm();
	goto top;
}

// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
static void
injectglist(G *glist)
{
	int32 n;
	G *gp;

	if(glist == nil)
		return;
	runtime·lock(&runtime·sched);
	for(n = 0; glist; n++) {
		gp = glist;
		glist = gp->schedlink;
		gp->status = Grunnable;
		globrunqput(gp);
	}
	runtime·unlock(&runtime·sched);
	for(; n && runtime·sched.npidle; n--)
		startm(nil, false);
}

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
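One detail worth spelling out: runtime·sched.lastpoll doubles as a timestamp and as an ownership token. In findrunnable, an M may block in netpoll only if runtime·xchg64(&runtime·sched.lastpoll, 0) returns nonzero, so at most one M is ever parked there; when that M wakes up it stores the current time back, which both releases the token and feeds sysmon's staleness check. A minimal standalone sketch of that claim/release pattern, using C11 atomics in place of runtime·xchg64/runtime·atomicstore64 (the variable and helper names are stand-ins, not runtime identifiers):

/* Sketch of the single-blocking-poller handshake with C11 atomics.
 * Illustrative only; not the runtime's implementation. */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t lastpoll = 1;	/* nonzero: no thread is blocked in the poll */

static uint64_t
now_ns(void)
{
	return 12345;	/* stand-in for runtime·nanotime() */
}

int
main(void)
{
	uint64_t prev;

	/* "M1" claims the blocking poll: it sees a nonzero old value, so it wins. */
	prev = atomic_exchange(&lastpoll, 0);
	printf("M1 claim: %s\n", prev != 0 ? "won" : "lost");

	/* A second M doing the same exchange sees 0 and must not block too;
	 * in the scheduler it falls through to stopm() instead. */
	prev = atomic_exchange(&lastpoll, 0);
	printf("M2 claim: %s\n", prev != 0 ? "won" : "lost");

	/* When the winner returns from its blocking poll it publishes the
	 * current time, re-arming future blocking polls and sysmon's 10ms check. */
	atomic_store(&lastpoll, now_ns());
	return 0;
}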
@@ -1916,6 +1973,8 @@ static void
sysmon(void)
{
	uint32 idle, delay;
	int64 now, lastpoll;
	G *gp;
	uint32 ticks[MaxGomaxprocs];

	idle = 0;  // how many cycles in succession we had not wokeup somebody
@@ -1940,6 +1999,14 @@ sysmon(void)
		} else
			runtime·unlock(&runtime·sched);
		}
		// poll network if not polled for more than 10ms
		lastpoll = runtime·atomicload64(&runtime·sched.lastpoll);
		now = runtime·nanotime();
		if(lastpoll != 0 && lastpoll + 10*1000*1000 < now) {
			gp = runtime·netpoll(false);  // non-blocking
			injectglist(gp);
		}
		// retake P's blocked in syscalls
		if(retake(ticks))
			idle = 0;
		else
...
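For the sysmon hunk above: runtime·nanotime() returns nanoseconds, so 10*1000*1000 is the 10ms threshold, and the background poll should fire only when the recorded poll time is nonzero (no M currently holds the token) and more than 10ms in the past. A standalone check of that comparison, with made-up numbers:

/* Standalone illustration of sysmon's staleness test; values invented. */
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	int64_t lastpoll = 5*1000*1000;		/* last poll at t = 5ms, in ns */
	int64_t now = 20*1000*1000;		/* current time t = 20ms, in ns */

	/* Poll only if the last poll is more than 10ms old. */
	if(lastpoll != 0 && lastpoll + 10*1000*1000 < now)
		printf("stale by %lldms: poll the network\n",
			(long long)((now - lastpoll)/1000000));
	else
		printf("fresh: skip this cycle\n");
	return 0;
}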
@@ -767,6 +767,7 @@ void runtime·blockevent(int64, int32);
extern int64 runtime·blockprofilerate;
void	runtime·addtimer(Timer*);
bool	runtime·deltimer(Timer*);
G*	runtime·netpoll(bool);

#pragma varargck argpos runtime·printf 1
#pragma varargck type "d" int32
@@ -968,5 +969,5 @@ extern uint64 ·neginf;
enum
{
-	UseSpanType = 1,
+	UseSpanType = 0,
};