Repository
Models
We model a bank account system with three entities:
- Transfer: records a cash flow between two accounts.
- Entry: records an individual account's cash flow.
- Account: records an account entity in the bank system.

In Go they are defined by the following structs:
```go
type Account struct {
	ID        int64     `json:"id"`
	Owner     string    `json:"owner"`
	Balance   int64     `json:"balance"`
	Currency  string    `json:"currency"`
	CreatedAt time.Time `json:"created_at"`
}

type Entry struct {
	ID        int64 `json:"id"`
	AccountID int64 `json:"account_id"`
	// can be +ve or -ve
	Amount    int64     `json:"amount"`
	CreatedAt time.Time `json:"created_at"`
}

type Transfer struct {
	ID            int64 `json:"id"`
	FromAccountID int64 `json:"from_account_id"`
	ToAccountID   int64 `json:"to_account_id"`
	// must be positive
	Amount    int64     `json:"amount"`
	CreatedAt time.Time `json:"created_at"`
}
```
These are generated by sqlc from the following schema (database migrations are handled by goose or any similar tool):
```sql
-- +goose Up
CREATE TABLE "accounts" (
  "id" bigserial PRIMARY KEY,
  "owner" varchar NOT NULL,
  "balance" bigint NOT NULL,
  "currency" varchar NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE TABLE "entries" (
  "id" bigserial PRIMARY KEY,
  "account_id" bigint NOT NULL,
  "amount" bigint NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE TABLE "transfers" (
  "id" bigserial PRIMARY KEY,
  "from_account_id" bigint NOT NULL,
  "to_account_id" bigint NOT NULL,
  "amount" bigint NOT NULL,
  "created_at" timestamptz NOT NULL DEFAULT (now())
);

CREATE INDEX ON "accounts" ("owner");
CREATE INDEX ON "entries" ("account_id");
CREATE INDEX ON "transfers" ("from_account_id");
CREATE INDEX ON "transfers" ("to_account_id");
CREATE INDEX ON "transfers" ("from_account_id", "to_account_id");

COMMENT ON COLUMN "entries"."amount" IS 'can be +ve or -ve';
COMMENT ON COLUMN "transfers"."amount" IS 'must be positive';

ALTER TABLE "entries" ADD FOREIGN KEY ("account_id") REFERENCES "accounts" ("id");
ALTER TABLE "transfers" ADD FOREIGN KEY ("from_account_id") REFERENCES "accounts" ("id");
ALTER TABLE "transfers" ADD FOREIGN KEY ("to_account_id") REFERENCES "accounts" ("id");

-- +goose Down
DROP TABLE accounts, entries, transfers;
```
Transaction
Scenario
- We will perform a transaction repeatedly in multiple goroutines.
- In each transaction (goroutine), we simply transfer a fixed quantity of 10 from a fixed `account1` to another fixed `account2`.
- We will run into a deadlock and try to fix it.
Define Store and Transaction Execution Wrapper
```go
type Store struct {
	*Queries
	db *sql.DB
}

func NewStore(db *sql.DB) *Store {
	return &Store{
		db:      db,
		Queries: New(db),
	}
}

func (store *Store) execTx(ctx context.Context, fn func(*Queries) error) error {
	tx, err := store.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	q := New(tx)
	err = fn(q)
	if err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}
```
where `Queries` is the query type generated by sqlc, whose constructor `New` accepts anything satisfying the generated `DBTX` interface.
TransferTxResult
The following struct exists purely for record-keeping:
```go
type TransferTxResult struct {
	Transfer    Transfer
	FromAccount Account
	ToAccount   Account
	FromEntry   Entry
	ToEntry     Entry
}
```
We will investigate the results by storing the outcome of each transaction in this struct.
Implement a Transaction
```go
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
	var result TransferTxResult

	err := store.execTx(ctx, func(q *Queries) error {
		var err error

		result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
			FromAccountID: arg.FromAccountID,
			ToAccountID:   arg.ToAccountID,
			Amount:        arg.Amount,
		})
		if err != nil {
			return err
		}

		result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
			AccountID: arg.FromAccountID,
			Amount:    -arg.Amount,
		})
		if err != nil {
			return err
		}

		result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
			AccountID: arg.ToAccountID,
			Amount:    arg.Amount,
		})
		if err != nil {
			return err
		}
```
and here comes the fun part:
```go
		account1, err := q.GetAccount(ctx, arg.FromAccountID)
		if err != nil {
			return err
		}

		result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.FromAccountID,
			Balance: account1.Balance - arg.Amount,
		})
		if err != nil {
			return err
		}

		account2, err := q.GetAccount(ctx, arg.ToAccountID)
		if err != nil {
			return err
		}

		result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
			ID:      arg.ToAccountID,
			Balance: account2.Balance + arg.Amount,
		})
		if err != nil {
			return err
		}

		return nil
	})

	return result, err
}
```
Run Transactions Concurrently
```go
func TestTransferTx(t *testing.T) {
	store := NewStore(testDB)

	account1 := createRandomAccount(t)
	account2 := createRandomAccount(t)

	n := 5
	amount := int64(10)

	errChan := make(chan error)
	resultChan := make(chan TransferTxResult)

	for i := 0; i < n; i++ {
		go func() {
			result, err := store.TransferTx(context.Background(), TransferTxParams{
				FromAccountID: account1.ID,
				ToAccountID:   account2.ID,
				Amount:        amount,
			})
			errChan <- err
			resultChan <- result
		}()
	}

	for i := 0; i < n; i++ {
		err := <-errChan
		require.NoError(t, err)

		result := <-resultChan
		resultFromAccount := result.FromAccount
		resultToAccount := result.ToAccount

		fmt.Println("before tx:", "fromAccount", account1.Balance, "toAccount", account2.Balance, "amount", int64(i+1)*amount)
		fmt.Println("after tx: ", "fromAccount", resultFromAccount.Balance, "toAccount", resultToAccount.Balance)
		fmt.Println("-------")

		diff1 := account1.Balance - resultFromAccount.Balance
		diff2 := resultToAccount.Balance - account2.Balance
		require.Equal(t, diff1, diff2)
		require.True(t, diff1 > 0)
	}
}
```
Test Fails
```text
before tx: fromAccount 650 toAccount 976 amount 10
after tx:  fromAccount 640 toAccount 986
-------
before tx: fromAccount 650 toAccount 976 amount 20
after tx:  fromAccount 630 toAccount 996
-------
before tx: fromAccount 650 toAccount 976 amount 30
after tx:  fromAccount 630 toAccount 1006
-------
--- FAIL: TestTransferTx (0.09s)
    c:\Users\user\Repos\Go\2023-11-04-api-time\internal\db\store_test.go:62:
        Error Trace: c:/Users/user/Repos/Go/2023-11-04-api-time/internal/db/store_test.go:62
        Error:       Not equal:
                     expected: 20
                     actual  : 30
        Test:        TestTransferTx
FAIL
FAIL    github.com/machingclee/2023-11-04-go-gin/internal/db   0.283s
```
Transactions in Raw SQL and Transition into Go
SQL
We get a shell into the pgSQL container and connect with `psql`:
```bash
docker exec -it <container-name> bash
root@3b68a2663049:/# psql -d pgdb -U pguser
```
Let's start a transaction on the first terminal:
```sql
pgdb=# begin;
BEGIN
pgdb=*# select * from accounts where id = 1 for update;
```
Next, if we begin the same transaction in another terminal, its `SELECT ... FOR UPDATE` blocks until the first transaction has committed its result.
We continue to update the account and commit the result:
```sql
pgdb=# begin;
BEGIN
pgdb=*# select * from accounts where id = 1 for update;
 id | owner  | balance | currency |          created_at
----+--------+---------+----------+-------------------------------
  1 | agqcdw |       0 | USD      | 2023-11-05 12:21:30.566415+00
(1 row)

pgdb=*# update accounts set balance = 500 where id = 1;
UPDATE 1
pgdb=*# commit;
COMMIT
pgdb=#
```
The second transaction is now unblocked immediately.
Same Scenario in Go
Let's add this raw SQL to our `sql/queries/account` and use sqlc to generate a new `GetAccountForUpdate` function:
```sql
-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE;
```
Now a locking variant of `GetAccount` has been generated:
```go
const getAccountForUpdate = `-- name: GetAccountForUpdate :one
SELECT id, owner, balance, currency, created_at FROM accounts
WHERE id = $1 LIMIT 1
FOR UPDATE
`

func (q *Queries) GetAccountForUpdate(ctx context.Context, id int64) (Account, error) {
	row := q.db.QueryRowContext(ctx, getAccountForUpdate, id)
	var i Account
	err := row.Scan(
		&i.ID,
		&i.Owner,
		&i.Balance,
		&i.Currency,
		&i.CreatedAt,
	)
	return i, err
}
```
Let's rerun our test:
Deadlock
```text
before tx: fromAccount 488 toAccount 962 amount 10
after tx:  fromAccount 478 toAccount 972
-------
--- FAIL: TestTransferTx (1.07s)
    c:\Users\user\Repos\Go\2023-11-04-api-time\internal\db\store_test.go:37:
        Error Trace: c:/Users/user/Repos/Go/2023-11-04-api-time/internal/db/store_test.go:37
        Error:       Received unexpected error:
                     pq: deadlock detected
        Test:        TestTransferTx
```
This is because, in the course of each transaction, we create a record in the `transfers` table that references rows in the `accounts` table (the foreign-key constraint). To enforce that constraint, the insert acquires a lock on the referenced account rows. Since no transaction has committed yet, any `SELECT ... FOR UPDATE` on those accounts from another transaction must wait, and the resulting circular waiting is a deadlock.
Solution
We just need to inform Postgres that we are updating the row without touching its primary key; the foreign-key check then only needs a weaker lock that does not conflict with ours, so the transactions no longer block each other:
```sql
-- name: GetAccountForUpdate :one
SELECT * FROM accounts
WHERE id = $1 LIMIT 1
FOR NO KEY UPDATE;
```
Run `sqlc generate`, and now if we run the test again:
```text
ok      github.com/machingclee/2023-11-04-go-gin/internal/db   0.288s
```
Combine Get and Update SQL into One, sqlc Trick to Change Parameter Name
In `TransferTx` we `Get` and then `Update` each account in two separate queries. If we know that we are just doing an add operation (+ve or -ve), we may combine the two queries into one:
```sql
-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + $2
WHERE id = $1
RETURNING *;
```
By `sqlc generate` we get
```go
type AddAccountBalanceParams struct {
	ID      int64 `json:"id"`
	Balance int64 `json:"balance"`
}
```
The field name `Balance` is very confusing; it should be `amount` instead. sqlc allows us to rename a parameter arbitrarily:
```sql
-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + sqlc.arg(amount)
WHERE id = sqlc.arg(id)
RETURNING *;
```
Now run `sqlc generate` again:
```go
type AddAccountBalanceParams struct {
	Amount int64 `json:"amount"`
	ID     int64 `json:"id"`
}
```
as desired.
Now our transaction has changed from
```go
account1, err := q.GetAccountForUpdate(ctx, arg.FromAccountID)
if err != nil {
	return err
}

result.FromAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
	ID:      arg.FromAccountID,
	Balance: account1.Balance - arg.Amount,
})
if err != nil {
	return err
}

account2, err := q.GetAccountForUpdate(ctx, arg.ToAccountID)
if err != nil {
	return err
}

result.ToAccount, err = q.UpdateAccount(ctx, UpdateAccountParams{
	ID:      arg.ToAccountID,
	Balance: account2.Balance + arg.Amount,
})
```
into
```go
result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
	ID:     arg.FromAccountID,
	Amount: -arg.Amount,
})
if err != nil {
	return err
}

result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
	ID:     arg.ToAccountID,
	Amount: arg.Amount,
})
```
Transaction Isolation Levels
- Read Uncommitted.
  - At this level, uncommitted changes made in the course of a transaction are visible to other transactions, which results in a dirty read.
  - pgSQL does not have this level: even though you can set the isolation level to `Read Uncommitted`, it behaves the same as `Read Committed`.
- Read Committed.
  - At this level dirty reads are avoided, but the same select query may return two different results for the same row: a non-repeatable read.
  - Worse still, the same query can return a different number of rows, which is a phantom read.
- Repeatable Read.
  - At this level both non-repeatable reads and phantom reads are prevented, but the data returned by a select in the "second" transaction stays outdated even after the "first" transaction has committed.
  - If we try to run an update in the second transaction, different database engines behave differently:
    - In mySQL we get a consistent updated result from the database's perspective, but from the view of the second transaction the updated values look inconsistent.
    - In pgSQL we directly get an error.
- Serializable.
  - Every select statement at this level is effectively forced to be a `SELECT FOR SHARE`, meaning all other transactions are only allowed to read the rows it touches.
  - A commit of such a read-only transaction is needed to release the lock before any update statement from other transactions can proceed.