Dependencies
dependencies { implementation("org.springframework.boot:spring-boot-starter-data-redis") }
application.yml
spring: data: redis: host: XXX.upstash.io port: 6379 password: AdIpAAIjcDEzYTcXXX database: 0 ssl: enabled: true
Implementation of the Lock
Essentially it is implemented by using setnx
(set value if not exists), a standard command in redis.
package com.billie.payment.util import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull import org.springframework.data.redis.core.ReactiveRedisTemplate import org.springframework.stereotype.Component @Component class DistributiveLock( private val redisTemplate: ReactiveRedisTemplate<String, String>, ) { suspend private fun setIfNotExists(key: String, value: String): Boolean { return redisTemplate.opsForValue().setIfAbsent(key, value).awaitSingle() ?: false } suspend fun acquireLock(id: String, lockTimeout: Long = 30): Boolean { return setIfNotExists(id, id) } suspend fun releaseLock(id: String) { redisTemplate.delete(id).awaitSingle() } suspend fun isLocked(id: String): Boolean { return redisTemplate.hasKey(id).awaitSingle() } suspend fun waitForLock(id: String, timeout: Long = 10000): Boolean { return withTimeoutOrNull(timeout) { while (!acquireLock(id)) { delay(100) } true } ?: false } }
Usage
Handle Stripe's Repeated Event
Stripe happends to send the same event repeatedly to our backend.
Here our domain event TeamplanAddedEvent
is triggered by the Stripe event twice (as indicated by the same stripe_event_id
with 3 seconds after one another). To handle this:
-
Concurrency Checking. We create distributive lock to disable processing the same event within short period (even concurrenly).
-
Database History Checking. We mark our
order
as completed byorderId
. Next, we check that iforder.status
is adjusted in the table, if so, we don't process the event.
Since we wish to handle specific event at most once, we have the next code block:
Application
@Service class StripeEventApplicationService( private val distributiveLock: DistributiveLock ) { suspend fun handleStripeEvent(event: Event) = coroutineScope { when (event.type) { PaymentEventType.SUBSCRIPTION_UPDATED.code -> { logger.info { event.id } logger.info { "SUBSCRIPTION_UPDATED event: $event" } val (orderId, subscriptionId) = stripeService.getOrderIdAndSubscripionIdFromSubscriptionUpdatedEvent(event) if (orderId == null) { return@coroutineScope } if (!distributiveLock.acquireLock(orderId)) { return@coroutineScope } ... } } } }