Easily Create Complex Workflows With AWS Step Functions
Volodymyr Rudyi
Warning: you are reading a very old blog post on the topic. In fact, it's now a part of history and kept here for archival purposes. Have a great day ahead! And read something new on the topic!
If you are interested in implementing workflows and related tools, check our recent blog post about Temporal.io:
Applications, especially e-Commerce and enterprise software, often consist of complex, repeatable scenarios (workflows) that must be executed in response to some event.
Let's take a look at a typical example of such a workflow in e-Commerce software, order processing:
In the world of serverless solutions, each action (node) of the workflow can be represented as a corresponding AWS Lambda function. Each function is short, easy to test, and has a single responsibility: doing its actual job. But who is responsible for coordinating those functions, chaining them, checking conditions, and deciding what to do next? Previously, developers had to implement the interaction between multiple connected Lambda functions themselves. Recently, however, Amazon announced a new service that coordinates various AWS services using a workflow abstraction and visual tools: AWS Step Functions.
In this article, I'm going to demonstrate some basic features of AWS Step Functions and show how a workflow can be executed in response to an event.
We'll create a simple workflow that is executed once a new file is uploaded to an S3 bucket. It will process the file, save its contents to DynamoDB, move the file to the "processed" folder, and notify the user via email 10 minutes after processing. It will look something like this:
Input files will contain sensor data in the CSV format:
<SENSOR_UID>,<TIMESTAMP>,<VALUE>
For example:
0000-0001,1483906366,10.10
0000-0011,1483906376,-5.10
0000-0093,1483906376,3.80
0000-0107,1483906520,115.45
0000-0001,1483906520,27.54
0000-2101,1483908322,340.00
0000-0001,1483908322,-12.93
0000-0001,1483906366,00.35
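To make the format concrete, here is a minimal sketch of how a single line could be parsed and validated. The helper name parseSensorLine and the field names of the returned object are hypothetical and are not part of the project code shown later.

```javascript
// Hypothetical helper: parses one "<SENSOR_UID>,<TIMESTAMP>,<VALUE>" line.
function parseSensorLine(line) {
  const parts = line.trim().split(',');
  if (parts.length !== 3 || parts.some((part) => part.trim() === '')) {
    throw new Error('Expected 3 non-empty comma-separated fields: ' + line);
  }
  const timestamp = Number(parts[1]);
  const value = Number(parts[2]);
  if (!Number.isInteger(timestamp) || Number.isNaN(value)) {
    throw new Error('Malformed line: ' + line);
  }
  return { sensorId: parts[0], timestamp: timestamp, value: value };
}

// Parse one of the sample lines from above.
const reading = parseSensorLine('0000-0011,1483906376,-5.10');
console.log(reading);
```

Validating each line early like this keeps malformed rows out of the database, at the cost of failing the whole file on the first bad line.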
Preparation
Before we create our first workflow, let's go over the following checklist to ensure that all required software is installed and that an account with proper permissions is used.
- Check that the AWS CLI is installed. You should be able to execute "aws --version" in your command line successfully. If the AWS CLI is not installed, please follow the instructions on the AWS CLI page.
- Double-check that AWS credentials are correctly configured. The easiest way to do this is to run the "aws configure" command.
- Ensure the current user has enough permissions to view/create CloudFormation stacks, view/create AWS Lambda functions, view/create AWS Step Functions, and view/create buckets.
- Since we will be writing our Lambda functions in JavaScript, ensure Node.js and npm are installed. Please refer to the official documentation for instructions on how to install them for your OS.
Creating Lambda Functions
We have four nodes in our workflow, but one of them describes a wait state, meaning the workflow itself requires three AWS Lambda functions.
Unfortunately, at the moment there is no way to execute a Step Function in response to an event. Instead, the execution has to be started explicitly using the API. To execute our workflow each time a file is uploaded, we can simply create an additional Lambda function that is triggered by the file upload and starts our workflow. So we will need four functions: executeWorkflow, processFile, moveFile, and sendEmail.
Create a new Serverless.io project
First of all, ensure serverless is installed. It can be done using the following command:
sudo npm install -g serverless serverless-step-functions
Once Serverless is installed, create a new project:
mkdir step-functions-demo
cd step-functions-demo
serverless create -t aws-nodejs
npm init -y
npm install async aws-sdk --save
The Serverless framework simplifies our lives and lets us quickly create Lambda-backed applications without writing long CloudFormation scripts or deploying functions manually.
How to trigger an execution of a step function by an S3 event?
Update: This section was missing initially. Kudos to HackerNews user djhworld who pointed that out! Unlike AWS Lambda, AWS Step Functions doesn't have an option to trigger an execution by events such as S3, IoT, SNS, etc. The only alternative at the moment is to explicitly execute a step function using the AWS SDK.
While it may seem like an issue at first, the problem can be easily solved by wiring your event to a separate Lambda that acts as a proxy and passes its input parameters to the target step function.
Here is a related code snippet:
const AWS = require('aws-sdk');
const async = require('async');

module.exports.executeWorkflow = function (event, context) {
  if ('Records' in event) {
    const stateMachineName = process.env.STEP_FUNCTION_NAME;
    const stepfunctions = new AWS.StepFunctions();
    async.waterfall([
      (next) => {
        console.log('Fetching the list of available workflows');
        return stepfunctions.listStateMachines({}, next);
      },
      (data, next) => {
        console.log('Searching for the step function', data);
        for (var i = 0; i < data.stateMachines.length; i++) {
          const item = data.stateMachines[i];
          if (item.name === stateMachineName) {
            console.log('Found the step function', item);
            return next(null, item.stateMachineArn);
          }
        }
        // Report the error through the callback so the final handler receives it
        return next(new Error('Step function with the given name doesn\'t exist'));
      },
      (stateMachineArn, next) => {
        console.log('Executing the step function', stateMachineArn);
        const eventData = event.Records[0];
        return stepfunctions.startExecution({
          stateMachineArn: stateMachineArn,
          input: JSON.stringify({ objectKey: eventData.s3.object.key, bucketName: eventData.s3.bucket.name })
        }, next);
      }
    ], (err) => {
      // Final callback: runs after the last step or on the first error
      if (err) {
        console.log('Failed to execute the step function', err);
        return context.fail(err);
      }
      return context.succeed('OK');
    });
  } else {
    return context.fail('Incoming message doesn\'t contain "Records", it will be ignored', event);
  }
};
Note that the Lambda function uses the STEP_FUNCTION_NAME environment variable. This workaround was needed because, at the time this post was written, there was no way to reference a Step Function ARN in other parts of the Serverless configuration.
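One detail worth handling in the proxy is that S3 URL-encodes object keys in event notifications and represents spaces as "+". The sketch below isolates the step that turns an event record into the execution input and decodes the key along the way. The helper name buildExecutionInput and the sample bucket and key are hypothetical; the snippet above passes the key through unchanged.

```javascript
// Hypothetical helper: turns one S3 event record into the JSON string
// that is passed as "input" to startExecution().
function buildExecutionInput(record) {
  // Keys arrive URL-encoded, with "+" standing in for spaces.
  const objectKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
  return JSON.stringify({
    objectKey: objectKey,
    bucketName: record.s3.bucket.name
  });
}

// Trimmed-down sample record; the bucket and key are made up.
const sampleRecord = {
  s3: {
    bucket: { name: 'example-source-bucket' },
    object: { key: 'incoming/sensor+data.csv' }
  }
};

console.log(buildExecutionInput(sampleRecord));
```

Keeping this logic in a pure function makes it easy to unit test without calling any AWS service.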
File processing Lambda
Let's begin with the Lambda function that will be responsible for parsing the CSV and saving it to a DynamoDB table.
Here is a code fragment used for processing:
const AWS = require('aws-sdk');
const async = require('async');
const csv = require('fast-csv');

module.exports.processFile = (event, context, callback) => {
  const s3 = new AWS.S3();
  const dynamodb = new AWS.DynamoDB();
  async.waterfall([
    (next) => {
      console.log('Waiting until the uploaded object becomes available',
        '[bucket = ', event.bucketName, ', key = ',
        event.objectKey, ' ]');
      s3.waitFor('objectExists', {
        Bucket: event.bucketName,
        Key: event.objectKey
      }, next);
    },
    (result, next) => {
      console.log('Downloading the CSV file from S3 [bucket = ',
        event.bucketName, ', key = ', event.objectKey, ' ]');
      const csvStream = s3.getObject({
        Bucket: event.bucketName,
        Key: event.objectKey
      }).createReadStream();
      const writes = [];
      csv.fromStream(csvStream)
        .on('data', (data) => {
          // Without a callback, putItem() only builds the request; promise() sends it
          writes.push(dynamodb.putItem({
            Item: {
              'sensor_id': { 'S': data[0] },
              'timestamp': { 'N': data[1] },
              'value': { 'N': data[2] }
            },
            TableName: "sensor_data"
          }).promise());
        })
        .on('error', next)
        .on('end', () => {
          // Continue only after every row has been written
          Promise.all(writes).then(() => next(null), next);
        });
    }
  ], (err) => {
    if (err) {
      console.log('Failed execution', err);
      return context.fail('Execution failed');
    } else {
      console.log('Successful execution');
      return context.succeed(event);
    }
  });
};
Note the first function in the waterfall chain. By adding an AWS waiter call, we protect against a drawback of the distributed nature of AWS S3. Sometimes, when a Lambda function is triggered by an S3 event, an attempt to read the object results in an "Object doesn't exist" error, because at the time of writing S3 objects were eventually consistent. More details can be found in the official documentation.
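Conceptually, a waiter is just a polling loop with a fixed delay and a maximum number of attempts. The sketch below illustrates that idea with a simulated check; it is not the SDK's implementation, and the names waitUntil, delayMs, and maxAttempts are assumptions made for this example.

```javascript
// Minimal polling helper in the spirit of an SDK waiter (not the SDK's code).
// check: function returning a Promise that resolves to true once ready.
function waitUntil(check, options) {
  const delayMs = options.delayMs;
  const maxAttempts = options.maxAttempts;
  let attempt = 0;
  return new Promise((resolve, reject) => {
    const tryOnce = () => {
      attempt += 1;
      check().then((ready) => {
        if (ready) {
          resolve(attempt); // report how many polls were needed
        } else if (attempt >= maxAttempts) {
          reject(new Error('Condition not met after ' + attempt + ' attempts'));
        } else {
          setTimeout(tryOnce, delayMs); // wait, then poll again
        }
      }).catch(reject);
    };
    tryOnce();
  });
}

// Simulated check: the "object" becomes visible on the third poll.
let polls = 0;
const objectVisible = () => Promise.resolve(++polls >= 3);

waitUntil(objectVisible, { delayMs: 10, maxAttempts: 20 })
  .then((attempts) => console.log('Object visible after', attempts, 'attempts'));
```

In the real function, the check would be a request to S3 for the object, and a failed check caused by missing permissions looks the same to the loop as an object that doesn't exist yet.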
Lambda to move processed files
Let's proceed with the second Lambda function, which moves files to the "processed" folder. Here is the relevant code fragment:
const AWS = require('aws-sdk');
const async = require('async');

module.exports.moveFile = function (event, context) {
  const objectKey = event.objectKey;
  const bucketName = event.bucketName;
  const newLocation = 'processed/' + objectKey;
  const targetBucket = process.env.TARGET_BUCKET;
  const s3 = new AWS.S3();
  console.log('Moving "', objectKey, '" to new location "', newLocation, '"');
  async.waterfall([
    (next) => {
      // Copy the object to its new location first
      s3.copyObject({
        Bucket: targetBucket,
        Key: newLocation,
        CopySource: bucketName + '/' + encodeURIComponent(objectKey)
      }, next);
    },
    (data, next) => {
      // Make sure the copy is visible before removing the original
      s3.waitFor('objectExists', {
        Bucket: targetBucket,
        Key: newLocation
      }, next);
    },
    (data, next) => {
      s3.deleteObject({
        Bucket: bucketName,
        Key: objectKey
      }, next);
    }
  ], (error) => {
    if (error) {
      console.log('Failed to move file', error);
      context.fail(error);
    } else {
      context.succeed({
        bucketName: event.bucketName,
        objectKey: event.objectKey,
        newLocation: newLocation
      });
    }
  });
};
Once the copying is complete, we can proceed with sending a notification.
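The function above implements the move as a copy followed by a delete. As a rough illustration, the request parameters for both steps can be derived in one place, which makes the key handling easy to check in isolation. The helper name buildMoveParams and the bucket names are hypothetical.

```javascript
// Hypothetical helper: derives the copy and delete parameters for moving
// an object into the "processed" folder of a target bucket.
function buildMoveParams(sourceBucket, objectKey, targetBucket) {
  const newLocation = 'processed/' + objectKey;
  return {
    copy: {
      Bucket: targetBucket,
      Key: newLocation,
      // CopySource has the form "<source bucket>/<URL-encoded key>"
      CopySource: sourceBucket + '/' + encodeURIComponent(objectKey)
    },
    remove: {
      Bucket: sourceBucket,
      Key: objectKey
    }
  };
}

// Made-up bucket names and key, for illustration only.
const moveParams = buildMoveParams('example-source', 'sensor data.csv', 'example-target');
console.log(moveParams);
```

Only the CopySource value is URL-encoded; the Key fields carry the raw key.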
Lambda to send email notifications
My favorite part about AWS is that in 99% of cases Amazon has an easy-to-use and scalable service to solve the problem. Of course, there is one for sending emails: Amazon Simple Email Service. We'll use it to notify ourselves once the file processing is finished. The Lambda function responsible for it is shown below:
const AWS = require('aws-sdk');
const async = require('async');

module.exports.sendEmail = function (event, context) {
  const objectKey = event.objectKey;
  // moveFile passes the source bucket on as "bucketName"
  const bucketName = event.bucketName;
  const ses = new AWS.SES();
  console.log('Sending an email about "', objectKey, '" from bucket "', bucketName, '"');
  async.waterfall([
    (next) => {
      ses.sendEmail({
        Destination: {
          ToAddresses: [process.env.DEST_EMAIL]
        },
        Message: {
          Body: {
            Text: {
              Data: 'Processed file ' + objectKey
            }
          },
          Subject: {
            Data: 'File processed'
          }
        },
        Source: process.env.DEST_EMAIL
      }, next);
    }
  ], (err, results) => {
    if (err) {
      console.log('Failed to send an email', err);
      context.fail(err);
    } else {
      context.succeed("OK");
    }
  });
};
As I mentioned before, we are going to notify the user 10 minutes after processing. As you can see, there are no delays in the Lambda code. Having such delays would increase the total cost of the solution, since AWS Lambda costs are based on execution time: the more efficient our Lambda functions are, the less they cost. Instead, the delay will be implemented as a part of the workflow.
Creating the workflow
Luckily, there is a Step Functions plugin for Serverless that can be used to describe workflows in YAML:
stepFunctions:
  stateMachines:
    stepfunctionsdemo:
      Comment: "Example StepFunction"
      StartAt: ProcessFile
      States:
        ProcessFile:
          Type: Task
          Resource: processFile
          Next: MoveFile
        MoveFile:
          Type: Task
          Resource: moveFile
          Next: WaitBeforeEmail
        # Pause for 10 minutes (600 seconds) before notifying the user
        WaitBeforeEmail:
          Type: Wait
          Seconds: 600
          Next: SendEmail
        SendEmail:
          Type: Task
          Resource: sendEmail
          End: true
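For comparison, here is a sketch of the full four-node workflow described earlier, including the 10-minute wait, expressed as an Amazon States Language definition built in JavaScript. The function name buildDefinition, the state name WaitBeforeEmail, and the ARNs are placeholders chosen for this example.

```javascript
// Sketch: builds an Amazon States Language definition for the workflow.
// The ARNs passed in are placeholders, not real resources.
function buildDefinition(arns, waitSeconds) {
  return {
    Comment: 'Example StepFunction',
    StartAt: 'ProcessFile',
    States: {
      ProcessFile: { Type: 'Task', Resource: arns.processFile, Next: 'MoveFile' },
      MoveFile: { Type: 'Task', Resource: arns.moveFile, Next: 'WaitBeforeEmail' },
      // The delay lives in the state machine, so no Lambda sits idle
      WaitBeforeEmail: { Type: 'Wait', Seconds: waitSeconds, Next: 'SendEmail' },
      SendEmail: { Type: 'Task', Resource: arns.sendEmail, End: true }
    }
  };
}

const definition = buildDefinition({
  processFile: 'arn:aws:lambda:REGION:ACCOUNT_ID:function:processFile',
  moveFile: 'arn:aws:lambda:REGION:ACCOUNT_ID:function:moveFile',
  sendEmail: 'arn:aws:lambda:REGION:ACCOUNT_ID:function:sendEmail'
}, 600);

console.log(JSON.stringify(definition, null, 2));
```

A Wait state forwards its input to the next state unchanged, so the email function still receives the output of the move step.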
Amazon recently announced support for Step Functions in CloudFormation templates, but using Serverless still saves a lot of time and reduces the amount of boilerplate code.
Deployment
To deploy the functions and create all the needed infrastructure, run the following Serverless commands:
serverless deploy
serverless deploy stepf
After the deployment is complete, try uploading a CSV file to the source bucket and check the AWS Step Functions console. Select the created step function and check its executions. If everything was configured correctly, you should see a successful execution:
You can get execution logs, input, and output for every node by clicking it. Detailed logs for each Lambda function can also be found in the CloudWatch console.
Common issues and ways to solve them
Here are some common issues that may prevent the code from working correctly:
- The step function and underlying Lambda functions work correctly, but no log records are present in CloudWatch. In 99% of cases, the root cause is missing CloudWatch permissions for the Lambda execution role. The remaining 1% splits into two parts: 0.5% when you are looking for logs in the wrong region, and 0.5% caused by the fact that it takes several seconds for log streams to appear in the CloudWatch console.
- The step function is not being executed: make sure the Lambda execution role has the permissions to list and execute step functions.
- An AWS Lambda timeout happens even if the function doesn't contain any time-consuming operations, and the "Task timed out after 30.00 seconds" message appears. This issue occurs when you have a "waitFor" call in your code but don't have enough permissions to execute the GetObject operation. In this case, the AWS waiter will keep querying S3 again and again (20 times, every 5 seconds, by default).
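The arithmetic behind the last issue is worth spelling out. With the defaults quoted above, the waiter can keep polling for far longer than the 30-second function timeout from the error message, so the function is killed before the waiter ever reports a failure. A rough sketch, with a hypothetical helper name:

```javascript
// Rough worst-case time a waiter spends polling before it gives up.
function waiterWorstCaseSeconds(delaySeconds, maxAttempts) {
  return delaySeconds * maxAttempts;
}

const worstCase = waiterWorstCaseSeconds(5, 20); // defaults quoted above
const lambdaTimeout = 30; // seconds, from the error message above

console.log('Waiter may poll for about ' + worstCase + ' s; Lambda timeout is ' + lambdaTimeout + ' s');
console.log(worstCase > lambdaTimeout ? 'Lambda times out first' : 'Waiter gives up first');
```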
Limitations
AWS Step Functions is a rather young service, so it still has room for improvement. Here are some limitations we faced while working on our projects:
- The Step Functions console displays only the last 1000 executions for each step function. Once the number of executions exceeds 1000, the displayed value is frozen, making the console less informative.
- Deleting a step function and immediately creating a new one with the same name results in very strange behavior in the Step Functions console: on each page refresh, it may display data from both the new and the old step function.
- Some of the step functions that don't have active executions still can't be deleted and will remain in the "Deleting" state forever.
Conclusion
AWS Step Functions is a great tool to create sophisticated workflows and state machines for zero-infrastructure applications. There are still many areas which should be improved, but even in its current state AWS Step Functions can be used to solve real-world problems.
Useful links
Here are some documents that can be useful when developing your own step functions: