Within the past year, AWS has doubled down on its mission to become the data-centric cloud company. With services ranging from a 45-foot shipping container that transfers data from your on-premises data center to AWS, to a library of machine learning algorithms, to Lex (a conversational interface service), it is easy to get overwhelmed when starting big data on AWS.
We wanted to take some time to demonstrate a sample serverless data architecture combining several AWS services (Kinesis, Athena, S3, Lambda, and others) to provide a guided experience for those who may not know where to start with big data on AWS. In our example, we (you included, as we’ve included the source code needed to complete this walkthrough) will build a data processing pipeline that performs two common tasks with data:
- Real-time data processing – using Amazon Kinesis Analytics to perform anomaly detection on a data stream
- Serverless querying of data – using Amazon Athena to perform SQL queries of historic data
A common thread for both tasks above is that ANSI SQL is used in each use case, providing a common and familiar language with which we can interact with our data.
To make this blog post a bit more concrete, let’s say that our architecture is deployed by an IoT manufacturer that makes widgets (also referred to here as devices), each of which sends telemetry data back to the cloud. We (the manufacturer) want to be able to both ask questions of historical data and alert on anomalies from the widgets. Perhaps a production manager wants to know if a given widget/device is generating a lot of anomalies; such a device may need to be replaced.
There are several additional AWS services used to tie together the architecture:
- Kinesis Firehose – simulates our IoT ingestion pipeline and automatically persists device data to S3 (where we’ll query it using Athena). The Firehose delivery stream is also the data source for our Kinesis Analytics application.
- Note – to keep things simple and focus on the data, we’re not using AWS IoT as our ingestion point, although this example architecture could easily be modified to use AWS IoT.
- Kinesis Streams – when Kinesis Analytics detects an anomaly, those anomalous records will then be written to a Kinesis stream.
- Lambda and SNS – a Lambda function watches the Kinesis stream, and is invoked when an anomaly is written to that stream. The Lambda function simply captures the message payload, and sends an alert email using SNS.
- CloudFormation – to make things easy to get started, we’ve modeled most of these components (as well as IAM roles and policies to keep things secure) in a CloudFormation template. That way, we can focus on the Kinesis Analytics and Athena elements of this example, rather than the required infrastructure.
The overall architecture is represented in the following graphic:
In our example, the widget (or device) data is first generated from a Python script (using the AWS SDK) that simulates data that would come from a set of IoT devices. For this demo, there are five devices, “aaaa”, “bbbb”, “cccc”, “dddd”, and (unsurprisingly) “eeee”. Again, this script is short-circuiting the data ingest process by sending data directly to a Kinesis Firehose delivery stream, instead of leveraging the AWS IoT service to handle device security and routing of messages to the appropriate AWS service.
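The full generator lives in the repo, but a rough sketch of what such a script does (the helper names here are ours; the field names match the Athena schema defined later in this walkthrough) looks like the following. It builds one JSON record per device and writes it to the Firehose delivery stream:

```python
import datetime
import json
import random

DEVICE_IDS = ["aaaa", "bbbb", "cccc", "dddd", "eeee"]

def build_record(device_id, reading=None):
    """Build one telemetry record; normal readings fall in a small range."""
    if reading is None:
        reading = random.randint(-5, 5)
    return {
        "deviceId": device_id,
        "sensorReading": reading,
        "readingTimestamp": datetime.datetime.utcnow().isoformat(),
    }

def send_record(firehose_client, delivery_stream_name, record):
    """Write a single newline-delimited JSON record to Kinesis Firehose."""
    return firehose_client.put_record(
        DeliveryStreamName=delivery_stream_name,
        Record={"Data": json.dumps(record) + "\n"},
    )

# Usage (requires AWS credentials and the deployed stack):
# import boto3
# firehose = boto3.client("firehose")
# for device_id in DEVICE_IDS:
#     send_record(firehose, "<your delivery_stream_id>", build_record(device_id))
```

The real script loops once per second, which is enough volume for the random cut forest function to learn a baseline.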
Part One – Anomaly Detection
The code for this walkthrough can be found here. You’ll need to download the Git repo, as some of the code needs to be executed locally – specifically, the data generator script and the anomaly generator script. You’ll need Python (of course), as well as boto3 installed. Boto will need some AWS IAM credentials that it can use to send data to the Kinesis Firehose delivery stream created from the CloudFormation template; if you have an IAM access key/secret key configured with admin access, that will simplify things.
Additionally, make sure you’re running through this walkthrough in an AWS region that supports all of the AWS services used. As of this writing, the two regions that support all of the services used in this walkthrough are N. Virginia (us-east-1) and Oregon (us-west-2).
After pulling down the Git repository (https://github.com/theemadnes/iot_anomaly_analytics), deploy the CloudFormation template. Included in this CloudFormation template is the definition for almost all of the required AWS infrastructure, including IAM roles, Kinesis Firehose, SNS topic and email address (as a parameter to be filled in), Lambda function, S3 buckets and bucket policies, Kinesis stream, etc. The following screenshot shows the CloudFormation template being created after uploading the YAML template:
Once you have entered your email address (when prompted) and deployed the stack, you should receive a confirmation email asking you to subscribe to the SNS topic for anomaly detection alerts. The subscription confirmation should look like this:
The next step is to start the data generator using the Python scripts provided. Before starting data_generator.py, you’ll need to update the ‘delivery_stream_id’ in settings.py to reference your Firehose delivery stream. This value can be found in the output of your CloudFormation stack as ‘DataKinesisFirehoseId’, and will be unique to your deployment:
Once that’s done, start the data generation script (not the generate single anomaly script) to begin creating data and streaming it to your Firehose delivery stream. Once started, the script will stream data every second to Kinesis Firehose, which in turn will both persist the data to your S3 bucket and make it accessible to Kinesis Analytics. The output of the script will look something like this:
After starting the data generator script, the next step is to create a new application in the Kinesis Analytics console. Give the application a name that makes sense to you. Define your source as the Firehose delivery stream created in your CloudFormation stack, and reference the IAM role created for Kinesis Analytics by CloudFormation (which will include ‘KinesisAnalyticsRole’ in its name) under the section titled ‘Permission to access the stream’ –> ‘Choose an IAM role’.
At the bottom of this screen, you’ll need to make a slight modification to the data schema that was auto-detected, because Kinesis Analytics will assign ‘sensorReading’ a data type of ‘TINYINT’. This is correct for the values the data generator script sends to Kinesis Firehose, but lacks the range to handle the anomalous values our anomaly script is going to send. Click on ‘Edit schema’, change ‘TINYINT’ to ‘SMALLINT’, and save:
It will take a few moments for your Kinesis Analytics application to start. Click ‘Exit’, which will take you back to the ‘Source’ menu. From there, click ‘Save and Continue’.
Time to define your Kinesis Application code and define the destination stream for anomalies detected. From the main application menu, click on ‘Go to SQL editor’:
First define your destination stream by clicking on the ‘Destination’ tab, and clicking ‘Add a destination’:
Select the Kinesis Stream with ‘KinesisAnomalyStream’ in its name and select the same IAM role you used when defining the source. Then click ‘Save and continue’:
Once the menu has returned to the application console, again click on ‘Go to SQL editor’ and copy and paste the SQL from kinesis_analytics_anomaly_detection.SQL into the SQL editor window. Then, click ‘Save and run SQL’ to start the application.
This SQL code uses the random cut forest function to calculate an anomaly score for each data record. When a record’s anomaly score exceeds 4 (a threshold that can be modified in the SQL code), the record is written to the downstream Kinesis anomaly stream (which you defined as the destination stream). Records arriving on this stream trigger a Lambda function that sends an alert email (via the SNS topic and email address from the CloudFormation template) to notify you that an anomaly has been detected.
To bring this back to our scenario, this trigger could be alerting a production manager that a sensor has produced an anomalous data point; perhaps the device needs to be replaced. The random cut forest function itself is a machine learning algorithm that assigns each data point an absolute anomaly score, contextualizing that score relative to the data generated prior to the point.
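The scoring itself happens inside the Kinesis Analytics SQL, but the thresholding step it applies can be illustrated in Python (the field names and score values here are illustrative; the real scores come from the random cut forest function):

```python
ANOMALY_THRESHOLD = 4.0  # the same cutoff used in the walkthrough's SQL

def filter_anomalies(scored_records, threshold=ANOMALY_THRESHOLD):
    """Keep only records whose anomaly score exceeds the threshold,
    mirroring the filter in the Kinesis Analytics application code."""
    return [r for r in scored_records if r["ANOMALY_SCORE"] > threshold]

scored = [
    {"deviceId": "aaaa", "sensorReading": 2, "ANOMALY_SCORE": 0.9},
    {"deviceId": "bbbb", "sensorReading": 120, "ANOMALY_SCORE": 6.3},
]
anomalies = filter_anomalies(scored)  # only the 'bbbb' record passes
```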
Note – In this architecture, all of the components are “pay-as-you-go” – for Kinesis Analytics, you pay for running an application by the hour. This architecture will consume 1 KPU/hr, which, if left running 24/7, will cost ~$80/month. To avoid this, stop the application when you’re not using it, and clean up your environment when you’re done with this walkthrough.
For processing these anomalous records and creating email alerts, go into the Lambda console. A Lambda function (including the string ‘AlertingLambdaFunction’ in its name) has been created via the CloudFormation stack, but you need to set the trigger for your Lambda function to the Kinesis anomaly stream, with a batch size of one (1). Select your function, and click the ‘Triggers’ tab – it won’t have any triggers yet, so click on ‘Add Trigger’:
Click the space within the dashed line to define your trigger source, and in the resulting drop-down menu select ‘Kinesis’. From there, point to your Kinesis stream, set the ‘Batch size’ to 1, and set the ‘Starting position’ to ‘Latest’, and click ‘Submit’:
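The alerting function itself is already created by the CloudFormation template; as a rough sketch of what a handler like this does (the environment variable name and message format are our assumptions, not taken from the repo), it decodes each base64-encoded Kinesis record and publishes the payload to SNS:

```python
import base64
import json
import os

def decode_kinesis_record(record):
    """Kinesis event records arrive base64-encoded; decode and parse the JSON payload."""
    payload = base64.b64decode(record["kinesis"]["data"])
    return json.loads(payload)

def lambda_handler(event, context):
    # boto3 is available in the Lambda runtime; imported here so the
    # decoding helper above can be exercised without AWS credentials.
    import boto3
    sns = boto3.client("sns")
    for record in event["Records"]:
        anomaly = decode_kinesis_record(record)
        sns.publish(
            TopicArn=os.environ["SNS_TOPIC_ARN"],  # assumed to be set by the stack
            Subject="Anomaly detected",
            Message=json.dumps(anomaly),
        )
```

With the batch size set to 1, each invocation carries a single anomalous record, so each anomaly yields one email.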
To test that everything is working properly (and begin generating some anomalous data entries), run the generate_single_anomaly Python script once and confirm that you received an email alerting you of an anomaly:
In a few seconds, you should receive an alert email – notice how the ‘ANOMALY_SCORE’ is greater than the anomaly threshold defined in the SQL code (4):
You can run this as many times as you like, but make sure to give about a minute or so between executions to make sure that the anomalies do not normalize within the dataset. You can adjust the sensitivity/time decay in the Kinesis Analytics application code.
Part Two – Interactive Queries
The final piece of this is to be able to query the historic data that has been stored in your S3 bucket. Perhaps our data scientists want to identify some trends in telemetry data.
To enable these workflows, we’ll use the new serverless query engine from AWS, Athena. Amazon Athena is an interactive query service that makes it easy to analyze data (in this case, data stored in Amazon S3) using standard SQL. The service is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
Note – We’ll preface this section by acknowledging that the queries in this walkthrough are not optimized; while the data that Athena is querying in S3 is compressed, we aren’t partitioning the data for our queries, meaning they’ll be more expensive than they otherwise could be. Athena pricing is based on the amount of data scanned per query, so partitioning is important for optimizing both cost and performance.
To begin, head over to the Athena console. You’ll first need to create an external table referencing the device data in your S3 bucket. In the ‘Catalog Manager’ tab, click on ‘Add table’. Create a new database, set your database name, and give your table a name. In the example below, the database name is ‘analyticsdemo’ and the table name is ‘devicedata’; the location of the data is the S3 bucket created by the CloudFormation stack (see the stack outputs under ‘DataRepositoryBucketName’) with a trailing ‘/’:
Set the ‘Data Format’ in Step 2 to JSON. The following column names should be defined with the corresponding ‘Column type’, clicking ‘Add a column’ to add all three columns:
- ‘deviceId’ – ‘string’
- ‘sensorReading’ – ‘smallint’
- ‘readingTimestamp’ – ‘timestamp’
As previously mentioned, you won’t create partitions in this walkthrough, so click ‘Create table’ to finish creating the database and table. You’ll be returned to the Query Editor, where you can begin asking questions of your data. To do so, start writing some SQL into the console. Some simple examples have been provided as part of this walkthrough in athena_examples.sql. Here’s an example, showing how many records have been received per deviceId:
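The same per-device count can also be run programmatically with boto3’s Athena client. Here’s a hedged sketch (the database and table names match the ones created above, but the results location is a placeholder you’d substitute with your own bucket):

```python
def count_by_device_query(table="devicedata"):
    """SQL matching the per-deviceId count example in the walkthrough."""
    return (
        f"SELECT deviceId, COUNT(*) AS record_count "
        f"FROM {table} GROUP BY deviceId ORDER BY deviceId"
    )

def run_query(athena_client, sql, database="analyticsdemo",
              output_location="s3://<your-athena-results-bucket>/"):
    """Start an Athena query; results are written to the given S3 location."""
    return athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )

# Usage (requires AWS credentials):
# import boto3
# run_query(boto3.client("athena"), count_by_device_query())
```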
Here’s another simple example that looks for sensorReading values outside of the expected range (sensorReading < -5) or (sensorReading > 5):
Part Three – Cleanup
Once you’re done playing around with this walkthrough, go ahead and tear down the environment. The order of operations should be:
- Stop the data generator script
- Delete your Kinesis Analytics application
- Delete your database and table in Athena
- Empty the S3 bucket (a bucket with objects in it can’t be deleted)
- Delete the CloudFormation stack
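Steps like emptying the S3 bucket can also be scripted. A small sketch using boto3’s resource API (the bucket name comes from your stack’s outputs; the helper name is ours):

```python
def empty_bucket(s3_resource, bucket_name):
    """Delete every object in the bucket (a non-empty bucket can't be deleted)."""
    bucket = s3_resource.Bucket(bucket_name)
    bucket.objects.all().delete()
    return bucket

# Usage (requires AWS credentials):
# import boto3
# empty_bucket(boto3.resource("s3"), "<DataRepositoryBucketName from stack outputs>")
```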
We’ve covered a lot of ground in this post. In order to support our IoT widget maker’s data objectives, we’ve created a data pipeline that takes streaming data and performs both real-time processing and batch processing. We’ve used a broad range of AWS services (including all of the current Kinesis features). And to top it off, we did it without managing any infrastructure.
To learn more about building architectures like these (and many more), contact AHEAD today to meet with us one-on-one. With over 50 combined AWS certifications, our experts have a breadth of AWS knowledge and are able to help in more ways than one. Additionally, check out some of our demonstrations offered at AWS re:Invent and sign up now for a personalized demo of your own.