Gluing Node libraries together with Event Emitters

The art of using Event Emitters for cleaner code

The Inferno

Picture the landscape of Dante's Inferno and you can begin to imagine what it's like to traverse some of the code created in Node.js projects. With each callback, you begin to suspect that you're descending ever deeper into hell.

Because Node.js and its powerful counterpart npm offer so many useful libraries, many Node applications become an exercise in creating "glue code".

For instance, one day you may find yourself taking tweets from Twitter and dumping them into MySQL. Then you might sort the most popular and save those results into Redis. Your app may also happen to listen for HTTP requests and respond to them by pulling the popular tweets from Redis. Perhaps you also listen for a socket connection from browsers and relay this information in "real time" to human users.

This complexity is an unfortunate side effect of having so many great libraries. Callback hell is real.

Salvation

How do we escape hell? One way is through the effective use of events, plus a little bit of code polish.

Let's consider a small piece of the above nightmare application: simply connecting to Twitter's API and dumping tweets into MySQL. We'll use two (fictitious…ish) libraries, a Twitter client and a MySQL client.

var Mysql = require('mysqlclient'),
    Twitter = require('twitterclient'),
    twit = new Twitter({
        consumer_key: 'some_key',
        consumer_secret: 'some_secret',
        access_token_key: 'some_token_key',
        access_token_secret: 'some_token_secret'
    });

var MysqlConfig = {
        host: 'localhost',
        user: 'some_user',
        pass: 'some_pass',
        database: 'some_database'
    },
    MysqlConnection = Mysql.connection(MysqlConfig);

MysqlConnection.on('error', function(err) {
    if( err.mssg === 'connection_error' )
    {
        // Reconnect
        MysqlConnection = Mysql.connection(MysqlConfig);
    }
});

twit.stream('statuses/filter', function(stream) {

    stream.on('data', function(data) {
        var tweet = data.tweet.escaped;
        MysqlConnection.query('INSERT INTO tweets SET tweet = "' + tweet + '"', function(err, result) {
            if( err )
            {
                // Likely a query error
                console.log(err);
            }
            // Success, wait for next tweet
        });
    });

    stream.on('end', function(response) {
        // Uh oh, a disconnect - reconnect
        twit.stream('statuses/filter', function(stream) {
            // The new stream needs its own 'data' handler
            stream.on('data', function(data) {
                var tweet = data.tweet.escaped;
                MysqlConnection.query('INSERT INTO tweets SET tweet = "' + tweet + '"', function(err, result) {
                    if( err )
                    {
                        // Likely a query error
                        console.log(err);
                    }
                    // Success, wait for next tweet
                });
            });
        });
    });

    stream.on('destroy', function(response) {
        // A Silent Disconnect - reconnect
        twit.stream('statuses/filter', function(stream) {
            // The new stream needs its own 'data' handler
            stream.on('data', function(data) {
                var tweet = data.tweet.escaped;
                MysqlConnection.query('INSERT INTO tweets SET tweet = "' + tweet + '"', function(err, result) {
                    if( err )
                    {
                        // Likely a query error
                        console.log(err);
                    }
                    // Success, wait for next tweet
                });
            });
        });
    });

});

Here are some of the issues with the above code:

  1. Configuration is hard-coded
  2. While we're still using some events, there are a lot of callbacks. And callbacks within those callbacks. For instance, reconnecting and saving tweets if we get disconnected. Whoa.
  3. Not all possible situations are handled - for instance, errors aren't handled gracefully. Some errors aren't handled at all (SQL query issues, MySQL disconnect issues, and others).
  4. We're coding like it's 1999 - all of our code is in one long JavaScript file.

The asynchronous behaviour of Node is certainly a departure from our usual "one web request means one run-through of our program" paradigm. Because code doesn't necessarily run in the order it appears in the file, we need a way to handle timing, especially when errors occur.
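To make the ordering point concrete, here is a small sketch (the variable names are made up for illustration). The callback handed to setTimeout appears before the last line of the script, yet it runs after it:

```javascript
// Record the order in which each piece of code actually runs.
var order = [];

order.push('start');

// Scheduled now, but only run once the current script has finished.
setTimeout(function () {
    order.push('timer callback');
    console.log(order.join(' -> '));
}, 0);

order.push('end of script');
```

Any logic that depends on the timer callback having run, including error handling, has to live in that callback or be triggered from it.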

Events can help with that. Here's a strategy I employ:

  1. Encapsulate and abstract away the business logic for each library in our own smaller modules.
  2. Use our main script to implement our new modules and tie their emitted events together.
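In miniature, the two steps can be sketched like this. Counter and Log are hypothetical stand-ins for real library wrappers, chosen only so the sketch runs without any third-party dependency:

```javascript
var EventEmitter = require('events').EventEmitter;

// Step 1: wrap each concern in a small module that emits events.
// Counter and Log are hypothetical stand-ins for real library wrappers.
function Counter() {
    EventEmitter.call(this);
    this.count = 0;
}
Counter.prototype = Object.create(EventEmitter.prototype);

Counter.prototype.increment = function () {
    this.count += 1;
    this.emit('change', this.count);
};

function Log() {
    EventEmitter.call(this);
    this.entries = [];
}
Log.prototype = Object.create(EventEmitter.prototype);

Log.prototype.write = function (entry) {
    this.entries.push(entry);
    this.emit('written', entry);
};

// Step 2: the main script only ties emitted events together.
var counter = new Counter();
var log = new Log();

counter.on('change', function (value) {
    log.write('count is now ' + value);
});

log.on('written', function (entry) {
    console.log('logged:', entry);
});

counter.increment();
counter.increment();
```

Neither module knows the other exists; only the main script knows that a 'change' should lead to a write.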

Let's see what that means for our Twitter-to-MySQL example. First, let's work on encapsulating our business logic into our own modules. For each library used, I wrap its functionality into a module of my own.

Encapsulation

Where before our files appeared like this:

someapp
    app.js
    node_modules
        twitterclient
        mysqlclient

I add a lib directory for my own application's modules. The directory structure might now look like this:

someapp
    app.js
    config.js
    lib
        twitter
            index.js
        mysql
            index.js
    node_modules
        twitterclient
        mysqlclient
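
The tree lists a config.js that isn't shown in this article. Here is a minimal sketch of what it could contain, assuming only what the rest of the code expects: a constructor (app.js calls new Config()) that exposes twitter and mysql sections with the keys the modules read. The environment variable names and default values are placeholders, not part of the original app.

```javascript
// config.js (hypothetical): one place for the settings the modules read.
// Environment variable names and defaults are illustrative placeholders.
function Config() {
    if (!(this instanceof Config)) return new Config();

    this.twitter = {
        consumer_key: process.env.TWITTER_CONSUMER_KEY || 'some_key',
        consumer_secret: process.env.TWITTER_CONSUMER_SECRET || 'some_secret',
        access_token_key: process.env.TWITTER_TOKEN_KEY || 'some_token_key',
        access_token_secret: process.env.TWITTER_TOKEN_SECRET || 'some_token_secret',
        reconnect_attempts: 5 // give up after this many reconnects
    };

    this.mysql = {
        host: process.env.MYSQL_HOST || 'localhost',
        port: Number(process.env.MYSQL_PORT) || 3306,
        username: process.env.MYSQL_USER || 'some_user',
        password: process.env.MYSQL_PASS || 'some_pass',
        database: process.env.MYSQL_DB || 'some_database'
    };
}

module.exports = Config;
```

Reading secrets from the environment is one option among several; the point is only that nothing is hard-coded in the modules themselves.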

Let's review the lib/twitter/index.js file:

// Include the Twitter client library AND EventEmitter.
// The library is imported as TwitterClient so it doesn't clash
// with our own Twitter constructor below.
var TwitterClient = require('twitterclient'),
    EventEmitter = require('events').EventEmitter;

// Do some tracking for reconnect attempts
var reconnectAttempts = 0,
    currentlyStreaming = false;

// Pass in a config object
function Twitter(config)
{

    if (!(this instanceof Twitter)) return new Twitter(config);

    EventEmitter.call(this);

    this.config = config;
    this.twit = new TwitterClient({
        consumer_key: this.config.twitter.consumer_key,
        consumer_secret: this.config.twitter.consumer_secret,
        access_token_key: this.config.twitter.access_token_key,
        access_token_secret: this.config.twitter.access_token_secret
    });

}

Twitter.prototype = Object.create(EventEmitter.prototype);

Twitter.prototype.stream = function()
{

    var _this = this;

    if( currentlyStreaming === true )
    {
        // Don't want to ever be streaming in parallel
        return;
    }

    _this.twit.stream('statuses/filter', function(stream)
    {   
        currentlyStreaming = true;

        // Handle receiving tweet
        stream.on('data', function (data)
        {
            // Don't process tweet - just send it along
            // in an event
            _this.emit('tweet', data);
        });

        // Handle disconnect
        stream.on('end', function (response)
        {   
            currentlyStreaming = false;

            reconnectAttempts ++;

            if( reconnectAttempts >= _this.config.twitter.reconnect_attempts )
            {
                return _this.emit('error', new Error('@end: Too many reconnection attempts'));
            }

            // We can just re-call _this.stream to re-connect!
            _this.stream(); // Re-connect to stream
        });

        // Handle silent disconnect
        stream.on('destroy', function (response)
        {   
            currentlyStreaming = false;

            reconnectAttempts ++;

            if( reconnectAttempts >= _this.config.twitter.reconnect_attempts )
            {
                return _this.emit('error', new Error('@destroy: Too many reconnection attempts'));
            }

            _this.stream(); // Re-connect to stream
        });
    });

}

module.exports = Twitter;

The above is just the Twitter portion. Note that our Twitter constructor extends, and therefore inherits from, EventEmitter. This lets us create and emit our own events. In this case, we emit 'tweet' and 'error' events, which our application can listen for.
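
One consequence of emitting 'error' is worth spelling out: in Node, an EventEmitter that emits 'error' with no listener attached throws the error instead. The sketch below uses a bare EventEmitter as a stand-in for the Twitter module to show both cases:

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for a module such as lib/twitter (a bare emitter, for brevity).
var source = new EventEmitter();

// Case 1: no 'error' listener, so emit() throws the error itself.
var thrown = null;
try {
    source.emit('error', new Error('Too many reconnection attempts'));
} catch (err) {
    thrown = err;
}
console.log('thrown without a listener:', thrown.message);

// Case 2: with a listener, the error is delivered like any other event.
var handled = null;
source.on('error', function (err) {
    handled = err;
});
source.emit('error', new Error('Too many reconnection attempts'));
console.log('handled by listener:', handled.message);
```

So any application using a module like this should attach an 'error' listener before starting the stream.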

Next, let's look at lib/mysql/index.js:

var MysqlClient = require('mysql'),
    EventEmitter = require('events').EventEmitter,
    Connection;

/**
 * Setup
 * Get Config, "extend" EventEmitter
 */
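function Mysql(config)
{

    if (!(this instanceof Mysql)) return new Mysql(config);

    EventEmitter.call(this);

    this.config = config;
    this.connected = false;

    // handleError is registered as an event listener, so bind it to this
    // instance once; the same reference can then be removed again later
    this.handleError = this.handleError.bind(this);

}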

Mysql.prototype = Object.create(EventEmitter.prototype);


/**
 * Setup Connection from config, and error handler
 */
Mysql.prototype.init = function()
{

    var _this = this;

    Connection = MysqlClient.createConnection(
    {
        host     : _this.config.mysql.host,
        user     : _this.config.mysql.username,
        password : _this.config.mysql.password,
        database : _this.config.mysql.database,
        port     : _this.config.mysql.port
    });

    Connection.on('error', _this.handleError);

    _this.connect();

}

/**
 * Connect and emit connection upon success
 */
Mysql.prototype.connect = function()
{
    var _this = this;

    Connection.connect(function(err)
    {
        if( err )
        {
            return _this.handleError(err);
        }

        _this.connected = true;
        _this.emit('connect');
    });
}

/**
 * Handle any error received
 */
Mysql.prototype.handleError = function(err)
{   
    var _this = this;

    // If we lost connection, attempt reconnect
    if( err.code === 'PROTOCOL_CONNECTION_LOST' )
    {
        _this.connected = false;

        // Re-initialize application
        Connection.removeListener('error', _this.handleError); // Remove error handler, in case it affects GC
        Connection = null; // Remove reference to Connection
        return _this.init(); // Create new Connection and reset Event Listener(s)
    }

    _this.emit('error', err);
}


/**
 * Create Tweet
 * Expecting: { 
 *      tw_id: xxx,
 *      tw_handle: xxx,
 *      tw_img: xxx,
 *      tw_content: xxx,
 *      tw_timestamp: xxx
 * }
 */
Mysql.prototype.create = function(data)
{   
    var _this = this;

    // Check if it's a Date object first (else assume string in Twitter API format)
    if( data.tw_timestamp instanceof Date === false )
    {
        data.tw_timestamp = _this.twitterDate(data.tw_timestamp);
    }

    // Performs auto-escaping
    Connection.query('INSERT INTO tweets SET ?', data, function(err, result)
    {
        if( err )
        {
            return _this.handleError(err);
        }

        _this.emit('insert', data);
    });
}

Mysql.prototype.twitterDate = function(date)
{
    if( typeof date === 'undefined' || date === '')
    {
        return null;
    }
    return new Date(Date.parse(date.replace(/( \+)/, ' UTC$1')));
    //sample: Wed Mar 13 09:06:07 +0000 2013 
}

module.exports = Mysql;

Much like our Twitter class, this one inherits from EventEmitter. We use a handleError() method to handle all errors, and to reconnect to the database if necessary. We also handle creating a tweet, emitting an 'insert' event upon successful insertion. The module emits 'error' and 'connect' events as well.
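
A detail to watch when handing a prototype method such as handleError() to on(): Node calls a listener with this set to the emitter that fired the event, not to the object the method came from. Here is a sketch with a hypothetical Wrapper class and a plain EventEmitter standing in for the database connection, showing the problem and a bind() fix:

```javascript
var EventEmitter = require('events').EventEmitter;

// Collects whatever `this` was each time handleError ran.
var receivers = [];

// Hypothetical wrapper class, standing in for something like lib/mysql.
function Wrapper(name) {
    this.name = name;
}

Wrapper.prototype.handleError = function (err) {
    receivers.push(this);
};

var connection = new EventEmitter(); // stand-in for a DB connection
var wrapper = new Wrapper('wrapper');

// Unbound: inside handleError, `this` is the connection, not the wrapper.
connection.on('error', wrapper.handleError);
connection.emit('error', new Error('boom'));
connection.removeListener('error', wrapper.handleError);

// Bound: `this` is the wrapper. Keep the bound function around, because
// removeListener() needs the exact reference that was registered.
var boundHandler = wrapper.handleError.bind(wrapper);
connection.on('error', boundHandler);
connection.emit('error', new Error('boom'));
connection.removeListener('error', boundHandler);
```

Binding once and storing the result keeps both the on() and the later removeListener() call pointing at the same function.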

Implementation

Let's now see how this is all implemented together. This is how our app.js could now look:

var Config = require('./config'), // Require an external config.js file
    Mysql = require('./lib/mysql'),
    Twitter = require('./lib/twitter');

var conf = new Config(),
    mysql = new Mysql(conf),
    twitter = new Twitter(conf);


/****************************************
* Error handlers ( Perhaps send to file logs in production )
/****************************/

mysql.on('error', function(err)
{
    console.log(err);
});

twitter.on('error', function(err)
{
    console.log(err);
});


/****************************************
* Application handlers
/****************************/

// Insert tweet into db
twitter.on('tweet', function(rawTweet)
{
    mysql.create({
        tw_id: rawTweet.id_str,
        tw_handle: rawTweet.user.screen_name,
        tw_img: rawTweet.user.profile_image_url_https,
        tw_content: rawTweet.text,
        tw_timestamp: rawTweet.created_at,
    });
});

// Listen for mysql connection first
mysql.on('connect', function() {
    twitter.stream();
});

// Optionally include this
mysql.on('insert', function(tweet)
{
    console.log('Tweet inserted:', tweet);
});

// FIRE!
mysql.init();

Most notably, look how clean our main application code now is. We're simply listening to events, and firing some code based on them. Our business logic classes take care of the heavy lifting.

In a larger application, the code inside our app.js could even be moved to its own module, or be just one of many Node applications that work together to form one larger app. The premise of "many small modules/apps" is powerful in Node.
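
As a sketch of that idea, the event wiring from app.js could live in a function of its own that accepts any two emitters. The name wire and the stub objects below are hypothetical; the stubs stand in for lib/twitter and lib/mysql so the wiring can be exercised without a network or a database.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical module body: ties a tweet source to a tweet store.
function wire(twitter, mysql) {
    twitter.on('tweet', function (rawTweet) {
        mysql.create({
            tw_id: rawTweet.id_str,
            tw_handle: rawTweet.user.screen_name,
            tw_img: rawTweet.user.profile_image_url_https,
            tw_content: rawTweet.text,
            tw_timestamp: rawTweet.created_at
        });
    });

    // Only start streaming once the database is ready.
    mysql.on('connect', function () {
        twitter.stream();
    });
}

// Stub for lib/twitter: remembers whether stream() was called.
var fakeTwitter = new EventEmitter();
fakeTwitter.streaming = false;
fakeTwitter.stream = function () {
    this.streaming = true;
};

// Stub for lib/mysql: keeps rows in memory instead of a database.
var fakeMysql = new EventEmitter();
fakeMysql.rows = [];
fakeMysql.create = function (row) {
    this.rows.push(row);
    this.emit('insert', row);
};

wire(fakeTwitter, fakeMysql);

// Simulate the database connecting, then one tweet arriving.
fakeMysql.emit('connect');
fakeTwitter.emit('tweet', {
    id_str: '1',
    user: {
        screen_name: 'someone',
        profile_image_url_https: 'https://example.com/avatar.png'
    },
    text: 'hello',
    created_at: 'Wed Mar 13 09:06:07 +0000 2013'
});

console.log('rows stored:', fakeMysql.rows.length);
```

Because the wiring depends only on event names and two method names, the real modules and the stubs are interchangeable.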

If you're interested in a TL;DR, here's a gist of the fancy event emitting.
