Applications, especially e-Commerce or enterprise software, often consist of complex repeatable scenarios (workflows) that must be executed in response to some event.
Let's take a look at a typical example of such a workflow in e-Commerce software — order processing:
In the world of serverless solutions, each action (node) of the workflow can be represented by a corresponding AWS Lambda function. It will be short, easy to test, and will have a single responsibility — doing its actual job. But who will be responsible for coordinating those functions, chaining them, checking conditions, and deciding what to do next? Earlier, it was up to the developer to implement the way multiple connected Lambda functions interact. But recently Amazon announced a new service that allows coordinating various AWS services using the workflow abstraction and visual tools — AWS Step Functions.
In this article, I'm going to demonstrate some basic features of AWS Step Functions and how they can be executed in response to an event.
We'll create a simple workflow that is executed once a new file is uploaded to an S3 bucket. It will process the file, save its contents to DynamoDB, move the file to the "processed" folder, and notify the user via email 10 minutes after processing. It will look something like this:
Input files will contain sensor data in the CSV format:
<SENSOR_UID>,<TIMESTAMP>,<VALUE>
For example:
0000-0001,1483906366,10.10
0000-0011,1483906376,-5.10
0000-0093,1483906376,3.80
0000-0107,1483906520,115.45
0000-0001,1483906520,27.54
0000-2101,1483908322,340.00
0000-0001,1483908322,-12.93
0000-0001,1483906366,00.35
Before we proceed with creating our first workflow, we need to go over the following checklist to ensure all the required software is installed and an account with the proper permissions is used.
We have four nodes in our workflow, but one of them describes a wait state, meaning the workflow itself requires three AWS Lambda functions.
Unfortunately, there is no way at the moment to execute a Step Function in response to an event. Instead, the execution has to be started explicitly through the API. To run our workflow each time a file is uploaded, we can simply create an additional Lambda function that is triggered by the file upload and starts the workflow. So we will need four functions: executeWorkflow, processFile, moveFile, and sendEmail.
First of all, ensure the serverless framework is installed. This can be done using the following command:
sudo npm install -g serverless serverless-step-functions
Once serverless is installed, a new project should be created:
mkdir step-functions-demo
cd step-functions-demo
serverless create -t aws-nodejs
npm init -y
npm install async aws-sdk fast-csv --save
The serverless framework simplifies our lives and allows us to quickly create Lambda-backed applications without the need to write long CloudFormation scripts or deploy functions manually.
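For reference, a minimal serverless.yml for this project could look like the sketch below. The handler paths, bucket name, and environment variable values are assumptions here; adjust them to your own setup:

service: step-functions-demo

provider:
  name: aws
  runtime: nodejs4.3

functions:
  executeWorkflow:
    handler: handler.executeWorkflow
    environment:
      STEP_FUNCTION_NAME: stepfunctionsdemo   # name of the deployed state machine (assumed)
    events:
      - s3:
          bucket: step-functions-demo-uploads # assumed bucket name
          event: s3:ObjectCreated:*
  processFile:
    handler: handler.processFile
  moveFile:
    handler: handler.moveFile
    environment:
      TARGET_BUCKET: step-functions-demo-uploads
  sendEmail:
    handler: handler.sendEmail
    environment:
      DEST_EMAIL: you@example.com             # assumed notification address

plugins:
  - serverless-step-functions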
Update: This section was missing initially. Kudos to HackerNews user djhworld for pointing that out! AWS Step Functions doesn't have an option to trigger an execution from events the way AWS Lambda does (S3, IoT, SNS, etc.). The only alternative at the moment is to explicitly execute a step function using the AWS SDK.
While it may seem like an issue at first, the problem can be easily solved by wiring your event to a separate Lambda that acts as a proxy and passes its input parameters to the target step function.
Here is a related code snippet:
// Both modules are installed as project dependencies.
const AWS = require('aws-sdk');
const async = require('async');

module.exports.executeWorkflow = function (event, context) {
  if ('Records' in event) {
    const stateMachineName = process.env.STEP_FUNCTION_NAME;
    const stepfunctions = new AWS.StepFunctions();

    async.waterfall([
      (next) => {
        console.log('Fetching the list of available workflows');
        return stepfunctions.listStateMachines({}, next);
      },
      (data, next) => {
        console.log('Searching for the step function', data);
        for (let i = 0; i < data.stateMachines.length; i++) {
          const item = data.stateMachines[i];
          if (item.name === stateMachineName) {
            console.log('Found the step function', item);
            return next(null, item.stateMachineArn);
          }
        }
        return next(new Error('Step function with the given name doesn\'t exist'));
      },
      (stateMachineArn, next) => {
        console.log('Executing the step function', stateMachineArn);
        const eventData = event.Records[0];
        // Pass the S3 object location to the step function as its input.
        return stepfunctions.startExecution({
          stateMachineArn: stateMachineArn,
          input: JSON.stringify({
            objectKey: eventData.s3.object.key,
            bucketName: eventData.s3.bucket.name
          })
        }, next);
      }
    ], (err) => {
      if (err) {
        console.log('Failed to start the step function', err);
        return context.fail(err);
      }
      return context.succeed('OK');
    });
  } else {
    return context.fail('Incoming message doesn\'t contain "Records", it will be ignored');
  }
};
Note that the Lambda function looks the state machine up by the STEP_FUNCTION_NAME environment variable. This workaround is needed because, at the moment this post was written, there was no way to reference a Step Function ARN in other parts of the serverless configuration.
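Since executeWorkflow calls the Step Functions API, its execution role also needs the corresponding permissions. Here is a minimal sketch of the iamRoleStatements block for serverless.yml (the wide resource scope is an assumption; tighten it for production use):

provider:
  name: aws
  iamRoleStatements:
    - Effect: Allow
      Action:
        - states:ListStateMachines
        - states:StartExecution
      Resource: "*"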
Let's begin with the Lambda function that will be responsible for parsing the CSV and saving it to a DynamoDB table.
Here is a code fragment used for processing:
// Both modules are installed as project dependencies.
const AWS = require('aws-sdk');
const async = require('async');

module.exports.processFile = (event, context, callback) => {
  const csv = require('fast-csv');
  const s3 = new AWS.S3();
  const dynamodb = new AWS.DynamoDB();

  async.waterfall([
    (next) => {
      console.log('Waiting until the uploaded object becomes available',
        '[bucket = ', event.bucketName, ', key = ', event.objectKey, ' ]');
      s3.waitFor('objectExists', {
        Bucket: event.bucketName,
        Key: event.objectKey
      }, next);
    },
    (result, next) => {
      console.log('Downloading the CSV file from S3 [bucket = ',
        event.bucketName, ', key = ', event.objectKey, ' ]');
      const csvStream = s3.getObject({
        Bucket: event.bucketName,
        Key: event.objectKey
      }).createReadStream();

      csv.fromStream(csvStream)
        .on('data', (data) => {
          // Store every CSV row as a separate DynamoDB item. The callback is
          // required, otherwise the request is never actually sent.
          dynamodb.putItem({
            Item: {
              'sensor_id': { 'S': data[0] },
              'timestamp': { 'N': data[1] },
              'value': { 'N': data[2] }
            },
            TableName: 'sensor_data'
          }, (err) => {
            if (err) {
              console.log('Failed to store the record', err);
            }
          });
        })
        // Continue the waterfall only after the whole file has been read.
        .on('end', () => next(null));
    }
  ], (err, results) => {
    if (err) {
      console.log('Failed execution');
      return context.fail('Execution failed');
    } else {
      console.log('Successful execution');
      return context.succeed(event);
    }
  });
};
Note the first function in the waterfall chain. By adding an AWS waiter call we are protecting ourselves against a drawback of the distributed nature of Amazon S3. Sometimes, when a Lambda function is triggered by an S3 event, an attempt to read the object results in an "Object doesn't exist" error, because S3 objects are eventually consistent. More details can be found in the official documentation.
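The processing function also assumes that the sensor_data table already exists. Here is a sketch of how it could be declared in the resources section of serverless.yml; the key schema is an assumption derived from the putItem call above:

resources:
  Resources:
    SensorDataTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: sensor_data
        AttributeDefinitions:
          - AttributeName: sensor_id
            AttributeType: S
          - AttributeName: timestamp
            AttributeType: N
        KeySchema:
          - AttributeName: sensor_id
            KeyType: HASH
          - AttributeName: timestamp
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1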
Let's proceed with the second Lambda function, which will move files to the "processed" folder. Here is the relevant code fragment:
// Both modules are installed as project dependencies.
const AWS = require('aws-sdk');
const async = require('async');

module.exports.moveFile = function (event, context) {
  const objectKey = event.objectKey;
  const bucketName = event.bucketName;
  const newLocation = 'processed/' + objectKey;
  const targetBucket = process.env.TARGET_BUCKET;
  const s3 = new AWS.S3();

  console.log('Moving "', objectKey, '" to new location "', newLocation, '"');
  async.waterfall([
    (next) => {
      // Copy the object into the "processed" folder of the target bucket.
      s3.copyObject({
        Bucket: targetBucket,
        Key: newLocation,
        CopySource: bucketName + '/' + encodeURIComponent(objectKey)
      }, next);
    },
    (data, next) => {
      // Wait until the copy becomes visible before deleting the original.
      s3.waitFor('objectExists', {
        Bucket: targetBucket,
        Key: newLocation
      }, next);
    },
    (data, next) => {
      s3.deleteObject({
        Bucket: bucketName,
        Key: objectKey
      }, next);
    }
  ], (error) => {
    if (error) {
      console.log('Failed to move file', error);
      context.fail();
    } else {
      // The returned object becomes the input of the next state.
      context.succeed({
        bucketName: event.bucketName,
        objectKey: event.objectKey,
        newLocation: newLocation
      });
    }
  });
};
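Step Functions passes each task's output as the input of the next state, so the object passed to context.succeed above is exactly what the e-mail step will receive. With the default settings the input will look roughly like this (values are illustrative):

{
  "bucketName": "step-functions-demo-uploads",
  "objectKey": "sensors.csv",
  "newLocation": "processed/sensors.csv"
}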
Once the copying is complete, we can proceed with sending a notification.
My favorite part about AWS is that in 99% of cases Amazon has an easy-to-use and scalable service to solve the problem. Of course, there is one for sending emails — Amazon Simple Email Service (SES). We'll use it to notify ourselves once the file processing is finished. The Lambda function responsible for this is shown below:
// Both modules are installed as project dependencies.
const AWS = require('aws-sdk');
const async = require('async');

module.exports.sendEmail = function (event, context) {
  const objectKey = event.objectKey;
  const ses = new AWS.SES();

  console.log('Sending an email about "', objectKey, '"');
  async.waterfall([
    (next) => {
      ses.sendEmail({
        Destination: {
          ToAddresses: [process.env.DEST_EMAIL]
        },
        Message: {
          Body: {
            Text: {
              Data: 'Processed file ' + objectKey
            }
          },
          Subject: {
            Data: 'File processed'
          }
        },
        Source: process.env.DEST_EMAIL
      }, next);
    }
  ], (err, results) => {
    if (err) {
      console.log('Failed to send an email', err);
      context.fail();
    } else {
      context.succeed('OK');
    }
  });
};
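One thing to keep in mind: while your SES account is still in sandbox mode, both the sender and recipient addresses have to be verified, otherwise sendEmail will fail. An address can be verified from the SES console or with the AWS CLI (the address below is a placeholder):

aws ses verify-email-identity --email-address you@example.com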
As I mentioned before, we are going to notify the user 10 minutes after processing. As you can see, there are no delays in the Lambda code. Furthermore, having such delays would affect the total cost of the solution, since AWS Lambda is billed based on execution time. The more efficient our Lambda functions are, the less money they cost. The delay will instead be implemented as part of the workflow.
Luckily there is a Step Functions plugin for serverless that can be used to describe workflows using YAML:
stepFunctions:
  stateMachines:
    stepfunctionsdemo:
      Comment: "Example StepFunction"
      StartAt: ProcessFile
      States:
        ProcessFile:
          Type: Task
          Resource: processFile
          Next: MoveFile
        MoveFile:
          Type: Task
          Resource: moveFile
          Next: Wait10Minutes
        Wait10Minutes:          # wait state implementing the 10-minute delay (state name assumed)
          Type: Wait
          Seconds: 600
          Next: SendEmail
        SendEmail:
          Type: Task
          Resource: sendEmail
          End: true
Recently Amazon announced support for Step Functions in CloudFormation templates, but using serverless still saves a lot of time and reduces the amount of boilerplate code.
To deploy the functions and create all the needed infrastructure, run the following serverless commands:
serverless deploy
serverless deploy stepf
After the deployment is complete, try uploading a CSV file to the source bucket and check the AWS Step Functions console. Select the created step function and check its executions. If everything is configured correctly, you should see a successful execution:
It's possible to get execution logs, input, and output for every node by clicking it. Detailed logs for each Lambda function can also be found in the CloudWatch console.
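The same information is also available from the AWS CLI, which can be handy for scripting; for example (the state machine ARN below is a placeholder):

aws stepfunctions list-executions --state-machine-arn arn:aws:states:us-east-1:123456789012:stateMachine:stepfunctionsdemo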
Here are some common issues that may prevent the code from working correctly:
AWS Step Functions is a rather young service, meaning it still has some space for improvements. Here are some limitations we faced while working on our projects:
AWS Step Functions is a great tool for creating sophisticated workflows and state machines for zero-infrastructure applications. There are still many areas that could be improved, but even in its current state, AWS Step Functions can be used to solve real-world problems.
Here are some documents that can be useful when developing your own step functions: