-
Notifications
You must be signed in to change notification settings - Fork 0
Feat: unified key-value store abstraction and adapters #165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ff0fec4
b1aff0c
fc69ee2
5ac528c
857b2cd
a133876
0fb3340
41d3398
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ bin/ | |
| pkg/ | ||
| target/ | ||
| .wrangler/ | ||
| .edgezero/ | ||
|
|
||
| # env | ||
| .env | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,9 +74,13 @@ impl AxumDevServer { | |
| } | ||
|
|
||
| #[cfg(test)] | ||
| async fn run_with_listener(self, listener: tokio::net::TcpListener) -> anyhow::Result<()> { | ||
| async fn run_with_listener( | ||
| self, | ||
| listener: tokio::net::TcpListener, | ||
| kv_path: &str, | ||
| ) -> anyhow::Result<()> { | ||
| let AxumDevServer { router, config } = self; | ||
| serve_with_listener(router, listener, config.enable_ctrl_c).await | ||
| serve_with_listener_and_kv_path(router, listener, config.enable_ctrl_c, kv_path).await | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -85,7 +89,25 @@ async fn serve_with_listener( | |
| listener: tokio::net::TcpListener, | ||
| enable_ctrl_c: bool, | ||
| ) -> anyhow::Result<()> { | ||
| let service = EdgeZeroAxumService::new(router); | ||
| serve_with_listener_and_kv_path(router, listener, enable_ctrl_c, ".edgezero/kv.redb").await | ||
| } | ||
|
|
||
| async fn serve_with_listener_and_kv_path( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Consider logging KV store path on startupThis function creates the persistent KV store, but the server startup logs don't mention where data is stored. A |
||
| router: RouterService, | ||
| listener: tokio::net::TcpListener, | ||
| enable_ctrl_c: bool, | ||
| kv_path: &str, | ||
| ) -> anyhow::Result<()> { | ||
| // Create a persistent KV store | ||
| if let Some(parent) = std::path::Path::new(kv_path).parent() { | ||
| std::fs::create_dir_all(parent).context("failed to create KV store directory")?; | ||
| } | ||
| let kv_store = std::sync::Arc::new( | ||
| crate::kv::PersistentKvStore::new(kv_path).context("failed to create KV store")?, | ||
| ); | ||
| let kv_handle = edgezero_core::kv::KvHandle::new(kv_store); | ||
|
|
||
| let service = EdgeZeroAxumService::new(router).with_kv_handle(kv_handle); | ||
| let router = Router::new().fallback_service(service_fn(move |req| { | ||
| let mut svc = service.clone(); | ||
| async move { svc.call(req).await } | ||
|
|
@@ -204,6 +226,7 @@ mod integration_tests { | |
| struct TestServer { | ||
| base_url: String, | ||
| handle: tokio::task::JoinHandle<()>, | ||
| _temp_dir: tempfile::TempDir, | ||
| } | ||
|
|
||
| async fn start_test_server(router: RouterService) -> TestServer { | ||
|
|
@@ -217,13 +240,19 @@ mod integration_tests { | |
| }; | ||
| let server = AxumDevServer::with_config(router, config); | ||
|
|
||
| // Use a unique temp directory for each test server | ||
| let temp_dir = tempfile::tempdir().expect("create temp dir"); | ||
| let kv_path = temp_dir.path().join("kv.redb"); | ||
| let kv_path_str = kv_path.to_str().expect("valid path").to_string(); | ||
|
|
||
| let handle = tokio::spawn(async move { | ||
| let _ = server.run_with_listener(listener).await; | ||
| let _ = server.run_with_listener(listener, &kv_path_str).await; | ||
| }); | ||
|
|
||
| TestServer { | ||
| base_url: format!("http://{}", addr), | ||
| handle, | ||
| _temp_dir: temp_dir, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -358,4 +387,189 @@ mod integration_tests { | |
|
|
||
| drop(listener); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn kv_store_persists_across_requests() { | ||
| async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> { | ||
| let store = ctx.kv_handle().expect("kv configured"); | ||
| store.put("counter", &42i32).await?; | ||
| Ok("written") | ||
| } | ||
|
|
||
| async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> { | ||
| let store = ctx.kv_handle().expect("kv configured"); | ||
| let val: i32 = store.get_or("counter", 0).await?; | ||
| Ok(val.to_string()) | ||
| } | ||
|
|
||
| let router = RouterService::builder() | ||
| .post("/write", write_handler) | ||
| .get("/read", read_handler) | ||
| .build(); | ||
| let server = start_test_server(router).await; | ||
|
|
||
| let client = reqwest::Client::new(); | ||
|
|
||
| // Write a value | ||
| let write_url = format!("{}/write", server.base_url); | ||
| let response = send_with_retry(&client, |client| client.post(write_url.as_str())).await; | ||
| assert_eq!(response.status(), reqwest::StatusCode::OK); | ||
| assert_eq!(response.text().await.unwrap(), "written"); | ||
|
|
||
| // Read it back — proves shared state across requests | ||
| let read_url = format!("{}/read", server.base_url); | ||
| let response = send_with_retry(&client, |client| client.get(read_url.as_str())).await; | ||
| assert_eq!(response.status(), reqwest::StatusCode::OK); | ||
| assert_eq!(response.text().await.unwrap(), "42"); | ||
|
|
||
| server.handle.abort(); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn kv_store_delete_across_requests() { | ||
| async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| kv.put("temp", &"to_delete").await?; | ||
| Ok("written") | ||
| } | ||
|
|
||
| async fn delete_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| kv.delete("temp").await?; | ||
| Ok("deleted") | ||
| } | ||
|
|
||
| async fn check_handler(ctx: RequestContext) -> Result<String, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| let exists = kv.exists("temp").await?; | ||
| Ok(format!("exists={exists}")) | ||
| } | ||
|
|
||
| let router = RouterService::builder() | ||
| .post("/write", write_handler) | ||
| .post("/delete", delete_handler) | ||
| .get("/check", check_handler) | ||
| .build(); | ||
| let server = start_test_server(router).await; | ||
| let client = reqwest::Client::new(); | ||
|
|
||
| // Write | ||
| let url = format!("{}/write", server.base_url); | ||
| send_with_retry(&client, |c| c.post(url.as_str())).await; | ||
|
|
||
| // Verify exists | ||
| let url = format!("{}/check", server.base_url); | ||
| let resp = send_with_retry(&client, |c| c.get(url.as_str())).await; | ||
| assert_eq!(resp.text().await.unwrap(), "exists=true"); | ||
|
|
||
| // Delete | ||
| let url = format!("{}/delete", server.base_url); | ||
| send_with_retry(&client, |c| c.post(url.as_str())).await; | ||
|
|
||
| // Verify gone | ||
| let url = format!("{}/check", server.base_url); | ||
| let resp = send_with_retry(&client, |c| c.get(url.as_str())).await; | ||
| assert_eq!(resp.text().await.unwrap(), "exists=false"); | ||
|
|
||
| server.handle.abort(); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn kv_store_update_across_requests() { | ||
| async fn increment_handler(ctx: RequestContext) -> Result<String, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| let val = kv.update("counter", 0i32, |n| n + 1).await?; | ||
| Ok(val.to_string()) | ||
| } | ||
|
|
||
| let router = RouterService::builder() | ||
| .post("/inc", increment_handler) | ||
| .build(); | ||
| let server = start_test_server(router).await; | ||
| let client = reqwest::Client::new(); | ||
| let url = format!("{}/inc", server.base_url); | ||
|
|
||
| // Increment 5 times, each should return incremented value | ||
| for expected in 1..=5i32 { | ||
| let resp = send_with_retry(&client, |c| c.post(url.as_str())).await; | ||
| assert_eq!( | ||
| resp.text().await.unwrap(), | ||
| expected.to_string(), | ||
| "increment #{expected}" | ||
| ); | ||
| } | ||
|
|
||
| server.handle.abort(); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn kv_store_returns_not_found_gracefully() { | ||
| async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| let val: i32 = kv.get_or("nonexistent", -1).await?; | ||
| Ok(val.to_string()) | ||
| } | ||
|
|
||
| let router = RouterService::builder().get("/read", read_handler).build(); | ||
| let server = start_test_server(router).await; | ||
| let client = reqwest::Client::new(); | ||
|
|
||
| let url = format!("{}/read", server.base_url); | ||
| let resp = send_with_retry(&client, |c| c.get(url.as_str())).await; | ||
| assert_eq!(resp.status(), reqwest::StatusCode::OK); | ||
| assert_eq!(resp.text().await.unwrap(), "-1"); | ||
|
|
||
| server.handle.abort(); | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn kv_store_handles_typed_data() { | ||
| use serde::{Deserialize, Serialize}; | ||
|
|
||
| #[derive(Serialize, Deserialize, PartialEq, Debug)] | ||
| struct UserProfile { | ||
| name: String, | ||
| age: u32, | ||
| active: bool, | ||
| } | ||
|
|
||
| async fn write_handler(ctx: RequestContext) -> Result<&'static str, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| let profile = UserProfile { | ||
| name: "Alice".to_string(), | ||
| age: 30, | ||
| active: true, | ||
| }; | ||
| kv.put("user:alice", &profile).await?; | ||
| Ok("saved") | ||
| } | ||
|
|
||
| async fn read_handler(ctx: RequestContext) -> Result<String, EdgeError> { | ||
| let kv = ctx.kv_handle().expect("kv configured"); | ||
| let profile: Option<UserProfile> = kv.get("user:alice").await?; | ||
| match profile { | ||
| Some(p) => Ok(format!("{}:{}", p.name, p.age)), | ||
| None => Ok("not found".to_string()), | ||
| } | ||
| } | ||
|
|
||
| let router = RouterService::builder() | ||
| .post("/save", write_handler) | ||
| .get("/load", read_handler) | ||
| .build(); | ||
| let server = start_test_server(router).await; | ||
| let client = reqwest::Client::new(); | ||
|
|
||
| // Save profile | ||
| let url = format!("{}/save", server.base_url); | ||
| let resp = send_with_retry(&client, |c| c.post(url.as_str())).await; | ||
| assert_eq!(resp.text().await.unwrap(), "saved"); | ||
|
|
||
| // Load profile | ||
| let url = format!("{}/load", server.base_url); | ||
| let resp = send_with_retry(&client, |c| c.get(url.as_str())).await; | ||
| assert_eq!(resp.text().await.unwrap(), "Alice:30"); | ||
|
|
||
| server.handle.abort(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Must fix:
redbshould be behind theaxumfeature gateredb,serde_json(line 40), andweb-time(line 43) are unconditional dependencies, but thekvmodule that uses them is gated on#[cfg(feature = "axum")]inlib.rs. Anyone pulling in this crate just for theclifeature compilesredbunnecessarily.Fix: