Commit d234f9a7 authored by Daniel Theophanes's avatar Daniel Theophanes

database/sql: allow using a single connection from the database

Databases have the following concepts: Statement, Batch, and Session.

A statement is often a single line like:
SELECT Amount from Account where ID = 50;

A batch is one or more statements submitted together for the query
to process. It may be a DELETE, INSERT, two UPDATES and a SELECT in
a single query text.

A session is usually represented by a single database connection.
This often is an issue when dealing with scopes in databases.
Temporary tables and variables can have batch, session, or global
scope depending on the syntax, database, and use.

Furthermore, some databases (sybase and derivatives in perticular)
that prevent certain statements from being in the same batch
and may necessitate being in the same session.

By allowing users to extract a Conn from the database they can manage
session on their own without hacking around it by making connection
pools of single connections (a real workaround presented in issue).
It is tempting to just use a transaction, but this isn't always
desirable or an option if running an interactive session or
alter script set that itself starts transactions.

Fixes #18081

Change-Id: I9bdf0796632c48d4bcaef3624c629641984ffaf2
Reviewed-on: https://go-review.googlesource.com/40694Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 42c5f399
...@@ -583,6 +583,17 @@ func Open(driverName, dataSourceName string) (*DB, error) { ...@@ -583,6 +583,17 @@ func Open(driverName, dataSourceName string) (*DB, error) {
return db, nil return db, nil
} }
func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
var err error
if pinger, ok := dc.ci.(driver.Pinger); ok {
withLock(dc, func() {
err = pinger.Ping(ctx)
})
}
release(err)
return err
}
// PingContext verifies a connection to the database is still alive, // PingContext verifies a connection to the database is still alive,
// establishing a connection if necessary. // establishing a connection if necessary.
func (db *DB) PingContext(ctx context.Context) error { func (db *DB) PingContext(ctx context.Context) error {
...@@ -602,11 +613,7 @@ func (db *DB) PingContext(ctx context.Context) error { ...@@ -602,11 +613,7 @@ func (db *DB) PingContext(ctx context.Context) error {
return err return err
} }
if pinger, ok := dc.ci.(driver.Pinger); ok { return db.pingDC(ctx, dc, dc.releaseConn)
err = pinger.Ping(ctx)
}
dc.releaseConn(err)
return err
} }
// Ping verifies a connection to the database is still alive, // Ping verifies a connection to the database is still alive,
...@@ -1404,6 +1411,189 @@ func (db *DB) Driver() driver.Driver { ...@@ -1404,6 +1411,189 @@ func (db *DB) Driver() driver.Driver {
return db.driver return db.driver
} }
// ErrConnDone is returned by any operation that is performed on a connection
// that has already been committed or rolled back.
var ErrConnDone = errors.New("database/sql: connection is already closed")
// Conn returns a single connection by either opening a new connection
// or returning an existing connection from the connection pool. Conn will
// block until either a connection is returned or ctx is canceled.
// Queries run on the same Conn will be run in the same database session.
//
// Every Conn must be returned to the database pool after use by
// calling Conn.Close.
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
var dc *driverConn
var err error
for i := 0; i < maxBadConnRetries; i++ {
dc, err = db.conn(ctx, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
dc, err = db.conn(ctx, cachedOrNewConn)
}
if err != nil {
return nil, err
}
conn := &Conn{
db: db,
dc: dc,
}
return conn, nil
}
// Conn represents a single database session rather a pool of database
// sessions. Prefer running queries from DB unless there is a specific
// need for a continuous single database session.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.
type Conn struct {
db *DB
// closemu prevents the connection from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned until close, at which point
// it's returned to the connection pool.
dc *driverConn
// done transitions from 0 to 1 exactly once, on close.
// Once done, all operations fail with ErrConnDone.
// Use atomic operations on value when checking value.
done int32
}
func (c *Conn) grabConn() (*driverConn, error) {
if atomic.LoadInt32(&c.done) != 0 {
return nil, ErrConnDone
}
return c.dc, nil
}
// PingContext verifies the connection to the database is still alive.
func (c *Conn) PingContext(ctx context.Context) error {
dc, err := c.grabConn()
if err != nil {
return err
}
c.closemu.RLock()
return c.db.pingDC(ctx, dc, c.closemuRUnlockCondReleaseConn)
}
// ExecContext executes a query without returning any rows.
// The args are for any placeholder parameters in the query.
func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}
c.closemu.RLock()
return c.db.execDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args)
}
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}
c.closemu.RLock()
return c.db.queryDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args)
}
// QueryRowContext executes a query that is expected to return at most one row.
// QueryRowContext always returns a non-nil value. Errors are deferred until
// Row's Scan method is called.
func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
rows, err := c.QueryContext(ctx, query, args...)
return &Row{rows: rows, err: err}
}
// PrepareContext creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
//
// The provided context is used for the preparation of the statement, not for the
// execution of the statement.
func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}
c.closemu.RLock()
return c.db.prepareDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query)
}
// BeginTx starts a transaction.
//
// The provided context is used until the transaction is committed or rolled back.
// If the context is canceled, the sql package will roll back
// the transaction. Tx.Commit will return an error if the context provided to
// BeginTx is canceled.
//
// The provided TxOptions is optional and may be nil if defaults should be used.
// If a non-default isolation level is used that the driver doesn't support,
// an error will be returned.
func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}
c.closemu.RLock()
return c.db.beginDC(ctx, dc, c.closemuRUnlockCondReleaseConn, opts)
}
// closemuRUnlockCondReleaseConn read unlocks closemu
// as the sql operation is done with the dc.
func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
c.closemu.RUnlock()
if err == driver.ErrBadConn {
c.close(err)
}
}
func (c *Conn) close(err error) error {
if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
return ErrConnDone
}
// Lock around releasing the driver connection
// to ensure all queries have been stopped before doing so.
c.closemu.Lock()
defer c.closemu.Unlock()
c.dc.releaseConn(err)
c.dc = nil
c.db = nil
return err
}
// Close returns the connection to the connection pool.
// All operations after a Close will return with ErrConnDone.
// Close is safe to call concurrently with other operations and will
// block until all other operations finish. It may be useful to first
// cancel any used context and then call close directly after.
func (c *Conn) Close() error {
return c.close(nil)
}
// Tx is an in-progress database transaction. // Tx is an in-progress database transaction.
// //
// A transaction must end with a call to Commit or Rollback. // A transaction must end with a call to Commit or Rollback.
......
...@@ -139,6 +139,7 @@ func closeDB(t testing.TB, db *DB) { ...@@ -139,6 +139,7 @@ func closeDB(t testing.TB, db *DB) {
t.Errorf("Error closing fakeConn: %v", err) t.Errorf("Error closing fakeConn: %v", err)
} }
}) })
db.mu.Lock()
for i, dc := range db.freeConn { for i, dc := range db.freeConn {
if n := len(dc.openStmt); n > 0 { if n := len(dc.openStmt); n > 0 {
// Just a sanity check. This is legal in // Just a sanity check. This is legal in
...@@ -149,6 +150,8 @@ func closeDB(t testing.TB, db *DB) { ...@@ -149,6 +150,8 @@ func closeDB(t testing.TB, db *DB) {
t.Errorf("while closing db, freeConn %d/%d had %d open stmts; want 0", i, len(db.freeConn), n) t.Errorf("while closing db, freeConn %d/%d had %d open stmts; want 0", i, len(db.freeConn), n)
} }
} }
db.mu.Unlock()
err := db.Close() err := db.Close()
if err != nil { if err != nil {
t.Fatalf("error closing DB: %v", err) t.Fatalf("error closing DB: %v", err)
...@@ -1298,6 +1301,69 @@ func TestTxErrBadConn(t *testing.T) { ...@@ -1298,6 +1301,69 @@ func TestTxErrBadConn(t *testing.T) {
} }
} }
func TestConnQuery(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
var name string
err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", 3).Scan(&name)
if err != nil {
t.Fatal(err)
}
if name != "Chris" {
t.Fatalf("unexpected result, got %q want Chris", name)
}
err = conn.PingContext(ctx)
if err != nil {
t.Fatal(err)
}
}
func TestConnTx(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
tx, err := conn.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
insertName, insertAge := "Nancy", 33
_, err = tx.ExecContext(ctx, "INSERT|people|name=?,age=?,photo=APHOTO", insertName, insertAge)
if err != nil {
t.Fatal(err)
}
err = tx.Commit()
if err != nil {
t.Fatal(err)
}
var selectName string
err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", insertAge).Scan(&selectName)
if err != nil {
t.Fatal(err)
}
if selectName != insertName {
t.Fatalf("got %q want %q", selectName, insertName)
}
}
// Tests fix for issue 2542, that we release a lock when querying on // Tests fix for issue 2542, that we release a lock when querying on
// a closed connection. // a closed connection.
func TestIssue2542Deadlock(t *testing.T) { func TestIssue2542Deadlock(t *testing.T) {
...@@ -2338,6 +2404,28 @@ func TestManyErrBadConn(t *testing.T) { ...@@ -2338,6 +2404,28 @@ func TestManyErrBadConn(t *testing.T) {
if err = stmt.Close(); err != nil { if err = stmt.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Conn
db = manyErrBadConnSetup()
defer closeDB(t, db)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
err = conn.Close()
if err != nil {
t.Fatal(err)
}
// Ping
db = manyErrBadConnSetup()
defer closeDB(t, db)
err = db.PingContext(ctx)
if err != nil {
t.Fatal(err)
}
} }
// golang.org/issue/5718 // golang.org/issue/5718
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment