Set up the Lambda function that listens to the DynamoDB stream and writes the items to Firehose.
Step 1 – Log in to the AWS Lambda console and click “Create function”.
Step 2 – Provide a function name and select Node.js as the runtime. For the execution role, select “Create a new role with basic Lambda permissions”. Click “Create function”.
Step 3 – On the new Lambda function screen, scroll down to “Execution role” and click the link “View the <role name> role on the IAM console”. This role needs access to DynamoDB and Kinesis Firehose.
Step 4 – On the IAM screen, click the “Attach policies” button.
Step 5 – Search for “AWSLambdaDynamoDBExecutionRole” and “AmazonKinesisFirehoseFullAccess”. Select both and click “Attach policy”.
Step 6 – Back on the Lambda function screen, click “Add trigger”.
Step 7 – Provide the DynamoDB details on the Add trigger screen and click “Add”. The DynamoDB trigger is now added.
Step 8 – Now, on the function code screen, add the Node.js code. This function sends each newly added or updated DynamoDB record to Kinesis Firehose. Set the region and the Firehose delivery stream name to your own values, and adapt the function to your requirements.
'use strict';

// AWS SDK for JavaScript v2. It is not included in the Node.js 18 and later
// Lambda runtimes, so on those it has to be packaged with the function.
const AWS = require('aws-sdk');

// Converts a DynamoDB-typed image ({ S: ... }, { N: ... }) into a plain object.
const unmarshall = AWS.DynamoDB.Converter.unmarshall;
const firehose = new AWS.Firehose({ region: 'ap-southeast-2' });

exports.handler = (event, context, callback) => {
    const fireHoseInput = [];

    // Forward only inserts and updates; REMOVE events carry no NewImage.
    event.Records.forEach((record) => {
        console.log(record);
        if (record.eventName === 'INSERT' || record.eventName === 'MODIFY') {
            fireHoseInput.push({
                Data: JSON.stringify(unmarshall(record.dynamodb.NewImage))
            });
        }
    });

    if (fireHoseInput.length === 0) {
        console.log('No data to transmit');
        return callback(null, 'No records to process.');
    }

    const params = { DeliveryStreamName: 'TestLoc', Records: fireHoseInput };
    firehose.putRecordBatch(params, (err, data) => {
        if (err) {
            // Report the failure so that Lambda retries the stream batch.
            console.log(err, err.stack);
            return callback(err);
        }
        console.log(data);
        callback(null, 'Successfully processed records.');
    });
};
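Two details are worth handling if you adapt this function. Firehose writes records to the destination back to back without a separator, and a single PutRecordBatch call accepts at most 500 records, which matters if the trigger's batch size is set above that. The sketch below appends a newline to each record and splits the list into batches of that size. The helper names (`toRecord`, `chunk`) are hypothetical, and plain objects stand in for unmarshalled DynamoDB items.

```javascript
// Firehose accepts at most 500 records per PutRecordBatch call.
const MAX_BATCH = 500;

// Serialize one item as a newline-terminated JSON record.
function toRecord(item) {
  return { Data: JSON.stringify(item) + '\n' };
}

// Split an array into consecutive slices of at most `size` elements.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Illustrative input: 1,203 fake items standing in for stream records.
const items = Array.from({ length: 1203 }, (_, i) => ({ id: i }));
const batches = chunk(items.map(toRecord), MAX_BATCH);
console.log(batches.map((b) => b.length)); // [ 500, 500, 203 ]
```

Each element of `batches` could then be passed as `Records` to its own `putRecordBatch` call.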
Step 9 – Set the function timeout to 1 minute and save it.
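Once saved, the function can be tried with a hand-written test event before any real table changes arrive. The following is a rough sketch of what such an event looks like and which records the handler would forward. The attribute names and values (`id`, `city`) are invented for illustration, `changedImages` is a hypothetical helper that mirrors the handler's filter, and `NewImage` is only present when the stream's view type includes new images.

```javascript
// Hypothetical sample of what Lambda receives from a DynamoDB stream trigger.
const sampleEvent = {
  Records: [
    {
      eventName: 'INSERT',
      dynamodb: {
        Keys: { id: { S: '101' } },
        NewImage: { id: { S: '101' }, city: { S: 'Sydney' } }
      }
    },
    {
      // Deletes carry no NewImage, so the handler skips them.
      eventName: 'REMOVE',
      dynamodb: { Keys: { id: { S: '102' } } }
    }
  ]
};

// Keep only inserts and updates that carry a new item image.
function changedImages(event) {
  return event.Records
    .filter((r) => r.eventName === 'INSERT' || r.eventName === 'MODIFY')
    .filter((r) => r.dynamodb && r.dynamodb.NewImage)
    .map((r) => r.dynamodb.NewImage);
}

console.log(JSON.stringify(changedImages(sampleEvent)));
```

The same object, serialized as JSON, could be pasted into the console as a test event.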
The Lambda function is now ready to receive DynamoDB stream data and send it to the Kinesis Firehose delivery stream in batches.
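Note that a PutRecordBatch call can succeed overall while rejecting some of its records. The response carries a `FailedPutCount` and a `RequestResponses` array whose entries line up with the submitted records, with an `ErrorCode` on the ones that failed. Below is a minimal sketch of picking out the records to resend. It uses a hypothetical helper, `failedRecords`, and a hand-written response rather than a real one.

```javascript
// Return the submitted records whose matching response entry has an ErrorCode.
function failedRecords(records, response) {
  if (!response.FailedPutCount) return [];
  return records.filter((_, i) => response.RequestResponses[i].ErrorCode);
}

// Hand-written example response; real RecordId and error values will differ.
const sent = [{ Data: 'a\n' }, { Data: 'b\n' }, { Data: 'c\n' }];
const response = {
  FailedPutCount: 1,
  RequestResponses: [
    { RecordId: 'example-1' },
    { ErrorCode: 'ServiceUnavailableException', ErrorMessage: 'Slow down.' },
    { RecordId: 'example-3' }
  ]
};

console.log(failedRecords(sent, response)); // [ { Data: 'b\n' } ]
```

The returned records could be passed to another `putRecordBatch` call, ideally after a short delay.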