0%
November 30, 2024

Redis Task Distribution in Kotlin

springboot

Dependencies

implementation("org.springframework.boot:spring-boot-starter-data-redis")

RedisService

import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service


/**
 * Thin coroutine-friendly wrapper around the list operations of a
 * [ReactiveRedisTemplate], used to treat a Redis list as a simple task queue.
 */
@Service
class RedisService(
    private val redisTemplate: ReactiveRedisTemplate<String, String>
) {
    /**
     * Pushes [value] onto the head (left side) of the list stored at [key].
     *
     * @return the number emitted by the Redis left-push operation.
     */
    suspend fun leftPush(key: String, value: String): Long? =
        redisTemplate.opsForList()
            .leftPush(key, value)
            .awaitSingle()

    /**
     * Pops from the tail (right side) of the list stored at [key], suspending
     * until an element is available.
     *
     * This is implemented by polling: a non-blocking right-pop is attempted,
     * and when it yields nothing the coroutine waits 500 ms before retrying.
     *
     * @return the popped element; this function only returns once one exists.
     */
    suspend fun blockRightPop(key: String): String? {
        val listOps = redisTemplate.opsForList()
        while (true) {
            val popped = listOps.rightPop(key).awaitSingleOrNull()
            if (popped != null) {
                return popped
            }
            // Nothing to pop yet: back off before polling again.
            delay(500)
        }
    }
}

Demonstration

/**
 * Demonstrates distributing work through a Redis list: [insert] acts as the
 * producer and [awaitForList] as the consumer of the list named "LIST".
 *
 * NOTE(review): `redisService` is not declared anywhere in this snippet; it is
 * presumably injected into the test class (e.g. via `@Autowired`) — confirm.
 */
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles("dbmigration")
class DbMigrationTests {
    /** Producer: pushes the values "0" through "4" onto the head of "LIST". */
    @Test
    fun insert() {
        runBlocking {
            repeat(5) { index ->
                redisService.leftPush("LIST", "$index")
            }
        }
    }

    /**
     * Consumer: pops values from the tail of "LIST" and prints each one,
     * stopping once the sentinel value "ENDED" has been received.
     *
     * The return type is spelled out as [Unit] so the test method's signature
     * does not depend on what the `runBlocking` lambda happens to evaluate to.
     */
    @Test
    fun awaitForList(): Unit = runBlocking {
        println("Waiting ...")
        do {
            val value = redisService.blockRightPop("LIST")
            println("poped value $value")
            // Structural comparison (`!=`), not referential (`!==`): a string read
            // from Redis is a different object from the "ENDED" literal even when
            // the contents match, so an identity check would never end the loop.
        } while (value != "ENDED")
    }
}