diff --git a/README.md b/README.md index b824fae..139dbdd 100644 --- a/README.md +++ b/README.md @@ -65,483 +65,38 @@ public class OrderProcessor extends DurableHandler { } ``` -## Core Operations - -### step() – Execute with Checkpointing - -Steps execute your code and checkpoint the result. On replay, results from completed checkpoints are returned without re-execution. - -```java -// Basic step (blocks until complete) -var result = ctx.step("fetch-user", User.class, () -> userService.getUser(userId)); - -// Step with custom configuration (retries, semantics, serialization) -var result = ctx.step("call-api", Response.class, - () -> externalApi.call(request), - StepConfig.builder() - .retryStrategy(...) - .semantics(...) - .build()); -``` - -See [Step Configuration](#step-configuration) for retry strategies, delivery semantics, and per-step serialization options. - -### stepAsync() and DurableFuture – Concurrent Operations - -`stepAsync()` starts a step in the background and returns a `DurableFuture`. This enables concurrent execution patterns. - -```java -// Start multiple operations concurrently -DurableFuture userFuture = ctx.stepAsync("fetch-user", User.class, - () -> userService.getUser(userId)); -DurableFuture> ordersFuture = ctx.stepAsync("fetch-orders", - new TypeToken>() {}, () -> orderService.getOrders(userId)); - -// Both operations run concurrently -// Block and get results when needed -User user = userFuture.get(); -List orders = ordersFuture.get(); -``` - -### wait() – Suspend Without Cost - -Waits suspend the function and resume after the specified duration. You're not charged during suspension. - -```java -// Wait 30 minutes -ctx.wait(Duration.ofMinutes(30)); - -// Named wait (useful for debugging) -ctx.wait("cooling-off-period", Duration.ofDays(7)); -``` - -### createCallback() – Wait for External Events - -Callbacks suspend execution until an external system sends a result. Use this for human approvals, webhooks, or any event-driven workflow. - -```java -// Create a callback and get the ID to share with external systems -DurableCallbackFuture callback = ctx.createCallback("approval", String.class); - -// Send the callback ID to an external system within a step -ctx.step("send-notification", String.class, () -> { - notificationService.sendApprovalRequest(callback.callbackId(), requestDetails); - return "notification-sent"; -}); - -// Suspend until the external system calls back with a result -String approvalResult = callback.get(); -``` - -The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload. - -#### Callback Configuration - -Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization: - -```java -var config = CallbackConfig.builder() - .timeout(Duration.ofHours(24)) // Max time to wait for callback - .heartbeatTimeout(Duration.ofHours(1)) // Max time between heartbeats - .serDes(new CustomSerDes()) // Custom serialization/deserialization - .build(); - -var callback = ctx.createCallback("approval", String.class, config); -``` - -| Option | Description | -|--------|-------------| -| `timeout()` | Maximum duration to wait for the callback to complete | -| `heartbeatTimeout()` | Maximum duration between heartbeat signals from the external system | -| `serDes()` | Custom SerDes for deserializing callback results (e.g., encryption, custom formats) | - -#### Callback Exceptions - -| Exception | When Thrown | -|-----------|-------------| -| `CallbackTimeoutException` | Callback exceeded its timeout duration | -| `CallbackFailedException` | External system sent an error response | - -```java -try { - var result = callback.get(); -} catch (CallbackTimeoutException e) { - // Callback timed out - implement fallback logic -} catch (CallbackFailedException e) { - // External system reported an error -} -``` - -### invoke() - Invoke another Lambda function - - -```java -// Basic invoke -var result = ctx.invoke("invoke-function", - "function-name", - "\"payload\"", - Result.class, - InvokeConfig.builder() - .payloadSerDes(...) // payload serializer - .resultSerDes(...) // result deserializer - .timeout(Duration.of(...)) // wait timeout - .tenantId(...) // Lambda tenantId - .build() - ); - -``` - -### runInChildContext() – Isolated Execution Contexts - -Child contexts run an isolated stream of work with their own operation counter and checkpoint log. They support the full range of durable operations — `step`, `wait`, `invoke`, `createCallback`, and nested child contexts. - -```java -// Sync: blocks until the child context completes -var result = ctx.runInChildContext("validate-order", String.class, child -> { - var data = child.step("fetch", String.class, () -> fetchData()); - child.wait(Duration.ofMinutes(5)); - return child.step("validate", String.class, () -> validate(data)); -}); - -// Async: returns a DurableFuture for concurrent execution -var futureA = ctx.runInChildContextAsync("branch-a", String.class, child -> { - return child.step("work-a", String.class, () -> doWorkA()); -}); -var futureB = ctx.runInChildContextAsync("branch-b", String.class, child -> { - return child.step("work-b", String.class, () -> doWorkB()); -}); - -// Wait for all child contexts to complete -var results = DurableFuture.allOf(futureA, futureB); -``` - -## Step Configuration - -Configure step behavior with `StepConfig`: - -```java -ctx.step("my-step", Result.class, () -> doWork(), - StepConfig.builder() - .retryStrategy(...) // How to handle failures - .semantics(...) // At-least-once vs at-most-once - .serDes(...) // Custom serialization - .build()); -``` - -### Retry Strategies - -Configure how steps handle transient failures: - -```java -// No retry - fail immediately (default) -var noRetries = StepConfig.builder().retryStrategy(RetryStrategies.Presets.NO_RETRY).build() - -// Exponential backoff with jitter -var customRetries = StepConfig.builder() - .retryStrategy(RetryStrategies.exponentialBackoff( - 5, // max attempts - Duration.ofSeconds(2), // initial delay - Duration.ofSeconds(30), // max delay - 2.0, // backoff multiplier - JitterStrategy.FULL)) // randomize delays - .build() -``` - -### Step-Retry Semantics - -Control how steps behave when interrupted mid-execution: - -| Semantic | Behavior | Use Case | -|----------|----------|----------| -| `AT_LEAST_ONCE_PER_RETRY` (default) | Re-executes step if interrupted before completion | Idempotent operations (database upserts, API calls with idempotency keys) | -| `AT_MOST_ONCE_PER_RETRY` | Never re-executes; throws `StepInterruptedException` if interrupted | Non-idempotent operations (sending emails, charging payments) | - -```java -// Default: at-least-once per retry (step may re-run if interrupted) -var result = ctx.step("idempotent-update", Result.class, - () -> database.upsert(record)); - -// At-most-once per retry -var result = ctx.step("send-email", Result.class, - () -> emailService.send(notification), - StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) - .build()); -``` - -**Important**: These semantics apply *per retry attempt*, not per overall execution: - -- **AT_LEAST_ONCE_PER_RETRY**: The step executes at least once per retry. If the step succeeds but checkpointing fails (e.g., sandbox crash), the step re-executes on replay. -- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is skipped on replay and `StepInterruptedException` is thrown. - -To achieve step-level at-most-once semantics, combine with a no-retry strategy: - -```java -// True at-most-once: step executes at most once, ever -var result = ctx.step("charge-payment", Result.class, - () -> paymentService.charge(amount), - StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) - .retryStrategy(RetryStrategies.Presets.NO_RETRY) - .build()); -``` - -Without this, a step using `AT_MOST_ONCE_PER_RETRY` with retries enabled could still execute multiple times across different retry attempts. - -### Generic Types - -When a step returns a parameterized type like `List`, use `TypeToken` to preserve the type information: - -```java -var users = ctx.step("fetch-users", new TypeToken>() {}, - () -> userService.getAllUsers()); - -var orderMap = ctx.step("fetch-orders", new TypeToken>() {}, - () -> orderService.getOrdersByCustomer()); -``` - -This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](docs/internal-design.md#typetoken-and-type-erasure) for technical details. - -## Configuration - -Customize SDK behavior by overriding `createConfiguration()` in your handler: - -```java -public class OrderProcessor extends DurableHandler { - - @Override - protected DurableConfig createConfiguration() { - // Custom Lambda client with connection pooling - var lambdaClient = LambdaClient.builder() - .httpClient(ApacheHttpClient.builder() - .maxConnections(50) - .connectionTimeout(Duration.ofSeconds(30)) - .build()) - .build(); - - return DurableConfig.builder() - .withLambdaClient(lambdaClient) - .withSerDes(new MyCustomSerDes()) // Custom serialization - .withExecutorService(Executors.newFixedThreadPool(10)) // Custom thread pool - .withLoggerConfig(LoggerConfig.withReplayLogging()) // Enable replay logs - .build(); - } - - @Override - protected OrderResult handleRequest(Order order, DurableContext ctx) { - // Your handler logic - } -} -``` - -| Option | Description | Default | -|--------|-------------|---------| -| `withLambdaClient()` | Custom AWS Lambda client | Auto-configured Lambda client | -| `withSerDes()` | Serializer for step results | Jackson with default settings | -| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool | -| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay | - -The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool. - -## Logging - -The SDK provides a `DurableLogger` via `ctx.getLogger()` that automatically includes execution metadata in log entries and suppresses duplicate logs during replay. - -### Basic Usage - -```java -@Override -protected OrderResult handleRequest(Order order, DurableContext ctx) { - ctx.getLogger().info("Processing order: {}", order.getId()); - - var result = ctx.step("validate", String.class, stepCtx -> { - stepCtx.getLogger().debug("Validating order details"); - return validate(order); - }); - - ctx.getLogger().info("Order processed successfully"); - return new OrderResult(result); -} -``` - -### Log Output - -Logs include execution context via MDC (works with any SLF4J-compatible logging framework): - -```json -{ - "timestamp": "2024-01-15T10:30:00.000Z", - "level": "INFO", - "message": "Processing order: ORD-123", - "durableExecutionArn": "arn:aws:lambda:us-east-1:123456789:function:order-processor:exec-abc123", - "requestId": "a1b2c3d4-5678-90ab-cdef-example12345", - "operationId": "1", - "operationName": "validate" -} -``` - -### Replay Behavior - -By default, logs are suppressed during replay to avoid duplicates: - -``` -First Invocation: - [INFO] Processing order: ORD-123 ✓ Logged - [DEBUG] Validating order details ✓ Logged - -Replay (after wait): - [INFO] Processing order: ORD-123 ✗ Suppressed (already logged) - [DEBUG] Validating order details ✗ Suppressed - [INFO] Continuing after wait ✓ Logged (new code path) -``` - -To log during replay (e.g., for debugging): - -```java -@Override -protected DurableConfig createConfiguration() { - return DurableConfig.builder() - .withLoggerConfig(LoggerConfig.withReplayLogging()) - .build(); -} -``` - -## Error Handling - -The SDK throws specific exceptions to help you handle different failure scenarios: - -``` -DurableExecutionException - General durable exception -├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names. -├── SerDesException - Serialization and deserialization exception. -└── DurableOperationException - General operation exception - ├── StepException - General Step exception - │ ├── StepFailedException - Step exhausted all retry attempts.Catch to implement fallback logic or let execution fail. - │ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally) - ├── InvokeException - General chained invocation exception - │ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure. - │ ├── InvokeTimedoutException - Chained invocation timed out. Handle the error or propagate failure. - │ └── InvokeStoppedException - Chained invocation stopped. Handle the error or propagate failure. - ├── CallbackException - General callback exception - │ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure - │ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure - └── ChildContextFailedException - Child context failed and the original exception could not be reconstructed -``` - -```java -try { - var result = ctx.step("charge-payment", Payment.class, - () -> paymentService.charge(amount), - StepConfig.builder() - .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) - .build()); -} catch (StepInterruptedException e) { - // Step started but we don't know if it completed - // Check payment status externally before retrying - var status = paymentService.checkStatus(transactionId); - if (status.isPending()) { - throw e; // Let it fail - manual intervention needed - } -} -``` - -## Testing - -The SDK includes testing utilities for both local development and cloud-based integration testing. - -### Installation - -```xml - - software.amazon.lambda.durable - aws-durable-execution-sdk-java-testing - VERSION - test - -``` - -### Local Testing - -```java -@Test -void testOrderProcessing() { - var handler = new OrderProcessor(); - var runner = LocalDurableTestRunner.create(Order.class, handler); - - var result = runner.runUntilComplete(new Order("order-123", items)); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - assertNotNull(result.getResult(OrderResult.class).getTrackingNumber()); -} -``` - -You can also pass a lambda directly instead of a handler instance: - -```java -var runner = LocalDurableTestRunner.create(Order.class, (order, ctx) -> { - var result = ctx.step("process", String.class, () -> "done"); - return new OrderResult(order.getId(), result); -}); -``` - -### Inspecting Operations - -```java -var result = runner.runUntilComplete(input); - -// Verify specific step completed -var paymentOp = result.getOperation("process-payment"); -assertNotNull(paymentOp); -assertEquals(OperationStatus.SUCCEEDED, paymentOp.getStatus()); - -// Get step result -var paymentResult = paymentOp.getStepResult(Payment.class); -assertNotNull(paymentResult.getTransactionId()); - -// Inspect all operations -List succeeded = result.getSucceededOperations(); -List failed = result.getFailedOperations(); -``` - -### Controlling Time in Tests +## Deployment -By default, `runUntilComplete()` skips wait durations. For testing time-dependent logic, disable this: +See [examples/README.md](./examples/README.md) for complete instructions on local testing and running cloud integration tests. -```java -var runner = LocalDurableTestRunner.create(Order.class, handler) - .withSkipTime(false); // Don't auto-advance time +See [Deploy and invoke Lambda durable functions with the AWS CLI](https://docs.aws.amazon.com/lambda/latest/dg/durable-getting-started-cli.html) for more information on deploying and invoking durable functions. -var result = runner.run(input); -assertEquals(ExecutionStatus.PENDING, result.getStatus()); // Blocked on wait - -runner.advanceTime(); // Manually advance past the wait -result = runner.run(input); -assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); -``` +See [Deploy Lambda durable functions with Infrastructure as Code](https://docs.aws.amazon.com/lambda/latest/dg/durable-getting-started-iac.html) for more information on deploying durable functions using infrastructure-as-code. -### Cloud Testing +## Documentation -Test against deployed Lambda functions: +- [AWS Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) – Official AWS Lambda durable functions guide +- [Best Practices](https://docs.aws.amazon.com/lambda/latest/dg/durable-best-practices.html) – Patterns and recommendations +- [SDK Design](docs/design.md) – Details of SDK internal architecture -```java -var runner = CloudDurableTestRunner.create( - "arn:aws:lambda:us-east-1:123456789012:function:order-processor:$LATEST", - Order.class, - OrderResult.class); +**Core Operations** -var result = runner.run(new Order("order-123", items)); -assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); -``` +- [Steps](docs/core/steps.md) – Execute code with automatic checkpointing and retry support +- [Wait](docs/core/wait.md) - Pause execution without blocking Lambda resources +- [Callbacks](docs/core/callbacks.md) - Wait for external systems to respond +- [Invoke](docs/core/invoke.md) - Call other durable functions +- [Child Contexts](docs/core/child-contexts.md) - Organize complex workflows into isolated units -## Deployment +**Examples** -See [examples/README.md](./examples/README.md) for complete instructions on local testing, deployment, invoking functions, and running cloud integration tests. +- [Examples](examples/README.md) - Working examples of each operation -## Documentation +**Advanced Topics** -- [AWS Lambda Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) – Official AWS documentation -- [Durable Execution SDK Guide](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html) – SDK usage guide -- [Best Practices](https://docs.aws.amazon.com/lambda/latest/dg/durable-best-practices.html) – Patterns and recommendations +- [Configuration](docs/advanced/configuration.md) - Customize SDK behaviour +- [Error Handling](docs/advanced/error-handling.md) - SDK exceptions for handling failures +- [Logging](docs/advanced/logging.md) - How to use DurableLogger +- [Testing](docs/advanced/testing.md) - Utilities for local development and cloud-based integration testing ## Related SDKs diff --git a/docs/advanced/configuration.md b/docs/advanced/configuration.md new file mode 100644 index 0000000..57f628c --- /dev/null +++ b/docs/advanced/configuration.md @@ -0,0 +1,40 @@ +## Configuration + +Customize SDK behavior by overriding `createConfiguration()` in your handler: + +```java +public class OrderProcessor extends DurableHandler { + + @Override + protected DurableConfig createConfiguration() { + // Custom Lambda client with connection pooling + var lambdaClient = LambdaClient.builder() + .httpClient(ApacheHttpClient.builder() + .maxConnections(50) + .connectionTimeout(Duration.ofSeconds(30)) + .build()) + .build(); + + return DurableConfig.builder() + .withLambdaClient(lambdaClient) + .withSerDes(new MyCustomSerDes()) // Custom serialization + .withExecutorService(Executors.newFixedThreadPool(10)) // Custom thread pool + .withLoggerConfig(LoggerConfig.withReplayLogging()) // Enable replay logs + .build(); + } + + @Override + protected OrderResult handleRequest(Order order, DurableContext ctx) { + // Your handler logic + } +} +``` + +| Option | Description | Default | +|--------|-------------|---------| +| `withLambdaClient()` | Custom AWS Lambda client | Auto-configured Lambda client | +| `withSerDes()` | Serializer for step results | Jackson with default settings | +| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool | +| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay | + +The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool. \ No newline at end of file diff --git a/docs/advanced/error-handling.md b/docs/advanced/error-handling.md new file mode 100644 index 0000000..3f0d324 --- /dev/null +++ b/docs/advanced/error-handling.md @@ -0,0 +1,38 @@ +## Error Handling + +The SDK throws specific exceptions to help you handle different failure scenarios: + +``` +DurableExecutionException - General durable exception +├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names. +├── SerDesException - Serialization and deserialization exception. +└── DurableOperationException - General operation exception + ├── StepException - General Step exception + │ ├── StepFailedException - Step exhausted all retry attempts.Catch to implement fallback logic or let execution fail. + │ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally) + ├── InvokeException - General chained invocation exception + │ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure. + │ ├── InvokeTimedoutException - Chained invocation timed out. Handle the error or propagate failure. + │ └── InvokeStoppedException - Chained invocation stopped. Handle the error or propagate failure. + ├── CallbackException - General callback exception + │ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure + │ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure + └── ChildContextFailedException - Child context failed and the original exception could not be reconstructed +``` + +```java +try { + var result = ctx.step("charge-payment", Payment.class, + () -> paymentService.charge(amount), + StepConfig.builder() + .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .build()); +} catch (StepInterruptedException e) { + // Step started but we don't know if it completed + // Check payment status externally before retrying + var status = paymentService.checkStatus(transactionId); + if (status.isPending()) { + throw e; // Let it fail - manual intervention needed + } +} +``` \ No newline at end of file diff --git a/docs/advanced/logging.md b/docs/advanced/logging.md new file mode 100644 index 0000000..fe47622 --- /dev/null +++ b/docs/advanced/logging.md @@ -0,0 +1,62 @@ +## Logging + +The SDK provides a `DurableLogger` via `ctx.getLogger()` that automatically includes execution metadata in log entries and suppresses duplicate logs during replay. + +### Basic Usage + +```java +@Override +protected OrderResult handleRequest(Order order, DurableContext ctx) { + ctx.getLogger().info("Processing order: {}", order.getId()); + + var result = ctx.step("validate", String.class, stepCtx -> { + stepCtx.getLogger().debug("Validating order details"); + return validate(order); + }); + + ctx.getLogger().info("Order processed successfully"); + return new OrderResult(result); +} +``` + +### Log Output + +Logs include execution context via MDC (works with any SLF4J-compatible logging framework): + +```json +{ + "timestamp": "2024-01-15T10:30:00.000Z", + "level": "INFO", + "message": "Processing order: ORD-123", + "durableExecutionArn": "arn:aws:lambda:us-east-1:123456789:function:order-processor:exec-abc123", + "requestId": "a1b2c3d4-5678-90ab-cdef-example12345", + "operationId": "1", + "operationName": "validate" +} +``` + +### Replay Behavior + +By default, logs are suppressed during replay to avoid duplicates: + +``` +First Invocation: + [INFO] Processing order: ORD-123 ✓ Logged + [DEBUG] Validating order details ✓ Logged + +Replay (after wait): + [INFO] Processing order: ORD-123 ✗ Suppressed (already logged) + [DEBUG] Validating order details ✗ Suppressed + [INFO] Continuing after wait ✓ Logged (new code path) +``` + +To log during replay (e.g., for debugging): + +```java +@Override +protected DurableConfig createConfiguration() { + return DurableConfig.builder() + .withLoggerConfig(LoggerConfig.withReplayLogging()) + .build(); +} +``` \ No newline at end of file diff --git a/docs/advanced/testing.md b/docs/advanced/testing.md new file mode 100644 index 0000000..4ee1f58 --- /dev/null +++ b/docs/advanced/testing.md @@ -0,0 +1,87 @@ +## Testing + +The SDK includes testing utilities for both local development and cloud-based integration testing. + +### Installation + +```xml + + software.amazon.lambda.durable + aws-durable-execution-sdk-java-testing + VERSION + test + +``` + +### Local Testing + +```java +@Test +void testOrderProcessing() { + var handler = new OrderProcessor(); + var runner = LocalDurableTestRunner.create(Order.class, handler); + + var result = runner.runUntilComplete(new Order("order-123", items)); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertNotNull(result.getResult(OrderResult.class).getTrackingNumber()); +} +``` + +You can also pass a lambda directly instead of a handler instance: + +```java +var runner = LocalDurableTestRunner.create(Order.class, (order, ctx) -> { + var result = ctx.step("process", String.class, () -> "done"); + return new OrderResult(order.getId(), result); +}); +``` + +### Inspecting Operations + +```java +var result = runner.runUntilComplete(input); + +// Verify specific step completed +var paymentOp = result.getOperation("process-payment"); +assertNotNull(paymentOp); +assertEquals(OperationStatus.SUCCEEDED, paymentOp.getStatus()); + +// Get step result +var paymentResult = paymentOp.getStepResult(Payment.class); +assertNotNull(paymentResult.getTransactionId()); + +// Inspect all operations +List succeeded = result.getSucceededOperations(); +List failed = result.getFailedOperations(); +``` + +### Controlling Time in Tests + +By default, `runUntilComplete()` skips wait durations. For testing time-dependent logic, disable this: + +```java +var runner = LocalDurableTestRunner.create(Order.class, handler) + .withSkipTime(false); // Don't auto-advance time + +var result = runner.run(input); +assertEquals(ExecutionStatus.PENDING, result.getStatus()); // Blocked on wait + +runner.advanceTime(); // Manually advance past the wait +result = runner.run(input); +assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); +``` + +### Cloud Testing + +Test against deployed Lambda functions: + +```java +var runner = CloudDurableTestRunner.create( + "arn:aws:lambda:us-east-1:123456789012:function:order-processor:$LATEST", + Order.class, + OrderResult.class); + +var result = runner.run(new Order("order-123", items)); +assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); +``` \ No newline at end of file diff --git a/docs/core/callbacks.md b/docs/core/callbacks.md new file mode 100644 index 0000000..c38a22a --- /dev/null +++ b/docs/core/callbacks.md @@ -0,0 +1,56 @@ +## createCallback() – Wait for External Events + +Callbacks suspend execution until an external system sends a result. Use this for human approvals, webhooks, or any event-driven workflow. + +```java +// Create a callback and get the ID to share with external systems +DurableCallbackFuture callback = ctx.createCallback("approval", String.class); + +// Send the callback ID to an external system within a step +ctx.step("send-notification", String.class, () -> { + notificationService.sendApprovalRequest(callback.callbackId(), requestDetails); + return "notification-sent"; +}); + +// Suspend until the external system calls back with a result +String approvalResult = callback.get(); +``` + +The external system completes the callback by calling the Lambda Durable Functions API with the callback ID and result payload. + +#### Callback Configuration + +Configure timeouts and serialization to handle cases where callbacks are never completed or need custom deserialization: + +```java +var config = CallbackConfig.builder() + .timeout(Duration.ofHours(24)) // Max time to wait for callback + .heartbeatTimeout(Duration.ofHours(1)) // Max time between heartbeats + .serDes(new CustomSerDes()) // Custom serialization/deserialization + .build(); + +var callback = ctx.createCallback("approval", String.class, config); +``` + +| Option | Description | +|--------|-------------| +| `timeout()` | Maximum duration to wait for the callback to complete | +| `heartbeatTimeout()` | Maximum duration between heartbeat signals from the external system | +| `serDes()` | Custom SerDes for deserializing callback results (e.g., encryption, custom formats) | + +#### Callback Exceptions + +| Exception | When Thrown | +|-----------|-------------| +| `CallbackTimeoutException` | Callback exceeded its timeout duration | +| `CallbackFailedException` | External system sent an error response | + +```java +try { + var result = callback.get(); +} catch (CallbackTimeoutException e) { + // Callback timed out - implement fallback logic +} catch (CallbackFailedException e) { + // External system reported an error +} +``` \ No newline at end of file diff --git a/docs/core/child-contexts.md b/docs/core/child-contexts.md new file mode 100644 index 0000000..6a58540 --- /dev/null +++ b/docs/core/child-contexts.md @@ -0,0 +1,23 @@ +## runInChildContext() – Isolated Execution Contexts + +Child contexts run an isolated stream of work with their own operation counter and checkpoint log. They support the full range of durable operations — `step`, `wait`, `invoke`, `createCallback`, and nested child contexts. + +```java +// Sync: blocks until the child context completes +var result = ctx.runInChildContext("validate-order", String.class, child -> { + var data = child.step("fetch", String.class, () -> fetchData()); + child.wait(Duration.ofMinutes(5)); + return child.step("validate", String.class, () -> validate(data)); +}); + +// Async: returns a DurableFuture for concurrent execution +var futureA = ctx.runInChildContextAsync("branch-a", String.class, child -> { + return child.step("work-a", String.class, () -> doWorkA()); +}); +var futureB = ctx.runInChildContextAsync("branch-b", String.class, child -> { + return child.step("work-b", String.class, () -> doWorkB()); +}); + +// Wait for all child contexts to complete +var results = DurableFuture.allOf(futureA, futureB); +``` \ No newline at end of file diff --git a/docs/core/invoke.md b/docs/core/invoke.md new file mode 100644 index 0000000..73dfe3d --- /dev/null +++ b/docs/core/invoke.md @@ -0,0 +1,18 @@ +## invoke() - Invoke another Lambda function + + +```java +// Basic invoke +var result = ctx.invoke("invoke-function", + "function-name", + "\"payload\"", + Result.class, + InvokeConfig.builder() + .payloadSerDes(...) // payload serializer + .resultSerDes(...) // result deserializer + .timeout(Duration.of(...)) // wait timeout + .tenantId(...) // Lambda tenantId + .build() + ); + +``` \ No newline at end of file diff --git a/docs/core/steps.md b/docs/core/steps.md new file mode 100644 index 0000000..9c8af1b --- /dev/null +++ b/docs/core/steps.md @@ -0,0 +1,122 @@ +## step() – Execute with Checkpointing + +Steps execute your code and checkpoint the result. On replay, results from completed checkpoints are returned without re-execution. + +```java +// Basic step (blocks until complete) +var result = ctx.step("fetch-user", User.class, () -> userService.getUser(userId)); + +// Step with custom configuration (retries, semantics, serialization) +var result = ctx.step("call-api", Response.class, + () -> externalApi.call(request), + StepConfig.builder() + .retryStrategy(...) + .semantics(...) + .build()); +``` + +See [Step Configuration](#step-configuration) for retry strategies, delivery semantics, and per-step serialization options. + +### stepAsync() and DurableFuture – Concurrent Operations + +`stepAsync()` starts a step in the background and returns a `DurableFuture`. This enables concurrent execution patterns. + +```java +// Start multiple operations concurrently +DurableFuture userFuture = ctx.stepAsync("fetch-user", User.class, + () -> userService.getUser(userId)); +DurableFuture> ordersFuture = ctx.stepAsync("fetch-orders", + new TypeToken>() {}, () -> orderService.getOrders(userId)); + +// Both operations run concurrently +// Block and get results when needed +User user = userFuture.get(); +List orders = ordersFuture.get(); +``` + +## Step Configuration + +Configure step behavior with `StepConfig`: + +```java +ctx.step("my-step", Result.class, () -> doWork(), + StepConfig.builder() + .retryStrategy(...) // How to handle failures + .semantics(...) // At-least-once vs at-most-once + .serDes(...) // Custom serialization + .build()); +``` + +### Retry Strategies + +Configure how steps handle transient failures: + +```java +// No retry - fail immediately (default) +var noRetries = StepConfig.builder().retryStrategy(RetryStrategies.Presets.NO_RETRY).build() + +// Exponential backoff with jitter +var customRetries = StepConfig.builder() + .retryStrategy(RetryStrategies.exponentialBackoff( + 5, // max attempts + Duration.ofSeconds(2), // initial delay + Duration.ofSeconds(30), // max delay + 2.0, // backoff multiplier + JitterStrategy.FULL)) // randomize delays + .build() +``` + +### Step-Retry Semantics + +Control how steps behave when interrupted mid-execution: + +| Semantic | Behavior | Use Case | +|----------|----------|----------| +| `AT_LEAST_ONCE_PER_RETRY` (default) | Re-executes step if interrupted before completion | Idempotent operations (database upserts, API calls with idempotency keys) | +| `AT_MOST_ONCE_PER_RETRY` | Never re-executes; throws `StepInterruptedException` if interrupted | Non-idempotent operations (sending emails, charging payments) | + +```java +// Default: at-least-once per retry (step may re-run if interrupted) +var result = ctx.step("idempotent-update", Result.class, + () -> database.upsert(record)); + +// At-most-once per retry +var result = ctx.step("send-email", Result.class, + () -> emailService.send(notification), + StepConfig.builder() + .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .build()); +``` + +**Important**: These semantics apply *per retry attempt*, not per overall execution: + +- **AT_LEAST_ONCE_PER_RETRY**: The step executes at least once per retry. If the step succeeds but checkpointing fails (e.g., sandbox crash), the step re-executes on replay. +- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is skipped on replay and `StepInterruptedException` is thrown. + +To achieve step-level at-most-once semantics, combine with a no-retry strategy: + +```java +// True at-most-once: step executes at most once, ever +var result = ctx.step("charge-payment", Result.class, + () -> paymentService.charge(amount), + StepConfig.builder() + .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY) + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()); +``` + +Without this, a step using `AT_MOST_ONCE_PER_RETRY` with retries enabled could still execute multiple times across different retry attempts. + +### Generic Types + +When a step returns a parameterized type like `List`, use `TypeToken` to preserve the type information: + +```java +var users = ctx.step("fetch-users", new TypeToken>() {}, + () -> userService.getAllUsers()); + +var orderMap = ctx.step("fetch-orders", new TypeToken>() {}, + () -> orderService.getOrdersByCustomer()); +``` + +This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](docs/internal-design.md#typetoken-and-type-erasure) for technical details. \ No newline at end of file diff --git a/docs/core/wait.md b/docs/core/wait.md new file mode 100644 index 0000000..81f31d6 --- /dev/null +++ b/docs/core/wait.md @@ -0,0 +1,11 @@ +## wait() – Suspend Without Cost + +Waits suspend the function and resume after the specified duration. You're not charged during suspension. + +```java +// Wait 30 minutes +ctx.wait(Duration.ofMinutes(30)); + +// Named wait (useful for debugging) +ctx.wait("cooling-off-period", Duration.ofDays(7)); +``` \ No newline at end of file