From f99c4adee2f74e49066c01034e05958da75f28c3 Mon Sep 17 00:00:00 2001 From: Rishabh Kumar Maurya Date: Thu, 29 Sep 2016 01:23:19 +0530 Subject: [PATCH] #7 - context.succeed would be called multiple times --- src/kinesis_lambda_es.js | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/kinesis_lambda_es.js b/src/kinesis_lambda_es.js index c042c46..fe89cfa 100644 --- a/src/kinesis_lambda_es.js +++ b/src/kinesis_lambda_es.js @@ -17,6 +17,7 @@ /* == Imports == */ var AWS = require('aws-sdk'); var path = require('path'); +var when = require('when'); /* == Globals == */ var esDomain = { @@ -39,17 +40,21 @@ var creds = new AWS.EnvironmentCredentials('AWS'); /* Lambda "main": Execution begins here */ exports.handler = function(event, context) { console.log(JSON.stringify(event, null, ' ')); + var promises = []; event.Records.forEach(function(record) { var jsonDoc = new Buffer(record.kinesis.data, 'base64'); - postToES(jsonDoc.toString(), context); + postToES(jsonDoc.toString(), context, promises); }); + when.all(promises).then(function(res) { + context.succeed('Lambda Event Processed'); + }) } /* * Post the given document to Elasticsearch */ -function postToES(doc, context) { +function postToES(doc, context, promises) { var req = new AWS.HttpRequest(endpoint); req.method = 'POST'; @@ -63,6 +68,7 @@ function postToES(doc, context) { signer.addAuthorization(creds, new Date()); var send = new AWS.NodeHttpClient(); + var deferred = when.defer(); send.handleRequest(req, null, function(httpResp) { var respBody = ''; httpResp.on('data', function (chunk) { @@ -70,10 +76,11 @@ function postToES(doc, context) { }); httpResp.on('end', function (chunk) { console.log('Response: ' + respBody); - context.succeed('Lambda added document ' + doc); + promises.push(deferred.reject); }); }, function(err) { console.log('Error: ' + err); context.fail('Lambda failed with error ' + err); + promises.push(deferred.reject); }); }