0%
November 6, 2023

Transactions, Go and sqlc

go

sql

Repository

Models

We model a bank account system by

  • Transfer. Record the cash flow
  • Entry. Record the individual account cash flow
  • Account. Record the entity in a bank system

In Go they are defined by the following structs:

// Account records an entity (an account holder) in the bank system.
type Account struct {
	ID        int64     `json:"id"`
	Owner     string    `json:"owner"`
	Balance   int64     `json:"balance"`
	Currency  string    `json:"currency"`
	CreatedAt time.Time `json:"created_at"`
}

// Entry records a single cash flow of one account.
type Entry struct {
	ID        int64 `json:"id"`
	AccountID int64 `json:"account_id"`
	// can be +ve or -ve
	Amount    int64     `json:"amount"`
	CreatedAt time.Time `json:"created_at"`
}

// Transfer records a cash flow between two accounts.
type Transfer struct {
	ID            int64 `json:"id"`
	FromAccountID int64 `json:"from_account_id"`
	ToAccountID   int64 `json:"to_account_id"`
	// must be positive
	Amount    int64     `json:"amount"`
	CreatedAt time.Time `json:"created_at"`
}

These are generated by sqlc with the following schema (database migration by goose or any similar tool):

-- +goose Up

CREATE TABLE "accounts" (
  "id" bigserial PRIMARY KEY,
  "owner" varchar NOT NULL,
  "balance" bigint NOT NULL,
  "currency" varchar NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE TABLE "entries" (
  "id" bigserial PRIMARY KEY,
  "account_id" bigint NOT NULL,
  "amount" bigint NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE TABLE "transfers" (
  "id" bigserial PRIMARY KEY,
  "from_account_id" bigint NOT NULL,
  "to_account_id" bigint NOT NULL,
  "amount" bigint NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE INDEX ON "accounts" ("owner");

CREATE INDEX ON "entries" ("account_id");

CREATE INDEX ON "transfers" ("from_account_id");

CREATE INDEX ON "transfers" ("to_account_id");

CREATE INDEX ON "transfers" ("from_account_id", "to_account_id");

COMMENT ON COLUMN "entries"."amount" IS 'can be +ve or -ve';

COMMENT ON COLUMN "transfers"."amount" IS 'must be positive';

ALTER TABLE "entries" ADD FOREIGN KEY ("account_id") REFERENCES "accounts" ("id");

ALTER TABLE "transfers" ADD FOREIGN KEY ("from_account_id") REFERENCES "accounts" ("id");

ALTER TABLE "transfers" ADD FOREIGN KEY ("to_account_id") REFERENCES "accounts" ("id");


-- +goose Down
DROP TABLE accounts, entries, transfers;

Transaction

Scenario
  • We will be performing a transaction repeatedly in multiple goroutines.

  • In each transaction (goroutine), we simply transfer a fixed quantity of 10 from a fixed account1 to another fixed account2.

  • We will be encountering deadlock and try to fix it.

Define Store and Transaction Execution Wrapper
// Store provides all functions to execute db queries and transactions.
// It embeds the sqlc-generated *Queries and keeps the raw *sql.DB handle
// so that it can begin transactions.
type Store struct {
	*Queries
	db *sql.DB
}

// NewStore creates a Store backed by the given database handle.
func NewStore(db *sql.DB) *Store {
	queries := New(db)
	return &Store{
		Queries: queries,
		db:      db,
	}
}

// execTx runs fn inside a database transaction.
//
// It begins a transaction on the underlying *sql.DB, builds a Queries value
// bound to that transaction, and passes it to fn. If fn returns an error the
// transaction is rolled back; otherwise it is committed.
func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	q := New(tx)
	err = fn(q)
	if err != nil {
		// Wrap the original error with %w so callers can still use
		// errors.Is / errors.As even when the rollback also fails.
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("tx err: %w, rb err: %v", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

where Queries implements a DBTX interface generated by sqlc.

TransferTxResult

The following struct is merely a record purpose:

// TransferTxResult collects every record created or updated by one
// TransferTx call, purely so tests can inspect the outcome.
type TransferTxResult struct {
	Transfer    Transfer
	FromAccount Account
	ToAccount   Account
	FromEntry   Entry
	ToEntry     Entry
}

We will investigate the results by storing the consequence of a transaction into this struct.

Implement a Transaction
// TransferTx performs a money transfer from one account to the other inside
// a single database transaction: it creates a transfer record, two entry
// records, and updates both account balances.
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
	var result TransferTxResult
	err := store.execTx(ctx, func(q *Queries) error {
		var err error
		result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
			FromAccountID: arg.FromAccountID,
			ToAccountID:   arg.ToAccountID,
			Amount:        arg.Amount,
		})

		if err != nil {
			return err
		}

		result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
			AccountID: arg.FromAccountID,
			Amount:    -arg.Amount,
		})

		// Bug fix: this error was previously overwritten by the second
		// CreateEntry call without ever being checked.
		if err != nil {
			return err
		}

		result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
			AccountID: arg.ToAccountID,
			Amount:    arg.Amount,
		})

		if err != nil {
			return err
		}

and here comes the fun part:


		account1, err := q.GetAccount(ctx, arg.FromAccountID)
		if err != nil {
			return err
		}
		result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.FromAccountID,
			Balance: account1.Balance - arg.Amount,
		})
		// Bug fix: this error was previously overwritten by the GetAccount
		// call below without ever being checked.
		if err != nil {
			return err
		}

		account2, err := q.GetAccount(ctx, arg.ToAccountID)
		if err != nil {
			return err
		}
		result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.ToAccountID,
			Balance: account2.Balance + arg.Amount,
		})

		if err != nil {
			return err
		}

		return nil
	})
	return result, err
}
Run Transactions Concurrently
// TestTransferTx runs n concurrent transfers of a fixed amount from
// account1 to account2 and checks the balance deltas stay consistent.
func TestTransferTx(t *testing.T) {
	store := NewStore(testDB)

	account1 := createRandomAccount(t)
	account2 := createRandomAccount(t)

	n := 5
	amount := int64(10)

	errChan := make(chan error)
	resultChan := make(chan TransferTxResult)

	for i := 0; i < n; i++ {
		go func() {
			result, err := store.TransferTx(context.Background(), TransferTxParams{
				FromAccountID: account1.ID,
				ToAccountID:   account2.ID,
				Amount:        amount,
			})
			errChan <- err
			resultChan <- result
		}()
	}

	for i := 0; i < n; i++ {
		err := <-errChan
		require.NoError(t, err)

		result := <-resultChan
		resultFromAccount := result.FromAccount
		resultToAccount := result.ToAccount

		fmt.Println("before tx:", "fromAccount", account1.Balance, "toAccount", account2.Balance, "amount", int64(i+1)*amount)
		fmt.Println("after tx: ", "fromAccount", resultFromAccount.Balance, "toAccount", resultToAccount.Balance)
		fmt.Println("-------")

		// diff1: how much has left account1; diff2: how much entered account2.
		diff1 := account1.Balance - resultFromAccount.Balance
		diff2 := resultToAccount.Balance - account2.Balance

		require.Equal(t, diff1, diff2)
		require.True(t, diff1 > 0)
		// Fixed: this line previously duplicated the diff1 > 0 assertion.
		// Each completed transfer moves exactly `amount`, so the running
		// difference must be a positive multiple of it.
		require.True(t, diff1%amount == 0)
	}
}
Test Fails
before tx: fromAccount 650 toAccount 976 amount 10
after tx:  fromAccount 640 toAccount 986
-------
before tx: fromAccount 650 toAccount 976 amount 20
after tx:  fromAccount 630 toAccount 996
-------
before tx: fromAccount 650 toAccount 976 amount 30
after tx:  fromAccount 630 toAccount 1006
-------
--- FAIL: TestTransferTx (0.09s)
    c:\Users\user\Repos\Go\2023-11-04-api-time\internal\db\store_test.go:62:
        	Error Trace:	c:/Users/user/Repos/Go/2023-11-04-api-time/internal/db/store_test.go:62
        	Error:      	Not equal:
        	            	expected: 20
        	            	actual  : 30
        	Test:       	TestTransferTx
