Commit 65a81865 authored by Kirill Smelkov's avatar Kirill Smelkov

go/transaction: New package to deal with transactions (very draft)

Add Go counterpart of Python transaction package[1,2]. The Go version is
not complete - in particular Transaction.Commit is not yet implemented.
However even in this state it is useful to have transaction around for
read-only transaction cases.

The synchronization logic is more well-thought - in particular there is
no dance around "new transaction", because in Go, contrary to ZODB/py,
ZODB connections will be always opened under already started
transaction. See "Synchronization" section in package documentation for
details on this topic.

[1] http://transaction.readthedocs.org
[2] https://github.com/zopefoundation/transaction
parent 6d8e0d52
Zope Public License (ZPL) Version 2.1
A copyright notice accompanies this license document that identifies the
copyright holders.
This license has been certified as open source. It has also been designated as
GPL compatible by the Free Software Foundation (FSF).
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions in source code must retain the accompanying copyright
notice, this list of conditions, and the following disclaimer.
2. Redistributions in binary form must reproduce the accompanying copyright
notice, this list of conditions, and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Names of the copyright holders must not be used to endorse or promote
products derived from this software without prior written permission from the
copyright holders.
4. The right to distribute this software or to use it for any purpose does not
give you the right to use Servicemarks (sm) or Trademarks (tm) of the
copyright
holders. Use of them is covered by separate agreement with the copyright
holders.
5. If any files are modified, you must cause the modified files to carry
prominent notices stating that you changed the files and the date of any
change.
Disclaimer
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY EXPRESSED
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Copyright (c) 2001, 2002 Zope Foundation and Contributors.
// All Rights Reserved.
//
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This software is subject to the provisions of the Zope Public License,
// Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
// THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
// WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
// FOR A PARTICULAR PURPOSE.
// Package transaction provides transaction management via two-phase commit protocol.
//
// It is modelled after Python transaction package:
//
// http://transaction.readthedocs.org
// https://github.com/zopefoundation/transaction
//
// but is not exactly equal to it.
//
//
// Overview
//
// Transactions are represented by Transaction interface. A transaction can be
// started with New, which creates transaction object and remembers it in a
// child of provided context:
//
// txn, ctx := transaction.New(ctx)
//
// The transaction should be eventually completed by user - either committed or aborted, e.g.
//
// ... // do something with data
// err := txn.Commit(ctx)
//
// As transactions are associated with contexts, Current returns that associated transaction:
//
// txn := transaction.Current(ctx)
//
// That might be useful to pass transactions through API boundaries.
//
//
// For convenience With is also provided to run a function in a new
// transaction, and either commit or abort it, depending on that function
// result:
//
// ok, err := transaction.With(ctx, func(ctx context.Context) error {
// ... // do something
// return err // the transaction will be committed on nil; aborted on !nil
// })
//
//
// Contrary to transaction/py there is no relation in between transaction and
// current thread - a transaction scope is managed completely by programmer. In
// particular it is possible to use one transaction in several goroutines
// simultaneously. There can be also several in-progress transactions running
// simultaneously, even in one goroutine.
//
//
// Two-phase commit
//
// Transactions this package manages, support committing data into several
// backends simultaneously. The way it is organized is via employing two-phase
// commit protocol:
//
// https://en.wikipedia.org/wiki/Two-phase_commit_protocol
//
// For this scheme to work, every data backend (e.g. database connection) which
// participates in a transaction, must first let the transaction know when the
// data it manages was modified. Then at commit time the transaction manager
// will perform two-phase commit related calls to the backends that joined the
// transaction.
//
// The details of interaction between transaction manager and a backend are in
// DataManager interface. As backends usually provide specific API to users for
// accessing its data, the following example is relevant:
//
// func (b *MyBackend) ChangeID(ctx context.Context, newID int) {
// b.id = newID
//
// // data changed - join the transaction to participate in commit.
// txn := transaction.Current(ctx)
// txn.Join(b)
// }
//
//
// Synchronization
//
// An object, e.g. a backend, might want to be notified of transaction completion events.
// For example
//
// - backends that do not have hooks installed to every access of its
// data, might want to check data dirtiness before transaction
// completion starts,
//
// - backends might need to free some resources after transaction
// completes.
//
// Transaction.RegisterSync provides the way to be notified of such
// synchronization points. Please see Synchronizer interface for details.
//
// Notice: transaction/py also provides "new transaction" synchronization point,
// but there is no need for it in transaction/go:
//
// - the only place where it is used is in zodb/py because data backend
// (Connection) is opened while there is no yet transaction started:
//
// https://github.com/zopefoundation/ZODB/blob/3.10.7-4-gb8d7a8567/src/ZODB/collaborations.txt#L25-L29
//
// - the way transaction/go works is to first start a transaction, and
// then, under it, open all backends, e.g. ZODB connections. This way
// when backend open happens, there is already transaction object
// available in the context, and thus there is no need to be notified of
// when transaction is created.
package transaction
import (
"context"
)
// Status describes status of a transaction.
type Status int
const (
Active Status = iota // transaction is in progress
Committing // transaction commit started
Committed // transaction commit finished successfully
// XXX CommitFailed // transaction commit resulted in error
Aborting // transaction abort started
Aborted // transaction was aborted by user
// XXX AbortFailed ? // transaction abort resulted in error
// XXX Doomed // transaction was doomed
)
// Transaction represents a transaction.
//
// ... and should be completed by user via either Commit or Abort.
//
// Before completion, if there are changes to managed data, corresponding
// DataManager(s) must join the transaction to participate in the completion.
type Transaction interface {
User() string // user name associated with transaction
Description() string // description of transaction
// XXX map[string]interface{} (objects must be simple values serialized with pickle or json, not "instances")
Extension() string
// XXX +Note, SetUser, ...
// Status returns current status of the transaction.
Status() Status
// Commit finalizes the transaction.
//
// Commit completes the transaction by executing the two-phase commit
// algorithm for all DataManagers associated with the transaction.
//
// Commit must not be called after transaction completion began.
Commit(ctx context.Context) error
// Abort aborts the transaction.
//
// Abort completes the transaction by executing Abort on all
// DataManagers associated with it.
//
// Abort must not be called after transaction completion began.
Abort() // XXX + ctx, error?
// XXX + Doom?
// ---- part for data managers & friends ----
// XXX move to separate interface?
// Join associates a DataManager to the transaction.
//
// Only associated data managers will participate in the transaction
// completion - commit or abort.
//
// Join must be called before transaction completion begins.
Join(dm DataManager)
// RegisterSync registers sync to be notified of this transaction boundary events.
//
// See Synchronizer for details.
//
// RegisterSync must be called before transaction completion begins.
RegisterSync(sync Synchronizer)
// XXX SetData(key interface{}, data interface{})
// XXX GetData(key interface{}) interface{}, ok
}
// New creates new transaction.
//
// The transaction will be associated with new context derived from ctx.
// Nested transactions are not supported.
func New(ctx context.Context) (txn Transaction, txnCtx context.Context) {
return newTxn(ctx)
}
// Current returns current transaction.
//
// It panics if there is no transaction associated with provided context.
func Current(ctx context.Context) Transaction {
return currentTxn(ctx)
}
// DataManager manages data and can transactionally persist it.
//
// If DataManager is registered to transaction via Transaction.Join, it will
// participate in that transaction completion - commit or abort. In other words
// a data manager have to join to corresponding transaction when it sees there
// are modifications to data it manages.
type DataManager interface {
// Abort should abort all modifications to managed data.
//
// Abort is called by Transaction outside of two-phase commit, and only
// if abort was caused by user requesting transaction abort. If
// two-phase commit was started and transaction needs to be aborted due
// to two-phase commit logic, TPCAbort will be called.
Abort(txn Transaction) // XXX +ctx, error
// TPCBegin should begin commit of a transaction, starting the two-phase commit.
TPCBegin(txn Transaction) // XXX +ctx, error ?
// Commit should commit modifications to managed data.
//
// It should save changes to be made persistent if the transaction
// commits (if TPCFinish is called later). If TPCAbort is called
// later, changes must not persist.
//
// This should include conflict detection and handling. If no conflicts
// or errors occur, the data manager should be prepared to make the
// changes persist when TPCFinish is called.
Commit(ctx context.Context, txn Transaction) error
// TPCVote should verify that a data manager can commit the transaction.
//
// This is the last chance for a data manager to vote 'no'. A data
// manager votes 'no' by returning an error.
TPCVote(ctx context.Context, txn Transaction) error
// TPCFinish should indicate confirmation that the transaction is done.
//
// It should make all changes to data modified by this transaction persist.
//
// This should never fail. If this returns an error, the database is
// not expected to maintain consistency; it's a serious error.
TPCFinish(ctx context.Context, txn Transaction) error
// TPCAbort should Abort a transaction.
//
// This is called by a transaction manager to end a two-phase commit on
// the data manager. It should abandon all changes to data modified
// by this transaction.
//
// This should never fail.
TPCAbort(ctx context.Context, txn Transaction) // XXX error?
// XXX better do without SortKey - with it it is assumed that
// datamanagers are processed serially.
// SortKey() string
}
// Synchronizer is the interface to participate in transaction-boundary notifications.
type Synchronizer interface {
// BeforeCompletion is called before corresponding transaction is going to be completed.
//
// The transaction manager calls BeforeCompletion before txn is going
// to be completed - either committed or aborted.
BeforeCompletion(txn Transaction) // XXX +ctx, error?
// AfterCompletion is called after corresponding transaction was completed.
//
// The transaction manager calls AfterCompletion after txn is completed
// - either committed or aborted.
AfterCompletion(txn Transaction) // XXX +ctx, error?
}
// ---- syntactic sugar ----
// With runs f in a new transaction, and either commits or aborts it depending on f result.
//
// If f succeeds - the transaction is committed.
// If f fails - the transaction is aborted.
//
// On return ok indicates whether f succeeded, and the error, depending on ok,
// is either the error from f, or the error from transaction commit.
func With(ctx context.Context, f func(context.Context) error) (ok bool, _ error) {
txn, ctx := New(ctx)
err := f(ctx)
if err != nil {
txn.Abort() // XXX err
return false, err
}
return true, txn.Commit(ctx)
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package transaction
import (
"context"
"sync"
//"lab.nexedi.com/kirr/go123/xerr"
)
// transaction implements Transaction.
type transaction struct {
mu sync.Mutex
status Status
datav []DataManager
syncv []Synchronizer
// metadata
user string
description string
extension string // XXX
}
// ctxKey is the type private to transaction package, used as key in contexts.
type ctxKey struct{}
// getTxn returns transaction associated with provided context.
// nil is returned if there is no association.
func getTxn(ctx context.Context) *transaction {
t := ctx.Value(ctxKey{})
if t == nil {
return nil
}
return t.(*transaction)
}
// currentTxn serves Current.
func currentTxn(ctx context.Context) Transaction {
txn := getTxn(ctx)
if txn == nil {
panic("transaction: no current transaction")
}
return txn
}
// newTxn serves New.
func newTxn(ctx context.Context) (Transaction, context.Context) {
if getTxn(ctx) != nil {
panic("transaction: new: nested transactions not supported")
}
txn := &transaction{status: Active}
txnCtx := context.WithValue(ctx, ctxKey{}, txn)
return txn, txnCtx
}
// Status implements Transaction.
func (txn *transaction) Status() Status {
txn.mu.Lock()
defer txn.mu.Unlock()
return txn.status
}
// Commit implements Transaction.
func (txn *transaction) Commit(ctx context.Context) error {
panic("TODO")
}
// Abort implements Transaction.
func (txn *transaction) Abort() {
var datav []DataManager
var syncv []Synchronizer
// under lock: change state to aborting; extract datav/syncv
func() {
txn.mu.Lock()
defer txn.mu.Unlock()
txn.checkNotYetCompleting("abort")
txn.status = Aborting
datav = txn.datav; txn.datav = nil
syncv = txn.syncv; txn.syncv = nil
}()
// lock released
// sync.BeforeCompletion -> errBeforeCompletion
n := len(syncv)
wg := sync.WaitGroup{}
wg.Add(n)
//errv := make([]error, n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
syncv[i].BeforeCompletion(txn)
//errv[i] = syncv[i].BeforeCompletion(ctx, txn)
}()
}
wg.Wait()
//ev := xerr.Errorv{}
//for _, err := range errv {
// ev.Appendif(err)
//}
//errBeforeCompletion := ev.Err()
//xerr.Context(&errBeforeCompletion, "transaction: abort:")
// XXX if before completion = err -> skip data.Abort()? state -> AbortFailed?
// data.Abort
n = len(datav)
wg = sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
datav[i].Abort(txn) // XXX err?
}()
}
wg.Wait()
// XXX set txn status
txn.mu.Lock()
// assert .status == Aborting
txn.status = Aborted // XXX what if errBeforeCompletion?
txn.mu.Unlock()
// sync.AfterCompletion
n = len(syncv)
wg = sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
i := i
go func() {
defer wg.Done()
syncv[i].AfterCompletion(txn)
}()
}
// XXX return error?
}
// Join implements Transaction.
func (txn *transaction) Join(dm DataManager) {
txn.mu.Lock()
defer txn.mu.Unlock()
txn.checkNotYetCompleting("join")
// XXX forbid double join?
txn.datav = append(txn.datav, dm)
}
// RegisterSync implements Transaction.
func (txn *transaction) RegisterSync(sync Synchronizer) {
txn.mu.Lock()
defer txn.mu.Unlock()
txn.checkNotYetCompleting("register sync")
// XXX forbid double register?
txn.syncv = append(txn.syncv, sync)
}
// checkNotYetCompleting asserts that transaction completion has not yet began.
//
// and panics if the assert fails.
// must be called with .mu held.
func (txn *transaction) checkNotYetCompleting(who string) {
switch txn.status {
case Active: // XXX + Doomed ?
// ok
default:
panic("transaction: " + who + ": transaction completion already began")
}
}
// ---- meta ----
func (txn *transaction) User() string { return txn.user }
func (txn *transaction) Description() string { return txn.description }
func (txn *transaction) Extension() string { return txn.extension }
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package transaction
import (
"context"
"sync/atomic"
"testing"
)
func TestBasic(t *testing.T) {
ctx := context.Background()
// Current(ø) -> panic
func() {
defer func() {
r := recover()
if r == nil {
t.Fatal("Current(ø) -> no panic")
}
if want := "transaction: no current transaction"; r != want {
t.Fatalf("Current(ø) -> %q; want %q", r, want)
}
}()
Current(ctx)
}()
// New
txn, ctx := New(ctx)
if txn_ := Current(ctx); txn_ != txn {
t.Fatalf("New inconsistent with Current: txn = %#v; txn_ = %#v", txn, txn_)
}
// New(!ø) -> panic
func() {
defer func() {
r := recover()
if r == nil {
t.Fatal("New(!ø) -> no panic")
}
if want := "transaction: new: nested transactions not supported"; r != want {
t.Fatalf("New(!ø) -> %q; want %q", r, want)
}
}()
_, _ = New(ctx)
}()
}
// DataManager that verifies abort path.
type dmAbortOnly struct {
t *testing.T
txn Transaction
nabort int32
}
func (d *dmAbortOnly) Modify() {
d.txn.Join(d)
}
func (d *dmAbortOnly) Abort(txn Transaction) {
if txn != d.txn {
d.t.Fatalf("abort: txn is different")
}
atomic.AddInt32(&d.nabort, +1)
}
func (d *dmAbortOnly) bug() { d.t.Fatal("must not be called on abort") }
func (d *dmAbortOnly) TPCBegin(_ Transaction) { d.bug(); panic(0) }
func (d *dmAbortOnly) Commit(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCVote(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCFinish(_ context.Context, _ Transaction) error { d.bug(); panic(0) }
func (d *dmAbortOnly) TPCAbort(_ context.Context, _ Transaction) { d.bug(); panic(0) }
func TestAbort(t *testing.T) {
txn, ctx := New(context.Background())
dm := &dmAbortOnly{t: t, txn: Current(ctx)}
dm.Modify()
// XXX +sync
txn.Abort()
if !(dm.nabort == 1 && txn.Status() == Aborted) {
t.Fatalf("abort: nabort=%d; txn.Status=%v", dm.nabort, txn.Status())
}
// Abort 2nd time -> panic
func() {
defer func() {
r := recover()
if r == nil {
t.Fatal("Abort2 -> no panic")
}
if want := "transaction: abort: transaction completion already began"; r != want {
t.Fatalf("Abort2 -> %q; want %q", r, want)
}
}()
txn.Abort()
}()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment