DynamoDB Streams API in Node.js
08-10-2020 Node.js Stream API
const config = require('./config.js');
const AWS = require('aws-sdk');
// Apply the `aws_remote_config` settings object from config.js to the SDK's
// global configuration. This runs before the client below is constructed.
AWS.config.update(config.aws_remote_config);
// Create the DynamoDB Streams service object (note: the Streams client, not the
// table client). It is created once at module load and used by the handler.
const db = new AWS.DynamoDBStreams({apiVersion: 'latest'});
exports.handler = async (event) => {
let lastEvaluatedShardId = null;
let params = {
"ExclusiveStartShardId": null,
"Limit": 100,
"StreamArn": config.aws_stream_arn
};
const describeStreamResult = await db.describeStream(params).promise();
do {
let shards = describeStreamResult.StreamDescription.Shards;
let shardIndex=0;
for (const shard of shards) {
let params = {
"ShardId": shard.ShardId,
"ShardIteratorType": "TRIM_HORIZON",
"StreamArn": config.aws_stream_arn
};
let processedRecordCount = 0;
let maxItemCount = 100;
const shardIterator = await db.getShardIterator(params).promise();
let currentShardIter = shardIterator.ShardIterator;
while (currentShardIter !== "undefined" && currentShardIter !== undefined && processedRecordCount < maxItemCount) {
let params = {
"ShardIterator": currentShardIter
};
let recordData = await db.getRecords(params).promise();
currentShardIter = recordData.NextShardIterator;
for (let item of recordData.Records) {
console.log(JSON.stringify(item));
}
processedRecordCount += recordData.Records.length;
}
console.log("Shard index: "+(shardIndex++));
}
lastEvaluatedShardId = describeStreamResult.StreamDescription.LastEvaluatedShardId;
console.log(lastEvaluatedShardId);
} while (lastEvaluatedShardId !== null && lastEvaluatedShardId !== undefined);
console.log("Finished");
}
config.js File
module.exports = {
aws_table_name: 'Currencies',
aws_local_config: {
//Provide details for local configuration
},
aws_remote_config: {
accessKeyId: 'AKIAT6QDYTERBB3JQCE4',
secretAccessKey: 't0ZIzMu1BxqQJc4DjGkSi/MxMsAIsWnFY',
region: 'us-east-2',
},
aws_stream_arn:'arn:aws:dynamodb:us-east-2:2716345:table/Currencies/stream/2020-10-08T04:17:33.823'
};