Angular 2 async pipe not rendering/updating Observable data automatically

Issue

I am facing an issue with the Angular2 router and an async pipe.

I am trying to render an RxJS Observable, but the data does not render automatically.

The data only appears after clicking the link for the route.

Here is the root app:

import {bootstrap}    from 'angular2/platform/browser';
import {HTTP_PROVIDERS} from 'angular2/http';
import {ROUTER_PROVIDERS} from 'angular2/router';
import {AppComponent} from './app.component.ts';

// Start the application from AppComponent, passing the HTTP and router provider lists.
const rootProviders = [HTTP_PROVIDERS, ROUTER_PROVIDERS];
bootstrap(AppComponent, rootProviders);

Here is the root component:

import {Component} from 'angular2/core';
import {RouteConfig, ROUTER_DIRECTIVES} from 'angular2/router';
import {FirstComponent} from './app.first-component.ts';
import {SecondComponent} from './app.second-component.ts';
import {AppService} from "./app.services.ts";


/**
 * Root component: renders the two navigation links and the router outlet
 * into which the routed components are loaded.
 */
@Component({
    selector: 'my-app',
    // Only injectable services belong in `providers`; component classes are
    // not services and must not be registered here.
    providers: [AppService],
    // The template uses no component selectors -- FirstComponent and
    // SecondComponent are created by the router via @RouteConfig -- so only
    // the router directives (routerLink, router-outlet) are required.
    directives: [ROUTER_DIRECTIVES],
    template: `<h1>An Angular 2 App</h1>
               <a [routerLink]="['First']">first-default</a> 
               <a [routerLink]="['Second']">second</a> 
               <router-outlet></router-outlet>`
})
@RouteConfig([
    {path: '/', name: 'First', component: FirstComponent, useAsDefault: true},
    {path: '/second', name: 'Second', component: SecondComponent}
])
export class AppComponent {
}

Here is the first component:

import {Component} from "angular2/core";
import {AppService} from "./app.services.ts";
import "rxjs/Rx";


/**
 * Lists every string of the array emitted by `AppService.someObservable$`,
 * subscribing to it in the template through the `async` pipe.
 */
@Component({
    selector: 'my-first',
    template: `
<div>
    <ul>
        <li *ngFor="#s of appService.someObservable$ | async">
           a string: {{ s }}
        </li>
    </ul>
 </div>`
})
export class FirstComponent {

    // Stored on the instance because the template reads
    // `appService.someObservable$` directly.
    private appService: AppService;

    constructor(appService: AppService) {
        this.appService = appService;
        console.log('constructor', 'first');
    }
}

and finally the service (where the data resides):

import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Rx";

/**
 * Wraps the server-sent-event endpoint `/interval-sse-observable` in an
 * observable that emits the running concatenation of all received payloads.
 */
@Injectable()
export class AppService {

    constructor() {
        console.log('constructor', 'appService');
        this.constructSomeObservable();
    }

    /** Starts with an empty array, then emits the accumulated array after each message. */
    someObservable$:Observable <string[]>;

    /**
     * Builds `someObservable$`. A new EventSource is opened for each
     * subscription and closed again when that subscription is torn down.
     */
    constructSomeObservable() {
        this.someObservable$ = Observable.create(observer => {
                const eventSource = new EventSource('/interval-sse-observable');
                eventSource.onmessage = x => observer.next(JSON.parse(x.data));
                eventSource.onerror = () => {
                    // console.log() returns undefined, so passing its result to
                    // observer.error() gave subscribers no error value at all.
                    // Log first, then emit a real Error.
                    console.log('EventSource failed');
                    observer.error(new Error('EventSource failed'));
                };
                return () => {
                    eventSource.close();
                };
            })
            // Emit an empty array up front so consumers have a value before the
            // first message; scan then folds each payload into that array.
            .startWith([])
            .scan((acc, value) => acc.concat(value));
    }
}

What am I getting wrong with the router or the pipe?

See sample project on github here.

edit: Here is the modified version of the component:

import {Component} from "angular2/core";
import {AppService} from "./app.services.ts";
import {Observable} from "rxjs/Rx";


/**
 * Lists every string of the array emitted by `someObservable$`, a
 * component-level reference to the observable owned by AppService,
 * subscribing to it in the template through the `async` pipe.
 */
@Component({
    selector: 'my-first',
    template: `
<div>
    <ul>
        <li *ngFor="#s of someObservable$ | async">
           a string: {{ s }}
        </li>
    </ul>
 </div>`
})
export class FirstComponent {

