Background

The core concept of Redux is one-way data flow: state can only be modified through dispatch(action). With React-Redux, the following data flow loop forms between components and Redux:

view ->  action -> reducer -> state -> view

However, real applications contain many asynchronous scenarios. The most direct approach is to start the asynchronous request inside the React component and, once the data arrives, call dispatch(action) to update the data layer. This couples the view layer to the data layer and makes later maintenance difficult.

The Redux authors recommend using middleware to handle asynchronous flows because we have the flexibility to control the timing of dispatches in middleware, which is very effective for handling asynchronous scenarios. There are two common practices:

  1. Change what an action can be, as redux-thunk does by letting you dispatch a function;
  2. React to the actions received in middleware, as redux-saga does.

The combination of RxJS and Redux that we will talk about today takes the second approach to asynchronous flows.

Middleware intervention

One such middleware already exists: redux-observable. Today we will go step by step and implement our own version of redux-observable.

The principle of this middleware can be simplified to the following code:

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' });
    }
    return result;
};

The principle is simple: after next(action), the middleware inspects the action, runs whatever asynchronous logic is needed, and then calls dispatch to modify the data. (The example above dispatches 'pong' synchronously for brevity.) redux-observable simply adds the features of RxJS on top of this.
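To see the ping/pong middleware run without pulling in Redux, here is a minimal sketch. createTinyStore, typeLogReducer and the '@@INIT' action are hypothetical stand-ins written for this illustration; they only mimic the parts of a Redux store that the middleware touches.

```javascript
// Hypothetical stand-in for a Redux store wired to a single middleware.
function createTinyStore(reducer, middleware) {
  let state = reducer(undefined, { type: '@@INIT' });
  let dispatch = () => {
    throw new Error('dispatching while the middleware is being set up');
  };
  const store = {
    getState: () => state,
    // Always go through the full (middleware-wrapped) dispatch.
    dispatch: (action) => dispatch(action),
  };
  // Innermost "next": run the reducer and store the new state.
  const baseDispatch = (action) => {
    state = reducer(state, action);
    return action;
  };
  dispatch = middleware(store)(baseDispatch);
  return store;
}

// Same shape as the middleware in the article.
const pingPongMiddleware = (store) => (next) => (action) => {
  const result = next(action);
  if (action.type === 'ping') {
    store.dispatch({ type: 'pong' });
  }
  return result;
};

// The state is simply the list of action types seen by the reducer.
const typeLogReducer = (log = [], action) =>
  action.type === '@@INIT' ? log : [...log, action.type];

const tinyStore = createTinyStore(typeLogReducer, pingPongMiddleware);
tinyStore.dispatch({ type: 'ping' });
console.log(tinyStore.getState()); // [ 'ping', 'pong' ]
```

The reducer sees 'ping' first because the middleware calls next(action) before it dispatches the follow-up action.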

Ideas for handling asynchronous logic

If you are familiar with Redux, you know that its middleware follows the onion model. As mentioned above, on the way back out of the middleware we dispatch new actions based on the action that came in. The core idea of RxJS is to treat data as a stream. So you can think of actions flowing into a pipe at the end of the middleware; what eventually comes out of the pipe are actions that the store dispatches again.

What transformations happen inside the pipe is the domain of RxJS, and with the power of RxJS operators asynchronous logic can be implemented very elegantly.

So there needs to be a stream that carries all the actions, action$, so that a fetch can be driven from it:

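action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            // `action` stands for whatever action should be dispatched for this response
            map(res => action)
        )
    ),
    // catchError must return an Observable; an empty one swallows the error
    catchError(() => empty())
)

This embeds the asynchronous logic in the stream.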

Create the Action flow

The core idea is action in, action out: the outgoing actions are consumed again by store.dispatch, so action$ is an Observable.

Meanwhile, every dispatched action passes through the middleware and is pushed into action$, so action$ is also an Observer.

Thus action$ is both an Observer and an Observable, which is exactly what a Subject is.

A simple middleware then becomes:

import { Subject } from 'rxjs/Subject';

export default (store) => {
  const action$ = new Subject();

  action$.subscribe(store.dispatch);

  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};

In the code above, the middleware pushes every action into action$, and the subscription makes the store dispatch whatever comes out.

But written this way it is an infinite loop, because every dispatched action goes back into action$ and is dispatched again.

As you have probably noticed, we have not yet filtered the actions going into action$, and this filtering step is exactly where the asynchronous logic belongs.
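The loop, and the way filtering breaks it, can be reproduced in a few lines. This is only a sketch: TinySubject and countDispatches are hypothetical helpers standing in for an RxJS Subject and for the store, and the cap exists purely so the unfiltered case terminates.

```javascript
// Hypothetical minimal Subject: it can be fed values (next) and observed (subscribe).
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.forEach((observer) => observer(value)); }
}

// Wires a "dispatch" to a subject through `transform` and returns how many
// dispatches a single initial action caused (stopping at `cap`).
function countDispatches(transform, cap = 10) {
  const action$ = new TinySubject();
  let dispatchCount = 0;
  const dispatch = (action) => {
    dispatchCount += 1;
    if (dispatchCount >= cap) return; // safety cap so the demo terminates
    action$.next(action);
  };
  action$.subscribe((action) => {
    const out = transform(action);
    if (out !== undefined) dispatch(out);
  });
  dispatch({ type: 'ping' });
  return dispatchCount;
}

// No filtering: every action is dispatched again, so only the cap stops it.
const unfiltered = countDispatches((action) => action);

// Filtering: only 'ping' produces a new action ('pong'), so the loop ends.
const filtered = countDispatches((action) =>
  action.type === 'ping' ? { type: 'pong' } : undefined
);

console.log(unfiltered, filtered); // 10 2
```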

Now we’re going to add this step.

Flow converter Epic

In order to achieve a transformation of the action, we will isolate the process. The intermediate processing logic is called Epic. The form of Epic can be roughly written as:

const epic = (action$) => {
    return action$.pipe(
        // Every action comes through here,
        // so we only handle the action we care about
        filter(action => action.type === 'GET_USER'),
        switchMap(
            // Convert fetch to a stream as well
            () => fromPromise(fetch('/api/user/get', {
                method: 'POST',
                body: JSON.stringify({
                    id: 1
                }),
            })).pipe(
                map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
                // catchError must return an Observable, hence of(...)
                catchError(error => of({ type: 'GET_USER_FAILED', payload: error }))
            )
        )
    );
};

An Epic is essentially a function. Inside it we pipe action$ through a chain of operators to produce another stream, which carries the actions to be dispatched.

What remains is to integrate this Epic into the middleware.

To do this, we simply switch the subscription from action$ to the new stream:

import { Subject } from 'rxjs/Subject';

export default (store) => {
  const action$ = new Subject();
  const newAction$ = epic(action$);

  newAction$.subscribe(store.dispatch);

  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};

Thus, when action$ receives a new action, it flows through the pipe defined by the Epic before anything is dispatched.

Merging multiple Epics

At this point our middleware has a basic ability to handle asynchronous logic. In reality, though, there is never just one piece of asynchronous logic, so there will be many Epics, each producing its own stream. The store can only subscribe to one stream, so all of these Epic streams have to be merged into one.

RxJS naturally has merging covered: as you may have guessed, the merge operator does the job, and we can provide a combineEpics function:

export const combineEpics = (...epics) => {
  const merger = (...args) => merge(
    ...epics.map((epic) => {
      const output$ = epic(...args);
      return output$;
    })
  );
  return merger;
};

The code above is not difficult to understand. combineEpics takes all the Epics passed in and returns a merger function, which calls each Epic with the same arguments and uses merge to combine the resulting streams into a single stream.

Usage looks like this:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' }))
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } }))
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
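The "call every Epic with the same source, then merge the outputs" idea can also be shown without RxJS. The sketch below uses hypothetical stand-ins (TinySubject, filterMap, mergeStreams, combineTinyEpics) for Subject, filter plus map, merge and combineEpics; it illustrates the shape of the technique and is not how RxJS implements these operators.

```javascript
// Hypothetical minimal Subject used as a stand-in for an RxJS Subject.
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.forEach((observer) => observer(value)); }
}

// Stand-in for filter + map: emits project(value) for every matching value.
const filterMap = (source$, predicate, project) => ({
  subscribe(observer) {
    source$.subscribe((value) => {
      if (predicate(value)) observer(project(value));
    });
  },
});

// Stand-in for merge: subscribing to the result subscribes to every input.
const mergeStreams = (...streams) => ({
  subscribe(observer) {
    streams.forEach((stream$) => stream$.subscribe(observer));
  },
});

// Same structure as combineEpics: call each epic with the same arguments,
// then merge the streams they return.
const combineTinyEpics = (...epics) => (...args) =>
  mergeStreams(...epics.map((epic) => epic(...args)));

const tinyPingEpic = (action$) =>
  filterMap(action$, (a) => a.type === 'ping', () => ({ type: 'pong' }));
const tinyUserEpic = (action$) =>
  filterMap(action$, (a) => a.type === 'GET_USER', () => ({ type: 'GET_USER_SUCCESS' }));

const source$ = new TinySubject();
const emitted = [];
combineTinyEpics(tinyPingEpic, tinyUserEpic)(source$)
  .subscribe((action) => emitted.push(action.type));

source$.next({ type: 'GET_USER' });
source$.next({ type: 'ping' });
source$.next({ type: 'ignored' });
console.log(emitted); // [ 'GET_USER_SUCCESS', 'pong' ]
```

Each Epic sees every action and decides for itself what to react to; the merged stream is the only thing the store has to subscribe to.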

Getting the state

In an Epic we will inevitably need data from the state to decide what to do, so the Epic needs access to it. We can expose store.getState by passing it as an extra argument when the middleware calls the Epic:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};

The Epic can then call getState() to read the state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);

Further optimization: make state a stream as well

The approach above reads the state directly. That is an active pull, which does not fit the functional reactive programming model. In a reactive design, changes to the state should themselves be observable.

Once state changes can be observed, we can do more. For example, when some piece of the state changes, we can save it in real time.

In traditional mode, you could write something like this in middleware:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // Something like this
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {

            }
        }).then(() => {}).catch(() => {});
    }
    return result;
};

This logic has to live in a dedicated middleware. If state is a stream as well, you can use an Epic directly:

const autoSaveEpic = (action$, state$) =>
  action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // Autosave enabled
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // Get state.xxx
        distinctUntilChanged(), // Continue only when the value has changed
        concatMap((value) => {
            // fetch to save; must return an Observable (or Promise) of the action to dispatch
        }),
        // Autosave is off
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  );

Read carefully, this is an elegant way to control autosave. It uses action$ to switch autosave on and off, and it relies on RxJS operators to deal with the delay of asynchronous saves: concatMap runs the saves one after another, so a slow request is not overtaken by a later one.

If you just want to get the latest state, you can use the withLatestFrom operator:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);

Add state flow to middleware:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};

Note that the latest value of state$ is not the default (initial) state: state$ is a plain Subject and only emits after an action has passed through the reducers.
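A small sketch makes the point about the initial state concrete. TinySubject, counterReducer and dispatchAndPublish are hypothetical names for this illustration; dispatchAndPublish mirrors the order used in the middleware (run the reducer, then publish the new state).

```javascript
// Hypothetical minimal Subject used as a stand-in for an RxJS Subject.
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.forEach((observer) => observer(value)); }
}

const counterReducer = (state = { count: 0 }, action) =>
  action.type === 'increment' ? { count: state.count + 1 } : state;

// The store starts with the default state { count: 0 } ...
let currentState = counterReducer(undefined, { type: '@@INIT' });

const state$ = new TinySubject();
const observedCounts = [];
state$.subscribe((state) => observedCounts.push(state.count));

// Same order as in the middleware: reducer first, then publish the state.
const dispatchAndPublish = (action) => {
  currentState = counterReducer(currentState, action);
  state$.next(currentState);
};

// ... but state$ has not emitted anything before the first action.
const countBeforeFirstAction = observedCounts.length;

dispatchAndPublish({ type: 'increment' });
dispatchAndPublish({ type: 'increment' });
console.log(countBeforeFirstAction, observedCounts); // 0 [ 1, 2 ]
```

If an Epic needs to see the initial state as well, one option is to seed the stream, for example with RxJS's BehaviorSubject, which takes an initial value and replays the latest value to new subscribers.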

The order of actions

If you have read this far, you already have a good understanding of how Redux combines with RxJS. However, there is still a problem with the order in which actions take effect.

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' }))
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' }))
);

When store.dispatch({ type: 'one' }) is executed, the action sequence is:

'one' -> 'two' -> 'four' -> 'three'

This is because RxJS runs synchronously by default (no scheduler). A simplified model of what happens looks like this:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}

const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three

This output should look familiar, and it is what you would expect from synchronous code. But the order we need is:

'one' -> 'two' -> 'three' -> 'four'

How do you do that?

Clearly, the scheduler has to be replaced. RxJS has several schedulers: null (synchronous), asap, queue, async, and animationFrame. The last one is meant for animation and can be ruled out immediately. The default is the first, which leaves asap, queue, and async. All three would work in this scenario, but queue performs better with large amounts of data, so we use it here. asap schedules on the microtask queue and async on the macrotask queue; queue runs synchronously when the delay is 0 and behaves like async when the delay is non-zero.

Middleware:

  const action$ = new Subject().pipe(
    observeOn(queue)
  );

The result is:

'one' -> 'two' -> 'three' -> 'four'

In the simplified model, the effect is roughly this:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}

const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four
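The setTimeout model defers the follow-up work to a later macrotask. With no delay, the queue scheduler stays synchronous instead: a task scheduled while another task is running is put on a queue and runs once the current one finishes. A rough sketch of that idea follows; createTrampoline, followUps and handle are hypothetical names for this illustration, not RxJS code.

```javascript
// Hypothetical trampoline: run a task immediately if idle, otherwise enqueue it
// and let the task that is already running drain the queue afterwards.
function createTrampoline() {
  const pending = [];
  let flushing = false;
  return function schedule(task) {
    pending.push(task);
    if (flushing) return; // a task is running: it will pick this one up later
    flushing = true;
    try {
      while (pending.length > 0) {
        pending.shift()();
      }
    } finally {
      flushing = false;
    }
  };
}

const order = [];
const schedule = createTrampoline();

// Same relationships as epic1 and epic2: 'one' produces 'two' and 'three',
// and 'two' produces 'four'.
const followUps = { one: ['two', 'three'], two: ['four'] };

function handle(type) {
  order.push(type);
  (followUps[type] || []).forEach((next) => schedule(() => handle(next)));
}

schedule(() => handle('one'));
console.log(order); // [ 'one', 'two', 'three', 'four' ]
```

Everything still finishes within the same synchronous call; only the order changes from depth-first to first-in, first-out.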

Conclusion

That is the end of this article. This time I showed how to implement a redux-observable of your own. Next time I will cover some practical applications of redux-observable, such as modular development in the style of dva and unified handling of loading and error states.