Based on the Reactive Streams Reactive flows specification, R2DBC is an open specification that provides an interface for driver vendors and consumers (R2DBC-SPI). Unlike the blocking nature of JDBC, it provides a fully Reactive non-blocking API to interact with relational databases.

In short, the R2DBC project is a bridge to support the use of reactive programming apis to access relational databases, defining a unified interface specification that can be implemented by different database vendors providing driver packages.

R2DBC defines the SPI that all data storage drivers must implement. Currently, R2DBC SPI drivers include:

  • R2dbc-h2: driver for H2 implementation;

  • R2dbc Mariadb: driver for Mariadb implementation;

  • R2dbc MSSQL: a local driver for Microsoft SQL Server;

  • R2dbc mysql: driver for mysql;

  • R2dbc PostGREs: a PostgreSQL driver.

R2dbc also provides reactive connection pool r2DB-pool (github.com/r2dbc/r2dbc…

The content of this article:

  • Connect to the mysql database using the R2DBC-mysql driver package
  • Use r2DBc-pool to get the database connection
  • Spring-data-r2dbc add, delete, change check API
  • Use of transactions
  • R2DBC Repository

Connect to the mysql database using the R2DBC-mysql driver package

Add r2DBc-mysql dependency:

<! -- r2dbc mysql-->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2. RELEASE</version>
</dependency>
Copy the code

R2dbc -mysql implements the ConnectionFactory SPI interface of R2DBC.

The connection factory is first created and then used to obtain the connection to the database.

ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
                .option(DRIVER, "mysql")
                .option(HOST, "127.0.0.1")
                .option(USER, "root")
                .option(PORT, 3306)
                .option(PASSWORD, "")
                .option(DATABASE, "r2dbc_stu")
                .option(CONNECT_TIMEOUT, Duration.ofSeconds(3))
                .build();
ConnectionFactory connectionFactory = ConnectionFactories.get(options);
Copy the code

Get a database connection from the connection factory:

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
Copy the code

Execute an SQL statement with a connection:

Mono.from(connectionPublisher)
    .flatMapMany(conn -> conn.createStatement(
       "insert into person (id,name,age) values ('1111','wujiuye',25)")
       .execute())
    .flatMap(Result::getRowsUpdated)
    .switchIfEmpty(Mono.just(0)) 
    .onErrorResume(throwable -> {
        throwable.printStackTrace();
        return Mono.empty();
    })
    .subscribe(System.out::println);
Copy the code
  • flatMapMany(conn -> conn.createStatement("sql").execute()): createStatementperformsql;
  • flatMap(Result::getRowsUpdated): getsqlNumber of rows affected by execution (selectStatement does not have the result);
  • switchIfEmptyIf:insertThe number of rows updated is0Will be executed;
  • onErrorResume: handlingconnectionAbnormal connection,sqlStatement execution exception;

Use r2DBc-pool to get the database connection

Add the r2DBc-pool dependency

<dependencies>
  <! -- r2dbc mysql -->
  <dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2. RELEASE</version>
  </dependency>
  <! -- r2dbc-pool -->
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-pool</artifactId>
    <version>0.8.2. RELEASE</version>
  </dependency>
</dependencies>
Copy the code

Create a ConnectionPool based on the ConnectionFactory:

ConnectionFactory connectionFactory = ....
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMillis(1000))
                .maxSize(20)
                .build();
ConnectionPool pool = new ConnectionPool(configuration);
Copy the code
  • ConnectionPoolConfiguration: connection pool configuration object that specifies the size of the connection pool, the maximum idle time for a connection, and so on.
  • ConnectionPool: Connection pool, alsoConnectionFactoryThe implementation class of the interface;

Create a connection using a connection pool:

Mono<Connection> connectionMono = pool.create();
// Release the connection back to the connection pool
connectionMono.flatMapMany(Connection::close).subscribe();
// Destroy the connection pool
pool.dispose();
Copy the code

R2dbc does not define a connection pool interface. R2db-pool takes over the creation of connections and manages the connection lifecycle by implementing the ConnectionFactory interface.

When using spring-data-R2DBC, we simply replace the ConnectionFactory registered with the bean factory with ConnectionPool, for example:

@Bean
public ConnectionFactory connectionFactory(a){
   ConnectionFactory connectionFactory = ....
   ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMillis(1000))
                .maxSize(20)
                .build();
    // ConnectionPool implements the ConnectionFactory interface, replacing ConnectionFactory with ConnectionFactory
    return new ConnectionPool(configuration);
}
Copy the code

But we don’t need to configure the ConnectionFactory ourselves, spring-data-R2DBC automatically does that for us.

So when we use spring-data-R2DBC, we use DatabaseClient to execute the SQL as follows, and ultimately get the connection execution from the connection pool as well:

public class xxx{
    @Resource
    private ConnectionFactory connectionFactory;
  
    @Test
    public void test(a){
       DatabaseClient client = DatabaseClient.create(connectionFactory);
       / /...}}Copy the code

Breakpoint validation is performed at the point where the connection is obtained:

Spring-data-r2dbc add, delete, change check API

Spring-data-r2dbc can be used directly through the dependent starter, which will import the required JAR packages into the project:

<dependencies>
    <! -- R2DBC mysql database -->
    <dependency>
       <groupId>dev.miku</groupId>
       <artifactId>r2dbc-mysql</artifactId>
       <version>0.8.2. RELEASE</version>
    </dependency>
    <! R2dbc-pool r2DBC-pool
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-r2dbc</artifactId>
       <version>2.3.0. RELEASE</version>
    </dependency>
</dependencies>
Copy the code

Add the configuration to the application.yml configuration file:

### r2dbc
spring:
  r2dbc:
    url: r2dbc:mysql://localhost:3306/r2dbc_stu? useUnicode=true&characterEncoding=UTF-8
    username: root
    password: 
    pool:
      enabled: true
      max-size: 10
      initial-size: 10
      validation-query: select 1
Copy the code

DatabaseClient

DatabaseClient is a functional reactive non-blocking API provided by Spring Data R2DBC for interacting with databases.

DatabaseClient encapsulates the processing of resources, such as opening and closing connections, making it easier to add, delete, change and query SQL without worrying about releasing connections.

DatabaseClient by spirng – data – r2dbc org. Springframework. Data. R2dbc. Config. AbstractR2dbcConfiguration class is complete, We can also inherited AbstractR2dbcConfiguration class, replace some default configuration.

@Configuration
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
    @Override
    @Bean // This annotation cannot be omitted
    public ConnectionFactory connectionFactory(a) {
        / /...
    }
    
    @Bean
    ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return newR2dbcTransactionManager(connectionFactory); }}Copy the code

insert api

Example: Insert a record into the PERSON table

client.insert()
      .into("person")
      .value("id"."123")
      .value("name"."wujiuye")
      .nullValue("age", Integer.class)
      .fetch()
      .rowsUpdated()
      .subscribe(System.out::println);
Copy the code
  • intoIn:xxInsert a record into a table;
  • value: Assigns a value to a field (table field name) of the new record;
  • nullValue: assigns a null value to a field;
  • fetch: performsqlAnd get a response;
  • rowsUpdated: Gets the number of updated records, in this case, the number of inserted records;
  • subscribeSubscribe, triggering the stream execution and consuming the final result.

Multiple SQL combinations executed:

Mono<Void> insert1 = client.insert().into("person")
         .value("id"."12345")
         .value("name"."wujiuye")
         .nullValue("age", Integer.class)
         .then();
Mono<Void> insert2 = client.insert().into("person")
          .value("id"."123445555555")
          .then();
insert1.then(insert2).subscribe();
Copy the code
  • then: Does not consume any results, the method returns oneMono<Void>Is used to connect to the next step, but it does not pass the result of the previous step to the next step;
  • subscribe(): subscribe, only trigger stream execution, not care about the result;

update api

Example: Update the record with ID 12345 in the person table:

client.update().table("person")
      .using(Update.update("name"."1111").set("age".18))
      .matching(Criteria.where("id").is("12345"))
      .fetch()
      .rowsUpdated()
      .subscribe(rows -> System.out.println("Number of updated records:" + rows));
Copy the code
  • using: Receive aUpdateObject,UpdateDecide which fields to update;
  • matching: Receive aCriteriaObject, sets the matching condition, that issqlthewherePart;

delete api

Select * from person where name = 1111 and age = 18;

client.delete().from("person")
      .matching(Criteria.where("name").is("1111").and("age").is(18))
      .fetch()
      .rowsUpdated()
      .subscribe(rows -> System.out.println("The total number of deleted records is:" + rows));
Copy the code

select api

Select * from person where name = null;

Flux<Person> list = client.select().from("person")
         .matching(Criteria.where("name").isNull())
         .as(Person.class)
         .fetch()
         .all();
list.subscribe(System.out::println);
Copy the code
  • all: Get all results;
  • as: Maps the result toPersonInstance;

Use of transactions

A common pattern when using relational databases, which typically associate transactions with a single connection, is to group multiple queries into a unit of work protected by transactions. Therefore, using different connections results in using different transactions.

Spring-data-r2dbc includes transaction awareness in DatabaseClient, allowing you to group multiple statements in the same transaction using Spring’s transaction management.

Example: Join multiple SQL statements with THEN and roll back the transaction if one SQL execution fails:

ReactiveTransactionManager tm = new R2dbcTransactionManager(connectionFactory);
TransactionalOperator operator = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = client.execute("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
      .bind("id"."joe") // Bind parameters
      .bind("name"."Joe")
      .bind("age".34)
      .fetch().rowsUpdated()
      .then(client.execute("INSERT INTO contacts (id, name) VALUES(:id, :name)")
                  .bind("id"."joe")
                  .bind("name"."Joe")
                  .fetch().rowsUpdated())
      .then();
// Execute atomicOperation in a transaction
operator.transactional(atomicOperation).subscribe();
Copy the code
  • ReactiveTransactionManagerResponsive programming transaction manager,R2dbcTransactionManagerisspring-data-r2dbcTo provide theReactiveTransactionManagerAn implementation of;

Spring-data-r2dbc also supports annotation-based declarative transactions. Spring – data – r2dbc automatically configure the ReactiveTransactionManager @ EnableTransactionManagement and declarative transaction management, thus can be used directly, and don’t add additional configuration. Such as:

@Component
public class TxService {
    
    @Resource
    private DatabaseClient client;

    @Transactional(rollbackFor = Throwable.class)
    public Mono<Void> txInsert(a) {
        Person person = new Person();
        person.setId('12123');
        returnclient.insert().into(Person.class) .using(person) .fetch().rowsUpdated() .then(client.insert().into(Person.class) .using(person) .fetch().rowsUpdated() .then()); }}/ / test
public class R2dbcTxTest extends SupporSpringBoot {

    @Resource
    private TxService txService;

    @Test
    public void testTx(a) {
        txService.txInsert().doOnError(throwable -> {
            System.out.println("Execution failed"); throwable.printStackTrace(); }).subscribe(); }}Copy the code

R2DBC Repository

Spring-data-r2dbc also implements the Spring Data Repository reaction API.

Now, let’s use an example to wrap a series of operations that invoke the Repository method using declarative transactions.

1, declare persistent object PO:

@Table("person")
public static class Person {
   @Id
   private String id;
   private String name;
   private int age;
}
Copy the code

Create a Dao(to distinguish it from Repository in dome-driven design) and inherit the R2dbcRepository interface:

public interface PersonDao extends R2dbcRepository<R2dbcStuMain.Person.String> {
    @Modifying
    @Query("insert into person (id,name,age) values(:id,:name,:age)")
    Mono<Integer> insertPerson(String id, String name, Integer age);
}
Copy the code

Create a Service, create a transaction method in the Service, link to the insertPerson method that calls PersonDao multiple times.

@Service
public class PersonService {

    @Resource
    private PersonDao personDao;

    @Transactional(rollbackFor = Throwable.class)
    public Mono<Integer> addPerson(Person... persons) {
        Mono<Integer> txOp = null;
        for (Person person : persons) {
            if (txOp == null) {
                txOp = personDao.insertPerson(person.getId(), person.getName(), person.getAge());
            } else{ txOp = txOp.then(personDao.insertPerson(person.getId(), person.getName(), person.getAge())); }}returntxOp; }}Copy the code

Test whether the transaction is effective:

@SpringBootApplication
@EnableR2dbcRepositories
public class R2dbcApplication {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(R2dbcApplication.class);
        PersonService personService = context.getBean(PersonService.class);
        Person person = new Person();
        person.setId("12347");
        person.setName("wjy");
        person.setAge(25);
        // Test the transaction method to verify if any data was successfully inserted when the primary key is repeatedpersonService.addPerson(person, person) .doOnError(Throwable::printStackTrace) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); }}Copy the code

reference

  • Spring Data R2DBCOfficial documents:Docs. Spring. IO/spring – the data…
  • r2dbc.io:r2dbc.io/