Preface

RxJS Observables are a little hard to understand, and so are the concepts around them. After all, RxJS introduces a new paradigm, reactive programming, and it’s normal to need some time to get used to it. Still, when you learn something new, it’s easier to understand if you can compare it with something appropriate that you already know. A JS function makes a good analogy for an Observable.

Start

Encapsulation

Let’s start with an example of a normal function call:

function foo() {
  console.log('process...')
}

foo()
// output:
// process...

Quite simply, the function foo() encapsulates a piece of logic (here it just prints to the console) and then executes the internal logic by calling the function.

An example of an RxJS Observable:

var foo = Rx.Observable.create(() => {
  console.log('process...')
})

foo.subscribe()
// output:
// process...

In the above example, creating an Observable with Rx.Observable.create() also encapsulates a piece of code logic, this time into the Observable foo, and foo.subscribe() executes the encapsulated logic.

For normal functions and Observables, the wrapped code logic is reexecuted each time it is called. From this point of view, Observables can wrap code and reuse it just like normal functions.
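The re-execution on every subscription can be illustrated with a minimal sketch. It assumes a hypothetical, heavily simplified createObservable() helper standing in for Rx.Observable.create(); it is not how RxJS is actually implemented.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create():
// it only stores the function; nothing runs until subscribe() is called.
function createObservable(subscribeFn) {
  return {
    subscribe(next = () => {}) {
      // Each subscribe() call reruns the wrapped logic from scratch.
      subscribeFn({ next })
    }
  }
}

let runs = 0
const foo = createObservable(() => {
  runs++
  console.log('process...')
})

foo.subscribe()
foo.subscribe()
console.log(runs)
// output:
// process...
// process...
// 2
```

Just as calling a function twice runs its body twice, subscribing twice runs the wrapped logic twice.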

The return value

A function call can have a return value:

function foo() {
  console.log('process...')
  return 42
}

console.log(foo())
// output:
// process...
// 42

An Observable can return a value as well, but instead of returning it directly, it delivers the value through a callback:

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.next(42)
})

foo.subscribe(value => console.log(value))
// output:
// process...
// 42

Internally, the Observable returns a value via observer.next(42), and the caller receives it through a callback. This is a bit more verbose than a normal function, which returns a value directly.

From the caller’s point of view, the two procedures are:

  • Normal function: call -> execute logic -> return data
  • Observable: subscribe -> execute logic -> return data

In terms of getting the return value, calling a function obtains data directly, “pulling” it from the function; an Observable, once subscribed to, “pushes” data to the caller indirectly by calling the callback function.

The important difference between pull and push is that in push mode, an Observable decides when to return a value and how many values to return (that is, how many times to call the callback).

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.next(1)
  setTimeout(() => observer.next(2), 1000)
})

console.log('before')
foo.subscribe(value => console.log(value))
console.log('after')
// output:
// before
// process...
// 1
// after
// 2

The Observable above returns two values. The first is returned synchronously and the second is returned asynchronously, one second later.

In other words, an Observable differs from a normal function in terms of return values:

  • Multiple values can be returned
  • You can return values asynchronously

Exception handling

A function may throw an exception while it executes, for example:

function foo() {
  console.log('process...')
  throw new Error('BUG!')
}

We can catch the exception and handle it:

try {
  foo()
} catch(e) {
  console.log('error: ' + e)
}

Observables also have error handling mechanisms:

var foo = Rx.Observable.create((observer) => {
  console.log('process...')
  observer.error(new Error('BUG!'))
})

foo.subscribe(
  value => console.log(value),
  e => console.log('error: ' + e)
)

An Observable’s subscribe() method accepts an additional callback for handling errors. As with a function that throws, an Observable stops returning data after an error.

The subscribe() method also accepts the callbacks in another form, as methods of an object:

foo.subscribe({
  next(value) { console.log(value) },
  error(e) { console.log('error: ' + e) }
})

In this form, the object passed in has the same shape as the observer parameter received by the function passed to Rx.Observable.create().
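One way to picture how the two calling forms relate is a small normalization step that turns either form into the same observer object. The toObserver() helper below is hypothetical and only a sketch; it is not taken from the RxJS source.

```javascript
// Hypothetical helper: accept either separate callbacks or an observer object
// and always hand back an object with next, error and complete methods.
function toObserver(nextOrObserver, error, complete) {
  const source = (nextOrObserver !== null && typeof nextOrObserver === 'object')
    ? nextOrObserver
    : { next: nextOrObserver, error, complete }
  return {
    // Missing callbacks become no-ops, so the producer can call all three safely.
    next: value => { if (source.next) source.next(value) },
    error: e => { if (source.error) source.error(e) },
    complete: () => { if (source.complete) source.complete() }
  }
}

const log = []
const fromCallbacks = toObserver(
  v => log.push('fn ' + v),
  e => log.push('fn error: ' + e.message)
)
const fromObject = toObserver({
  next(v) { log.push('obj ' + v) },
  error(e) { log.push('obj error: ' + e.message) }
})

for (const observer of [fromCallbacks, fromObject]) {
  observer.next(42)
  observer.error(new Error('BUG!'))
  observer.complete() // no complete callback was supplied, so this is a no-op
}
console.log(log)
// output:
// [ 'fn 42', 'fn error: BUG!', 'obj 42', 'obj error: BUG!' ]
```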

Abort execution

Observable logic can asynchronously return multiple values, or even an infinite number of values:

var foo = Rx.Observable.create((observer) => {
  let i = 0
  setInterval(() => observer.next(i++), 1000)
})

foo.subscribe(i => console.log(i))
// output:
// 0
// 1
// 2
// ...

In the example above, the Observable returns a value to the caller every second. Even if the caller no longer needs the data, the Observable keeps pushing data to the caller through the callback function.

RxJS provides a mechanism to abort an Observable:

var foo = Rx.Observable.create((observer) => {
  console.log('start')
  let i = 0
  let timer = setInterval(() => observer.next(i++), 1000)
  // The returned function runs on unsubscribe and stops the timer.
  return () => {
    clearInterval(timer)
    console.log('end')
  }
})

var subscription = foo.subscribe(i => console.log(i))
setTimeout(() => subscription.unsubscribe(), 2500)
// output:
// start
// 0
// 1
// end

The subscribe() method returns a subscription object, whose unsubscribe() method cancels the subscription, terminating the Observable’s internal logic and stopping it from returning new data.

An Observable supports aborting by returning, from the function that holds its logic, a function that performs the cleanup, as in the example above.
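How the returned cleanup function and unsubscribe() fit together can be sketched as follows. The createObservable() helper is a hypothetical simplification rather than the RxJS implementation, and a hand-driven tick() function stands in for setInterval so that the example runs synchronously.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create().
function createObservable(subscribeFn) {
  return {
    subscribe(next) {
      let closed = false
      // Whatever the wrapped logic returns is kept as the teardown function.
      const teardown = subscribeFn({
        next(value) { if (!closed) next(value) }
      })
      return {
        unsubscribe() {
          if (closed) return
          closed = true
          if (typeof teardown === 'function') teardown()
        }
      }
    }
  }
}

// A manually driven "clock" replaces setInterval in this sketch.
const listeners = new Set()
const tick = () => listeners.forEach(fn => fn())

const log = []
const foo = createObservable(observer => {
  log.push('start')
  let i = 0
  const onTick = () => observer.next(i++)
  listeners.add(onTick)
  return () => {
    listeners.delete(onTick)
    log.push('end')
  }
})

const subscription = foo.subscribe(i => log.push(i))
tick()
tick()
subscription.unsubscribe()
tick() // no listener is left, so nothing is pushed
console.log(log)
// output:
// [ 'start', 0, 1, 'end' ]
```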

An Observable triggers the observer’s complete callback when it finishes executing, so an observer can also look like this:

foo.subscribe({
  next(value) { console.log(value) },
  complete() { console.log('completed') }
})

An Observable’s observer has three callbacks:

  • next: receives data
  • error: handles an exception
  • complete: signals that execution has finished

next can be called multiple times, while error and complete can each be called at most once (after either of them is called, no further callbacks are triggered).
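The rule above can be sketched by wrapping the observer in a guard that closes after error or complete. Again, createObservable() is a hypothetical simplification used only for illustration, not the RxJS implementation.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create().
function createObservable(subscribeFn) {
  return {
    subscribe(observer) {
      let closed = false
      subscribeFn({
        next(value) {
          if (!closed && observer.next) observer.next(value)
        },
        error(e) {
          if (closed) return
          closed = true
          if (observer.error) observer.error(e)
        },
        complete() {
          if (closed) return
          closed = true
          if (observer.complete) observer.complete()
        }
      })
    }
  }
}

const received = []
const foo = createObservable(observer => {
  observer.next(1)
  observer.next(2)
  observer.complete()
  observer.next(3)                  // ignored: already completed
  observer.error(new Error('BUG!')) // ignored: already completed
})

foo.subscribe({
  next(value) { received.push(value) },
  error(e) { received.push('error: ' + e.message) },
  complete() { received.push('completed') }
})

console.log(received)
// output:
// [ 1, 2, 'completed' ]
```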

Data conversion

For a function return value, sometimes we need to convert it before using it, for example:

function foo() {
  return 1
}

console.log(foo() * 2)
// output:
// 2

A similar situation applies to the values returned by an Observable, but it is usually handled in the following way:

var foo = Rx.Observable.create((observer) => {
  let i = 0
  setInterval(() => observer.next(i++), 1000)
})

foo.map(i => i * 2).subscribe(i => console.log(i))
// output:
// 0
// 2
// 4
// ...

foo.map() returns a new Observable, equivalent to:

var foo2 = foo.map(i => i * 2)
foo2.subscribe(i => console.log(i))

The internal logic that the Observable foo2 executes when it is subscribed to can be seen, in simplified form, as:

function subscribe(observer) {
  let mapFn = v => v * 2
  foo.subscribe(v => {
    observer.next(mapFn(v))
  })
}

Compare this processing of data to arrays:

var array = [0, 1, 2, 3, 4, 5]
array.map(i => i * 2).forEach(i => console.log(i))

The two look rather alike, don’t they?

In addition to map(), an Observable provides a variety of transformation methods, such as filter() for filtering data, find() for returning the first value that meets a condition, and reduce() for accumulating values and returning the final result once execution has finished. These methods work much like the array methods of the same names, except that they process data returned asynchronously. Other transformation methods are more powerful, such as debounceTime(), which filters data along the time dimension.

The essence of an Observable’s transformation methods is that each one creates a new Observable, which transforms the values returned by the original Observable according to certain logic and then pushes them to the observer.
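As a sketch of this idea, map() and filter() can be written as plain functions that take a source Observable and return a new one. The createObservable(), map() and filter() helpers below are hypothetical simplifications for illustration, not the RxJS implementations.

```javascript
// Hypothetical, simplified stand-in for Rx.Observable.create().
function createObservable(subscribeFn) {
  return {
    subscribe(observer) { return subscribeFn(observer) }
  }
}

// Each transformation subscribes to the source and forwards to its own observer.
function map(source, fn) {
  return createObservable(observer =>
    source.subscribe({
      next: v => observer.next(fn(v)),
      error: e => observer.error(e),
      complete: () => observer.complete()
    })
  )
}

function filter(source, predicate) {
  return createObservable(observer =>
    source.subscribe({
      next: v => { if (predicate(v)) observer.next(v) },
      error: e => observer.error(e),
      complete: () => observer.complete()
    })
  )
}

const values = [0, 1, 2, 3, 4, 5]
const source = createObservable(observer => {
  for (const v of values) observer.next(v)
  observer.complete()
})

const results = []
const evensTimesTen = map(filter(source, v => v % 2 === 0), v => v * 10)
evensTimesTen.subscribe({
  next: v => results.push(v),
  error: e => results.push('error: ' + e.message),
  complete: () => results.push('completed')
})

console.log(results)
// output:
// [ 0, 20, 40, 'completed' ]
```

Nothing runs until the final subscribe() call; the subscription then travels up the chain to the source, and the values flow back down through each transformation.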

Conclusion

An Observable is a strange kind of function. It has many function-like features, such as encapsulating a piece of logic, re-executing that logic every time it is called, and returning data. It also has features of its own: data is pushed to the caller, values can be returned asynchronously, and multiple values can be returned.

But it’s helpful to think of Observables as special functions, at least in terms of understanding them.

An Observable is also known as a data stream, in the sense that it can return multiple values, and a data transformation creates a new data stream based on the current one, for example:

observable.map(x => 10 * x)

But what you see above is just data, and when you treat an Observable as a special function, you shouldn’t forget its internal logic or how the data is generated.