| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
simon066
8年前发布

ReactiveCocoa 中 RACSignal 所有变换操作底层实现分析(上)

   <p style="text-align:center"><img src="https://simg.open-open.com/show/5cf27fce6a1d15756f42137a06469fff.png"></p>    <h3><strong>前言</strong></h3>    <p>在ReactiveCocoa整个库中,RACSignal占据着比较重要的位置,而RACSignal的变换操作更是整个RACStream流操作核心之一。在上篇文章中也详细分析了bind操作的实现。RACsignal很多变换操作都是基于bind操作来实现的。在开始本篇底层实现分析之前,先简单回顾一下上篇文章中分析的bind函数,这是这篇文章分析的基础。</p>    <p>bind函数可以简单的缩写成下面这样子。</p>    <pre>  <code class="language-objectivec">- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block;  {    return [RACSignal createSignal:^RACDisposable *(id subscriber) {          RACStreamBindBlock bindBlock = block();          [self subscribeNext:^(id x) {    //(1)              BOOL stop = NO;              RACSignal *signal = (RACSignal *)bindBlock(x, &stop); //(2)              if (signal == nil || stop) {                  [subscriber sendCompleted];              } else {                  [signal subscribeNext:^(id x) {                      [subscriber sendNext:x];  //(3)                  } error:^(NSError *error) {                      [subscriber sendError:error];                  } completed:^{                  }];              }          } error:^(NSError *error) {              [subscriber sendError:error];          } completed:^{              [subscriber sendCompleted];          }];        return nil;      }];  }</code></pre>    <ul>     <li> <p>当bind变换之后的信号被订阅,就开始执行bind函数中return的block闭包。</p> </li>     <li> <p>在bind闭包中,先订阅原先的信号A。</p> </li>     <li> <p>在订阅原信号A的didSubscribe闭包中进行信号变换,变换中用到的block闭包是外部传递进来的,也就是bind函数的入参。变换结束,得到新的信号B</p> </li>    </ul>    <p>订阅新的信号B,拿到bind变化之后的信号的订阅者subscriber,对其发送新的信号值。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/33974bc37b2d8910cdf25a20c52a2538.jpg"></p>    <p>简要的过程如上图,bind函数中进行了2次订阅的操作,第一次订阅是为了拿到signalA的值,第二次订阅是为了把signalB的新值发给bind变换之后得到的signalB的订阅者。</p>    <p>回顾完bind底层实现之后,就可以开始继续本篇文章的分析了。</p>    <h3><strong>目录</strong></h3>    <ul>     <li> <p>变换操作</p> </li>     <li> <p>时间操作</p> </li>    </ul>    <p><strong>一、变换操作</strong></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/8c5cc831d6f93619fb58f3d2607c8b0a.png"></p>    <p>我们都知道RACSignal是继承自RACStream的,而在底层的RACStream上也定义了一些基本的信号变换的操作,所以这些操作在RACSignal上同样适用。如果在RACsignal中没有重写这些方法,那么调用这些操作,实际是调用的父类RACStream的操作。下面分析的时候,会把实际调用父类RACStream的操作的地方都标注出来。</p>    <p><strong>1.Map: (在父类RACStream中定义的)</strong></p>    <p>map操作一般是用来信号变换的。</p>    <pre>  <code class="language-objectivec">RACSignal *signalB = [signalA map:^id(NSNumber *value) {      return @([value intValue] * 10);  }];</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/f9d10cbaa2f1f47a7510b88f2e54a23f.png"></p>    <p>来看看底层是如何实现的。</p>    <pre>  <code class="language-objectivec">- (instancetype)map:(id (^)(id value))block {      NSCParameterAssert(block != nil);      Class class = self.class;            return [[self flattenMap:^(id value) {                return [class return:block(value)];      }] setNameWithFormat:@"[%@] -map:", self.name];  }</code></pre>    <p>这里实现代码比较严谨,先判断self的类型。因为RACStream的子类中会有一些子类会重写这些方法,所以需要判断self的类型,在回调中可以回调到原类型的方法中去。</p>    <p>由于本篇文章中我们都分析RACSignal的操作,所以这里的self.class是RACDynamicSignal类型的。相应的在return返回值中也返回class,即RACDynamicSignal类型的信号。</p>    <p>从map实现代码上来看,map实现是用了flattenMap函数来实现的。把map的入参闭包,放到了flattenMap的返回值中。</p>    <p>在来看看flattenMap的实现:</p>    <pre>  <code class="language-objectivec">- (instancetype)flattenMap:(RACStream * (^)(id value))block {      Class class = self.class;        return [[self bind:^{          return ^(id value, BOOL *stop) {              id stream = block(value) ?: [class empty];              NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);                return stream;          };      }] setNameWithFormat:@"[%@] -flattenMap:", self.name];  }</code></pre>    <p>flattenMap算是对bind函数的一种封装。bind函数的入参是一个RACStreamBindBlock类型的闭包。而flattenMap函数的入参是一个value,返回值RACStream类型的闭包。</p>    <p>在flattenMap中,返回block(value)的信号,如果信号为nil,则返回[class empty]。</p>    <p>先来看看为空的情况。当block(value)为空,返回[RACEmptySignal empty],empty就是创建了一个RACEmptySignal类型的信号:</p>    <pre>  <code class="language-objectivec">+ (RACSignal *)empty {  #ifdef DEBUG      // Create multiple instances of this class in DEBUG so users can set custom      // names on each.      return [[[self alloc] init] setNameWithFormat:@"+empty"];  #else      static id singleton;      static dispatch_once_t pred;        dispatch_once(&pred, ^{          singleton = [[self alloc] init];      });        return singleton;  #endif  }</code></pre>    <p>RACEmptySignal类型的信号又是什么呢?</p>    <pre>  <code class="language-objectivec">- (RACDisposable *)subscribe:(id)subscriber {      NSCParameterAssert(subscriber != nil);        return [RACScheduler.subscriptionScheduler schedule:^{          [subscriber sendCompleted];      }];  }</code></pre>    <p>RACEmptySignal是RACSignal的子类,一旦订阅它,它就会同步的发送completed完成信号给订阅者。</p>    <p>所以flattenMap返回一个信号,如果信号不存在,就返回一个completed完成信号给订阅者。</p>    <p>再来看看flattenMap返回的信号是怎么变换的。</p>    <p>block(value)会把原信号发送过来的value传递给flattenMap的入参。flattenMap的入参是一个闭包,闭包的参数也是value的:</p>    <pre>  <code class="language-objectivec">^(id value) { return [class return:block(value)]; }</code></pre>    <p>这个闭包返回一个信号,信号类型和原信号的类型一样,即RACDynamicSignal类型的,值是block(value)。这里的闭包是外面map传进来的:</p>    <pre>  <code class="language-objectivec">^id(NSNumber *value) { return @([value intValue] * 10); }</code></pre>    <p>在这个闭包中把原信号的value值传进去进行变换,变换结束之后,包装成和原信号相同类型的信号,返回。返回的信号作为bind函数的闭包的返回值。这样订阅新的map之后的信号就会拿到变换之后的值。</p>    <p><strong>2.MapReplace: (在父类RACStream中定义的)</strong></p>    <p>一般用法如下:</p>    <pre>  <code class="language-objectivec">RACSignal *signalB = [signalA mapReplace:@"A"];</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/49afddb69c5189c2379c7835bd5c0b53.png"></p>    <p>效果是不管A信号发送什么值,都替换成@“A”。</p>    <pre>  <code class="language-objectivec">- (instancetype)mapReplace:(id)object {      return [[self map:^(id _) {          return object;      }] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];  }</code></pre>    <p>看底层源码就知道,它并不去关心原信号发送的是什么值,原信号发送什么值,都返回入参object的值。</p>    <p><strong>3.reduceEach:   (在父类RACStream中定义的)</strong></p>    <p>reduce是减少,聚合在一起的意思,reduceEach就是每个信号内部都聚合在一起。</p>    <pre>  <code class="language-objectivec">RACSignal *signalB = [signalA reduceEach:^id(NSNumber *num1 , NSNumber *num2){          return @([num1 intValue] + [num2 intValue]);  }];</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/94905e2739e51e05451b1b078b83e26c.png"></p>    <p>reduceEach后面比如传入一个元组RACTuple类型,否则会报错。</p>    <pre>  <code class="language-objectivec">- (instancetype)reduceEach:(id (^)())reduceBlock {      NSCParameterAssert(reduceBlock != nil);        __weak RACStream *stream __attribute__((unused)) = self;      return [[self map:^(RACTuple *t) {          NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);          return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];      }] setNameWithFormat:@"[%@] -reduceEach:", self.name];  }</code></pre>    <p>这里有两个断言,一个是判断传入的reduceBlock闭包是否为空,另一个断言是判断闭包的入参是否是RACTuple类型的。</p>    <pre>  <code class="language-objectivec">@interface RACBlockTrampoline : NSObject  @property (nonatomic, readonly, copy) id block;  + (id)invokeBlock:(id)block withArguments:(RACTuple *)arguments;  @end</code></pre>    <p>RACBlockTrampoline就是一个保存了一个block闭包的对象,它会根据传进来的参数,动态的构造一个NSInvocation,并执行。</p>    <p>reduceEach把入参reduceBlock作为RACBlockTrampoline的入参invokeBlock传进去,以及每个RACTuple也传到RACBlockTrampoline中。</p>    <pre>  <code class="language-objectivec">- (id)invokeWithArguments:(RACTuple *)arguments {      SEL selector = [self selectorForArgumentCount:arguments.count];      NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];      invocation.selector = selector;      invocation.target = self;        for (NSUInteger i = 0; i < arguments.count; i++) {          id arg = arguments[i];          NSInteger argIndex = (NSInteger)(i + 2);          [invocation setArgument:&arg atIndex:argIndex];      }        [invocation invoke];        __unsafe_unretained id returnVal;      [invocation getReturnValue:&returnVal];      return returnVal;  }</code></pre>    <p>第一步就是先计算入参一个元组RACTuple里面元素的个数。</p>    <pre>  <code class="language-objectivec">- (SEL)selectorForArgumentCount:(NSUInteger)count {      NSCParameterAssert(count > 0);        switch (count) {          case 0: return NULL;          case 1: return @selector(performWith:);          case 2: return @selector(performWith::);          case 3: return @selector(performWith:::);          case 4: return @selector(performWith::::);          case 5: return @selector(performWith:::::);          case 6: return @selector(performWith::::::);          case 7: return @selector(performWith:::::::);          case 8: return @selector(performWith::::::::);          case 9: return @selector(performWith:::::::::);          case 10: return @selector(performWith::::::::::);          case 11: return @selector(performWith:::::::::::);          case 12: return @selector(performWith::::::::::::);          case 13: return @selector(performWith:::::::::::::);          case 14: return @selector(performWith::::::::::::::);          case 15: return @selector(performWith:::::::::::::::);      }        NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");      return NULL;  }</code></pre>    <p>可以看到最多支持元组内元素的个数是15个。</p>    <p>这里我们假设以元组里面有2个元素为例。</p>    <pre>  <code class="language-objectivec">- (id)performWith:(id)obj1 :(id)obj2 {      id (^block)(id, id) = self.block;      return block(obj1, obj2);  }</code></pre>    <p>对应的 <a href="/misc/goto?guid=4959728441815917290" rel="nofollow,noindex">Type Encoding</a> 如下:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/3782ecc61de892a891f7fd0637e185d3.png"></p>    <p>argument0和argument1分别对应着隐藏参数self和_cmd,所以对应着的类型是@和:,从argument2开始,就是入参的Type Encoding了。</p>    <p>所以在构造invocation的参数的时候,argIndex是要偏移2个位置的。即从(i + 2)开始设置参数。</p>    <p>当动态构造了一个invocation方法之后,[invocation invoke]调用这个动态方法,也就是执行了外部的reduceBlock闭包,闭包里面是我们想要信号变换的规则。</p>    <p>闭包执行结束得到returnVal返回值。这个返回值就是整个RACBlockTrampoline的返回值了。这个返回值也作为了map闭包里面的返回值。</p>    <p>接下去的操作就完全转换成了map的操作了。上面已经分析过map操作了,这里就不赘述了。</p>    <p><strong>4. reduceApply</strong></p>    <p>举个例子:</p>    <pre>  <code class="language-objectivec">    RACSignal *signalA = [RACSignal createSignal:                           ^RACDisposable *(id subscriber)                           {                               id block = ^id(NSNumber *first,NSNumber *second,NSNumber *third) {                                   return @(first.integerValue + second.integerValue * third.integerValue);                               };                               [subscriber sendNext:RACTuplePack(block,@2 , @3 , @8)];                               [subscriber sendNext:RACTuplePack((id)(^id(NSNumber *x){return @(x.intValue * 10);}),@9,@10,@30)];                               [subscriber sendCompleted];                               return [RACDisposable disposableWithBlock:^{                                   NSLog(@"signal dispose");                               }];                           }];      RACSignal *signalB = [signalA reduceApply];</code></pre>    <p>使用reduceApply的条件也是需要信号里面的值是元组RACTuple。不过这里和reduceEach不同的是,原信号中每个元祖RACTuple的第0位必须要为一个闭包,后面n位为这个闭包的入参,第0位的闭包有几个参数,后面就需要跟几个参数。</p>    <p>如上述例子中,第一个元组第0位的闭包有3个参数,所以第一个元组后面还要跟3个参数。第二个元组的第0位的闭包只有一个参数,所以后面只需要跟一个参数。</p>    <p>当然后面可以跟更多的参数,如第二个元组,闭包后面跟了3个参数,但是只有第一个参数是有效值,后面那2个参数是无效不起作用的。唯一需要注意的就是后面跟的参数个数一定不能少于第0位闭包入参的个数,否则就会报错。</p>    <p>上面例子输出</p>    <pre>  <code class="language-objectivec">26  // 26 = 2 + 3 * 8;  90  // 90 = 9 * 10;</code></pre>    <p>看看底层实现:</p>    <pre>  <code class="language-objectivec">- (RACSignal *)reduceApply {      return [[self map:^(RACTuple *tuple) {          NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);          NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");            // We can't use -array, because we need to preserve RACTupleNil          NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];          for (id val in tuple) {              [tupleArray addObject:val];          }          RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];            return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];      }] setNameWithFormat:@"[%@] -reduceApply", self.name];  }</code></pre>    <p>这里也有2个断言,第一个是确保传入的参数是RACTuple类型,第二个断言是确保元组RACTuple里面的元素各种至少是2个。因为下面取参数是直接从1号位开始取的。</p>    <p>reduceApply做的事情和reduceEach基本是等效的,只不过变换规则的block闭包一个是外部传进去的,一个是直接打包在每个信号元组RACTuple中第0位中。</p>    <p><strong>5. materialize</strong></p>    <p>这个方法会把信号包装成RACEvent类型。</p>    <pre>  <code class="language-objectivec">- (RACSignal *)materialize {      return [[RACSignal createSignal:^(id subscriber) {          return [self subscribeNext:^(id x) {              [subscriber sendNext:[RACEvent eventWithValue:x]];          } error:^(NSError *error) {              [subscriber sendNext:[RACEvent eventWithError:error]];              [subscriber sendCompleted];          } completed:^{              [subscriber sendNext:RACEvent.completedEvent];              [subscriber sendCompleted];          }];      }] setNameWithFormat:@"[%@] -materialize", self.name];  }</code></pre>    <p>sendNext会包装成[RACEvent eventWithValue:x],error会包装成[RACEvent eventWithError:error],completed会被包装成RACEvent.completedEvent。注意,当原信号error和completed,新信号都会发送sendCompleted。</p>    <p><strong>6. dematerialize</strong></p>    <p>这个操作是materialize的逆向操作。它会把包装成RACEvent信号重新还原为正常的值信号。</p>    <pre>  <code class="language-objectivec">- (RACSignal *)dematerialize {      return [[self bind:^{          return ^(RACEvent *event, BOOL *stop) {              switch (event.eventType) {                  case RACEventTypeCompleted:                      *stop = YES;                      return [RACSignal empty];                    case RACEventTypeError:                      *stop = YES;                      return [RACSignal error:event.error];                    case RACEventTypeNext:                      return [RACSignal return:event.value];              }          };      }] setNameWithFormat:@"[%@] -dematerialize", self.name];  }</code></pre>    <p>这里的实现也用到了bind函数,它会把原信号进行一个变换。新的信号会根据event.eventType进行转换。RACEventTypeCompleted被转换成[RACSignal empty],RACEventTypeError被转换成[RACSignal error:event.error],RACEventTypeNext被转换成[RACSignal return:event.value]。</p>    <p><strong>7. not</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)not {      return [[self map:^(NSNumber *value) {          NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);                    return @(!value.boolValue);      }] setNameWithFormat:@"[%@] -not", self.name];  }</code></pre>    <p>not操作需要传入的值都是NSNumber类型的。不是NSNumber类型会报错。not操作会把每个NSNumber按照BOOL的规则,取非,当成新信号的值。</p>    <p><strong>8. and</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)and {      return [[self map:^(RACTuple *tuple) {          NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);          NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");            return @([tuple.rac_sequence all:^(NSNumber *number) {              NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);                return number.boolValue;          }]);      }] setNameWithFormat:@"[%@] -and", self.name];  }</code></pre>    <p>and操作需要原信号的每个信号都是元组RACTuple类型的,因为只有这样,RACTuple类型里面的每个元素的值才能进行&运算。</p>    <p>and操作里面有3处断言。第一处,判断入参是不是元组RACTuple类型的。第二处,判断RACTuple类型里面至少包含一个NSNumber。第三处,判断RACTuple里面是否都是NSNumber类型,有一个不符合,都会报错。</p>    <pre>  <code class="language-objectivec">- (RACSequence *)rac_sequence {      return [RACTupleSequence sequenceWithTupleBackingArray:self.backingArray offset:0];  }</code></pre>    <p>RACTuple类型先转换成RACTupleSequence。</p>    <pre>  <code class="language-objectivec">+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset {      NSCParameterAssert(offset _tupleBackingArray = backingArray;      seq->_offset = offset;      return seq;  }</code></pre>    <p>backingArray是一个数组NSArry。这里关于RACTupleSequence和RACTuple会在以后的文章中详细分析,本篇以分析RACSignal为主。</p>    <p>RACTuple类型先转换成RACTupleSequence,即存成了一个数组。</p>    <pre>  <code class="language-objectivec">- (BOOL)all:(BOOL (^)(id))block {      NSCParameterAssert(block != NULL);        NSNumber *result = [self foldLeftWithStart:@YES reduce:^(NSNumber *accumulator, id value) {          return @(accumulator.boolValue && block(value));      }];      return result.boolValue;  }    - (id)foldLeftWithStart:(id)start reduce:(id (^)(id, id))reduce {      NSCParameterAssert(reduce != NULL);        if (self.head == nil) return start;        for (id value in self) {          start = reduce(start, value);      }        return start;  }</code></pre>    <p>for会遍历RACSequence里面存的每一个值,分别都去调用reduce( )闭包。start的初始值为YES。reduce( )闭包是:</p>    <pre>  <code class="language-objectivec">^(NSNumber *accumulator, id value) { return @(accumulator.boolValue && block(value)); }</code></pre>    <p>这里又会去调用block( )闭包:</p>    <pre>  <code class="language-objectivec">^(NSNumber *number) { return number.boolValue; }</code></pre>    <p>number是原信号RACTuple的第一个值。第一次循环reduce( )闭包是拿YES和原信号RACTuple的第一个值进行&计算。第二个循环reduce( )闭包是拿原信号RACTuple的第一个值和第二个值进行&计算,得到的值参与下一次循环,与第三个值进行&计算,如此下去。这也是折叠函数的意思,foldLeft从左边开始折叠。fold函数会从左至右,把RACTuple转换成的数组里面每个值都一个接着一个进行&计算。</p>    <p>每个RACTuple都map成这样的一个BOOL值。接下去信号就map成了一个新的信号。</p>    <p><strong>9. or</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)or {      return [[self map:^(RACTuple *tuple) {          NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);          NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");            return @([tuple.rac_sequence any:^(NSNumber *number) {              NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);                return number.boolValue;          }]);      }] setNameWithFormat:@"[%@] -or", self.name];  }</code></pre>    <p>or操作的实现和and操作的实现大体类似。3处断言的作用和and操作完全一致,这里就不再赘述了。or操作的重点在any函数的实现上。or操作的入参也必须是RACTuple类型的。</p>    <pre>  <code class="language-objectivec">- (BOOL)any:(BOOL (^)(id))block {      NSCParameterAssert(block != NULL);        return [self objectPassingTest:block] != nil;  }      - (id)objectPassingTest:(BOOL (^)(id))block {      NSCParameterAssert(block != NULL);        return [self filter:block].head;  }      - (instancetype)filter:(BOOL (^)(id value))block {      NSCParameterAssert(block != nil);        Class class = self.class;        return [[self flattenMap:^ id (id value) {          if (block(value)) {              return [class return:value];          } else {              return class.empty;          }      }] setNameWithFormat:@"[%@] -filter:", self.name];  }</code></pre>    <p>any会依次判断RACTupleSequence数组里面的值,依次每个进行filter。如果value对应的BOOL值是YES,就转换成一个RACTupleSequence信号。如果对应的是NO,则转换成一个empty信号。</p>    <p>只要RACTuple为NO,就一直返回empty信号,直到BOOL值为YES,就返回1。map变换信号后变成成1。找到了YES之后的值就不会再判断了。如果没有找到YES,中间都是NO的话,一直遍历到数组最后一个,信号只能返回0。</p>    <p><strong>10. any:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {      NSCParameterAssert(predicateBlock != NULL);        return [[[self materialize] bind:^{          return ^(RACEvent *event, BOOL *stop) {              if (event.finished) {                  *stop = YES;                  return [RACSignal return:@NO];              }                if (predicateBlock(event.value)) {                  *stop = YES;                  return [RACSignal return:@YES];              }                return [RACSignal empty];          };      }] setNameWithFormat:@"[%@] -any:", self.name];  }</code></pre>    <p>原信号会先经过materialize转换包装成RACEvent事件。依次判断predicateBlock(event.value)值的BOOL值,如果返回YES,就包装成RACSignal的新信号,发送YES出去,并且stop接下来的信号。如果返回MO,就返回[RACSignal empty]空信号。直到event.finished,返回[RACSignal return:@NO]。</p>    <p>所以any:操作的目的是找到第一个满足predicateBlock条件的值。找到了就返回YES的RACSignal的信号,如果没有找到,返回NO的RACSignal。</p>    <p><strong>11. any</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)any {      return [[self any:^(id x) {          return YES;      }] setNameWithFormat:@"[%@] -any", self.name];  }</code></pre>    <p>any操作是any:操作中的一种情况。即predicateBlock闭包永远都返回YES,所以any操作之后永远都只能得到一个只发送一个YES的新信号。</p>    <p><strong>12. all:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {      NSCParameterAssert(predicateBlock != NULL);        return [[[self materialize] bind:^{          return ^(RACEvent *event, BOOL *stop) {              if (event.eventType == RACEventTypeCompleted) {                  *stop = YES;                  return [RACSignal return:@YES];              }                if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {                  *stop = YES;                  return [RACSignal return:@NO];              }                return [RACSignal empty];          };      }] setNameWithFormat:@"[%@] -all:", self.name];  }</code></pre>    <p>all:操作和any:有点类似。原信号会先经过materialize转换包装成RACEvent事件。对原信号发送的每个信号都依次判断predicateBlock(event.value)是否是NO 或者event.eventType == RACEventTypeError。如果predicateBlock(event.value)返回NO或者出现了错误,新的信号都返回NO。如果一直都没出现问题,在RACEventTypeCompleted的时候发送YES。</p>    <p>all:可以用来判断整个原信号发送过程中是否有错误事件RACEventTypeError,或者是否存在predicateBlock为NO的情况。可以把predicateBlock设置成一个正确条件。如果原信号出现错误事件,或者不满足设置的错误条件,都会发送新信号返回NO。如果全过程都没有出错,或者都满足predicateBlock设置的条件,则一直到RACEventTypeCompleted,发送YES的新信号。</p>    <p><strong>13. repeat</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)repeat {      return [[RACSignal createSignal:^(id subscriber) {          return subscribeForever(self,                                  ^(id x) {                                      [subscriber sendNext:x];                                  },                                  ^(NSError *error, RACDisposable *disposable) {                                      [disposable dispose];                                      [subscriber sendError:error];                                  },                                  ^(RACDisposable *disposable) {                                      // Resubscribe.                                  });      }] setNameWithFormat:@"[%@] -repeat", self.name];  }</code></pre>    <p>repeat操作返回一个subscribeForever闭包,闭包里面要传入4个参数。</p>    <pre>  <code class="language-objectivec">static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {      next = [next copy];      error = [error copy];      completed = [completed copy];        RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];        RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {          RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];          [compoundDisposable addDisposable:selfDisposable];            __weak RACDisposable *weakSelfDisposable = selfDisposable;            RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {              @autoreleasepool {                  error(e, compoundDisposable);                  [compoundDisposable removeDisposable:weakSelfDisposable];              }                recurse();          } completed:^{              @autoreleasepool {                  completed(compoundDisposable);                  [compoundDisposable removeDisposable:weakSelfDisposable];              }                recurse();          }];            [selfDisposable addDisposable:subscriptionDisposable];      };        // Subscribe once immediately, and then use recursive scheduling for any      // further resubscriptions.      recursiveBlock(^{          RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];            RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];          [compoundDisposable addDisposable:schedulingDisposable];      });        return compoundDisposable;  }</code></pre>    <p>subscribeForever有4个参数,第一个参数是原信号,第二个是传入的next闭包,第三个是error闭包,最后一个是completed闭包。</p>    <p>subscribeForever一进入这个函数就会调用recursiveBlock( )闭包,闭包中有一个recurse( )的入参的参数。在recursiveBlock( )闭包中对原信号RACSignal进行订阅。next,error,completed分别会先调用传进来的闭包。然后error,completed执行完error( )和completed( )闭包之后,还会继续再执行recurse( ),recurse( )是recursiveBlock的入参。</p>    <pre>  <code class="language-objectivec">- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {      RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];        [self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];      return disposable;  }    - (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {      @autoreleasepool {          RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];          [disposable addDisposable:selfDisposable];            __weak RACDisposable *weakSelfDisposable = selfDisposable;            RACDisposable *schedulingDisposable = [self schedule:^{ // 此处省略 }];            [selfDisposable addDisposable:schedulingDisposable];      }  }</code></pre>    <p>先取到当前的currentScheduler,即recursiveScheduler,执行scheduleRecursiveBlock,在这个函数中,会调用schedule函数。这里的recursiveScheduler是RACQueueScheduler类型的。</p>    <pre>  <code class="language-objectivec">- (RACDisposable *)schedule:(void (^)(void))block {      NSCParameterAssert(block != NULL);        RACDisposable *disposable = [[RACDisposable alloc] init];        dispatch_async(self.queue, ^{          if (disposable.disposed) return;          [self performAsCurrentScheduler:block];      });        return disposable;  }</code></pre>    <p>如果原信号没有disposed,dispatch_async会继续执行block,在这个block中还会继续原信号的发送。所以原信号只要没有error信号,disposable.disposed就不会返回YES,就会一直调用block。所以在subscribeForever的error和completed的最后都会调用recurse( )闭包。error调用recurse( )闭包是为了结束调用block,结束所有的信号。completed调用recurse( )闭包是为了继续调用block( )闭包,也就是repeat的本质。原信号会继续发送信号,如此无限循环下去。</p>    <p><strong>14. retry:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)retry:(NSInteger)retryCount {      return [[RACSignal createSignal:^(id subscriber) {          __block NSInteger currentRetryCount = 0;          return subscribeForever(self,                                  ^(id x) {                                      [subscriber sendNext:x];                                  },                                  ^(NSError *error, RACDisposable *disposable) {                                      if (retryCount == 0 || currentRetryCount < retryCount) {                                          // Resubscribe.                                          currentRetryCount++;                                          return;                                      }                                        [disposable dispose];                                      [subscriber sendError:error];                                  },                                  ^(RACDisposable *disposable) {                                      [disposable dispose];                                      [subscriber sendCompleted];                                  });      }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];  }</code></pre>    <p>在retry:的实现中,和repeat实现的区别是中间加入了一个currentRetryCount值。如果currentRetryCount > retryCount的话,就会在error中调用[disposable dispose],这样subscribeForever就不会再无限循环下去了。</p>    <p>所以retry:操作的用途就是在原信号在出现error的时候,重试retryCount的次数,如果依旧error,那么就会停止重试。</p>    <p>如果原信号没有发生错误,那么原信号在发送结束,subscribeForever也就结束了。retry:操作对于没有任何error的信号相当于什么都没有发生。</p>    <p><strong>15. retry</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)retry {      return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];  }</code></pre>    <p>这里的retry操作就是一个无限重试的操作。因为retryCount设置成0之后,在error的闭包中中,retryCount 永远等于 0,原信号永远都不会被dispose,所以subscribeForever会一直无限重试下去。</p>    <p>同样的,如果对一个没有error的信号调用retry操作,也是不起任何作用的。</p>    <p><strong>16. scanWithStart: reduceWithIndex: (在父类RACStream中定义的)</strong></p>    <p>先写出测试代码:</p>    <pre>  <code class="language-objectivec">    RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id subscriber)                           {                               [subscriber sendNext:@1];                               [subscriber sendNext:@1];                               [subscriber sendNext:@4];                               return [RACDisposable disposableWithBlock:^{                               }];                           }];        RACSignal *signalB = [signalA scanWithStart:@(2) reduceWithIndex:^id(NSNumber * running, NSNumber * next, NSUInteger index) {          return @(running.intValue * next.intValue + index);      }];</code></pre>    <pre>  <code class="language-objectivec">2    // 2 * 1 + 0 = 2  3    // 2 * 1 + 1 = 3  14   // 3 * 4 + 2 = 14</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/8e223697785c21fed254d36f0c50eb66.png"></p>    <p>scanWithStart这个变换由初始值,变换函数reduceBlock( ),和index步进的变量组成。原信号的每个信号都会由变换函数reduceBlock( )进行变换。index每次都是自增。变换的初始值是由入参startingValue传入的。</p>    <pre>  <code class="language-objectivec">- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {      NSCParameterAssert(reduceBlock != nil);        Class class = self.class;        return [[self bind:^{          __block id running = startingValue;          __block NSUInteger index = 0;            return ^(id value, BOOL *stop) {              running = reduceBlock(running, value, index++);              return [class return:running];          };      }] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, [startingValue rac_description]];  }</code></pre>    <p><strong>17. scanWithStart: reduce: (在父类RACStream中定义的)</strong></p>    <pre>  <code class="language-objectivec">- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {      NSCParameterAssert(reduceBlock != nil);        return [[self               scanWithStart:startingValue               reduceWithIndex:^(id running, id next, NSUInteger index) {                   return reduceBlock(running, next);               }]              setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, [startingValue rac_description]];  }</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/93ea1ba1f9a30b0f2cc5dcfa99b475a2.png"></p>    <p>scanWithStart: reduce:就是scanWithStart: reduceWithIndex: 的缩略版。变换函数也是外面闭包reduceBlock( )传进来的。只不过变换过程中不会使用index自增的这个变量。</p>    <p><strong>18. aggregateWithStart: reduceWithIndex:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {      return [[[[self                 scanWithStart:start reduceWithIndex:reduceBlock]                startWith:start]               takeLast:1]              setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, [start rac_description]];  }</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/30620c5bc6571ab69756b38ab7a94177.png"></p>    <p>aggregate是合计的意思。所以最后变换出来的信号只有最后一个值。</p>    <p>aggregateWithStart: reduceWithIndex:操作调用了scanWithStart: reduceWithIndex:,原理和它完全一致。不同的是多了两步额外的操作,一个是startWith:,一个是takeLast:1。startWith:是在scanWithStart: reduceWithIndex:变换之后的信号之前加上start信号。takeLast:1是取最后一个信号。takeLast:和startWith:的详细分析文章下面会详述。</p>    <p>值得注意的一点是,原信号如果没有发送complete信号,那么该函数就不会输出新的信号值。因为在一直等待结束。</p>    <p><strong>19. aggregateWithStart: reduce:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {      return [[[[self                 scanWithStart:start reduceWithIndex:reduceBlock]                startWith:start]               takeLast:1]              setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, [start rac_description]];  }</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/1575e007ab104aa12b784706a8001009.png"></p>    <p>aggregateWithStart: reduce:调用aggregateWithStart: reduceWithIndex:函数,只不过没有只用index值。同样,如果原信号没有发送complete信号,也不会输出任何信号。</p>    <p><strong>20. aggregateWithStartFactory: reduce:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {      NSCParameterAssert(startFactory != NULL);      NSCParameterAssert(reduceBlock != NULL);        return [[RACSignal defer:^{          return [self aggregateWithStart:startFactory() reduce:reduceBlock];      }] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];  }</code></pre>    <p>aggregateWithStartFactory: reduce:内部实现就是调用aggregateWithStart: reduce:,只不过入参多了一个产生start的startFactory( )闭包罢了。</p>    <p><strong>21. collect</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)collect {      return [[self aggregateWithStartFactory:^{          return [[NSMutableArray alloc] init];      } reduce:^(NSMutableArray *collectedValues, id x) {          [collectedValues addObject:(x ?: NSNull.null)];          return collectedValues;      }] setNameWithFormat:@"[%@] -collect", self.name];  }</code></pre>    <p style="text-align:center"><img src="https://simg.open-open.com/show/565cdda3961f2a9648d9ca3115547297.png"></p>    <p>collect函数会调用aggregateWithStartFactory: reduce:方法。把所有原信号的值收集起来,保存在NSMutableArray中。</p>    <h3><strong>二、时间操作</strong></h3>    <p style="text-align:center"><img src="https://simg.open-open.com/show/45d4d8b232ea11a4816177785be2154e.png"></p>    <p><strong>1. throttle:valuesPassingTest:</strong></p>    <p>这个操作传入一个时间间隔NSTimeInterval,和一个判断条件的闭包predicate。原信号在一个时间间隔NSTimeInterval之间发送的信号,如果还能满足predicate,则原信号都被“吞”了,直到一个时间间隔NSTimeInterval结束,会再次判断predicate,如果不满足了,原信号就会被发送出来。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/4d870d7e07517972dd252470a342acf9.png"></p>    <p>如上图,原信号发送1以后,间隔NSTimeInterval的时间内,没有信号发出,并且predicate也为YES,就把1变换成新的信号发出去。接下去由于原信号发送2,3,4的过程中,都在间隔NSTimeInterval的时间内,所以都被“吞”了。直到原信号发送5之后,间隔NSTimeInterval的时间内没有新的信号发出,所以把原信号的5发送出来。原信号的6也是如此。</p>    <p>再来看看具体实现:</p>    <pre>  <code class="language-objectivec">- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {      NSCParameterAssert(interval >= 0);      NSCParameterAssert(predicate != nil);        return [[RACSignal createSignal:^(id subscriber) {          RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];            RACScheduler *scheduler = [RACScheduler scheduler];            __block id nextValue = nil;          __block BOOL hasNextValue = NO;          RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];            void (^flushNext)(BOOL send) = ^(BOOL send) { // 暂时省略 };            RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {              // 暂时省略          } error:^(NSError *error) {              [compoundDisposable dispose];              [subscriber sendError:error];          } completed:^{              flushNext(YES);              [subscriber sendCompleted];          }];            [compoundDisposable addDisposable:subscriptionDisposable];          return compoundDisposable;      }] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];  }</code></pre>    <p>看这段实现,里面有2处断言。会先判断传入的interval是否大于0,小于0当然是不行的。还有一个就是传入的predicate闭包不能为空,这个是接下来用来控制流程的。</p>    <p>接下来的实现还是按照套路来,返回值是一个信号,新信号的闭包里面再订阅原信号进行变换。</p>    <p>那么整个变换的重点就落在了flushNext闭包和订阅原信号subscribeNext闭包中了。</p>    <p>当新的信号一旦被订阅,闭包执行到此处,就会对原信号进行订阅。</p>    <pre>  <code class="language-objectivec">[self subscribeNext:^(id x) {      RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;      BOOL shouldThrottle = predicate(x);        @synchronized (compoundDisposable) {             flushNext(NO);          if (!shouldThrottle) {              [subscriber sendNext:x];              return;          }          nextValue = x;          hasNextValue = YES;          nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{              flushNext(YES);          }];      }  }</code></pre>    <ul>     <li> <p>首先先创建一个delayScheduler。先判断当前的currentScheduler是否存在,不存在就取之前创建的[RACScheduler scheduler]。这里虽然两处都是RACTargetQueueScheduler类型的,但是currentScheduler是com.ReactiveCocoa.RACScheduler.mainThreadScheduler,而[RACScheduler scheduler]创建的是com.ReactiveCocoa.RACScheduler.backgroundScheduler。</p> </li>     <li> <p>调用predicate( )闭包,传入原信号发来的信号值x,经过predicate判断以后,得到是否打开节流开关的BOOL变量shouldThrottle。</p> </li>     <li> <p>之所以把RACCompoundDisposable作为线程间互斥信号量,因为RACCompoundDisposable里面会加入所有的RACDisposable信号。接着下面的操作用@synchronized给线程间加锁。</p> </li>     <li> <p>flushNext( )这个闭包是为了hook住原信号的发送。</p> </li>    </ul>    <pre>  <code class="language-objectivec">void (^flushNext)(BOOL send) = ^(BOOL send) {      @synchronized (compoundDisposable) {          [nextDisposable.disposable dispose];                    if (!hasNextValue) return;          if (send) [subscriber sendNext:nextValue];                    nextValue = nil;          hasNextValue = NO;      }  };</code></pre>    <p>这个闭包中如果传入的是NO,那么原信号就无法立即sendNext。如果传入的是YES,并且hasNextValue = YES,原信号待发送的还有值,那么就发送原信号。</p>    <p>shouldThrottle是一个阀门,随时控制原信号是否可以被发送。</p>    <p>小结一下,每个原信号发送过来,通过在throttle:valuesPassingTest:里面的did subscriber闭包中进行订阅。这个闭包中主要干了4件事情:</p>    <ul>     <li> <p>调用flushNext(NO)闭包判断能否发送原信号的值。入参为NO,不发送原信号的值。</p> </li>     <li> <p>判断阀门条件predicate(x)能否发送原信号的值。</p> </li>     <li> <p>如果以上两个条件都满足,nextValue中进行赋值为原信号发来的值,hasNextValue = YES代表当前有要发送的值。</p> </li>     <li> <p>开启一个delayScheduler,延迟interval的时间,发送原信号的这个值,即调用flushNext(YES)。</p> </li>    </ul>    <p>现在再来分析一下整个throttle:valuesPassingTest:的全过程</p>    <p>原信号发出第一个值,如果在interval的时间间隔内,没有新的信号发送,那么delayScheduler延迟interval的时间,执行flushNext(YES),发送原信号的这个第一个值。</p>    <pre>  <code class="language-objectivec">- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {      return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];  }    - (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {      NSCParameterAssert(date != nil);      NSCParameterAssert(block != NULL);        RACDisposable *disposable = [[RACDisposable alloc] init];        dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{          if (disposable.disposed) return;          [self performAsCurrentScheduler:block];      });        return disposable;  }</code></pre>    <p>注意,在dispatch_after闭包里面之前[self performAsCurrentScheduler:block]之前,有一个关键的判断:</p>    <pre>  <code class="language-objectivec">if (disposable.disposed) return;</code></pre>    <p>这个判断就是用来判断从第一个信号发出,在间隔interval的时间之内,还有没有其他信号存在。如果有,第一个信号肯定会disposed,这里会执行return,所以也就不会把第一个信号发送出来了。</p>    <p>这样也就达到了节流的目的:原来每个信号都会创建一个delayScheduler,都会延迟interval的时间,在这个时间内,如果原信号再没有发送新值,即原信号没有disposed,就把原信号的值发出来;如果在这个时间内,原信号还发送了一个新值,那么第一个值就被丢弃。在发送过程中,每个信号都要判断一次predicate( ),这个是阀门的开关,如果随时都不节流了,原信号发的值就需要立即被发送出来。</p>    <p>还有二点需要注意的是,第一点,正好在interval那一时刻,有新信号发送出来,原信号也会被丢弃,即只有在>=interval的时间之内,原信号没有发送新值,原来的这个值才能发送出来。第二点,原信号发送completed时,会立即执行flushNext(YES),把原信号的最后一个值发送出来。</p>    <p><strong>2. throttle:</strong></p>    <pre>  <code class="language-objectivec">- (RACSignal *)throttle:(NSTimeInterval)interval {      return [[self throttle:interval valuesPassingTest:^(id _) {          return YES;      }] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];  }</code></pre>    <p>这个操作其实就是调用了throttle:valuesPassingTest:方法,传入时间间隔interval,predicate( )闭包则永远返回YES,原信号的每个信号都执行节流操作。</p>    <p><strong>3. bufferWithTime:onScheduler:</strong></p>    <p>这个操作的实现是类似于throttle:valuesPassingTest:的实现。</p>    <pre>  <code class="language-objectivec">- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {      NSCParameterAssert(scheduler != nil);      NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);        return [[RACSignal createSignal:^(id subscriber) {          RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];          NSMutableArray *values = [NSMutableArray array];            void (^flushValues)() = ^{              // 暂时省略          };            RACDisposable *selfDisposable = [self subscribeNext:^(id x) {              // 暂时省略          } error:^(NSError *error) {              [subscriber sendError:error];          } completed:^{              flushValues();              [subscriber sendCompleted];          }];            return [RACDisposable disposableWithBlock:^{              [selfDisposable dispose];              [timerDisposable dispose];          }];      }] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];  }</code></pre>    <p>bufferWithTime:onScheduler:的实现和throttle:valuesPassingTest:的实现给出类似。开始有2个断言,2个都是判断scheduler的,第一个断言是判断scheduler是否为nil。第二个断言是判断scheduler的类型的,scheduler类型不能是immediateScheduler类型的,因为这个方法是要缓存一些信号的,所以不能是immediateScheduler类型的。</p>    <pre>  <code class="language-objectivec">RACDisposable *selfDisposable = [self subscribeNext:^(id x) {      @synchronized (values) {          if (values.count == 0) {              timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];          }          [values addObject:x ?: RACTupleNil.tupleNil];      }  }</code></pre>    <p>在subscribeNext中,当数组里面是没有存任何原信号的值,就会开启一个scheduler,延迟interval时间,执行flushValues闭包。如果里面有值了,就继续加到values的数组中。关键的也是闭包里面的内容,代码如下:</p>    <pre>  <code class="language-objectivec">void (^flushValues)() = ^{      @synchronized (values) {          [timerDisposable.disposable dispose];            if (values.count == 0) return;            RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];          [values removeAllObjects];          [subscriber sendNext:tuple];      }  };</code></pre>    <p>flushValues( )闭包里面主要是把数组包装成一个元组,并且全部发送出来,原数组里面就全部清空了。这也是bufferWithTime:onScheduler:的作用,在interval时间内,把这个时间间隔内的原信号都缓存起来,并且在interval的那一刻,把这些缓存的信号打包成一个元组,发送出来。</p>    <p>和throttle:valuesPassingTest:方法一样,在原信号completed的时候,立即执行flushValues( )闭包,把里面存的值都发送出来。</p>    <p><strong>4. delay:</strong></p>    <p>delay:函数的操作和上面几个套路都是一样的,实现方式也都是模板式的,唯一的不同都在subscribeNext中,和一个判断是否发送的闭包中。</p>    <pre>  <code class="language-objectivec">- (RACSignal *)delay:(NSTimeInterval)interval {      return [[RACSignal createSignal:^(id subscriber) {          RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];            // We may never use this scheduler, but we need to set it up ahead of          // time so that our scheduled blocks are run serially if we do.          RACScheduler *scheduler = [RACScheduler scheduler];            void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {              // 暂时省略          };            RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {              // 暂时省略          } error:^(NSError *error) {              [subscriber sendError:error];          } completed:^{              schedule(^{                  [subscriber sendCompleted];              });          }];            [disposable addDisposable:subscriptionDisposable];          return disposable;      }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];  }</code></pre>    <p>在delay:的subscribeNext中,就单纯的执行了schedule的闭包。</p>    <pre>  <code class="language-objectivec">        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {              schedule(^{                  [subscriber sendNext:x];              });          }</code></pre>    <pre>  <code class="language-objectivec">  void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {            RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;            RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];            [disposable addDisposable:schedulerDisposable];        };</code></pre>    <p>在schedule闭包中做的时间就是延迟interval的时间发送原信号的值。</p>    <p><strong>5. interval:onScheduler:withLeeway:</strong></p>    <pre>  <code class="language-objectivec">+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {      NSCParameterAssert(scheduler != nil);      NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);        return [[RACSignal createSignal:^(id subscriber) {          return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{              [subscriber sendNext:[NSDate date]];          }];      }] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];  }</code></pre>    <p>在这个操作中,实现代码不难。先来看看2个断言,都是保护入参类型的,scheduler不能为空,且不能是immediateScheduler的类型,原因和上面是一样的,这里是延迟操作。</p>    <p>主要的实现就在after:repeatingEvery:withLeeway:schedule:上了。</p>    <pre>  <code class="language-objectivec">- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {      NSCParameterAssert(date != nil);      NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);      NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);      NSCParameterAssert(block != NULL);        uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);      uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);        dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);      dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);      dispatch_source_set_event_handler(timer, block);      dispatch_resume(timer);        return [RACDisposable disposableWithBlock:^{          dispatch_source_cancel(timer);      }];  }</code></pre>    <p>这里的实现就是用GCD在self.queue上创建了一个Timer,时间间隔是interval,修正时间是leeway。</p>    <p>leeway这个参数是为dispatch source指定一个期望的定时器事件精度,让系统能够灵活地管理并唤醒内核。例如系统可以使用leeway值来提前或延迟触发定时器,使其更好地与其它系统事件结合。创建自己的定时器时,应该尽量指定一个leeway值。不过就算指定leeway值为0,也不能完完全全期望定时器能够按照精确的纳秒来触发事件。</p>    <p>这个定时器在interval执行sendNext操作,也就是发送原信号的值。</p>    <p><strong>6. interval:onScheduler:</strong></p>    <pre>  <code class="language-objectivec">+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {      return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];  }</code></pre>    <p>这个操作就是调用上一个方法interval:onScheduler:withLeeway:,只不过leeway = 0.0。具体实现上面已经分析过了,这里不再赘述。</p>    <h3><strong>最后</strong></h3>    <p>本来想穷尽分析每一个RACSignal的操作的实现,但是发现所有操作加起来实在太多,用一篇文章全部写完篇幅太长了,简书一篇放不下了,还是拆成几篇,RACSignal还剩过滤操作,多信号组合操作,冷热信号转换操作,高阶信号操作,下篇接着继续分析。最后请大家多多指教。</p>    <p> </p>    <p>来自:http://www.cocoachina.com/ios/20161206/18300.html</p>    <p> </p>    
 本文由用户 simon066 自行上传分享,仅供网友学习交流。所有权归原作者,若您的权利被侵害,请联系管理员。
 转载本站原创文章,请注明出处,并保留原始链接、图片水印。
 本站是一个以用户分享为主的开源技术平台,欢迎各类分享!
 本文地址:https://www.open-open.com/lib/view/open1481082261256.html
iOS开发 移动开发 ReactiveCocoa