From 23a1ea93bcd7edc76a716d3f86748d0d243d715e Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 27 Feb 2026 23:01:14 -0800 Subject: [PATCH 1/2] lazy initialize loggers --- .../amazon/lambda/durable/BaseContext.java | 14 ++- .../amazon/lambda/durable/DurableContext.java | 47 +++++--- .../lambda/durable/DurableExecutor.java | 81 +++++++------- .../amazon/lambda/durable/StepContext.java | 36 +++++-- .../durable/execution/ExecutionManager.java | 21 ++-- .../lambda/durable/logging/DurableLogger.java | 65 +++++------ .../operation/ChildContextOperation.java | 56 +++++----- .../durable/operation/StepOperation.java | 101 ++++++++++-------- .../lambda/durable/DurableContextTest.java | 45 +++++--- .../lambda/durable/ReplayValidationTest.java | 37 ++++--- .../amazon/lambda/durable/TestContext.java | 8 +- .../execution/ExecutionManagerTest.java | 11 +- .../durable/logging/DurableLoggerTest.java | 36 ++++--- .../operation/CallbackOperationTest.java | 6 +- 14 files changed, 327 insertions(+), 237 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java b/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java index e38e96d..cd04bcb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/BaseContext.java @@ -6,21 +6,27 @@ import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.logging.DurableLogger; -public abstract class BaseContext { +public abstract class BaseContext implements AutoCloseable { protected final ExecutionManager executionManager; private final DurableConfig durableConfig; private final Context lambdaContext; private final ExecutionContext executionContext; private final String contextId; + private final String contextName; private boolean isReplaying; /** Creates a new BaseContext instance. 
*/ protected BaseContext( - ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) { + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName) { this.executionManager = executionManager; this.durableConfig = durableConfig; this.lambdaContext = lambdaContext; this.contextId = contextId; + this.contextName = contextName; this.executionContext = new ExecutionContext(executionManager.getDurableExecutionArn()); this.isReplaying = executionManager.hasOperationsForContext(contextId); } @@ -71,6 +77,10 @@ public String getContextId() { return contextId; } + public String getContextName() { + return contextName; + } + public ExecutionManager getExecutionManager() { return executionManager; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 3743633..42a40b7 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -20,20 +20,17 @@ public class DurableContext extends BaseContext { private final AtomicInteger operationCounter; - private final DurableLogger logger; + private volatile DurableLogger logger; /** Shared initialization — sets all fields. */ private DurableContext( - ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, String contextId) { - super(executionManager, durableConfig, lambdaContext, contextId); + ExecutionManager executionManager, + DurableConfig durableConfig, + Context lambdaContext, + String contextId, + String contextName) { + super(executionManager, durableConfig, lambdaContext, contextId, contextName); this.operationCounter = new AtomicInteger(0); - - var requestId = lambdaContext != null ? 
lambdaContext.getAwsRequestId() : null; - this.logger = new DurableLogger( - LoggerFactory.getLogger(DurableContext.class), - executionManager, - requestId, - durableConfig.getLoggerConfig().suppressReplayLogs()); } /** @@ -48,7 +45,7 @@ private DurableContext( */ public static DurableContext createRootContext( ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext) { - return new DurableContext(executionManager, durableConfig, lambdaContext, null); + return new DurableContext(executionManager, durableConfig, lambdaContext, null, null); } /** @@ -57,8 +54,9 @@ public static DurableContext createRootContext( * @param childContextId the child context's ID (the CONTEXT operation's operation ID) * @return a new DurableContext for the child context */ - public DurableContext createChildContext(String childContextId) { - return new DurableContext(executionManager, getDurableConfig(), getLambdaContext(), childContextId); + public DurableContext createChildContext(String childContextId, String childContextName) { + return new DurableContext( + executionManager, getDurableConfig(), getLambdaContext(), childContextId, childContextName); } /** @@ -67,8 +65,9 @@ public DurableContext createChildContext(String childContextId) { * @param stepOperationId the ID of the step operation (used for thread registration) * @return a new StepContext instance */ - public StepContext createStepContext(String stepOperationId) { - return new StepContext(executionManager, getDurableConfig(), getLambdaContext(), stepOperationId); + public StepContext createStepContext(String stepOperationId, String stepOperationName, int attempt) { + return new StepContext( + executionManager, getDurableConfig(), getLambdaContext(), stepOperationId, stepOperationName, attempt); } // ========== step methods ========== @@ -305,9 +304,27 @@ public DurableFuture runInChildContextAsync( * @return the durable logger */ public DurableLogger getLogger() { + // lazy initialize logger + if 
(logger == null) { + synchronized (this) { + if (logger == null) { + logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this); + } + } + } return logger; } + /** + * Clears the logger's thread properties. Called during context destruction to prevent memory leaks and ensure clean + * state for subsequent executions. + */ + public void close() { + if (logger != null) { + logger.clearThreadProperties(); + } + } + /** * Get the next operationId. For root contexts, returns sequential IDs like "1", "2", "3". For child contexts, * prefixes with the contextId to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child context diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java index fe01d6d..d52ab07 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java @@ -38,52 +38,45 @@ public static DurableExecutionOutput execute( Class inputType, BiFunction handler, DurableConfig config) { - var executionManager = new ExecutionManager( - input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config); - - var handlerFuture = CompletableFuture.supplyAsync( - () -> { - var userInput = - extractUserInput(executionManager.getExecutionOperation(), config.getSerDes(), inputType); - // Create context in the executor thread so it detects the correct thread name - var context = DurableContext.createRootContext(executionManager, config, lambdaContext); - executionManager.registerActiveThread(null); - executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); - return handler.apply(userInput, context); - }, - config.getExecutorService()); // Get executor from config for running user code - - // Execute the handlerFuture in ExecutionManager. 
If it completes successfully, the output of user function - // will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a failure. - return executionManager - .runUntilCompleteOrSuspend(handlerFuture) - .handle((result, ex) -> { - if (ex != null) { - // an exception thrown from handlerFuture or suspension/termination occurred - Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex); - if (cause instanceof SuspendExecutionException) { - return DurableExecutionOutput.pending(); + try (var executionManager = new ExecutionManager(input, config)) { + executionManager.registerActiveThread(null); + var handlerFuture = CompletableFuture.supplyAsync( + () -> { + var userInput = extractUserInput( + executionManager.getExecutionOperation(), config.getSerDes(), inputType); + // use try-with-resources to clear logger properties + try (var context = DurableContext.createRootContext(executionManager, config, lambdaContext)) { + // Create context in the executor thread so it detects the correct thread name + executionManager.setCurrentThreadContext(new ThreadContext(null, ThreadType.CONTEXT)); + return handler.apply(userInput, context); } + }, + config.getExecutorService()); // Get executor from config for running user code + + // Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function + // will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a + // failure. 
+ return executionManager + .runUntilCompleteOrSuspend(handlerFuture) + .handle((result, ex) -> { + if (ex != null) { + // an exception thrown from handlerFuture or suspension/termination occurred + Throwable cause = ExceptionHelper.unwrapCompletableFuture(ex); + if (cause instanceof SuspendExecutionException) { + return DurableExecutionOutput.pending(); + } + + logger.debug("Execution failed: {}", cause.getMessage()); + return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes())); + } + // user handler complete successfully + var outputPayload = config.getSerDes().serialize(result); - logger.debug("Execution failed: {}", cause.getMessage()); - return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes())); - } - // user handler complete successfully - var outputPayload = config.getSerDes().serialize(result); - - logger.debug("Execution completed"); - return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload)); - }) - .whenComplete((v, ex) -> { - // We shutdown the execution to make sure remaining checkpoint calls in the queue are drained - // We DO NOT shutdown the executor since it should stay warm for re-invokes against a warm Lambda - // runtime. - // For example, a re-invoke after a wait should re-use the same executor instance from - // DurableConfig. 
- // userExecutor.shutdown(); - executionManager.shutdown(); - }) - .join(); + logger.debug("Execution completed"); + return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload)); + }) + .join(); + } } private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java index e631bf6..27d8d46 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java @@ -8,7 +8,8 @@ import software.amazon.lambda.durable.logging.DurableLogger; public class StepContext extends BaseContext { - private final DurableLogger logger; + private volatile DurableLogger logger; + private final int attempt; /** * Creates a new StepContext instance for use in step operations. @@ -22,19 +23,36 @@ protected StepContext( ExecutionManager executionManager, DurableConfig durableConfig, Context lambdaContext, - String stepOperationId) { - super(executionManager, durableConfig, lambdaContext, stepOperationId); + String stepOperationId, + String stepOperationName, + int attempt) { + super(executionManager, durableConfig, lambdaContext, stepOperationId, stepOperationName); + this.attempt = attempt; + } - var requestId = lambdaContext != null ? 
lambdaContext.getAwsRequestId() : null; - this.logger = new DurableLogger( - LoggerFactory.getLogger(StepContext.class), - executionManager, - requestId, - durableConfig.getLoggerConfig().suppressReplayLogs()); + /** @return the current attempt */ + public int getAttempt() { + return attempt; } @Override public DurableLogger getLogger() { + // lazy initialize logger + if (logger == null) { + synchronized (this) { + if (logger == null) { + logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this); + } + } + } return logger; } + + /** Closes the logger for this context. */ + @Override + public void close() { + if (logger != null) { + logger.clearThreadProperties(); + } + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java index ccc77e9..3a7bfed 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; +import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.operation.BaseDurableOperation; /** @@ -45,7 +46,7 @@ * * @see InternalExecutor */ -public class ExecutionManager { +public class ExecutionManager implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class); @@ -65,25 +66,21 @@ public class ExecutionManager { // ===== Checkpoint Batching ===== private final CheckpointBatcher checkpointBatcher; - public ExecutionManager( - String durableExecutionArn, - String checkpointToken, - CheckpointUpdatedExecutionState initialExecutionState, - DurableConfig config) { - 
this.durableExecutionArn = durableExecutionArn; + public ExecutionManager(DurableExecutionInput input, DurableConfig config) { + this.durableExecutionArn = input.durableExecutionArn(); // Create checkpoint batcher for internal coordination this.checkpointBatcher = - new CheckpointBatcher(config, durableExecutionArn, checkpointToken, this::onCheckpointComplete); + new CheckpointBatcher(config, durableExecutionArn, input.checkpointToken(), this::onCheckpointComplete); - this.operationStorage = checkpointBatcher.fetchAllPages(initialExecutionState).stream() + this.operationStorage = checkpointBatcher.fetchAllPages(input.initialExecutionState()).stream() .collect(Collectors.toConcurrentMap(Operation::id, op -> op)); // Start in REPLAY mode if we have more than just the initial EXECUTION operation this.executionMode = new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION); - executionOp = findExecutionOp(initialExecutionState); + executionOp = findExecutionOp(input.initialExecutionState()); // Validate initial operation is an EXECUTION operation if (executionOp == null) { @@ -248,7 +245,9 @@ public CompletableFuture pollForOperationUpdates(String operationId, } // ===== Utilities ===== - public void shutdown() { + /** Shutdown the checkpoint batcher. 
*/ + @Override + public void close() { checkpointBatcher.shutdown(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java index 914cc04..95072d9 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java @@ -4,7 +4,9 @@ import org.slf4j.Logger; import org.slf4j.MDC; -import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.BaseContext; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.StepContext; /** * Logger wrapper that adds durable execution context to log entries via MDC and optionally suppresses logs during @@ -14,51 +16,51 @@ public class DurableLogger { static final String MDC_EXECUTION_ARN = "durableExecutionArn"; static final String MDC_REQUEST_ID = "requestId"; static final String MDC_OPERATION_ID = "operationId"; + static final String MDC_CONTEXT_ID = "contextId"; static final String MDC_OPERATION_NAME = "operationName"; + static final String MDC_CONTEXT_NAME = "contextName"; static final String MDC_ATTEMPT = "attempt"; private final Logger delegate; - private final ExecutionManager executionManager; - private final String requestId; - private final boolean suppressReplayLogs; + private final BaseContext context; - public DurableLogger( - Logger delegate, ExecutionManager executionManager, String requestId, boolean suppressReplayLogs) { + public DurableLogger(Logger delegate, BaseContext context) { this.delegate = delegate; - this.executionManager = executionManager; - this.requestId = requestId; - this.suppressReplayLogs = suppressReplayLogs; + this.context = context; - // Set execution-level MDC for main thread - setExecutionContext(); - } + // execution arn + MDC.put(MDC_EXECUTION_ARN, 
context.getExecutionContext().getDurableExecutionArn()); - private void setExecutionContext() { - MDC.put(MDC_EXECUTION_ARN, executionManager.getDurableExecutionArn()); + // lambda request id + var requestId = + context.getLambdaContext() != null ? context.getLambdaContext().getAwsRequestId() : null; if (requestId != null) { MDC.put(MDC_REQUEST_ID, requestId); } - } - public void setOperationContext(String operationId, String operationName, Integer attempt) { - // Set execution-level MDC (needed for executor threads) - setExecutionContext(); - // Set operation-level MDC - if (operationId != null) { + if (context instanceof DurableContext) { + // context thread - context id + if (context.getContextId() != null) { + MDC.put(MDC_CONTEXT_ID, context.getContextId()); + } + if (context.getContextName() != null) { + MDC.put(MDC_CONTEXT_NAME, context.getContextName()); + } + } else { + // step context + var operationId = context.getContextId(); + // step context - step operation id MDC.put(MDC_OPERATION_ID, operationId); - } - if (operationName != null) { - MDC.put(MDC_OPERATION_NAME, operationName); - } - if (attempt != null) { - MDC.put(MDC_ATTEMPT, String.valueOf(attempt)); + // step context - step operation name + if (context.getContextName() != null) { + MDC.put(MDC_OPERATION_NAME, context.getContextName()); + } + MDC.put(MDC_ATTEMPT, String.valueOf(((StepContext) context).getAttempt())); } } - public void clearOperationContext() { - MDC.remove(MDC_OPERATION_ID); - MDC.remove(MDC_OPERATION_NAME); - MDC.remove(MDC_ATTEMPT); + public void clearThreadProperties() { + MDC.clear(); } public void trace(String format, Object... 
args) { @@ -86,7 +88,8 @@ public void error(String message, Throwable t) { } private boolean shouldSuppress() { - return suppressReplayLogs && executionManager.isReplaying(); + return context.getDurableConfig().getLoggerConfig().suppressReplayLogs() + && context.getExecutionManager().isReplaying(); } private void log(Runnable logAction) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 16b6a36..dfe94e8 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -5,6 +5,7 @@ import static software.amazon.lambda.durable.model.OperationSubType.RUN_IN_CHILD_CONTEXT; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Function; import software.amazon.awssdk.services.lambda.model.ContextOptions; @@ -97,32 +98,35 @@ private void executeChildContext() { // registerActiveThread is idempotent (no-op if already registered). registerActiveThread(contextId); - userExecutor.execute(() -> { - setCurrentThreadContext(new ThreadContext(contextId, ThreadType.CONTEXT)); - try { - var childContext = getContext().createChildContext(contextId); - - T result = function.apply(childContext); - - if (replayChildContext) { - // Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing. - // Advance the phaser so get() doesn't block waiting for a checkpoint response. 
- this.reconstructedResult = result; - markAlreadyCompleted(); - return; - } - - checkpointSuccess(result); - } catch (Throwable e) { - handleChildContextFailure(e); - } finally { - try { - deregisterActiveThread(contextId); - } catch (SuspendExecutionException e) { - // Expected when this is the last active thread — suspension already signaled - } - } - }); + CompletableFuture.runAsync( + () -> { + setCurrentThreadContext(new ThreadContext(contextId, ThreadType.CONTEXT)); + // use a try-with-resources to clear logger properties + try (var childContext = getContext().createChildContext(contextId, getName())) { + try { + T result = function.apply(childContext); + + if (replayChildContext) { + // Replaying a SUCCEEDED child with replayChildren=true — skip checkpointing. + // Advance the phaser so get() doesn't block waiting for a checkpoint response. + this.reconstructedResult = result; + markAlreadyCompleted(); + return; + } + + checkpointSuccess(result); + } catch (Throwable e) { + handleChildContextFailure(e); + } finally { + try { + deregisterActiveThread(contextId); + } catch (SuspendExecutionException e) { + // Expected when this is the last active thread — suspension already signaled + } + } + } + }, + userExecutor); } private void checkpointSuccess(T result) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index 6a3ffe4..24ce68f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -94,57 +94,66 @@ private void executeStepLogic(int attempt) { registerActiveThread(stepThreadId); // Execute user code in customer-configured executor - userExecutor.execute(() -> { - StepContext stepContext = getContext().createStepContext(getOperationId()); - // Set thread local ThreadContext on the executor thread - 
setCurrentThreadContext(new ThreadContext(stepThreadId, ThreadType.STEP)); - // Set operation context for logging in this thread - stepContext.getLogger().setOperationContext(getOperationId(), getName(), attempt); - try { - // Check if we need to send START - var existing = getOperation(); - if (existing == null || existing.status() != OperationStatus.STARTED) { - var startUpdate = OperationUpdate.builder().action(OperationAction.START); - - if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) { - // AT_MOST_ONCE: await START checkpoint before executing user code - sendOperationUpdate(startUpdate); - } else { - // AT_LEAST_ONCE: fire-and-forget START checkpoint - sendOperationUpdateAsync(startUpdate); + CompletableFuture.runAsync( + () -> { + // Set thread local ThreadContext on the executor thread + setCurrentThreadContext(new ThreadContext(stepThreadId, ThreadType.STEP)); + + // use a try-with-resources to clear logger properties + try (StepContext stepContext = + getContext().createStepContext(getOperationId(), getName(), attempt)) { + try { + // Check if we need to send START + var existing = getOperation(); + if (existing == null || existing.status() != OperationStatus.STARTED) { + var startUpdate = OperationUpdate.builder().action(OperationAction.START); + + if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) { + // AT_MOST_ONCE: await START checkpoint before executing user code + sendOperationUpdate(startUpdate); + } else { + // AT_LEAST_ONCE: fire-and-forget START checkpoint + sendOperationUpdateAsync(startUpdate); + } + } + + // Execute the function + T result = function.apply(stepContext); + + // Send SUCCEED + var successUpdate = OperationUpdate.builder() + .action(OperationAction.SUCCEED) + .payload(serializeResult(result)); + + // sendOperationUpdate must be synchronous here. 
When waiting for the return of this call, + // the + // context + // threads waiting for the result of this step operation will be wakened up and registered. + sendOperationUpdate(successUpdate); + } catch (Throwable e) { + handleStepFailure(e, attempt); + } finally { + try { + deregisterActiveThread(stepThreadId); + } catch (SuspendExecutionException e) { + // Expected when this is the last active thread. Must catch here because: + // 1/ This runs in a worker thread detached from handlerFuture + // 2/ Uncaught exception would prevent stepAsync().get() from resume + // Suspension/Termination is already signaled via + // suspendExecutionFuture/terminateExecutionFuture + // before the throw. + } + } } - } - - // Execute the function - T result = function.apply(stepContext); - - // Send SUCCEED - var successUpdate = OperationUpdate.builder() - .action(OperationAction.SUCCEED) - .payload(serializeResult(result)); - - // sendOperationUpdate must be synchronous here. When waiting for the return of this call, the context - // threads waiting for the result of this step operation will be wakened up and registered. - sendOperationUpdate(successUpdate); - } catch (Throwable e) { - handleStepFailure(e, attempt); - } finally { - try { - deregisterActiveThread(stepThreadId); - } catch (SuspendExecutionException e) { - // Expected when this is the last active thread. Must catch here because: - // 1/ This runs in a worker thread detached from handlerFuture - // 2/ Uncaught exception would prevent stepAsync().get() from resume - // Suspension/Termination is already signaled via suspendExecutionFuture/terminateExecutionFuture - // before the throw. 
- } - stepContext.getLogger().clearOperationContext(); - } - }); + }, + userExecutor); } private void handleStepFailure(Throwable exception, int attempt) { exception = ExceptionHelper.unwrapCompletableFuture(exception); + if (exception instanceof SuspendExecutionException) { + ExceptionHelper.sneakyThrow(exception); + } if (exception instanceof UnrecoverableDurableExecutionException) { // terminate the execution and throw the exception if it's not recoverable terminateExecution((UnrecoverableDurableExecutionException) exception); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java b/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java index 688b0ba..051647d 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/DurableContextTest.java @@ -13,14 +13,20 @@ import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.retry.RetryStrategies; class DurableContextTest { + private static final String EXECUTION_NAME = "349beff4-a89d-4bc8-a56f-af7a8af67a5f"; + private static final String INVOCATION_ID = "20dae574-53da-37a1-bfd5-b0e2e6ec715d"; private static final Operation EXECUTION_OP = Operation.builder() - .id("0") + .id(INVOCATION_ID) .type(OperationType.EXECUTION) .status(OperationStatus.STARTED) .build(); + private static final String OPERATION_ID1 = "1"; + private static final String OPERATION_ID2 = "2"; + private static final String OPERATION_ID3 = "3"; private DurableContext createTestContext() { return createTestContext(List.of()); @@ -33,10 +39,11 @@ private DurableContext createTestContext(List initialOperations) { var initialExecutionState = 
CheckpointUpdatedExecutionState.builder().operations(operations).build(); var executionManager = new ExecutionManager( - "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" - + "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d", - "test-token", - initialExecutionState, + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + + EXECUTION_NAME + "/" + INVOCATION_ID, + "test-token", + initialExecutionState), DurableConfig.builder().withDurableExecutionClient(client).build()); var root = DurableContext.createRootContext( executionManager, DurableConfig.builder().build(), null); @@ -62,8 +69,8 @@ void testGetExecutionContext() { assertNotNull(executionContext); assertNotNull(executionContext.getDurableExecutionArn()); assertEquals( - "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" - + "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d", + "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/" + EXECUTION_NAME + "/" + + INVOCATION_ID, executionContext.getDurableExecutionArn()); } @@ -80,7 +87,7 @@ void testStepExecution() { void testStepReplay() { // Create context with existing operation var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"Cached Result\"").build()) .build(); @@ -106,7 +113,7 @@ void testStepAsync() throws Exception { void testStepAsyncReplay() throws Exception { // Create context with existing operation var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .status(OperationStatus.SUCCEEDED) .stepDetails( StepDetails.builder().result("\"Cached Async Result\"").build()) @@ -131,8 +138,10 @@ void testWait() { @Test void testWaitReplay() { // Create context with completed wait operation - var existingOp = - 
Operation.builder().id("1").status(OperationStatus.SUCCEEDED).build(); + var existingOp = Operation.builder() + .id(OPERATION_ID1) + .status(OperationStatus.SUCCEEDED) + .build(); var context = createTestContext(List.of(existingOp)); // Wait should complete immediately (no exception) @@ -168,17 +177,19 @@ void testCombinedSyncAsyncWait() throws Exception { void testCombinedReplay() throws Exception { // Create context with all operations completed var syncOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"Replayed Sync\"").build()) .build(); var asyncOp = Operation.builder() - .id("2") + .id(OPERATION_ID2) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("100").build()) .build(); - var waitOp = - Operation.builder().id("3").status(OperationStatus.SUCCEEDED).build(); + var waitOp = Operation.builder() + .id(OPERATION_ID3) + .status(OperationStatus.SUCCEEDED) + .build(); var context = createTestContext(List.of(syncOp, asyncOp, waitOp)); // All operations should replay from cache @@ -229,7 +240,7 @@ void testStepWithTypeToken() { void testStepWithTypeTokenReplay() { // Create context with existing operation var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder() .result("[\"cached1\",\"cached2\"]") @@ -280,7 +291,7 @@ void testStepAsyncWithTypeToken() throws Exception { void testStepAsyncWithTypeTokenReplay() throws Exception { // Create context with existing operation var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder() .result("[\"async-cached1\",\"async-cached2\"]") diff --git a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java index 8f94dcd..0f86128 100644 --- 
a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java @@ -17,13 +17,20 @@ import software.amazon.awssdk.services.lambda.model.StepDetails; import software.amazon.lambda.durable.exception.NonDeterministicExecutionException; import software.amazon.lambda.durable.execution.ExecutionManager; +import software.amazon.lambda.durable.execution.ThreadContext; +import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.DurableExecutionInput; class ReplayValidationTest { + public static final String EXECUTION_NAME = "exec-name"; + public static final String INVOCATION_ID = "invocation-id"; + public static final String OPERATION_ID1 = "1"; private DurableContext createTestContext(List initialOperations) { var client = TestUtils.createMockClient(); var executionOp = Operation.builder() - .id("0") + .id(INVOCATION_ID) + .name(EXECUTION_NAME) .type(OperationType.EXECUTION) .status(OperationStatus.STARTED) .build(); @@ -32,12 +39,14 @@ private DurableContext createTestContext(List initialOperations) { var initialExecutionState = CheckpointUpdatedExecutionState.builder().operations(operations).build(); var executionManager = new ExecutionManager( - "arn:aws:lambda:us-east-1:123456789012:function:test", - "test-token", - initialExecutionState, + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialExecutionState), DurableConfig.builder().withDurableExecutionClient(client).build()); - return DurableContext.createRootContext( + var context = DurableContext.createRootContext( executionManager, DurableConfig.builder().build(), null); + executionManager.setCurrentThreadContext(new ThreadContext(INVOCATION_ID + "-execution", ThreadType.CONTEXT)); + + return context; } @Test @@ -53,7 +62,7 @@ void shouldPassValidationWhenNoCheckpointExists() { void 
shouldPassValidationWhenStepTypeAndNameMatch() { // Given: Existing STEP operation with matching name var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name("test") .type(OperationType.STEP) .status(OperationStatus.SUCCEEDED) @@ -70,7 +79,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() { void shouldPassValidationWhenWaitTypeMatches() { // Given: Existing WAIT operation var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .type(OperationType.WAIT) .status(OperationStatus.SUCCEEDED) .build(); @@ -85,7 +94,7 @@ void shouldPassValidationWhenWaitTypeMatches() { void shouldThrowWhenOperationTypeMismatches() { // Given: Existing WAIT operation but current is STEP var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name("test") .type(OperationType.WAIT) .status(OperationStatus.SUCCEEDED) @@ -106,7 +115,7 @@ void shouldThrowWhenOperationTypeMismatches() { void shouldThrowWhenOperationNameMismatches() { // Given: Existing STEP operation with different name var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name("original") .type(OperationType.STEP) .status(OperationStatus.SUCCEEDED) @@ -128,7 +137,7 @@ void shouldThrowWhenOperationNameMismatches() { void shouldHandleNullNamesCorrectly() { // Given: Existing STEP operation with null name var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name(null) .type(OperationType.STEP) .status(OperationStatus.SUCCEEDED) @@ -145,7 +154,7 @@ void shouldHandleNullNamesCorrectly() { void shouldThrowWhenNameChangesFromNullToValue() { // Given: Existing STEP operation with null name var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name(null) .type(OperationType.STEP) .status(OperationStatus.SUCCEEDED) @@ -167,7 +176,7 @@ void shouldThrowWhenNameChangesFromNullToValue() { void shouldThrowWhenNameChangesFromValueToNull() { // Given: Existing STEP operation with a name var existingOp = Operation.builder() - .id("1") + 
.id(OPERATION_ID1) .name("existingName") .type(OperationType.STEP) .status(OperationStatus.SUCCEEDED) @@ -189,7 +198,7 @@ void shouldThrowWhenNameChangesFromValueToNull() { void shouldValidateStepAsyncOperations() { // Given: Existing WAIT operation but current is STEP (async) var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name("test") .type(OperationType.WAIT) .status(OperationStatus.SUCCEEDED) @@ -211,7 +220,7 @@ void shouldValidateStepAsyncOperations() { void shouldSkipValidationWhenOperationTypeIsNull() { // Given: Existing operation with null type (edge case) var existingOp = Operation.builder() - .id("1") + .id(OPERATION_ID1) .name("test") .type((OperationType) null) .status(OperationStatus.SUCCEEDED) diff --git a/sdk/src/test/java/software/amazon/lambda/durable/TestContext.java b/sdk/src/test/java/software/amazon/lambda/durable/TestContext.java index 5eb99c9..f1a9f21 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/TestContext.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/TestContext.java @@ -11,9 +11,15 @@ // A concrete implementation of Context. We aren't actually going to use most of this so it doesn't matter that it's // mostly hardcoded. 
public class TestContext implements Context { + private final String requestId; + + public TestContext(String requestId) { + this.requestId = requestId; + } + @Override public String getAwsRequestId() { - return "dcc80a71-efe9-4820-a289-1205adbdfd64"; + return requestId; } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/execution/ExecutionManagerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/execution/ExecutionManagerTest.java index 412ce22..6f10d79 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/execution/ExecutionManagerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/execution/ExecutionManagerTest.java @@ -17,6 +17,7 @@ import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.TestUtils; import software.amazon.lambda.durable.client.DurableExecutionClient; +import software.amazon.lambda.durable.model.DurableExecutionInput; class ExecutionManagerTest { private DurableExecutionClient client; @@ -26,9 +27,8 @@ private ExecutionManager createManager(List operations) { var initialState = CheckpointUpdatedExecutionState.builder().operations(operations).build(); return new ExecutionManager( - "arn:aws:lambda:us-east-1:123456789012:function:test", - "test-token", - initialState, + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialState), DurableConfig.builder().withDurableExecutionClient(client).build()); } @@ -129,9 +129,8 @@ void emptyInitialState() { .nextMarker("marker") .build(); var executionManager = new ExecutionManager( - "arn:aws:lambda:us-east-1:123456789012:function:test", - "test-token", - initialState, + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialState), DurableConfig.builder().withDurableExecutionClient(client).build()); assertNotNull(executionManager.getExecutionOperation()); diff --git 
a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java index 6c75b32..68835f8 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java @@ -9,6 +9,9 @@ import org.mockito.MockedStatic; import org.slf4j.Logger; import org.slf4j.MDC; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.TestContext; import software.amazon.lambda.durable.execution.ExecutionManager; class DurableLoggerTest { @@ -38,7 +41,16 @@ void setUp() { private DurableLogger createLogger(Mode mode, Suppression suppression) { when(mockExecutionManager.isReplaying()).thenReturn(mode == Mode.REPLAYING); - return new DurableLogger(mockLogger, mockExecutionManager, REQUEST_ID, suppression == Suppression.ENABLED); + return new DurableLogger(mockLogger, createDurableContext(REQUEST_ID, suppression)); + } + + private DurableContext createDurableContext(String requestId, Suppression suppression) { + return DurableContext.createRootContext( + mockExecutionManager, + DurableConfig.builder() + .withLoggerConfig(new LoggerConfig(suppression == Suppression.ENABLED)) + .build(), + new TestContext(requestId)); } @Test @@ -87,12 +99,14 @@ void setsExecutionMdcInConstructor() { } @Test - void setOperationContextSetsMdc() { + void setStepThreadPropertiesSetsMdc() { try (MockedStatic mdcMock = mockStatic(MDC.class)) { - var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); mdcMock.clearInvocations(); - - logger.setOperationContext("op-1", "validateOrder", 2); + when(mockExecutionManager.isReplaying()).thenReturn(false); + var logger = new DurableLogger( + mockLogger, + createDurableContext(REQUEST_ID, Suppression.ENABLED) + .createStepContext("op-1", "validateOrder", 2)); mdcMock.verify(() 
-> MDC.put(DurableLogger.MDC_OPERATION_ID, "op-1")); mdcMock.verify(() -> MDC.put(DurableLogger.MDC_OPERATION_NAME, "validateOrder")); @@ -101,23 +115,21 @@ void setOperationContextSetsMdc() { } @Test - void clearOperationContextRemovesMdc() { + void clearThreadPropertiesRemovesMdc() { try (MockedStatic mdcMock = mockStatic(MDC.class)) { var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); mdcMock.clearInvocations(); - logger.clearOperationContext(); + logger.clearThreadProperties(); - mdcMock.verify(() -> MDC.remove(DurableLogger.MDC_OPERATION_ID)); - mdcMock.verify(() -> MDC.remove(DurableLogger.MDC_OPERATION_NAME)); - mdcMock.verify(() -> MDC.remove(DurableLogger.MDC_ATTEMPT)); + mdcMock.verify(() -> MDC.clear()); } } @Test void replayModeTransitionAllowsSubsequentLogs() { when(mockExecutionManager.isReplaying()).thenReturn(true, false); - var logger = new DurableLogger(mockLogger, mockExecutionManager, REQUEST_ID, true); + var logger = new DurableLogger(mockLogger, createDurableContext(REQUEST_ID, Suppression.ENABLED)); // During replay - suppressed logger.info("suppressed"); @@ -153,7 +165,7 @@ void allLogLevelsDelegateCorrectly() { void handlesNullRequestId() { try (MockedStatic mdcMock = mockStatic(MDC.class)) { when(mockExecutionManager.isReplaying()).thenReturn(false); - new DurableLogger(mockLogger, mockExecutionManager, null, true); + new DurableLogger(mockLogger, createDurableContext(null, Suppression.DISABLED)); mdcMock.verify(() -> MDC.put(DurableLogger.MDC_EXECUTION_ARN, EXECUTION_ARN)); mdcMock.verify(() -> MDC.put(eq(DurableLogger.MDC_REQUEST_ID), anyString()), never()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java index c0a6169..1854b5a 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java +++ 
b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java @@ -24,6 +24,7 @@ import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -86,9 +87,8 @@ private ExecutionManager createExecutionManager(List initialOperation var initialState = CheckpointUpdatedExecutionState.builder().operations(operations).build(); var executionManager = new ExecutionManager( - "arn:aws:lambda:us-east-1:123456789012:function:test", - "test-token", - initialState, + new DurableExecutionInput( + "arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialState), DurableConfig.builder().withDurableExecutionClient(client).build()); executionManager.setCurrentThreadContext(new ThreadContext("Root", ThreadType.CONTEXT)); return executionManager; From 0d1408b0a37ea288301dea5502834e155e7236f0 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 27 Feb 2026 23:18:12 -0800 Subject: [PATCH 2/2] rename close method --- .../java/software/amazon/lambda/durable/DurableContext.java | 4 ++-- .../main/java/software/amazon/lambda/durable/StepContext.java | 2 +- .../software/amazon/lambda/durable/logging/DurableLogger.java | 2 +- .../amazon/lambda/durable/logging/DurableLoggerTest.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 42a40b7..631fd52 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -308,7 +308,7 @@ public DurableLogger getLogger() { if (logger == null) { 
synchronized (this) { if (logger == null) { - logger = new DurableLogger(LoggerFactory.getLogger(StepContext.class), this); + logger = new DurableLogger(LoggerFactory.getLogger(DurableContext.class), this); } } } @@ -321,7 +321,7 @@ public DurableLogger getLogger() { */ public void close() { if (logger != null) { - logger.clearThreadProperties(); + logger.close(); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java index 27d8d46..8437e57 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/StepContext.java @@ -52,7 +52,7 @@ public DurableLogger getLogger() { @Override public void close() { if (logger != null) { - logger.clearThreadProperties(); + logger.close(); } } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java index 95072d9..cf735ea 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java @@ -59,7 +59,7 @@ public DurableLogger(Logger delegate, BaseContext context) { } } - public void clearThreadProperties() { + public void close() { MDC.clear(); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java index 68835f8..5847827 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/logging/DurableLoggerTest.java @@ -120,7 +120,7 @@ void clearThreadPropertiesRemovesMdc() { var logger = createLogger(Mode.EXECUTING, Suppression.ENABLED); mdcMock.clearInvocations(); - logger.clearThreadProperties(); + logger.close(); mdcMock.verify(() -> MDC.clear()); }