The functions of the Buffer operator are described

If it was an Observable

, I would have to process strings one by one in onNext. Using the Buffer operator, we can group n strings together and process them in onNext

use

You can manipulate data that needs to be processed together. For example, for a group of two, I always print the maximum, or for a group of three, I print the maximum.

Let’s do some code
btSub.setOnClickListener({

observable? .subscribe({ msg ->

tvContent.text = tvContent.text.toString() + "\n" + msg.toString()

})

})

observable = Observable.just("test1","test2","test3").buffer(2)

Copy the code

The purpose of this code is to print every two strings on a line of text.

So let’s learnjustWhat the hell is
 Observable.create(object : Observable.OnSubscribe<String> {

override fun call(t: Subscriber<in String>) {

t.onNext("test1")

t.onNext("test2")

t.onNext("test3")

}

})

.

Observable.just("test1","test2","test3")

Copy the code

This effect is consistent with the 2 lines of code (when we, source code, but will find that there are some different, but we are now learning operator, and then come and watch the farewell, actually very simple) I’m not here just for further, mainly composed of operator, if interested can look at ourselves.

Look at the source code

Observable

.

public final Observable<List<T>> buffer(int count) {

return buffer(count, count);

}

.

public final Observable<List<T>> buffer(int count, int skip) {

return lift(new OperatorBufferWithSize<T>(count, skip));

}

.

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {

return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));

}

.

Copy the code

Here we see a very cool thing called Lift. In the introduction, I also recommend a classic article about RxJava for you to read first.

Since we mentioned lift, we have to mention the Operator interface. In fact, I have already introduced the four most important classes or interfaces in RxJava. If you forget or don’t know much about them, you can go back to that article. I am the simplest operator — Create

 public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {

// cover for generics insanity

}



public interface Func1<T, R> extends Function {

R call(T t);

}

Copy the code

When you look at this code carefully, you will find that Operator is also a Func1, with only one call interface for converting a Subscriber to another.

Now that we know about the Operator, let’s move on to what Lift does. Look at the code step by step.

Before I introduced a way to read the source code, that is, we already know its function, with his function to read the source code, here is also applicable.

As an example, the demo code at the beginning of me actually converts Subscriber > to Subscriber

.

In fact, it is very difficult for me to understand it. Why is it to convert Subscriber to Subscriber

instead of Subscriber

to Subscriber >?


I’m not going to explain it, but I’m going to look at OperatorBufferWithSize

OperatorBufferWithSize

.

public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {

.

}

.

Copy the code

Because the logic that we’re used to thinking about is top to bottom, left to right, because we start with Test1, Test2, Test3, and then we do [Test1, Test2], [Test3], Therefore, it is normal to convert Subscriber

to Subscriber >. This is why I find the RxJava source code tiring.

There is a picture in the classic article introduced in the introduction, which I will not quote, but you can see for yourself, RxJava runs in the order of first down, then up, and then top to bottom. Let’s go straight to the source code to find the answer.

Observable

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {

return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));

}

Copy the code

OnSubscribeLift

.

