For buffered channel approach the reader may refer to this article. This time we dicuss another native API from kotlin called Semaphore.
1import kotlinx.coroutines.* 2import kotlinx.coroutines.sync.Semaphore 3import kotlinx.coroutines.sync.withPermit 4 5 6class SomeConcurrenTask( 7 private val transactionTemplate: TransactionTemplate, 8) { 9 // divide tasks into batches with each being of size at most 20 10 val taskIdsBatches = taskIds.chunked(20) 11 // concurrnet limit to 10 12 val semaphore = Semaphore(10) 13 14 runBlocking { 15 val deferredResults = taskIdsBatches.mapIndexed { batchIndex, taskIdsBatch -> 16 async(Dispatchers.IO) { 17 semaphore.withPermit { 18 println("Processing batch $batchIndex/${taskIdsBatches.size}") 19 transactionTemplate.execute { 20 someTransaction(taskIdsBatch) 21 } 22 } 23 } 24 } 25 26 println("Waiting for everything to finish ...") 27 deferredResults.awaitAll() 28 println("Finished!") 29 } 30}







