
R2DBC introduction

The official R2DBC website (r2dbc.io) introduces R2DBC in one sentence:

The Reactive Relational Database Connectivity (R2DBC) project brings reactive programming APIs to relational databases.

R2DBC stands for Reactive Relational Database Connectivity, an incubator for integrating relational databases through reactive drivers. It applies relational data access techniques on top of reactive programming.

R2DBC began as an experiment and proof of concept for integrating SQL databases into systems built on a reactive programming model. JDBC exposes a blocking API, whereas R2DBC lets developers access relational databases through a non-blocking API, because R2DBC is built on the Reactive Streams specification. R2DBC works with the following reactive libraries: Reactor, RxJava, and SmallRye Mutiny.

R2DBC is currently an open specification that establishes a service provider interface (SPI) for driver vendor implementations and client usage.

R2DBC was proposed by the Spring team. In addition to driver implementations, the project provides an R2DBC connection pool (r2dbc-pool) and an R2DBC proxy (r2dbc-proxy).

Currently, R2DBC supports the following driver implementations:

  • cloud-spanner-r2dbc – driver for Google Cloud Spanner.
  • jasync-sql – R2DBC wrapper for Java & Kotlin Async Database Driver for MySQL and PostgreSQL (written in Kotlin).
  • r2dbc-h2 – native driver implemented for H2 as a test database.
  • r2dbc-mariadb – native driver implemented for MariaDB.
  • r2dbc-mssql – native driver implemented for Microsoft SQL Server.
  • r2dbc-mysql – native driver implemented for MySQL.
  • r2dbc-postgres – native driver implemented for PostgreSQL.
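To make the non-blocking API concrete, here is a minimal sketch that runs one query through the plain R2DBC SPI with the r2dbc-h2 driver and Reactor. The function name `greetings`, the in-memory database name `testdb`, and the query are assumptions made for illustration; they do not come from the project described in this article.

```kotlin
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper: runs one query and emits the first column of every row.
// Nothing touches the database until the returned Flux is subscribed to.
fun greetings(connectionFactory: ConnectionFactory): Flux<String> =
    Flux.usingWhen(
        // Acquire a connection when a subscriber arrives.
        Mono.from(connectionFactory.create()),
        // Use the connection: execute the statement and map each row to a String.
        { connection ->
            Flux.from(connection.createStatement("SELECT 'hello'").execute())
                .flatMap { result -> result.map { row, _ -> row.get(0, String::class.java) } }
        },
        // Release the connection once the query completes, fails, or is cancelled.
        { connection -> connection.close() }
    )

fun main() {
    // The driver is discovered from the URL scheme; "testdb" is an assumed name.
    val connectionFactory = ConnectionFactories.get("r2dbc:h2:mem:///testdb")

    // blockLast() is used only so this demo does not exit before the query finishes.
    greetings(connectionFactory)
        .doOnNext { println(it) }
        .blockLast()
}
```

In a real WebFlux application the Flux would be returned to the framework rather than blocked on.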

Using R2DBC

Configure Spring Boot and R2DBC dependency libraries in Gradle:

    implementation "io.r2dbc:r2dbc-h2:0.8.4.RELEASE"
    implementation "com.h2database:h2:1.4.200"

    implementation "org.springframework.data:spring-data-r2dbc:1.0.0.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-actuator:2.3.5.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-data-r2dbc:2.3.5.RELEASE"
    implementation "org.springframework.boot:spring-boot-starter-webflux:2.3.5.RELEASE"
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor:2.3.5.RELEASE"

    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.1.0"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.9"

Connecting to a Database

We register and configure a ConnectionFactoryInitializer bean, which initializes the database through the ConnectionFactory:

@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {

    ...

    @Bean
    open fun initializer(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
        val initializer = ConnectionFactoryInitializer()
        initializer.setConnectionFactory(connectionFactory)
        val populator = CompositeDatabasePopulator()
        populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("schema.sql")))
        populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("data.sql")))
        initializer.setDatabasePopulator(populator)
        return initializer
    }
}

With this bean in place, schema.sql and data.sql are loaded from the classpath and executed against the ConnectionFactory when the application starts.

Create endpoints with the routing function model

WebFlux offers two programming models. One is the traditional annotation-based model, which uses controllers and annotations. The other is the routing function model, which uses a functional programming style.

The routing function model is built mainly on HandlerFunction and RouterFunction.

  • HandlerFunction represents a function that generates a response for the requests routed to it.
  • RouterFunction can replace the @RequestMapping annotation. We use it to route requests to handler functions.

Together they do the same job as an annotated controller, except that the mapping from HTTP methods and paths to handlers is built in code, in a reactive style.
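As a rough illustration of the two models, the sketch below exposes a trivial endpoint in each style. The class name `HelloController`, the values `helloHandler` and `helloRoute`, and both paths are hypothetical and exist only for this example.

```kotlin
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.RequestPredicates.GET
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

// Annotation-based model: the mapping is declared with annotations on a controller.
@RestController
class HelloController {
    @GetMapping("/annotated/hello")
    fun hello(): Mono<String> = Mono.just("hello")
}

// Routing function model, step 1: a HandlerFunction turns a request into a response.
val helloHandler = HandlerFunction<ServerResponse> { _ ->
    ServerResponse.ok().bodyValue("hello")
}

// Routing function model, step 2: a RouterFunction maps a request predicate to the handler.
// In an application this value would be exposed as a @Bean.
val helloRoute: RouterFunction<ServerResponse> =
    RouterFunctions.route(GET("/functional/hello"), helloHandler)
```

If spring-test is on the test classpath, a route like this can be exercised without starting a server through `WebTestClient.bindToRouterFunction(helloRoute).build()`.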

coRouter() makes it easy to create a RouterFunction using the Kotlin DSL and coroutines. For example:

@Configuration
@EnableR2dbcRepositories
open class AppConfiguration {

    @Bean
    open fun userRoute(userHandler: UserHandler) = coRouter {
        GET("/users", userHandler::findAll)
        GET("/users/search", userHandler::search)
        GET("/users/{id}", userHandler::findUser)
        POST("/users", userHandler::addUser)
        PUT("/users/{id}", userHandler::updateUser)
        DELETE("/users/{id}", userHandler::deleteUser)
    }

    ......
}

Create HandlerFunctions

UserHandler is a collection of HandlerFunctions. A handler is a bit like a Service:

@Component
class UserHandler {

    private val logger = LoggerFactory.getLogger(UserHandler::class.java)

    @Autowired
    lateinit var service: UserService

    suspend fun findAll(request: ServerRequest): ServerResponse {
        val users = service.findAll()
        return ServerResponse.ok().json().bodyAndAwait(users)
    }

    suspend fun search(request: ServerRequest): ServerResponse {
        val criterias = request.queryParams()
        return when {
            criterias.isEmpty() -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Search must have query params"))

            criterias.contains("name") -> {
                val criteriaValue = criterias.getFirst("name")
                if (criteriaValue.isNullOrBlank()) {
                    ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
                } else {
                    ServerResponse.ok().json().bodyAndAwait(service.findByName(criteriaValue))
                }
            }

            criterias.contains("email") -> {
                val criteriaValue = criterias.getFirst("email")
                if (criteriaValue.isNullOrBlank()) {
                    ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria value"))
                } else {
                    ServerResponse.ok().json().bodyAndAwait(service.findByEmail(criteriaValue))
                }
            }

            else -> ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Incorrect search criteria"))
        }
    }

    suspend fun findUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()

        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            val user = service.findById(id)
            if (user == null) ServerResponse.notFound().buildAndAwait()
            else ServerResponse.ok().json().bodyValueAndAwait(user)
        }
    }

    suspend fun addUser(request: ServerRequest): ServerResponse {
        val newUser = try {
            request.bodyToMono<UserDTO>().awaitFirstOrNull()
        } catch (e: Exception) {
            logger.error("Decoding body error", e)
            null
        }

        return if (newUser == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
        } else {
            val user = service.addUser(newUser)
            if (user == null) ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).json().bodyValueAndAwait(ErrorMessage("Internal error"))
            else ServerResponse.status(HttpStatus.CREATED).json().bodyValueAndAwait(user)
        }
    }

    suspend fun updateUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()

        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            val updateUser = try {
                request.bodyToMono<UserDTO>().awaitFirstOrNull()
            } catch (e: Exception) {
                logger.error("Decoding body error", e)
                null
            }
            if (updateUser == null) {
                ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("Invalid body"))
            } else {
                val user = service.updateUser(id, updateUser)
                if (user == null) ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
                else ServerResponse.status(HttpStatus.OK).json().bodyValueAndAwait(user)
            }
        }
    }

    suspend fun deleteUser(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toLongOrNull()

        return if (id == null) {
            ServerResponse.badRequest().json().bodyValueAndAwait(ErrorMessage("`id` must be numeric"))
        } else {
            if (service.deleteUser(id)) ServerResponse.noContent().buildAndAwait()
            else ServerResponse.status(HttpStatus.NOT_FOUND).json().bodyValueAndAwait(ErrorMessage("Resource $id not found"))
        }
    }
}

The ServerResponse returned by each HandlerFunction represents the HTTP response and is created through a builder. The builder can set the status code, the response headers, and the body.
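The following sketch shows the three things the builder can set, using the same coroutine extensions as UserHandler. The function name `createdResponse`, the header `X-Request-Source`, and the body contents are made up for illustration.

```kotlin
import org.springframework.http.HttpStatus
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.bodyValueAndAwait
import org.springframework.web.reactive.function.server.json

// Hypothetical handler helper: builds a 201 response with a custom header and a JSON body.
suspend fun createdResponse(): ServerResponse =
    ServerResponse.status(HttpStatus.CREATED)               // status code
        .header("X-Request-Source", "example")             // response header (assumed name)
        .json()                                             // Content-Type: application/json
        .bodyValueAndAwait(mapOf("id" to 1, "name" to "alice")) // body, serialized when the response is written
```

A terminal call such as `bodyValueAndAwait(...)` or `buildAndAwait()` is what actually produces the ServerResponse; the calls before it only configure the builder.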

Create a Service

UserHandler delegates the actual business logic to UserService.

@Service
class UserService {

    @Autowired
    private lateinit var userRepository: UserRepository

    suspend fun findAll() = userRepository.findAll().asFlow()

    suspend fun findById(id: Long) = userRepository.findById(id).awaitFirstOrNull()

    suspend fun findByName(name: String) = userRepository.findByName(name).asFlow()

    suspend fun findByEmail(email: String) = userRepository.findByEmail(email).asFlow()

    suspend fun addUser(user: UserDTO) = userRepository.save(user.toModel()).awaitFirstOrNull()

    suspend fun updateUser(id: Long, userDTO: UserDTO): User? {
        val user = findById(id)
        return if (user != null)
            userRepository.save(userDTO.toModel(id = id)).awaitFirstOrNull()
        else null
    }

    suspend fun deleteUser(id: Long): Boolean {
        val user = findById(id)
        return if (user != null) {
            userRepository.delete(user).awaitFirstOrNull()
            true
        } else false
    }
}

UserService's findAll(), findByName(), and findByEmail() return a Flow<User>. This works because the coroutine extensions (kotlinx-coroutines-reactor) bridge the reactive types used by Spring Data R2DBC: a Flux result is converted to Kotlin's Flow with asFlow(), and a Mono result is awaited inside a suspend function with awaitFirstOrNull().
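The two conversions can be tried in isolation, without a database. In this sketch `userNames()` and `findMissing()` are hypothetical stand-ins for repository methods that return a Flux and a Mono.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Stand-in for a repository query that returns several rows.
fun userNames(): Flux<String> = Flux.just("alice", "bob")

// Stand-in for a repository lookup that finds nothing.
fun findMissing(): Mono<String> = Mono.empty()

fun main() = runBlocking<Unit> {
    // Flux -> Flow: elements are collected with ordinary suspending code.
    val names: Flow<String> = userNames().asFlow()
    println(names.toList())   // prints [alice, bob]

    // Mono -> suspend call: an empty Mono becomes null instead of an exception.
    val user: String? = findMissing().awaitFirstOrNull()
    println(user)             // prints null
}
```

This is why findById() in UserService can return a nullable User and the handler can answer with 404 when the result is null.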

Create the Repository

UserService calls the repository to work with the database. Before creating the repository, we create the entity class User:

@Table("users")
data class User(
    @Id
    val id: Long? = null,
    val name: String,
    val password: String,
    val email: String,
)

The User class has a unique identifier and some fields. With the entity class in place, we can create an appropriate Repository as follows:

interface UserRepository : ReactiveCrudRepository<User, Long> {

    @Query("SELECT u.* FROM users u WHERE u.name = :name")
    fun findByName(name: String): Flux<User>

    @Query("SELECT u.* FROM users u WHERE u.email = :email")
    fun findByEmail(email: String): Flux<User>
}

Note that R2DBC itself is not an ORM: queries are written explicitly, as in the @Query methods above, and results are returned as reactive types.
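The handler and service code refer to UserDTO, toModel(), and ErrorMessage, which the article does not show. The sketch below is one plausible shape for them, inferred only from how they are called (`user.toModel()`, `userDTO.toModel(id = id)`, `ErrorMessage("...")`); the field names and the use of an extension function are assumptions. The User class is repeated here without its Spring Data annotations so that the example runs on its own.

```kotlin
// Same fields as the entity above; @Table and @Id are omitted in this standalone sketch.
data class User(
    val id: Long? = null,
    val name: String,
    val password: String,
    val email: String
)

// Assumed request payload: the entity's fields without the database-generated id.
data class UserDTO(
    val name: String,
    val password: String,
    val email: String
)

// Assumed mapping: no id for an insert, an explicit id for an update.
fun UserDTO.toModel(id: Long? = null): User =
    User(id = id, name = name, password = password, email = email)

// Assumed error body returned by the handlers.
data class ErrorMessage(val message: String)

fun main() {
    val dto = UserDTO(name = "alice", password = "secret", email = "alice@example.com")
    println(dto.toModel())        // prints User(id=null, name=alice, password=secret, email=alice@example.com)
    println(dto.toModel(id = 7))  // prints User(id=7, name=alice, password=secret, email=alice@example.com)
}
```

Leaving the id as null on insert lets the repository's save() treat the object as new, while passing the id makes it an update of the existing row.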

Results

[Screenshot: displaying the list of users]

[Screenshot: searching for a user]

Summary

This article introduced the background of R2DBC, then covered the WebFlux routing function model and the use of RouterFunction and HandlerFunction to route requests, handle them, and generate responses.

With WebFlux and R2DBC combined, every layer of the application, from HTTP handling to database access, processes data asynchronously.