-
Notifications
You must be signed in to change notification settings - Fork 2
Refactor transaction timestamp management for cross shard data #313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
dc280eb
fda8142
35f7783
b7e219a
daa1385
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||||||
| package kv | ||||||||||||
|
|
||||||||||||
| import ( | ||||||||||||
| "bytes" | ||||||||||||
| "context" | ||||||||||||
|
|
||||||||||||
| pb "github.com/bootjp/elastickv/proto" | ||||||||||||
|
|
@@ -101,7 +102,18 @@ func (c *Coordinate) nextStartTS() uint64 { | |||||||||||
| } | ||||||||||||
|
|
||||||||||||
| func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) { | ||||||||||||
| logs := txnRequests(startTS, reqs) | ||||||||||||
| primary := primaryKeyForElems(reqs) | ||||||||||||
| if len(primary) == 0 { | ||||||||||||
| return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| commitTS := c.clock.Next() | ||||||||||||
| if commitTS <= startTS { | ||||||||||||
| c.clock.Observe(startTS) | ||||||||||||
| commitTS = c.clock.Next() | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs) | ||||||||||||
|
|
||||||||||||
| r, err := c.transactionManager.Commit(logs) | ||||||||||||
| if err != nil { | ||||||||||||
|
|
@@ -185,7 +197,11 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C | |||||||||||
|
|
||||||||||||
| var requests []*pb.Request | ||||||||||||
| if reqs.IsTxn { | ||||||||||||
| requests = txnRequests(reqs.StartTS, reqs.Elems) | ||||||||||||
| primary := primaryKeyForElems(reqs.Elems) | ||||||||||||
|
||||||||||||
| primary := primaryKeyForElems(reqs.Elems) | |
| primary := primaryKeyForElems(reqs.Elems) | |
| if len(primary) == 0 { | |
| return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation for finding the primary key collects all unique keys into a slice and then sorts it to find the lexicographically smallest key. A more efficient approach would be to iterate through the elements once and keep track of the smallest key found so far. This would avoid the overhead of allocating a slice for all keys and the cost of sorting, especially for transactions with many keys.
func primaryKeyForElems(reqs []*Elem[OP]) []byte {
var primary []byte
seen := make(map[string]struct{})
for _, e := range reqs {
if e == nil || len(e.Key) == 0 {
continue
}
k := string(e.Key)
if _, ok := seen[k]; ok {
continue
}
seen[k] = struct{}{}
if primary == nil || bytes.Compare(e.Key, primary) < 0 {
primary = e.Key
}
}
return primary
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fillForwardedTxnCommitTSfalls back tocommitTS := startTS + 1wheni.clockis nil. This can overflow (wrap to 0) for largestartTSvalues and violates the invariant enforced in the FSM that commit/abort timestamps must be strictly greater thanstartTS. Consider adding an overflow guard (e.g., saturate or reject) or reusing the overflow-safe helper used elsewhere (abortTSFrom).