RACChannel.m 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. //
  2. // RACChannel.m
  3. // ReactiveObjC
  4. //
  5. // Created by Uri Baghin on 01/01/2013.
  6. // Copyright (c) 2013 GitHub, Inc. All rights reserved.
  7. //
  8. #import "RACChannel.h"
  9. #import "RACDisposable.h"
  10. #import "RACReplaySubject.h"
  11. #import "RACSignal+Operations.h"
  /// Private interface of RACChannelTerminal: the two objects a terminal wraps
  /// and the initializer RACChannel uses to build its terminals.
  @interface RACChannelTerminal<ValueType> ()

  /// The signal that subscribers of this terminal are actually subscribed to.
  @property (nonatomic, readonly, strong) RACSignal<ValueType> *values;

  /// The subscriber that receives every event sent to this terminal.
  @property (nonatomic, readonly, strong) id<RACSubscriber> otherTerminal;

  /// Initializes a terminal around a source of values and a destination
  /// subscriber.
  ///
  /// @param values        The signal to subscribe to on behalf of this
  ///                      terminal's subscribers. Must not be nil.
  /// @param otherTerminal The subscriber to which events sent to this terminal
  ///                      are forwarded. Must not be nil.
  - (instancetype)initWithValues:(RACSignal<ValueType> *)values
                   otherTerminal:(id<RACSubscriber>)otherTerminal;

  @end
  @implementation RACChannel

  /// Builds a channel whose two terminals are backed by a pair of replay
  /// subjects wired to each other.
  ///
  /// @return The initialized channel, or nil if the superclass initializer
  ///         returned nil.
  - (instancetype)init {
      self = [super init];
      // Bail out before touching any ivar: writing through a nil `self` would
      // dereference a null pointer.
      if (self == nil) return nil;

      // We don't want any starting value from the leadingSubject, but we do want
      // error and completion to be replayed.
      RACReplaySubject *leadingSubject = [[RACReplaySubject replaySubjectWithCapacity:0] setNameWithFormat:@"leadingSubject"];
      // The following side is created with a replay capacity of 1.
      // NOTE(review): presumably so a late subscriber receives the most recent
      // value -- confirm against RACReplaySubject.
      RACReplaySubject *followingSubject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"followingSubject"];

      // Propagate errors and completion to everything: each subject, stripped of
      // its values, is subscribed to by the other one.
      [[leadingSubject ignoreValues] subscribe:followingSubject];
      [[followingSubject ignoreValues] subscribe:leadingSubject];

      // Each terminal exposes one subject as its values and forwards whatever it
      // is sent to the opposite subject.
      _leadingTerminal = [[[RACChannelTerminal alloc] initWithValues:leadingSubject otherTerminal:followingSubject] setNameWithFormat:@"leadingTerminal"];
      _followingTerminal = [[[RACChannelTerminal alloc] initWithValues:followingSubject otherTerminal:leadingSubject] setNameWithFormat:@"followingTerminal"];

      return self;
  }

  @end
  @implementation RACChannelTerminal

  #pragma mark Lifecycle

  /// Initializes a terminal that vends `values` to its subscribers and forwards
  /// everything it is sent to `otherTerminal`.
  ///
  /// @param values        The signal subscribers of this terminal are attached
  ///                      to. Asserted to be non-nil.
  /// @param otherTerminal The subscriber that receives events sent to this
  ///                      terminal. Asserted to be non-nil.
  /// @return The initialized terminal, or nil if the superclass initializer
  ///         returned nil.
  - (instancetype)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal {
      NSCParameterAssert(values != nil);
      NSCParameterAssert(otherTerminal != nil);

      self = [super init];
      // Do not write ivars through a nil `self`.
      if (self == nil) return nil;

      _values = values;
      _otherTerminal = otherTerminal;

      return self;
  }

  #pragma mark RACSignal

  /// Subscribes `subscriber` to the underlying `values` signal and returns the
  /// disposable produced by that subscription.
  - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
      return [self.values subscribe:subscriber];
  }

  #pragma mark <RACSubscriber>

  /// Forwards `value` to the other terminal.
  - (void)sendNext:(id)value {
      [self.otherTerminal sendNext:value];
  }

  /// Forwards `error` to the other terminal.
  - (void)sendError:(NSError *)error {
      [self.otherTerminal sendError:error];
  }

  /// Forwards completion to the other terminal.
  - (void)sendCompleted {
      [self.otherTerminal sendCompleted];
  }

  /// Hands the subscription's disposable to the other terminal.
  - (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
      [self.otherTerminal didSubscribeWithDisposable:disposable];
  }

  @end