September 30, 2024

Application Event Publisher for Monolithic DDD in Nodejs

DDD

express

nodejs

Preface

In a previous article we discussed monolithic DDD.

In this article we create an ApplicationEventPublisher that works almost the same, using eventemitter3.

Publisher Result

In the following, the id is generated when the data is inserted:

As we will see when we define ApplicationEvent in the next section, event.result is null before the event is dispatched. The presence of event.result therefore indicates that the ApplicationEventPublisher instance works as expected.

ApplicationEvent and Application Event Publisher

import EventEmitter from 'eventemitter3';

export class ApplicationEvent<S extends string, T, R = T> {
    public result: R | null = null
    constructor(
        public type: S,
        public data: T,
    ) { }
}
  • Here result is used to retrieve the eventual result from the event handler, which mutates the event object.

  • For example, when we have created a Student object, we may wish to get the instance back and return at least the id (generated by the database) to the frontend.
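The idea can be sketched without a database. This is only an illustration: `NewStudent`, `StudentRow`, `fakeInsert` and `createStudentDemo` are hypothetical names, the insert is an in-memory stand-in, and the event declares its result type through the third type parameter instead of casting later.

```typescript
// Same shape as the ApplicationEvent class above, written with explicit fields.
class ApplicationEvent<S extends string, T, R = T> {
    public result: R | null = null;
    public type: S;
    public data: T;
    constructor(type: S, data: T) {
        this.type = type;
        this.data = data;
    }
}

// Hypothetical types, for illustration only.
type NewStudent = { name: string };
type StudentRow = NewStudent & { id: number };

class StudentCreatedEvent extends ApplicationEvent<"StudentCreatedEvent", NewStudent, StudentRow> { }

// Stand-in for a database insert that generates the id.
let nextId = 1;
const fakeInsert = async (student: NewStudent): Promise<StudentRow> => ({ ...student, id: nextId++ });

// The handler mutates the event; the caller reads event.result afterwards.
const handleStudentCreated = async (event: StudentCreatedEvent): Promise<void> => {
    event.result = await fakeInsert(event.data);
};

const createStudentDemo = async (): Promise<StudentRow | null> => {
    const event = new StudentCreatedEvent("StudentCreatedEvent", { name: "Alice" });
    // event.result is still null at this point
    await handleStudentCreated(event);
    return event.result;
};
```

Calling `createStudentDemo()` resolves to the inserted row, including the generated id, which is what a controller would send back to the frontend.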

type InferEventType<T> = T extends ApplicationEvent<infer U, any> ? U : never;

type EventProcessor<Event = any> = (event: Event) => void | Promise<void>;
type OrderedEventHandler<Event = any> = { processor: EventProcessor<Event>, order: number }

class ApplicationEventPublisher {
    private eventEmitter: EventEmitter;
    private handlers: Map<string, OrderedEventHandler[]> = new Map<string, OrderedEventHandler[]>()

    constructor() {
        this.eventEmitter = new EventEmitter();
    }

    async publishEvent<S extends string, T>(event: ApplicationEvent<S, T>): Promise<void> {
        // get() returns undefined when no handler is registered for this event type
        const handlers = this.handlers.get(event.type);
        if (handlers) {
            // sort a copy in ascending order so the registered list is not mutated
            const sorted = [...handlers].sort((a, b) => a.order - b.order);
            for (const handler of sorted) {
                const processor = handler.processor as EventProcessor<typeof event>
                await processor(event)
            }
        }
    }

    addEventHandler = <T>(eventType: InferEventType<T>, handler: EventProcessor<T>, order: number = 1): void => {
        if (!this.handlers.has(eventType)) {
            this.handlers.set(eventType, []);
        }
        this.handlers.get(eventType)!.push({ processor: handler, order });
    }
}

export const applicationEventPublisher = new ApplicationEventPublisher();
export default ApplicationEventPublisher;
EventHandlers
studentEventHandler.ts
import { db } from "../../db/database";
import { StudentCreatedEvent } from "../repository/StudentRepository";
import { applicationEventPublisher } from "../../src/util/ApplicationEventPublisher";
import { PackageDeletedEvent, PackageUpdatedEvent, StudentInfoUpdatedEvent } from "../aggregate/StudentDomain";

export const registerStudentEvents = () => {
    const on = applicationEventPublisher.addEventHandler

    on<StudentCreatedEvent>("StudentCreatedEvent", async (event) => {
        const student = event.data
        const student_ = await db.insertInto("Student").values(student).returningAll().executeTakeFirst()
        if (!student_) {
            throw new Error("Student insertion failed")
        }
        event.result = student_
    })

    on<StudentInfoUpdatedEvent>("StudentInfoUpdatedEvent", async (event) => {
        const { update } = event.data
        await db.updateTable("Student").set(update).where("Student.id", "=", update.id).execute();
    })

    on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => {
        const { packageId } = event.data;
        await db.deleteFrom("Student_package").where("Student_package.id", "=", Number(packageId)).execute();
    })
}

Experiment

Before we Design Aggregates from Table of Relations

Let's consider the following relations:

  • A student has many packages, each package has many classes.

  • Each of the three entities has very rich domain behaviours.

  • Hierarchically, packages and classes should sit inside the StudentDomain aggregate, going by what we have learned about what constitutes an aggregate. But recall from

    What makes an Aggregate (DDD)? Hint: it's NOT hierarchy & relationships

    by CodeOpinion

    • What are the actual behaviours?

    • What consistency do we need within a cluster of entities?

    • Query performance: do we always need all the information?

    By these considerations it makes sense to break an aggregate into smaller aggregates. For example, when a vending machine dispatches an alarm, we let the Route aggregate listen for the domain event, consume it, and execute the subsequent action:
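A rough sketch of that interaction follows. Every name in it (`AlarmRaisedEvent`, `VendingMachine`, `Route`, `alarmHandlers`) is hypothetical and not taken from any real code base; a plain array of callbacks stands in for the event publisher.

```typescript
// Hypothetical event and aggregates, for illustration only.
type AlarmRaisedEvent = { type: "AlarmRaisedEvent"; machineId: string };
type AlarmHandler = (event: AlarmRaisedEvent) => void;

// Stand-in for an event publisher: a plain list of subscribers.
const alarmHandlers: AlarmHandler[] = [];

class VendingMachine {
    id: string;
    constructor(id: string) {
        this.id = id;
    }
    // The machine only announces what happened; it knows nothing about routes.
    raiseAlarm(): void {
        const event: AlarmRaisedEvent = { type: "AlarmRaisedEvent", machineId: this.id };
        alarmHandlers.forEach((handle) => handle(event));
    }
}

class Route {
    pendingVisits: string[] = [];
    scheduleVisit(machineId: string): void {
        this.pendingVisits.push(machineId);
    }
}

// The Route aggregate subscribes to the event and decides on the follow-up action.
const route = new Route();
alarmHandlers.push((event) => route.scheduleVisit(event.machineId));

new VendingMachine("vm-42").raiseAlarm();
console.log(route.pendingVisits);
```

The two aggregates stay small and independent: the only thing they share is the shape of the event.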

Define Aggregates
AbstractAggregateRoot

Let's define a base class for our aggregates. We intentionally do not use an abstract class, because a special config in package.json needs to be set to make it work.

import { ApplicationEvent, applicationEventPublisher } from "../../src/util/ApplicationEventPublisher"

export default class AbstractAggregateRoot {

    private events: any[] = []

    public save = async () => {
        for (const event of this.events) {
            await applicationEventPublisher.publishEvent(event)
        }
        this.events = []
    }

    public registerEvent = <S extends string, T>(event: ApplicationEvent<S, T>) => {
        this.events.push(event)
    }
}
StudentInfoUpdatedEvent, PackageUpdatedEvent and PackageDeletedEvent
import { Student, Student_package } from "@prisma/client"
import AbstractAggregateRoot from "./AbstractAggregateRoot"
import ApplicationEventPublisher, { ApplicationEvent } from "../../src/util/ApplicationEventPublisher";
import { UpdatePackageRequest, UpdateStudentRequest } from "../../dto/dto";
import updateValues from "../../src/util/updateValues";

export class StudentInfoUpdatedEvent extends ApplicationEvent<"StudentInfoUpdatedEvent", { update: UpdateStudentRequest }> { }
export class PackageUpdatedEvent extends ApplicationEvent<"PackageUpdatedEvent", { update: UpdatePackageRequest }> { }
export class PackageDeletedEvent extends ApplicationEvent<"PackageDeletedEvent", { packageId: number }> { }

Note that when no constructor is defined, the parent constructor will be called automatically.
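A small standalone check of this behaviour, using a simplified `BaseEvent` class (an assumption made for this sketch, standing in for ApplicationEvent):

```typescript
// Simplified stand-in for ApplicationEvent.
class BaseEvent<S extends string, T> {
    public type: S;
    public data: T;
    constructor(type: S, data: T) {
        this.type = type;
        this.data = data;
    }
}

// No constructor declared: the (type, data) constructor of BaseEvent is used as-is.
class PackageDeletedEvent extends BaseEvent<"PackageDeletedEvent", { packageId: number }> { }

const deleted = new PackageDeletedEvent("PackageDeletedEvent", { packageId: 7 });
console.log(deleted.type, deleted.data.packageId);
```

This is why the three event classes above can have empty bodies and still be constructed with a type and a payload.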

StudentDomain
export default class StudentDomain extends AbstractAggregateRoot {
    constructor(
        private student: Student | null,
        private packages: Student_package[],
    ) {
        super()
    }

    updateInfo = (update: UpdateStudentRequest) => {
        this.student = { ...this.student, ...update }
        this.registerEvent(new StudentInfoUpdatedEvent("StudentInfoUpdatedEvent", { update }))
    }

    updatePackage = (update: UpdatePackageRequest) => {
        const package_ = this.packages.find(p => p.id === update.id)
        updateValues(package_, update)
        this.registerEvent(new PackageUpdatedEvent("PackageUpdatedEvent", { update }))
    }

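    deletePackage = (packageId: number) => {
        const index = this.packages.findIndex(p => p.id === packageId)
        // findIndex returns -1 when nothing matches; splice(-1, 1) would remove the last package
        if (index !== -1) {
            this.packages.splice(index, 1)
        }
        this.registerEvent(new PackageDeletedEvent("PackageDeletedEvent", { packageId }))
    }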
}

Recall that domain behaviours need to be published so that other domain objects can subscribe to them. Calling the save() method dispatches all the events we have registered (refer to the AbstractAggregateRoot definition).
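The register-then-save flow can be tried in isolation. In this sketch the `published` array replaces the real publisher, and `Counter` is a hypothetical aggregate invented for the example.

```typescript
type DomainEvent = { type: string; data: unknown };

// Stand-in for applicationEventPublisher: it only records what was published.
const published: DomainEvent[] = [];
const publishEvent = async (event: DomainEvent): Promise<void> => {
    published.push(event);
};

class AggregateRoot {
    private events: DomainEvent[] = [];

    registerEvent(event: DomainEvent): void {
        this.events.push(event);
    }

    // Publishes queued events in registration order, then clears the queue.
    async save(): Promise<void> {
        for (const event of this.events) {
            await publishEvent(event);
        }
        this.events = [];
    }
}

// Hypothetical aggregate: each behaviour changes state, then queues an event.
class Counter extends AggregateRoot {
    value = 0;
    increment(): void {
        this.value += 1;
        this.registerEvent({ type: "CounterIncremented", data: { value: this.value } });
    }
}

const runSaveDemo = async (): Promise<string[]> => {
    const counter = new Counter();
    counter.increment();
    counter.increment();
    const before = published.length; // nothing is published until save()
    await counter.save();
    await counter.save();            // the queue was cleared, so nothing is published again
    return published.slice(before).map((event) => event.type);
};
```

Nothing reaches the handlers until save() is called, so a request handler can apply several behaviours and persist them in one place.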

Define StudentCreatedEvent and StudentRepository

Recall that a repository by convention is defined to return aggregate root(s). In our convention each aggregate will be called something-Domain.

import { Student, Student_package } from "@prisma/client"
import StudentDomain from "../aggregate/StudentDomain"
import { db } from "../../db/database"
import { jsonArrayFrom, jsonObjectFrom } from "kysely/helpers/postgres";
import { applicationEventPublisher, ApplicationEvent } from "../../src/util/ApplicationEventPublisher"
import { CreateStudentRequest } from "../../dto/dto"

export class StudentCreatedEvent extends ApplicationEvent<"StudentCreatedEvent", CreateStudentRequest> { }


class StudentRepository {
    createRoot = async (student: CreateStudentRequest): Promise<StudentDomain> => {
        const event = new StudentCreatedEvent("StudentCreatedEvent", student)
        // publishEvent resolves to void; the handler stores the inserted row on event.result
        await applicationEventPublisher.publishEvent(event)
        return new StudentDomain(event.result as Student, [])
    }

    getStudentById = async (uuid: string): Promise<StudentDomain> => {
        const result = await studentAggQuery
            .where("Student.id", "=", uuid)
            .executeTakeFirst()
        if (!result) {
            return new StudentDomain(null, [])
        }
        const { studentPackages, ...student } = result
        // StudentDomain takes (student, packages); the publisher is imported by AbstractAggregateRoot
        return new StudentDomain(student, studentPackages)
    }

    getStudentsByParentEmail = async (email: string): Promise<StudentDomain[]> => {
        // execute() always resolves to an array, so no match simply maps to []
        const results = await studentAggQuery
            .where("Student.parent_email", "=", email)
            .execute()
        return results.map(({ studentPackages, ...student_ }) => new StudentDomain(student_, studentPackages))
    }
}

const studentAggQuery = db.selectFrom("Student")
    .selectAll("Student")
    .select((eb) => [
        jsonArrayFrom(
            eb
                .selectFrom("Student_package")
                .selectAll("Student_package")
                .whereRef("Student_package.student_id", "=", "Student.id")
        ).as("studentPackages"),
    ])

export const studentRepository = new StudentRepository();
export default StudentRepository;
Let's Dispatch a StudentCreatedEvent!

Let's study the simplest case of domain event: create a Student aggregate. Recall that we have defined a listener to create a Student in studentEventHandler.ts.

const createStudent = async (req: Request, res: Response) => {
  const body = req.body as CreateStudentRequest;
  const studentDomain = await studentRepository.createRoot(body);
  res.json({
    success: true,
  });
};

We demonstrated the result at the beginning of this article; let's review it:

Let's Execute StudentDomain.{updateInfo, updatePackage, deletePackage}

Finally let's go through basic CRUD examples using this methodology:

const updateStudent = async (req: Request, res: Response) => {
    const body = req.body as UpdateStudentRequest;
    const studentDomain = await studentRepository.getStudentById(body.id)
    studentDomain.updateInfo(body)
    await studentDomain.save()

    res.json({
        success: true,
        result: { student: body },
    });
};

const deletePackage = async (req: Request, res: Response) => {
    const params = req.query as { studentId: string, packageId: string };
    const { packageId, studentId } = params;
    const studentDomain = await studentRepository.getStudentById(studentId)
    studentDomain.deletePackage(Number(packageId))
    await studentDomain.save()
    res.json({
        success: true,
    });
};

const updatePackage = async (req: Request, res: Response) => {
    const packageUpdate = req.body as UpdatePackageRequest;
    const { student_id } = packageUpdate;
    const studentDomain = await studentRepository.getStudentById(student_id)
    studentDomain.updatePackage(req.body)
    await studentDomain.save();

    res.json({
        success: true,
        result: packageUpdate,
    });
};

Now to add new features, say an email notification after deletePackage has finished, it is as simple as adding

on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => {
    // some logic to notify student by email
})

Order of EventHandlers

Back to our deletePackage example, it is implemented by:

await db.deleteFrom("Student_package").where("Student_package.id", "=", Number(packageId)).execute();

However, Student_package is referenced by Class rows via a foreign key, so for now the request results in the following error:

Since we didn't set the foreign key to ON DELETE CASCADE, our Class entities will not be deleted automatically. We therefore have to proceed sequentially:

  1. Delete the classes referencing this package

  2. Delete this package

This can be handled by two event handlers (we could of course condense them into one, but let's demonstrate how to order handlers).

// studentEventHandler.ts
    on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => {
        const { packageId } = event.data;
        await db.deleteFrom("Student_package").where("Student_package.id", "=", packageId).execute();
    }, 2)

    on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => {
        const { packageId } = event.data;
        await db.deleteFrom("Class").where("Class.student_package_id", "=", packageId).execute();
    }, 1)
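To check that the order argument, not the registration order, decides which handler runs first, here is a self-contained sketch. `MiniPublisher` is a stripped-down, hypothetical version of the publisher with no eventemitter3 dependency, and the handlers only write to a log instead of touching the database.

```typescript
type Processor<E> = (event: E) => void | Promise<void>;
type OrderedHandler<E> = { processor: Processor<E>; order: number };

// Hypothetical, reduced publisher for illustration.
class MiniPublisher<E extends { type: string }> {
    private handlers = new Map<string, OrderedHandler<E>[]>();

    addEventHandler(type: string, processor: Processor<E>, order: number = 1): void {
        const list = this.handlers.get(type) ?? [];
        list.push({ processor, order });
        this.handlers.set(type, list);
    }

    async publishEvent(event: E): Promise<void> {
        // Copy before sorting so the registration list is left untouched.
        const sorted = [...(this.handlers.get(event.type) ?? [])].sort((a, b) => a.order - b.order);
        for (const { processor } of sorted) {
            await processor(event); // each handler finishes before the next one starts
        }
    }
}

type PackageDeleted = { type: "PackageDeletedEvent"; packageId: number };

const runOrderDemo = async (): Promise<string[]> => {
    const log: string[] = [];
    const publisher = new MiniPublisher<PackageDeleted>();
    // Registered first, but order 2 makes it run last.
    publisher.addEventHandler("PackageDeletedEvent", async () => { log.push("delete package"); }, 2);
    publisher.addEventHandler("PackageDeletedEvent", async () => { log.push("delete classes"); }, 1);
    await publisher.publishEvent({ type: "PackageDeletedEvent", packageId: 1 });
    return log;
};
```

Because each processor is awaited in turn, the classes are gone before the package deletion starts, which is what the foreign key requires.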

Further Code Simplification by @listener, @order using reflect-metadata

In another, newer article of mine on this topic, we further simplify our ApplicationEvent with the help of reflect-metadata into

export class ApplicationEvent<
    DataType,
    ContextType extends ContextBaseType = ContextBaseType,
> {
    constructor(public data: DataType, public ctx: ContextType) { }
}

and all listeners become

export default class WalkEventListener {
    @listener
    async insertRelIssueToSummaryUuid(event: WalkEvent.SuccessPath.IssueSummaryUuidRelationInsertedEvent) {
        const { issueToSummaryOidUpdates } = event.data;
        const selectedIssueIds = issueToSummaryOidUpdates.map(({ issueId }) => issueId);
        await issueDao.inactivateSummaryOfSelectedIssues(selectedIssueIds);
        await issueDao.insertIssueSummaryUuids(issueToSummaryOidUpdates);
    }

    @listener
    @order(2)
    async furtherAction(event: WalkEvent.SuccessPath.IssueSummaryUuidRelationInsertedEvent) {
        // logic here ...
    }
}
  • Now we can happily divide our long task into several reusable subtasks, and each event listener has at most one try-catch.

  • In this way it is extremely easy to introduce a detailed retry/fallback mechanism for a long chain of tasks, each of which can possibly fail. For example:

    We can certainly skip the command part and replace everything with events, since we may not want to implement a saga to orchestrate those commands (it is over-complicated for a monolithic application).

  • For more on the saga concept, please refer to the saga pattern as introduced in the Axon Framework documentation.
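One possible shape for such a retry/fallback wrapper is sketched below. `withRetry` is a hypothetical helper, not part of the publisher or of any library; it wraps a single event processor, so each listener keeps its one try-catch.

```typescript
type EventProcessor<E> = (event: E) => Promise<void>;

// Hypothetical helper: tries a processor up to `attempts` times, then runs the fallback.
const withRetry = <E>(
    processor: EventProcessor<E>,
    attempts: number,
    fallback: (event: E, error: unknown) => Promise<void>,
): EventProcessor<E> => async (event) => {
    let lastError: unknown;
    for (let i = 0; i < attempts; i++) {
        try {
            await processor(event);
            return; // success: stop retrying
        } catch (error) {
            lastError = error;
        }
    }
    // Every attempt failed: hand the event and the last error to the fallback.
    await fallback(event, lastError);
};

const runRetryDemo = async (): Promise<{ calls: number; fellBack: boolean }> => {
    let calls = 0;
    let fellBack = false;
    // Fails twice, then succeeds on the third attempt.
    const flaky: EventProcessor<{ id: number }> = async () => {
        calls += 1;
        if (calls < 3) throw new Error("temporary failure");
    };
    const safe = withRetry(flaky, 3, async () => { fellBack = true; });
    await safe({ id: 1 });
    return { calls, fellBack };
};
```

A wrapped processor can be registered like any other handler, so the retry policy stays local to the subtask that needs it.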

References