0%

Command-Query Based System Part 1: Command Invokers and Query Invokers for Domain Driven Design

December 30, 2025

DDD

Springboot

1. Event Entity

The model definition of an Event entity is defined by:

package com.scriptmanager.common.entity

import dev.james.processor.GenerateDTO
import jakarta.persistence.*
import org.hibernate.annotations.DynamicInsert
import org.hibernate.annotations.Generated

@Entity
@GenerateDTO
@DynamicInsert
@Table(name = "event")
class Event(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Int? = null,

    @Column(name = "request_id", nullable = false)
    var requestId: String = "",

    @Column(name = "created_at")
    @Generated
    val createdAt: Double? = null,

    @Column(name = "created_at_hk")
    @Generated
    val createdAtHk: String? = null,

    @Column(name = "event_type", nullable = false)
    var eventType: String = "",

    @Column(name = "event", nullable = false, columnDefinition = "TEXT")
    var payload: String = "",

    @Column(name = "request_user_email", nullable = false)
    var requestUserEmail: String = "",

    @Column(name = "success", nullable = false)
    var success: Boolean = true,

    @Column(name = "failure_reason", nullable = false)
    var failureReason: String = ""
)

2. Command Invoker

2.1. Command

package com.scriptmanager.domain.infrastructure

/**
 * Marker interface for commands that return a result of type R.
 * Commands are write operations that may modify state and produce domain events.
 */
interface Command<R>

2.2. CommandHandler

package com.scriptmanager.domain.infrastructure

interface CommandHandler<T : Any, R> {
    fun handle(eventQueue: EventQueue, command: T): R
}

2.3. Invoker

2.3.1. Event Queue

In my desktop application I communicate with SQLite via Spring Boot because Spring is good at modeling domain concept via classes. In that case, my command invoker is like:

1package com.scriptmanager.domain.infrastructure
2
3import com.fasterxml.jackson.databind.ObjectMapper
4import com.fasterxml.jackson.databind.SerializationFeature
5import com.fasterxml.jackson.module.kotlin.KotlinModule
6import com.scriptmanager.common.entity.Event
7import com.scriptmanager.common.utils.JsonNodeUtil
8import com.scriptmanager.repository.EventRepository
9import jakarta.persistence.EntityManager
10import org.slf4j.MDC
11import org.springframework.context.ApplicationEventPublisher
12import org.springframework.data.repository.findByIdOrNull
13import org.springframework.stereotype.Component
14import org.springframework.transaction.PlatformTransactionManager
15import org.springframework.transaction.annotation.Propagation
16import org.springframework.transaction.annotation.Transactional
17import org.springframework.transaction.support.TransactionSynchronization
18import org.springframework.transaction.support.TransactionSynchronizationManager
19import org.springframework.transaction.support.TransactionTemplate
20import java.util.UUID
21
22// Event timing enum (internal use only)
23enum class DispatchTiming {
24    IMMEDIATE,
25    POST_COMMIT
26}
27
28// Event wrapper to hold timing information (internal use only)
29data class EventWrapper<T : Any>(
30    val event: T,
31    val timing: DispatchTiming,
32    var context: ExecutionContext? = null,
33)
34
35// Simplified EventQueue interface
36interface EventQueue {
37    fun add(event: Any)
38    fun addTransactional(event: Any)
39
40    val immediateEvents: List<EventWrapper<Any>>
41    val postCommitEvents: List<EventWrapper<Any>>
42    val allEvents: List<EventWrapper<Any>>
43
44    // Keep backward compatibility
45    val events: List<EventWrapper<Any>>
46        get() = immediateEvents + postCommitEvents
47}

Note that we have separated immediateEvents and postCommitEvents because events can be transactional (should dispatch only after a transaction is finished).

Our event queue definition will include two methods: add and addTransactional, which is to add those events into two queues:

48// Updated EventQueue implementation
49class SmartEventQueue : EventQueue {
50    private val _events = mutableListOf<EventWrapper<Any>>()
51
52    override fun add(event: Any) {
53        val context = captureCurrentCommandContext()
54        _events.add(EventWrapper(event, DispatchTiming.IMMEDIATE, context))
55    }
56
57    override fun addTransactional(event: Any) {
58        val context = captureCurrentCommandContext()
59        _events.add(EventWrapper(event, DispatchTiming.POST_COMMIT, context))
60    }
61
62    private fun captureCurrentCommandContext(): ExecutionContext {
63        var commandName: String? = null
64
65        return ExecutionContext(
66            userEmail = "me",
67            requestId = MDC.get("requestId"),
68            originalMDC = MDC.getCopyOfContextMap(),
69            commandName = commandName
70        )
71    }
72
73    override val immediateEvents: List<EventWrapper<Any>>
74        get() = _events.filter { it.timing == DispatchTiming.IMMEDIATE }
75
76    override val postCommitEvents: List<EventWrapper<Any>>
77        get() = _events.filter { it.timing == DispatchTiming.POST_COMMIT }
78
79    override val allEvents: List<EventWrapper<Any>>
80        get() = _events.toList()
81}

Each event will be wrapped by EventWrapper when being dispatched, we will determine the timing to dispatch (publish) this event in line-112.

2.3.2. Event Dispatcher
82data class ExecutionContext(
83    val userEmail: String?,
84    val requestId: String?,
85    val originalMDC: Map<String, String>?,
86    val commandName: String? = null,
87)
88
89// Updated DomainEventDispatcher
90interface DomainEventDispatcher {
91    fun dispatchNow(eventQueue: EventQueue, requestId: String? = null)
92    fun dispatch(eventQueue: EventQueue, requestId: String? = null)
93}
94
95@Component
96class SpringDomainEventDispatcher(
97    private val applicationEventPublisher: ApplicationEventPublisher,
98) : DomainEventDispatcher {
99
100    override fun dispatchNow(eventQueue: EventQueue, requestId: String?) {
101        // Keep backward compatibility - dispatch all events immediately
102        dispatchEvents(eventQueue.events, requestId)
103    }
104
105    override fun dispatch(eventQueue: EventQueue, requestId: String?) {
106        // New method that respects timing
107        // Dispatch immediate events right away
108        dispatchEvents(eventQueue.immediateEvents, requestId)
109
110        // Register post-commit events for later dispatch
111        if (eventQueue.postCommitEvents.isNotEmpty()) {
112            registerPostCommitEvents(eventQueue.postCommitEvents, requestId)
113        }
114    }
115
116    private fun dispatchEvents(wrappedEvents: List<EventWrapper<Any>>, requestId: String?) {
117        // Don't modify MDC here - preserve existing context for policy handlers
118        // The requestId should already be set by the CommandInvoker
119        wrappedEvents.forEach { wrappedEvent ->
120            // First publish the wrapper for audit logging (separate transaction)
121            applicationEventPublisher.publishEvent(wrappedEvent)
122
123            // Then publish the actual business event (same transaction)
124            // This ensures business side effects are atomic with main transaction
125            applicationEventPublisher.publishEvent(wrappedEvent.event)
126        }
127    }
128
129    private fun registerPostCommitEvents(wrapperEvents: List<EventWrapper<Any>>, requestId: String?) {
130        val capturedContext = captureCurrentContext(requestId)
131
132        if (TransactionSynchronizationManager.isSynchronizationActive()) {
133            TransactionSynchronizationManager.registerSynchronization(
134                object : TransactionSynchronization {
135                    override fun afterCommit() {
136                        withContext(capturedContext) { ctx ->
137                            wrapperEvents.forEach { wrappedEvent ->
138                                wrappedEvent.context = ctx
139                                // Publish both wrapper (for audit) and event (for business logic)
140                                applicationEventPublisher.publishEvent(wrappedEvent)
141                                applicationEventPublisher.publishEvent(wrappedEvent.event)
142                            }
143                        }
144                    }
145                }
146            )
147        } else {
148            // If no transaction is active, dispatch immediately
149            dispatchEvents(wrapperEvents, requestId)
150        }
151    }
2.3.3. ExecutionContext (Metadata for the Request)
  • Case 1 (Without Authentication). Our captureCurrentContext is as simple as passing hard-coded string. Since the purpose is for logging and the only user is the owner of the computer:

    152    private fun captureCurrentContext(requestId: String?): ExecutionContext {
    153        val user = "me"
    154        return ExecutionContext(
    155            userEmail = "me",
    156            requestId = requestId,
    157            originalMDC = MDC.getCopyOfContextMap()
    158        )
    159    }
  • Case 2 (With Authentication ). Usually a request is dilivered by a thread, and for each thread we authenticate the message passed from the request header (not to be confused by the single-threading model of nodejs, they work differently).

    Once authenticated, we put userID, userEmail, or some other extra information into MDC, a thread-local object via MDC.put("key", value).

    For example, let's consider an implmentation of AuthAspect for a pointcut used to get user from JWT-token:

    Implementation of AuthAspect
    @Target(AnnotationTarget.CLASS)
    @Retention(AnnotationRetention.RUNTIME)
    annotation class AccessToken
    
    @Target(AnnotationTarget.VALUE_PARAMETER)
    @Retention(AnnotationRetention.RUNTIME)
    annotation class RequestUser
    
    @Aspect
    @Component
    class AuthAspect(
        private val jwtService: JwtService,
        private val eventPublisher: ApplicationEventPublisher,
    ) {
        private val authHeader: String = "authorization"
    
        companion object {
            private val currentUser = ThreadLocal<JwtPayload>()
            private val currentPublisher = ThreadLocal<ApplicationEventPublisher>()
            fun getCurrentUser(): JwtPayload? = currentUser.get()
            fun getEventPublisher(): ApplicationEventPublisher? = currentPublisher.get()
        }
    
        @Pointcut("@within(dev.james.alicetimetable.commons.aop.AccessToken)")
        fun getUserPointcut() {
        }
    
        @Around("getUserPointcut()")
        fun logBefore(joinPoint: ProceedingJoinPoint): Any? {
            val requestAttributes = RequestContextHolder.getRequestAttributes() as? ServletRequestAttributes
            val request = requestAttributes?.request
            try {
                val accessToken = request?.getHeader(authHeader)?.replace("Bearer ", "") ?: ""
                if (accessToken == "") {
                    throw Exception("AccessToken cannot be empty")
                }
                val payload: JwtPayload = jwtService.parseAndVerifyToken(accessToken)!!
                currentUser.set(payload)
                currentPublisher.set(eventPublisher)
    
                val method = (joinPoint.signature as MethodSignature).method
                val args = joinPoint.args
                val modifiedArgs = Array(args.size) { index ->
                    if (method.parameters[index].isAnnotationPresent(RequestUser::class.java)) {
                        payload
                    } else {
                        args[index]
                    }
                }
                return joinPoint.proceed(modifiedArgs)
    
            } catch (exception: Exception) {
                if (exception is JWTExpiredException) {
                    throw JWTExpiredException()
                } else {
                    throw Exception(exception.toString())
                }
            }
        }
    }

    Now with authentication:

    152    private fun captureCurrentContext(requestId: String?): ExecutionContext {
    153        val user = AuthAspect.getCurrentUser()
    154        return ExecutionContext(
    155            userEmail = user?.company_email,
    156            requestId = requestId,
    157            originalMDC = MDC.getCopyOfContextMap()
    158        )
    159    }

In both cases, the original requestId is instantiated from the invoke method of our commandHandler (see line-245 below), and each thread will have a consistent requestId.

This is especially helpful to trace a sequence of commands one after another which are triggered via event by event, as required by a single request.

Next we define a helpful trailing closure when our function requires to access the ExecutionContext for metadata:

160    private fun withContext(context: ExecutionContext, block: (context: ExecutionContext) -> Unit) {
161        // Set up context for event handlers
162        context.userEmail?.let { MDC.put("userEmail", it) }
163        context.requestId?.let { MDC.put("requestId", it) }
164        context.originalMDC?.forEach { (key, value) -> MDC.put(key, value) }
165
166        // Temporarily set user in ThreadLocal if your AuthAspect supports it
167        // This depends on how AuthAspect stores the current user
168
169        try {
170            block(context)
171        } finally {
172            MDC.clear()
173        }
174    }
175}
2.3.4. CommandInvoker

We will have a Event table storing all the Commands and Events.

One Command can produce multiple Events, and side effect must be carried out via new components with name being suffixed by Policy.

176interface CommandInvoker {
177    fun <T : Any, R> invoke(handler: CommandHandler<T, R>, command: T): R
178    fun <R> invoke(command: Command<R>): R
179}
180
181@Component
182class OneTransactionCommandInvoker(
183    private val transactionManager: PlatformTransactionManager,
184    private val domainEventDispatcher: DomainEventDispatcher,
185    private val commandAuditor: CommandAuditor,
186    private val eventRepository: EventRepository,
187    private val commandHandlers: List<CommandHandler<*, *>>
188) : CommandInvoker {
189    private val transactionTemplate: TransactionTemplate = TransactionTemplate(transactionManager)
190
191    /**
192     * Map of command class to its handler for fast lookup
193     */
194    private val handlerMap: Map<Class<*>, CommandHandler<*, *>> = buildHandlerMap()
195
196    private fun buildHandlerMap(): Map<Class<*>, CommandHandler<*, *>> {
197        val map = mutableMapOf<Class<*>, CommandHandler<*, *>>()
198
199        commandHandlers.forEach { handler ->
200            val commandClass = extractCommandClass(handler)
201            if (commandClass != null) {
202                if (map.containsKey(commandClass)) {
203                    throw IllegalStateException(
204                        "Multiple handlers found for command: ${commandClass.simpleName}"
205                    )
206                }
207                map[commandClass] = handler
208                println("Registered command handler: ${handler::class.simpleName} for ${commandClass.simpleName}")
209            }
210        }
211
212        return map
213    }
214
215    private fun extractCommandClass(handler: CommandHandler<*, *>): Class<*>? {
216        val handlerClass = handler::class.java
217        val genericInterfaces = handlerClass.genericInterfaces
218
219        for (genericInterface in genericInterfaces) {
220            if (genericInterface is java.lang.reflect.ParameterizedType) {
221                val rawType = genericInterface.rawType
222                if (rawType == CommandHandler::class.java) {
223                    val typeArgs = genericInterface.actualTypeArguments
224                    if (typeArgs.isNotEmpty()) {
225                        return typeArgs[0] as? Class<*>
226                    }
227                }
228            }
229        }
230        return null
231    }
  • By default each commandHandler is executed within a transaction automatically (we manage to do this using transactionTemplate, see line-281 below), and when either one of the nested command failed, it will make everything be rollbacked.

  • Also by the dependency injection interface List<CommandHandler<*, *>>, it will grab all the beans that satisfy the interface CommandHandler, from which we can prepare a mapping of Command-to-Handlerso that in our controller level we can simply run

    commandInvoker.invoke(command)

    without specifying which handler to handle it.

232    @Suppress("UNCHECKED_CAST")
233    override fun <R> invoke(command: Command<R>): R {
234        val handler = handlerMap[command::class.java]
235            ?: throw IllegalArgumentException(
236                "No handler registered for command: ${command::class.simpleName}"
237            )
238
239        return invoke(handler as CommandHandler<Any, R>, command as Any)
240    }
241
242    override fun <T : Any, R> invoke(handler: CommandHandler<T, R>, command: T): R {
243        // Preserve existing requestId for nested commands, or create new one for top-level commands
244        val existingRequestId = MDC.get("requestId")
245        val requestId = existingRequestId ?: UUID.randomUUID().toString()
246        val isNestedCommand = existingRequestId != null
247
248        // Always ensure MDC has the requestId
249        MDC.put("requestId", requestId)
250        MDC.put("userEmail", "me")
251        var commandEventId: Int? = null
252        var dispatchedEvents: List<EventWrapper<Any>> = emptyList()
253
254        // Debug logging
255        println("Command: ${command.javaClass.simpleName}, isNested: $isNestedCommand, requestId: $requestId")
256
257        try {
258            // Execute all commands the same way - use existing transaction if available, otherwise create new one
259            val result = if (isNestedCommand && TransactionSynchronizationManager.isSynchronizationActive()) {
260                // Execute directly in existing transaction for nested commands
261                println("Executing nested command in existing transaction")
262                val eventQueue = SmartEventQueue()
263
264                // Log command audit INSIDE the transaction
265                val commandEvent = commandAuditor.logCommandInTransaction(command, requestId)
266                commandEventId = commandEvent.id
267
268                val result = handler.handle(eventQueue, command)
269                dispatchedEvents = eventQueue.allEvents
270                domainEventDispatcher.dispatch(eventQueue, requestId)
271
272                // Mark as success immediately (same transaction)
273                commandEvent.success = true
274                eventRepository.save(commandEvent)
275
276                result
277            } else {
278                // Create new transaction for top-level commands
279                println("Executing top-level command in new transaction")
280                var tempDispatchedEvents: List<EventWrapper<Any>> = emptyList()
281                val result = transactionTemplate.execute { _ ->
282                    val eventQueue = SmartEventQueue()
283
284                    // Log command audit INSIDE the transaction
285                    val commandEvent = commandAuditor.logCommandInTransaction(command, requestId)
286                    commandEventId = commandEvent.id
287
288                    val result = handler.handle(eventQueue, command)
289                    tempDispatchedEvents = eventQueue.allEvents
290                    domainEventDispatcher.dispatch(eventQueue, requestId)
291
292                    // Mark as success immediately (same transaction)
293                    commandEvent.success = true
294                    eventRepository.save(commandEvent)
295
296                    result
297                } ?: throw IllegalStateException()
298                dispatchedEvents = tempDispatchedEvents
299                result
300            }
301
302            println("Command completed successfully: ${command.javaClass.simpleName}")
303            return result
304        } catch (e: Exception) {
305            println("Command failed: ${command.javaClass.simpleName}, error: ${e.message}")
306            e.printStackTrace()
307
308            // Note: Command audit was rolled back with the transaction
309            // We cannot log the failure because the command event was not persisted
310            println("Warning: Command audit was rolled back due to transaction failure")
311
312            throw e
313        } finally {
314            // Only clear MDC for top-level commands to preserve context for nested commands
315            if (!isNestedCommand) {
316                MDC.clear()
317            }
318        }
319    }
320}
2.3.5. Logging
2.3.5.1. Definitions