    // The observable the template binds to; copied from the service in the constructor.
    someObservable$:Observable <string[]>;

    private appService: AppService;

    constructor(appService: AppService) {
        this.appService = appService;
        console.log('constructor', 'first');
        this.someObservable$ = this.appService.someObservable$;
    }
}

The data is still not updated in the template. Does it have to do with two-way/one-way binding?

Solution

I think Angular's zone doesn't patch the callback assigned to eventSource.onmessage, unlike e.g. setTimeout, setInterval or XHR requests, so Angular is not notified when a message arrives and change detection does not run.

From angular2-polyfills.js

/***/ function(module, exports, __webpack_require__) {

    /* WEBPACK VAR INJECTION */(function(global) {"use strict";
    __webpack_require__(1);
    var event_target_1 = __webpack_require__(2);
    var define_property_1 = __webpack_require__(4);
    var register_element_1 = __webpack_require__(5);
    var property_descriptor_1 = __webpack_require__(6);
    var utils_1 = __webpack_require__(3);
    var set = 'set';
    var clear = 'clear';
    var blockingMethods = ['alert', 'prompt', 'confirm'];
    var _global = typeof window == 'undefined' ? global : window;
    patchTimer(_global, set, clear, 'Timeout');
    patchTimer(_global, set, clear, 'Interval');
    patchTimer(_global, set, clear, 'Immediate');
    patchTimer(_global, 'request', 'cancelMacroTask', 'AnimationFrame');
    patchTimer(_global, 'mozRequest', 'mozCancel', 'AnimationFrame');
    patchTimer(_global, 'webkitRequest', 'webkitCancel', 'AnimationFrame');
    for (var i = 0; i < blockingMethods.length; i++) {
        var name = blockingMethods[i];
        utils_1.patchMethod(_global, name, function (delegate, symbol, name) {
            return function (s, args) {
                return Zone.current.run(delegate, _global, args, name);
            };
        });
    }
    event_target_1.eventTargetPatch(_global);
    property_descriptor_1.propertyDescriptorPatch(_global);
    utils_1.patchClass('MutationObserver');
    utils_1.patchClass('WebKitMutationObserver');
    utils_1.patchClass('FileReader');
    define_property_1.propertyPatch();
    register_element_1.registerElementPatch(_global);
    // Treat XMLHTTPRequest as a macrotask.
    patchXHR(_global);
    var XHR_TASK = utils_1.zoneSymbol('xhrTask');
    function patchXHR(window) {
        function findPendingTask(target) {
            var pendingTask = target[XHR_TASK];
            return pendingTask;
        }

Therefore you need to wrap your eventSource.onmessage callback in zone.run(), something like:

app.services.ts

import {Injectable, NgZone} from "angular2/core"; // <=== 1) Don't forget to import the NgZone class
import {Observable} from "rxjs/Rx";

/**
 * Wraps the server-sent-event endpoint `/interval-sse-observable` in an
 * observable that emits the running concatenation of all received payloads.
 * Notifications are delivered inside Angular's zone so that change detection
 * runs after each one.
 */
@Injectable()
export class AppService {

  constructor(private zone: NgZone) { // <== 2) Don't forget also to inject zone in constructor
    console.log('constructor', 'appService');
    this.constructSomeObservable();
  }

  /** Starts with an empty array, then emits the accumulated array after each message. */
  someObservable$: Observable<string[]>;

  /**
   * Builds `someObservable$`. A new EventSource is opened for each
   * subscription and closed again when that subscription is torn down.
   */
  constructSomeObservable() {
    this.someObservable$ = Observable.create(observer => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => this.zone.run(() => observer.next(JSON.parse(x.data))); // <=== 3) Wrap onmessage event
      // The error callback needs the same zone treatment as onmessage,
      // otherwise the view would not react to the failure. console.log()
      // returns undefined, so log first and emit a real Error rather than
      // passing the log call's result to observer.error().
      eventSource.onerror = () => this.zone.run(() => {
        console.log('EventSource failed');
        observer.error(new Error('EventSource failed'));
      });
      return () => {
        eventSource.close();
      };
    })
      // Emit an empty array up front so consumers have a value before the
      // first message; scan then folds each payload into that array.
      .startWith([])
      .scan((acc, value) => acc.concat(value));
  }
}

Answered By – yurzui

Answer Checked By – Jay B. (AngularFixing Admin)

Leave a Reply

Your email address will not be published.