From 3ebd6b878eebd7f751fd8c0e92c37eabdbd67f92 Mon Sep 17 00:00:00 2001 From: DamienBates Date: Mon, 29 Apr 2024 14:47:47 +1000 Subject: [PATCH 1/2] handler concurrency --- .../handler-concurrency.integration.ts | 155 ++++++++++++++++++ .../bus-core/src/workflow/test/run-task.ts | 4 +- .../bus-core/src/workflow/test/task-ran.ts | 4 +- .../src/workflow/test/test-command.ts | 4 +- 4 files changed, 164 insertions(+), 3 deletions(-) create mode 100644 packages/bus-core/src/workflow/handler-concurrency.integration.ts diff --git a/packages/bus-core/src/workflow/handler-concurrency.integration.ts b/packages/bus-core/src/workflow/handler-concurrency.integration.ts new file mode 100644 index 00000000..491fbb1f --- /dev/null +++ b/packages/bus-core/src/workflow/handler-concurrency.integration.ts @@ -0,0 +1,155 @@ +import { It, Mock, Times } from 'typemoq' +import { Handler, Workflow, WorkflowMapper, WorkflowState } from '../' +import { Bus, BusInstance } from '../service-bus' +import { ClassConstructor, sleep } from '../util' +import { InMemoryPersistence } from './persistence' +import { FinalTask, RunTask, TaskRan, TestCommand } from './test' +import { Command, MessageAttributes } from '@node-ts/bus-messages' +import * as uuid from 'uuid' + +jest.setTimeout(10_000) + +class HandlerTestWorkflowState extends WorkflowState { + static NAME = 'HandlerTestWorkflowState' + $name = HandlerTestWorkflowState.NAME + + property1: string + eventValue: string + listIds: number[] +} + +/** + * This intention of this test is to illustrate that no collisions occur when a singular workflow + * batch sends many commands to be processed concurrently. In particular, when a command fails to + * update state and an error occurs, the failing message that becomes visible again should not collide + * nor spawn further commands that may lead to the repeat processing of commands. + */ +describe('Handler Concurrency', () => { + const completeCallback = + Mock.ofType<(workflowId: string, correlationId: string) => void>() + + class TestWorkflow extends Workflow { + listIds: number[] + static step2Counter = 0 + + constructor(private bus: BusInstance) { + super() + } + + configureWorkflow( + mapper: WorkflowMapper + ): void { + mapper + .withState(HandlerTestWorkflowState) + .startedBy(TestCommand, 'step1') + .when(TaskRan, 'step2', { + lookup: message => message.value, + mapsTo: 'property1' + }) + .when(FinalTask, 'step3') // Maps on workflow id + } + + async step1({ property1, listIds }: TestCommand, _: any) { + const [firstList, ...remainingListIds] = listIds! + await this.bus.send(new RunTask(property1!, firstList)) + + // Batch send 10 commands (10 list of IDs in state) + Promise.all([ + listIds!.map(async listId => { + await this.bus.send(new RunTask(property1!, listId)) + }) + ]) + + return { property1, listIds: remainingListIds } + } + + async step2({ value }: TaskRan, state: HandlerTestWorkflowState) { + TestWorkflow.step2Counter++ + + if (state.listIds.length > 0) { + const [nextList, ...remainingListIds] = state.listIds + await this.bus.send(new RunTask(value, nextList)) + return { property1: value, listIds: remainingListIds } + } else { + return this.bus.send(new FinalTask()) + } + } + + async step3( + _: FinalTask, + __: WorkflowState, + { + correlationId, + stickyAttributes: { workflowId } + }: MessageAttributes<{}, { workflowId: string }> + ) { + completeCallback.object(workflowId, correlationId!) + return this.completeWorkflow() + } + } + + class RunTaskHandler implements Handler { + messageType = RunTask + static retryCount = 0 + + constructor(private bus: BusInstance) {} + + async handle(message: RunTask) { + console.log(message.listId) + + if (RunTaskHandler.retryCount < 3) { + RunTaskHandler.retryCount++ + throw new Error('Test error') + } + + await this.bus.send(new TaskRan(message.value, message.listId)) + } + } + + const CONSUME_TIMEOUT = 5_000 + let bus: BusInstance + const inMemoryPersistence = new InMemoryPersistence() + + beforeAll(async () => { + bus = Bus.configure() + .withPersistence(inMemoryPersistence) + .withContainer({ + get(ctor: ClassConstructor) { + return new ctor(bus) + } + }) + .withWorkflow(TestWorkflow) + .withHandler(RunTaskHandler) + .withConcurrency(10) + .build() + + await bus.initialize() + await bus.start() + + // List of IDs to simulate a batch of commands sent by the first handler + const ids = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + const correlationId = uuid.v4() + + await bus.send(new TestCommand(uuid.v4(), ids), { correlationId }) + await sleep(CONSUME_TIMEOUT) + }) + + afterAll(async () => { + await bus.dispose() + }) + + describe('when multiple messages are concurrently handled', () => { + it('should not spawn more messages', () => { + completeCallback.verify( + c => + c( + It.is(workflowId => !!workflowId), + It.isAny() + ), + Times.exactly(1) // Workflow should only be called once + ) + + expect(TestWorkflow.step2Counter).toEqual(10) + }) + }) +}) diff --git a/packages/bus-core/src/workflow/test/run-task.ts b/packages/bus-core/src/workflow/test/run-task.ts index 457f3b1a..38b439f1 100644 --- a/packages/bus-core/src/workflow/test/run-task.ts +++ b/packages/bus-core/src/workflow/test/run-task.ts @@ -4,8 +4,10 @@ export class RunTask extends Command { static NAME = '@node-ts/bus-core/run-task' $name = RunTask.NAME $version = 0 + listId?: number - constructor(readonly value: string) { + constructor(readonly value: string, listId?: number) { super() + this.listId = listId } } diff --git a/packages/bus-core/src/workflow/test/task-ran.ts b/packages/bus-core/src/workflow/test/task-ran.ts index 24cea694..6b75fe45 100644 --- a/packages/bus-core/src/workflow/test/task-ran.ts +++ b/packages/bus-core/src/workflow/test/task-ran.ts @@ -4,8 +4,10 @@ export class TaskRan extends Event { static NAME = '@node-ts/bus-core/task-ran' $name = TaskRan.NAME $version = 0 + listIdCompleted: number - constructor(readonly value: string) { + constructor(readonly value: string, listIdCompleted?: number) { super() + listIdCompleted = this.listIdCompleted } } diff --git a/packages/bus-core/src/workflow/test/test-command.ts b/packages/bus-core/src/workflow/test/test-command.ts index a1c5d319..9ba90f50 100644 --- a/packages/bus-core/src/workflow/test/test-command.ts +++ b/packages/bus-core/src/workflow/test/test-command.ts @@ -4,8 +4,10 @@ export class TestCommand extends Command { static NAME = '@node-ts/bus-core/test-command' $name = TestCommand.NAME $version = 0 + listIds?: number[] - constructor(readonly property1: string | undefined) { + constructor(readonly property1: string | undefined, listIds?: number[]) { super() + this.listIds = listIds } } From 690b88e96a76f870e5175cdddb3bcc0047ba4ed8 Mon Sep 17 00:00:00 2001 From: DamienBates Date: Mon, 29 Apr 2024 14:51:47 +1000 Subject: [PATCH 2/2] rm log --- .../bus-core/src/workflow/handler-concurrency.integration.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/bus-core/src/workflow/handler-concurrency.integration.ts b/packages/bus-core/src/workflow/handler-concurrency.integration.ts index 491fbb1f..4e0a4d58 100644 --- a/packages/bus-core/src/workflow/handler-concurrency.integration.ts +++ b/packages/bus-core/src/workflow/handler-concurrency.integration.ts @@ -95,8 +95,6 @@ describe('Handler Concurrency', () => { constructor(private bus: BusInstance) {} async handle(message: RunTask) { - console.log(message.listId) - if (RunTaskHandler.retryCount < 3) { RunTaskHandler.retryCount++ throw new Error('Test error')