The rest is definitions from trial and error to achieve comprehensive logging. For example (events happen in descending order over creation time):

321@Component
322class CommandAuditor(
323    private val eventRepository: EventRepository,
324    private val entityManager: EntityManager,
325) {
326    private val objectMapper = ObjectMapper().apply {
327        registerModule(KotlinModule.Builder().build())
328        configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
329    }
330
331    @Transactional(propagation = Propagation.MANDATORY)
332    fun <T : Any> logCommandInTransaction(command: T, requestId: String): Event {
333        try {
334            val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString()
335            val userEmail = "me"
336
337            // Detect if this command is being called from a policy
338            val commandEventType = detectPolicyOrigin(command.javaClass.simpleName)
339
340            // Get unique timestamp in milliseconds (no decimals)
341            val baseTimestamp = System.currentTimeMillis()
342            val nanoOffset = (System.nanoTime()%1000).toInt()  // Use last 3 digits of nanos for uniqueness
343            val uniqueTimestamp = baseTimestamp + nanoOffset
344
345            val eventToSave = Event(
346                createdAt = uniqueTimestamp.toDouble(),  // Convert to Double for database
347                requestId = requestId,
348                eventType = commandEventType,
349                payload = eventJsonNode,
350                requestUserEmail = userEmail,
351                success = false  // Will be updated to true if command succeeds
352            )
353
354            val savedEvent = eventRepository.save(eventToSave)
355            println("AUDIT: Command logged in transaction with createdAt = $uniqueTimestamp")
356            return savedEvent
357        } catch (e: Exception) {
358            println("AUDIT ERROR: Failed to save command: ${e.message}")
359            e.printStackTrace()
360            throw e
361        }
362    }
363
364    @Transactional(propagation = Propagation.REQUIRES_NEW)
365    fun <T : Any> logCommandWithPreciseTiming(command: T, requestId: String): Event {
366        try {
367            val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString()
368            val user = "me"
369            val userEmail = "me" ?: ""
370
371            // Detect if this command is being called from a policy
372            val commandEventType = detectPolicyOrigin(command.javaClass.simpleName)
373
374            // Get unique timestamp in milliseconds (no decimals)
375            val baseTimestamp = System.currentTimeMillis()
376            val nanoOffset = (System.nanoTime()%1000).toInt()  // Use last 3 digits of nanos for uniqueness
377            val uniqueTimestamp = baseTimestamp + nanoOffset
378
379            val eventToSave = Event(
380                createdAt = uniqueTimestamp.toDouble(),  // Convert to Double for database
381                requestId = requestId,
382                eventType = commandEventType,
383                payload = eventJsonNode,
384                requestUserEmail = userEmail
385            )
386
387            val savedEvent = eventRepository.save(eventToSave)
388            entityManager.flush() // Force immediate write
389
390            println("AUDIT: Command saved immediately with createdAt = $uniqueTimestamp")
391            return savedEvent
392        } catch (e: Exception) {
393            println("AUDIT ERROR: Failed to save command: ${e.message}")
394            e.printStackTrace()
395            throw e
396        }
397    }
398
399    @Transactional(propagation = Propagation.REQUIRES_NEW)
400    fun <T : Any> logCommand(command: T, requestId: String): Event {
401        try {
402            val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString()
403            val user = "me"
404            val userEmail = "me" ?: ""
405            MDC.put("userEmail", userEmail)
406
407            // Detect if this command is being called from a policy
408            val commandEventType = detectPolicyOrigin(command.javaClass.simpleName)
409
410            val eventToSave = Event(
411                requestId = requestId,
412                eventType = commandEventType,
413                payload = eventJsonNode,
414                requestUserEmail = userEmail
415            )
416
417            val savedEvent = eventRepository.save(eventToSave)
418
419            // Force flush to ensure the data is actually written to database
420            entityManager.flush()
421
422            println("AUDIT: Command saved to database with ID: ${savedEvent.id}")
423            return savedEvent
424        } catch (e: Exception) {
425            println("AUDIT ERROR: Failed to save command to database: ${e.message}")
426            e.printStackTrace()
427            throw e
428        }
429    }
430
431    @Transactional(propagation = Propagation.MANDATORY)
432    fun logSuccess(eventId: Int) {
433        val command = eventRepository.findByIdOrNull(eventId) ?: return
434        command.success = true
435        eventRepository.save(command)
436    }
437
438    @Transactional(propagation = Propagation.REQUIRES_NEW)
439    fun logFailure(commandEventId: Int, error: String) {
440        val command = eventRepository.findByIdOrNull(commandEventId) ?: return
441        command.success = false
442        command.failureReason = error
443        eventRepository.save(command)
444    }
445
446    @Transactional(propagation = Propagation.REQUIRES_NEW)
447    fun logEventFailures(dispatchedEvents: List<EventWrapper<Any>>, requestId: String, failureMessage: String) {
448        try {
449            val requestUuid = requestId
450
451            // Find all events for this request that were dispatched
452            dispatchedEvents.forEach { eventWrapper ->
453                val eventTypeName = eventWrapper.event.javaClass.simpleName
454
455                // Find events in database that match this event type and request ID
456                // We need to find events that were logged for this specific event
457                val eventsToUpdate = findEventsByTypeAndRequest(eventTypeName, requestUuid)
458
459                eventsToUpdate.forEach { event ->
460                    event.success = false
461                    event.failureReason = failureMessage
462                    // Update the event type to indicate failure
463                    if (!event.eventType.endsWith("-- Failed!")) {
464                        event.eventType = "${event.eventType} -- Failed!"
465                    }
466                    eventRepository.save(event)
467                    println("Updated event ${event.eventType} (ID: ${event.id}) with failure reason")
468                }
469            }
470
471            entityManager.flush()
472            println("Successfully updated all dispatched events with failure reasons")
473        } catch (e: Exception) {
474            println("Error updating events with failure: ${e.message}")
475            e.printStackTrace()
476            throw e
477        }
478    }
479
480    private fun findEventsByTypeAndRequest(eventType: String, requestId: String): List<Event> {
481        return try {
482            // Find events that match both the event type and request ID
483            val matchingEvents = eventRepository.findAllByRequestIdAndEventType(requestId, eventType)
484            println("Found ${matchingEvents.size} events of type $eventType for request $requestId")
485            matchingEvents
486        } catch (e: Exception) {
487            println("Could not find events for type $eventType and request $requestId: ${e.message}")
488            emptyList()
489        }
490    }
491
492    private fun detectPolicyOrigin(commandName: String): String {
493        return try {
494            // Get the current stack trace
495            val stackTrace = Thread.currentThread().stackTrace
496
497            // Look for policy classes in the stack trace
498            var policyName: String? = null
499            var eventMethodName: String? = null
500
501            for (stackElement in stackTrace) {
502                val className = stackElement.className
503                val methodName = stackElement.methodName
504
505                if (className.contains(".policy.") && className.endsWith("Policy")) {
506                    // Extract just the policy class name (without package)
507                    policyName = className.substringAfterLast(".")
508                    eventMethodName = methodName
509                    break
510                }
511            }
512
513            if (policyName != null) {
514                // Try to derive the event name from the method name
515                val eventName = deriveEventNameFromMethod(eventMethodName)
516                return if (eventName != null) {
517                    "$eventName > $policyName > $commandName"
518                } else {
519                    "$policyName > $commandName"
520                }
521            }
522
523            // If no policy found, return just the command name
524            commandName
525        } catch (e: Exception) {
526            // If anything goes wrong, just return the command name
527            println("Warning: Failed to detect policy origin: ${e.message}")
528            commandName
529        }
530    }
531
532    private fun deriveEventNameFromMethod(methodName: String?): String? {
533        return try {
534            if (methodName == null) return null
535
536            // Common patterns in policy method names:
537            // - resetClassNumbersOn(event) -> event type in parameter
538            // - extendClassesOnClassMoved -> ClassMovedEvent
539            // - extendClassesOnClassesCreated -> ClassesCreatedEvent
540
541            when {
542                methodName.contains("On") -> {
543                    // Extract the part after "On" and convert to event name
544                    val eventPart = methodName.substringAfterLast("On")
545                    if (eventPart.isNotEmpty()) {
546                        // Convert camelCase to EventName (e.g., "classMoved" -> "ClassMovedEvent")
547                        val eventName = eventPart.replaceFirstChar { it.uppercase() }
548                        if (!eventName.endsWith("Event")) {
549                            "${eventName}Event"
550                        } else {
551                            eventName
552                        }
553                    } else null
554                }
555
556                else -> null
557            }
558        } catch (e: Exception) {
559            null
560        }
561    }
562}
2.3.5.2. On propagation = Propagation.MANDATORY

By requiring the propagation of the transaction be Propagation.MANDATORY, we require the transaction for event logging be within the same transaction of the command invokation.

This is only a tradeoff for SQLite database because there cannot be two transactions writing into SQLite db at the same time, which is designed on purpose.

Event should not be logged synchorously because we don't want the failure of event logging interupt the successful transaction. For traditional database such as PostgreSQL and MySQL one should switch the propagation mode of transaction from MANDATORY to REQUIRES_NEW.

3. Query Invoker

QueryInvoker is much more simple because it does not involve any transaction (we can always add logging in the invoker level later on).

3.1. Query

package com.scriptmanager.domain.infrastructure

/**
 * Marker interface for queries that return a result of type R.
 * Queries are read-only operations that do not modify state.
 */
interface Query<R>

3.2. Invoker

1package com.scriptmanager.domain.infrastructure
2
3import org.slf4j.LoggerFactory
4import org.slf4j.MDC
5import org.springframework.stereotype.Component
6import org.springframework.transaction.annotation.Transactional
7import java.lang.reflect.ParameterizedType
8import java.util.UUID
9
10/**
11 * QueryInvoker is responsible for routing queries to their appropriate handlers.
12 * Unlike CommandInvoker, queries are read-only operations and:
13 * - Do not produce domain events
14 * - Use read-only transactions (@Transactional(readOnly = true))
15 * - Are typically lighter weight
16 * - Can be cached or optimized for read performance
17 */
18interface QueryInvoker {
19    /**
20     * Invokes the appropriate query handler for the given query.
21     * @param query The query to execute
22     * @return The query result
23     */
24    fun <R> invoke(query: Query<R>): R
25}
26
27@Component
28class DefaultQueryInvoker(
29    private val queryHandlers: List<QueryHandler<*, *>>
30) : QueryInvoker {
31
32    private val logger = LoggerFactory.getLogger(DefaultQueryInvoker::class.java)
33
34    /**
35     * Map of query class to its handler for fast lookup
36     */
37    private val handlerMap: Map<Class<*>, QueryHandler<*, *>> = buildHandlerMap()
38
39    private fun buildHandlerMap(): Map<Class<*>, QueryHandler<*, *>> {
40        val map = mutableMapOf<Class<*>, QueryHandler<*, *>>()
41
42        queryHandlers.forEach { handler ->
43            val queryClass = extractQueryClass(handler)
44            if (queryClass != null) {
45                if (map.containsKey(queryClass)) {
46                    throw IllegalStateException(
47                        "Multiple handlers found for query: ${queryClass.simpleName}"
48                    )
49                }
50                map[queryClass] = handler
51                logger.info("Registered query handler: ${handler::class.simpleName} for ${queryClass.simpleName}")
52            } else {
53                logger.warn("Could not determine query type for handler: ${handler::class.simpleName}")
54            }
55        }
56
57        return map
58    }
59
60    private fun extractQueryClass(handler: QueryHandler<*, *>): Class<*>? {
61        // Look through all generic interfaces
62        val genericInterfaces = handler::class.java.genericInterfaces
63
64        for (genericInterface in genericInterfaces) {
65            if (genericInterface is ParameterizedType &&
66                genericInterface.rawType == QueryHandler::class.java
67            ) {
68                val typeArguments = genericInterface.actualTypeArguments
69                if (typeArguments.isNotEmpty()) {
70                    return typeArguments[0] as? Class<*>
71                }
72            }
73        }
74
75        return null
76    }
77
78    @Transactional(readOnly = true)
79    @Suppress("UNCHECKED_CAST")
80    override fun <R> invoke(query: Query<R>): R {
81        // Set up MDC for request tracing (optional for queries, but useful for debugging)
82        val existingRequestId = MDC.get("requestId")

Here usually there is non-null requestId because events are dispatched from our command.

But there are ocassions something has happended from other domain (a domain event) that needs to be noticed from our domain service, in that case a event is dispatched via a controller endpoint and requestId is now being null:

83        val requestId = existingRequestId ?: UUID.randomUUID().toString()
84
85        if (existingRequestId == null) {
86            MDC.put("requestId", requestId)
87        }
88
89        try {
90            val handler = handlerMap[query::class.java] as? QueryHandler<Query<R>, R>
91                ?: throw IllegalArgumentException(
92                    "No handler found for query: ${query::class.simpleName}. " +
93                            "Available handlers: ${handlerMap.keys.map { it.simpleName }}"
94                )
95
96            logger.debug("Executing query: ${query::class.simpleName} with requestId: $requestId")
97
98            val result = handler.handle(query)
99
100            logger.debug("Query completed: ${query::class.simpleName}")
101
102            return result
103        } catch (e: Exception) {
104            logger.error("Query failed: ${query::class.simpleName}, error: ${e.message}", e)
105            throw e
106        } finally {
107            // Only clear MDC if we created it
108            if (existingRequestId == null) {
109                MDC.remove("requestId")
110            }
111        }
112    }
113}