FAIL
FAIL	github.com/machingclee/2023-11-04-go-gin/internal/db	0.283s

Transactions in Raw SQL and Transition into Go

SQL

We get a shell inside the pgSQL container (via docker exec, not ssh) by

docker exec -it <container-name> bash
root@3b68a2663049:/# psql -d pgdb -U pguser

Let's start a transaction on the first terminal:

pgdb=# begin;
BEGIN
pgdb=*# select * from accounts where id = 1 for update;

Next if we repeat the same transaction at another terminal, we get blocked until the first transaction has committed the result.

We continue to update the account and commit the result:

pgdb=# begin;
BEGIN
pgdb=*# select * from accounts where id = 1 for update;
 id | owner  | balance | currency |          created_at
----+--------+---------+----------+-------------------------------
  1 | agqcdw |       0 | USD      | 2023-11-05 12:21:30.566415+00
(1 row)

pgdb=*# update accounts set balance = 500 where id = 1;
UPDATE 1
pgdb=*# commit;
COMMIT
pgdb=#

The second transaction is now unblocked immediately.

Same Scenario in Go

Let's update this raw SQL in our sql/queries/account and use sqlc to generate new GetAccountForUpdate function:

-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE;

Now a new version of GetAccount has been generated:

// getAccountForUpdate is the sqlc-generated query text; FOR UPDATE keeps the
// selected row locked until the surrounding transaction commits.
const getAccountForUpdate = `-- name: GetAccountForUpdate :one
SELECT id, owner, balance, currency, created_at FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE
`

