Streams, Promises and Throttles

Recently I came across a nice little challenge: read a really large CSV, pull data from it, call a couple of APIs to get some more data, and then output a CSV once processing was complete.  The file I was provided basically contained a long list of IDs, which I needed to use to get information from one API and then use that information to call Watson.  I hadn’t flexed my muscles with the stream API for a while, so it was fun.  I just wanted to share this example in case others are looking for a way to solve a similar problem.

const fs = require('fs');
const axios = require('axios');
const csv = require('csv');
const throttle = require('throttle');
const AssistantV1 = require('watson-developer-cloud/assistant/v1');
const {DataStream} = require('scramjet');
const _ = require('lodash');

// Config
const WA_USERNAME = 'my user name';
const WA_PASSWORD = 'my password';
const WA_WORKSPACE_ID = 'a workspace ID';
const REST_ENDPOINT_URL = 'https://my-custom-rest-endpoint.com/service/'; // trailing slash: the ID is appended directly
const REST_ENDPOINT_API_KEY = 'my API KEY';
const INPUT_FILE = 'inputfile.csv';

// Create a read stream to read the file
const readStream = fs.createReadStream(`./${INPUT_FILE}`, {encoding: 'utf8'});

// Set up a throttle (in bytes per second) so our REST endpoint doesn't get overloaded
const choke = new throttle(300);

// Create our Watson Instance
const WA = new AssistantV1({
    version: '2018-02-16',
    username: WA_USERNAME,
    password: WA_PASSWORD,
    url: 'https://gateway.watsonplatform.net/assistant/api', // default
    headers: {
        'X-Watson-Learning-Opt-Out': 'true'
    }
});

// Promisify the Watson message method
const ask = async (textInput) => {
    const params = {
        workspace_id: WA_WORKSPACE_ID,
        input: {'text': textInput}
    };

    return new Promise((resolve, reject) => {
        WA.message(params, function (err, response) {
            if (err) {
                reject(err);
            } else {
                resolve(response);
            }
        });
    });
};

// Process each ID
const getIntent = async (record) => {
    let id = record[0];
    if(id){
        id = id.trim();
    }
    console.log(id);

    return await axios.get(REST_ENDPOINT_URL+id,
    {
        headers: {
            'Content-Type': 'application/json',
            'x-vcc_auth_token': REST_ENDPOINT_API_KEY
        }
    }).then(response => {
        // console.log(response.data.doc.wa.input.text);
        const inputText = _.get(response, 'data.doc.wa.input.text', '');
        return ask(inputText);
    }).then((response => {
        // console.log(response);
        let res = [];
        res[0] = id;
        res[1] = _.get(response, 'intents[0].intent', '');
        res[2] = _.get(response, 'intents[0].confidence', '');
        
        // console.log(res);
        return res;
    })).catch((error) => {
        console.error(error);
    });
}

// Write to file
const timestamp = new Date();
const outputFileName = `output/output${timestamp.toISOString()}.csv`;
const writeStream = fs.createWriteStream(outputFileName);

// Start the stream and pipe the data to the csv parser
DataStream.from( readStream.pipe(choke).pipe(csv.parse()) )
    .map(getIntent)
    .pipe(csv.stringify({columns: ['id','intent', 'confidence'], header: true}))
    .pipe(writeStream);

First we import our required libraries and set our configuration variables.

Next we create a read stream for our file.  This gives us a pipe to work with and ensures we don’t create a memory leak by storing every record and API response in memory any longer than we have to.

Next we set up a throttle (https://www.npmjs.com/package/throttle).  This was added because the custom API endpoint I was using couldn’t handle more than a couple of connections at a time.  You could scale this up or down depending on your requirements.
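
To make the sizing concrete, here is a rough back-of-the-envelope sketch.  It assumes the throttle value is a rate in bytes per second and that each ID takes up roughly 150 bytes of the file, as it did in mine.  The function names and the 20,000 row count are made up for illustration.

```javascript
// Back-of-the-envelope sizing for the throttle (all names here are illustrative).
const recordsPerSecond = (bytesPerSecond, bytesPerRecord) =>
  bytesPerSecond / bytesPerRecord;

const estimatedHours = (recordCount, bytesPerSecond, bytesPerRecord) =>
  recordCount / recordsPerSecond(bytesPerSecond, bytesPerRecord) / 3600;

console.log(recordsPerSecond(300, 150)); // prints 2

// 20000 is a made-up row count, not the size of my actual file.
console.log(estimatedHours(20000, 300, 150).toFixed(1)); // prints 2.8
```

If your endpoint can take more traffic, raising the byte rate is the only knob you need to turn.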

After establishing those we set up our Watson Assistant call (https://github.com/watson-developer-cloud/node-sdk#assistant-v1).  I’ve promisified the call so I can chain it later.
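
If you would rather not wrap the callback by hand, Node’s built-in util.promisify should do the same job for any function that follows the (err, result) callback convention.  The sketch below uses a made-up fakeMessage function as a stand-in for the SDK call, so treat it as an illustration of the pattern rather than tested Watson code.

```javascript
const { promisify } = require('util');

// Hypothetical stand-in for a callback-style SDK method such as WA.message.
const fakeMessage = (params, callback) => {
  setImmediate(() => {
    if (!params.input || !params.input.text) {
      callback(new Error('input.text is required'));
    } else {
      callback(null, { input: params.input, intents: [] });
    }
  });
};

// promisify turns fn(args..., callback(err, value)) into a function that returns a Promise.
// For a real SDK method you would likely need to bind it first,
// e.g. promisify(WA.message.bind(WA)), so that `this` is preserved (an assumption, not tested here).
const askFake = promisify(fakeMessage);

askFake({ input: { text: 'hello' } })
  .then((response) => console.log(response.input.text)) // prints hello
  .catch((err) => console.error(err.message));
```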

Next we set up our transformation workhorse.  This async function takes an incoming record, pulls out the ID and sends it to our first API.  Once that responds it calls the second API (Watson), and once that responds we return the result as an array (the format our subsequent pipe will expect).

Finally, I set up a write stream to output the result to a CSV file.

NOW FOR THE EXECUTION PART!

Let’s break down what’s happening in the last section line by line, since it’s a little tricky to understand at first glance.

DataStream.from( 
  readStream
  .pipe(choke)
  .pipe(csv.parse())
)
    .map(getIntent)
    .pipe(csv.stringify({columns: ['id','intent', 'confidence'], header: true}))
    .pipe(writeStream);

Line 1: I’m using the scramjet library’s DataStream as a pass-through.  Its from() method takes a stream and then provides a number of useful methods we can use in conjunction with our stream.  Most importantly, the .map method allows me to use a Promise mid-stream.  This isn’t available out of the box with Node (at least not when I tried it).  The argument I provide to the from() method is built up over the next three lines:

Line 2: readStream, which I created above, loads in our .csv file

Line 3: pipes that data through the throttle I established.  Each ID I was working with represented about 150 bytes, so I set the throttle to 300 bytes per second so I could process about 2 per second.

Line 4: this takes the information from the throttle and pipes it into the csv.parse method.  This streamable library makes working with CSV files a breeze.

Line 5: Closes the from() method.

Line 6: As things come through our pipe we hand them off to the DataStream’s map() method.  This method ensures that promises resolve prior to handing things off to the next pipe.  We pass map() our getIntent function and it processes our promises.

Line 7: Takes the resulting array from our transformer and pipes it to the csv.stringify method.  This makes short work of the array and applies a custom header for our CSV file.

Line 8: Writes the output from the previous step to our designated file.
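
If you are curious what “resolving a promise mid-stream” looks like without a helper library, here is a minimal sketch built on Node’s own stream module.  The asyncMap helper and fakeGetIntent are names I made up for this illustration.  It handles one record at a time, which is not necessarily how scramjet schedules its work, so take it as a picture of the idea rather than a drop-in replacement.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: wraps an async function in an object-mode Transform.
const asyncMap = (fn) => new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    // The stream only hands us the next chunk after callback() is called,
    // so each promise settles before the next record is processed.
    Promise.resolve(fn(chunk)).then(
      (result) => callback(null, result),
      (err) => callback(err)
    );
  }
});

// Stand-in for getIntent: pretends to call an API, then returns a result row.
const fakeGetIntent = async ([id]) => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return [id.trim(), 'some_intent', 0.9];
};

// Collects every row the stream emits into an array.
const collect = async (stream) => {
  const rows = [];
  for await (const row of stream) rows.push(row);
  return rows;
};

// Two fake parsed CSV records go in, two result rows come out.
const run = () =>
  collect(Readable.from([[' a1 '], ['b2']]).pipe(asyncMap(fakeGetIntent)));

run().then((rows) => console.log(rows));
```

In the real pipeline the rows would be piped on to csv.stringify instead of being collected into an array.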

That’s it!

The file I was processing took several hours to finish, but no memory leaks or crashes prevented it from completing successfully.

Hope you find this useful.
