1implementation("org.springframework.boot:spring-boot-starter-data-redis")
1import kotlinx.coroutines.delay
2import kotlinx.coroutines.reactor.awaitSingle
3import kotlinx.coroutines.reactor.awaitSingleOrNull
4import org.springframework.data.redis.core.ReactiveRedisTemplate
5import org.springframework.stereotype.Service
6
7
8@Service
9class RedisService(
10 private val redisTemplate: ReactiveRedisTemplate<String, String>
11) {
12 suspend fun leftPush(key: String, value: String): Long? {
13 return redisTemplate.opsForList().leftPush(key, value).awaitSingle()
14 }
15
16 suspend fun blockRightPop(key: String): String? {
17 var value: String?
18 do {
19 value = redisTemplate.opsForList().rightPop(key).awaitSingleOrNull()
20 if (value == null) {
21 delay(500)
22 }
23 } while (value == null)
24 return value
25 }
26}
1@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
2@ActiveProfiles("dbmigration")
3class DbMigrationTests {
4 @Test
5 fun insert() {
6 runBlocking {
7 repeat(5) { index ->
8 redisService.leftPush("LIST", "$index")
9 }
10 }
11 }
12
13 @Test
14 fun awaitForList() = runBlocking {
15 println("Waiting ...")
16 brpop@ while (true) {
17 val value = redisService.blockRightPop("LIST")
18 println("poped value $value")
19 if (value === "ENDED") {
20 break@brpop
21 }
22 }
23 }
24}