// GetAccountForUpdate fetches one account by id while acquiring a row-level
// lock. This function is generated by sqlc — do not edit by hand.
func (q *Queries) GetAccountForUpdate(ctx context.Context, id int64) (Account, error) {
	row := q.db.QueryRowContext(ctx, getAccountForUpdate, id)
	var i Account
	err := row.Scan(
		&i.ID,
		&i.Owner,
		&i.Balance,
		&i.Currency,
		&i.CreatedAt,
	)
	return i, err
}

Let's rerun our test:

Deadlock
before tx: fromAccount 488 toAccount 962 amount 10
after tx:  fromAccount 478 toAccount 972
-------
--- FAIL: TestTransferTx (1.07s)
    c:\Users\user\Repos\Go\2023-11-04-api-time\internal\db\store_test.go:37:
        	Error Trace:	c:/Users/user/Repos/Go/2023-11-04-api-time/internal/db/store_test.go:37
        	Error:      	Received unexpected error:
        	            	pq: deadlock detected
        	Test:       	TestTransferTx

This is because in the course of a transaction we create a record in the transfers table, which refers to an entity in the accounts table (a foreign key constraint). Since that transaction has not committed yet, any SELECT ... FOR UPDATE statement selecting that particular account will be blocked, resulting in a deadlock.

Solution

We just need to inform postgres that we are updating that row without touching the primary key, then our transaction will not need to acquire a lock for updating the account entity:

-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR NO KEY UPDATE;

Run sqlc generate, and now if we run the test again:

ok  	github.com/machingclee/2023-11-04-go-gin/internal/db	0.288s

Combine Get and Update SQL into One, sqlc Trick to Change Parameter Name

In TransferTx we have Get and Update the account in two separate queries.

If we know that we are just doing an add operation (+ve or -ve), we may combine the queries into one:

-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + $2
WHERE id = $1
RETURNING *;

By sqlc generate we get

type AddAccountBalanceParams struct {
	ID      int64 `json:"id"`
	Balance int64 `json:"balance"`
}

The field name Balance is very confusing and it should be amount instead.

sqlc allows us to change parameter name arbitrarily:

-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + sqlc.arg(amount)
WHERE id = sqlc.arg(id)
RETURNING *;

Now run sqlc generate again:

type AddAccountBalanceParams struct {
	Amount int64 `json:"amount"`
	ID     int64 `json:"id"`
}

as desired.

Now our transaction has changed from

		account1, err := q.GetAccountForUpdate(ctx, arg.FromAccountID)
		if err != nil {
			return err
		}
		result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.FromAccountID,
			Balance: account1.Balance - arg.Amount,
		})

		account2, err := q.GetAccountForUpdate(ctx, arg.ToAccountID)
		if err != nil {
			return err
		}
		result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.ToAccountID,
			Balance: account2.Balance + arg.Amount,
		})

into

		result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
			ID:     arg.FromAccountID,
			Amount: -arg.Amount,
		})

		result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
			ID:     arg.ToAccountID,
			Amount: arg.Amount,
		})

Transaction Isolation Levels

  • Read Uncommitted.
    • At this level, uncommitted changes made during a transaction are visible to other transactions, which results in a dirty read.
    • pgSQL does not actually implement this level. Even though you can set the isolation level to Read Uncommitted, it behaves the same as Read Committed.
  • Read Committed.
    • At this level dirty reads are avoided, but the same select query may return two different results for the same row — a non-repeatable read phenomenon.
    • Worse still, same query can result in different number of rows, which is a phantom-read phenomenon.
  • Repeatable Read.
    • At this level both non-repeatable reads and phantom reads are prevented, but the data seen by a select query in the "second" transaction stays outdated even after the "first" transaction has committed.
    • If we try to run an update in the second transaction, different database engines have different interpretation.
      • In mySQL we get consistent updated result from the perspective of database, but from the view of second transaction updated values are inconsistent.
      • In pgSQL we directly get an error.
  • Serializable.
    • Every select statement at this level effectively behaves like SELECT FOR SHARE, meaning all other transactions are only allowed to read the selected rows.
    • The commit of the read-only transaction is needed to release the lock before any update statement from another transaction can proceed.

Reference