Let’s go back to our ProjectService class and add some logic to the constructor function so that it emits project data using a ReplaySubject class.
We will start off by initializing the member fields:
this.dataProvider = dataProvider;
this.projects = [];
// We're exposing a replay subject that will emit events whenever
// the projects list changes
this.change = new ReplaySubject(1);
Note that we created a new ReplaySubject instance with a buffer length of one and assigned it to the member field named change. A buffer length of one means that any new subscriber immediately receives the most recently emitted value.
We also assign the DataProvider service, which was previously injected through the constructor parameters, to the dataProvider member field.
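To make the effect of the buffer length more tangible, here is a minimal sketch of the replay behavior. The TinyReplaySubject class is a hypothetical stand-in written only for this illustration; it is not the RxJS implementation and it always buffers exactly one value.

```javascript
// Hypothetical stand-in for a replay subject with a buffer length of one.
// This is NOT the RxJS implementation, only an illustration of the behavior.
class TinyReplaySubject {
  constructor() {
    this.hasValue = false;
    this.lastValue = undefined;
    this.observers = [];
  }

  // Emit a value to all current observers and remember it for late ones
  next(value) {
    this.hasValue = true;
    this.lastValue = value;
    this.observers.slice().forEach((observer) => observer(value));
  }

  // Register an observer and replay the buffered value immediately, if any
  subscribe(observer) {
    this.observers.push(observer);
    if (this.hasValue) {
      observer(this.lastValue);
    }
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      }
    };
  }
}

const change = new TinyReplaySubject();
const early = [];
const late = [];

change.subscribe((projects) => early.push(projects));
change.next(['Project One']);
change.next(['Project One', 'Project Two']);

// A late subscriber still receives the most recent list right away
change.subscribe((projects) => late.push(projects));
```

The early subscriber sees both emissions, while the late subscriber only receives the latest list. This is the property that lets a component subscribe at any time and still start with the current projects.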
Now, it’s time to make use of the DataProvider service in order to subscribe to any changes within our data store. This establishes a reactive connection to our data that is stored in PouchDB:
// Setting up our functional reactive subscription to receive
// project changes from the database
this.projectsSubscription = this.dataProvider.getLiveChanges()
  // First convert the change records to actual documents
  .map((change) => change.doc)
  // Filter so that we only receive project documents
  .filter((document) => document.type === 'project')
  // Finally we subscribe to the change observer and deal with
  // project changes in the function parameter
  .subscribe((changedProject) => {
    this.projects = this.projects.slice();
    // On every project change we need to update our projects list
    const projectIndex = this.projects.findIndex(
      (project) => project._id === changedProject._id
    );
    if (projectIndex === -1) {
      this.projects.push(changedProject);
    } else {
      this.projects.splice(projectIndex, 1, changedProject);
    }
    // Emit an event on our replay subject
    this.change.next(this.projects);
  });
The observable that is returned by the getLiveChanges() function first emits the data that already exists in our data store as a series of changes. It then also emits any future changes that are applied to our store after we’ve received the initial data. You can imagine a persistent connection to the database: whenever a document is updated in the database, our observer will receive this change as a value.
Observables provide a large number of so-called operators that allow you to transform the data stream that originates at the observable. You might already know some of these functional operators, such as map and filter, from the ECMAScript 5 array extra functions. Using operators, you can model a whole transformation flow until you finally subscribe to the data.
As we receive change objects from the data store, we first need to transform them into document objects. This is fairly easy because each change object contains a doc property that holds the complete data of the changed document for which we have received an update. Using the map function, we can transform the change objects into document objects before we return them to the data flow:
.map((change) => change.doc)
The DataProvider service will provide us with data for all the documents in the store. As we are only interested in project data at the moment, we also apply a filter that removes all the documents that are not of the project type:
.filter((document) => document.type === 'project')
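Because map and filter behave the same way on arrays, we can try out this transformation on a plain array first. The following is only a sketch: the change records are hypothetical sample data, shaped like the objects described above (a doc property that holds a document with _id and type fields).

```javascript
// Hypothetical change records, shaped like the ones described in the text
const changes = [
  {doc: {_id: 'project-1', type: 'project', title: 'Your first project'}},
  {doc: {_id: 'task-1', type: 'task', title: 'Some task'}},
  {doc: {_id: 'project-2', type: 'project', title: 'Your second project'}}
];

const projectDocuments = changes
  // Convert each change record to the document it carries
  .map((change) => change.doc)
  // Keep only the documents of the project type
  .filter((document) => document.type === 'project');

// Collect the identifiers of the remaining documents
const projectIds = projectDocuments.map((document) => document._id);
```

The difference with an observable is that the values arrive over time instead of all at once, but the transformation steps read the same.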
Finally, we can subscribe to the transformed stream, as follows:
.subscribe((changedProject) => {})
The subscribe function is where we terminate our observation path. This is like an endpoint at which we sit and observe. Inside our subscribe callback, we listen for project document updates and incorporate them into the projects member field of our ProjectService class. This includes not only adding new projects that were added to the document store, but also updating existing projects. Each project contains a unique identifier that can be accessed through the _id property. This allows us to easily decide whether a changed project needs to be added to the list or should replace an existing entry.
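The update logic inside the subscribe callback can also be looked at in isolation. The following sketch extracts it into a standalone function; the name applyProjectChange and the sample projects are assumptions made for this illustration and are not part of the service.

```javascript
// Hypothetical helper that mirrors the update logic inside subscribe
function applyProjectChange(projects, changedProject) {
  // Copy the list so that the previous array is left untouched
  const updated = projects.slice();
  const projectIndex = updated.findIndex(
    (project) => project._id === changedProject._id
  );
  if (projectIndex === -1) {
    // Unknown identifier: this is a new project
    updated.push(changedProject);
  } else {
    // Known identifier: replace the existing project in place
    updated.splice(projectIndex, 1, changedProject);
  }
  return updated;
}

const initial = [{_id: 'p1', title: 'Old title'}];
const afterUpdate = applyProjectChange(initial, {_id: 'p1', title: 'New title'});
const afterInsert = applyProjectChange(afterUpdate, {_id: 'p2', title: 'Second'});
```

Copying the array with slice before modifying it means that every change produces a new list reference, so consumers can detect an update by comparing references rather than inspecting the content.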
After updating our view of the projects and storing the list in the projects member field, we can emit the updated list using our ReplaySubject instance:
this.change.next(this.projects);
Our ProjectService class is now ready to be used, and application components that need to obtain project data can subscribe to the exposed ReplaySubject in order to react to these data changes.
Let’s refactor our App component in lib/app.js and get rid of the fake TaskListService that we’ve used so far:
import {Component, ViewEncapsulation, Inject} from '@angular/core';
import {ProjectService} from './project/project-service/project-service';
import template from './app.html!text';

@Component({
  selector: 'ngc-app',
  template,
  encapsulation: ViewEncapsulation.None,
  providers: [ProjectService]
})
export class App {
  constructor(@Inject(ProjectService) projectService) {
    this.projectService = projectService;
    this.projects = [];
    // Setting up our functional reactive subscription to receive
    // project changes
    this.projectsSubscription = projectService.change
      // We subscribe to the change observer of our service
      .subscribe((projects) => {
        this.projects = projects;
      });
  }

  // If this component gets destroyed, we need to remember to
  // clean up the project subscription
  ngOnDestroy() {
    this.projectsSubscription.unsubscribe();
  }
}
In our App component, we now obtain the list of projects using the change observable on our ProjectService class. By subscribing to the ReplaySubject on the service, we make sure that the projects list stored in the App component is updated after every change.
We use the OnDestroy lifecycle hook to unsubscribe from the ProjectService change observable. This is a necessary manual step if you want to do proper housekeeping in your application. Depending on the source, forgetting to unsubscribe could lead to memory leaks, because the source keeps a reference to the subscriber for as long as the subscription is active.
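To see why this housekeeping matters, consider the following sketch of a source that holds on to its observers. The createSource function is a hypothetical, heavily simplified stand-in for an observable and is only meant to show what unsubscribing releases.

```javascript
// Hypothetical minimal source that keeps a reference to every observer
function createSource() {
  const observers = new Set();
  return {
    // Number of observers the source currently holds on to
    observerCount: () => observers.size,
    // Deliver a value to every registered observer
    emit: (value) => observers.forEach((observer) => observer(value)),
    subscribe(observer) {
      observers.add(observer);
      // The returned handle removes the observer from the source again
      return {unsubscribe: () => observers.delete(observer)};
    }
  };
}

const source = createSource();
const received = [];
const subscription = source.subscribe((value) => received.push(value));

source.emit('first');
// After this call the source no longer references our callback
subscription.unsubscribe();
source.emit('second');
```

Until unsubscribe is called, the source references the callback, and through its closure everything the callback captures. In a component, this would include the component instance itself.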
With the preceding code, we have established the basis for a reactive data architecture. We observe our data store for changes, and our user interface reacts to them:
This figure shows the reactive data flow end-to-end from our data-store into the view