Preface
- Almost every request from the frontend requires the latest information from the backend.
- It is therefore crucial to implement a "waiting mechanism" that waits for the eventual outcome of the multiple events triggered by a single command.
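The idea of a waiting mechanism can be illustrated without any framework. The sketch below is a dependency-free model and not Axon's API: OutcomeWaiter, register, publish and fail are hypothetical names chosen for illustration. The caller registers interest in an orderId before dispatching the command, and a later event completes (or fails) the pending future.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// Hypothetical helper (not part of Axon): one pending future per orderId.
class OutcomeWaiter<T> {
    private val pending = ConcurrentHashMap<String, CompletableFuture<T>>()

    // Register BEFORE dispatching the command so that no outcome can be missed.
    fun register(orderId: String): CompletableFuture<T> =
        pending.computeIfAbsent(orderId) { CompletableFuture<T>() }

    // Called by whichever handler observes the final event of the flow.
    fun publish(orderId: String, outcome: T) {
        pending.remove(orderId)?.complete(outcome)
    }

    // Called when any step fails, so the caller stops waiting immediately.
    fun fail(orderId: String, cause: Throwable) {
        pending.remove(orderId)?.completeExceptionally(cause)
    }
}

fun main() {
    val waiter = OutcomeWaiter<String>()
    val future = waiter.register("order-1")

    // Simulate the chain of events finishing on another thread.
    Thread { waiter.publish("order-1", "session-abc") }.start()

    // Block (with a timeout) until the eventual outcome arrives.
    println(future.get(1, TimeUnit.SECONDS))
}
```

Registering before sending is the important design choice: if the command were dispatched first, a fast handler could publish the outcome before anyone is listening.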
API Calls in Aggregate, Regular EventHandler or Saga (SagaEventHandler)?
Reference
Important Takeaways
If you want the command to fail when the API call fails, make the call on the command side or in a subscribing event handler.
No, Please Don't Split Commands and Events into Different Folders
- Most courses on the internet suggest splitting commands and events into two separate files or packages, which I strongly disagree with.
- Commands and events usually come in pairs, and most of them are sequential; naming them Step1, Step2, ... alone is not enough.
- If the order needs to be rearranged, we have to rename classes in two files. Why not just group all of them into one file?
We suggest the following pattern instead!
import com.machingclee.payment.enums.PaymentPlan
import org.axonframework.modelling.command.TargetAggregateIdentifier

class CommandAndEvents {
    class SubscriptionPlanOrder {
        class Step1 {
            data class CreateCustomerCommand(
                @TargetAggregateIdentifier
                val orderId: String,
                val userEmail: String,
                val userName: String,
                val paymentPlan: PaymentPlan,
                val numOfPersons: Int
            )

            data class CustomerCreatedEvent(
                val orderId: String,
                val userEmail: String,
                val paymentPlan: PaymentPlan,
                val numOfPersons: Int,
                val customerId: String
            )

            class Failed {
                data class CancelCreateCustomerCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val reason: String
                )

                data class CreateCustomerCancelledEvent(val orderId: String, val reason: String)
            }
        }

        class Step2 {
            data class OrderSubscriptionPlanCommand(
                @TargetAggregateIdentifier
                val orderId: String,
            )

            data class SubscriptionPlanOrderedEvent(val orderId: String, val email: String)

            class Failed {
                data class CancelOrderPlanCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val reason: String
                )

                data class OrderPlanCancelledEvent(val orderId: String, val reason: String)
            }
        }

        class Step3 {
            data class CreateSessionIdCommand(
                @TargetAggregateIdentifier
                val orderId: String,
            )

            data class SessionIdCreatedEvent(val orderId: String, val sessionId: String)

            class Failed {
                data class CancelCreateSessionIdCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val reason: String
                )

                data class CreateSessionIdCancelledEvent(val orderId: String, val reason: String)
            }
        }

        class Step4 {
            data class DoStripePaymentCommand(
                @TargetAggregateIdentifier
                val orderId: String,
            )

            data class StripePaymentDoneEvent(val orderId: String)

            class Failed {
                data class CancelStripePaymentCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val reason: String
                )

                // An event, not a command: it carries no @TargetAggregateIdentifier.
                data class StripePaymentCancelledEvent(val orderId: String, val reason: String)
            }
        }

        class Step5 {
            data class AdjustDBPermissionCommand(
                @TargetAggregateIdentifier
                val orderId: String,
            )

            data class DBPermissionAdjustedEvent(val orderId: String)

            class Failed {
                data class CancelAdjustDBPermissionCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val reason: String
                )

                data class AdjustDBPermissionCancelledEvent(val orderId: String, val reason: String)
            }
        }
    }
}
This way, our CQRS flow is much more manageable.
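To show how the grouping reads at a call site, here is a trimmed, dependency-free copy of the pattern (the Axon annotation and most fields are omitted). The helper nextCommandAfter is hypothetical and only illustrates that a step's event and the next step's command can be located by their step names.

```kotlin
// Trimmed copy of the grouping pattern; fields reduced for brevity.
class CommandAndEvents {
    class SubscriptionPlanOrder {
        class Step1 {
            data class CreateCustomerCommand(val orderId: String, val userEmail: String)
            data class CustomerCreatedEvent(val orderId: String, val customerId: String)

            class Failed {
                data class CreateCustomerCancelledEvent(val orderId: String, val reason: String)
            }
        }

        class Step2 {
            data class OrderSubscriptionPlanCommand(val orderId: String)
        }
    }
}

// Hypothetical helper: the command that would follow a successful Step1.
fun nextCommandAfter(
    event: CommandAndEvents.SubscriptionPlanOrder.Step1.CustomerCreatedEvent
): CommandAndEvents.SubscriptionPlanOrder.Step2.OrderSubscriptionPlanCommand =
    CommandAndEvents.SubscriptionPlanOrder.Step2.OrderSubscriptionPlanCommand(orderId = event.orderId)

fun main() {
    val event = CommandAndEvents.SubscriptionPlanOrder.Step1.CustomerCreatedEvent(
        orderId = "order-1",
        customerId = "cus_123"
    )
    println(nextCommandAfter(event)) // prints "OrderSubscriptionPlanCommand(orderId=order-1)"
}
```

In real code an import such as CommandAndEvents.SubscriptionPlanOrder.Step1 shortens the references to Step1.CustomerCreatedEvent, which is how the aggregate in this article refers to them.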
Controller Side with Subscription Query
import com.machingclee.db.tables.pojos.Stripeorder
import com.machingclee.payment.command.CommandAndEvents
import com.machingclee.payment.dto.OrderSubscriptionPlanDto
import com.machingclee.payment.model.UserContext
import com.machingclee.payment.query.CheckoutOrderQuery
import com.machingclee.payment.service.DbService
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.messaging.responsetypes.ResponseTypes
import org.axonframework.queryhandling.QueryGateway
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/plan")
class SubscriptionPlanController(
    private val commandGateway: CommandGateway,
    private val queryGateway: QueryGateway,
    private val dbService: DbService
) {
    // diagram: https://miro.com/app/board/uXjVKubCBHc=/
    @PostMapping("/purchase")
    fun orderPlan(
        @RequestBody orderSubscriptionPlanDto: OrderSubscriptionPlanDto
    ): Stripeorder {
        val orderId = dbService.generateULIDasUUID()
        val user = UserContext.instance.getUser()

        // Subscription query: open it before sending the command so that no update is missed.
        val queryResult = queryGateway.subscriptionQuery(
            CheckoutOrderQuery(orderId = orderId),
            ResponseTypes.instanceOf(Stripeorder::class.java),
            ResponseTypes.instanceOf(Stripeorder::class.java),
        )

        // `use` closes the subscription once we have the result (or an exception).
        return queryResult.use { subscription ->
            commandGateway.send<String>(
                CommandAndEvents.SubscriptionPlanOrder.Step1.CreateCustomerCommand(
                    orderId = orderId,
                    userEmail = user.email,
                    userName = user.name,
                    paymentPlan = orderSubscriptionPlanDto.plan,
                    numOfPersons = orderSubscriptionPlanDto.numOfPersons ?: 0
                )
            )
            // Block until the first update is emitted for this orderId.
            subscription.updates().blockFirst()
                ?: throw Exception("Stripe order cannot be found, orderId=$orderId")
        }
    }
}
CommandHandler Side
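import com.machingclee.payment.command.CommandAndEvents.SubscriptionPlanOrder.Step1
import com.machingclee.payment.command.CommandAndEvents.SubscriptionPlanOrder.Step2
import com.machingclee.payment.command.CommandAndEvents.SubscriptionPlanOrder.Step3
import com.machingclee.payment.enums.PaymentPlan
import com.machingclee.payment.query.CheckoutOrderQuery
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventsourcing.EventSourcingHandler
import org.axonframework.modelling.command.AggregateIdentifier
import org.axonframework.modelling.command.AggregateLifecycle.apply
import org.axonframework.queryhandling.QueryUpdateEmitter
import org.axonframework.spring.stereotype.Aggregate
// Imports for StripeService, CreateCustomerDto, PriceId and the logger are project-specific and omitted.

@Aggregate
class SubscriptionPlanOrderAggregate() {
    @AggregateIdentifier
    private var orderId: String? = null
    private var userEmail: String? = null
    private var customerId: String? = null
    private var paymentPlan: PaymentPlan? = null
    private var numOfPersons: Int? = null
    private var orderStarted: Boolean = false
    private var stripeSessionId: String? = null

    companion object {
        val logger: KLogger = KotlinLogging.logger {}
    }

    // Fails the pending subscription query of the given order so the controller stops waiting.
    private fun cancelCheckoutSubscriptionQuery(
        queryUpdateEmitter: QueryUpdateEmitter,
        orderId: String,
        e: Exception
    ) {
        queryUpdateEmitter.completeExceptionally(
            CheckoutOrderQuery::class.java,
            { query -> query.orderId == orderId },
            e
        )
    }

    // ... command handlers and event sourcing handlers continue below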
- Here we define cancelCheckoutSubscriptionQuery, which propagates the exception through the queryResult channel. This channel is identified by the query object CheckoutOrderQuery and the orderId parameter.
- The sessionId is supposed to be published to the queryResult channel by the sagaEventHandler handling Step3.SessionIdCreatedEvent.
- Commands that can potentially throw an Exception (for example, those calling the Stripe API via its SDK) should be wrapped in a try-catch.
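The saga side of that channel might look like the sketch below. It is an assumption-laden illustration, not the article's actual saga: it needs Axon Framework 4 and Spring on the classpath, the three data classes are hypothetical stand-ins (for CheckoutOrderQuery, Step3.SessionIdCreatedEvent and the Stripeorder pojo) so that the snippet compiles on its own, and the saga is assumed to have been started and associated with orderId by an earlier step.

```kotlin
import org.axonframework.modelling.saga.SagaEventHandler
import org.axonframework.queryhandling.QueryUpdateEmitter
import org.axonframework.spring.stereotype.Saga
import org.springframework.beans.factory.annotation.Autowired

// Hypothetical stand-ins for the project's own classes.
data class CheckoutOrderQuery(val orderId: String)
data class SessionIdCreatedEvent(val orderId: String, val sessionId: String)
data class StripeOrderView(val orderId: String, val sessionId: String)

@Saga
class SubscriptionPlanOrderSaga {

    // Sagas are serialized, so injected resources are marked transient.
    @Autowired
    @Transient
    private lateinit var queryUpdateEmitter: QueryUpdateEmitter

    // Assumes an earlier @StartSaga handler already associated this saga with orderId.
    @SagaEventHandler(associationProperty = "orderId")
    fun on(event: SessionIdCreatedEvent) {
        // Emit only to subscription queries that were opened for this orderId.
        queryUpdateEmitter.emit(
            CheckoutOrderQuery::class.java,
            { query -> query.orderId == event.orderId },
            StripeOrderView(orderId = event.orderId, sessionId = event.sessionId)
        )
    }
}
```

The predicate mirrors the one used in cancelCheckoutSubscriptionQuery, so the success path and the failure path address the same subscriber.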
Let's study the try-catch blocks below:
    @CommandHandler
    constructor(
        cmd: Step1.CreateCustomerCommand,
        stripeService: StripeService,
        queryUpdateEmitter: QueryUpdateEmitter
    ) : this() {
        try {
            logger.info {
                "CreateCustomerCommand"
            }

            val stripeCustomerId = stripeService.createCustomer(
                CreateCustomerDto(
                    email = cmd.userEmail,
                    name = cmd.userName
                )
            )
            val event = Step1.CustomerCreatedEvent(
                orderId = cmd.orderId,
                userEmail = cmd.userEmail,
                paymentPlan = cmd.paymentPlan,
                numOfPersons = cmd.numOfPersons,
                customerId = stripeCustomerId
            )
            apply(event)
        } catch (e: Exception) {
            cancelCheckoutSubscriptionQuery(queryUpdateEmitter, cmd.orderId, e)
            apply(
                Step1.Failed.CreateCustomerCancelledEvent(
                    orderId = cmd.orderId,
                    reason = e.message ?: ""
                )
            )
        }
    }

    @EventSourcingHandler
    fun on(event: Step1.CustomerCreatedEvent) {
        this.orderId = event.orderId
        this.userEmail = event.userEmail
        this.paymentPlan = event.paymentPlan
        this.numOfPersons = event.numOfPersons
        this.customerId = event.customerId
    }

    @CommandHandler
    fun handle(cmd: Step2.OrderSubscriptionPlanCommand) {
        logger.info {
            "OrderSubscriptionPlanCommand"
        }
        apply(
            Step2.SubscriptionPlanOrderedEvent(
                orderId = this.orderId!!,
                email = this.userEmail!!
            )
        )
    }

    @EventSourcingHandler
    fun on(e: Step2.SubscriptionPlanOrderedEvent) {
        orderStarted = true
    }

    @CommandHandler
    fun handle(
        cmd: Step3.CreateSessionIdCommand,
        stripeService: StripeService,
        queryUpdateEmitter: QueryUpdateEmitter
    ) {
        logger.info {
            "CreateSessionIdCommand"
        }
        val plan = paymentPlan ?: throw Exception("Payment plan cannot be null")
        try {
            val sessionId = stripeService.createSession(
                this.customerId!!,
                PriceId.fromPaymentPlan(plan)!!
            )
            apply(
                Step3.SessionIdCreatedEvent(
                    orderId = this.orderId!!,
                    sessionId = sessionId
                )
            )
        } catch (e: Exception) {
            // compensating event
            cancelCheckoutSubscriptionQuery(queryUpdateEmitter, cmd.orderId, e)
            apply(
                Step3.Failed.CreateSessionIdCancelledEvent(
                    orderId = cmd.orderId,
                    reason = e.message ?: ""
                )
            )
        }
    }
}
- Note that we also dispatch the related compensating events together with the reason.
- Some entities in our database may also record the errors that arise, so we save all of them to the database in regular eventHandlers.
- We dispatch the compensating action for the previous command via a sagaEventHandler. For example, if Step3.DoSomethingCommand fails, then we dispatch the following chain of actions:
- In a regular eventHandler we handle the state change for error messages (if any).
- In the last step we also end the lifecycle of the saga.
- Not only that, we listen for Step1.Failed.AnotherThingCancelledEvent in a regular event handler to send alerts to stakeholders.
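The routing behind such a compensating chain can be sketched as a pure function. This is a dependency-free model under stated assumptions: the class names are borrowed from the Failed groups of the command-and-event file, the ordering (a failure in one step cancels the step before it) is assumed rather than taken from the original flow, and nextCompensation is a hypothetical helper. In an Axon saga, each branch would presumably live in its own sagaEventHandler method that sends the command through the command gateway, and the null branch is where the saga's lifecycle would end.

```kotlin
// Dependency-free stand-ins for the failure events and cancel commands.
sealed interface FailureEvent {
    val orderId: String
    val reason: String
}
data class CreateSessionIdCancelledEvent(override val orderId: String, override val reason: String) : FailureEvent
data class OrderPlanCancelledEvent(override val orderId: String, override val reason: String) : FailureEvent
data class CreateCustomerCancelledEvent(override val orderId: String, override val reason: String) : FailureEvent

sealed interface CompensatingCommand
data class CancelOrderPlanCommand(val orderId: String, val reason: String) : CompensatingCommand
data class CancelCreateCustomerCommand(val orderId: String, val reason: String) : CompensatingCommand

// Hypothetical routing: returns the command that undoes the previous step,
// or null when nothing is left to undo (the point where a saga would end).
fun nextCompensation(event: FailureEvent): CompensatingCommand? = when (event) {
    is CreateSessionIdCancelledEvent -> CancelOrderPlanCommand(event.orderId, event.reason)
    is OrderPlanCancelledEvent -> CancelCreateCustomerCommand(event.orderId, event.reason)
    is CreateCustomerCancelledEvent -> null
}

fun main() {
    var event: FailureEvent? = CreateSessionIdCancelledEvent("order-1", "Stripe timeout")
    while (event != null) {
        val command = nextCompensation(event)
        println("$event -> ${command ?: "end saga"}")
        // Simulate the aggregate turning each cancel command into its cancelled event.
        event = when (command) {
            is CancelOrderPlanCommand -> OrderPlanCancelledEvent(command.orderId, command.reason)
            is CancelCreateCustomerCommand -> CreateCustomerCancelledEvent(command.orderId, command.reason)
            null -> null
        }
    }
}
```

Keeping the routing in one exhaustive when expression means the compiler flags any failure event that has no compensation defined.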