Feat: unified key-value store abstraction and adapters#165
Feat: unified key-value store abstraction and adapters#165
Conversation
…m, Fastly, and Cloudflare
Resolves in and in
… and error, and update documentation
…TL validation for expiration tests.
… instead of an in-memory map
ChristianPavilonis
left a comment
There was a problem hiding this comment.
There's some opportunity to cleanup 🧹
| use std::sync::Mutex; | ||
|
|
||
| // Minimal in-memory store for testing the handle/trait contract | ||
| struct MockStore { |
There was a problem hiding this comment.
We have many mock stores in the tests, MockStore in kv.rs and handlers.rs, NoopStore 2x in extractor.rs, Stub in context.rs
| /// | ||
| /// If a KV namespace with this binding exists in your `wrangler.toml`, | ||
| /// it will be automatically available to handlers via the `Kv` extractor. | ||
| pub const DEFAULT_KV_BINDING: &str = "EDGEZERO_KV"; |
There was a problem hiding this comment.
There's several constants that are set to "EDGEZERO_KV" could just have one constant.
| })?; | ||
|
|
||
| // Initialize the table | ||
| let write_txn = db |
There was a problem hiding this comment.
This write_txn boiler plate is repeated 6 times, add some helpers to clean this up.
aram356
left a comment
There was a problem hiding this comment.
Staff Engineer Review — KV Store Feature
The architecture is solid: KvStore trait → KvHandle typed wrapper → Kv extractor → manifest config. The contract test macro is a strong addition. The layering follows the established adapter pattern correctly.
However, there are several issues to address before merge, ranging from a WASM compat convention violation to a security concern in the demo handler.
Summary
| Priority | Count | Categories |
|---|---|---|
| Must fix | 4 | WASM compat, unbounded body, feature gating, committed artifacts |
| Should fix | 6 | Silent failures, dead code, test gaps, API inconsistency |
| Nits | 4 | Dependencies, defensive validation, unnecessary imports |
See inline comments for details on each.
What's well-designed
- Object-safe trait with
#[async_trait(?Send)]+Send + Sync— correct for both WASM single-threaded and native multi-threaded contexts - Validation can't be bypassed —
KvHandle.storeis private, all public methods validate before delegating - Graceful KV injection — Fastly/Cloudflare silently skip KV if the store doesn't exist, so handlers that don't use KV are unaffected
PersistentKvStorewith redb — ACID-compliant, persistent across restarts, lazy TTL eviction. Right tradeoff for local dev- Contract test macro — ensures all backends conform to the same behavioral contract
- Manifest-driven config with per-adapter overrides — follows existing patterns
Issues not attached inline
Must fix: Committed artifacts that don't belong
final_review.md— review document from a prior session. ReferencesMemoryKvStorewhich no longer exists. Should not be checked in.examples/app-demo/node_modules/.mf/cf.json— Miniflare runtime state containing client TLS fingerprints, IP geolocation, and bot detection scores. Auto-generated, should be gitignored.
Should fix: Contract tests don't cover TTL expiration
- The
kv_contract_tests!macro tests put/get/delete/list/exists but not TTL. TTL is tested only in axum adapter tests. No shared contract verifying expired entries disappear across backends.
Should fix: env::set_var in demo tests is unsound on Rust 1.91
handlers.rstests useenv::set_var/env::remove_varforAPI_BASE_URLin tests that run in parallel. Since Rust 1.83, these are unsafe in multi-threaded contexts. Consider extracting the base URL as a function parameter or using#[serial].
Nit: Transitive serde_json in Fastly/Cloudflare adapters
- Neither adapter lists
serde_jsonexplicitly. Works today via transitive dep through core, but fragile if core ever feature-gates it.
|
|
||
| // -- Raw bytes ---------------------------------------------------------- | ||
|
|
||
| #[tokio::test] |
There was a problem hiding this comment.
Must fix: #[tokio::test] in core crate violates WASM compatibility rule
CLAUDE.md is explicit: "Async tests use futures::executor::block_on (not Tokio) for WASM compat."
The contract test macro at line 380 correctly uses futures::executor::block_on, but all 30+ unit tests in this file use #[tokio::test]. This also introduced a tokio dev-dependency in edgezero-core/Cargo.toml that shouldn't be there.
Fix: Replace every #[tokio::test] with #[test] + futures::executor::block_on:
#[test]
fn raw_bytes_roundtrip() {
let h = handle();
futures::executor::block_on(async {
h.put_bytes("k", Bytes::from("hello")).await.unwrap();
assert_eq!(h.get_bytes("k").await.unwrap(), Some(Bytes::from("hello")));
});
}The axum adapter tests using #[tokio::test] are fine — Tokio is a natural dependency there.
| brotli = { workspace = true } | ||
| flate2 = { workspace = true } | ||
| tempfile = { workspace = true } | ||
| tokio = { workspace = true, features = ["macros", "rt"] } |
There was a problem hiding this comment.
Must fix: Remove tokio from core dev-dependencies
This was added to support #[tokio::test] in kv.rs, but core crate tests must use futures::executor::block_on for WASM compat. Remove this line after converting the tests.
| } | ||
|
|
||
| /// Drain a [`Body`] into a single [`Bytes`] buffer, regardless of variant. | ||
| async fn collect_body(body: Body) -> Result<Bytes, EdgeError> { |
There was a problem hiding this comment.
Must fix: Unbounded request body — denial of service
collect_body() reads the entire request into memory with no size limit. The 25MB KV validation at store.put_bytes() fires after the full allocation. An attacker can stream a multi-GB body to /kv/notes/{id} and OOM the process.
The framework itself isn't vulnerable (KvHandle validation is correct), but this demo handler is the canonical example users will copy.
Fix: Cap accumulation:
const MAX_BODY_SIZE: usize = 25 * 1024 * 1024;
async fn collect_body(body: Body) -> Result<Bytes, EdgeError> {
if body.is_stream() {
let mut stream = body.into_stream().expect("checked is_stream");
let mut buf = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(EdgeError::internal)?;
buf.extend_from_slice(&chunk);
if buf.len() > MAX_BODY_SIZE {
return Err(EdgeError::bad_request("request body too large"));
}
}
Ok(Bytes::from(buf))
} else {
Ok(body.into_bytes())
}
}| futures-util = { workspace = true, optional = true } | ||
| http = { workspace = true } | ||
| log = { workspace = true } | ||
| redb = { workspace = true } |
There was a problem hiding this comment.
Must fix: redb should be behind the axum feature gate
redb, serde_json (line 40), and web-time (line 43) are unconditional dependencies, but the kv module that uses them is gated on #[cfg(feature = "axum")] in lib.rs. Anyone pulling in this crate just for the cli feature compiles redb unnecessarily.
Fix:
[features]
axum = ["dep:axum", "dep:tokio", "dep:tower", "dep:futures-util", "dep:reqwest", "dep:redb", "dep:serde_json", "dep:web-time"]
[dependencies]
redb = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
web-time = { workspace = true, optional = true }| let mut core_request = into_core_request(req).map_err(map_edge_error)?; | ||
|
|
||
| // Inject KV handle if the store exists — graceful fallback. | ||
| if let Ok(store) = FastlyKvStore::open(kv_store_name) { |
There was a problem hiding this comment.
Should fix: KV open failure silently swallowed
If the store name is misspelled in edgezero.toml, this if let Ok silently proceeds without KV. The handler later calls ctx.kv_handle() and gets None, producing a confusing 500 from the Kv extractor ("no kv store configured"). The developer has no indication that the store open failed — it looks like it was never configured.
Compare: the Axum adapter crashes on the same failure (dev_server.rs:106).
Fix: Log a warning on failure:
match FastlyKvStore::open(kv_store_name) {
Ok(store) => {
core_request.extensions_mut().insert(KvHandle::new(Arc::new(store)));
}
Err(e) => {
log::warn!("KV store '{}' not available: {}", kv_store_name, e);
}
}|
|
||
| // -- Validation --------------------------------------------------------- | ||
|
|
||
| fn validate_key(key: &str) -> Result<(), KvError> { |
There was a problem hiding this comment.
Should fix: Empty keys pass validation
validate_key rejects "." and ".." but accepts "" (empty string). The test at line 739 explicitly exercises empty key roundtrips. Neither Cloudflare nor Fastly document behavior for empty string keys — if a backend ever rejects them, local dev would silently diverge.
Either reject them:
if key.is_empty() {
return Err(KvError::Validation("key cannot be empty".to_string()));
}Or add a comment documenting the intentional permissiveness.
| Ok(()) | ||
| } | ||
|
|
||
| fn validate_ttl(ttl: Duration) -> Result<(), KvError> { |
There was a problem hiding this comment.
Nit: No TTL upper bound — overflow risk
Minimum enforced (60s) but no maximum. Duration::from_secs(u64::MAX) would cause SystemTime::now() + ttl to panic in debug or wrap to epoch in release (making the entry immediately expired — the opposite of intent).
Low risk in practice, but a defensive MAX_TTL (e.g. 1 year) would be trivial:
pub const MAX_TTL: Duration = Duration::from_secs(365 * 24 * 60 * 60);| } | ||
|
|
||
| impl Kv { | ||
| #[must_use] |
There was a problem hiding this comment.
Should fix: Inconsistent #[must_use] annotation
Kv::into_inner() is the only extractor with #[must_use]. None of the other 11 extractors (Json, Path, Query, Form, Headers, Host, ForwardedHost, and all validated variants) have it. Remove for consistency, or add it everywhere.
| use bytes::Bytes; | ||
| use edgezero_core::kv::{KvError, KvStore}; | ||
| use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition}; | ||
| use web_time::SystemTime; |
There was a problem hiding this comment.
Nit: web-time unnecessary in axum adapter
The axum adapter only compiles for native targets (not WASM), so std::time::SystemTime works fine. web_time::SystemTime is a WASM polyfill that adds a dependency for no functional benefit here. std::time::Duration is already used on line 50.
If this is intentional for cross-crate consistency, add a comment explaining why.
| serve_with_listener_and_kv_path(router, listener, enable_ctrl_c, ".edgezero/kv.redb").await | ||
| } | ||
|
|
||
| async fn serve_with_listener_and_kv_path( |
There was a problem hiding this comment.
Nit: Consider logging KV store path on startup
This function creates the persistent KV store, but the server startup logs don't mention where data is stored. A log::info!("KV store: {}", kv_path) here would help developers understand where their data lives and debug persistence issues.
| let mut table = write_txn.open_table(KV_TABLE).map_err(|e| { | ||
| KvError::Internal(anyhow::anyhow!("failed to open table: {}", e)) | ||
| })?; | ||
| table.remove(key).map_err(|e| { |
There was a problem hiding this comment.
Race Condition in Expiration Cleanup (HIGH)
There's a TOCTOU race in the lazy expiration eviction. get_bytes reads the expiry in a read transaction (line 153), drops it (lines 155-156), then opens a separate write transaction to delete the key (lines 159-172). Between the drop and the delete, a concurrent request could put_bytes with a fresh value for the same key — and the delete would silently wipe out that new write.
The same pattern exists in list_keys (lines 284-313): expired keys are collected during a read txn, then batch-deleted in a separate write txn.
Fix: Re-check the expiry timestamp inside the write transaction before deleting:
let write_txn = self.db.begin_write()?;
{
let mut table = write_txn.open_table(KV_TABLE)?;
// Re-read to verify still expired (not overwritten by a concurrent put)
if let Some(entry) = table.get(key)? {
let (_, expires_at) = entry.value();
if Self::is_expired(expires_at) {
table.remove(key)?;
}
}
}
write_txn.commit()?;Apply the same guard in list_keys for each expired key in the batch delete loop.
| #[cfg(feature = "axum")] | ||
| mod dev_server; | ||
| #[cfg(feature = "axum")] | ||
| pub mod kv; |
There was a problem hiding this comment.
⛏️ Please think about using key_value_store in place of just kv because we will have parallel implementation for config and secret stores
Summary
This PR implements the full Key-Value store specification for EdgeZero, providing a portable abstraction for Axum, Fastly, and Cloudflare.
Implements
KvStoretrait andKvHandlewrapper inkv.rs(closes KV Store Abstraction #43, As a developer I want a portable KV store interface #44).ctx.kv_handle()andKvextractor (closes As a developer I want to access KV stores from request handlers #48).[stores.kv]section toedgezero.toml(closes KV store manifest bindings #49).kv_contract_tests!test suite ensuring behavioral consistency (closes KV store contract tests #50).PersistentKvStore(Axum) (closes Axum KV adapter (dev) #47 )FastlyKvStoreincrates/edgezero-adapter-fastly/src/kv.rs(closes Fastly KV adapter #45 )CloudflareKvStoreincrates/edgezero-adapter-cloudflare/src/kv.rs(closes Cloudflare KV adapter #46 )Integration
scripts/smoke_test_kv.shfor cross-platform verification.examples/app-demoto use the new KV features inexamples/app-demo/crates/app-demo-core/src/handlers.rs.