Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 104 additions & 17 deletions adapter/internal.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapter

import (
"bytes"
"context"

"github.com/bootjp/elastickv/kv"
Expand Down Expand Up @@ -56,30 +57,116 @@ func (i *Internal) stampTimestamps(req *pb.ForwardRequest) {
return
}
if req.IsTxn {
var startTs uint64
// All requests in a transaction must have the same timestamp.
// Find a timestamp from the requests, or generate a new one if none exist.
for _, r := range req.Requests {
if r.Ts != 0 {
startTs = r.Ts
break
}
i.stampTxnTimestamps(req.Requests)
return
}

i.stampRawTimestamps(req.Requests)
}

// stampRawTimestamps assigns a timestamp to each non-transactional request
// that does not already carry one. Requests with a non-zero Ts are left
// untouched; when no clock is configured, the constant 1 is used instead.
func (i *Internal) stampRawTimestamps(reqs []*pb.Request) {
	for _, req := range reqs {
		if req == nil || req.Ts != 0 {
			continue
		}
		if i.clock != nil {
			req.Ts = i.clock.Next()
		} else {
			req.Ts = 1
		}
	}
}

if startTs == 0 && len(req.Requests) > 0 {
startTs = i.clock.Next()
func (i *Internal) stampTxnTimestamps(reqs []*pb.Request) {
startTS := forwardedTxnStartTS(reqs)
if startTS == 0 {
if i.clock == nil {
startTS = 1
} else {
startTS = i.clock.Next()
}
}

// Assign the unified timestamp to all requests in the transaction.
for _, r := range req.Requests {
r.Ts = startTs
// Assign the unified timestamp to all requests in the transaction.
for _, r := range reqs {
if r != nil {
r.Ts = startTS
}
return
}

for _, r := range req.Requests {
if r.Ts == 0 {
r.Ts = i.clock.Next()
i.fillForwardedTxnCommitTS(reqs, startTS)
}

// forwardedTxnStartTS returns the first non-zero timestamp found among the
// forwarded transaction requests, or 0 when none of them carries one.
func forwardedTxnStartTS(reqs []*pb.Request) uint64 {
	for _, req := range reqs {
		if req == nil {
			continue
		}
		if ts := req.Ts; ts != 0 {
			return ts
		}
	}
	return 0
}

// forwardedTxnMetaMutation extracts the transaction-meta mutation from a
// forwarded request. It reports false for nil requests, phases other than
// COMMIT/ABORT, requests with no (or a nil) first mutation, and first
// mutations whose key does not start with metaPrefix.
func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) {
	if r == nil {
		return nil, false
	}
	switch r.Phase {
	case pb.Phase_COMMIT, pb.Phase_ABORT:
		// Meta mutations only appear on commit/abort requests.
	default:
		return nil, false
	}
	if len(r.Mutations) == 0 {
		return nil, false
	}
	first := r.Mutations[0]
	if first == nil || !bytes.HasPrefix(first.Key, metaPrefix) {
		return nil, false
	}
	return first, true
}

func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) {
type metaToUpdate struct {
m *pb.Mutation
meta kv.TxnMeta
}

metaMutations := make([]metaToUpdate, 0, len(reqs))
prefix := []byte(kv.TxnMetaPrefix)
for _, r := range reqs {
m, ok := forwardedTxnMetaMutation(r, prefix)
if !ok {
continue
}
meta, err := kv.DecodeTxnMeta(m.Value)
if err != nil {
continue
}
if meta.CommitTS != 0 {
continue
}
metaMutations = append(metaMutations, metaToUpdate{m: m, meta: meta})
}
if len(metaMutations) == 0 {
return
}

commitTS := startTS + 1
if commitTS == 0 {
// Overflow: can't choose a commit timestamp strictly greater than startTS.
return
}
if i.clock != nil {
i.clock.Observe(startTS)
commitTS = i.clock.Next()
Comment on lines 154 to 161
Copy link

Copilot AI Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fillForwardedTxnCommitTS falls back to commitTS := startTS + 1 when i.clock is nil. This can overflow (wrap to 0) for large startTS values and violates the invariant enforced in the FSM that commit/abort timestamps must be strictly greater than startTS. Consider adding an overflow guard (e.g., saturate or reject) or reusing the overflow-safe helper used elsewhere (abortTSFrom).

Suggested change
commitTS := startTS + 1
if i.clock != nil {
i.clock.Observe(startTS)
commitTS = i.clock.Next()
var commitTS uint64
if i.clock != nil {
i.clock.Observe(startTS)
commitTS = i.clock.Next()
} else {
const maxUint64 = ^uint64(0)
if startTS == maxUint64 {
// Cannot choose a CommitTS strictly greater than startTS without overflow.
// Leave metaMutations unchanged rather than wrapping to 0.
return
}
commitTS = startTS + 1

Copilot uses AI. Check for mistakes.
}
if commitTS <= startTS {
// Defensive: avoid writing an invalid CommitTS.
return
}

for _, item := range metaMutations {
item.meta.CommitTS = commitTS
item.m.Value = kv.EncodeTxnMeta(item.meta)
}
}
27 changes: 24 additions & 3 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (r *RedisServer) Run() error {

name := strings.ToUpper(string(cmd.Args[0]))
if state.inTxn && name != cmdExec && name != cmdDiscard && name != cmdMulti {
state.queue = append(state.queue, cmd)
// redcon reuses the underlying argument buffers; copy queued commands
// so MULTI/EXEC works reliably under concurrency and with -race.
state.queue = append(state.queue, cloneCommand(cmd))
conn.WriteString("QUEUED")
return
}
Expand All @@ -170,6 +172,17 @@ func (r *RedisServer) Run() error {
return errors.WithStack(err)
}

// cloneCommand deep-copies a redcon command. redcon reuses its internal
// read buffers between commands, so queued commands must own their bytes.
func cloneCommand(cmd redcon.Command) redcon.Command {
	args := make([][]byte, len(cmd.Args))
	for i, a := range cmd.Args {
		args[i] = bytes.Clone(a)
	}
	return redcon.Command{Raw: bytes.Clone(cmd.Raw), Args: args}
}

func (r *RedisServer) Stop() {
_ = r.listen.Close()
}
Expand Down Expand Up @@ -233,8 +246,14 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
return
}

key := cmd.Args[1]
readTS := r.readTS()
v, err := r.readValueAt(cmd.Args[1], readTS)
// When proxying reads to the leader, let the leader choose a safe snapshot.
// Our local store watermark may lag behind a just-committed transaction.
if !r.coordinator.IsLeaderForKey(key) {
readTS = 0
}
v, err := r.readValueAt(key, readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
Expand Down Expand Up @@ -1156,7 +1175,9 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) {
if err != nil {
return nil, errors.WithStack(err)
}

if resp.Value == nil {
return nil, errors.WithStack(store.ErrKeyNotFound)
}
return resp.Value, nil
}

Expand Down
2 changes: 1 addition & 1 deletion adapter/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestRedis_follower_redirect_node_set_get_deleted(t *testing.T) {
assert.Equal(t, int64(1), res3.Val())

res4 := rdb.Get(ctx, string(key))
assert.NoError(t, res4.Err())
assert.Equal(t, redis.Nil, res4.Err())
assert.Equal(t, "", res4.Val())
}

Expand Down
66 changes: 58 additions & 8 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"bytes"
"context"

pb "github.com/bootjp/elastickv/proto"
Expand Down Expand Up @@ -101,7 +102,18 @@ func (c *Coordinate) nextStartTS() uint64 {
}

func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) {
logs := txnRequests(startTS, reqs)
primary := primaryKeyForElems(reqs)
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}

commitTS := c.clock.Next()
if commitTS <= startTS {
c.clock.Observe(startTS)
commitTS = c.clock.Next()
}

logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs)

r, err := c.transactionManager.Commit(logs)
if err != nil {
Expand Down Expand Up @@ -185,7 +197,11 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C

var requests []*pb.Request
if reqs.IsTxn {
requests = txnRequests(reqs.StartTS, reqs.Elems)
primary := primaryKeyForElems(reqs.Elems)
Copy link

Copilot AI Feb 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the follower redirect path, a txn with all-empty keys will generate a TxnMeta with an empty PrimaryKey and forward it to the leader. The leader will later fail with ErrTxnPrimaryKeyRequired, but the leader path returns that error earlier. To keep behavior consistent (and avoid forwarding obviously-invalid txns), validate that primaryKeyForElems(reqs.Elems) is non-empty here and return ErrTxnPrimaryKeyRequired/ErrInvalidRequest before calling txnRequests.

Suggested change
primary := primaryKeyForElems(reqs.Elems)
primary := primaryKeyForElems(reqs.Elems)
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}

Copilot uses AI. Check for mistakes.
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}
requests = txnRequests(reqs.StartTS, 0, defaultTxnLockTTLms, primary, reqs.Elems)
} else {
for _, req := range reqs.Elems {
requests = append(requests, c.toRawRequest(req))
Expand Down Expand Up @@ -237,17 +253,51 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
panic("unreachable")
}

func txnRequests(startTS uint64, reqs []*Elem[OP]) []*pb.Request {
muts := make([]*pb.Mutation, 0, len(reqs))
func txnRequests(startTS, commitTS, lockTTLms uint64, primaryKey []byte, reqs []*Elem[OP]) []*pb.Request {
meta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms, CommitTS: 0}),
}

prepareMuts := make([]*pb.Mutation, 0, len(reqs)+1)
prepareMuts = append(prepareMuts, meta)
for _, req := range reqs {
muts = append(muts, elemToMutation(req))
prepareMuts = append(prepareMuts, elemToMutation(req))
}

commitMeta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS: commitTS}),
}
commitMuts := make([]*pb.Mutation, 0, len(reqs)+1)
commitMuts = append(commitMuts, commitMeta)
for _, req := range reqs {
commitMuts = append(commitMuts, &pb.Mutation{Op: pb.Op_PUT, Key: req.Key})
}

// Use separate slices for PREPARE and COMMIT to avoid sharing slice header/state.
prepareMuts := append([]*pb.Mutation(nil), muts...)
commitMuts := append([]*pb.Mutation(nil), muts...)
return []*pb.Request{
{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepareMuts},
{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts},
}
}

// primaryKeyForElems returns the lexicographically smallest non-empty key
// among the elements, or nil when every element is nil or has an empty key.
// The original tracked previously seen keys in a map, but deduplication is
// unnecessary: comparing a duplicate key again can never change the running
// minimum, so a single allocation-free pass suffices.
func primaryKeyForElems(reqs []*Elem[OP]) []byte {
	var primary []byte
	for _, e := range reqs {
		if e == nil || len(e.Key) == 0 {
			continue
		}
		if primary == nil || bytes.Compare(e.Key, primary) < 0 {
			primary = e.Key
		}
	}
	return primary
}
Comment on lines 286 to 303
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This implementation for finding the primary key collects all unique keys into a slice and then sorts it to find the lexicographically smallest key. A more efficient approach would be to iterate through the elements once and keep track of the smallest key found so far. This would avoid the overhead of allocating a slice for all keys and the cost of sorting, especially for transactions with many keys.

func primaryKeyForElems(reqs []*Elem[OP]) []byte {
	var primary []byte
	seen := make(map[string]struct{})
	for _, e := range reqs {
		if e == nil || len(e.Key) == 0 {
			continue
		}
		k := string(e.Key)
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		if primary == nil || bytes.Compare(e.Key, primary) < 0 {
			primary = e.Key
		}
	}
	return primary
}

Loading
Loading