4. Event Listeners for Side Effects

4.1. Policies

4.1.1. Concrete Example

As mentioned each side effect must be carried out from Policys in order not to burry the side effect logic into the sea of uncontrolled and exploded number of services.

Essentially such a policy is as simple as a event listener defined as the following exmaple:

package dev.james.alicetimetable.domain.context.timetable.policy

import dev.james.alicetimetable.domain.context.timetable.commandhandler.*
import dev.james.alicetimetable.domain.context.timetable.command.*
import dev.james.alicetimetable.domain.context.timetable.IHasPackageId
import dev.james.alicetimetable.domain.context.timetable.TimetableDomainEvent
import dev.james.alicetimetable.infra.CommandInvoker
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component

@Component
class ResetClassNumbersPolicy(
    private val commandInvoker: CommandInvoker,
    private val resetClassNumbersHandler: ResetClassNumbersHandler,
) {
    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.GroupOfClassesRemovedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.SingleClassRemovedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassMovedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassesCreatedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassesRemovedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    @EventListener
    fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassDuplicatedEvent) {
        val event = event_ as IHasPackageId
        resetClassNumbersOfPackage(event)
    }

    private fun resetClassNumbersOfPackage(event: IHasPackageId) {
        val packageId = event.packageId
        val cmd = ResetClassNumbersCommand(packageId = packageId)
        commandInvoker.invoke(resetClassNumbersHandler, cmd)
    }
}
4.1.2. Advantages

