Commit 5b425cc3 authored by Dave Cheney's avatar Dave Cheney

undo CL 6855110 / 869253ef7009

64bit atomics are broken on 32bit systems. This is issue 599.

linux/arm builders all broke with this change, I am concerned that the other 32bit builders are silently impacted.

««« original CL description
net: fix data races on deadline vars

Fixes #4434.

R=mikioh.mikioh, bradfitz, dvyukov, alex.brainman
CC=golang-dev
https://golang.org/cl/6855110
»»»

R=rsc, mikioh.mikioh, dvyukov, minux.ma
CC=golang-dev
https://golang.org/cl/6852105
parent be0d84e3
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin freebsd linux netbsd openbsd windows
package net
import (
"testing"
"time"
)
var deadlineSetTimeTests = []struct {
input time.Time
expected int64
}{
{time.Time{}, 0},
{time.Date(2009, 11, 10, 23, 00, 00, 00, time.UTC), 1257894000000000000}, // 2009-11-10 23:00:00 +0000 UTC
}
func TestDeadlineSetTime(t *testing.T) {
for _, tt := range deadlineSetTimeTests {
var d deadline
d.setTime(tt.input)
actual := d.value()
expected := int64(0)
if !tt.input.IsZero() {
expected = tt.input.UnixNano()
}
if actual != expected {
t.Errorf("set/value failed: expected %v, actual %v", expected, actual)
}
}
}
var deadlineExpiredTests = []struct {
deadline time.Time
expired bool
}{
// note, times are relative to the start of the test run, not
// the start of TestDeadlineExpired
{time.Now().Add(5 * time.Minute), false},
{time.Now().Add(-5 * time.Minute), true},
{time.Time{}, false}, // no deadline set
}
func TestDeadlineExpired(t *testing.T) {
for _, tt := range deadlineExpiredTests {
var d deadline
d.set(tt.deadline.UnixNano())
expired := d.expired()
if expired != tt.expired {
t.Errorf("expire failed: expected %v, actual %v", tt.expired, expired)
}
}
}
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"os" "os"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"syscall" "syscall"
"time" "time"
) )
...@@ -38,11 +37,11 @@ type netFD struct { ...@@ -38,11 +37,11 @@ type netFD struct {
laddr Addr laddr Addr
raddr Addr raddr Addr
// serialize access to Read and Write methods // owned by client
rio, wio sync.Mutex rdeadline int64
rio sync.Mutex
// read and write deadlines wdeadline int64
rdeadline, wdeadline deadline wio sync.Mutex
// owned by fd wait server // owned by fd wait server
ncr, ncw int ncr, ncw int
...@@ -51,31 +50,6 @@ type netFD struct { ...@@ -51,31 +50,6 @@ type netFD struct {
pollServer *pollServer pollServer *pollServer
} }
// deadline is an atomically-accessed number of nanoseconds since 1970
// or 0, if no deadline is set.
type deadline int64
func (d *deadline) expired() bool {
t := d.value()
return t > 0 && time.Now().UnixNano() >= t
}
func (d *deadline) value() int64 {
return atomic.LoadInt64((*int64)(d))
}
func (d *deadline) set(v int64) {
atomic.StoreInt64((*int64)(d), v)
}
func (d *deadline) setTime(t time.Time) {
if t.IsZero() {
d.set(0)
} else {
d.set(t.UnixNano())
}
}
// A pollServer helps FDs determine when to retry a non-blocking // A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN. When an FD needs to wait, // read or write after they get EAGAIN. When an FD needs to wait,
// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server. // call s.WaitRead() or s.WaitWrite() to pass the request to the poll server.
...@@ -108,11 +82,11 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error { ...@@ -108,11 +82,11 @@ func (s *pollServer) AddFD(fd *netFD, mode int) error {
key := intfd << 1 key := intfd << 1
if mode == 'r' { if mode == 'r' {
fd.ncr++ fd.ncr++
t = fd.rdeadline.value() t = fd.rdeadline
} else { } else {
fd.ncw++ fd.ncw++
key++ key++
t = fd.wdeadline.value() t = fd.wdeadline
} }
s.pending[key] = fd s.pending[key] = fd
doWakeup := false doWakeup := false
...@@ -179,8 +153,12 @@ func (s *pollServer) WakeFD(fd *netFD, mode int, err error) { ...@@ -179,8 +153,12 @@ func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
} }
} }
func (s *pollServer) Now() int64 {
return time.Now().UnixNano()
}
func (s *pollServer) CheckDeadlines() { func (s *pollServer) CheckDeadlines() {
now := time.Now().UnixNano() now := s.Now()
// TODO(rsc): This will need to be handled more efficiently, // TODO(rsc): This will need to be handled more efficiently,
// probably with a heap indexed by wakeup time. // probably with a heap indexed by wakeup time.
...@@ -194,9 +172,9 @@ func (s *pollServer) CheckDeadlines() { ...@@ -194,9 +172,9 @@ func (s *pollServer) CheckDeadlines() {
mode = 'w' mode = 'w'
} }
if mode == 'r' { if mode == 'r' {
t = fd.rdeadline.value() t = fd.rdeadline
} else { } else {
t = fd.wdeadline.value() t = fd.wdeadline
} }
if t > 0 { if t > 0 {
if t <= now { if t <= now {
...@@ -220,15 +198,15 @@ func (s *pollServer) Run() { ...@@ -220,15 +198,15 @@ func (s *pollServer) Run() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
for { for {
var timeout int64 // nsec to wait for or 0 for none var t = s.deadline
if s.deadline > 0 { if t > 0 {
timeout = s.deadline - time.Now().UnixNano() t = t - s.Now()
if timeout <= 0 { if t <= 0 {
s.CheckDeadlines() s.CheckDeadlines()
continue continue
} }
} }
fd, mode, err := s.poll.WaitFD(s, timeout) fd, mode, err := s.poll.WaitFD(s, t)
if err != nil { if err != nil {
print("pollServer WaitFD: ", err.Error(), "\n") print("pollServer WaitFD: ", err.Error(), "\n")
return return
...@@ -439,9 +417,11 @@ func (fd *netFD) Read(p []byte) (n int, err error) { ...@@ -439,9 +417,11 @@ func (fd *netFD) Read(p []byte) (n int, err error) {
} }
defer fd.decref() defer fd.decref()
for { for {
if fd.rdeadline.expired() { if fd.rdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.rdeadline {
break err = errTimeout
break
}
} }
n, err = syscall.Read(int(fd.sysfd), p) n, err = syscall.Read(int(fd.sysfd), p)
if err != nil { if err != nil {
...@@ -469,9 +449,11 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -469,9 +449,11 @@ func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
} }
defer fd.decref() defer fd.decref()
for { for {
if fd.rdeadline.expired() { if fd.rdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.rdeadline {
break err = errTimeout
break
}
} }
n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0) n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
if err != nil { if err != nil {
...@@ -499,13 +481,15 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S ...@@ -499,13 +481,15 @@ func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.S
} }
defer fd.decref() defer fd.decref()
for { for {
if fd.rdeadline.expired() { if fd.rdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.rdeadline {
break err = errTimeout
break
}
} }
n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0) n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
if err != nil { if err != nil {
// TODO(dfc) should n and oobn be set to 0 // TODO(dfc) should n and oobn be set to nil
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
if err = fd.pollServer.WaitRead(fd); err == nil { if err = fd.pollServer.WaitRead(fd); err == nil {
continue continue
...@@ -528,17 +512,21 @@ func chkReadErr(n int, err error, fd *netFD) error { ...@@ -528,17 +512,21 @@ func chkReadErr(n int, err error, fd *netFD) error {
return err return err
} }
func (fd *netFD) Write(p []byte) (nn int, err error) { func (fd *netFD) Write(p []byte) (int, error) {
fd.wio.Lock() fd.wio.Lock()
defer fd.wio.Unlock() defer fd.wio.Unlock()
if err := fd.incref(false); err != nil { if err := fd.incref(false); err != nil {
return 0, err return 0, err
} }
defer fd.decref() defer fd.decref()
var err error
nn := 0
for { for {
if fd.wdeadline.expired() { if fd.wdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.wdeadline {
break err = errTimeout
break
}
} }
var n int var n int
n, err = syscall.Write(int(fd.sysfd), p[nn:]) n, err = syscall.Write(int(fd.sysfd), p[nn:])
...@@ -576,9 +564,11 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) { ...@@ -576,9 +564,11 @@ func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
} }
defer fd.decref() defer fd.decref()
for { for {
if fd.wdeadline.expired() { if fd.wdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.wdeadline {
break err = errTimeout
break
}
} }
err = syscall.Sendto(fd.sysfd, p, 0, sa) err = syscall.Sendto(fd.sysfd, p, 0, sa)
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
...@@ -604,9 +594,11 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob ...@@ -604,9 +594,11 @@ func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oob
} }
defer fd.decref() defer fd.decref()
for { for {
if fd.wdeadline.expired() { if fd.wdeadline > 0 {
err = errTimeout if time.Now().UnixNano() >= fd.wdeadline {
break err = errTimeout
break
}
} }
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0) err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
......
...@@ -285,39 +285,11 @@ type netFD struct { ...@@ -285,39 +285,11 @@ type netFD struct {
errnoc [2]chan error // read/write submit or cancel operation errors errnoc [2]chan error // read/write submit or cancel operation errors
closec chan bool // used by Close to cancel pending IO closec chan bool // used by Close to cancel pending IO
// serialize access to Read and Write methods // owned by client
rio, wio sync.Mutex rdeadline int64
rio sync.Mutex
// read and write deadlines wdeadline int64
rdeadline, wdeadline deadline wio sync.Mutex
}
// deadline is a number of nanoseconds since 1970 or 0, if no deadline is set.
// For compatability, deadline has the same method set as fd_unix.go, but
// does not use atomic operations as it is not known if data races exist on
// these values.
// TODO(dfc,brainman) when we get a windows race builder, revisit this.
type deadline int64
func (d *deadline) expired() bool {
t := d.value()
return t > 0 && time.Now().UnixNano() >= t
}
func (d *deadline) value() int64 {
return int64(*d)
}
func (d *deadline) set(v int64) {
*d = deadline(v)
}
func (d *deadline) setTime(t time.Time) {
if t.IsZero() {
d.set(0)
} else {
d.set(t.UnixNano())
}
} }
func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD { func allocFD(fd syscall.Handle, family, sotype int, net string) *netFD {
...@@ -450,7 +422,7 @@ func (fd *netFD) Read(buf []byte) (int, error) { ...@@ -450,7 +422,7 @@ func (fd *netFD) Read(buf []byte) (int, error) {
defer fd.rio.Unlock() defer fd.rio.Unlock()
var o readOp var o readOp
o.Init(fd, buf, 'r') o.Init(fd, buf, 'r')
n, err := iosrv.ExecIO(&o, fd.rdeadline.value()) n, err := iosrv.ExecIO(&o, fd.rdeadline)
if err == nil && n == 0 { if err == nil && n == 0 {
err = io.EOF err = io.EOF
} }
...@@ -487,7 +459,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { ...@@ -487,7 +459,7 @@ func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
var o readFromOp var o readFromOp
o.Init(fd, buf, 'r') o.Init(fd, buf, 'r')
o.rsan = int32(unsafe.Sizeof(o.rsa)) o.rsan = int32(unsafe.Sizeof(o.rsa))
n, err = iosrv.ExecIO(&o, fd.rdeadline.value()) n, err = iosrv.ExecIO(&o, fd.rdeadline)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
...@@ -519,7 +491,7 @@ func (fd *netFD) Write(buf []byte) (int, error) { ...@@ -519,7 +491,7 @@ func (fd *netFD) Write(buf []byte) (int, error) {
defer fd.wio.Unlock() defer fd.wio.Unlock()
var o writeOp var o writeOp
o.Init(fd, buf, 'w') o.Init(fd, buf, 'w')
return iosrv.ExecIO(&o, fd.wdeadline.value()) return iosrv.ExecIO(&o, fd.wdeadline)
} }
// WriteTo to network. // WriteTo to network.
...@@ -551,7 +523,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { ...@@ -551,7 +523,7 @@ func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
var o writeToOp var o writeToOp
o.Init(fd, buf, 'w') o.Init(fd, buf, 'w')
o.sa = sa o.sa = sa
return iosrv.ExecIO(&o, fd.wdeadline.value()) return iosrv.ExecIO(&o, fd.wdeadline)
} }
// Accept new network connections. // Accept new network connections.
...@@ -600,7 +572,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) { ...@@ -600,7 +572,7 @@ func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
var o acceptOp var o acceptOp
o.Init(fd, 'r') o.Init(fd, 'r')
o.newsock = s o.newsock = s
_, err = iosrv.ExecIO(&o, fd.rdeadline.value()) _, err = iosrv.ExecIO(&o, fd.rdeadline)
if err != nil { if err != nil {
closesocket(s) closesocket(s)
return nil, err return nil, err
......
...@@ -82,7 +82,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -82,7 +82,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
if n == 0 && err1 == nil { if n == 0 && err1 == nil {
break break
} }
if err1 == syscall.EAGAIN { if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
if err1 = c.pollServer.WaitWrite(c); err1 == nil { if err1 = c.pollServer.WaitWrite(c); err1 == nil {
continue continue
} }
......
...@@ -58,7 +58,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { ...@@ -58,7 +58,7 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
if n == 0 && err1 == nil { if n == 0 && err1 == nil {
break break
} }
if err1 == syscall.EAGAIN { if err1 == syscall.EAGAIN && c.wdeadline >= 0 {
if err1 = c.pollServer.WaitWrite(c); err1 == nil { if err1 = c.pollServer.WaitWrite(c); err1 == nil {
continue continue
} }
......
...@@ -57,14 +57,16 @@ func socket(net string, f, t, p int, ipv6only bool, ulsa, ursa syscall.Sockaddr, ...@@ -57,14 +57,16 @@ func socket(net string, f, t, p int, ipv6only bool, ulsa, ursa syscall.Sockaddr,
} }
if ursa != nil { if ursa != nil {
fd.wdeadline.setTime(deadline) if !deadline.IsZero() {
fd.wdeadline = deadline.UnixNano()
}
if err = fd.connect(ursa); err != nil { if err = fd.connect(ursa); err != nil {
closesocket(s) closesocket(s)
fd.Close() fd.Close()
return nil, err return nil, err
} }
fd.isConnected = true fd.isConnected = true
fd.wdeadline.set(0) fd.wdeadline = 0
} }
lsa, _ := syscall.Getsockname(s) lsa, _ := syscall.Getsockname(s)
......
...@@ -119,22 +119,29 @@ func setWriteBuffer(fd *netFD, bytes int) error { ...@@ -119,22 +119,29 @@ func setWriteBuffer(fd *netFD, bytes int) error {
return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes)) return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes))
} }
// TODO(dfc) these unused error returns could be removed
func setReadDeadline(fd *netFD, t time.Time) error { func setReadDeadline(fd *netFD, t time.Time) error {
fd.rdeadline.setTime(t) if t.IsZero() {
fd.rdeadline = 0
} else {
fd.rdeadline = t.UnixNano()
}
return nil return nil
} }
func setWriteDeadline(fd *netFD, t time.Time) error { func setWriteDeadline(fd *netFD, t time.Time) error {
fd.wdeadline.setTime(t) if t.IsZero() {
fd.wdeadline = 0
} else {
fd.wdeadline = t.UnixNano()
}
return nil return nil
} }
func setDeadline(fd *netFD, t time.Time) error { func setDeadline(fd *netFD, t time.Time) error {
setReadDeadline(fd, t) if err := setReadDeadline(fd, t); err != nil {
setWriteDeadline(fd, t) return err
return nil }
return setWriteDeadline(fd, t)
} }
func setKeepAlive(fd *netFD, keepalive bool) error { func setKeepAlive(fd *netFD, keepalive bool) error {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment