Book: Angular 2 Cookbook

Building a generalized Publish-Subscribe service to replace $broadcast, $emit, and $on

In Angular 1, the $emit and $broadcast behaviors were very useful tools. They let you send custom events upwards and downwards through the scope tree to any listeners waiting for such an event. This pushed the developer towards a very useful pattern: many components transmitting events to and from a central source. However, using $emit and $broadcast for such a purpose was grossly inappropriate; they fed the event through huge numbers of scopes only to reach the single intended target.

Note

In the previous edition of this book, the corresponding recipe demonstrated how to build a Publish-Subscribe service that used the $emit and $rootScope injection. The version in this recipe, although different in a handful of ways, achieves similar results in a substantially cleaner and more elegant fashion.

It is preferable to create a single entity that can serve as a generic throughway for events to pass from publishers to their subscribers.

Note

The code, links, and a live example of this are available at .

Getting ready

Begin with a skeleton service injected into a component:

[app/node.component.ts]

  import {Component, Input} from '@angular/core';
  import {PubSubService} from './publish-subscribe.service';

  @Component({
    selector: 'node',
    template: `
      <p>Heard {{count}} of {{subscribeChannel}}</p>
      <button (click)="send()">Send {{publishChannel}}</button>
    `
  })
  export class NodeComponent {
    @Input() publishChannel:string;
    @Input() subscribeChannel:string;
    count:number = 0;

    constructor(private pubSubService_:PubSubService) {}

    send() {}
  }

[app/publish-subscribe.service.ts]

  import {Injectable} from '@angular/core';

  @Injectable()
  export class PubSubService {
    constructor() {}

    publish() {}

    subscribe() {}
  }

How to do it...

The groundwork for this implementation should be pretty obvious. The service is going to host a single Subject instance that is going to funnel events of any type into the service and out through the observers of the Subject.

First, implement subscribe() and publish() so that they do real work through the Subject instance:

[app/publish-subscribe.service.ts]

  import {Injectable} from '@angular/core';
  import {Subject} from 'rxjs/Subject';
  import {Observable} from 'rxjs/Observable';
  import {NextObserver} from 'rxjs/Observer';
  import {Subscription} from 'rxjs/Subscription';

  @Injectable()
  export class PubSubService {
    private publishSubscribeSubject_:Subject<any> = new Subject();
    emitter_:Observable<any>;

    constructor() {
      // Expose the Subject only as an Observable so that
      // outside code cannot call next() on it directly
      this.emitter_ = this.publishSubscribeSubject_.asObservable();
    }

    publish(event:any):void {
      this.publishSubscribeSubject_.next(event);
    }

    // subscribe() on an Observable returns a Subscription
    subscribe(handler:NextObserver<any>):Subscription {
      return this.emitter_.subscribe(handler);
    }
  }

This is terrific for an initial implementation, but it yields a problem: every event published to this service will be broadcast to all the subscribers, whether or not they care about it.

Introducing channel abstraction

It is possible and in fact quite easy to restrict publish and subscribe in such a way that they will only pay attention to the channel they specify. First, modify publish() to nest the event inside the emitted object:

[app/publish-subscribe.service.ts]

  import {Injectable} from '@angular/core';
  import {Subject} from 'rxjs/Subject';
  import {Observable} from 'rxjs/Observable';
  import {NextObserver} from 'rxjs/Observer';
  import {Subscription} from 'rxjs/Subscription';

  @Injectable()
  export class PubSubService {
    private publishSubscribeSubject_:Subject<any> = new Subject();
    emitter_:Observable<any>;

    constructor() {
      this.emitter_ = this.publishSubscribeSubject_.asObservable();
    }

    publish(channel:string, event:any):void {
      // Wrap the event together with the channel it belongs to
      this.publishSubscribeSubject_.next({
        channel: channel,
        event: event
      });
    }

    subscribe(handler:NextObserver<any>):Subscription {
      return this.emitter_.subscribe(handler);
    }
  }

With this, you are now able to utilize some Observable behavior to restrict which events the subscription is paying attention to.

Observable emissions can have filter() and map() applied to them. filter() returns a new Observable instance that emits only those values for which its predicate function returns true. map() returns a new Observable instance that emits each value after transforming it with its mapping function.

[app/publish-subscribe.service.ts]

  import {Injectable} from '@angular/core';
  import {Subject} from 'rxjs/Subject';
  import {Observable} from 'rxjs/Observable';
  import {Subscription} from 'rxjs/Subscription';
  import 'rxjs/add/operator/filter';
  import 'rxjs/add/operator/map';

  @Injectable()
  export class PubSubService {
    private publishSubscribeSubject_:Subject<any> = new Subject();
    emitter_:Observable<any>;

    constructor() {
      this.emitter_ = this.publishSubscribeSubject_.asObservable();
    }

    publish(channel:string, event:any):void {
      this.publishSubscribeSubject_.next({
        channel: channel,
        event: event
      });
    }

    subscribe(channel:string, handler:((value:any) => void)):Subscription {
      return this.emitter_
        // Keep only emissions published to the requested channel
        .filter(emission => emission.channel === channel)
        // Unwrap the original event before handing it to the handler
        .map(emission => emission.event)
        .subscribe(handler);
    }
  }
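To see the filter() and map() chain in isolation, the following is a minimal, dependency-free sketch. MiniObservable is a hypothetical stand-in written only for this illustration; it is not the RxJS Observable, and it supports nothing beyond subscribe(), filter(), and map(). The point it tries to show is that each operator returns a new instance that wraps the previous one:

```typescript
// Hypothetical stand-in for an Observable (NOT the RxJS implementation).
class MiniObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  subscribe(next: (value: T) => void): void {
    this.producer(next);
  }

  // Returns a NEW instance that forwards only values passing the predicate.
  filter(predicate: (value: T) => boolean): MiniObservable<T> {
    return new MiniObservable<T>(next =>
      this.subscribe(value => {
        if (predicate(value)) {
          next(value);
        }
      })
    );
  }

  // Returns a NEW instance that forwards each value after transforming it.
  map<R>(project: (value: T) => R): MiniObservable<R> {
    return new MiniObservable<R>(next =>
      this.subscribe(value => next(project(value)))
    );
  }
}

// Same wrapper shape that publish() emits in the recipe.
interface Envelope {
  channel: string;
  event: string;
}

const envelopes: Envelope[] = [
  { channel: 'foo', event: 'a' },
  { channel: 'bar', event: 'b' },
  { channel: 'foo', event: 'c' }
];

const envelopeSource = new MiniObservable<Envelope>(next =>
  envelopes.forEach(envelope => next(envelope))
);

// Restrict to the 'foo' channel, then unwrap the event.
const fooEvents = envelopeSource
  .filter(envelope => envelope.channel === 'foo')
  .map(envelope => envelope.event);

const collected: string[] = [];
fooEvents.subscribe(value => collected.push(value));
// collected now holds only the events published on 'foo'
```

The handler never sees the wrapper object or the 'bar' event, which is the behavior the service's subscribe() relies on.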

Hooking components into the service

The service is complete, but the component doesn't yet have the ability to use it. Use the injected service to link the component to the channels specified by its input strings:

[app/node.component.ts]

  import {Component, Input} from '@angular/core';
  import {PubSubService} from './publish-subscribe.service';

  @Component({
    selector: 'node',
    template: `
      <p>Heard {{count}} of {{subscribeChannel}}</p>
      <button (click)="send()">Send {{publishChannel}}</button>
    `
  })
  export class NodeComponent {
    @Input() publishChannel:string;
    @Input() subscribeChannel:string;
    count:number = 0;

    constructor(private pubSubService_:PubSubService) {}

    send() {
      this.pubSubService_
        .publish(this.publishChannel, {});
    }

    ngAfterViewInit() {
      this.pubSubService_
        .subscribe(this.subscribeChannel,
                   event => ++this.count);
    }
  }

Tip

The publish() method has an empty object literal as its second argument. This is the payload for the published message, which isn't used in this recipe. If you want to send data along with a message, this is where it would go.
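As a rough illustration of sending data along with a message, the sketch below publishes a payload and reads it in the handler. MiniPubSub is a hypothetical, dependency-free stand-in that only mimics the publish(channel, event) and subscribe(channel, handler) shape of the recipe's service; the payload field name text is likewise an assumption made for the example:

```typescript
// Hypothetical stand-in for the recipe's PubSubService (no RxJS involved).
type PayloadHandler = (payload: any) => void;

class MiniPubSub {
  private entries: Array<{ channel: string; handler: PayloadHandler }> = [];

  publish(channel: string, payload: any): void {
    // Deliver the payload only to handlers registered on this channel.
    this.entries
      .filter(entry => entry.channel === channel)
      .forEach(entry => entry.handler(payload));
  }

  subscribe(channel: string, handler: PayloadHandler): void {
    this.entries.push({ channel, handler });
  }
}

const pubSub = new MiniPubSub();
const heard: string[] = [];

// The handler receives whatever was passed as the second argument to publish().
pubSub.subscribe('foo', payload => heard.push(payload.text));

pubSub.publish('foo', { text: 'hello' });   // delivered to the 'foo' handler
pubSub.publish('bar', { text: 'ignored' }); // different channel, not delivered
```

In the component from this recipe, the equivalent change would be replacing the empty object literal in send() with the data you want subscribers to receive.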

With all of this, test your application with the following:

[app/root.component.ts]

  import {Component} from '@angular/core';

  @Component({
    selector: 'root',
    template: `
      <node subscribeChannel="foo"
            publishChannel="bar">
      </node>
      <node subscribeChannel="bar"
            publishChannel="foo">
      </node>
    `
  })
  export class RootComponent {}

You will see that channel publishing and subscribing is happening as you would expect.

Unsubscribing from channels

Of course, you want to avoid memory leaks wherever possible. This requires that you explicitly complete the cleanup process when your component instance is destroyed:

[app/node.component.ts]

  import {Component, Input, OnDestroy} from '@angular/core';
  import {PubSubService} from './publish-subscribe.service';
  import {Subscription} from 'rxjs/Subscription';

  @Component({
    selector: 'node',
    template: `
      <p>Heard {{count}} of {{subscribeChannel}}</p>
      <button (click)="send()">Send {{publishChannel}}</button>
    `
  })
  export class NodeComponent implements OnDestroy {
    @Input() publishChannel:string;
    @Input() subscribeChannel:string;
    count:number = 0;
    private pubSubServiceSubscription_:Subscription;

    constructor(private pubSubService_:PubSubService) {}

    send() {
      this.pubSubService_
        .publish(this.publishChannel, {});
    }

    ngAfterViewInit() {
      // Keep a reference to the Subscription so it can be
      // released when the component is destroyed
      this.pubSubServiceSubscription_ = this.pubSubService_
        .subscribe(this.subscribeChannel,
                   event => ++this.count);
    }

    ngOnDestroy() {
      this.pubSubServiceSubscription_.unsubscribe();
    }
  }
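The effect of unsubscribing can be sketched without Angular or RxJS. MiniSubject below is a hypothetical stand-in whose subscribe() returns an object with an unsubscribe() method, loosely mirroring the Subscription the component stores; it is only meant to show that a released handler stops being invoked:

```typescript
// Hypothetical stand-in for a Subject; subscribe() hands back an object
// with unsubscribe(), loosely mirroring an RxJS Subscription.
class MiniSubject {
  private listeners: Array<() => void> = [];

  subscribe(listener: () => void): { unsubscribe: () => void } {
    this.listeners.push(listener);
    return {
      unsubscribe: () => {
        // Drop the reference so the handler can no longer be called.
        this.listeners = this.listeners.filter(l => l !== listener);
      }
    };
  }

  next(): void {
    this.listeners.forEach(listener => listener());
  }
}

const subject = new MiniSubject();
let heardCount = 0;

// Roughly what the component does in ngAfterViewInit(): subscribe, keep the handle.
const subscription = subject.subscribe(() => {
  heardCount++;
});
subject.next(); // handler runs once

// Roughly what ngOnDestroy() does: release the handler.
subscription.unsubscribe();
subject.next(); // no listeners remain, so heardCount is unchanged
```

Without the unsubscribe() call, the source would keep a reference to the handler, and through it to the destroyed component, for as long as the service lives.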

How it works...

Each time publish() is invoked, the provided event is wrapped in an object together with its channel and submitted to a central Subject, which is private inside the service. At the same time, the fact that each invocation of subscribe() wants to listen to a different channel presents a problem. This is because an Observable does not draw distinctions regarding what is being emitted without explicit direction.

You are able to utilize the filter() and map() operators to establish a customized view of the emissions of Subject and use this view in the application of the Observer handler. Each time subscribe() is invoked, it creates a new Observable instance; however, these are all merely points of indirection from the one true Observable, which is owned by the private instance hidden inside the service.

There's more...

It's important to understand why this service is not built in a different way.

An important feature of Observables is their ability to be composed. That is, several Observable instances independently emitting events can be combined into one Observable instance, which will emit all the events from a combined source. This can be accomplished in several different ways, including flatMap() or merge(). This ability is what is being referred to when ReactiveX Observables are described as "composable."

Therefore, a developer might see this composition ability and think it would be suitable for a Publish-Subscribe entity. The entity would accept Observable instances from the publishers. They would be combined to create a single Observable instance, and subscribers would attach Observers to this combination. What could possibly go wrong?

Considerations of an Observable's composition and manipulation

One primary concern is that the composed Observable that the subscribers are being attached to will change constantly. As is the case with map() and filter(), any modulation performed on an Observable instance, including composition, will return a new Observable instance. This new instance would become the Observable that subscribers would attach to, and therein lies the problem.

Let's examine this problem step by step:

  1. PubSub service emits events from Observable A.
  2. Node X subscribes to the service and receives events from Observable A.
  3. Some other part of the application adds Observable B to the PubSub service.
  4. The PubSub service composes Observable A and Observable B into Observable AB.
  5. Node Y subscribes to the service and receives events from Observable AB.

Note that in this case, Node X would still receive events from only Observable A since that is the Observable instance where it invoked subscribe().
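The steps above can be sketched in a few lines. Everything here is hypothetical and dependency-free: MiniSource stands in for a hot Observable, mergeSources() for a composition operator such as merge(), and ComposingPubSub for the naive composing service the text warns against. None of these names come from RxJS or from the recipe:

```typescript
// Hypothetical minimal "hot" event source; a stand-in for an Observable.
class MiniSource {
  private listeners: Array<(value: string) => void> = [];

  subscribe(listener: (value: string) => void): void {
    this.listeners.push(listener);
  }

  emit(value: string): void {
    this.listeners.forEach(listener => listener(value));
  }
}

// Composition returns a brand-new source; the inputs are left untouched.
function mergeSources(a: MiniSource, b: MiniSource): MiniSource {
  const merged = new MiniSource();
  a.subscribe(value => merged.emit(value));
  b.subscribe(value => merged.emit(value));
  return merged;
}

// Naive composing service: `current` is replaced whenever a publisher is added.
class ComposingPubSub {
  private current: MiniSource;

  constructor(initial: MiniSource) {
    this.current = initial;
  }

  addPublisher(source: MiniSource): void {
    this.current = mergeSources(this.current, source);
  }

  subscribe(listener: (value: string) => void): void {
    this.current.subscribe(listener);
  }
}

const observableA = new MiniSource();
const observableB = new MiniSource();
const nodeX: string[] = [];
const nodeY: string[] = [];

const composingService = new ComposingPubSub(observableA); // step 1
composingService.subscribe(value => nodeX.push(value));    // step 2
composingService.addPublisher(observableB);                // steps 3 and 4
composingService.subscribe(value => nodeY.push(value));    // step 5

observableA.emit('from A');
observableB.emit('from B');
// nodeX heard only 'from A'; nodeY heard both events
```

Node X is still attached to the instance that existed when it subscribed, so it never hears Observable B.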

Certainly, there are steps that can be taken to mitigate this problem, such as having an additional level of indirection between the subscribe Observable and the composed Observable. However, a wise engineer will step back at this point and take stock of the situation. Publish-Subscribe is supposed to be a relatively "dumb" protocol, meaning that it should not take on much responsibility for managing the events passed to it: messages in and messages out, with no real concern for what they contain as long as they get there. One could make a very strong argument that introducing Observables on the Publish side greatly overcomplicates things.

In the case of this recipe, you have developed an elegant and simple version of a Publish-Subscribe module, and it feels right to delegate complexity outside of it. In the case of entities wanting to use Publish with Observables, a solution might be to just pipe the Observable emissions into the service's publish() method.
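One way to read "pipe the Observable emissions into publish()" is sketched below. The names are hypothetical: SubscribableLike stands in for anything with an Observable-style subscribe(), publishToService() stands in for the service's publish() method, and pipeToChannel() is a helper invented for this example:

```typescript
// Hypothetical stand-in for anything exposing an Observable-style subscribe().
interface SubscribableLike<T> {
  subscribe(next: (value: T) => void): void;
}

// Stand-in for PubSubService.publish(); it just records what it was given.
const published: Array<{ channel: string; event: number }> = [];

function publishToService(channel: string, event: number): void {
  published.push({ channel, event });
}

// Forward every emission of `source` into publish() on a fixed channel.
// The service never sees the source itself, only plain messages.
function pipeToChannel(source: SubscribableLike<number>, channel: string): void {
  source.subscribe(value => publishToService(channel, value));
}

// A trivial source that emits three numbers synchronously.
const ticks: SubscribableLike<number> = {
  subscribe(next: (value: number) => void): void {
    [1, 2, 3].forEach(n => next(n));
  }
};

pipeToChannel(ticks, 'ticks');
// published now holds three messages on the 'ticks' channel
```

With this arrangement the composition stays on the publisher's side, and the service remains a plain message conduit.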

See also

  • Basic utilization of Observables with HTTP demonstrates the basics of how to use an observable interface
  • Implementing a Publish-Subscribe model using Subjects shows you how to configure input and output for RxJS Observables
  • Creating an Observable authentication service using BehaviorSubjects instructs you on how to reactively manage the state in your application
  • Building a fully featured AutoComplete with Observables gives you a broad tour of some of the utilities offered to you as part of the RxJS library