1. Event Entity
The model definition of an Event entity is defined by:
package com.scriptmanager.common.entity import dev.james.processor.GenerateDTO import jakarta.persistence.* import org.hibernate.annotations.DynamicInsert import org.hibernate.annotations.Generated @Entity @GenerateDTO @DynamicInsert @Table(name = "event") class Event( @Id @GeneratedValue(strategy = GenerationType.IDENTITY) val id: Int? = null, @Column(name = "request_id", nullable = false) var requestId: String = "", @Column(name = "created_at") @Generated val createdAt: Double? = null, @Column(name = "created_at_hk") @Generated val createdAtHk: String? = null, @Column(name = "event_type", nullable = false) var eventType: String = "", @Column(name = "event", nullable = false, columnDefinition = "TEXT") var payload: String = "", @Column(name = "request_user_email", nullable = false) var requestUserEmail: String = "", @Column(name = "success", nullable = false) var success: Boolean = true, @Column(name = "failure_reason", nullable = false) var failureReason: String = "" )
2. Command Invoker
2.1. Command
package com.scriptmanager.domain.infrastructure /** * Marker interface for commands that return a result of type R. * Commands are write operations that may modify state and produce domain events. */ interface Command<R>
2.2. CommandHandler
package com.scriptmanager.domain.infrastructure interface CommandHandler<T : Any, R> { fun handle(eventQueue: EventQueue, command: T): R }
2.3. Invoker
2.3.1. Event Queue
In my desktop application I communicate with SQLite via Spring Boot because Spring is good at modeling domain concept via classes. In that case, my command invoker is like:
1package com.scriptmanager.domain.infrastructure 2 3import com.fasterxml.jackson.databind.ObjectMapper 4import com.fasterxml.jackson.databind.SerializationFeature 5import com.fasterxml.jackson.module.kotlin.KotlinModule 6import com.scriptmanager.common.entity.Event 7import com.scriptmanager.common.utils.JsonNodeUtil 8import com.scriptmanager.repository.EventRepository 9import jakarta.persistence.EntityManager 10import org.slf4j.MDC 11import org.springframework.context.ApplicationEventPublisher 12import org.springframework.data.repository.findByIdOrNull 13import org.springframework.stereotype.Component 14import org.springframework.transaction.PlatformTransactionManager 15import org.springframework.transaction.annotation.Propagation 16import org.springframework.transaction.annotation.Transactional 17import org.springframework.transaction.support.TransactionSynchronization 18import org.springframework.transaction.support.TransactionSynchronizationManager 19import org.springframework.transaction.support.TransactionTemplate 20import java.util.UUID 21 22// Event timing enum (internal use only) 23enum class DispatchTiming { 24 IMMEDIATE, 25 POST_COMMIT 26} 27 28// Event wrapper to hold timing information (internal use only) 29data class EventWrapper<T : Any>( 30 val event: T, 31 val timing: DispatchTiming, 32 var context: ExecutionContext? = null, 33) 34 35// Simplified EventQueue interface 36interface EventQueue { 37 fun add(event: Any) 38 fun addTransactional(event: Any) 39 40 val immediateEvents: List<EventWrapper<Any>> 41 val postCommitEvents: List<EventWrapper<Any>> 42 val allEvents: List<EventWrapper<Any>> 43 44 // Keep backward compatibility 45 val events: List<EventWrapper<Any>> 46 get() = immediateEvents + postCommitEvents 47}
Note that we have separated immediateEvents and postCommitEvents because events can be transactional (should dispatch only after a transaction is finished).
Our event queue definition will include two methods: add and addTransactional, which is to add those events into two queues:
48// Updated EventQueue implementation 49class SmartEventQueue : EventQueue { 50 private val _events = mutableListOf<EventWrapper<Any>>() 51 52 override fun add(event: Any) { 53 val context = captureCurrentCommandContext() 54 _events.add(EventWrapper(event, DispatchTiming.IMMEDIATE, context)) 55 } 56 57 override fun addTransactional(event: Any) { 58 val context = captureCurrentCommandContext() 59 _events.add(EventWrapper(event, DispatchTiming.POST_COMMIT, context)) 60 } 61 62 private fun captureCurrentCommandContext(): ExecutionContext { 63 var commandName: String? = null 64 65 return ExecutionContext( 66 userEmail = "me", 67 requestId = MDC.get("requestId"), 68 originalMDC = MDC.getCopyOfContextMap(), 69 commandName = commandName 70 ) 71 } 72 73 override val immediateEvents: List<EventWrapper<Any>> 74 get() = _events.filter { it.timing == DispatchTiming.IMMEDIATE } 75 76 override val postCommitEvents: List<EventWrapper<Any>> 77 get() = _events.filter { it.timing == DispatchTiming.POST_COMMIT } 78 79 override val allEvents: List<EventWrapper<Any>> 80 get() = _events.toList() 81}
Each event will be wrapped by EventWrapper when being dispatched, we will determine the timing to dispatch (publish) this event in line-112.
2.3.2. Event Dispatcher
82data class ExecutionContext( 83 val userEmail: String?, 84 val requestId: String?, 85 val originalMDC: Map<String, String>?, 86 val commandName: String? = null, 87) 88 89// Updated DomainEventDispatcher 90interface DomainEventDispatcher { 91 fun dispatchNow(eventQueue: EventQueue, requestId: String? = null) 92 fun dispatch(eventQueue: EventQueue, requestId: String? = null) 93} 94 95@Component 96class SpringDomainEventDispatcher( 97 private val applicationEventPublisher: ApplicationEventPublisher, 98) : DomainEventDispatcher { 99 100 override fun dispatchNow(eventQueue: EventQueue, requestId: String?) { 101 // Keep backward compatibility - dispatch all events immediately 102 dispatchEvents(eventQueue.events, requestId) 103 } 104 105 override fun dispatch(eventQueue: EventQueue, requestId: String?) { 106 // New method that respects timing 107 // Dispatch immediate events right away 108 dispatchEvents(eventQueue.immediateEvents, requestId) 109 110 // Register post-commit events for later dispatch 111 if (eventQueue.postCommitEvents.isNotEmpty()) { 112 registerPostCommitEvents(eventQueue.postCommitEvents, requestId) 113 } 114 } 115 116 private fun dispatchEvents(wrappedEvents: List<EventWrapper<Any>>, requestId: String?) { 117 // Don't modify MDC here - preserve existing context for policy handlers 118 // The requestId should already be set by the CommandInvoker 119 wrappedEvents.forEach { wrappedEvent -> 120 // First publish the wrapper for audit logging (separate transaction) 121 applicationEventPublisher.publishEvent(wrappedEvent) 122 123 // Then publish the actual business event (same transaction) 124 // This ensures business side effects are atomic with main transaction 125 applicationEventPublisher.publishEvent(wrappedEvent.event) 126 } 127 } 128 129 private fun registerPostCommitEvents(wrapperEvents: List<EventWrapper<Any>>, requestId: String?) { 130 val capturedContext = captureCurrentContext(requestId) 131 132 if (TransactionSynchronizationManager.isSynchronizationActive()) { 133 TransactionSynchronizationManager.registerSynchronization( 134 object : TransactionSynchronization { 135 override fun afterCommit() { 136 withContext(capturedContext) { ctx -> 137 wrapperEvents.forEach { wrappedEvent -> 138 wrappedEvent.context = ctx 139 // Publish both wrapper (for audit) and event (for business logic) 140 applicationEventPublisher.publishEvent(wrappedEvent) 141 applicationEventPublisher.publishEvent(wrappedEvent.event) 142 } 143 } 144 } 145 } 146 ) 147 } else { 148 // If no transaction is active, dispatch immediately 149 dispatchEvents(wrapperEvents, requestId) 150 } 151 }
2.3.3. ExecutionContext (Metadata for the Request)
-
Case 1 (Without Authentication). Our
captureCurrentContextis as simple as passing hard-coded string. Since the purpose is for logging and the only user is the owner of the computer:152 private fun captureCurrentContext(requestId: String?): ExecutionContext { 153 val user = "me" 154 return ExecutionContext( 155 userEmail = "me", 156 requestId = requestId, 157 originalMDC = MDC.getCopyOfContextMap() 158 ) 159 } -
Case 2 (With Authentication ). Usually a request is dilivered by a thread, and for each thread we authenticate the message passed from the request header (not to be confused by the single-threading model of nodejs, they work differently).
Once authenticated, we put
userID,userEmail, or some other extra information intoMDC, a thread-local object viaMDC.put("key", value).For example, let's consider an implmentation of
AuthAspectfor a pointcut used to get user from JWT-token:Implementation of AuthAspect
@Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.RUNTIME) annotation class AccessToken @Target(AnnotationTarget.VALUE_PARAMETER) @Retention(AnnotationRetention.RUNTIME) annotation class RequestUser @Aspect @Component class AuthAspect( private val jwtService: JwtService, private val eventPublisher: ApplicationEventPublisher, ) { private val authHeader: String = "authorization" companion object { private val currentUser = ThreadLocal<JwtPayload>() private val currentPublisher = ThreadLocal<ApplicationEventPublisher>() fun getCurrentUser(): JwtPayload? = currentUser.get() fun getEventPublisher(): ApplicationEventPublisher? = currentPublisher.get() } @Pointcut("@within(dev.james.alicetimetable.commons.aop.AccessToken)") fun getUserPointcut() { } @Around("getUserPointcut()") fun logBefore(joinPoint: ProceedingJoinPoint): Any? { val requestAttributes = RequestContextHolder.getRequestAttributes() as? ServletRequestAttributes val request = requestAttributes?.request try { val accessToken = request?.getHeader(authHeader)?.replace("Bearer ", "") ?: "" if (accessToken == "") { throw Exception("AccessToken cannot be empty") } val payload: JwtPayload = jwtService.parseAndVerifyToken(accessToken)!! currentUser.set(payload) currentPublisher.set(eventPublisher) val method = (joinPoint.signature as MethodSignature).method val args = joinPoint.args val modifiedArgs = Array(args.size) { index -> if (method.parameters[index].isAnnotationPresent(RequestUser::class.java)) { payload } else { args[index] } } return joinPoint.proceed(modifiedArgs) } catch (exception: Exception) { if (exception is JWTExpiredException) { throw JWTExpiredException() } else { throw Exception(exception.toString()) } } } }Now with authentication:
152 private fun captureCurrentContext(requestId: String?): ExecutionContext { 153 val user = AuthAspect.getCurrentUser() 154 return ExecutionContext( 155 userEmail = user?.company_email, 156 requestId = requestId, 157 originalMDC = MDC.getCopyOfContextMap() 158 ) 159 }
In both cases, the original requestId is instantiated from the invoke method of our commandHandler (see line-245 below), and each thread will have a consistent requestId.
This is especially helpful to trace a sequence of commands one after another which are triggered via event by event, as required by a single request.
Next we define a helpful trailing closure when our function requires to access the ExecutionContext for metadata:
160 private fun withContext(context: ExecutionContext, block: (context: ExecutionContext) -> Unit) { 161 // Set up context for event handlers 162 context.userEmail?.let { MDC.put("userEmail", it) } 163 context.requestId?.let { MDC.put("requestId", it) } 164 context.originalMDC?.forEach { (key, value) -> MDC.put(key, value) } 165 166 // Temporarily set user in ThreadLocal if your AuthAspect supports it 167 // This depends on how AuthAspect stores the current user 168 169 try { 170 block(context) 171 } finally { 172 MDC.clear() 173 } 174 } 175}
2.3.4. CommandInvoker
We will have a Event table storing all the Commands and Events.
One Command can produce multiple Events, and side effect must be carried out via new components with name being suffixed by Policy.
176interface CommandInvoker { 177 fun <T : Any, R> invoke(handler: CommandHandler<T, R>, command: T): R 178 fun <R> invoke(command: Command<R>): R 179} 180 181@Component 182class OneTransactionCommandInvoker( 183 private val transactionManager: PlatformTransactionManager, 184 private val domainEventDispatcher: DomainEventDispatcher, 185 private val commandAuditor: CommandAuditor, 186 private val eventRepository: EventRepository, 187 private val commandHandlers: List<CommandHandler<*, *>> 188) : CommandInvoker { 189 private val transactionTemplate: TransactionTemplate = TransactionTemplate(transactionManager) 190 191 /** 192 * Map of command class to its handler for fast lookup 193 */ 194 private val handlerMap: Map<Class<*>, CommandHandler<*, *>> = buildHandlerMap() 195 196 private fun buildHandlerMap(): Map<Class<*>, CommandHandler<*, *>> { 197 val map = mutableMapOf<Class<*>, CommandHandler<*, *>>() 198 199 commandHandlers.forEach { handler -> 200 val commandClass = extractCommandClass(handler) 201 if (commandClass != null) { 202 if (map.containsKey(commandClass)) { 203 throw IllegalStateException( 204 "Multiple handlers found for command: ${commandClass.simpleName}" 205 ) 206 } 207 map[commandClass] = handler 208 println("Registered command handler: ${handler::class.simpleName} for ${commandClass.simpleName}") 209 } 210 } 211 212 return map 213 } 214 215 private fun extractCommandClass(handler: CommandHandler<*, *>): Class<*>? { 216 val handlerClass = handler::class.java 217 val genericInterfaces = handlerClass.genericInterfaces 218 219 for (genericInterface in genericInterfaces) { 220 if (genericInterface is java.lang.reflect.ParameterizedType) { 221 val rawType = genericInterface.rawType 222 if (rawType == CommandHandler::class.java) { 223 val typeArgs = genericInterface.actualTypeArguments 224 if (typeArgs.isNotEmpty()) { 225 return typeArgs[0] as? Class<*> 226 } 227 } 228 } 229 } 230 return null 231 }
-
By default each
commandHandleris executed within a transaction automatically (we manage to do this usingtransactionTemplate, see line-281 below), and when either one of the nested command failed, it will make everything be rollbacked. -
Also by the dependency injection interface
List<CommandHandler<*, *>>, it will grab all the beans that satisfy the interfaceCommandHandler, from which we can prepare a mapping ofCommand-to-Handlerso that in our controller level we can simply runcommandInvoker.invoke(command)without specifying which handler to handle it.
232 @Suppress("UNCHECKED_CAST") 233 override fun <R> invoke(command: Command<R>): R { 234 val handler = handlerMap[command::class.java] 235 ?: throw IllegalArgumentException( 236 "No handler registered for command: ${command::class.simpleName}" 237 ) 238 239 return invoke(handler as CommandHandler<Any, R>, command as Any) 240 } 241 242 override fun <T : Any, R> invoke(handler: CommandHandler<T, R>, command: T): R { 243 // Preserve existing requestId for nested commands, or create new one for top-level commands 244 val existingRequestId = MDC.get("requestId") 245 val requestId = existingRequestId ?: UUID.randomUUID().toString() 246 val isNestedCommand = existingRequestId != null 247 248 // Always ensure MDC has the requestId 249 MDC.put("requestId", requestId) 250 MDC.put("userEmail", "me") 251 var commandEventId: Int? = null 252 var dispatchedEvents: List<EventWrapper<Any>> = emptyList() 253 254 // Debug logging 255 println("Command: ${command.javaClass.simpleName}, isNested: $isNestedCommand, requestId: $requestId") 256 257 try { 258 // Execute all commands the same way - use existing transaction if available, otherwise create new one 259 val result = if (isNestedCommand && TransactionSynchronizationManager.isSynchronizationActive()) { 260 // Execute directly in existing transaction for nested commands 261 println("Executing nested command in existing transaction") 262 val eventQueue = SmartEventQueue() 263 264 // Log command audit INSIDE the transaction 265 val commandEvent = commandAuditor.logCommandInTransaction(command, requestId) 266 commandEventId = commandEvent.id 267 268 val result = handler.handle(eventQueue, command) 269 dispatchedEvents = eventQueue.allEvents 270 domainEventDispatcher.dispatch(eventQueue, requestId) 271 272 // Mark as success immediately (same transaction) 273 commandEvent.success = true 274 eventRepository.save(commandEvent) 275 276 result 277 } else { 278 // Create new transaction for top-level commands 279 println("Executing top-level command in new transaction") 280 var tempDispatchedEvents: List<EventWrapper<Any>> = emptyList() 281 val result = transactionTemplate.execute { _ -> 282 val eventQueue = SmartEventQueue() 283 284 // Log command audit INSIDE the transaction 285 val commandEvent = commandAuditor.logCommandInTransaction(command, requestId) 286 commandEventId = commandEvent.id 287 288 val result = handler.handle(eventQueue, command) 289 tempDispatchedEvents = eventQueue.allEvents 290 domainEventDispatcher.dispatch(eventQueue, requestId) 291 292 // Mark as success immediately (same transaction) 293 commandEvent.success = true 294 eventRepository.save(commandEvent) 295 296 result 297 } ?: throw IllegalStateException() 298 dispatchedEvents = tempDispatchedEvents 299 result 300 } 301 302 println("Command completed successfully: ${command.javaClass.simpleName}") 303 return result 304 } catch (e: Exception) { 305 println("Command failed: ${command.javaClass.simpleName}, error: ${e.message}") 306 e.printStackTrace() 307 308 // Note: Command audit was rolled back with the transaction 309 // We cannot log the failure because the command event was not persisted 310 println("Warning: Command audit was rolled back due to transaction failure") 311 312 throw e 313 } finally { 314 // Only clear MDC for top-level commands to preserve context for nested commands 315 if (!isNestedCommand) { 316 MDC.clear() 317 } 318 } 319 } 320}
2.3.5. Logging
2.3.5.1. Definitions
The rest is definitions from trial and error to achieve comprehensive logging. For example (events happen in descending order over creation time):

321@Component 322class CommandAuditor( 323 private val eventRepository: EventRepository, 324 private val entityManager: EntityManager, 325) { 326 private val objectMapper = ObjectMapper().apply { 327 registerModule(KotlinModule.Builder().build()) 328 configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) 329 } 330 331 @Transactional(propagation = Propagation.MANDATORY) 332 fun <T : Any> logCommandInTransaction(command: T, requestId: String): Event { 333 try { 334 val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString() 335 val userEmail = "me" 336 337 // Detect if this command is being called from a policy 338 val commandEventType = detectPolicyOrigin(command.javaClass.simpleName) 339 340 // Get unique timestamp in milliseconds (no decimals) 341 val baseTimestamp = System.currentTimeMillis() 342 val nanoOffset = (System.nanoTime()%1000).toInt() // Use last 3 digits of nanos for uniqueness 343 val uniqueTimestamp = baseTimestamp + nanoOffset 344 345 val eventToSave = Event( 346 createdAt = uniqueTimestamp.toDouble(), // Convert to Double for database 347 requestId = requestId, 348 eventType = commandEventType, 349 payload = eventJsonNode, 350 requestUserEmail = userEmail, 351 success = false // Will be updated to true if command succeeds 352 ) 353 354 val savedEvent = eventRepository.save(eventToSave) 355 println("AUDIT: Command logged in transaction with createdAt = $uniqueTimestamp") 356 return savedEvent 357 } catch (e: Exception) { 358 println("AUDIT ERROR: Failed to save command: ${e.message}") 359 e.printStackTrace() 360 throw e 361 } 362 } 363 364 @Transactional(propagation = Propagation.REQUIRES_NEW) 365 fun <T : Any> logCommandWithPreciseTiming(command: T, requestId: String): Event { 366 try { 367 val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString() 368 val user = "me" 369 val userEmail = "me" ?: "" 370 371 // Detect if this command is being called from a policy 372 val commandEventType = detectPolicyOrigin(command.javaClass.simpleName) 373 374 // Get unique timestamp in milliseconds (no decimals) 375 val baseTimestamp = System.currentTimeMillis() 376 val nanoOffset = (System.nanoTime()%1000).toInt() // Use last 3 digits of nanos for uniqueness 377 val uniqueTimestamp = baseTimestamp + nanoOffset 378 379 val eventToSave = Event( 380 createdAt = uniqueTimestamp.toDouble(), // Convert to Double for database 381 requestId = requestId, 382 eventType = commandEventType, 383 payload = eventJsonNode, 384 requestUserEmail = userEmail 385 ) 386 387 val savedEvent = eventRepository.save(eventToSave) 388 entityManager.flush() // Force immediate write 389 390 println("AUDIT: Command saved immediately with createdAt = $uniqueTimestamp") 391 return savedEvent 392 } catch (e: Exception) { 393 println("AUDIT ERROR: Failed to save command: ${e.message}") 394 e.printStackTrace() 395 throw e 396 } 397 } 398 399 @Transactional(propagation = Propagation.REQUIRES_NEW) 400 fun <T : Any> logCommand(command: T, requestId: String): Event { 401 try { 402 val eventJsonNode = JsonNodeUtil.toJsonNode(command).toString() 403 val user = "me" 404 val userEmail = "me" ?: "" 405 MDC.put("userEmail", userEmail) 406 407 // Detect if this command is being called from a policy 408 val commandEventType = detectPolicyOrigin(command.javaClass.simpleName) 409 410 val eventToSave = Event( 411 requestId = requestId, 412 eventType = commandEventType, 413 payload = eventJsonNode, 414 requestUserEmail = userEmail 415 ) 416 417 val savedEvent = eventRepository.save(eventToSave) 418 419 // Force flush to ensure the data is actually written to database 420 entityManager.flush() 421 422 println("AUDIT: Command saved to database with ID: ${savedEvent.id}") 423 return savedEvent 424 } catch (e: Exception) { 425 println("AUDIT ERROR: Failed to save command to database: ${e.message}") 426 e.printStackTrace() 427 throw e 428 } 429 } 430 431 @Transactional(propagation = Propagation.MANDATORY) 432 fun logSuccess(eventId: Int) { 433 val command = eventRepository.findByIdOrNull(eventId) ?: return 434 command.success = true 435 eventRepository.save(command) 436 } 437 438 @Transactional(propagation = Propagation.REQUIRES_NEW) 439 fun logFailure(commandEventId: Int, error: String) { 440 val command = eventRepository.findByIdOrNull(commandEventId) ?: return 441 command.success = false 442 command.failureReason = error 443 eventRepository.save(command) 444 } 445 446 @Transactional(propagation = Propagation.REQUIRES_NEW) 447 fun logEventFailures(dispatchedEvents: List<EventWrapper<Any>>, requestId: String, failureMessage: String) { 448 try { 449 val requestUuid = requestId 450 451 // Find all events for this request that were dispatched 452 dispatchedEvents.forEach { eventWrapper -> 453 val eventTypeName = eventWrapper.event.javaClass.simpleName 454 455 // Find events in database that match this event type and request ID 456 // We need to find events that were logged for this specific event 457 val eventsToUpdate = findEventsByTypeAndRequest(eventTypeName, requestUuid) 458 459 eventsToUpdate.forEach { event -> 460 event.success = false 461 event.failureReason = failureMessage 462 // Update the event type to indicate failure 463 if (!event.eventType.endsWith("-- Failed!")) { 464 event.eventType = "${event.eventType} -- Failed!" 465 } 466 eventRepository.save(event) 467 println("Updated event ${event.eventType} (ID: ${event.id}) with failure reason") 468 } 469 } 470 471 entityManager.flush() 472 println("Successfully updated all dispatched events with failure reasons") 473 } catch (e: Exception) { 474 println("Error updating events with failure: ${e.message}") 475 e.printStackTrace() 476 throw e 477 } 478 } 479 480 private fun findEventsByTypeAndRequest(eventType: String, requestId: String): List<Event> { 481 return try { 482 // Find events that match both the event type and request ID 483 val matchingEvents = eventRepository.findAllByRequestIdAndEventType(requestId, eventType) 484 println("Found ${matchingEvents.size} events of type $eventType for request $requestId") 485 matchingEvents 486 } catch (e: Exception) { 487 println("Could not find events for type $eventType and request $requestId: ${e.message}") 488 emptyList() 489 } 490 } 491 492 private fun detectPolicyOrigin(commandName: String): String { 493 return try { 494 // Get the current stack trace 495 val stackTrace = Thread.currentThread().stackTrace 496 497 // Look for policy classes in the stack trace 498 var policyName: String? = null 499 var eventMethodName: String? = null 500 501 for (stackElement in stackTrace) { 502 val className = stackElement.className 503 val methodName = stackElement.methodName 504 505 if (className.contains(".policy.") && className.endsWith("Policy")) { 506 // Extract just the policy class name (without package) 507 policyName = className.substringAfterLast(".") 508 eventMethodName = methodName 509 break 510 } 511 } 512 513 if (policyName != null) { 514 // Try to derive the event name from the method name 515 val eventName = deriveEventNameFromMethod(eventMethodName) 516 return if (eventName != null) { 517 "$eventName > $policyName > $commandName" 518 } else { 519 "$policyName > $commandName" 520 } 521 } 522 523 // If no policy found, return just the command name 524 commandName 525 } catch (e: Exception) { 526 // If anything goes wrong, just return the command name 527 println("Warning: Failed to detect policy origin: ${e.message}") 528 commandName 529 } 530 } 531 532 private fun deriveEventNameFromMethod(methodName: String?): String? { 533 return try { 534 if (methodName == null) return null 535 536 // Common patterns in policy method names: 537 // - resetClassNumbersOn(event) -> event type in parameter 538 // - extendClassesOnClassMoved -> ClassMovedEvent 539 // - extendClassesOnClassesCreated -> ClassesCreatedEvent 540 541 when { 542 methodName.contains("On") -> { 543 // Extract the part after "On" and convert to event name 544 val eventPart = methodName.substringAfterLast("On") 545 if (eventPart.isNotEmpty()) { 546 // Convert camelCase to EventName (e.g., "classMoved" -> "ClassMovedEvent") 547 val eventName = eventPart.replaceFirstChar { it.uppercase() } 548 if (!eventName.endsWith("Event")) { 549 "${eventName}Event" 550 } else { 551 eventName 552 } 553 } else null 554 } 555 556 else -> null 557 } 558 } catch (e: Exception) { 559 null 560 } 561 } 562}
2.3.5.2. On propagation = Propagation.MANDATORY
By requiring the propagation of the transaction be Propagation.MANDATORY, we require the transaction for event logging be within the same transaction of the command invokation.
This is only a tradeoff for SQLite database because there cannot be two transactions writing into SQLite db at the same time, which is designed on purpose.
Event should not be logged synchorously because we don't want the failure of event logging interupt the successful transaction. For traditional database such as PostgreSQL and MySQL one should switch the propagation mode of transaction from MANDATORY to REQUIRES_NEW.
3. Query Invoker
QueryInvoker is much more simple because it does not involve any transaction (we can always add logging in the invoker level later on).
3.1. Query
package com.scriptmanager.domain.infrastructure /** * Marker interface for queries that return a result of type R. * Queries are read-only operations that do not modify state. */ interface Query<R>
3.2. Invoker
1package com.scriptmanager.domain.infrastructure 2 3import org.slf4j.LoggerFactory 4import org.slf4j.MDC 5import org.springframework.stereotype.Component 6import org.springframework.transaction.annotation.Transactional 7import java.lang.reflect.ParameterizedType 8import java.util.UUID 9 10/** 11 * QueryInvoker is responsible for routing queries to their appropriate handlers. 12 * Unlike CommandInvoker, queries are read-only operations and: 13 * - Do not produce domain events 14 * - Use read-only transactions (@Transactional(readOnly = true)) 15 * - Are typically lighter weight 16 * - Can be cached or optimized for read performance 17 */ 18interface QueryInvoker { 19 /** 20 * Invokes the appropriate query handler for the given query. 21 * @param query The query to execute 22 * @return The query result 23 */ 24 fun <R> invoke(query: Query<R>): R 25} 26 27@Component 28class DefaultQueryInvoker( 29 private val queryHandlers: List<QueryHandler<*, *>> 30) : QueryInvoker { 31 32 private val logger = LoggerFactory.getLogger(DefaultQueryInvoker::class.java) 33 34 /** 35 * Map of query class to its handler for fast lookup 36 */ 37 private val handlerMap: Map<Class<*>, QueryHandler<*, *>> = buildHandlerMap() 38 39 private fun buildHandlerMap(): Map<Class<*>, QueryHandler<*, *>> { 40 val map = mutableMapOf<Class<*>, QueryHandler<*, *>>() 41 42 queryHandlers.forEach { handler -> 43 val queryClass = extractQueryClass(handler) 44 if (queryClass != null) { 45 if (map.containsKey(queryClass)) { 46 throw IllegalStateException( 47 "Multiple handlers found for query: ${queryClass.simpleName}" 48 ) 49 } 50 map[queryClass] = handler 51 logger.info("Registered query handler: ${handler::class.simpleName} for ${queryClass.simpleName}") 52 } else { 53 logger.warn("Could not determine query type for handler: ${handler::class.simpleName}") 54 } 55 } 56 57 return map 58 } 59 60 private fun extractQueryClass(handler: QueryHandler<*, *>): Class<*>? { 61 // Look through all generic interfaces 62 val genericInterfaces = handler::class.java.genericInterfaces 63 64 for (genericInterface in genericInterfaces) { 65 if (genericInterface is ParameterizedType && 66 genericInterface.rawType == QueryHandler::class.java 67 ) { 68 val typeArguments = genericInterface.actualTypeArguments 69 if (typeArguments.isNotEmpty()) { 70 return typeArguments[0] as? Class<*> 71 } 72 } 73 } 74 75 return null 76 } 77 78 @Transactional(readOnly = true) 79 @Suppress("UNCHECKED_CAST") 80 override fun <R> invoke(query: Query<R>): R { 81 // Set up MDC for request tracing (optional for queries, but useful for debugging) 82 val existingRequestId = MDC.get("requestId")
Here usually there is non-null requestId because events are dispatched from our command.
But there are ocassions something has happended from other domain (a domain event) that needs to be noticed from our domain service, in that case a event is dispatched via a controller endpoint and requestId is now being null:
83 val requestId = existingRequestId ?: UUID.randomUUID().toString() 84 85 if (existingRequestId == null) { 86 MDC.put("requestId", requestId) 87 } 88 89 try { 90 val handler = handlerMap[query::class.java] as? QueryHandler<Query<R>, R> 91 ?: throw IllegalArgumentException( 92 "No handler found for query: ${query::class.simpleName}. " + 93 "Available handlers: ${handlerMap.keys.map { it.simpleName }}" 94 ) 95 96 logger.debug("Executing query: ${query::class.simpleName} with requestId: $requestId") 97 98 val result = handler.handle(query) 99 100 logger.debug("Query completed: ${query::class.simpleName}") 101 102 return result 103 } catch (e: Exception) { 104 logger.error("Query failed: ${query::class.simpleName}, error: ${e.message}", e) 105 throw e 106 } finally { 107 // Only clear MDC if we created it 108 if (existingRequestId == null) { 109 MDC.remove("requestId") 110 } 111 } 112 } 113}
4. Event Listeners for Side Effects
4.1. Policies
4.1.1. Concrete Example
As mentioned each side effect must be carried out from Policys in order not to burry the side effect logic into the sea of uncontrolled and exploded number of services.
Essentially such a policy is as simple as a event listener defined as the following exmaple:
package dev.james.alicetimetable.domain.context.timetable.policy import dev.james.alicetimetable.domain.context.timetable.commandhandler.* import dev.james.alicetimetable.domain.context.timetable.command.* import dev.james.alicetimetable.domain.context.timetable.IHasPackageId import dev.james.alicetimetable.domain.context.timetable.TimetableDomainEvent import dev.james.alicetimetable.infra.CommandInvoker import org.springframework.context.event.EventListener import org.springframework.stereotype.Component @Component class ResetClassNumbersPolicy( private val commandInvoker: CommandInvoker, private val resetClassNumbersHandler: ResetClassNumbersHandler, ) { @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.GroupOfClassesRemovedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.SingleClassRemovedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassMovedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassesCreatedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassesRemovedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } @EventListener fun resetClassNumbersOn(event_: TimetableDomainEvent.ClassDuplicatedEvent) { val event = event_ as IHasPackageId resetClassNumbersOfPackage(event) } private fun resetClassNumbersOfPackage(event: IHasPackageId) { val packageId = event.packageId val cmd = ResetClassNumbersCommand(packageId = packageId) commandInvoker.invoke(resetClassNumbersHandler, cmd) } }
4.1.2. Advantages
As you can see, there are so many reasons we need to reset the numbering of classes in a timetable system of an art school.
Now
-
From documentation point of view we know all the reasons why we need to reset the class numbers, and
-
When one of the cases need special treatment (not managable by the shared logic
resetClassNumbersOfPackage), we can immediately adjust one of the event listener by another private function.This avoids the altering of one method breaks the functionality of another method unexpectedly. Of course this can be mitigated by writing comprehensive test case (if any).
-
Now we can write test cases using the event produced by our command handler. We will discuss writing tests in the next article.
4.2. DomainEventLogger
Side effect means a mutation of the state of the system, and event logging is no exception.
package com.scriptmanager.domain.infrastructure import com.scriptmanager.common.entity.Event import com.scriptmanager.common.utils.JsonNodeUtil import com.scriptmanager.repository.EventRepository import org.slf4j.MDC import org.springframework.context.ApplicationEventPublisher import org.springframework.context.event.EventListener import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Propagation import org.springframework.transaction.annotation.Transactional import org.springframework.transaction.event.TransactionPhase import org.springframework.transaction.event.TransactionalEventListener import java.util.* @Component class DomainEventLogger( private val eventRepository: EventRepository, private val applicationEventPublisher: ApplicationEventPublisher, ) { @EventListener @Transactional(propagation = Propagation.MANDATORY) // Join existing transaction fun recordSynchronousEvent(wrapperEvent: EventWrapper<Any>) { if (wrapperEvent.timing != DispatchTiming.IMMEDIATE) { return } try { // Immediate event audit with precise timing // This runs in the SAME transaction as the command persistEventWithPreciseTiming(wrapperEvent) } catch (e: Exception) { // Log audit failure but don't break the flow println("Warning: Failed to persist synchronous event: ${e.message}") e.printStackTrace() } } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) @Transactional(propagation = Propagation.REQUIRES_NEW) fun recordTransactionalEvent(wrapperEvent: EventWrapper<Any>) { if (wrapperEvent.timing != DispatchTiming.POST_COMMIT) { return } try { // For post-commit events, persist with precise timing persistEventWithPreciseTiming(wrapperEvent) applicationEventPublisher.publishEvent(wrapperEvent.event) } catch (e: Exception) { // Log the error but don't let event logging failures break the application println("Warning: Failed to persist or publish transactional event: ${e.message}") e.printStackTrace() } } private fun persistEventWithPreciseTiming(wrappedEvent: EventWrapper<Any>) { val event = wrappedEvent.event val ctx = wrappedEvent.context val requestID = try { UUID.fromString(MDC.get("requestId") ?: ctx?.requestId).toString() } catch (e: Exception) { "" } val userEmail = "me" // Detect which command dispatched this event val eventTypeName = event::class.simpleName ?: "" val commandAwareEventType = if (ctx?.commandName != null) { "${ctx.commandName} > $eventTypeName" } else { detectCommandOrigin(eventTypeName) } // Get unique timestamp in milliseconds (no decimals) val baseTimestamp = System.currentTimeMillis() val nanoOffset = (System.nanoTime()%1000).toInt() // Use last 3 digits of nanos for uniqueness val uniqueTimestamp = baseTimestamp + nanoOffset val eventJsonNode = JsonNodeUtil.toJsonNode(event) val eventToSave = Event( createdAt = uniqueTimestamp.toDouble(), // Convert to Double for database eventType = commandAwareEventType, payload = eventJsonNode.toString(), requestUserEmail = userEmail, requestId = requestID ) // Save immediately with precise timing eventRepository.save(eventToSave) println("AUDIT: Event saved immediately with createdAt = $uniqueTimestamp") } private fun persistEvent(wrappedEvent: EventWrapper<Any>) { val event = wrappedEvent.event val ctx = wrappedEvent.context val requestID = try { UUID.fromString(MDC.get("requestId") ?: ctx?.requestId).toString() } catch (e: Exception) { "" } // MDC.set("userEmail") has been executed when we call the command // this event may be handled by another thread when it is transactional event listener val userEmail = "me" // Detect which command dispatched this event val eventTypeName = event::class.simpleName ?: "" val commandAwareEventType = if (ctx?.commandName != null) { "${ctx.commandName} > $eventTypeName" } else { detectCommandOrigin(eventTypeName) } val eventJsonNode = JsonNodeUtil.toJsonNode(event).toString() val eventToStore = Event( eventType = commandAwareEventType, payload = eventJsonNode, requestUserEmail = userEmail, requestId = requestID ) eventRepository.save(eventToStore) } private fun detectCommandOrigin(eventTypeName: String): String { return try { // Get the current stack trace val stackTrace = Thread.currentThread().stackTrace // Look for command handler classes in the stack trace for (stackElement in stackTrace) { val className = stackElement.className // Look for the specific command handler package structure if (className.contains("dev.james.alicetimetable.domain.context.timetable.commandHandler") && className.endsWith("Handler") ) { // Extract the command name from handler name // e.g., "MoveClassHandler" -> "MoveClassCommand" val handlerName = className.substringAfterLast(".") val commandName = handlerName.replace("Handler", "Command") return "$commandName > $eventTypeName" } // Also look for user context command handlers if (className.contains("dev.james.alicetimetable.domain.context.user.commandHandler") && className.endsWith("Handler") ) { val handlerName = className.substringAfterLast(".") val commandName = handlerName.replace("Handler", "Command") return "$commandName > $eventTypeName" } // Also look for notification context command handlers if (className.contains("dev.james.alicetimetable.domain.context.notification.commandHandler") && className.endsWith("Handler") ) { val handlerName = className.substringAfterLast(".") val commandName = handlerName.replace("Handler", "Command") return "$commandName > $eventTypeName" } } // If no command handler found, return just the event name eventTypeName } catch (e: Exception) { // If anything goes wrong, just return the event name eventTypeName } } }











