Push Notifications from a Child Process using Node, Angular, and Socket.IO

It seems that every example on the internet involving Node and web sockets is some sort of chat application. Let’s break out of the mold for a bit and do something a little different.

Here’s a breakdown of what this example is going to give you:

  • Create a web socket application without chat (It’s just crazy, I know)
  • Instead of the client initiating updates, let the server push things down (broadcast)
  • Spawn a child process in node to handle background processing
  • Control that background process from any connected client/browser
  • Make the UI all ajaxy, ’cuz that’s what the cool kids do

What we need to do it:

  • A working, installed version of Node
  • Express: npm install express
  • Jade template engine: npm install jade
  • Socket.IO: npm install socket.io socket.io-client
  • AngularJS

If you’re really impatient and just want the code, clone it from GitHub and run node server.js from the root to see the working example.

Node Server

Let’s start by setting up a basic Node server for rendering a page from which we can control everything. We are going to use the Jade view engine, and add a few helpers for logging, compression, and serving static content.

var jade = require('jade'),
    express = require('express'),
    app = express(),
    server = require('http').createServer(app);

app.set('views', __dirname + '/views');
app.set('view engine', 'jade');

app.use(express.logger());
app.use(express.compress());
app.use(express.static(__dirname + '/public'));

app.get('/', function(req, res){
    res.render('index');
});

server.listen(process.env.PORT || 4561);

That should get us serving a basic jade template from our ./views/ directory. The basic format of the ./views/index.jade page will look something like this:

!!!5
html
    head
        title Ticker
        link(rel='stylesheet', href='/css/bootstrap.min.css')
        link(rel='stylesheet', href='/css/main.css')

    body
        header
        main
        footer

    script(src='//ajax.googleapis.com/ajax/libs/angularjs/1.0.7/angular.min.js')
    script(src='/socket.io/socket.io.js')
    script(src='/js/main.js')

Even if you’re not familiar with Jade, the syntax is fairly easy to decipher at a glance. Here we are just setting up a basic page with some references to the scripts and stylesheets we are going to use later. We’ll come back to each part later, but this will serve as a good starting point.

Child Process

Node has several simple methods for creating a child process. The most common one is the spawn method of child_process. Spawn runs an arbitrary command and gives you its stdin, stdout, and stderr streams, but it does not set up a message channel by default, so there is no easy way to pass structured messages to the child and control it.

var child = require('child_process').spawn('command');

To solve this, Node also provides the fork(module) method, which opens a communication channel between the two node processes. This allows us to send messages back and forth to the child process.

var child = require('child_process').fork('module name');

// listening for messages from the child process
child.on('message', function(message) {
    console.log(message);
});

// sending a message to the child
child.send({ message: 'I am your father!'});

The events you can receive from the child process are error, exit, disconnect, close, and message. The message event delivers a parsed JSON object or primitive value to the parent process. Since message is the only event that carries your own data, you can distinguish between different kinds of updates by including a property in the message.

{
    event: 'processing',
    progress: 0.56
}
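Because messages cross the process boundary as JSON by default, only JSON-friendly data survives the trip. A rough way to preview what the other side will receive is to round-trip the message yourself. This is only a sketch: roundTrip and the extra startedAt and onDone properties are made up for illustration and are not part of Node’s API.

```javascript
// Hypothetical helper: approximates what the receiving process will see
// when a message is serialized as JSON.
function roundTrip(message) {
    return JSON.parse(JSON.stringify(message));
}

var original = {
    event: 'processing',
    progress: 0.56,
    startedAt: new Date(0),       // dates arrive as ISO strings
    onDone: function () {}        // functions are dropped entirely
};

var received = roundTrip(original);
console.log(received);
```

So keep messages to plain objects, arrays, strings, numbers, and booleans, and rebuild anything richer on the receiving side.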

Sending a message to the child process is done using the send({}) method. In a similar fashion, if you need the child to respond differently to each message, you can include a property to differentiate the operation you want.

{
    op: 'delete',
    id: 12
}
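One way to act on such a tag property, on either side of the channel, is a small lookup-table router. The following is only a sketch: createRouter, the handlers, and the sample messages are hypothetical names chosen for illustration.

```javascript
// Hypothetical helper: builds a function that routes a message to a handler
// chosen by one of its properties ('op' for commands, 'event' for updates).
function createRouter(key, handlers) {
    return function route(message) {
        var name = message && message[key];
        // Only accept tags that were registered explicitly.
        if (!Object.prototype.hasOwnProperty.call(handlers, name)) {
            return undefined;       // unknown or missing tag: ignore it
        }
        return handlers[name](message);
    };
}

// Child side: commands from the parent are tagged with 'op'.
var handleCommand = createRouter('op', {
    'delete': function (msg) { return 'deleting ' + msg.id; }
});

// Parent side: updates from the child are tagged with 'event'.
var handleUpdate = createRouter('event', {
    processing: function (msg) { return Math.round(msg.progress * 100) + '%'; }
});

console.log(handleCommand({ op: 'delete', id: 12 }));
console.log(handleUpdate({ event: 'processing', progress: 0.56 }));
```

In a real application these would be registered as process.on('message', handleCommand) in the child and child.on('message', handleUpdate) in the parent.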

Socket.IO

We want an efficient way to communicate with the client that is as close to real time as we can get. Socket.IO provides both a server-side and a client-side library, and it degrades gracefully depending on what features the client supports.

To integrate it with our Node application, we need very little new code. Caution: the following code will only work with Express 3.

var jade = require('jade'),
    express = require('express'),
    app = express(),
    server = require('http').createServer(app),
    io = require('socket.io').listen(server);

io.sockets.on('connection', function (socket) {
    console.log('Someone connected to me, hooray!');

    // sending a message back to the client
    socket.emit('connected', { message: 'Thanks for connecting!' });

    // listening for messages from the client
    socket.on('message', function(message) {
         console.log(message);
    });
});

On the client side, we include a script tag pointing to /socket.io/socket.io.js, written in Jade as script(src='/socket.io/socket.io.js'). You don’t actually need a physical js file for the client. Socket.IO will listen for requests to that path and serve the js file on the fly. Cool Beans!

WAIT! In order for the client side of Socket.IO to work, you must have run both npm install socket.io and npm install socket.io-client.

Setting up the connection on the client side is just as easy.

<script src="/socket.io/socket.io.js"></script>
<script>
    var socket = io.connect('http://localhost');

    // listening for the connected event from the server
    socket.on('connected', function(data) {
        console.log('The server said: ' + data.message);
    });

    // sending a message event to the server
    socket.emit('message', { message: 'Hi!' });
</script>

Notice how events can be named anything you want, and the client and server have identical APIs. This really streamlines development with web sockets in Node.

If you are just as impressed as I am about how dead simple Socket.IO is, be sure to check out their official docs for more info.

AngularJS

To set up a nice responsive UI on the client side, I’m going to use AngularJS.

To get started with Angular, we need to build our model, controller, and module for the page.

// creating a module in Angular
var app = angular.module('stockStatusModule', []);

// creating a controller for our page
app.controller('updateController', function($scope) {
    $scope.status = 'Angular Loaded';
    $scope.stocks = [
        { name: "Apple", price: 4.50 },
        { name: "Microsoft", price: 4.00 },
        { name: "Facebook", price: 3.75 }
    ];
});

If you are looking through the docs on the Angular website, take note that most of the examples do not use a module. Using a module is the recommended way to use Angular, and I think it’s an unfortunate decision on the doc writers’ part to leave it out of the examples even though they make reference to that fact.

With our model in place we can bind it to our view with the following code:

!!!5
html(ng-app="stockStatusModule")
    head
        title Ticker
        link(rel='stylesheet', href='/css/bootstrap.min.css')
        link(rel='stylesheet', href='/css/main.css')

    body(ng-controller='updateController')
        header
            h1 {{status}}
        main
            table
                tr(ng-repeat='s in stocks')
                    td {{s.name}}
                    td {{s.price | currency}}
        footer

    script(src='//ajax.googleapis.com/ajax/libs/angularjs/1.0.7/angular.min.js')
    script(src='/socket.io/socket.io.js')
    script(src='/js/main.js')

Above, we tell Angular we want our status bound to the <h1> element text and we also want our array of stocks to render out as a table. We are also going to take advantage of the currency filter in Angular, that will render our price as formatted currency.

So far, this is all great, but now we need to connect all the pieces together so we get something more dynamic.

Two kids at a playground

To get Angular and Socket.IO to play nice, we can make use of the dependency injection in Angular and inject our Socket.IO code into our page controller.

// adding an additional parameter for the socket
app.controller('updateController', function ($scope, socket) { ... });

// using the Angular factory to inject
app.factory('socket', function ($rootScope) {
    var socket = io.connect();
    return {
        on: function (eventName, callback) {
            socket.on(eventName, function () {
                var args = arguments;
                $rootScope.$apply(function () {
                    callback.apply(socket, args);
                });
            });
        },
        emit: function (eventName, data, callback) {
            socket.emit(eventName, data, function () {
                var args = arguments;
                $rootScope.$apply(function () {
                    if (callback) {
                        callback.apply(socket, args);
                    }
                });
            });
        }
    };
});

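Why don’t we just use a global instance of Socket.IO? No. Yep, that’s my final answer. Socket.IO callbacks run outside Angular’s digest cycle, so scope changes made inside them would not show up in the view until something else triggered a digest. Wrapping each callback in $rootScope.$apply takes care of that in one place.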

Above you can see we are creating a wrapper for Socket.IO and letting our module inject that into the constructor of our page controller. Inside our controller, we can now use the socket just like we were before.

app.controller('updateController', function ($scope, socket) {
    $scope.status = 'AngularJS successfully loaded.';
    $scope.stocks = [
        { name: "Apple", price: 4.50 },
        { name: "Microsoft", price: 4.00 },
        { name: "Facebook", price: 3.75 }
    ];
    socket.on('status', function (data) {
        $scope.status = data.message;
    });
    socket.on('update', function (data) {
        $scope.stocks = data.stocks;
    });
});
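The effect of routing every callback through $rootScope.$apply can be seen without a browser. The sketch below is an illustration only: wrapSocket mirrors just the on half of the factory above, and fakeRootScope and the use of Node’s EventEmitter as a stand-in for the real socket are assumptions made so the code runs on its own.

```javascript
var EventEmitter = require('events').EventEmitter;

// Same shape as the 'on' method of the factory, with the raw socket and
// $rootScope passed in so they can be replaced by stand-ins.
function wrapSocket(rawSocket, $rootScope) {
    return {
        on: function (eventName, callback) {
            rawSocket.on(eventName, function () {
                var args = arguments;
                $rootScope.$apply(function () {
                    callback.apply(rawSocket, args);
                });
            });
        }
    };
}

// Stand-in for $rootScope: counts how often $apply runs.
var fakeRootScope = {
    applyCount: 0,
    $apply: function (fn) {
        this.applyCount += 1;
        fn();
    }
};

var rawSocket = new EventEmitter();      // stand-in for io.connect()
var socket = wrapSocket(rawSocket, fakeRootScope);
var scope = { status: 'AngularJS successfully loaded.' };

socket.on('status', function (data) {
    scope.status = data.message;
});

// Simulate the server pushing a status event.
rawSocket.emit('status', { message: 'Processing' });

console.log(scope.status, fakeRootScope.applyCount);
```

Each incoming event triggers exactly one $apply, which is what lets Angular pick up the new status and refresh the view.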

Putting it all together

In our Node application, we are going to add some more code to handle different messages and events from both the client and the child process.

app.set('views', __dirname + '/views');
app.set('view engine', 'jade');

app.use(express.logger());
app.use(express.compress());
app.use(express.static(__dirname + '/public'));

app.get('/', function(req, res){
    res.render('index');
});

server.listen(process.env.PORT || 4561);

var cp = require('child_process').fork('ticker');
cp.on('message', function (message) {
    io.sockets.emit('update', message);
});

io.sockets.on('connection', function (socket) {
    socket.emit('status', { message: "EHLO OK Connected" });
    socket.on('start', function (data) {
        cp.send({ op: 'start' });
        socket.emit('status', { message: "Processing" });
    });
    socket.on('stop', function (data) {
        cp.send({ op: 'stop' });
        socket.emit('status', { message: "Stopped" });
    });
    socket.on('buy', function (data) {
        cp.send({ op: 'buy', name: data });
    });
});

We are doing several things in the above code:

  • Starting a child process from a module named ticker
  • Listening to the message event from the child process
    • When the event occurs, broadcast that message to all connected clients
  • Listening for web socket connections from clients
    • When a client connects, emit a status message to that client EHLO OK Connected
    • Listen for a start event from the client
      • When the event occurs, send a message to the child process with an opcode of start
      • Emit a message to the client that processing has started
    • Listen for a stop event from the client
      • When the event occurs, send a message to the child process with an opcode of stop
      • Emit a message to the client that processing has stopped
    • Listen for a buy event from the client
      • When the event occurs, send a message to the child process with an opcode of buy, and the name of the stock that was purchased

We also need to adapt our Angular controller and view to work with our new events.

app.controller('updateController', function ($scope, socket) {
    $scope.status = 'AngularJS successfully loaded.';
    $scope.stocks = [
        { name: "Apple", price: 4.50 },
        { name: "Microsoft", price: 4.00 },
        { name: "Facebook", price: 3.75 }
    ];
    $scope.start = function () {
        socket.emit('start');
    };
    $scope.stop = function () {
        socket.emit('stop');
    };
    $scope.buy = function (element) {
        socket.emit('buy', element.name);
    };
    socket.on('status', function (data) {
        $scope.status = data.message;
    });
    socket.on('update', function (data) {
        $scope.stocks = data.stocks;
    });
});

We added some new methods to our controller scope for starting and stopping the process, and buying stock. We need to create the corresponding elements in the view to use these. We can attach the click events of buttons to the scope using Angular’s ng-click directive on our plain Jane HTML buttons.

!!!5
html(ng-app="stockStatusModule")
    head
        title Ticker
        link(rel='stylesheet', href='/css/bootstrap.min.css')
        link(rel='stylesheet', href='/css/main.css')

    body(ng-controller='updateController')
        header
            h1 {{status}}
        main
            div
                button(type='button', ng-click='start()') Start Decay
                button(type='button', ng-click='stop()') Stop Decay
            table
                tr(ng-repeat='s in stocks')
                    td {{s.name}}
                    td {{s.price | currency}}
                    td
                        button(type='button', ng-click='buy(s)') Buy
        footer

    script(src='//ajax.googleapis.com/ajax/libs/angularjs/1.0.7/angular.min.js')
    script(src='/socket.io/socket.io.js')
    script(src='/js/main.js')

From the client side, clicking the start or stop buttons will relay the message through a web socket to the Node application, which is listening for that event. The Node application will then pass that on to the child process for further handling. We also have a buy button that passes the current object in the array to the buy method on the controller scope. The Node application is listening for the buy event and expecting the name of the stock to be sent with it.

What about the ticker module code?

Up to this point, we haven’t discussed anything about the actual code inside the child process. A child process in Node is just a plain JavaScript file that gets loaded into a new process when you fork it. If you are looking through the documentation, it’s easy to confuse this with a normal Node module, but be sure you aren’t trying to use it that way; exports will not work as expected.

The child process retains a reference to its parent and can send messages in the same way the parent does.

// sending a message to the parent process
process.send({ message: 'Hi!' });

// listening for messages from the parent process
process.on('message', function(message) {
    console.log(message);
});

Note that in the above example, process is a global object that Node provides in every module; we don’t need to require or define anything to use it.
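One caveat: process.send only exists when the file was started with a communication channel, for example through fork. A guard like the one below lets the same file run on its own as well. This is a sketch; report and its proc parameter are hypothetical names, and proc exists only so the behaviour can be shown without actually forking.

```javascript
// Hypothetical helper: sends to the parent when there is one, otherwise logs.
// 'proc' defaults to the global process object.
function report(message, proc) {
    proc = proc || process;
    if (typeof proc.send === 'function') {
        proc.send(message);                 // started via fork: channel is open
        return 'ipc';
    }
    console.log(JSON.stringify(message));   // run directly: no parent to talk to
    return 'console';
}

// Demonstration with stand-ins for the process object.
var sent = [];
var forkedLike = { send: function (m) { sent.push(m); } };
var standaloneLike = {};

console.log(report({ stocks: [] }, forkedLike));
console.log(report({ stocks: [] }, standaloneLike));
```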

The ticker is going to simulate stock market prices that decay exponentially over time and grow when purchased. This means that once our child process is started, it will continually run in the background until stopped by the client. This could be useful for many things including background processing of large files, real time game loops, and many other applications where you need to avoid holding up the main thread.

function Ticker(opt) {
    this._decay = opt.decay;
    this._growth = opt.growth;
    this._stocks = opt.stocks;
    this._state = null;
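}
// Decay every price once, report to the parent, then schedule the next tick.
Ticker.prototype.execute = function () {
    var self = this;
    // Cancel any pending tick so a second 'start' doesn't create a second loop.
    clearTimeout(this._state);
    this._stocks.forEach(function (p) {
        p.price -= p.price / self._decay;
    });
    process.send({ stocks: this._stocks });

    this._state = setTimeout(function () {
        self.execute();
    }, 1000);
};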
Ticker.prototype.stop = function () {
    clearTimeout(this._state);
    this._state = null;
};
Ticker.prototype.buy = function (name) {
    var self = this;
    this._stocks.forEach(function (p, i) {
        if(p.name === name)
            p.price += p.price / self._growth;
    });
};

var ticker = new Ticker({
    growth: 10.0,
    decay: 1800.0,
    stocks: [
        { name: "Apple", price: 14.50 },
        { name: "Microsoft", price: 43.00 },
        { name: "Facebook", price: 37.75 }
    ]
});
process.on('message', function (msg) {
    if (msg.op === 'start') {
        ticker.execute();
    }
    else if (msg.op === 'stop') {
        ticker.stop();
    }
    else if (msg.op === 'buy') {
        ticker.buy(msg.name);
    }
});

The ticker listens for messages from the parent process and calls a different method depending on the opcode it receives. The execute method decays the prices at a specified rate, then sets a timeout to repeat that every second until the stop method is called and ends the loop.
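Because each tick multiplies the price by (1 - 1/decay), the price after n ticks has a closed form: price × (1 - 1/decay)^n. The sketch below checks that against the loop the ticker runs; priceAfter and simulate are hypothetical helpers written for this illustration, not part of the ticker module.

```javascript
// Closed form: price after n decay ticks.
function priceAfter(price, decay, ticks) {
    return price * Math.pow(1 - 1 / decay, ticks);
}

// Same update the ticker applies once per second, repeated n times.
function simulate(price, decay, ticks) {
    for (var i = 0; i < ticks; i++) {
        price -= price / decay;
    }
    return price;
}

// With decay = 1800 and one tick per second, 1800 ticks is 30 minutes.
var closed = priceAfter(14.50, 1800, 1800);
var looped = simulate(14.50, 1800, 1800);
console.log(closed.toFixed(4), looped.toFixed(4));
```

With the values used above, a stock loses roughly 63% of its price over 30 minutes if nobody buys it, while each buy bumps the price by 10% (price / growth with growth set to 10).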

Example

Clone the example code from GitHub and run node server.js from the root to see the working example. To really see the benefit, open 3 or 4 browser windows at once and watch everything stay in near-perfect sync.

Special thanks to Travis and Kendall for help with various bits and pieces.