Commit 9d047b36 authored by Kirill Smelkov's avatar Kirill Smelkov

X recvPkt via only 1 syscall

No big speedup measurable but very visible in tracing.
parent 2677c346
...@@ -33,8 +33,11 @@ import ( ...@@ -33,8 +33,11 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
...@@ -78,6 +81,8 @@ type NodeLink struct { ...@@ -78,6 +81,8 @@ type NodeLink struct {
axclosed int32 // whether CloseAccept was called axclosed int32 // whether CloseAccept was called
closed int32 // whether Close was called closed int32 // whether Close was called
rxbuf rbuf.RingBuf // buffer for reading from peerLink
} }
// Conn is a connection established over NodeLink // Conn is a connection established over NodeLink
...@@ -714,6 +719,11 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -714,6 +719,11 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
} }
} }
// XXX serveSend is not needed - Conn.Write is already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
// serveSend handles requests to transmit packets from client connections and // serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link. // serially executes them over associated node link.
func (nl *NodeLink) serveSend() { func (nl *NodeLink) serveSend() {
...@@ -772,35 +782,67 @@ var ErrPktTooBig = errors.New("packet too big") ...@@ -772,35 +782,67 @@ var ErrPktTooBig = errors.New("packet too big")
// //
// rx error, if any, is returned as is and is analyzed in serveRecv // rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) { func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := pktAlloc(4096) pkt := pktAlloc(4096)
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ? // len=4K but cap can be more since pkt is from pool - use all space to buffer reads
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, pktHeaderLen) // XXX vvv -> pktAlloc() ?
n, err := io.ReadFull(nl.peerLink, pkt.Data[:pktHeaderLen]) data := pkt.Data[:cap(pkt.Data)]
if err != nil {
return nil, err n := 0 // number of pkt bytes obtained so far
// next packet could be already prefetched in part by previous read
if nl.rxbuf.Len() > 0 {
δn, _ := nl.rxbuf.Read(data[:pktHeaderLen])
n += δn
//n += copy(data[:pktHeaderLen], nl.rxbuf)
//nl.rxbuf = nl.rxbuf[n:]
}
// first read to read pkt header and hopefully rest of packet in 1 syscall
if n < pktHeaderLen {
δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktHeaderLen - n)
if err != nil {
return nil, err
}
n += δn
} }
pkth := pkt.Header() pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ? pktLen := int(pktHeaderLen + ntoh32(pkth.MsgLen)) // whole packet length
pktLen := pktHeaderLen + ntoh32(pkth.MsgLen) // .MsgLen is payload-only length without header
if pktLen > pktMaxSize { if pktLen > pktMaxSize {
return nil, ErrPktTooBig return nil, ErrPktTooBig
} }
pkt.Resize(int(pktLen)) // resize data if we don't have enough room in it
data = xbytes.Resize(data, pktLen)
data = data[:cap(data)]
// we might have more data already prefetched in rxbuf
if nl.rxbuf.Len() > 0 {
δn, _ := nl.rxbuf.Read(data[n:pktLen])
//δn := copy(data[n:pktLen], nl.rxbuf)
//nl.rxbuf = nl.rxbuf[δn:]
n += δn
}
// read rest of pkt data, if we need to // read rest of pkt data, if we need to
if n < len(pkt.Data) { if n < pktLen {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:]) δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktLen - n)
if err != nil { if err != nil {
return nil, err return nil, err
} }
n += δn
}
// put overread data into rxbuf for next reader
if n > pktLen {
nl.rxbuf.Write(data[pktLen:n])
} }
// fixup data/pkt
data = data[:n]
pkt.Data = data
if /* XXX temp show only tx */ true && dumpio { if /* XXX temp show only tx */ true && dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
......
...@@ -765,11 +765,11 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn) { ...@@ -765,11 +765,11 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn) {
defer xclose(c2) defer xclose(c2)
for { for {
n, erx := c2.Read(buf2) n, erx := io.ReadFull(c2, buf2)
//fmt.Printf("2: rx %q\n", buf2[:n]) //fmt.Printf("2: rx %q\n", buf2[:n])
if n > 0 { if n > 0 {
if n != 1 { if n != len(buf2) {
b.Fatalf("read -> %d bytes ; want 1", n) b.Fatalf("read -> %d bytes ; want %d", n, len(buf2))
} }
//fmt.Printf("2: tx %q\n", buf2) //fmt.Printf("2: tx %q\n", buf2)
...@@ -801,9 +801,9 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn) { ...@@ -801,9 +801,9 @@ func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn) {
b.Fatal(err) b.Fatal(err)
} }
n, err := c1.Read(buf1) n, err := io.ReadFull(c1, buf1)
//fmt.Printf("1: rx %q\n", buf1[:n]) //fmt.Printf("1: rx %q\n", buf1[:n])
if !(n == 1 && err == nil) { if !(n == len(buf1) && err == nil) {
b.Fatalf("read back: n=%v err=%v", n, err) b.Fatalf("read back: n=%v err=%v", n, err)
} }
......
...@@ -49,13 +49,6 @@ func (pkt *PktBuf) Payload() []byte { ...@@ -49,13 +49,6 @@ func (pkt *PktBuf) Payload() []byte {
return pkt.Data[pktHeaderLen:] return pkt.Data[pktHeaderLen:]
} }
// Resize resizes pkt to be of length n.
//
// semantic = xbytes.Resize.
func (pkt *PktBuf) Resize(n int) {
pkt.Data = xbytes.Resize(pkt.Data, n)
}
// ---- PktBuf freelist ---- // ---- PktBuf freelist ----
// pktBufPool is sync.Pool<pktBuf> // pktBufPool is sync.Pool<pktBuf>
......
...@@ -74,8 +74,10 @@ Sgo() { ...@@ -74,8 +74,10 @@ Sgo() {
# XXX use `go run ...` so it does not need go install? # XXX use `go run ...` so it does not need go install?
# -alsologtostderr # -alsologtostderr
# -cpuprofile cpu.out
# -trace trace.out
exec -a Sgo \ exec -a Sgo \
neo -cpuprofile cpu.out -log_dir=$log storage -cluster=$cluster -bind=$Sbind -masters=$Mbind "$@" & neo -trace trace.out -log_dir=$log storage -cluster=$cluster -bind=$Sbind -masters=$Mbind "$@" &
} }
...@@ -268,7 +270,7 @@ sync ...@@ -268,7 +270,7 @@ sync
# run benchmarks # run benchmarks
N=`seq 2` # XXX repeat benchmarks N time N=`seq 4` # XXX repeat benchmarks N time
# time1 <url> - run benchmarks on the URL once # time1 <url> - run benchmarks on the URL once
bench1() { bench1() {
......
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"os" "os"
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"runtime/trace"
//"lab.nexedi.com/kirr/neo/go/xcommon/log" //"lab.nexedi.com/kirr/neo/go/xcommon/log"
) )
...@@ -126,6 +127,7 @@ func (prog *MainProg) main() { ...@@ -126,6 +127,7 @@ func (prog *MainProg) main() {
flag.Usage = prog.usage flag.Usage = prog.usage
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`") cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
memprofile := flag.String("memprofile", "", "write memory profile to `file`") memprofile := flag.String("memprofile", "", "write memory profile to `file`")
traceout := flag.String("trace", "", "write execution trace to `file`")
flag.Parse() flag.Parse()
argv := flag.Args() argv := flag.Args()
...@@ -162,6 +164,22 @@ func (prog *MainProg) main() { ...@@ -162,6 +164,22 @@ func (prog *MainProg) main() {
} }
}() }()
if *traceout != "" {
f, err := os.Create(*traceout)
if err != nil {
Fatal("could not create trace: ", err)
}
defer func() {
if err := f.Close(); err != nil {
Fatal("could not close trace: ", err)
}
}()
if err := trace.Start(f); err != nil {
Fatal("could not start trace: ", err)
}
defer trace.Stop()
}
// help on a topic // help on a topic
if command == "help" { if command == "help" {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment