0%
August 3, 2024

Subscription Query and Error Handling of API Calls

axon-framework

kotlin

springboot

Preface

  • Almost every request from frontend will require the latest information from backend.

  • Therefore it is crucial to implement a "waiting mechanism" to await for the eventual outcome of multiple events triggered by a single command.

API Calls in Aggregate, Regular EventHandler or Saga (SagaEventHandler)?

Reference
Important Takeaways

If you want the command to fail in case the call fails, you should do it on the command side, or with a subscribing event handler

No Please, Don't try to split commands and events in different folders

  • Most of the courses in the internet suggest splitting commands and events in two separate files or packages, which I strongly disagree.

  • Commands and Events usually spawn in pair, most of them are sequential, naming the commands and events by Step1, Step2, ... alone is not enough.

  • If the order needs to be rearranged, we are going to change the naming of classes in two files, why not just group all of them into one file?

    We suggest the following pattern instead!

    import com.machingclee.payment.enums.PaymentPlan
    import org.axonframework.modelling.command.TargetAggregateIdentifier
    
    class CommandAndEvents {
        class SubscriptionPlanOrder {
            class Step1 {
                data class CreateCustomerCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                    val userEmail: String,
                    val userName: String,
                    val paymentPlan: PaymentPlan,
                    val numOfPersons: Int
                )
    
                data class CustomerCreatedEvent(
                    val orderId: String,
                    val userEmail: String,
                    val paymentPlan: PaymentPlan,
                    val numOfPersons: Int,
                    val customerId: String
                )
    
                class Failed {
                    data class CancelCreateCustomerCommand(
                        @TargetAggregateIdentifier
                        val orderId: String,
                        val reason: String
                    )
    
                    data class CreateCustomerCancelledEvent(
                        val orderId: String,
                        val reason: String
                    )
                }
            }
    
            class Step2 {
                data class OrderSubscriptionPlanCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                )
    
                data class SubscriptionPlanOrderedEvent(
                    val orderId: String,
                    val email: String
                )
    
                class Failed {
                    data class CancelOrderPlanCommand(
                        @TargetAggregateIdentifier
                        val orderId: String,
                        val reason: String
                    )
    
                    data class OrderPlanCancelledEvent(
                        val orderId: String,
                        val reason: String
                    )
                }
            }
    
            class Step3 {
                data class CreateSessionIdCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                )
    
                data class SessionIdCreatedEvent(
                    val orderId: String,
                    val sessionId: String
                )
    
                class Failed {
                    data class CancelCreateSessionIdCommand(
                        @TargetAggregateIdentifier
                        val orderId: String,
                        val reason: String
                    )
    
                    data class CreateSessionIdCancelledEvent(
                        val orderId: String,
                        val reason: String
                    )
                }
    
            }
    
            class Step4 {
                data class DoStripePaymentCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                )
    
                data class StripePaymentDoneEvent(
                    val orderId: String,
                )
    
    
                class Failed {
                    data class CancelStripePaymentCommand(
                        @TargetAggregateIdentifier
                        val orderId: String,
                        val reason: String
                    )
    
                    data class StripePaymentCancelledCommand(
                        val orderId: String,
                        val reason: String
                    )
                }
    
    
            }
    
            class Step5 {
                data class SAdjustDBPermissionCommand(
                    @TargetAggregateIdentifier
                    val orderId: String,
                )
    
                data class DBPermissionAdjustedEvent(
                    val orderId: String,
                )
    
                class Failed {
                    data class CancelAdjustDBPermissionCommand(
                        @TargetAggregateIdentifier
                        val orderId: String,
                        val reason: String
                    )
    
                    data class AdjustDBPermissionCancelledEvent(
                        val orderId: String,
                        val reason: String
                    )
                }
            }
        }
    }

    In this way our CQRS flow is much more managible.

Controller Side with Subscription Query

import com.machingclee.db.tables.pojos.Stripeorder
import com.machingclee.payment.command.CommandAndEvents
import com.machingclee.payment.dto.OrderSubscriptionPlanDto
import com.machingclee.payment.model.UserContext
import com.machingclee.payment.query.CheckoutOrderQuery
import com.machingclee.payment.service.DbService
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.messaging.responsetypes.ResponseTypes
import org.axonframework.queryhandling.QueryGateway
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/plan")
class SubscriptionPlanController(
    private val commandGateway: CommandGateway,
    private val queryGateway: QueryGateway,
    private val dbService: DbService
) {
    // diagram: https://miro.com/app/board/uXjVKubCBHc=/
    @PostMapping("/purchase")
    fun orderPlan(
        @RequestBody orderSubscriptionPlanDto: OrderSubscriptionPlanDto
    ): Stripeorder {
        val orderId = dbService.generateULIDasUUID()
        val user = UserContext.instance.getUser()
        val queryResult = queryGateway.subscriptionQuery(
            CheckoutOrderQuery(
                orderId = orderId
            ),
            ResponseTypes.instanceOf(Stripeorder::class.java),
            ResponseTypes.instanceOf(Stripeorder::class.java),
        )
        queryResult.use { queryResult ->
            commandGateway.send<String>(
                CommandAndEvents.SubscriptionPlanOrder.Step1.CreateCustomerCommand(
                    orderId = orderId,
                    userEmail = user.email,
                    userName = user.name,
                    paymentPlan = orderSubscriptionPlanDto.plan,
                    numOfPersons = orderSubscriptionPlanDto.numOfPersons ?: 0
                )
            )
            val result = queryResult.updates().blockFirst()
                ?: throw Exception("Stripe order cannot be found, orderId=$orderId")

            return result
        }

        // subscription query here
    }
}

