In Angular 1, the $emit and $broadcast behaviors were indeed very useful tools. They gave you the ability to send custom events upwards and downwards through the scope tree to any listeners that might be waiting for such an event. This pushed the developer towards a very useful pattern: many components transmitting events to and from a central source. However, using $emit and $broadcast for such a purpose was grossly inappropriate; they had the effect of feeding the event through huge numbers of scopes only to reach the single intended target.
In the previous edition of this book, the corresponding recipe demonstrated how to build a Publish-Subscribe service that used the $emit and $rootScope injection. The version in this recipe, although different in a handful of ways, achieves similar results in a substantially cleaner and more elegant fashion.
It is preferable to create a single entity that can serve as a generic throughway for events to pass from publishers to their subscribers.
The code, links, and a live example of this are available at .
Begin with a skeleton service injected into a component:
[app/node.component.ts]
import {Component, Input} from '@angular/core';
import {PubSubService} from './publish-subscribe.service';

@Component({
  selector: 'node',
  template: `
    <p>Heard {{count}} of {{subscribeChannel}}</p>
    <button (click)="send()">Send {{publishChannel}}</button>
  `
})
export class NodeComponent {
  @Input() publishChannel:string;
  @Input() subscribeChannel:string;
  count:number = 0;

  constructor(private pubSubService_:PubSubService) {}

  send() {}
}

[app/publish-subscribe.service.ts]
import {Injectable} from '@angular/core';

@Injectable()
export class PubSubService {
  constructor() {}

  publish() {}

  subscribe() {}
}
The groundwork for this implementation should be pretty obvious. The service is going to host a single Subject instance that is going to funnel events of any type into the service and out through the observers of the Subject.
First, implement the following so that subscribe() and publish() actually do work when you involve the Subject instance:
[app/publish-subscribe.service.ts]
import {Injectable} from '@angular/core';
import {Subject} from 'rxjs/Subject';
import {Observable} from 'rxjs/Observable';
import {NextObserver} from 'rxjs/Observer';
import {Subscription} from 'rxjs/Subscription';

@Injectable()
export class PubSubService {
  // The single Subject that every published event passes through
  private publishSubscribeSubject_:Subject<any> = new Subject();
  // Read-only view of the Subject that subscribers attach to
  emitter_:Observable<any>;

  constructor() {
    this.emitter_ = this.publishSubscribeSubject_.asObservable();
  }

  publish(event:any):void {
    this.publishSubscribeSubject_.next(event);
  }

  // subscribe() on an Observable returns a Subscription
  subscribe(handler:NextObserver<any>):Subscription {
    return this.emitter_.subscribe(handler);
  }
}
This is terrific for an initial implementation, but it yields a problem: every event published to this service will be broadcast to all the subscribers.
It is possible, and in fact quite easy, to restrict publish and subscribe in such a way that they will only pay attention to the channel they specify. First, modify publish() to nest the event inside the emitted object:
[app/publish-subscribe.service.ts]
import {Injectable} from '@angular/core';
import {Subject} from 'rxjs/Subject';
import {Observable} from 'rxjs/Observable';
import {NextObserver} from 'rxjs/Observer';
import {Subscription} from 'rxjs/Subscription';

@Injectable()
export class PubSubService {
  private publishSubscribeSubject_:Subject<any> = new Subject();
  emitter_:Observable<any>;

  constructor() {
    this.emitter_ = this.publishSubscribeSubject_.asObservable();
  }

  publish(channel:string, event:any):void {
    // Wrap the event together with the channel it was published on
    this.publishSubscribeSubject_.next({
      channel: channel,
      event: event
    });
  }

  subscribe(handler:NextObserver<any>):Subscription {
    return this.emitter_.subscribe(handler);
  }
}
With this, you are now able to utilize some Observable behavior to restrict which events the subscription is paying attention to.
Observable emissions can have filter() and map() applied to them. filter() returns a new Observable instance that only emits the emissions for which its filter function evaluates as true. map() returns a new Observable instance that transforms each emission into a new value.
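To make the effect of the two operators concrete, here is a dependency-free sketch that applies the same predicate and projection to a plain array of wrapped emissions. The Emission interface, the helper names, and the sample values are assumptions for illustration only. The array methods merely stand in for the RxJS operators, which apply the same functions to each emission as it arrives rather than to a finished list.

```typescript
// Hypothetical shape of what publish() pushes into the Subject.
interface Emission {
  channel: string;
  event: unknown;
}

// Sample emissions, as if three publish() calls had been made.
const emissions: Emission[] = [
  { channel: 'foo', event: 1 },
  { channel: 'bar', event: 2 },
  { channel: 'foo', event: 3 },
];

// The predicate that would be handed to filter(): keep one channel only.
const onChannel = (channel: string) =>
  (emission: Emission): boolean => emission.channel === channel;

// The projection that would be handed to map(): strip the channel wrapper.
const unwrap = (emission: Emission): unknown => emission.event;

const fooEvents = emissions.filter(onChannel('foo')).map(unwrap);
console.log(fooEvents); // [ 1, 3 ]
```

A subscriber to the foo channel therefore never sees the bar emission, and it receives the bare event rather than the wrapper object.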
[app/publish-subscribe.service.ts]
import {Injectable} from '@angular/core';
import {Subject} from 'rxjs/Subject';
import {Observable} from 'rxjs/Observable';
import {Subscription} from 'rxjs/Subscription';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/map';

@Injectable()
export class PubSubService {
  private publishSubscribeSubject_:Subject<any> = new Subject();
  emitter_:Observable<any>;

  constructor() {
    this.emitter_ = this.publishSubscribeSubject_.asObservable();
  }

  publish(channel:string, event:any):void {
    this.publishSubscribeSubject_.next({
      channel: channel,
      event: event
    });
  }

  subscribe(channel:string, handler:((value:any) => void)):Subscription {
    return this.emitter_
      // Only let through emissions published on the requested channel
      .filter(emission => emission.channel === channel)
      // Unwrap the event so the handler never sees the channel wrapper
      .map(emission => emission.event)
      .subscribe(handler);
  }
}
The service is complete, but the component doesn't yet have the ability to use it. Use the injected service to link the component to the channels specified by its input strings:
[app/node.component.ts]
import {Component, Input} from '@angular/core';
import {PubSubService} from './publish-subscribe.service';

@Component({
  selector: 'node',
  template: `
    <p>Heard {{count}} of {{subscribeChannel}}</p>
    <button (click)="send()">Send {{publishChannel}}</button>
  `
})
export class NodeComponent {
  @Input() publishChannel:string;
  @Input() subscribeChannel:string;
  count:number = 0;

  constructor(private pubSubService_:PubSubService) {}

  send() {
    this.pubSubService_
      .publish(this.publishChannel, {});
  }

  ngAfterViewInit() {
    this.pubSubService_
      .subscribe(this.subscribeChannel,
                 event => ++this.count);
  }
}
The publish() method has an empty object literal as its second argument. This is the payload for the published message, which isn't used in this recipe. If you want to send data along with a message, this is where it would go.
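As a rough illustration of sending data along with a message, the following sketch publishes a small payload and reads it in the handler. TinyPubSub is a hypothetical stand-in that mirrors the publish() and subscribe() signatures of the service but is backed by a plain listener list, so the sketch runs without Angular or RxJS. The ChatMessage shape and the channel names are likewise assumptions.

```typescript
type Handler = (event: unknown) => void;

// Hypothetical stand-in for PubSubService (no RxJS, no Angular).
class TinyPubSub {
  private listeners: Array<{ channel: string; handler: Handler }> = [];

  publish(channel: string, event: unknown): void {
    // Deliver the payload only to handlers registered on this channel.
    this.listeners
      .filter(listener => listener.channel === channel)
      .forEach(listener => listener.handler(event));
  }

  subscribe(channel: string, handler: Handler): void {
    this.listeners.push({ channel, handler });
  }
}

// Hypothetical payload shape agreed on by publisher and subscriber.
interface ChatMessage {
  text: string;
}

const bus = new TinyPubSub();
const heard: string[] = [];

// The handler receives whatever was passed as the second argument of publish().
bus.subscribe('foo', event => heard.push((event as ChatMessage).text));

bus.publish('foo', { text: 'hello' });   // delivered to the foo handler
bus.publish('bar', { text: 'ignored' }); // nobody listens on bar

console.log(heard); // [ 'hello' ]
```

Because the service types the event loosely, the subscriber has to know which payload shape to expect on a given channel.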
With all of this, test your application with the following:
[app/root.component.ts]
import {Component} from '@angular/core';

@Component({
  selector: 'root',
  template: `
    <node subscribeChannel="foo"
          publishChannel="bar">
    </node>
    <node subscribeChannel="bar"
          publishChannel="foo">
    </node>
  `
})
export class RootComponent {}
You will see that channel publishing and subscribing work as you would expect: clicking the button in one node increments the count shown in the other.
Of course, you want to avoid memory leaks wherever possible. This requires that you explicitly complete the cleanup process when your component instance is destroyed:
[app/node.component.ts]
import {Component, Input, OnDestroy} from '@angular/core';
import {PubSubService} from './publish-subscribe.service';
import {Subscription} from 'rxjs/Subscription';

@Component({
  selector: 'node',
  template: `
    <p>Heard {{count}} of {{subscribeChannel}}</p>
    <button (click)="send()">Send {{publishChannel}}</button>
  `
})
export class NodeComponent implements OnDestroy {
  @Input() publishChannel:string;
  @Input() subscribeChannel:string;
  count:number = 0;
  private pubSubServiceSubscription_:Subscription;

  constructor(private pubSubService_:PubSubService) {}

  send() {
    this.pubSubService_
      .publish(this.publishChannel, {});
  }

  ngAfterViewInit() {
    // Keep a reference to the subscription so it can be torn down later
    this.pubSubServiceSubscription_ = this.pubSubService_
      .subscribe(this.subscribeChannel,
                 event => ++this.count);
  }

  ngOnDestroy() {
    this.pubSubServiceSubscription_.unsubscribe();
  }
}
Each time publish() is invoked, the provided event is wrapped together with the provided channel and submitted to a central Subject, which is private inside the service. At the same time, the fact that each invocation of subscribe() wants to listen to a different channel presents a problem. This is because an Observable does not draw distinctions regarding what is being emitted without explicit direction.
You are able to utilize the filter() and map() operators to establish a customized view of the emissions of Subject and use this view in the application of the Observer handler. Each time subscribe() is invoked, it creates a new Observable instance; however, these are all merely points of indirection from the one true Observable, which is owned by the private instance hidden inside the service.
It's important to understand why this service is not built in a different way.
An important feature of Observables is their ability to be composed. That is, several Observable instances independently emitting events can be combined into one Observable instance, which will emit all the events from a combined source. This can be accomplished in several different ways, including flatMap() or merge(). This ability is what is being referred to when ReactiveX Observables are described as "composable."
Therefore, a developer might see this composition ability and think it would be suitable for a Publish-Subscribe entity. The entity would accept Observable instances from the publishers. They would be combined to create a single Observable instance, and subscribers would attach their Observer handlers to this combination. What could possibly go wrong?
One primary concern is that the composed Observable that the subscribers are being attached to will change constantly. As is the case with map() and filter(), any modulation performed on an Observable instance, including composition, will return a new Observable instance. This new instance would become the Observable that subscribers would attach to, and therein lies the problem.
Let's examine this problem step by step:
Note that in this case, Node X would still receive events from only Observable A since that is the Observable instance where it invoked subscribe().
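The situation described in the note can be sketched in a few lines. This is only an illustration under stated assumptions: the Stream class and the merge() helper are hypothetical, minimal stand-ins for an Observable and for RxJS composition, and Node Y and publisher B are invented here to provide a contrast with Node X and Observable A.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, minimal stand-in for an Observable source.
class Stream<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  emit(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

// Like composition in RxJS, merging returns a NEW stream and leaves
// both inputs, and their existing subscribers, untouched.
function merge<T>(first: Stream<T>, second: Stream<T>): Stream<T> {
  const merged = new Stream<T>();
  first.subscribe(value => merged.emit(value));
  second.subscribe(value => merged.emit(value));
  return merged;
}

const a = new Stream<string>();
let composed: Stream<string> = a; // only publisher A is registered so far

const heardByX: string[] = [];
composed.subscribe(value => heardByX.push(value)); // Node X attaches to A

const b = new Stream<string>();
composed = merge(composed, b); // publisher B joins: a new instance is created

const heardByY: string[] = [];
composed.subscribe(value => heardByY.push(value)); // Node Y attaches to the merge

a.emit('from A');
b.emit('from B');

console.log(heardByX); // [ 'from A' ]
console.log(heardByY); // [ 'from A', 'from B' ]
```

Node X subscribed before B was composed in, so it stays attached to the older instance and never hears B.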
Certainly, there are steps that can be taken to mitigate this problem, such as having an additional level of indirection between the Observable that subscribers attach to and the composed Observable. However, a wise engineer will step back at this point and take stock of the situation. Publish-Subscribe is supposed to be a relatively "dumb" protocol, meaning that it shouldn't be delegated too much responsibility for managing the events it is passed: messages in and messages out, with no real concern for what is contained as long as they get there. One could make a very strong argument that introducing Observables on the Publish side greatly overcomplicates things.
In the case of this recipe, you have developed an elegant and simple version of a Publish-Subscribe module, and it feels right to delegate complexity outside of it. In the case of entities wanting to use Publish with Observables, a solution might be to just pipe the Observable emissions into the service's publish() method.
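One way this piping might look is sketched below. The pipeToChannel() helper and the two interfaces are hypothetical and not part of the recipe. They describe only the minimal shape needed, which an RxJS Observable and the PubSubService from this recipe should satisfy. The stand-in objects at the bottom exist solely so the sketch runs without Angular or RxJS.

```typescript
// Minimal shape of anything that can be subscribed to with a next handler.
interface Subscribable<T> {
  subscribe(next: (value: T) => void): unknown;
}

// Minimal shape of the service's publish() method.
interface Publisher {
  publish(channel: string, event: unknown): void;
}

// Hypothetical helper: forward every emission of `source` to one channel.
// Whatever subscribe() returns is handed back so the caller can clean up.
function pipeToChannel<T>(
  source: Subscribable<T>,
  pubSub: Publisher,
  channel: string
): unknown {
  return source.subscribe(value => pubSub.publish(channel, value));
}

// Stand-in publisher that records what it was asked to publish.
const published: Array<{ channel: string; event: unknown }> = [];
const fakePubSub: Publisher = {
  publish: (channel, event) => {
    published.push({ channel, event });
  },
};

// Stand-in source that synchronously emits 1, 2, 3 on subscription.
const threeTicks: Subscribable<number> = {
  subscribe(next) {
    [1, 2, 3].forEach(value => next(value));
    return undefined;
  },
};

pipeToChannel(threeTicks, fakePubSub, 'ticks');
console.log(published.length); // 3
```

The service stays unaware of Observables on the publishing side; the publisher owns the subscription and is responsible for tearing it down.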