Preface
In the past we have discussed monolithic DDD in an article
- Monolithic DDD Without ORM by Separating Data and Domain Behaviour
where we rely heavily on the
ApplicationEventPublisher
instance injected by spring boot framework.
Now in this article we will be creating an ApplicationEventPublisher
that works almost the same by using eventemitter3.
Publisher Result
In the following id
is generated when inserting the data:
As is to be seen when we define AppliactionEvent
in the next section, event.result
is null
before dispatching the event. Therefore the presence of event.result
indicates the ApplicationEventPublisher
instance functions as expected.
ApplicationEvent and Application Event Publisher
import EventEmitter from 'eventemitter3'; export class ApplicationEvent<S extends string, T, R = T> { public result: R | null = null constructor( public type: S, public data: T, ) { } }
-
Here
result
is used to retrieve the eventual result from the event handler in which theevent
object gets mutated. -
For example when we have created a
Student
object, we may wish to get the instance back and return at least theid
(generated by database) to the frontend.
9type InferEventType<T> = T extends ApplicationEvent<infer U, any> ? U : never; 10 11type EventProcessor<Event = any> = (event: Event) => void | Promise<void>; 12type OrderedEventHandler<Event = any> = { processor: EventProcessor<Event>, order: number } 13 14class ApplicationEventPublisher { 15 private eventEmitter: EventEmitter; 16 private handlers: Map<string, OrderedEventHandler[]> = new Map<string, OrderedEventHandler[]>() 17 18 constructor() { 19 this.eventEmitter = new EventEmitter(); 20 } 21 22 async publishEvent<S extends string, T>(event: ApplicationEvent<S, T>): Promise<void> { 23 const handlers = this.handlers.get(event.type).sort((a, b) => a.order - b.order); 24 if (handlers) { 25 for (const handler of handlers) { 26 const processor = handler.processor as EventProcessor<typeof event> 27 await processor(event) 28 } 29 } 30 } 31 32 addEventHandler = <T>(evenType: InferEventType<T>, handler: EventProcessor<T>, order: number = 1): void => { 33 if (!this.handlers.has(evenType)) { 34 this.handlers.set(evenType, []); 35 } 36 this.handlers.get(evenType)!.push({ processor: handler, order }); 37 } 38} 39 40export const applicationEventPublisher = new ApplicationEventPublisher(); 41export default ApplicationEventPublisher;
EventHandlers
studentEventHandler.ts
import { db } from "../../db/database"; import { StudentCreatedEvent } from "../repository/StudentRepository"; import { applicationEventPublisher } from "../../src/util/ApplicationEventPublisher"; import { PackageDeletedEvent, PackageUpdatedEvent, StudentInfoUpdatedEvent } from "../aggregate/StudentDomain"; export const registerStudentEvents = () => { const on = applicationEventPublisher.addEventHandler on<StudentCreatedEvent>("StudentCreatedEvent", async (event) => { const student = event.data const student_ = await db.insertInto("Student").values(student).returningAll().executeTakeFirst() if (!student_) { throw new Error("Studnet insertion fails") } event.result = student_ }) on<StudentInfoUpdatedEvent>("StudentInfoUpdatedEvent", async (event) => { const { update } = event.data await db.updateTable("Student").set(update).where("Student.id", "=", update.id).execute(); }) on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => { const { packageId } = event.data; await db.deleteFrom("Student_package").where("Student_package.id", "=", Number(packageId)).execute(); }) }
Experiment
Before we Design Aggregates from Table of Relations
Let's consider the following relations:
-
A student has many packages, each package has many classes.
-
Each of the three entities has very rich domain behaviours.
-
Hierarchically packages and classes should be inside of the
StudentDomain
aggregate by what we learn about what consitutes an aggregate. But recall fromWhat makes an Aggregate (DDD)? Hint: it's NOT hierarchy & relationships
by CodeOpinion
-
We should think about what's the actual behaviours?
-
What consistency do we need within a cluster of entities?
-
Query Performance? Do we always need all informations?
By these considerations it makes sense to break an aggregate into smaller aggregates, when a vending machine dispatch an alarm, we let
Route
aggregate to listen on the domain event, consume it, and execute subsequent action: -
Define Aggregates
AbstractAggregateRoot
Let's define a base class for our aggregates. We intentionally not to use abstract class because special config in package.json
needs to be set to make it work.
import { ApplicationEvent, applicationEventPublisher } from "../../src/util/ApplicationEventPublisher" export default class AbstractAggregateRoot { private events: any[] = [] public save = async () => { for (const event of this.events) { await applicationEventPublisher.publishEvent(event) } this.events = [] } public registerEvent = <S extends string, T>(event: ApplicationEvent<S, T>) => { this.events.push(event) } }
StudentInfoUpdatedEvent, PackageUpdatedEvent and PackageDeletedEvent
import { Student, Student_package } from "@prisma/client" import AbstractAggregateRoot from "./AbstractAggregateRoot" import ApplicationEventPublisher, { ApplicationEvent } from "../../src/util/ApplicationEventPublisher"; import { UpdatePackageRequest, UpdateStudentRequest } from "../../dto/dto"; import updateValues from "../../src/util/updateValues"; export class StudentInfoUpdatedEvent extends ApplicationEvent<"StudentInfoUpdatedEvent", { update: UpdateStudentRequest }> { } export class PackageUpdatedEvent extends ApplicationEvent<"PackageUpdatedEvent", { update: UpdatePackageRequest }> { } export class PackageDeletedEvent extends ApplicationEvent<"PackageDeletedEvent", { packageId: number }> { }
Note that when no constructor is defined, the parent constructor will be called automatically.
StudentDomain
export default class StudentDomain extends AbstractAggregateRoot { constructor( private student: Student | null, private packages: Student_package[], ) { super() } updateInfo = (update: UpdateStudentRequest) => { this.student = { ...this.student, ...update } this.registerEvent(new StudentInfoUpdatedEvent("StudentInfoUpdatedEvent", { update })) } updatePackage = (update: UpdatePackageRequest) => { const package_ = this.packages.find(p => p.id === update.id) updateValues(package_, update) this.registerEvent(new PackageUpdatedEvent("PackageUpdatedEvent", { update })) } deletePackage = (packageId: number) => { const index = this.packages.findIndex(p => p.id === packageId) this.packages.splice(index, 1) this.registerEvent(new PackageDeletedEvent("PackageDeletedEvent", { packageId })) } }
Recall that domain behaviours need to be published for other domain object to subscribe. By using save()
method we will dispatch all the events we registered (refer to the AbstractAggregateRoot
definition).
Define StudentCreatedEvent and StudentRepository
Recall that a repository by convention is defined to return aggregate root(s). In our convention each aggregate will be called something-Domain
.
import { Student, Student_package } from "@prisma/client" import StudentDomain from "../aggregate/StudentDomain" import { db } from "../../db/database" import { jsonArrayFrom, jsonObjectFrom } from "kysely/helpers/postgres"; import { applicationEventPublisher, ApplicationEvent } from "../../src/util/ApplicationEventPublisher" import { CreateStudentRequest } from "../../dto/dto" export class StudentCreatedEvent extends ApplicationEvent<"StudentCreatedEvent", CreateStudentRequest> { } class StudentRepository { createRoot = async (student: CreateStudentRequest): Promise<StudentDomain> => { const event = new StudentCreatedEvent("StudentCreatedEvent", student) const result = await applicationEventPublisher.publishEvent(event) return new StudentDomain(event.result as Student, [], applicationEventPublisher) } getStudentById = async (uuid: string): Promise<StudentDomain> => { const result = await studentAggQuery .where("Student.id", "=", uuid) .executeTakeFirst() if (!result) { return new StudentDomain(null, [], applicationEventPublisher) } const { studentPackages, ...student } = result const studentDomain = new StudentDomain(student, studentPackages, applicationEventPublisher) return studentDomain } getStudentsByParentEmail = async (email: string): Promise<StudentDomain[]> => { const result = await studentAggQuery .where("Student.parent_email", "=", email) .execute() if (!result) { return [] } const studentDomains: StudentDomain[] = []; result.forEach(r => { const { studentPackages, ...student_ } = r const studentDomain = new StudentDomain(student_, studentPackages, applicationEventPublisher) studentDomains.push(studentDomain) }) return studentDomains } } const studentAggQuery = db.selectFrom("Student") .selectAll("Student") .select((eb) => [ jsonArrayFrom( eb .selectFrom("Student_package") .selectAll("Student_package") .whereRef("Student_package.student_id", "=", "Student.id") ).as("studentPackages"), ]) export const studentRepository = new StudentRepository(); export default StudentRepository;
Let's Dispatch a StudentCreatedEvent!
Let's study the simplest case of domain event: create a Student
aggregate. Recall that we have defined a listener to create a Student
in studentEventHandler.ts
.
const createStudent = async (req: Request, res: Response) => { const body = req.body as CreateStudentRequest; const studentDomain = await studentRepository.createRoot(body); res.json({ success: true, }); };
We have demonstrated the result at the beginning of this article, let's review it:
Let's Execute StudentDomain.{updateInfo, updatePackage, deletePackage}
Finally let's go through basic CRUD examples using this methodology:
const updateStudent = async (req: Request, res: Response) => { const body = req.body as UpdateStudentRequest; const studentDomain = await studentRepository.getStudentById(body.id) studentDomain.updateInfo(body) await studentDomain.save() res.json({ success: true, result: { student: body }, }); }; const deletePackage = async (req: Request, res: Response) => { const params = req.query as { studentId: string, packageId: string }; const { packageId, studentId } = params; const studentDomain = await studentRepository.getStudentById(studentId) studentDomain.deletePackage(Number(packageId)) await studentDomain.save() res.json({ success: true, }); }; const updatePackage = async (req: Request, res: Response) => { const packageUpdate = req.body as UpdatePackageRequest; const { student_id } = packageUpdate; const studentDomain = await studentRepository.getStudentById(student_id) studentDomain.updatePackage(req.body) await studentDomain.save(); res.json({ success: true, result: packageUpdate, }); };
Now to add new features, let's say we add an email notification after deletePackage
is finished, then it is as simply as adding
on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => { // some logic to notify student by email })
Order of EventHandlers
Back to our deletePackage
example, it is implemented by:
await db.deleteFrom("Student_package").where("Student_package.id", "=", Number(packageId)).execute();
However, Student_package
is referenced by Class
's via a foreign key, the request for now will result in the following error:
Since we didn't set the foreign key as CASCADE_DELETE
, our Class
entities will not be deleted automatically, therefore we have to sequentially:
-
Delete the classes referencing to this package
-
Delete this package
This can be handled by two eventHandlers (we can condense them into one of course, but let's demonstrate how to order events).
// studentEventHandler.ts on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => { const { packageId } = event.data; await db.deleteFrom("Student_package").where("Student_package.id", "=", packageId).execute(); }, 2) on<PackageDeletedEvent>("PackageDeletedEvent", async (event) => { const { packageId } = event.data; await db.deleteFrom("Class").where("Class.student_package_id", "=", packageId).execute(); }, 1)
Further Code Simplification by @listener, @order using reflect-metadata
In my another new article on this topic, with the help of reflect-metadata
we can further simplify our ApplicationEvent
into
export class ApplicationEvent< DataType, ContextType extends ContextBaseType = ContextBaseType, > { constructor(public data: DataType, public ctx: ContextType) { } }
and all listeners become
export default class WalkEventListener { @listener async insertRelIssueToSummaryUuid(event: WalkEvent.SuccessPath.IssueSummaryUuidRelationInsertedEvent) { const { issueToSummaryOidUpdates } = event.data; const selectedIssueIds = issueToSummaryOidUpdates.map(({ issueId }) => issueId); await issueDao.inactivateSummaryOfSelectedIssues(selectedIssueIds); await issueDao.insertIssueSummaryUuids(issueToSummaryOidUpdates); } @listener @order(2) async furtherAaction(event: WalkEvent.SuccessPath.IssueSummaryUuidRelationInsertedEvent) { // logic here ... } }
-
Now we can happily devide our long task into several reusuable subtasks, and each eventlistener has at most one
try-catch
. -
In this way it is extremely easy to introduce a detailed retry/fallback mechanism for a long chain of tasks for which each of them can possibly fail. For example:
For sure we can skip the
command
part and replace everything byevent
since we may not implement asaga
to orchestrate those commands (it is over-complicated for monolithic application). -
For more concept about
saga
, please refer tosaga
-pattern introduced byaxon
framework documentation.