Preface

ReactiveCocoa is (perhaps the first) open source library to bring the functional reactive programming paradigm to Objective-C. ReactiveCocoa was written by Josh Abernathy and Justin Spahr-Summers during the development of GitHub for Mac. Justin Spahr-Summers made the first commit on November 13, 2011 at 12:35 pm, and the project reached its first major milestone with the 1.0 release on February 13, 2013 at 3:05 am. The ReactiveCocoa community is also very active: at the time of writing, the latest release is ReactiveCocoa 5.0.0-alpha.3, and 5.0.0-alpha.4 is under development.

ReactiveCocoa V2.5 is widely regarded as the most stable Objective-C version, so it is used by a large number of clients whose primary language is Objective-C (OC). ReactiveCocoa V3.x is primarily based on Swift 1.2, ReactiveCocoa V4.x is primarily based on Swift 2.x, and ReactiveCocoa 5.0 fully supports Swift 3.0 and perhaps the later Swift 4.0. The next few posts will use ReactiveCocoa V2.5 as the example for analyzing the OC implementation of RAC (RAC 5.0 may be analyzed after that). Consider this post a good wish written on the eve of ReactiveCocoa 5.0.

Contents

  • 1. What is ReactiveCocoa?
  • 2. The core RACSignal send and subscribe process in RAC
  • 3. The core bind implementation of RACSignal operations
  • 4. Implementation of the basic RACSignal operations concat and zipWith
  • 5. Conclusion

I. What is ReactiveCocoa?

ReactiveCocoa (RAC for short) is a new framework for iOS and OS X development, open-sourced by GitHub. RAC combines the features of functional programming (FP) and reactive programming (RP). It draws primarily on the design and implementation of .NET Reactive Extensions.

The core idea of ReactiveCocoa is streams of values over time.

ReactiveCocoa mainly solves the following problems:

  • UI data binding

UI controls usually need to be bound to an event, and RAC makes it easy to bind any data stream to a control.

  • User interaction event binding

RAC provides a series of methods that let interactive UI controls send signals. These data streams are passed along as the user interacts with the UI.

  • Solving the problem of too many states and too many dependencies between them

With RAC bindings, you no longer have to track all sorts of complicated states by hand, such as isSelect, isFinish… This also solves the problem that such states are hard to maintain later on.

  • Unifying the messaging mechanisms

There are five different message passing mechanisms in OC: Delegate, Block Callback, Target-Action, Timers, and KVO. objc.io has an article on how to choose between the message passing mechanisms in OC: Communication Patterns. With RAC, all five of the above can be handled in one uniform way.

II. The core RACSignal send and subscribe process in RAC

One of the core concepts in ReactiveCocoa is the stream, RACStream. RACStream has two subclasses: RACSignal and RACSequence. This article starts with an analysis of RACSignal.

We often see the following code:


RACSignal *signal = [RACSignal createSignal:
                     ^RACDisposable *(id<RACSubscriber> subscriber)
{
    [subscriber sendNext:@1];
    [subscriber sendNext:@2];
    [subscriber sendNext:@3];
    [subscriber sendCompleted];
    return [RACDisposable disposableWithBlock:^{
        NSLog(@"signal dispose");
    }];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
    NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
    NSLog(@"error: %@", error);
} completed:^{
    NSLog(@"completed");
}];

[disposable dispose];

This is the complete process of creating a RACSignal and subscribing to it. What happens while it is being subscribed to?


+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    return [RACDynamicSignal createSignal:didSubscribe];
}

When createSignal: is called on RACSignal, it in turn calls the createSignal: method of RACDynamicSignal.

RACDynamicSignal is a subclass of RACSignal. The argument to createSignal: is a block.


(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe

The return value of the block is a RACDisposable, and the block is named didSubscribe. Its only argument is a subscriber of type id that must conform to the RACSubscriber protocol.

RACSubscriber is a protocol with four required methods:


@protocol RACSubscriber <NSObject>
@required

- (void)sendNext:(id)value;
- (void)sendError:(NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end
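To make the protocol concrete, here is a minimal sketch of a custom subscriber. LoggingSubscriber and RunLoggingSubscriberExample are hypothetical names invented for this illustration, not part of ReactiveCocoa. The sketch assumes ReactiveCocoa 2.5 is linked and that the function is called on the main thread, so that the subscription runs synchronously.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// LoggingSubscriber is a hypothetical class for illustration only.
// It records every event it receives as a string.
@interface LoggingSubscriber : NSObject <RACSubscriber>
@property (nonatomic, strong, readonly) NSMutableArray *events;
@end

@implementation LoggingSubscriber

- (instancetype)init {
    self = [super init];
    if (self == nil) return nil;
    _events = [NSMutableArray array];
    return self;
}

- (void)sendNext:(id)value {
    [self.events addObject:[NSString stringWithFormat:@"next %@", value]];
}

- (void)sendError:(NSError *)error {
    [self.events addObject:@"error"];
}

- (void)sendCompleted {
    [self.events addObject:@"completed"];
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
    // This sketch has nothing to clean up, so the disposable is ignored.
}

@end

// Hypothetical helper: subscribes a LoggingSubscriber and returns what it saw.
static NSArray *RunLoggingSubscriberExample(void) {
    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        return nil;
    }];

    LoggingSubscriber *logger = [[LoggingSubscriber alloc] init];
    [signal subscribe:logger];
    return [logger.events copy];
}
```

Any object that implements these four methods can be handed to subscribe:, which is what subscribeNext:error:completed: relies on internally.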

So the task of creating Signal falls entirely on the RACDynamicSignal subclass.



@interface RACDynamicSignal ()
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end

RACDynamicSignal is a very simple class that holds a block called didSubscribe.



+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *signal = [[self alloc] init];
    signal->_didSubscribe = [didSubscribe copy];
    return [signal setNameWithFormat:@"+createSignal:"];
}

This method creates a new RACDynamicSignal object, signal, and stores a copy of the didSubscribe block passed in into the didSubscribe property of the newly created signal. Finally, it names the signal +createSignal:.


- (instancetype)setNameWithFormat:(NSString *)format, ... {
    if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

    NSCParameterAssert(format != nil);

    va_list args;
    va_start(args, format);

    NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
    va_end(args);

    self.name = str;
    return self;
}

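setNameWithFormat: is a RACStream method, and since RACDynamicSignal inherits from RACSignal, it can call this method too. As the code shows, the name is only set when the RAC_DEBUG_SIGNAL_NAMES environment variable is defined.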
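The effect of the environment check can be sketched as follows. RunSignalNameExample is a hypothetical helper written for this illustration. It assumes ReactiveCocoa 2.5 is linked, and it sets the variable at runtime with setenv, which works here because setNameWithFormat: calls getenv on every invocation.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>
#include <stdlib.h>

// Hypothetical helper: names a signal and returns the stored name.
static NSString *RunSignalNameExample(void) {
    // Without this variable, setNameWithFormat: returns self without touching the name.
    setenv("RAC_DEBUG_SIGNAL_NAMES", "1", 1);

    RACSignal *signal = [[RACSignal return:@42] setNameWithFormat:@"answer %d", 42];
    return signal.name;
}
```

In a real project the variable would more likely be set in the scheme's environment settings than in code.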

The block passed to RACSignal has thus been saved. So when is it executed?

The saved block is “released”, that is, invoked, at subscription time.
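A small sketch can make this laziness visible. RunLazySubscriptionExample is a hypothetical helper. It assumes ReactiveCocoa 2.5 and a call on the main thread, so that each subscription runs didSubscribe synchronously.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: counts how often the didSubscribe block runs.
// Returns @[count before any subscription, count after two subscriptions].
static NSArray *RunLazySubscriptionExample(void) {
    __block NSUInteger runs = 0;

    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        runs += 1; // executed once per subscription, never at creation time
        [subscriber sendCompleted];
        return nil;
    }];

    NSUInteger beforeSubscribing = runs; // still 0: creating the signal only stores the block

    [signal subscribeNext:^(id x) {}];
    [signal subscribeNext:^(id x) {}];

    return @[@(beforeSubscribing), @(runs)];
}
```

Under those assumptions the helper returns @[@0, @2]: nothing runs at creation, and each subscriber triggers its own run of the block.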

RACSignal calls the subscribeNext method, which returns a RACDisposable.


- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
    NSCParameterAssert(nextBlock != NULL);
    NSCParameterAssert(errorBlock != NULL);
    NSCParameterAssert(completedBlock != NULL);

    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
    return [self subscribe:o];
}

In this method, a RACSubscriber object is created and nextBlock, errorBlock, and completedBlock are passed to it.


@interface RACSubscriber ()

// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

The RACSubscriber class is very simple. It has only four properties: the next, error, and completed blocks, and a RACCompoundDisposable named disposable.


+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    RACSubscriber *subscriber = [[self alloc] init];

    subscriber->_next = [next copy];
    subscriber->_error = [error copy];
    subscriber->_completed = [completed copy];

    return subscriber;
}

The subscriberWithNext:error:completed: method copies the three incoming blocks into the corresponding properties.

At the end of subscribeNext:error:completed:, RACSignal calls [self subscribe:o]. Here that is the subscribe: method of the RACDynamicSignal class.



- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }

    return disposable;
}

RACDisposable has 3 subclasses, one of which is RACCompoundDisposable.



@interface RACCompoundDisposable : RACDisposable
+ (instancetype)compoundDisposable;
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
- (void)addDisposable:(RACDisposable *)disposable;
- (void)removeDisposable:(RACDisposable *)disposable;
@end

Although RACCompoundDisposable is a subclass of RACDisposable, it can hold multiple RACDisposable objects, and a single call to dispose destroys them all when necessary. When a RACCompoundDisposable is disposed, it automatically disposes every RACDisposable in its container.
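The container behavior can be sketched like this. RunCompoundDisposableExample is a hypothetical helper, and the sketch assumes ReactiveCocoa 2.5. It also shows a detail not covered above: a disposable added to an already disposed compound disposable is disposed immediately.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns the names of all disposables that were disposed.
static NSSet *RunCompoundDisposableExample(void) {
    NSMutableSet *disposedNames = [NSMutableSet set];
    RACCompoundDisposable *compound = [RACCompoundDisposable compoundDisposable];

    [compound addDisposable:[RACDisposable disposableWithBlock:^{
        [disposedNames addObject:@"first"];
    }]];
    [compound addDisposable:[RACDisposable disposableWithBlock:^{
        [disposedNames addObject:@"second"];
    }]];

    // One call disposes everything that was added so far.
    [compound dispose];

    // The compound disposable is already disposed, so this one is disposed right away.
    [compound addDisposable:[RACDisposable disposableWithBlock:^{
        [disposedNames addObject:@"late"];
    }]];

    return [disposedNames copy];
}
```

A set is used for the result because the sketch makes no claim about the order in which the contained disposables are disposed.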

RACPassthroughSubscriber is a private class.


@interface RACPassthroughSubscriber : NSObject <RACSubscriber>
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;
@end

The RACPassthroughSubscriber class declares only this one method. Its purpose is to forward all signal events from one subscriber to another, as long as the subscription has not been disposed.

The RACPassthroughSubscriber class holds three very important objects: a RACSubscriber, a RACSignal, and a RACCompoundDisposable. The RACSubscriber is the subscriber that events are forwarded to. The RACCompoundDisposable is the subscriber’s disposable, and once it is disposed, the innerSubscriber can no longer receive the event stream.

The important thing to note here is that a RACSignal is also stored internally, and its attribute is unsafe_unretained. This differs from the other two properties, which are strong. It is not weak because the reference to the RACSignal is only used for DTrace probes (dynamic tracing). Making it weak would cause unnecessary performance overhead, so unsafe_unretained is enough.


- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
    NSCParameterAssert(subscriber != nil);

    self = [super init];
    if (self == nil) return nil;

    _innerSubscriber = subscriber;
    _signal = signal;
    _disposable = disposable;

    [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
    return self;
}

Back in the subscribe: method of the RACDynamicSignal class, the RACCompoundDisposable and RACPassthroughSubscriber objects have now been created.


    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }

RACScheduler.subscriptionScheduler is a global singleton.



+ (instancetype)subscriptionScheduler {
   static dispatch_once_t onceToken;
   static RACScheduler *subscriptionScheduler;
   dispatch_once(&onceToken, ^{
    subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
   });

   return subscriptionScheduler;
}

The scheduler then goes on to call its schedule: method.



- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
    block();
    return nil;
}


+ (BOOL)isOnMainThread {
    return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (instancetype)currentScheduler {
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}

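When fetching currentScheduler, the method first checks whether a scheduler is stored in the current thread's dictionary, and otherwise whether the code is running on the main thread. If neither is the case, currentScheduler is nil, and the backgroundScheduler is used to execute schedule:.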
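The two cases can be observed with a short sketch. RunCurrentSchedulerExample is a hypothetical helper. It assumes ReactiveCocoa 2.5, a call on the main thread, and that a plain GCD worker thread has no RAC scheduler registered in its thread dictionary at that moment.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: returns @[has scheduler on calling thread, has scheduler on a GCD queue].
static NSArray *RunCurrentSchedulerExample(void) {
    BOOL callerHasScheduler = (RACScheduler.currentScheduler != nil);

    __block BOOL workerHasScheduler = NO;
    dispatch_semaphore_t done = dispatch_semaphore_create(0);

    // dispatch_async guarantees the block runs off the calling thread.
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        workerHasScheduler = (RACScheduler.currentScheduler != nil);
        dispatch_semaphore_signal(done);
    });
    dispatch_semaphore_wait(done, DISPATCH_TIME_FOREVER);

    return @[@(callerHasScheduler), @(workerHasScheduler)];
}
```

When called on the main thread, the first entry should be YES and the second NO. The practical consequence is that subscribing from a plain GCD queue makes didSubscribe run asynchronously on the background scheduler, whereas subscribing from the main thread runs it synchronously.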

The argument to schedule: is a block, and the block is executed when schedule: runs. That is, it executes:


RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];

These are the two key statements. This is where the block stored in the signal is “released” and executed. The statement self.didSubscribe(subscriber) executes the didSubscribe closure saved by the signal.

In the didSubscribe closure there are sendNext, sendError, and sendCompleted, which call the corresponding methods in RACPassthroughSubscriber, respectively.


- (void)sendNext:(id)value {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_NEXT_ENABLED()) {
  RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
 }
 [self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_ERROR_ENABLED()) {
  RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
 }
 [self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_COMPLETED_ENABLED()) {
  RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
 }
 [self.innerSubscriber sendCompleted];
}

The subscriber at this time is RACPassthroughSubscriber. The innerSubscriber in RACPassthroughSubscriber is the ultimate actual subscriber, and RACPassthroughSubscriber passes the value to the innerSubscriber.


- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;

        nextBlock(value);
    }
}

- (void)sendError:(NSError *)e {
    @synchronized (self) {
        void (^errorBlock)(NSError *) = [self.error copy];
        [self.disposable dispose];

        if (errorBlock == nil) return;
        errorBlock(e);
    }
}

- (void)sendCompleted {
    @synchronized (self) {
        void (^completedBlock)(void) = [self.completed copy];
        [self.disposable dispose];

        if (completedBlock == nil) return;
        completedBlock();
    }
}

The innerSubscriber is a RACSubscriber. When sendNext: is called, it first copies its own self.next closure and then calls it. The whole process is thread safe because it is protected by @synchronized. This is where the closure of the final subscriber is invoked.

The same is true for sendError and sendCompleted.

To sum up:

  1. RACSignal calls the subscribeNext method, which creates a new RACSubscriber.
  2. The newly created RACSubscriber copies nextBlock, errorBlock, and completedBlock and stores them in its own properties.
  3. RACDynamicSignal, a subclass of RACSignal, calls subscribe.
  4. RACCompoundDisposable and RACPassthroughSubscriber objects are created. RACPassthroughSubscriber holds references to the RACSignal, the RACSubscriber, and the RACCompoundDisposable. Note that the reference to the RACSignal is unsafe_unretained.
  5. RACDynamicSignal calls the didSubscribe closure, which first calls the corresponding sendNext, sendError, and sendCompleted methods of RACPassthroughSubscriber.
  6. RACPassthroughSubscriber then calls the same method on self.innerSubscriber, that is, the RACSubscriber, which invokes its nextBlock, errorBlock, or completedBlock. Note that here too the closure is copied first and then executed.
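The steps above can be condensed into a toy model that needs only Foundation. MiniSignal, MiniSubscriber, and RunMiniSignalExample are hypothetical names invented for this sketch, not RAC types. The model deliberately leaves out disposables, schedulers, error handling, and the passthrough subscriber, and stands in for disposal by dropping the callbacks on completion.

```objc
#import <Foundation/Foundation.h>

// Toy subscriber: stores copies of the callbacks, like RACSubscriber does.
@interface MiniSubscriber : NSObject
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^completed)(void);
- (void)sendNext:(id)value;
- (void)sendCompleted;
@end

@implementation MiniSubscriber

- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock != nil) nextBlock(value);
    }
}

- (void)sendCompleted {
    @synchronized (self) {
        void (^completedBlock)(void) = [self.completed copy];
        // Stand-in for disposal: later events find no callbacks and are ignored.
        self.next = nil;
        self.completed = nil;
        if (completedBlock != nil) completedBlock();
    }
}

@end

// Toy signal: only stores the didSubscribe block until someone subscribes.
@interface MiniSignal : NSObject
@property (nonatomic, copy, readonly) void (^didSubscribe)(MiniSubscriber *subscriber);
+ (instancetype)createSignal:(void (^)(MiniSubscriber *subscriber))didSubscribe;
- (void)subscribeNext:(void (^)(id value))next completed:(void (^)(void))completed;
@end

@implementation MiniSignal

+ (instancetype)createSignal:(void (^)(MiniSubscriber *subscriber))didSubscribe {
    MiniSignal *signal = [[self alloc] init];
    signal->_didSubscribe = [didSubscribe copy];
    return signal;
}

- (void)subscribeNext:(void (^)(id value))next completed:(void (^)(void))completed {
    MiniSubscriber *subscriber = [[MiniSubscriber alloc] init];
    subscriber.next = next;
    subscriber.completed = completed;
    // The stored block is "released" here, at subscription time.
    self.didSubscribe(subscriber);
}

@end

// Hypothetical helper: returns the events seen by one subscription.
static NSArray *RunMiniSignalExample(void) {
    NSMutableArray *log = [NSMutableArray array];
    MiniSignal *signal = [MiniSignal createSignal:^(MiniSubscriber *subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        [subscriber sendNext:@2]; // ignored: the subscriber has already completed
    }];
    [signal subscribeNext:^(id value) {
        [log addObject:value];
    } completed:^{
        [log addObject:@"completed"];
    }];
    return [log copy];
}
```

RunMiniSignalExample returns @[@1, @"completed"]. The real classes add scheduling and proper disposal on top of this shape, but the flow of blocks is the same.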

III. The core bind implementation of RACSignal operations

The RACSignal source code contains two basic operations, concat and zipWith. But before we look at these two operations, let’s look at a more core function, the bind operation.

Here’s what the bind function does:

  1. It subscribes to the original signal.
  2. Every time the original signal sends a value, the bound block transforms it.
  3. As soon as the bound block returns a signal for that value, bind subscribes to it immediately and forwards its values to the subscriber.
  4. Once the bound block terminates the binding, the original signal is treated as complete.
  5. When all signals have completed, completed is sent to the subscriber.
  6. If any of the signals sends an error, the error is sent to the subscriber.

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
    NSCParameterAssert(block != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACStreamBindBlock bindingBlock = block();

        NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

        RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

        void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) { /* body omitted */ };
        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) { /* body omitted */ };

        @autoreleasepool {
            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
            [compoundDisposable addDisposable:selfDisposable];

            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // Manually check disposal to handle synchronous errors.
                if (compoundDisposable.disposed) return;

                BOOL stop = NO;
                id signal = bindingBlock(x, &stop);

                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(self, selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(self, selfDisposable);
                }
            }];

            selfDisposable.disposable = bindingDisposable;
        }

        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -bind:", self.name];
}

To find out exactly what bind does, write the test code:


    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];

    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            value = @(value.integerValue * 2);
            return [RACSignal return:value];
        };
    }];

    [bindSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];

Since the previous chapter explained the whole process of RACSignal creation and subscription in detail, the creation of RACDynamicSignal, RACCompoundDisposable, RACPassthroughSubscriber and so on is skipped here. The focus is on how the various closures in bind are created and subscribed to.

To keep the rest of the analysis from confusing the reader, the blocks involved are numbered.


    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        // block 1
    }];

    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        // block 2
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            // block 3
        };
    }];

    [bindSignal subscribeNext:^(id x) {
        // block 4
    }];

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
        // block 5
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        // block 6
        RACStreamBindBlock bindingBlock = block();
        NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

        void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
        // block 7
        };

        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
        // block 8
            RACDisposable *disposable = [signal subscribeNext:^(id x) {
            // block 9
            }];
        };

        @autoreleasepool {
            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // block 10
                id signal = bindingBlock(x, &stop);

                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(self, selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(self, selfDisposable);
                }
            }];
        }

        return compoundDisposable;
    }];
}

First, signal is created, and its didSubscribe saves a copy of block1.

When bind: is called on signal, block5 runs, and the didSubscribe of the new signal saves a copy of block6.

When subscribers start subscribing to bindSignal, the process is as follows:

  1. bindSignal executes its didSubscribe block, that is, block6.
  2. The first line of block6 is RACStreamBindBlock bindingBlock = block(), where block is the block passed in from outside, so block2 is called. After block2 runs, it returns a RACStreamBindBlock object.
  3. Since bind: was called on signal, self inside bind: is signal, and bind: subscribes to signal internally. That subscribeNext call therefore executes block1.
  4. When block1 runs, sendNext calls the nextBlock of the subscriber, so block10 is executed.
  5. Block10 first calls bindingBlock, which is the return value of the earlier call to block2. This RACStreamBindBlock object holds block3, so block3 is called.
  6. The argument of block3 is a value, which is the value sent by sendNext in signal. Block3 can transform the value, and when it is done it returns a new signal, signal’.
  7. If the returned signal’ is nil, completeSignal is called, that is, block7. Block7 sends sendCompleted. If the returned signal’ is not nil, addSignal is called, that is, block8. Block8 goes on to subscribe to signal’, which executes block9.
  8. Block9 calls sendNext. The subscriber here is the argument of block6, so calling sendNext on it ends up in block4 of the subscriber of bindSignal.
  9. After sendNext in block9 has run, sendCompleted is also called. This executes the completed closure that accompanies block9, which calls completeSignal(signal, selfDisposable), that is, block7.
  10. Once block7 has run, one complete round of sending a value with sendNext from signal is finished.

This completes the whole bind process.
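The stop flag, which the test code above does not exercise, can be tried out with a small sketch. RunBindStopExample is a hypothetical helper. It assumes ReactiveCocoa 2.5 and a call on the main thread, so that every subscription in the chain runs synchronously.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: doubles each value and terminates the binding at the value 2.
static NSArray *RunBindStopExample(void) {
    NSMutableArray *log = [NSMutableArray array];

    RACSignal *numbers = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return nil;
    }];

    RACSignal *bound = [numbers bind:^RACStreamBindBlock{
        return ^RACSignal *(NSNumber *value, BOOL *stop) {
            // Setting *stop terminates the binding after this value has been handled.
            if (value.integerValue >= 2) *stop = YES;
            return [RACSignal return:@(value.integerValue * 2)];
        };
    }];

    [bound subscribeNext:^(id x) {
        [log addObject:x];
    } completed:^{
        [log addObject:@"completed"];
    }];

    return [log copy];
}
```

Under those assumptions the log is @[@2, @4, @"completed"]. The value 3 never reaches the bound block, because the binding was terminated while 2 was being handled, and by then all tracked signals had completed.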

IV. Implementation of the basic RACSignal operations concat and zipWith

Let’s look at the other two basic operations in RACSignal.

1. concat

Write the test code:



    RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];


    RACSignal *signals = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendNext:@6];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];

    RACSignal *concatSignal = [signal concat:signals];

    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];

A concat operation simply joins two signals end to end. Note that the order matters: the values of the first signal always come before those of the second.


- (RACSignal *)concat:(RACSignal *)signal {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
            // Send the value of the first signal
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            // Subscribe to the second signal
            RACDisposable *concattedDisposable = [signal subscribe:subscriber];
            serialDisposable.disposable = concattedDisposable;
        }];

        serialDisposable.disposable = sourceDisposable;
        return serialDisposable;
    }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

Before merging, signal and signals each hold a copy of their own didSubscribe block. After merging, the didSubscribe of the new signal holds a copy of the block created inside concat:.

When the merged signal is subscribed:

  1. The didSubscribe of the new, merged signal is called.
  2. Since concat: was called on the first signal, self in the block is the first signal. The didSubscribe of the merged signal therefore subscribes to signal first.
  3. Since signal has been subscribed to, its didSubscribe starts executing and sends sendNext and sendError.
  4. After the first signal sends sendCompleted, the second signal is subscribed to, which calls the didSubscribe of signals.
  5. Since the second signal has been subscribed to, it starts sending sendNext, sendError, and sendCompleted.

The two signals are then pieced together in an orderly fashion.

One thing to note here is that after two signals are concatenated, the new signal ends when the second signal ends. As described in the figure above, the number of values the new signal sends equals the sum of the values sent by the two original signals, and the completion of the new signal is the completion of the second signal.
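The ordering can be checked with a short sketch that also records the moment the second signal is subscribed to. RunConcatOrderExample is a hypothetical helper. It assumes ReactiveCocoa 2.5 and a call on the main thread, so that everything runs synchronously.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: logs values and the point at which the second signal starts.
static NSArray *RunConcatOrderExample(void) {
    NSMutableArray *log = [NSMutableArray array];

    RACSignal *first = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        return nil;
    }];

    RACSignal *second = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // This only runs after the first signal has completed.
        [log addObject:@"second subscribed"];
        [subscriber sendNext:@2];
        [subscriber sendCompleted];
        return nil;
    }];

    [[first concat:second] subscribeNext:^(id x) {
        [log addObject:x];
    } completed:^{
        [log addObject:@"completed"];
    }];

    return [log copy];
}
```

Under those assumptions the log is @[@1, @"second subscribed", @2, @"completed"]. The completed event of the first signal is not forwarded. It is only used as the trigger for subscribing to the second signal.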

2. zipWith

Write the test code:



    RACSignal *concatSignal = [signal zipWith:signals];

    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];

The source code is as follows:


- (RACSignal *)zipWith:(RACSignal *)signal {
    NSCParameterAssert(signal != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];

        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];

        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);

                // If either signal has completed and its array is empty, the zipped signal is complete
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };

        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {

                // Return if either array is empty.
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;

                // Take the element at index 0 of each array and pack them into a tuple
                RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];

                // Send the tuple
                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };

        // Subscribe to the first signal
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (selfValues) {

                // Add the first signal value to the array
                [selfValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {

                // Determine whether to send a completion signal when the subscription completes
                selfCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];

        // Subscribe to the second signal
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {

                // Add the second signal to the array
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {

                // Determine whether to send a completion signal when the subscription completes
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];

        return [RACDisposable disposableWithBlock:^{

            // Destroy two signals
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}

When two signals are combined with zipWith, as in the image above, they behave like the two sides of a zipper being pulled together. Because it is a zipper, the positions correspond one to one: the first tooth of the upper side can only mate with the first tooth of the lower side, and only then can the zipper close.

Concrete implementation:

zipWith keeps two arrays that store the values of the two signals.

  1. Once the signal returned by zipWith is subscribed to, its didSubscribe closure starts executing.
  2. The closure subscribes to the first signal first. Assume the first signal emits a value before the second signal does. Each value emitted by the first signal is appended to the first array, and then the sendNext() closure is called. The sendNext() closure checks both arrays and returns if either one is empty. Since the second signal has not sent anything yet, its array is empty, so the first value cannot be sent. In other words, values from the first signal are stored in the first array and held back.
  3. Then the second signal sends its values. Each value is stored in the second array, but this time sendNext() does not return early, because both arrays have a value at position 0. Those two values are packed into a RACTuple and sent, and the values at position 0 are removed from both arrays.
  4. From then on, every value from either signal is stored in its array, and as soon as a value has a counterpart from the other signal, the pair is packed into a RACTuple and sent. The figure also shows that, for the signal produced by zipWith, each value is sent at the moment the later of the two paired values arrives.
  5. The new signal completes when either signal has completed and its array is empty. That is why the value 5 sent by the first signal is discarded in the end.

The values 1, 2, 3, 4 sent by the first signal and the values A, B, C, D sent by the second signal are matched one by one, like a zipper holding them together. Because the 5 has nothing to match, the zipper cannot close on it.
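The pairing described above can be reproduced with a sketch that uses the same values. SignalOfValues and RunZipExample are hypothetical helpers. The sketch assumes ReactiveCocoa 2.5 and a call on the main thread, so that both source signals send all of their values synchronously when subscribed to.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical helper: a signal that sends the given values synchronously, then completes.
static RACSignal *SignalOfValues(NSArray *values) {
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        for (id value in values) {
            [subscriber sendNext:value];
        }
        [subscriber sendCompleted];
        return nil;
    }];
}

// Hypothetical helper: zips 1..5 with A..D and logs each pair as a string.
static NSArray *RunZipExample(void) {
    NSMutableArray *log = [NSMutableArray array];

    RACSignal *numbers = SignalOfValues(@[@1, @2, @3, @4, @5]);
    RACSignal *letters = SignalOfValues(@[@"A", @"B", @"C", @"D"]);

    [[numbers zipWith:letters] subscribeNext:^(RACTuple *tuple) {
        [log addObject:[NSString stringWithFormat:@"%@%@", tuple.first, tuple.second]];
    } completed:^{
        [log addObject:@"completed"];
    }];

    return [log copy];
}
```

Under those assumptions the log is @[@"1A", @"2B", @"3C", @"4D", @"completed"]. All five numbers are buffered before the letters start arriving, each letter then releases one pair, and the 5 is left in the buffer when the second signal completes with an empty array.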

V. Conclusion

This article was originally going to analyze map, combineLatest, flattenMap, and flatten as well, but RACSignal turned out to have too many operations, so the analysis is split up along the source files, and this article covers the operations in the RACSignal file. The main operations in the RACSignal file are bind, concat, and zipWith. The next article will analyze all the operations in the RACSignal+Operations file.

Feedback and corrections are welcome.