Commit 2b605670 authored by Austin Clements

sync: internal fixed size lock-free queue for sync.Pool

This is the first step toward fixing multiple issues with sync.Pool.
This adds a fixed size, lock-free, single-producer, multi-consumer
queue that will be used in the new Pool stealing implementation.

For #22950, #22331.

Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5
Reviewed-on: https://go-review.googlesource.com/c/go/+/166957
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
parent e47ced78
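To preview why this particular shape (one producer, many consumers) suits sync.Pool: the idea is for each P to own one of these queues, so the owner can push and pop at the head without contention while other Ps steal from the tail. A rough sketch of that intended use follows; the per-P layout and all field names here are hypothetical, since the stealing implementation lands in a later change:

	// Hypothetical sketch only; not part of this commit.
	func (p *Pool) get(pid int) interface{} {
		d := &p.local[pid].queue // hypothetical per-P poolDequeue
		if v, ok := d.popHead(); ok { // owner-only fast path
			return v
		}
		for i := range p.local { // steal from other Ps' tails
			if v, ok := p.local[i].queue.popTail(); ok {
				return v
			}
		}
		return nil
	}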
@@ -9,3 +9,28 @@ var Runtime_Semacquire = runtime_Semacquire
var Runtime_Semrelease = runtime_Semrelease
var Runtime_procPin = runtime_procPin
var Runtime_procUnpin = runtime_procUnpin
// poolDequeue testing.
type PoolDequeue interface {
	PushHead(val interface{}) bool
	PopHead() (interface{}, bool)
	PopTail() (interface{}, bool)
}

func NewPoolDequeue(n int) PoolDequeue {
	return &poolDequeue{
		vals: make([]eface, n),
	}
}

func (d *poolDequeue) PushHead(val interface{}) bool {
	return d.pushHead(val)
}

func (d *poolDequeue) PopHead() (interface{}, bool) {
	return d.popHead()
}

func (d *poolDequeue) PopTail() (interface{}, bool) {
	return d.popTail()
}
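For orientation, a minimal sketch of how these hooks could be exercised from a test in this package (illustrative only, not part of the commit; recall that PushHead and PopHead may only be called from a single goroutine):

	func sketchPoolDequeue() {
		d := NewPoolDequeue(4) // ring size must be a power of 2
		d.PushHead(1)
		d.PushHead(2)
		v1, _ := d.PopTail() // a consumer takes the oldest element: 1
		v2, _ := d.PopHead() // the producer takes back the newest: 2
		_, _ = v1, v2
	}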
@@ -150,6 +150,79 @@ func TestPoolStress(t *testing.T) {
	}
}
func TestPoolDequeue(t *testing.T) {
	const P = 10
	// In long mode, do just over 1<<21 pushes, enough to cycle
	// the small ring buffer many times over.
	N := 1<<21 + 1000
	if testing.Short() {
		N = 1e3
	}
	d := NewPoolDequeue(16)
	have := make([]int32, N)
	var stop int32
	var wg WaitGroup

	// Start P-1 consumers.
	for i := 1; i < P; i++ {
		wg.Add(1)
		go func() {
			fail := 0
			for atomic.LoadInt32(&stop) == 0 {
				val, ok := d.PopTail()
				if ok {
					fail = 0
					atomic.AddInt32(&have[val.(int)], 1)
					if val.(int) == N-1 {
						atomic.StoreInt32(&stop, 1)
					}
				} else {
					// Speed up the test by
					// allowing the pusher to run.
					if fail++; fail%100 == 0 {
						runtime.Gosched()
					}
				}
			}
			wg.Done()
		}()
	}

	// Start 1 producer.
	nPopHead := 0
	wg.Add(1)
	go func() {
		for j := 0; j < N; j++ {
			for !d.PushHead(j) {
				// Allow a popper to run.
				runtime.Gosched()
			}
			if j%10 == 0 {
				val, ok := d.PopHead()
				if ok {
					nPopHead++
					atomic.AddInt32(&have[val.(int)], 1)
				}
			}
		}
		wg.Done()
	}()
	wg.Wait()

	// Check results.
	for i, count := range have {
		if count != 1 {
			t.Errorf("expected have[%d] = 1, got %d", i, count)
		}
	}
	if nPopHead == 0 {
		// In theory it's possible in a valid schedule for
		// popHead to never succeed, but in practice it almost
		// always succeeds, so this is unlikely to flake.
		t.Errorf("popHead never succeeded")
	}
}
func BenchmarkPool(b *testing.B) {
	var p Pool
	b.RunParallel(func(pb *testing.PB) {
...
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"sync/atomic"
	"unsafe"
)
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
type eface struct {
	typ, val unsafe.Pointer
}
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This is half of 1<<dequeueBits because detecting fullness depends
// on wrapping around the ring buffer without wrapping around the
// index.
const dequeueLimit = (1 << dequeueBits) / 2

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
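To make the sentinel concrete, here is a small illustrative helper (hypothetical, not part of the commit) showing both conversions that pushHead and popHead/popTail perform further below:

	// dequeueNilRoundTrip mimics the store-side and load-side
	// conversions: nil is stored as dequeueNil(nil) so the slot's
	// typ pointer becomes non-nil (the slot reads as occupied),
	// and it is turned back into nil when loaded.
	func dequeueNilRoundTrip(v interface{}) interface{} {
		if v == nil {
			v = dequeueNil(nil) // store side (pushHead)
		}
		// ... the value sits in the ring buffer ...
		if v == dequeueNil(nil) {
			v = nil // load side (popHead/popTail)
		}
		return v
	}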
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}
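As a worked example of the packing (illustrative, not code from the commit): pack(3, 1) yields 0x0000000300000001, and unpack recovers head=3, tail=1. Because head lives in the high 32 bits, the producer can bump it with a single atomic add, and overflow is benign:

	ptrs := d.pack(3, 1)         // 0x0000000300000001
	head, tail := d.unpack(ptrs) // head == 3, tail == 1

	// Incrementing head (as pushHead does below):
	//   atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	// If head is 0xFFFFFFFF, the carry falls off the top of the
	// uint64 and is discarded, so head wraps to 0 while tail is
	// untouched, which is exactly the modular arithmetic the ring
	// needs.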
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}
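A worked instance of the fullness check above (the numbers are illustrative): suppose len(d.vals) == 4, the producer has pushed four values, and nothing has been popped, so tail == 0 and head == 4. Then

	(tail + uint32(len(d.vals))) & (1<<dequeueBits - 1)
	  == (0 + 4) & 0xFFFFFFFF
	  == 4
	  == head

and pushHead reports the queue full. This also motivates dequeueLimit: head == tail+len must stay distinguishable from head == tail even after the 32-bit indexes wrap, so len(d.vals) has to stay below 1<<dequeueBits; since ring sizes must be powers of two, the largest usable size is (1 << dequeueBits) / 2.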
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
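Finally, a self-contained toy (assumptions: a standalone package main, single-threaded, reusing only the index-packing arithmetic above rather than the atomic publication protocol) that can be run to watch the head index wrap while the head-tail distance stays correct:

	package main

	import "fmt"

	const dequeueBits = 32

	func pack(head, tail uint32) uint64 {
		const mask = 1<<dequeueBits - 1
		return (uint64(head) << dequeueBits) | uint64(tail&mask)
	}

	func unpack(ptrs uint64) (head, tail uint32) {
		const mask = 1<<dequeueBits - 1
		head = uint32((ptrs >> dequeueBits) & mask)
		tail = uint32(ptrs & mask)
		return
	}

	func main() {
		// Start both indexes just below the 32-bit wrap point.
		ptrs := pack(1<<32-2, 1<<32-2)
		for i := 0; i < 4; i++ {
			ptrs += 1 << dequeueBits // producer pushes: head++
			head, tail := unpack(ptrs)
			fmt.Printf("head=%d tail=%d used=%d\n", head, tail, head-tail)
		}
	}

The printed head values run 4294967295, 0, 1, 2 while used climbs 1 through 4: the head index wraps through zero, yet the unsigned subtraction head-tail keeps reporting the correct occupancy.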