Refactor coordinator logic and add sharded support #304
Conversation
Summary of Changes
Hello @bootjp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a fundamental architectural shift by implementing sharding capabilities within the system. It abstracts the coordination logic into a flexible interface, enabling operations to be routed to specific Raft groups based on key ranges. This change affects how data is stored and retrieved and how transactions are processed, supporting scalability and distributed consistency. The accompanying updates to the Jepsen test suite provide robust validation for these new sharding mechanisms.
Code Review
This pull request introduces a significant architectural change by adding sharding support to the system. It refactors the coordinator logic, introducing a ShardedCoordinator, ShardStore, and ShardRouter to manage distributed data and operations across multiple Raft groups. The adapters for gRPC, Redis, and DynamoDB have been updated to be shard-aware. While this is a comprehensive effort to enable horizontal scaling, the review has identified several critical and high-severity issues. These include a major security vulnerability due to a committed private SSH key, hardcoded paths that break environment portability, and multiple instances of stale data reads that could compromise correctness guarantees like linearizability and snapshot isolation. These issues should be addressed before merging.
Pull request overview
This PR refactors the coordinator architecture to support multi-raft group sharding, enabling horizontal scaling through key-range partitioning. The changes introduce a pluggable coordinator interface with two implementations: the original single-group Coordinate and a new ShardedCoordinator that routes operations across multiple raft groups based on configurable key ranges.
Changes:
- Introduces sharded coordinator infrastructure with key-range based routing via a new distribution engine
- Refactors main.go to support multi-raft group initialization with configurable shard ranges
- Updates all adapters (gRPC, Redis, DynamoDB) to use the abstract Coordinator interface with key-aware leader operations
- Adds Jepsen test enhancements for sharded deployment validation
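The key-range routing described in these changes can be pictured with a small self-contained sketch. The names below (route, routeKeyToGroup) are illustrative only and not the distribution engine API introduced by this PR; the two ranges mirror the layout used by the integration test further down.

```go
package main

import (
    "bytes"
    "fmt"
)

// route maps a half-open key range [Start, End) to a raft group.
// A nil End means the range is unbounded on the right.
type route struct {
    Start, End []byte
    GroupID    uint64
}

// routeKeyToGroup returns the group that owns key, or 0 if no range matches.
func routeKeyToGroup(routes []route, key []byte) uint64 {
    for _, r := range routes {
        if bytes.Compare(key, r.Start) >= 0 &&
            (r.End == nil || bytes.Compare(key, r.End) < 0) {
            return r.GroupID
        }
    }
    return 0
}

func main() {
    // ["a", "m") -> group 1, ["m", +inf) -> group 2.
    routes := []route{
        {Start: []byte("a"), End: []byte("m"), GroupID: 1},
        {Start: []byte("m"), End: nil, GroupID: 2},
    }
    fmt.Println(routeKeyToGroup(routes, []byte("b"))) // 1
    fmt.Println(routeKeyToGroup(routes, []byte("x"))) // 2
}
```

Half-open ranges keep adjacent shards disjoint and let the right-most shard use a nil end as "to infinity", which is also how the integration test below configures its two shards.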
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 13 comments.
Summary per file:
| File | Description |
|---|---|
| shard_config.go | Parses and validates raft group and shard range configuration from command-line flags |
| multiraft_runtime.go | Initializes multiple raft groups with per-group data directories |
| main.go | Refactored server initialization to build sharded infrastructure and start per-group gRPC servers |
| kv/sharded_coordinator.go | New coordinator implementation routing operations to shards via distribution engine |
| kv/shard_store.go | MVCC store wrapper routing reads/writes to appropriate shard stores with leader proxying |
| kv/shard_router.go | Routes requests to shard-specific transactional managers, now using routeKey normalization |
| kv/shard_key.go | Normalizes internal keys (list metadata/items) to user keys for routing |
| kv/leader_proxy.go | Forwards transactional requests to raft leaders when local node is follower |
| kv/coordinator.go | Extended interface with key-aware leader methods; refactored transaction log building |
| adapter/grpc.go | Updated to use Coordinator interface with IsLeaderForKey/RaftLeaderForKey |
| adapter/redis.go | Updated to use Coordinator interface; refactored transaction timestamp logic |
| adapter/dynamodb.go | Updated to use Coordinator interface |
| adapter/redis_proxy.go | Updated to use RaftLeaderForKey for key-aware proxying |
| kv/sharded_integration_test.go | Integration test validating multi-shard coordinator dispatch |
| kv/shard_router_test.go | New test validating list key routing with routeKey normalization |
| jepsen/src/elastickv/db.clj | Enhanced to support multi-group raft configuration and validation |
| jepsen/src/elastickv/redis_workload.clj | Added local mode and sharding configuration options |
| jepsen/docker/ssh_config | SSH configuration for local Docker-based Jepsen testing |
| jepsen/docker/run-in-docker.sh | Docker entrypoint script for containerized Jepsen tests |
| jepsen/docker/id_rsa | Standard Vagrant insecure private key for test environments |
Added GetIntersectingRoutes method to Engine to find routes whose key ranges intersect with the scan range [start, end). Modified ScanAt in ShardStore to only query groups whose shard ranges intersect with the requested scan range, avoiding unnecessary work and incorrect results.
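As a rough illustration of the intersection check this commit describes (not the actual GetIntersectingRoutes implementation), two half-open key ranges overlap exactly when each one starts before the other ends, with a nil end treated as unbounded:

```go
package main

import (
    "bytes"
    "fmt"
)

// rangesIntersect reports whether the half-open key ranges
// [aStart, aEnd) and [bStart, bEnd) share at least one key.
// A nil end means the range extends without bound.
func rangesIntersect(aStart, aEnd, bStart, bEnd []byte) bool {
    aStartsBeforeBEnds := bEnd == nil || bytes.Compare(aStart, bEnd) < 0
    bStartsBeforeAEnds := aEnd == nil || bytes.Compare(bStart, aEnd) < 0
    return aStartsBeforeBEnds && bStartsBeforeAEnds
}

func main() {
    // Shard ["a", "m") intersects a scan over ["k", "z") ...
    fmt.Println(rangesIntersect([]byte("a"), []byte("m"), []byte("k"), []byte("z"))) // true
    // ... but shard ["m", +inf) does not intersect a scan over ["a", "c").
    fmt.Println(rangesIntersect([]byte("m"), nil, []byte("a"), []byte("c"))) // false
}
```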
Fix ScanAt to only query shards with intersecting key ranges
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 1 comment.
adapter/grpc.go (outdated)
    return resp.Value, nil

    readTS := req.GetTs()
    if readTS == 0 {
        readTS = snapshotTS(r.coordinator.Clock(), r.store)
The RawScanAt method calls r.coordinator.Clock() without checking if r.coordinator is nil. Unlike RawGet which has a nil check on lines 48-50, this function directly calls Clock() on line 94. If coordinator is nil, this will cause a nil pointer dereference panic. Add a nil check similar to RawGet, or use the same defensive pattern with a local clock variable.
Suggested change:

    var clock *kv.HLC
    if r.coordinator != nil {
        clock = r.coordinator.Clock()
    }
    readTS = snapshotTS(clock, r.store)
Force-pushed from e8d22c4 to f79a0de.
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 2 comments.
    if err != nil {
        for _, rt := range runtimes {
            rt.Close()
        }
        _ = st.Close()
        if closeStores != nil {
            closeStores()
        }
        return nil, nil, errors.Wrapf(err, "failed to start raft group %d", g.id)
When cleaning up after an error in buildShardGroups, the function closes the already-created runtimes and the current st/closeStores, but it never closes a transport manager (tm) directly. Because raftGroupRuntime.Close() calls tm.Close() for each runtime that was already registered, the cleanup is correct as written. Still, for clarity and robustness, consider also closing tm explicitly in the error path before returning, in case the raftGroupRuntime.Close() implementation changes in the future.
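A minimal sketch of that suggestion, assuming the variable names from the hunk above (runtimes, tm, st, closeStores, g) and that the transport manager exposes a Close() error method; the real code may differ:

```go
if err != nil {
    for _, rt := range runtimes {
        rt.Close()
    }
    // Defensively close the transport manager of the group that failed to
    // start, so cleanup no longer depends on raftGroupRuntime.Close()
    // covering tm.Close() in the future.
    if tm != nil {
        _ = tm.Close()
    }
    _ = st.Close()
    if closeStores != nil {
        closeStores()
    }
    return nil, nil, errors.Wrapf(err, "failed to start raft group %d", g.id)
}
```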
func TestShardedCoordinatorDispatch(t *testing.T) {
    ctx := context.Background()

    engine := distribution.NewEngine()
    engine.UpdateRoute([]byte("a"), []byte("m"), 1)
    engine.UpdateRoute([]byte("m"), nil, 2)

    s1 := store.NewMVCCStore()
    r1, stop1 := newSingleRaft(t, "g1", NewKvFSM(s1))
    defer stop1()

    s2 := store.NewMVCCStore()
    r2, stop2 := newSingleRaft(t, "g2", NewKvFSM(s2))
    defer stop2()

    groups := map[uint64]*ShardGroup{
        1: {Raft: r1, Store: s1, Txn: NewLeaderProxy(r1)},
        2: {Raft: r2, Store: s2, Txn: NewLeaderProxy(r2)},
    }

    shardStore := NewShardStore(engine, groups)
    coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore)

    ops := &OperationGroup[OP]{
        IsTxn: false,
        Elems: []*Elem[OP]{
            {Op: Put, Key: []byte("b"), Value: []byte("v1")},
            {Op: Put, Key: []byte("x"), Value: []byte("v2")},
        },
    }
    if _, err := coord.Dispatch(ctx, ops); err != nil {
        t.Fatalf("dispatch: %v", err)
    }

    readTS := shardStore.LastCommitTS()
    v, err := shardStore.GetAt(ctx, []byte("b"), readTS)
    if err != nil || string(v) != "v1" {
        t.Fatalf("get b: %v %v", v, err)
    }
    v, err = shardStore.GetAt(ctx, []byte("x"), readTS)
    if err != nil || string(v) != "v2" {
        t.Fatalf("get x: %v %v", v, err)
    }

    if _, err := s1.GetAt(ctx, []byte("x"), readTS); !errors.Is(err, store.ErrKeyNotFound) {
        t.Fatalf("expected key x missing in group1, got %v", err)
    }
    if _, err := s2.GetAt(ctx, []byte("b"), readTS); !errors.Is(err, store.ErrKeyNotFound) {
        t.Fatalf("expected key b missing in group2, got %v", err)
    }
}
The test only validates non-transactional cross-shard writes. Consider adding a test case that verifies cross-shard transactions are properly rejected with ErrCrossShardTransactionNotSupported when IsTxn=true and operations span multiple shards. This would validate the critical constraint documented in lines 225-231 of sharded_coordinator.go.
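One possible shape for such a test, reusing the setup of TestShardedCoordinatorDispatch above; the sentinel ErrCrossShardTransactionNotSupported comes from this comment, and the errors.Is comparison is an assumption about how the coordinator reports it:

```go
func TestShardedCoordinatorRejectsCrossShardTxn(t *testing.T) {
    ctx := context.Background()

    // Same two-shard layout as TestShardedCoordinatorDispatch.
    engine := distribution.NewEngine()
    engine.UpdateRoute([]byte("a"), []byte("m"), 1)
    engine.UpdateRoute([]byte("m"), nil, 2)

    s1 := store.NewMVCCStore()
    r1, stop1 := newSingleRaft(t, "g1", NewKvFSM(s1))
    defer stop1()

    s2 := store.NewMVCCStore()
    r2, stop2 := newSingleRaft(t, "g2", NewKvFSM(s2))
    defer stop2()

    groups := map[uint64]*ShardGroup{
        1: {Raft: r1, Store: s1, Txn: NewLeaderProxy(r1)},
        2: {Raft: r2, Store: s2, Txn: NewLeaderProxy(r2)},
    }
    shardStore := NewShardStore(engine, groups)
    coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore)

    // A transactional group whose keys land in different shards.
    ops := &OperationGroup[OP]{
        IsTxn: true,
        Elems: []*Elem[OP]{
            {Op: Put, Key: []byte("b"), Value: []byte("v1")}, // routed to group 1
            {Op: Put, Key: []byte("x"), Value: []byte("v2")}, // routed to group 2
        },
    }
    if _, err := coord.Dispatch(ctx, ops); !errors.Is(err, ErrCrossShardTransactionNotSupported) {
        t.Fatalf("expected ErrCrossShardTransactionNotSupported, got %v", err)
    }
}
```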
/gemini review
Code Review
This pull request introduces a significant and well-architected refactoring to add sharded support to the key-value store. The changes are extensive, touching coordination, storage, networking, and testing. Key additions include a sharded coordinator, a key-range-based routing engine, and leader-aware stores to ensure read consistency. The explicit rejection of cross-shard transactions is a sensible choice for managing complexity. The introduction of a gRPC connection cache is a good performance optimization. The test suite has also been impressively updated to cover the new sharded architecture.
My review focuses on a few areas for improvement: ensuring proper resource cleanup in the gRPC server, enhancing context propagation in the Redis adapter for better request lifecycle management, and correcting a potential timestamp generation issue in the sharded coordinator to guarantee transaction isolation.
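The connection cache itself is not shown in this thread; as a general illustration of the pattern rather than the PR's actual GRPCConnCache, a cache keyed by target address can hand out one *grpc.ClientConn per peer and close them all on shutdown (assumes a grpc-go version recent enough to provide grpc.NewClient):

```go
package cache

import (
    "sync"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// ConnCache reuses one gRPC client connection per target address.
type ConnCache struct {
    mu    sync.Mutex
    conns map[string]*grpc.ClientConn
}

func NewConnCache() *ConnCache {
    return &ConnCache{conns: make(map[string]*grpc.ClientConn)}
}

// Get returns the cached connection for addr, creating one if needed.
func (c *ConnCache) Get(addr string) (*grpc.ClientConn, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if conn, ok := c.conns[addr]; ok {
        return conn, nil
    }
    conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        return nil, err
    }
    c.conns[addr] = conn
    return conn, nil
}

// Close releases every cached connection; callers should invoke it on shutdown.
func (c *ConnCache) Close() error {
    c.mu.Lock()
    defer c.mu.Unlock()
    var firstErr error
    for addr, conn := range c.conns {
        if err := conn.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
        delete(c.conns, addr)
    }
    return firstErr
}
```

Whoever owns such a cache has to close it eventually, which is exactly why the no-op Close() flagged in the next comment would leak connections.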
    func (r *GRPCServer) Close() error {
        return nil
    }
The Close method is a no-op, but the GRPCServer holds a store.MVCCStore which, in practice, is a *kv.LeaderRoutedStore. This store contains a GRPCConnCache that manages gRPC connections. Without closing the store, these connections will be leaked. The Close method should delegate to r.store.Close() to ensure proper resource cleanup.
func (r *GRPCServer) Close() error {
    if r == nil || r.store == nil {
        return nil
    }
    return r.store.Close()
}

    if c.clock == nil {
        return maxTS, nil
    }
In the case where c.clock is nil, this function returns maxTS. This could result in a new transaction starting with a timestamp equal to a previous transaction's commit timestamp, which violates the strict requirement for start timestamps to be greater than any observed commit timestamps and could break snapshot isolation. To ensure correctness, the returned timestamp should be at least maxTS + 1.
Suggested change:

    if c.clock == nil {
        return maxTS + 1, nil
    }
adapter/redis.go (outdated)
    }

-   _, err = r.coordinator.Dispatch(res)
+   _, err = r.coordinator.Dispatch(context.Background(), res)
The coordinator.Dispatch method is called with context.Background(). This prevents request cancellation, deadline propagation, and tracing from the client through the Redis adapter. While the redcon library may not directly provide a request-scoped context, using a context with a timeout would make the system more robust against long-running or stuck operations. Consider wrapping context.Background() with a timeout.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Timeout can be made configurable.
defer cancel()
_, err = r.coordinator.Dispatch(ctx, res)
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 2 comments.
    if ctx == nil {
        ctx = context.Background()
    }
Coordinate.Dispatch dereferences reqs (reqs.IsTxn / reqs.Elems) without validating that reqs is non-nil and contains at least one non-nil element. A malformed request can panic the server; please return ErrInvalidRequest for nil/empty (and possibly nil elems) similar to ShardedCoordinator.validateOperationGroup().
Suggested change:

    // Validate the request before any use to avoid panics on malformed input.
    if reqs == nil {
        return nil, ErrInvalidRequest
    }
    if len(reqs.Elems) == 0 {
        return nil, ErrInvalidRequest
    }
    // Ensure there is at least one non-nil element, similar to ShardedCoordinator.validateOperationGroup().
    hasNonNil := false
    for _, op := range reqs.Elems {
        if op != nil {
            hasNonNil = true
            break
        }
    }
    if !hasNonNil {
        return nil, ErrInvalidRequest
    }
    for _, r := range reqs {
        if len(r.Mutations) == 0 {
            return nil, ErrInvalidRequest
        }
-       key := r.Mutations[0].Key
+       key := routeKey(r.Mutations[0].Key)
ShardRouter.groupRequests iterates over reqs and assumes each *pb.Request is non-nil. If a nil request is ever passed (e.g., via a buggy forwarder/client), this will panic when accessing r.Mutations. Please defensively check r == nil and return ErrInvalidRequest instead of panicking.
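A sketch of the defensive check being requested, extending the loop shown above (names taken from that hunk):

```go
for _, r := range reqs {
    // Reject nil requests up front instead of panicking on r.Mutations below.
    if r == nil || len(r.Mutations) == 0 {
        return nil, ErrInvalidRequest
    }
    key := routeKey(r.Mutations[0].Key)
    // ... rest of the grouping logic unchanged ...
}
```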
/gemini review
Code Review
This pull request is a significant and well-executed refactoring to introduce sharding support. It refactors the coordinator logic, introduces a sharded store and coordinator, and moves routing logic into the appropriate layers. The changes also include critical correctness fixes for transaction timestamping in an MVCC system and performance improvements like gRPC connection caching. The overall design is solid and the new abstractions are clean. I have one suggestion to improve the consistency and safety of handling transaction logs in the new sharded coordinator.
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 1 comment.
proto/service.proto (outdated)
    uint64 ts = 4; // optional read timestamp; if zero, server uses current HLC
    }

    message RawKvPair {
The message name RawKvPair uses inconsistent casing (Kv instead of KV) compared to other messages in this file like RawKV service and existing Kv message. Consider renaming to RawKVPair for consistency with protobuf naming conventions where acronyms are typically treated as single words (e.g., RawKV, not RawKv).
@copilot open a new pull request to apply changes based on this feedback
Rename RawKvPair to RawKVPair for protobuf naming consistency
/gemini review
Code Review
This is a major and well-executed refactoring to introduce sharding support. The changes are extensive, introducing several new components like ShardedCoordinator, ShardRouter, and ShardStore to create a clear separation of concerns for key-based routing, leader forwarding, and transaction coordination. The correctness of transaction timestamp generation is also improved, and performance is enhanced through connection caching. Overall, this is a significant and impressive improvement to the architecture.
I've found one critical issue related to transaction correctness in the Redis adapter, which violates the principle of consistent snapshot views in MVCC transactions, and one high-severity issue regarding a potential resource leak. Please see the detailed comments.
I am having trouble creating individual review comments, so my feedback is included inline below.
adapter/redis.go (437-444)
The removal of the leader-proxying logic in exec seems incorrect. The new runTransaction implementation relies on r.store to fetch the latest commit timestamps for keys in the transaction. However, the store provided to RedisServer is the raw local MVCCStore, not a leader-aware store wrapper like LeaderRoutedStore or ShardStore.
This means if a Redis EXEC command is received by a follower node, runTransaction will read potentially stale data from its local store to determine the transaction's start timestamp, breaking snapshot isolation. The transaction might then be dispatched to the leader and commit, but it was based on a stale view of the data.
The previous implementation correctly proxied the entire EXEC command to the leader. This logic should be restored, or the store used by RedisServer must be made leader-aware to correctly route the LatestCommitTS lookups.
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
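To make that principle concrete, here is a small self-contained sketch, not the project's actual API, of deriving a single start timestamp for a transaction from the latest commit timestamps of every key it touches; the per-key lookup is exactly the step that must be leader-aware to avoid the stale read described above:

```go
package mvcc

// latestCommitTS reports the newest commit timestamp recorded for a key.
// In this PR that lookup should go through a leader-aware store, or a
// follower may answer with a stale value.
type latestCommitTS func(key []byte) (uint64, error)

// txnStartTS picks one start timestamp shared by every operation in the
// transaction: strictly greater than any commit observed for its keys.
func txnStartTS(lookup latestCommitTS, keys [][]byte) (uint64, error) {
    var maxTS uint64
    for _, k := range keys {
        ts, err := lookup(k)
        if err != nil {
            return 0, err
        }
        if ts > maxTS {
            maxTS = ts
        }
    }
    return maxTS + 1, nil
}
```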
Pull request overview
Copilot reviewed 42 out of 43 changed files in this pull request and generated 1 comment.
    logs = append(logs,
        &pb.Request{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: muts},
        &pb.Request{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: muts},
In buildTxnLogs(), the same muts slice is reused for both the PREPARE and COMMIT requests. In coordinator.go, txnRequests() explicitly copies the mutations slice for each phase to avoid any accidental shared-slice side effects if later code appends/modifies mutations. Consider copying muts per phase here too for consistency and future safety.
Suggested change:

    // Copy mutations per phase to avoid shared-slice side effects between phases.
    prepMuts := append([]*pb.Mutation(nil), muts...)
    commitMuts := append([]*pb.Mutation(nil), muts...)
    logs = append(logs,
        &pb.Request{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepMuts},
        &pb.Request{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts},