This article has participated in the good article call order activity, click to see: back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!

Many interview questions are asked about multi-threading. For example:

“There are multiple parallel tasks such as Task1 and Task2. How to execute Task3 after all tasks are completed?”

In Kotlin, we have a variety of implementations, all of which are summarized in this article.

1. Thread.join

2. Synchronized

3. ReentrantLock

4. BlockingQueue

5. CountDownLatch

6. CyclicBarrier

7. CAS

8. Future

9. CompletableFuture

10. Rxjava

11. Coroutine

12. Flow

Task3 is a string based on the results returned by Task1 and Task2. Each Task is simulated by sleep:

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it")}}val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it")}}val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it")}}Copy the code


1. Thread.join()

Kotlin is Java compliant, and all Java threading tools are available by default. The simplest way to synchronize threads is to use Thread’s join() :

@Test
fun test_join(a) {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    task3(s1, s2)

}
Copy the code


2. Synchronized

Use synchronized locks for synchronization

	@Test
    fun test_synchrnoized(a) {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        s2 = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }

    }
Copy the code

However, if there are more than three tasks, synchrnoized is a tricky way to write: n locks are declared and n synchronized are nested in order to synchronize the results of multiple parallel tasks.


3. ReentrantLock

ReentrantLock is a thread lock provided by JUC that can replace the use of synchronized

	@Test
    fun test_ReentrantLock(a) {

        lateinit var s1: String
        lateinit var s2: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        s2 = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()

    }
Copy the code

The benefit of ReentrantLock is that there is no synchrnoized problem when you have multiple parallel tasks, but you still need to create multiple locks to manage different tasks.

4. BlockingQueue

Blocking queues are also implemented internally by locking, so synchronous locking can also be achieved

	@Test
    fun test_blockingQueue(a) {

        lateinit var s1: String
        lateinit var s2: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        s2 = task2()

        queue.take()
        task3(s1, s2)
    }
Copy the code

Of course, blocking queues are used more for synchronization in production/consumption scenarios.


5. CountDownLatch

Most locks in JUC are based on AQS, which can be divided into exclusive locks and shared locks. ReentrantLock is an exclusive lock. In contrast, shared locks are more suitable for this scenario. For example, CountDownLatch, which keeps one thread blocked until the execution of the other threads is complete:

	@Test
    fun test_countdownlatch(a) {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2) Thread() { s1 = task1() cd.countDown() }.start() Thread() { s2 = task2() cd.countDown() }.start() cd.await() task3(s1,  s2) }Copy the code

The advantage of shared locks is that you don’t have to create a separate lock for each task, and it’s easy to write even more parallel tasks


6. CyclicBarrier

CyclicBarrier is another shared locking mechanism provided by JUC. It allows a group of threads to reach a synchronization point and then continue to run together. If any thread does not reach the synchronization point, the other arriving threads will be blocked.

Unlike CountDownLatch, which is one-time, CyclicBarrier can be reset and reused, which is why it is named Cyclic

	@Test
    fun test_CyclicBarrier(a) {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)

    }
Copy the code


7. CAS

AQS is internally synchronized through spin-locking, which essentially uses CompareAndSwap to avoid the overhead of thread blocking. Therefore, we can use the CAS based atomic class count to achieve the goal of locking free operations.

 	@Test
    fun test_cas(a) {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)

    }
Copy the code

While loop idling may seem like a waste of resources, but that’s the nature of spin locking, so CAS is only good for cpu-intensive short task synchronization.


volatile

When you look at the lock-free implementation of CAS, many of you may think of volatile. Can you achieve lock-free thread-safety as well?

 	@Test
    fun test_Volatile(a) {
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            s1 = task1()
            cnt--
        }.start()

        Thread {
            s2 = task2()
            cnt--
        }.start()

        while(cnt ! =0) {
        }

        task3(s1, s2)

    }
Copy the code

Volatile guarantees visibility, but not atomicity. CNT — is not thread-safe and requires locking


8. Future

It is inconvenient to define two variables, S1 and S2, to record the results, whether there is a lock operation or no lock operation. Since Java 1.5, Callable and Future have been provided to return results at the end of task execution.

@Test
fun test_future(a) {

    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))

    Executors.newCachedThreadPool().execute(future1)
    Executors.newCachedThreadPool().execute(future2)

    task3(future1.get(), future2.get()}Copy the code

With future.get(), you can wait for the results to return synchronously, which is easy to write


9. CompletableFuture

Future.get () is convenient, but it blocks the thread. Java 8 introduced the CompletableFuture, which implements the CompletionStage interface along with the Future interface. CompletableFuture can logically combine multiple CompletionStages to achieve complex asynchronous programming. These logically combined methods avoid thread blocking in the form of callbacks:

@Test
fun test_CompletableFuture(a) {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}
Copy the code


10. RxJava

The various operators and thread-switching capabilities provided by RxJava can also help: the ZIP operator can combine two Observable results; SubscribeOn Starts an asynchronous task

@Test
fun test_Rxjava(a) {

    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()

}
Copy the code


11. Coroutine

So many of these are actually Java tools. Coroutine is finally Kotlin’s unique tool:

@Test
fun test_coroutine(a) {

    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }

        val c2 = async(Dispatchers.IO) {
            task2()
        }

        task3(c1.await(), c2.await())
    }
}
Copy the code

It is very comfortable to write, and it is a combination of the advantages of the previous tools.


12. Flow

Flow is a Coroutine version of RxJava with many RxJava operators, such as zip:


@Test
fun test_flow(a) {

    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }
        
    runBlocking {
         flow1.zip(flow2) { t1, t2 ->
             task3(t1, t2)
        }.flowOn(Dispatchers.IO)
        .collect()

    }

}
Copy the code

FlowOn causes tasks to compute and emit results asynchronously.


conclusion

All the ways mentioned above are just like the four ways of writing hui in anise beans. It’s not necessary to master all of them. In conclusion, the best thread synchronization scheme on Kotlin starts with coroutines!