As you can see, there are so many reasons we need to reset the numbering of classes in a timetable system of an art school.

Now

  1. From documentation point of view we know all the reasons why we need to reset the class numbers, and

  2. When one of the cases need special treatment (not managable by the shared logic resetClassNumbersOfPackage), we can immediately adjust one of the event listener by another private function.

    This avoids the altering of one method breaks the functionality of another method unexpectedly. Of course this can be mitigated by writing comprehensive test case (if any).

  3. Now we can write test cases using the event produced by our command handler. We will discuss writing tests in the next article.

4.2. DomainEventLogger

Side effect means a mutation of the state of the system, and event logging is no exception.

package com.scriptmanager.domain.infrastructure

import com.scriptmanager.common.entity.Event
import com.scriptmanager.common.utils.JsonNodeUtil
import com.scriptmanager.repository.EventRepository

import org.slf4j.MDC
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener
import java.util.*


@Component
class DomainEventLogger(
    private val eventRepository: EventRepository,
    private val applicationEventPublisher: ApplicationEventPublisher,
) {
    @EventListener
    @Transactional(propagation = Propagation.MANDATORY)  // Join existing transaction
    fun recordSynchronousEvent(wrapperEvent: EventWrapper<Any>) {
        if (wrapperEvent.timing != DispatchTiming.IMMEDIATE) {
            return
        }

        try {
            // Immediate event audit with precise timing
            // This runs in the SAME transaction as the command
            persistEventWithPreciseTiming(wrapperEvent)
        } catch (e: Exception) {
            // Log audit failure but don't break the flow
            println("Warning: Failed to persist synchronous event: ${e.message}")
            e.printStackTrace()
        }
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    fun recordTransactionalEvent(wrapperEvent: EventWrapper<Any>) {
        if (wrapperEvent.timing != DispatchTiming.POST_COMMIT) {
            return
        }
        try {
            // For post-commit events, persist with precise timing
            persistEventWithPreciseTiming(wrapperEvent)
            applicationEventPublisher.publishEvent(wrapperEvent.event)
        } catch (e: Exception) {
            // Log the error but don't let event logging failures break the application
            println("Warning: Failed to persist or publish transactional event: ${e.message}")
            e.printStackTrace()
        }
    }

    private fun persistEventWithPreciseTiming(wrappedEvent: EventWrapper<Any>) {
        val event = wrappedEvent.event
        val ctx = wrappedEvent.context
        val requestID = try {
            UUID.fromString(MDC.get("requestId") ?: ctx?.requestId).toString()
        } catch (e: Exception) {
            ""
        }

        val userEmail = "me"

        // Detect which command dispatched this event
        val eventTypeName = event::class.simpleName ?: ""
        val commandAwareEventType = if (ctx?.commandName != null) {
            "${ctx.commandName} > $eventTypeName"
        } else {
            detectCommandOrigin(eventTypeName)
        }

        // Get unique timestamp in milliseconds (no decimals)
        val baseTimestamp = System.currentTimeMillis()
        val nanoOffset = (System.nanoTime()%1000).toInt()  // Use last 3 digits of nanos for uniqueness
        val uniqueTimestamp = baseTimestamp + nanoOffset

        val eventJsonNode = JsonNodeUtil.toJsonNode(event)
        val eventToSave = Event(
            createdAt = uniqueTimestamp.toDouble(),  // Convert to Double for database
            eventType = commandAwareEventType,
            payload = eventJsonNode.toString(),
            requestUserEmail = userEmail,
            requestId = requestID
        )

        // Save immediately with precise timing
        eventRepository.save(eventToSave)
        println("AUDIT: Event saved immediately with createdAt = $uniqueTimestamp")
    }

    private fun persistEvent(wrappedEvent: EventWrapper<Any>) {
        val event = wrappedEvent.event
        val ctx = wrappedEvent.context
        val requestID = try {
            UUID.fromString(MDC.get("requestId") ?: ctx?.requestId).toString()
        } catch (e: Exception) {
            ""
        }

        // MDC.set("userEmail") has been executed when we call the command
        // this event may be handled by another thread when it is transactional event listener
        val userEmail = "me"

        // Detect which command dispatched this event
        val eventTypeName = event::class.simpleName ?: ""
        val commandAwareEventType = if (ctx?.commandName != null) {
            "${ctx.commandName} > $eventTypeName"
        } else {
            detectCommandOrigin(eventTypeName)
        }

        val eventJsonNode = JsonNodeUtil.toJsonNode(event).toString()
        val eventToStore = Event(
            eventType = commandAwareEventType,
            payload = eventJsonNode,
            requestUserEmail = userEmail,
            requestId = requestID
        )
        eventRepository.save(eventToStore)
    }

    private fun detectCommandOrigin(eventTypeName: String): String {
        return try {
            // Get the current stack trace
            val stackTrace = Thread.currentThread().stackTrace

            // Look for command handler classes in the stack trace
            for (stackElement in stackTrace) {
                val className = stackElement.className

                // Look for the specific command handler package structure
                if (className.contains("dev.james.alicetimetable.domain.context.timetable.commandHandler") &&
                    className.endsWith("Handler")
                ) {
                    // Extract the command name from handler name
                    // e.g., "MoveClassHandler" -> "MoveClassCommand"
                    val handlerName = className.substringAfterLast(".")
                    val commandName = handlerName.replace("Handler", "Command")
                    return "$commandName > $eventTypeName"
                }

                // Also look for user context command handlers
                if (className.contains("dev.james.alicetimetable.domain.context.user.commandHandler") &&
                    className.endsWith("Handler")
                ) {
                    val handlerName = className.substringAfterLast(".")
                    val commandName = handlerName.replace("Handler", "Command")
                    return "$commandName > $eventTypeName"
                }

                // Also look for notification context command handlers
                if (className.contains("dev.james.alicetimetable.domain.context.notification.commandHandler") &&
                    className.endsWith("Handler")
                ) {
                    val handlerName = className.substringAfterLast(".")
                    val commandName = handlerName.replace("Handler", "Command")
                    return "$commandName > $eventTypeName"
                }
            }

            // If no command handler found, return just the event name
            eventTypeName
        } catch (e: Exception) {
            // If anything goes wrong, just return the event name
            eventTypeName
        }
    }

}