Let’s go back to our ProjectService class and add some logic to the constructor function in order to emit project data using a ReplaySubject class.

We will start off by initializing a few member fields in the constructor:

this.dataProvider = dataProvider;
this.projects = [];
// We're exposing a replay subject that will emit events whenever
// the projects list changes
this.change = new ReplaySubject(1);

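Note that we created a new ReplaySubject with a buffer length of one and assigned it to the member field named change. With a buffer length of one, any subscriber that arrives late immediately receives the most recently emitted value.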
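
To make the effect of the buffer more tangible, here is a minimal sketch of the replay idea in plain JavaScript. The MiniReplaySubject class is a hypothetical, simplified stand-in written only for illustration; it is not the RxJS implementation and leaves out error and completion handling:

```javascript
// Simplified stand-in for a replay subject with a fixed buffer size.
// This is NOT the RxJS implementation, only an illustration of the idea.
class MiniReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  // Emit a value to all current subscribers and remember it for later ones
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    this.observers.forEach((observer) => observer(value));
  }

  // New subscribers first receive the buffered values, then live values
  subscribe(observer) {
    this.buffer.forEach((value) => observer(value));
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      }
    };
  }
}

const change = new MiniReplaySubject(1);
change.next(['Project One']);
change.next(['Project One', 'Project Two']);

// This subscriber arrives late, but still receives the latest list
const received = [];
change.subscribe((projects) => received.push(projects));
console.log(received.length); // 1
console.log(received[0]);     // [ 'Project One', 'Project Two' ]
```

Only the last emitted list is replayed because the buffer holds a single value, which is what a component needs when it subscribes after the project data has already been loaded.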

We also assign the DataProvider service, which was previously injected in the constructor parameters, to the dataProvider member field.

Now, it’s time to make use of the DataProvider service in order to subscribe to any changes within our data store. This establishes a reactive connection to the data that is stored in PouchDB:

// Setting up our functional reactive subscription to receive
// project changes from the database
this.projectsSubscription = this.dataProvider.getLiveChanges()
  // First convert the change records to actual documents
  .map((change) => change.doc)
  // Filter so that we only receive project documents
  .filter((document) => document.type === 'project')
  // Finally we subscribe to the change observer and deal with 
  // project changes in the function parameter
  .subscribe((changedProject) => {
    this.projects = this.projects.slice();
    // On every project change we need to update our projects list 
    const projectIndex = this.projects.findIndex(
      (project) => project._id === changedProject._id
    );
    if (projectIndex === -1) {
      this.projects.push(changedProject);
    } else {
      this.projects.splice(projectIndex, 1, changedProject);
    }
    // Emit an event on our replay subject
    this.change.next(this.projects);
  });

The observable that is returned by the getLiveChanges() function emits the data that already exists in our data store as a series of changes. It also emits any future changes that are applied to the store after we’ve received the initial data. You can imagine a persistent connection to the database: whenever a document is updated in the database, our observer will receive this change as a value.

Observables provide a large number of so-called operators that allow you to transform the data stream that originates at the observable. You might already know some of these functional operators from the ECMAScript 5 array extra functions, such as map and filter. Using operators, you can model a whole transformation flow before you finally subscribe to the data.

As we receive change objects from the data store, we first need to transform them into document objects. This is fairly easy because each change object contains a doc property that holds the complete data of the changed document. Using the map function, we can transform the change objects into document objects before we pass them on in the data flow:

.map((change) => change.doc)

DataProvider will provide us with data for all the documents in the store. As we are only interested in project data at the moment, we also apply a filter that filters out all the documents that are not of the project type:

.filter((document) => document.type === 'project')
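
Since map and filter behave the same way on plain arrays, the two steps can be tried out without a database. The following is only a sketch: the change records are hypothetical sample data, and the task document type is an assumption made for illustration:

```javascript
// Hypothetical change records: each record carries the changed
// document in its doc property, as described above.
const changes = [
  {doc: {_id: 'p1', type: 'project', title: 'Project One'}},
  {doc: {_id: 't1', type: 'task', title: 'Some task'}},
  {doc: {_id: 'p2', type: 'project', title: 'Project Two'}}
];

const projectDocuments = changes
  // First convert the change records to actual documents
  .map((change) => change.doc)
  // Then keep only the project documents
  .filter((document) => document.type === 'project');

console.log(projectDocuments.map((project) => project._id)); // [ 'p1', 'p2' ]
```

The difference with an observable is that the values arrive over time instead of all at once, but the transformation steps read the same.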

Finally, we can subscribe to the transformed stream, as follows:

.subscribe((changedProject) => {})

The subscribe call is where we terminate our observation path. This is like an endpoint at which we sit and observe. Inside our subscribe function, we listen for project document updates and incorporate them into the projects member field of our ProjectService class. This includes not only adding new projects that were added to the document store, but also updating existing projects. Each project contains a unique identifier that can be accessed through the _id property. By looking up this identifier in our list, we can easily decide whether a changed project needs to be added or whether it replaces an existing entry.
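
The add-or-replace decision can also be looked at in isolation. The following sketch extracts it into a standalone function; the upsertProject name and the sample projects are hypothetical and not part of the ProjectService class:

```javascript
// Returns a new list in which changedProject either replaces the entry
// with the same _id or is appended if no such entry exists.
// The input list is copied first and is never modified.
function upsertProject(projects, changedProject) {
  const updated = projects.slice();
  const projectIndex = updated.findIndex(
    (project) => project._id === changedProject._id
  );
  if (projectIndex === -1) {
    updated.push(changedProject);
  } else {
    updated.splice(projectIndex, 1, changedProject);
  }
  return updated;
}

let projects = [];
projects = upsertProject(projects, {_id: 'p1', title: 'Draft'});
projects = upsertProject(projects, {_id: 'p2', title: 'Second'});
// Same _id as the first project, so the entry is replaced in place
projects = upsertProject(projects, {_id: 'p1', title: 'Renamed'});
console.log(projects.map((p) => p.title)); // [ 'Renamed', 'Second' ]
```

Copying the list with slice() before changing it means that every emitted list is a new array, so consumers that compare references can tell that something changed.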

After updating our actual view of the projects and storing the list of projects in the projects member field, we can emit the updated list using our ReplaySubject class:

this.change.next(this.projects);

Our ProjectService class is now ready to be used, and application components that need to obtain project data can subscribe to the exposed ReplaySubject in order to react to these data changes.

Let’s refactor our App component in lib/app.js and get rid of the fake TaskListService that we’ve used so far:

import {Component, ViewEncapsulation, Inject} from '@angular/core';
import {ProjectService} from './project/project-service/project-service';
import template from './app.html!text';

@Component({
  selector: 'ngc-app',
  template,
  encapsulation: ViewEncapsulation.None,
  providers: [ProjectService]
})
export class App {
  constructor(@Inject(ProjectService) projectService) {
    this.projectService = projectService;
    this.projects = [];

    // Setting up our functional reactive subscription to receive 
    // project changes
    this.projectsSubscription = projectService.change
      // We subscribe to the change observer of our service
      .subscribe((projects) => {
        this.projects = projects;
      });
  }

  // If this component gets destroyed, we need to remember to 
  // clean up the project subscription
  ngOnDestroy() {
    this.projectsSubscription.unsubscribe();
  }
}

In our App component, we now obtain the list of projects using the change observable on our ProjectService class. By subscribing to the ReplaySubject on the service, we make sure that the projects list stored in the App component is updated after every change.

We use the OnDestroy lifecycle hook to unsubscribe from the ProjectService change observable. This is a necessary manual step if you want to do proper housekeeping in your application. Depending on the source, forgetting to unsubscribe can lead to memory leaks, because the source keeps a reference to the subscriber function and to everything that function references.
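
The effect of unsubscribing can be observed with a few lines of plain JavaScript. This is a rough sketch that uses a hypothetical minimal subject (createSubject and observerCount are made up for illustration and are not RxJS APIs):

```javascript
// Hypothetical minimal subject, used only to make the effect observable.
function createSubject() {
  let observers = [];
  return {
    // Deliver a value to every registered observer
    next(value) {
      observers.forEach((observer) => observer(value));
    },
    // Register an observer and return a handle to remove it again
    subscribe(observer) {
      observers.push(observer);
      return {
        unsubscribe() {
          observers = observers.filter((o) => o !== observer);
        }
      };
    },
    // Number of observers the subject still holds references to
    observerCount() {
      return observers.length;
    }
  };
}

const subject = createSubject();
const seen = [];
const subscription = subject.subscribe((value) => seen.push(value));

subject.next('first');
subscription.unsubscribe();
subject.next('second'); // no longer delivered

console.log(seen);                    // [ 'first' ]
console.log(subject.observerCount()); // 0
```

As long as the subscription is active, the subject holds on to the observer function. Once unsubscribe() is called, that reference is dropped and the observer can be garbage collected together with the component that created it.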

With the preceding code, we have already established the basis for a reactive data architecture. We observe our data store for changes and our user interface will react to them:

Reactive programming with observable data structures

This figure shows the reactive data flow end-to-end, from our data store into the view.