CommandHandler Side

1@Aggregate
2class SubscriptionPlanOrderAggregate() {
3    @AggregateIdentifier
4    private var orderId: String? = null
5    private var userEmail: String? = null
6    private var customerId: String? = null
7    private var paymentPlan: PaymentPlan? = null
8    private var numOfPersons: Int? = null
9    private var orderStarted: Boolean = false
10    private var stripeSessionId: String? = null
11
12    companion object {
13        var logger: KLogger = KotlinLogging.logger {}
14    }
15
16    private final fun cancelCheckoutSubscriptionQuery(
17        queryUpdateEmitter: QueryUpdateEmitter,
18        orderId: String,
19        e: Exception
20    ) {
21        queryUpdateEmitter.completeExceptionally(
22            CheckoutOrderQuery::class.java, { query ->
23                query.orderId == orderId
24            }, e
25        )
26    }
  • Here we define cancelCheckoutSubscriptionQuery and propagate the exception via queryResult channel. This channel is identified by the query object CheckoutOrderQuery and the orderId parameter.

  • Now the sessoinId is supposed to be published to the queryResult channel by the sagaEventHandler handling Step3.SessionIdCreatedEvent.

  • Commands that potentially cause Exception should be wrapped by a try-catch (for example, calling Stripe API via its SDK).

Let's study the try-catch blocks below:

27    @CommandHandler
28    constructor (
29        cmd: Step1.CreateCustomerCommand,
30        stripeService: StripeService,
31        queryUpdateEmitter: QueryUpdateEmitter
32    ) : this() {
33        try {
34            logger.info {
35                "CreateCustomerCommand"
36            }
37
38            val stripeCustomerId = stripeService.createCustomer(
39                CreateCustomerDto(
40                    email = cmd.userEmail,
41                    name = cmd.userName
42                )
43            )
44            val event = Step1.CustomerCreatedEvent(
45                orderId = cmd.orderId,
46                userEmail = cmd.userEmail,
47                paymentPlan = cmd.paymentPlan,
48                numOfPersons = cmd.numOfPersons,
49                customerId = stripeCustomerId
50            )
51            apply(event)
52        } catch (e: Exception) {
53            cancelCheckoutSubscriptionQuery(queryUpdateEmitter, cmd.orderId, e)
54            apply(
55                Step1.Failed.CreateCustomerCancelledEvent(
56                    orderId = cmd.orderId,
57                    reason = e.message ?: ""
58                )
59            )
60        }
61    }
62
63    @EventSourcingHandler
64    fun on(event: Step1.CustomerCreatedEvent) {
65        this.orderId = event.orderId
66        this.userEmail = event.userEmail
67        this.paymentPlan = event.paymentPlan
68        this.numOfPersons = event.numOfPersons
69        this.customerId = event.customerId
70
71    }
72
73    @CommandHandler
74    fun handle(cmd: Step2.OrderSubscriptionPlanCommand) {
75        logger.info {
76            "OrderSubscriptionPlanCommand"
77        }
78        apply(
79            Step2.SubscriptionPlanOrderedEvent(
80                orderId = this.orderId!!,
81                email = this.userEmail!!
82            )
83        )
84    }
85
86    @EventSourcingHandler
87    fun on(e: Step2.SubscriptionPlanOrderedEvent) {
88        orderStarted = true
89    }
90
91    @CommandHandler
92    fun handle(
93        cmd: Step3.CreateSessionIdCommand,
94        stripeService: StripeService,
95        queryUpdateEmitter: QueryUpdateEmitter
96    ) {
97        logger.info {
98            "CreateSessionIdCommand"
99        }
100        val plan = paymentPlan ?: throw Exception("Payment plan cannot be null")
101        try {
102            val sessionId = stripeService.createSession(
103                this.customerId!!,
104                PriceId.fromPaymentPlan(plan)!!
105            )
106            apply(
107                Step3.SessionIdCreatedEvent(
108                    orderId = this.orderId!!,
109                    sessionId = sessionId
110                )
111            )
112        } catch (e: Exception) {
113            // compensating command
114            cancelCheckoutSubscriptionQuery(queryUpdateEmitter, cmd.orderId, e)
115            apply(
116                Step3.Failed.CreateSessionIdCancelledEvent(
117                    orderId = cmd.orderId,
118                    reason = e.message ?: ""
119                )
120            )
121        }
122    }
123}
  • Note that we also dispatch the related compensating events with reason.

  • Some entity in our database may also record the error arised. thus we save all of them in database in regular eventHandler's.

  • We dispatch compensating actions of the previous command via sagaEventHandler.

    For example, if Step3.DoSomethingCommand fails, then we dispatch the following chain of actions:

  • In regular eventHandler we handle the state change for error messages (if any).

  • In the last step we also end the lifecycle of saga.

  • Not only that, we listen on Step1.Failed.AnotherThingCancelledEvent in regular event handler to send alert to stakeholders.