public void call(Subscriber<? super R> o) {

try {

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

try {

st.onStart();

parent.call(st);

}

.

}

Copy the code

I won’t cover RxJavaHooks, which have been explained in previous articles, so this code is simplified to

.

public void call(Subscriber<? super R> o) {

try {

Subscriber<? super T> st = operator.call(o);

try {

st.onStart();

parent.call(st);

}

.

}

Copy the code

When we click the button in the demo, we call the call method above directly. The parent is an onSubscribe passed to the Observable, which is what we saw earlier

Observable.OnSubscribe<String> {

override fun call(t: Subscriber<in String>) {

t.onNext("test1")

t.onNext("test2")

t.onNext("test3")

}

}

Copy the code

So you can see why Subscriber is converted to Subscriber

. So let’s look at this code again.

OnSubscribeLift

.

public void call(Subscriber<? super R> o) {

try {

Subscriber<? super T> st = operator.call(o);

try {

st.onStart();

parent.call(st);

}

.

Copy the code

The parameter Subscriber
O is the Subscriber we passed in the subscribe method in the demo. So R is actually List

, and parent is called in the call method. Subscriber

is passed in the call method. The following method is called

Observable.OnSubscribe<String> {

override fun call(t: Subscriber<in String>) {

t.onNext("test1")

t.onNext("test2")

t.onNext("test3")

}

}

Copy the code

So I said, well, the way RxJava executes, the last OnSubscribe call is executed, and then we call OnSubscribe, and the first OnSubscribe calls the onNext method, and we execute it step by step.

Here we have an overview of how the method works, but we have not yet started parsing the actual implementation of the buffer operator.

So let’s move on

OperatorBufferWithSize

.

public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {

if (skip == count) {

BufferExact<T> parent = new BufferExact<T>(child, count);

.

return parent;

}

if (skip > count) {

BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);

.

return parent;

}

BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);

.

return parent;

}

.

Copy the code

Here, it can be clearly divided into 3 cases. Skip means that several elements are ignored, and count means that several elements are one group. What does that mean? Let’s just take an example here. Observable. Just (” Test1 “, “Test2”, “Test3”, “Test4”, “Test5”) skip=2,count=2, Observable = 3 [Test1 and Test2] “, “Test3, Test4”, “[Test5]”. If skip=3,count=2, then it will be divided into two groups, “[Test1,Test2]”, “[Test4,Test5]”.

I’m not going to give you an example, but I just want to show you that there are 3 different cases, and it doesn’t really matter, we just need to see one of them, and the other 2 are pretty much the same. Observable = Observable.just(“test1”, “test2”, “test3”).buffer(2) skip==2&&count==2.

So let’s just look at it

.

if (skip == count) {

BufferExact<T> parent = new BufferExact<T>(child, count);

.

return parent;

}

.

static final class BufferExact<T> extends Subscriber<T> {

final Subscriber<? super List<T>> actual;

final int count;



List<T> buffer;



public BufferExact(Subscriber<? super List<T>> actual, int count) {

this.actual = actual;

this.count = count;

this.request(0L);

}



@Override

public void onNext(T t) {

List<T> b = buffer;

if (b == null) {

b = new ArrayList<T>(count);

buffer = b;

}



b.add(t);



if (b.size() == count) {

buffer = null;

actual.onNext(b);

}

}



.

Copy the code

Operator is introduced to convert Subscriber > to Subscriber

, and BufferExact is the Subscriber

. So the main thing to look at is the implementation of onNext.

public void onNext(T t) {

List<T> b = buffer;

if (b == null) {

b = new ArrayList<T>(count);

buffer = b;

}



b.add(t);



if (b.size() == count) {

buffer = null;

actual.onNext(b);

}

}

Copy the code

The implementation is very simple, first save the data transferred from onNext in the List, and then call the following Subscriber > when the count number is enough.

So in general, the buffer Operator is simple to implement, but it looks complicated as it involves lift and Operator. You can figure that out for yourself.

additional

Let me just say

 Observable.create(object : Observable.OnSubscribe<String> {

override fun call(t: Subscriber<in String>) {

t.onNext("test1")

t.onNext("test2")

t.onNext("test3")

}

})

Observable.just("test1", "test2", "test3")

Copy the code

These two are about the same. Actually, we can go a little bit further.

Observable

.

public static <T> Observable<T> just(T t1, T t2, T t3) {

return from((T[])new Object[] { t1, t2, t3 });

}

.

public static <T> Observable<T> from(T[] array) {

int n = array.length;

if (n == 0) {

return empty();

} else

if (n == 1) {

return just(array[0]);

}

return unsafeCreate(new OnSubscribeFromArray<T>(array));

}

.

Copy the code

You’re essentially creating an OnSubscribe FromarRay, which is an OnSubscribe. further

OnSubscribeFromArray

.

public void call(Subscriber<? super T> child) {

child.setProducer(new FromArrayProducer<T>(child, array));

}

.

static final class FromArrayProducer<T>

extends AtomicLong

implements Producer {

.

@Override

public void request(long n) {

if (n < 0) {

throw new IllegalArgumentException("n >= 0 required but it was " + n);

}

if (n == Long.MAX_VALUE) {

if (BackpressureUtils.getAndAddRequest(this, n) == 0) {

fastPath();

}

} else

if (n ! = 0) {

if (BackpressureUtils.getAndAddRequest(this, n) == 0) {

slowPath(n);

}

}

}

.

Copy the code

If it’s OnSubscribe, we’re basically just looking at a call method. Here’s the direct child.setProducer. As we’ve seen before, setProducer can’t be a method that calls the request directly from the Producer. In the end, it turns out that there are two main methods to look at: fastPath and slowPath.

.

void fastPath() {

final Subscriber<? super T> child = this.child;



for (T t : array) {

if (child.isUnsubscribed()) {

return;

}



child.onNext(t);

}



if (child.isUnsubscribed()) {

return;

}

child.onCompleted();

}

.

void slowPath(long r) {

final Subscriber<? super T> child = this.child;

final T[] array = this.array;

final int n = array.length;



long e = 0L;

int i = index;



for (;;) {



while (r ! = 0L && i ! = n) {

if (child.isUnsubscribed()) {

return;

}



child.onNext(array[i]);



i++;



if (i == n) {

if (! child.isUnsubscribed()) {

child.onCompleted();

}

return;

}



r--;

e--;

}



r = get() + e;



if (r == 0L) {

index = i;

r = addAndGet(e);

if (r == 0L) {

return;

}

e = 0L;

}

}

}

.

Copy the code

It’s a little more refreshing to look at fastPath, which is to go through the array and call onNext one by one, so what is the array, which is “test1”, “test2”, “test3”.

The key code for slowPath is also onNext. Can not go into depth to understand its specific meaning, in fact, see the name can be seen, one is fast processing, one is slowly processing.

conclusion

As a whole, I have covered every line of code as much as possible, but the jump in the middle is really confusing to many people. If something is not clear enough, please point it out and I can fix it.

WeChat pay

Alipay