Since Room 2.1, developers can now use the Kotlin coroutines by defining the suspend DAO function. Coroutines are exceptionally good at handling asynchronous operations, allowing you to handle time-consuming operations such as operating on a database in a natural sequence of code, rather than having to switch tasks, results, or errors back and forth between threads. With Room’s support for coroutines, you can use some of the benefits of concurrency scope, lifecycle, and nesting in your database operations.

As we added coroutine support to Room, we encountered and solved some problems that we hadn’t thought of in the coroutine model and the Android SQL API. In this article, we’ll explain some of the problems we encountered and what we did to solve them.

Unexpected problem

Take a look at the following code. It looks safe, but in fact it has a problem:

/** * TransferMoney from [Accounta] to [Accountb] */ suspend fun transferMoney(Accounta: String, Accountb: String) String, amount: Int) {// Use the IO Dispatcher, So the DB operation in IO Thread withContext (Dispatchers. IO) {database. BeginTransaction () / / in IO - began to perform transactions on the Thread Thread - 1 try {/ / A coroutine can be bound to and continue execution on any thread associated with the scheduler (in this case, Dispatchers.io). Also, since the transaction was also started in IO-Thread-1, we might just be able to execute the query successfully. If the coroutine continues to execute on IO-Thread-2, then the following code that operates on the database may cause a deadlock. It cannot continue until the execution of the IO-Thread-1 Thread has finished. moneyDao.increase(accountB, Amount) / / hang function database. SetTransactionSuccessful () / / never perform this line} finally {database. EndTransaction () / / never perform this line}}}

SQLite transactions on Android are subject to a single thread

The problem with the above code is that Android’s SQLite transactions are subject to a single thread. When a query in an ongoing transaction is executed in the current thread, it is considered part of the transaction and allowed to continue execution. But when the query is executed in another thread, it is no longer part of the transaction, which can cause the query to be blocked until the transaction completes in the other thread. This is a prerequisite for the atomicity of the BeginTransaction and EndTransaction APIs. There is no problem with the API when all the database transactions are done on a single thread, but with coroutines, the problem is that coroutines are not bound to any particular thread. In other words, the root cause of the problem is that the bound thread continues to execute after the coroutine is suspended, and there is no guarantee that it is the same thread that was bound before the suspension.

The use of database transaction operations in coroutines can cause deadlocks

Simple implementation

To address this limitation of Android SQLite, we need an API similar to RunInTransaction that accepts pending blocks of code. This API is implemented as if writing a single-threaded scheduler:

suspend fun <T> RoomDatabase.runInTransaction(
    block: suspend () -> T
): T = withContext(newSingleThreadContext("DB")) {
    beginTransaction()
    try {
        val result = block.invoke(this)
        setTransactionSuccessful()
        return@runBlocking result
    } finally {
        endTransaction()
    }
}

The above implementation is just the beginning, but when you use another scheduler in a pending block of code, it becomes a problem:

Suspend fun sendTaxRefund(federalAccount: String, taypayerList: List<Taxpayer>) { database.runInTransaction { val refundJobs = taypayerList.map { taxpayer -> coroutineScope { // Async (Dispatchers.IO) {val amount = irstoi.calculaterefund (divider) moneyda. decrease(federalAccount, Amount) moneyDAO. Increase (amount. Account, amount)}} // Wait for all computations to finish refundJobs.joinAll()}}

Because the received parameter is a pending block of code, it is possible for this part of code to use a different scheduler to start the subcoroutines, causing a different thread to perform the database operation. Therefore, a good implementation would allow the use of a standard coroutine constructor such as async, launch, or WithContext. In practice, only database operations need to be scheduled to a single transactional thread.

Introduce withTransaction

In order to solve the above problem, we constructed the withTransaction API. The withTransaction (kotlin. Coroutines. SuspendFunction0)), it simulates the withContext API, However, provided that the coroutine context is built specifically for securely performing Room transactions, you can write the code as follows:

fun transferMoney(
    accountA: String,
    accountB: String,
    amount: Int
) = GlobalScope.launch(Dispatchers.Main) {
    roomDatabase.withTransaction {
        moneyDao.decrease(accountA, amount)
        moneyDao.increase(accountB, amount)
    }
    Toast.makeText(context, "Transfer Completed.", Toast.LENGTH_SHORT).show()
}

Before delving into the implementation of the Room WithTransaction API, let’s review some of the coroutine concepts already mentioned. CoroutineContext contains the information needed to schedule the coroutine task, it carries the current CoroutineDispatcher and Job objects, as well as some additional data, but it can also be extended to include more information. An important feature of CoroutInContext is that they are inherited by subcoroutines within the same coroutine scope, such as the scope of the WithContext code block. This mechanism allows the child coroutines to continue to use the same scheduler, or if the parent coroutine is canceled, they are canceled together. Essentially, the pending transaction API provided by ROOM creates a specialized coroutine context to perform database operations under the same transaction scope.

The WithTransaction API creates three key elements in the context:

  • Single-threaded scheduler for performing database operations
  • Context elements that help the DAO function determine whether it is in a transaction;
  • ThreadContextElement, which marks the scheduling thread used in the transaction coroutine.

Transaction scheduler

The CoroutineDispatcher determines to which thread the coroutine is bound to execute. For example, Dispatchers.io uses a shared thread pool to shove operations that are likely to block, while Dispatchers.main executes coroutines on the Android Main thread. A transaction scheduler created by Room can take a single thread from Room’s Executor and distribute the transaction to that thread, rather than to a new thread created at random. This is important because Executors can be configured by users and can be used as testing tools. At the beginning of a transaction, Room gains control of a thread in the Executor until the end of the transaction. During the execution of a transaction, the database operations that have been performed are allocated to the transaction thread, even if the scheduler factor coroutine changes.

Getting a transactional thread is not a blocking operation, nor should it, because if no threads are available, a suspend operation should be performed and the caller notified to avoid disrupting the execution of the other coroutines. It also inserts a Runnable into the queue and waits for it to run, which is a sign that the thread is runnable. SuspendCancellableCoroutine function for us to set up a connection based on correction of apis and the bridge between coroutines. In this case, once the previously queued Runnable is executed, a thread is available, and we’ll use runBlocking to start an event loop to gain control of this thread. The scheduler created by runBlocking then distends the block of code that is about to be executed to the acquired thread. In addition, the JOB is used to suspend and keep the thread available until the transaction completes. Be aware that precautions should be taken if the coroutine is cancelled or the thread cannot be acquired. The code associated with getting the transaction thread is as follows:

/** * builds and returns a ContinuationInterceptor that distributes the coroutine to the acquired thread and executes the transaction. ControlJob is used to control the release of threads by canceling tasks. */ private suspend fun Executor.acquireTransactionThread( controlJob: Job ): ContinuationInterceptor = suspendCancellableCoroutine { continuation -> continuation.invokeOnCancellation { // If we fail or the task is cancelled while we are waiting for the available thread, we cannot stop the waiting action, but we can cancel the ControlJob so that once we gain control, it will be released quickly. ControlJob.cancel ()} try {execute {runBlocking {block () {block () {block () {block () {block (); Created by returning a runBlocking interceptors to restore suspendCancellableCoroutine, The interceptor will be used to intercept and distribute blocks of code to the acquired thread continuation.resume(CoroutInContext [ContinuationInterceptor]!!). // suspend the runBlocking coroutine until the ControlJob completes. Since the coroutine is empty, this will prevent runBlocking from ending immediately. controlJob.join() } } } catch (ex: RejectedExecutionException) {/ / unable to get the thread, Cancel (IllegalStateException("Unable to acquire a thread to perform the transaction.", ex))}}

Transaction context element

With the scheduler in place, we can create elements in the transaction to be added to the context, keeping references to the scheduler. If the DAO function is called within the transaction scope, the DAO function can be rerouted to the appropriate thread. The transaction elements we created are as follows:

internal class TransactionElement( private val transactionThreadControlJob: Job, internal val transactionDispatcher: ContinuationInterceptor) : CoroutInContext.Element {// The Singleton key is used to retrieve the Element Companion object key in this context: CoroutineContext.Key<TransactionElement> override val key: CoroutInContext.Key<TransactionElement> get() = TransactionElement /** * This element is used to count the number of transactions (including nested transactions). Call [acquire] to increase the count and call [release] to decrease the count. If the count reaches zero when [Release] is called, the transaction is canceled, Transaction thread will be released * / private val referenceCount = AtomicInteger (0) fun acquire () {referenceCount. IncrementAndGet ()} fun release() { val count = referenceCount.decrementAndGet() if (count < 0) { throw IllegalStateException( "Transaction was Never started or was already released.")} else if (count == 0) {// Cancelling the job that controls the transaction thread will cause it to be released transactionThreadControlJob.cancel() } } }

Acquire and release in the transactionElement function are used to trace nested transactions. Since beginTransaction and endTransaction allow nested calls, we also want to keep this feature, but we only need to release the transaction thread when the outermost transaction completes. The use of these functions will be covered later in the WithTransaction implementation.

Transaction thread flag

The last key element required to create the transactional context mentioned above is the ThreadContextElement. This element in CoroutInContext is similar to ThreadLocal in that it keeps track of whether there are ongoing transactions in the thread. This element is supported by a ThreadLocal, which sets a value on the ThreadLocal for each thread used by the scheduler to execute the coroutine code block. This value is reset once the thread has completed its task. In our example, this value is meaningless, and it is only necessary to determine whether the value exists in Room. If the coroutine context has access to a ThreadLocal

that exists on the platform, then it can issue the BEGIN/END command from any thread that the coroutine is bound to. If not, the thread can only be blocked until the transaction completes. But we still need to keep track of which transaction each blocked database method is running on, and which thread is responsible for the platform transaction.

The ThreadContextElement used in the Room’s WithTransaction API identifies the blocking function in the database. Blocking functions in Room, including those generated by the DAO, are treated specially after they are called by the transaction coroutine to ensure that they do not run on another scheduler. If your DAO has both types of functionality, you can mix and match blocking and suspending functions in a WithTransaction block. By adding a ThreadContextElement to the coroutine context and accessing it from the DAO function, we can verify that the blocking function is in the correct scope. If not, we will throw an exception instead of causing a deadlock. Later, we plan to reroute the blocking function to the transaction thread as well.

private final ThreadLocal<Integer> mSuspendingTransactionId = new ThreadLocal<>(); public void assertNotSuspendingTransaction() { if (! inTransaction() && mSuspendingTransactionId.get() ! = null) { throw new IllegalStateException("Cannot access database on a different" + " coroutine context inherited from a  suspending transaction."); }}

The combination of these three elements makes up our transaction context:

private suspend fun RoomDatabase.createTransactionContext(): CoroutineContext { val controlJob = Job() val dispatcher = queryExecutor.acquireTransactionThread(controlJob) val transactionElement = TransactionElement(controlJob, dispatcher) val threadLocalElement = suspendingTransactionId.asContextElement(controlJob.hashCode()) return dispatcher +  transactionElement + threadLocalElement }

Transaction API implementation

With the transaction context created, we were finally able to provide a secure API for performing database transactions within the coroutine. The next thing to do is combine this context with the usual begin/end transaction pattern:

suspend fun <R> RoomDatabase.withTransaction( block: suspend () -> R ): R {// If possible, use an inherited transactionContext, which allows nested pending transactions val TransactionContext = CoroutInContext [TransactionElement]? .transactionDispatcher ? : createTransactionContext() return withContext(transactionContext) { val transactionElement = coroutineContext[TransactionElement]!! TransactionElement. Acquire the try {beginTransaction () () the try {/ / in a new scope of encapsulation suspend code block, Val result = coroutineScope {block.invoke(this)} setTransactionSuccessful() return@withContext result} finally  { endTransaction() } } finally { transactionElement.release() } } }

The threading limit for SQLite on Android is reasonable, and was designed to be so before Kotlin came along. Coroutines introduce a new programming paradigm that changes some of the thinking patterns of traditional Java concurrent programming. Removing the restriction of Android threads on SQLite transactions directly wasn’t feasible because we wanted to provide a backward-compatible solution, and the combination of these approaches ultimately allowed us to get creative with our solution using coroutines and the Fluent API.