
Refactor transaction timestamp management for cross shard data #313

Open
bootjp wants to merge 4 commits into main from feature/multi-shard-transaction

Conversation


@bootjp bootjp commented Feb 15, 2026

No description provided.

Copilot AI review requested due to automatic review settings February 15, 2026 11:42
@gemini-code-assist
Contributor

Summary of Changes

Hello @bootjp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the core transaction timestamp management within the system, enabling robust distributed transaction capabilities. It introduces explicit phases for transactions (prepare, commit, abort) and manages their state using dedicated metadata and internal keys. The changes ensure consistent timestamping across transactional operations and provide mechanisms for automatic lock resolution during data access, significantly enhancing the system's transactional integrity and scalability.

Highlights

  • Distributed Transaction Support: Introduced comprehensive support for distributed transactions across multiple shards, including prewrite, commit, and abort phases, orchestrated by the ShardedCoordinator.
  • Refactored Timestamp Management: Significantly refactored how transaction timestamps are managed, separating raw request timestamping from transactional timestamping, and introducing explicit commit and abort timestamps.
  • Transaction Metadata and Internal Keys: Implemented new data structures (TxnMeta, txnLock, txnIntent) and encoding/decoding logic for transaction metadata, along with a system of internal keys (!txn|lock|, !txn|int|, etc.) to manage transaction state within the key-value store.
  • MVCC Store Enhancements: Updated the MVCC store to correctly handle versioned values, ensuring they are inserted in sorted order by timestamp and improving the alignCommitTS logic.
  • Transaction Lock Resolution: Added logic to ShardStore to automatically resolve transaction locks during reads and scans, determining the transaction's status (committed, rolled back, or pending) and applying the appropriate resolution.
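The txn-internal key namespace mentioned in the highlights can be sketched as follows. This is an illustrative sketch only: the prefixes "!txn|lock|" and "!txn|int|" come from the PR description, while the exact builders and signatures live in kv/txn_keys.go and may differ.

```go
package main

import (
	"bytes"
	"fmt"
)

// Illustrative prefixes matching those named in the PR highlights.
// The real definitions are in kv/txn_keys.go.
var (
	txnLockPrefix   = []byte("!txn|lock|")
	txnIntentPrefix = []byte("!txn|int|")
)

// txnLockKey derives the internal lock key for a user key.
func txnLockKey(userKey []byte) []byte {
	return append(append([]byte{}, txnLockPrefix...), userKey...)
}

// isTxnInternalKey reports whether a key belongs to the txn-internal namespace.
func isTxnInternalKey(key []byte) bool {
	return bytes.HasPrefix(key, []byte("!txn|"))
}

// txnRouteKey extracts the embedded user key so txn-internal keys route to
// the same shard as the data they protect (cf. routeKey in kv/shard_key.go).
func txnRouteKey(key []byte) []byte {
	for _, p := range [][]byte{txnLockPrefix, txnIntentPrefix} {
		if bytes.HasPrefix(key, p) {
			return key[len(p):]
		}
	}
	return key
}

func main() {
	lk := txnLockKey([]byte("user/42"))
	fmt.Printf("%s %v %s\n", lk, isTxnInternalKey(lk), txnRouteKey(lk))
}
```

Embedding the user key in the internal key is what lets lock and intent records land on the same shard as the data they cover.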
Changelog
  • adapter/internal.go
    • Imported the bytes package.
    • Refactored stampTimestamps into stampRawTimestamps and stampTxnTimestamps for clearer timestamp assignment logic.
    • Added helper functions forwardedTxnStartTS, forwardedTxnMetaMutation, and fillForwardedTxnCommitTS to manage transaction metadata and commit timestamps for forwarded requests.
  • kv/coordinator.go
    • Imported the sort package.
    • Modified dispatchTxn to incorporate primary key, commit timestamp, and lock TTL into transaction requests.
    • Updated redirect to use the newly introduced primaryKeyForElems for transaction requests.
    • Refactored txnRequests to construct prepare and commit mutations with transaction metadata, primary key, and commit timestamp.
    • Added primaryKeyForElems function to determine the primary key from a list of elements.
    • Added bytesCompare utility function for byte slice comparison.
  • kv/fsm.go
    • Imported the bytes package.
    • Modified Apply to correctly extract the commit timestamp from transaction metadata for commit and abort phases.
    • Added validation in handleRawRequest to prevent mutations of transaction-internal keys and check for conflicting locks.
    • Updated handleCommitRequest and handleAbortRequest to utilize transaction metadata and pass commitTS for aborts.
    • Introduced uniqueMutations to filter duplicate mutations within a request.
    • Added buildPrepareStoreMutations, buildCommitStoreMutations, and buildAbortCleanupStoreMutations to construct store-level mutations for different transaction phases.
    • Implemented appendRollbackRecord to record transaction rollbacks.
    • Added numerous helper functions for transaction-specific operations, including prepareTxnMutation, txnIntentFromPBMutation, txnCleanupMutations, txnLockForCommit, txnIntentForCommit, storeMutationForIntent, commitTxnKeyMutations, shouldClearAbortKey, splitTxnMeta, and assertNoConflictingTxnLock.
  • kv/fsm_occ_test.go
    • Updated a test case to align with the new transaction prepare phase and metadata structure, changing a commit attempt to a prewrite attempt.
  • kv/hlc_wall.go
    • Added hlcWallNow function to retrieve the current wall clock time.
    • Added hlcWallFromNowMs function to calculate a future timestamp based on a millisecond delta.
  • kv/shard_key.go
    • Modified routeKey to extract the embedded logical user key from transaction-internal keys for correct shard routing.
  • kv/shard_store.go
    • Refactored GetAt to use localGetAt and leaderGetAt for improved read path logic.
    • Added isVerifiedRaftLeader helper to check for verified Raft leadership.
    • Introduced leaderGetAt to handle transaction lock resolution for non-transactional keys during leader reads.
    • Added localGetAt for direct access to the underlying store.
    • Modified scanRouteAt to apply resolveScanLocks to scan results.
    • Implemented maybeResolveTxnLock to resolve transaction locks during reads.
    • Added resolveScanLocks to filter transaction-internal keys and resolve locks during scans.
    • Introduced txnStatus enum and primaryTxnStatus function to determine the state of a primary transaction.
    • Added txnCommitTS, hasTxnRollback, and loadTxnLock for querying transaction records.
    • Implemented bestEffortAbortPrimary and applyTxnResolution for transaction cleanup and resolution.
    • Added cleanupTS to determine an appropriate timestamp for transaction cleanup.
  • kv/sharded_coordinator.go
    • Imported the bytes package.
    • Modified Dispatch to handle transactional requests via a new dispatchTxn method.
    • Added dispatchTxn to orchestrate distributed transactions through prewrite, primary commit, and secondary commit phases.
    • Introduced preparedGroup struct to track prepared transaction groups.
    • Added prewriteTxn for the prepare phase of distributed transactions.
    • Added commitPrimaryTxn for committing the primary key of a distributed transaction.
    • Added commitSecondaryTxns for committing secondary keys across shards.
    • Implemented abortPreparedTxn for rolling back prepared transactions.
    • Added txnGroupForID to retrieve a transaction group by ID.
    • Added nextTxnTSAfter to generate the next transaction timestamp after a given start timestamp.
    • Added abortTSFrom to derive an abort timestamp from a commit timestamp.
    • Added txnMetaMutation to create transaction metadata mutations.
    • Added engineGroupIDForKey to determine the group ID for a given key.
    • Updated txnLogs to handle single-shard transactions using the new metadata structure.
    • Added primaryKeyFromMutations to identify the primary key from a set of mutations.
    • Added keyMutations to extract unique keys from mutations.
  • kv/sharded_integration_test.go
    • Updated TestShardedCoordinatorDispatch_RejectsCrossShardTxn to reflect the new distributed transaction behavior, asserting the outcome of a cross-shard transaction rather than its rejection.
  • kv/transaction.go
    • Modified Commit to conditionally attempt transactional cleanup (Abort) only for actual transactional batches.
    • Modified Abort to properly construct abort requests using extracted transaction metadata and a derived abort timestamp.
    • Added extractTxnMeta function to parse transaction metadata from a list of mutations.
  • kv/txn_codec.go
    • Added TxnMeta struct for transaction metadata and implemented its EncodeTxnMeta and DecodeTxnMeta functions.
    • Added txnLock struct for transaction locks and implemented its encodeTxnLock and decodeTxnLock functions.
    • Added txnIntent struct for transaction intents and implemented its encodeTxnIntent and decodeTxnIntent functions.
    • Implemented encodeTxnCommitRecord and decodeTxnCommitRecord for transaction commit records.
    • Implemented encodeTxnRollbackRecord for transaction rollback records.
    • Added u64ToInt helper function for safe uint64 to int conversion.
  • kv/txn_consts.go
    • Added constants for defaultTxnLockTTLms and various mutation factors for transaction prepare, commit, and abort phases.
  • kv/txn_errors.go
    • Added new error types specific to transaction operations, such as ErrTxnMetaMissing, ErrTxnInvalidMeta, ErrTxnLocked, ErrTxnCommitTSRequired, ErrTxnAlreadyCommitted, and ErrTxnPrimaryKeyRequired.
  • kv/txn_keys.go
    • Defined prefixes for transaction-internal keys (txnLockPrefix, txnIntentPrefix, txnCommitPrefix, txnRollbackPrefix, txnMetaPrefix).
    • Implemented functions to construct transaction-internal keys (txnLockKey, txnIntentKey, txnCommitKey, txnRollbackKey).
    • Added isTxnInternalKey and isTxnMetaKey functions to identify transaction-related keys.
    • Added txnRouteKey to extract the logical user key from transaction-internal keys for routing purposes.
  • store/lsm_store.go
    • Removed logic in alignCommitTS that forced the commit timestamp to be strictly greater than the last commit timestamp plus one.
  • store/mvcc_store.go
    • Imported the sort package.
    • Modified putVersionLocked and deleteVersionLocked to use insertVersionSorted for maintaining sorted versions.
    • Added insertVersionSorted function to insert versioned values into a sorted slice.
    • Modified alignCommitTS to update lastCommitTS only if the new commitTS is greater.
  • store/mvcc_store_prop_test.go
    • Updated a property test for Delete to ensure the deletion timestamp (delTS) is strictly after the actual put timestamp (actualPutTS).
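The sorted-insert behavior described for store/mvcc_store.go can be sketched roughly as follows; versionedValue and its fields are hypothetical stand-ins, since the real struct is not shown in this changelog.

```go
package main

import (
	"fmt"
	"sort"
)

// versionedValue is a stand-in for the store's real version record;
// the field names are assumptions for illustration.
type versionedValue struct {
	ts    uint64
	value []byte
}

// insertVersionSorted inserts v into versions, keeping the slice ordered by
// ascending timestamp, as the store/mvcc_store.go change describes.
func insertVersionSorted(versions []versionedValue, v versionedValue) []versionedValue {
	// Find the first index whose timestamp is >= v.ts.
	i := sort.Search(len(versions), func(i int) bool { return versions[i].ts >= v.ts })
	versions = append(versions, versionedValue{})
	copy(versions[i+1:], versions[i:])
	versions[i] = v
	return versions
}

func main() {
	var vs []versionedValue
	for _, ts := range []uint64{5, 1, 3} {
		vs = insertVersionSorted(vs, versionedValue{ts: ts})
	}
	fmt.Println(vs[0].ts, vs[1].ts, vs[2].ts)
}
```

Keeping versions sorted on insert means reads at a timestamp can binary-search instead of scanning, and out-of-order commit application stays correct.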


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a major and impressive refactoring to support distributed transactions using a 2PC-like protocol. The changes are extensive, touching coordination, state machine logic, and storage layers. The introduction of explicit transaction metadata, locks, and intents is well-structured into new txn_* files. The overall design moves timestamp management to the coordinator level, which is a solid choice for a distributed system.

I've found a few areas for improvement:

  • Some code duplication that can be refactored.
  • A couple of performance issues, one of which is significant for scans.
  • A potential bug in primary key selection logic.

Addressing these points will make the new transaction implementation more robust and performant. Great work on this complex feature.

Comment on lines 325 to 348
func (s *ShardStore) resolveScanLocks(ctx context.Context, kvs []*store.KVPair, ts uint64) ([]*store.KVPair, error) {
	if len(kvs) == 0 {
		return kvs, nil
	}
	out := make([]*store.KVPair, 0, len(kvs))
	for _, kvp := range kvs {
		if kvp == nil {
			continue
		}
		// Filter txn-internal keys from user-facing scans.
		if isTxnInternalKey(kvp.Key) {
			continue
		}
		v, err := s.GetAt(ctx, kvp.Key, ts)
		if err != nil {
			if errors.Is(err, store.ErrKeyNotFound) {
				continue
			}
			return nil, err
		}
		out = append(out, &store.KVPair{Key: kvp.Key, Value: v})
	}
	return out, nil
}
high

The resolveScanLocks function iterates over scan results and calls s.GetAt for each key-value pair (line 338). This creates an N+1 query problem, which can severely degrade performance for scans that return many keys, as each GetAt call may trigger its own lock resolution logic.

To improve performance, consider implementing a batch mechanism to resolve locks for all keys found in the scan at once.

Comment on lines 471 to 485
func primaryKeyFromMutations(muts []*pb.Mutation) []byte {
	if len(muts) == 0 {
		return nil
	}
	primary := muts[0].Key
	for _, m := range muts[1:] {
		if m == nil || len(m.Key) == 0 {
			continue
		}
		if bytes.Compare(m.Key, primary) < 0 {
			primary = m.Key
		}
	}
	return primary
}

high

There's a potential bug in primaryKeyFromMutations. If muts[0] is nil or has an empty key, it could lead to incorrect behavior or a panic. The function should be made more robust to handle such cases.

Additionally, this function's logic for selecting a primary key is different from primaryKeyForElems used in the distributed transaction path, which is inconsistent. primaryKeyForElems also handles duplicate keys, which this function does not.

Consider refactoring this to be more robust and consistent.

func primaryKeyFromMutations(muts []*pb.Mutation) []byte {
	var primary []byte
	for _, m := range muts {
		if m == nil || len(m.Key) == 0 {
			continue
		}
		if primary == nil || bytes.Compare(m.Key, primary) < 0 {
			primary = m.Key
		}
	}
	return primary
}

Comment on lines 157 to 164
for _, m := range metaMutations {
	meta, err := kv.DecodeTxnMeta(m.Value)
	if err != nil {
		continue
	}
	meta.CommitTS = commitTS
	m.Value = kv.EncodeTxnMeta(meta)
}

medium

This loop re-decodes the TxnMeta for each mutation, which was already decoded in the first loop (lines 133-146). This is inefficient.

To improve performance, you could store the decoded meta object along with the mutation pointer in the first loop and reuse it here. For example:

type metaToUpdate struct {
    m    *pb.Mutation
    meta kv.TxnMeta
}
// ...
// In the first loop:
// metaToUpdateList = append(metaToUpdateList, metaToUpdate{m: m, meta: meta})
// ...
// In this loop:
for _, item := range metaToUpdateList {
    item.meta.CommitTS = commitTS
    item.m.Value = kv.EncodeTxnMeta(item.meta)
}

Comment on lines 304 to 326
func bytesCompare(a, b []byte) int {
	min := len(a)
	if len(b) < min {
		min = len(b)
	}
	for i := 0; i < min; i++ {
		if a[i] == b[i] {
			continue
		}
		if a[i] < b[i] {
			return -1
		}
		return 1
	}
	switch {
	case len(a) < len(b):
		return -1
	case len(a) > len(b):
		return 1
	default:
		return 0
	}
}

medium

This bytesCompare function is a reimplementation of the standard library's bytes.Compare. To reduce code duplication and improve maintainability, you should remove this function and use bytes.Compare directly. For example, line 300 can be changed to use it.

kv/fsm.go Outdated
Comment on lines 489 to 501
func splitTxnMeta(muts []*pb.Mutation) (TxnMeta, []*pb.Mutation, error) {
	if len(muts) == 0 || muts[0] == nil || len(muts[0].Key) == 0 {
		return TxnMeta{}, nil, errors.WithStack(ErrTxnMetaMissing)
	}
	if !isTxnMetaKey(muts[0].Key) {
		return TxnMeta{}, nil, errors.WithStack(ErrTxnMetaMissing)
	}
	meta, err := DecodeTxnMeta(muts[0].Value)
	if err != nil {
		return TxnMeta{}, nil, errors.WithStack(errors.Wrap(err, "decode txn meta"))
	}
	return meta, muts[1:], nil
}

medium

This splitTxnMeta function is identical to extractTxnMeta in kv/transaction.go. This code duplication should be avoided.

Consider moving this logic to a single, shared function in a package like kv/txn_codec.go and using it in both places to improve maintainability.


Copilot AI left a comment


Pull request overview

This PR refactors transaction timestamp handling and adds MVCC-level plumbing for transactional metadata/locks so commits/aborts can carry an explicit commit timestamp and reads/scans can resolve transactional locks.

Changes:

  • Remove “auto-bump” commit timestamp alignment in MVCC stores and keep MVCC versions ordered by timestamp.
  • Introduce transaction internal keys/codecs (meta/lock/intent/commit/rollback) and propagate commit timestamps through coordinator/FSM paths.
  • Add shard-store read/scan lock-resolution behavior and update integration/property tests for the new semantics.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.

Show a summary per file

  • store/mvcc_store_prop_test.go: Ensures delete timestamps are strictly after puts in property tests.
  • store/mvcc_store.go: Keeps per-key version lists sorted; adjusts commitTS alignment behavior.
  • store/lsm_store.go: Removes commitTS auto-bump; relies on max-tracking for LastCommitTS.
  • kv/txn_keys.go: Adds txn internal key prefixes, key builders, and routing extraction.
  • kv/txn_errors.go: Adds typed txn errors for meta/locking/commitTS validation.
  • kv/txn_consts.go: Adds txn constants (TTL default, mutation sizing factors).
  • kv/txn_codec.go: Adds encoding/decoding for txn meta, locks, intents, and records.
  • kv/transaction.go: Updates commit/abort behavior and txn meta extraction for cleanup.
  • kv/sharded_integration_test.go: Updates cross-shard txn expectations (now succeeds) but needs renaming.
  • kv/sharded_coordinator.go: Implements distributed txn dispatch (prewrite/commit/abort helpers).
  • kv/shard_store.go: Adds read-time lock resolution and filters txn-internal keys from scans.
  • kv/shard_key.go: Extends routeKey() to route txn-internal keys by embedded user key.
  • kv/hlc_wall.go: Adds wall-time helpers for TTL/cleanup timestamp derivation.
  • kv/fsm_occ_test.go: Updates OCC conflict test to match prepare-phase conflicts.
  • kv/fsm.go: Implements prepare/commit/abort txn phases with locks/intents and commitTS enforcement.
  • kv/coordinator.go: Refactors txn request construction to include meta + explicit commitTS.
  • adapter/internal.go: Centralizes timestamp stamping for forwarded raw/txn requests and fills commitTS.


Comment on lines +295 to +299
func (s *ShardStore) maybeResolveTxnLock(ctx context.Context, g *ShardGroup, key []byte, readTS uint64) error {
	// Only consider locks visible at the read timestamp.
	lockBytes, err := g.Store.GetAt(ctx, txnLockKey(key), readTS)
	if err != nil {
		if errors.Is(err, store.ErrKeyNotFound) {

Copilot AI Feb 15, 2026


The new lock-resolution path introduced here (maybeResolveTxnLock and related helpers) changes observable read semantics (auto-resolving committed/rolled-back txns, returning ErrTxnLocked, filtering txn-internal keys on scans), but there are no focused tests exercising these behaviors. Please add unit/integration coverage for pending-lock reads, commit-driven resolution, and scan filtering of !txn|... keys.

Comment on lines 109 to 149
	t.Fatalf("expected key x missing, got %v", err)
}
if v, err := shardStore.GetAt(ctx, []byte("x"), readTS); err != nil || string(v) != "v2" {
	t.Fatalf("get x: %v %v", v, err)
}

Copilot AI Feb 15, 2026


The test name TestShardedCoordinatorDispatch_RejectsCrossShardTxn no longer matches the assertions: the test now expects a cross-shard transactional batch to succeed and write both keys. Rename the test (and any related comments) to reflect the new behavior, otherwise the suite becomes misleading for future changes.

Comment on lines 170 to 174
  if err != nil {
  	return nil, errors.WithStack(err)
  }
- return kvs, nil
+ return s.resolveScanLocks(ctx, kvs, ts)
  }

Copilot AI Feb 15, 2026


scanRouteAt now calls resolveScanLocks for the Raft leader/proxy paths, but the earlier g.Raft == nil branch still returns raw ScanAt results without filtering txn-internal keys. That makes scan semantics depend on whether Raft is configured and can leak !txn|... keys in local/test setups. Consider applying the same txn-internal filtering (or resolveScanLocks) in the no-Raft branch for consistent behavior.

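A minimal sketch of the filtering the no-Raft branch could apply, using a local stand-in for store.KVPair and the "!txn|" prefix mentioned elsewhere in this PR:

```go
package main

import (
	"bytes"
	"fmt"
)

// KVPair is a local stand-in for store.KVPair, used only for illustration.
type KVPair struct {
	Key, Value []byte
}

// filterTxnInternal drops txn-internal keys (the "!txn|..." namespace) from
// scan results, mirroring the filtering resolveScanLocks performs on the
// leader/proxy paths so local reads behave consistently.
func filterTxnInternal(kvs []*KVPair) []*KVPair {
	out := make([]*KVPair, 0, len(kvs))
	for _, kvp := range kvs {
		if kvp == nil || bytes.HasPrefix(kvp.Key, []byte("!txn|")) {
			continue
		}
		out = append(out, kvp)
	}
	return out
}

func main() {
	kvs := []*KVPair{
		{Key: []byte("a")},
		{Key: []byte("!txn|lock|a")},
		{Key: []byte("b")},
	}
	for _, kvp := range filterTxnInternal(kvs) {
		fmt.Println(string(kvp.Key))
	}
}
```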
continue
}
startTS := req.Ts
abortTS := startTS + 1

Copilot AI Feb 15, 2026


TransactionManager.Abort derives abortTS as startTS + 1 without overflow handling. If startTS is math.MaxUint64, this wraps to 0 and will fail the FSM requirement that abort/commit timestamps are strictly greater than startTS. Use the same overflow-safe logic as abortTSFrom (or otherwise guard/saturate) when computing the abort timestamp.

Suggested change
- abortTS := startTS + 1
+ abortTS := abortTSFrom(startTS)

Comment on lines 151 to 154
commitTS := startTS + 1
if i.clock != nil {
	i.clock.Observe(startTS)
	commitTS = i.clock.Next()

Copilot AI Feb 15, 2026


fillForwardedTxnCommitTS falls back to commitTS := startTS + 1 when i.clock is nil. This can overflow (wrap to 0) for large startTS values and violates the invariant enforced in the FSM that commit/abort timestamps must be strictly greater than startTS. Consider adding an overflow guard (e.g., saturate or reject) or reusing the overflow-safe helper used elsewhere (abortTSFrom).

Suggested change
- commitTS := startTS + 1
- if i.clock != nil {
- 	i.clock.Observe(startTS)
- 	commitTS = i.clock.Next()
+ var commitTS uint64
+ if i.clock != nil {
+ 	i.clock.Observe(startTS)
+ 	commitTS = i.clock.Next()
+ } else {
+ 	const maxUint64 = ^uint64(0)
+ 	if startTS == maxUint64 {
+ 		// Cannot choose a CommitTS strictly greater than startTS without overflow.
+ 		// Leave metaMutations unchanged rather than wrapping to 0.
+ 		return
+ 	}
+ 	commitTS = startTS + 1
+ }

Phase: pb.Phase_COMMIT,
Ts:    startTS,
Mutations: append([]*pb.Mutation{
	{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKeyFromMutations(muts)})},

Copilot AI Feb 15, 2026


buildTxnLogs constructs the COMMIT request with a TxnMeta that omits CommitTS (defaults to 0). The FSM now requires meta.CommitTS != 0 for COMMIT/ABORT phases, so these generated logs will error if this path is used (even for single-shard helpers). Consider passing a real commit timestamp into buildTxnLogs (or generating one) and encoding it into the commit-phase TxnMeta.

Suggested change
- {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKeyFromMutations(muts)})},
+ {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKeyFromMutations(muts), CommitTS: startTS})},

@bootjp bootjp requested a review from Copilot February 15, 2026 15:54

bootjp commented Feb 15, 2026

/gemini review

@bootjp bootjp changed the title from "Refactor transaction timestamp management" to "Refactor transaction timestamp management for cross shard data" Feb 15, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a major refactoring to implement a distributed transaction protocol, likely based on Percolator. The changes are extensive, touching coordination, state machine logic, storage, and adapters. The introduction of 2-phase commit (prewrite/commit), transaction metadata, lock resolution on read, and sharded transaction coordination is a significant step towards a robust distributed KV store. The code is well-structured with new concepts cleanly separated into new files (e.g., txn_codec.go, txn_keys.go). My review includes one suggestion for a minor performance improvement in the primary key selection logic.

Comment on lines 284 to 303
func primaryKeyForElems(reqs []*Elem[OP]) []byte {
	keys := make([][]byte, 0, len(reqs))
	seen := map[string]struct{}{}
	for _, e := range reqs {
		if e == nil || len(e.Key) == 0 {
			continue
		}
		k := string(e.Key)
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		keys = append(keys, e.Key)
	}
	if len(keys) == 0 {
		return nil
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	return keys[0]
}

medium

This implementation for finding the primary key collects all unique keys into a slice and then sorts it to find the lexicographically smallest key. A more efficient approach would be to iterate through the elements once and keep track of the smallest key found so far. This would avoid the overhead of allocating a slice for all keys and the cost of sorting, especially for transactions with many keys.

func primaryKeyForElems(reqs []*Elem[OP]) []byte {
	var primary []byte
	seen := make(map[string]struct{})
	for _, e := range reqs {
		if e == nil || len(e.Key) == 0 {
			continue
		}
		k := string(e.Key)
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		if primary == nil || bytes.Compare(e.Key, primary) < 0 {
			primary = e.Key
		}
	}
	return primary
}


Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.




func (c *ShardedCoordinator) nextTxnTSAfter(startTS uint64) uint64 {
	if c.clock == nil {
		return startTS + 1

Copilot AI Feb 15, 2026


The function nextTxnTSAfter doesn't check for overflow when c.clock is nil. If startTS equals ^uint64(0) (the maximum uint64 value), then startTS + 1 will overflow to 0. This could cause issues when comparing timestamps. Consider adding an overflow check similar to the one in abortTSFrom or documenting that timestamps near uint64 max are not supported.

Suggested change
- return startTS + 1
+ nextTS := startTS + 1
+ if nextTS == 0 {
+ 	return startTS
+ }
+ return nextTS

Comment on lines 170 to 190
func (c *ShardedCoordinator) commitSecondaryTxns(startTS uint64, primaryGid uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, commitTS uint64, maxIndex uint64) uint64 {
	meta := txnMetaMutation(primaryKey, 0, commitTS)
	for _, gid := range gids {
		if gid == primaryGid {
			continue
		}
		g, ok := c.groups[gid]
		if !ok || g == nil || g.Txn == nil {
			continue
		}
		req := &pb.Request{
			IsTxn:     true,
			Phase:     pb.Phase_COMMIT,
			Ts:        startTS,
			Mutations: append([]*pb.Mutation{meta}, keyMutations(grouped[gid])...),
		}
		if r, err := g.Txn.Commit([]*pb.Request{req}); err == nil && r != nil && r.CommitIndex > maxIndex {
			maxIndex = r.CommitIndex
		}
	}
	return maxIndex
}

Copilot AI Feb 15, 2026


The commitSecondaryTxns function ignores errors from secondary shard commits (line 186). If a secondary shard fails to commit after the primary has committed, the transaction is left in an inconsistent state where the primary is committed but some secondaries might not be. This could lead to data inconsistency. Consider logging these errors or implementing a retry mechanism to ensure all secondaries eventually commit, or document that best-effort secondary commits are intentional.

}

func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) {
	const metaPrefix = "!txn|meta|"

Copilot AI Feb 15, 2026


The transaction metadata prefix is duplicated here as a local constant. This constant is already defined in kv/txn_keys.go as txnMetaPrefix = "!txn|meta|". Consider importing and using the constant from kv package instead of duplicating it to maintain consistency and avoid potential issues if the prefix changes.


bootjp commented Feb 15, 2026

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a major refactoring of the transaction handling mechanism, moving from a simple optimistic concurrency control model to a full-fledged two-phase commit (2PC) protocol to support cross-shard transactions. This is a significant and complex change that touches many parts of the system, including the coordinator, the Raft FSM, the storage layer, and the Redis adapter.

The implementation appears to be well-structured and correct, introducing new concepts like transaction metadata, locks, intents, and commit/rollback records. The logic for the 2PC coordinator, read-time lock resolution, and timestamp management seems sound. The code is well-modularized with new files for codecs, key formats, and constants, which improves maintainability.

I have one suggestion regarding a potential performance issue in the scan lock resolution logic, which could lead to an N+1 query problem. Otherwise, this is an excellent piece of work.

Comment on lines +328 to +347
func (s *ShardStore) resolveScanLocks(ctx context.Context, g *ShardGroup, kvs []*store.KVPair, ts uint64) ([]*store.KVPair, error) {
	if len(kvs) == 0 {
		return kvs, nil
	}
	if g == nil || g.Store == nil {
		return []*store.KVPair{}, nil
	}
	out := make([]*store.KVPair, 0, len(kvs))
	for _, kvp := range kvs {
		resolved, skip, err := s.resolveScanKVP(ctx, g, kvp, ts)
		if err != nil {
			return nil, err
		}
		if skip {
			continue
		}
		out = append(out, resolved)
	}
	return out, nil
}

Severity: medium

The current implementation of resolveScanLocks iterates through each key-value pair from the scan and resolves locks one by one by calling resolveScanKVP. Inside resolveScanKVP, if a lock is found, it triggers leaderGetAt, which can result in multiple store reads and potentially a new Raft command to resolve the lock. This creates an N+1 query problem where N is the number of locked keys in the scan range, which could lead to significant performance degradation for scans over ranges with many locked keys.

Consider refactoring this to batch lock resolutions. For example, you could first iterate through the scan results to collect all locked keys and their transaction metadata. Then, you could determine the status of all primary keys in a batch. Finally, you could apply all necessary transaction resolutions (commit/abort) in a single or a few batched Raft commands, and then re-read the values for the resolved keys. This would reduce the number of round trips and Raft commands significantly.
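The batching this comment describes starts by bucketing locked keys per transaction primary, so each transaction's status is checked once instead of once per locked key. lockInfo and groupLocksByPrimary below are hypothetical stand-ins for the kv package's decoded lock type, sketching only the grouping pass:

```go
package main

import "fmt"

// lockInfo is a stand-in for a decoded txnLock; hypothetical shape.
type lockInfo struct {
	key     string
	primary string
	startTS uint64
}

// groupLocksByPrimary collects every locked key seen during a scan and
// buckets the keys by the owning transaction's primary key. A follow-up
// pass would look up each primary's txn status once and resolve all of
// its locks in a single batched Raft command.
func groupLocksByPrimary(locks []lockInfo) map[string][]string {
	byPrimary := make(map[string][]string)
	for _, l := range locks {
		byPrimary[l.primary] = append(byPrimary[l.primary], l.key)
	}
	return byPrimary
}

func main() {
	locks := []lockInfo{
		{key: "a", primary: "p1", startTS: 10},
		{key: "b", primary: "p1", startTS: 10},
		{key: "c", primary: "p2", startTS: 12},
	}
	groups := groupLocksByPrimary(locks)
	fmt.Println(len(groups), len(groups["p1"])) // 2 2
}
```

With this grouping, a scan crossing N locked keys held by T transactions needs on the order of T status checks rather than N, which is the point of the reviewer's suggestion.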

Copilot AI left a comment


Pull request overview

Copilot reviewed 20 out of 20 changed files in this pull request and generated 4 comments.



Comment on lines +253 to +258
func abortTSFrom(commitTS uint64) uint64 {
	abortTS := commitTS + 1
	if abortTS == 0 {
		return commitTS
	}
	return abortTS
}
Copilot AI Feb 15, 2026


abortTSFrom returns commitTS when commitTS+1 overflows, which can yield abortTS == commitTS and potentially <= startTS. That makes ABORT requests invalid (kvFSM requires abortTS > startTS) and can strand locks. Prefer to detect overflow and return an explicit error (or choose a different cleanup timestamp strategy) instead of returning an invalid/ambiguous abort timestamp.

var requests []*pb.Request
if reqs.IsTxn {
	requests = txnRequests(reqs.StartTS, reqs.Elems)
	primary := primaryKeyForElems(reqs.Elems)
Copilot AI Feb 15, 2026


In the follower redirect path, a txn with all-empty keys will generate a TxnMeta with an empty PrimaryKey and forward it to the leader. The leader will later fail with ErrTxnPrimaryKeyRequired, but the leader path returns that error earlier. To keep behavior consistent (and avoid forwarding obviously-invalid txns), validate that primaryKeyForElems(reqs.Elems) is non-empty here and return ErrTxnPrimaryKeyRequired/ErrInvalidRequest before calling txnRequests.

Suggested change:

 	primary := primaryKeyForElems(reqs.Elems)
+	if len(primary) == 0 {
+		return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
+	}

Comment on lines +164 to +175
func insertVersionSorted(versions []VersionedValue, vv VersionedValue) []VersionedValue {
	// Keep versions sorted by TS ascending so lookups can assume max TS is last.
	i := sort.Search(len(versions), func(i int) bool { return versions[i].TS >= vv.TS })
	if i < len(versions) && versions[i].TS == vv.TS {
		// Idempotence: overwrite same timestamp.
		versions[i] = vv
		return versions
	}
	versions = append(versions, VersionedValue{})
	copy(versions[i+1:], versions[i:])
	versions[i] = vv
	return versions
}
Copilot AI Feb 15, 2026


insertVersionSorted always does a binary search + slice shift, even for the common case where vv.TS is greater than the last version timestamp. This turns the hot-path into O(n) per write unnecessarily. Add a fast-path that appends when len(versions)==0 or versions[len-1].TS < vv.TS (and only do Search+copy for out-of-order / overwrite cases).
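The suggested fast-path layered on the function shown above; VersionedValue is reduced to the one field the sketch needs:

```go
package main

import (
	"fmt"
	"sort"
)

// VersionedValue is reduced to the one field this sketch needs.
type VersionedValue struct {
	TS uint64
}

// insertVersionSorted with the suggested fast-path: in-order writes (the
// common case) append directly and skip the binary search and shift.
func insertVersionSorted(versions []VersionedValue, vv VersionedValue) []VersionedValue {
	// Fast path: strictly newer than every stored version.
	if n := len(versions); n == 0 || versions[n-1].TS < vv.TS {
		return append(versions, vv)
	}
	i := sort.Search(len(versions), func(i int) bool { return versions[i].TS >= vv.TS })
	if i < len(versions) && versions[i].TS == vv.TS {
		versions[i] = vv // idempotent overwrite of the same timestamp
		return versions
	}
	versions = append(versions, VersionedValue{})
	copy(versions[i+1:], versions[i:])
	versions[i] = vv
	return versions
}

func main() {
	var vs []VersionedValue
	for _, ts := range []uint64{1, 3, 2, 3} { // in-order, out-of-order, overwrite
		vs = insertVersionSorted(vs, VersionedValue{TS: ts})
	}
	out := make([]uint64, 0, len(vs))
	for _, v := range vs {
		out = append(out, v.TS)
	}
	fmt.Println(out) // [1 2 3]
}
```

The fast path keeps monotonically increasing writes at amortized O(1) per insert, while the original search-and-shift branch still handles out-of-order and overwrite cases.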

Comment on lines +237 to +243
func (c *ShardedCoordinator) nextTxnTSAfter(startTS uint64) uint64 {
	if c.clock == nil {
		nextTS := startTS + 1
		if nextTS == 0 {
			return startTS
		}
		return nextTS
Copilot AI Feb 15, 2026


nextTxnTSAfter returns startTS when startTS+1 overflows (clock==nil path). That can produce commitTS == startTS, which violates the CommitTS > StartTS invariant enforced by kvFSM and can leave prepared locks unresolvable (commit fails, then abortTSFrom(commitTS) also returns an invalid timestamp). Consider returning an error on overflow (or saturating to MaxUint64 and explicitly failing the txn) so callers can abort/cleanup deterministically.
