{"id":2627,"date":"2021-02-19T11:05:48","date_gmt":"2021-02-19T10:05:48","guid":{"rendered":"https:\/\/blog.besharp.it\/?p=2627"},"modified":"2023-02-22T17:05:12","modified_gmt":"2023-02-22T16:05:12","slug":"orchestrating-data-analytics-and-business-intelligence-pipelines-via-step-function","status":"publish","type":"post","link":"https:\/\/blog.besharp.it\/orchestrating-data-analytics-and-business-intelligence-pipelines-via-step-function\/","title":{"rendered":"Orchestrating Data Analytics and Business Intelligence pipelines via AWS Step Functions"},"content":{"rendered":"\n

ETL pipelines on AWS usually have a linear behavior: they start from one service and end at another. This time, though, we would like to present a more flexible setup, in which some ETL jobs can be skipped depending on the data. Furthermore, some of the transformed data in our data lake needs to be queried by AWS Athena in order to generate BI dashboards in QuickSight, while other data partitions are used to train an ad-hoc anomaly detection model via Amazon SageMaker.<\/p>\n\n\n\n

A powerful tool to orchestrate this type of ETL pipeline is AWS Step Functions.<\/p>\n\n\n\n

In this article, we want to show you some of the steps involved in the creation of the pipeline, as well as how several AWS data analytics services can be used in near real-time scenarios to manage a high volume of data in a scalable way.<\/p>\n\n\n\n

In particular, we\u2019ll investigate AWS Glue connectors and Crawlers, AWS Athena, QuickSight, Kinesis Data Firehose, and finally give a brief explanation of how to use SageMaker to create forecasts from the collected data. To learn more about SageMaker you can also take a look at our other articles<\/a>.<\/p>\n\n\n\n

Let\u2019s start!<\/p>\n\n\n\n

Our setup<\/h2>\n\n\n\n

In this example, we\u2019ll set up several temperature sensors that send temperature and diagnostic data to our pipeline, we\u2019ll perform different BI analyses to verify efficiency, and we\u2019ll use a SageMaker model to check for anomalies.<\/p>\n\n\n\n

To keep things interesting, we also want to grab historical data from two different locations: an S3 bucket and a database residing on an EC2 instance in a different VPC from the one used by our ETL pipelines.<\/p>\n\n\n\n

We will use different ETL jobs to extract cleaned data from the raw data, and AWS Step Functions to orchestrate all the crawlers and jobs.<\/p>\n\n\n\n

Kinesis Data Firehose will continuously ingest the sensors\u2019 data, and with AWS Athena we will query both aggregated and per-sensor data to show graphical stats in Amazon QuickSight.<\/p>\n\n\n\n

Here is a simple schema illustrating the services involved and the complete flow.<\/p>\n\n\n\n

\"infrastructure
infrastructure diagram<\/em><\/figcaption><\/figure>\n\n\n\n

Kinesis Data Firehose <\/h2>\n\n\n\n

Kinesis Data Firehose can be used to obtain near real-time data from sensors, leveraging the IoT Core SDK to connect to the actual devices. As seen in this article<\/a>, we can create a \u201cThing\u201d, thus generating a topic<\/strong>. By connecting to that topic<\/strong>, several devices can deliver their metrics to Firehose by sending messages using the MQTT protocol<\/a>, and, should you need it, IoT Core can also manage device authentication<\/strong>.<\/p>\n\n\n\n

To start sending sensors\u2019 data, we need to download the connection kit from the AWS IoT<\/a> page following in-page instructions.<\/p>\n\n\n\n

\"Select
Select OS and Language for downloading the connection kit<\/em><\/figcaption><\/figure>\n\n\n\n

Once downloaded, initialize a new Node.js project and install the aws-iot-device-sdk<\/strong> package. After that, it is possible to run the included start.sh<\/strong> script, making sure all the certificates, downloaded alongside the kit, are in the same directory. We can now create a local script that sends data to a topic, importing the required modules and using device.publish(\u201c<topic>\u201d, payload)<\/strong>:<\/p>\n\n\n\n

const deviceModule = require('aws-iot-device-sdk').device;\nconst cmdLineProcess = require('aws-iot-device-sdk\/examples\/lib\/cmdline');\n\/\/ ... create the `device` object with deviceModule({...}), passing the downloaded certificates, client ID and IoT endpoint\ndevice.publish('topic', JSON.stringify(payload));\n<\/pre>\n\n\n\n

The data is sent in JSON format with the following structure:<\/p>\n\n\n\n

{\n   \"timestamp\": \"YYYY-MM-DD HH:MM:SS\",\n   \"room_id\": \"XXXX\",\n   \"temperature\": 99\n}\n<\/pre>\n\n\n\n

To create a Firehose delivery stream, go to the Kinesis Data Firehose<\/strong> service dashboard in the AWS web console, click \u201cCreate delivery stream\u201d, select a name, and then choose \u201cDirect PUT or other sources\u201d as in the figure below:<\/p>\n\n\n\n

\"Delivery
Creating a new Firehose delivery stream<\/em><\/figcaption><\/figure>\n\n\n\n

Leave \u201cData transformation\u201d and \u201cRecord format conversion\u201d as default. Choose an S3 destination as the target. Remember to also define an IoT Rule<\/strong> to send IoT messages to a Firehose delivery stream.<\/p>\n\n\n\n
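The IoT Rule can be created from the IoT Core console or with a few lines of code. Here is a minimal sketch using boto3; the rule name, the topic in the SQL statement, the delivery stream name, and the role ARN are placeholders to adapt to your account:<\/p>\n\n\n\n
import boto3

iot = boto3.client('iot')

# Forward every message published on the sensors' topic to the Firehose delivery stream.
# Rule name, topic, stream name and role ARN below are placeholders.
iot.create_topic_rule(
    ruleName='temperature_to_firehose',
    topicRulePayload={
        'sql': "SELECT * FROM 'topic'",
        'ruleDisabled': False,
        'actions': [
            {
                'firehose': {
                    'deliveryStreamName': 'temperature-delivery-stream',
                    'roleArn': 'arn:aws:iam::123456789012:role/iot-to-firehose-role',
                    'separator': '\n',
                }
            }
        ],
    },
)
<\/pre>\n\n\n\n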

Glue crawlers and connectors<\/h2>\n\n\n\n

AWS Glue can be used to Extract and Transform data from a multitude of different data sources, thanks to the possibility of defining different types of connectors.<\/p>\n\n\n\n

Database on EC2 instance<\/strong><\/p>\n\n\n\n

We want to be able to generate a Glue Data Catalog from a Microsoft SQL Server DB residing on an EC2 Instance in another VPC. To do so we need to create a JDBC connection, which can be done easily by going to the AWS Glue service page and by adding a new connection, found under the \u201cData Catalog – Databases\u201d section of the sidebar menu.<\/p>\n\n\n\n

Just add a name for the connection (which will be used by the related crawler job), the JDBC URL, following the right convention for SQL Server DBs, the username and password, and the required VPC and subnet.<\/p>\n\n\n\n

\"JDBC
JDBC connection parameters<\/em><\/figcaption><\/figure>\n\n\n\n
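For reference, the same JDBC connection could also be created with boto3. This is only a sketch: the connection name, URL, credentials, subnet, availability zone, and security group are placeholders, and the URL follows the SQL Server convention (jdbc:sqlserver://host:port;databaseName=db):<\/p>\n\n\n\n
import boto3

glue = boto3.client('glue')

# All values below are placeholders: adapt them to your database and networking setup.
glue.create_connection(
    ConnectionInput={
        'Name': 'ec2-sqlserver-connection',
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:sqlserver://10.0.1.10:1433;databaseName=sensors',
            'USERNAME': 'glue_user',
            'PASSWORD': 'change-me',
        },
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-0123456789abcdef0',
            'SecurityGroupIdList': ['sg-0123456789abcdef0'],
            'AvailabilityZone': 'eu-west-1a',
        },
    }
)
<\/pre>\n\n\n\n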

In order to establish a Glue connection to the database, we need to create a new dedicated VPC that will only be used by Glue. The VPC is connected to the one containing the data warehouse using VPC peering<\/a>, but other options are also possible: for example, we could have used AWS Transit Gateway. Once the peering is established, remember to add routes to both the Glue and the DB subnets so that they can exchange traffic, and to open the DB security group to allow incoming traffic on the relevant port from the Glue security group in the new VPC.<\/p>\n\n\n\n
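As a sketch of that last step (both security group IDs are placeholders, and 1433 is the default SQL Server port), the ingress rule could be added like this:<\/p>\n\n\n\n
import boto3

ec2 = boto3.client('ec2')

# Allow the Glue security group (in the new VPC) to reach the DB security group on port 1433.
# Both group IDs are placeholders.
ec2.authorize_security_group_ingress(
    GroupId='sg-0aaaaaaaaaaaaaaaa',  # security group of the database instance
    IpPermissions=[
        {
            'IpProtocol': 'tcp',
            'FromPort': 1433,
            'ToPort': 1433,
            'UserIdGroupPairs': [{'GroupId': 'sg-0bbbbbbbbbbbbbbbb'}],  # Glue security group
        }
    ],
)
<\/pre>\n\n\n\n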

Data on S3<\/strong><\/p>\n\n\n\n

Data on S3 doesn\u2019t need a connector and can be set up directly from the AWS Glue console. Create a new crawler, selecting \u201cdata stores\u201d as the crawler source type<\/strong>; then also check \u201cCrawl all folders\u201d. After that, it is just a matter of setting the S3 bucket, the right IAM role, and creating a new Glue database (schema) for this crawler. Also set the frequency to \u201cRun on demand\u201d.<\/p>\n\n\n\n
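The same crawler can also be defined with boto3; in the sketch below the crawler name, IAM role, target database, and bucket path are placeholders:<\/p>\n\n\n\n
import boto3

glue = boto3.client('glue')

# Crawl the historical data bucket and write the discovered schema to a Glue database.
# Name, Role, DatabaseName and Path are placeholders.
glue.create_crawler(
    Name='historical-temperatures-crawler',
    Role='arn:aws:iam::123456789012:role/glue-crawler-role',
    DatabaseName='historical_raw_temperatures',
    Targets={'S3Targets': [{'Path': 's3://my-historical-data-bucket/'}]},
)

# No Schedule is passed, so the crawler runs on demand,
# e.g. glue.start_crawler(Name='historical-temperatures-crawler')
<\/pre>\n\n\n\n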

Glue Jobs<\/strong><\/p>\n\n\n\n

Glue jobs are the steps of the ETL pipeline. They allow extracting, transforming, and saving data back to a data lake. In our example, we would like to show two different approaches: jobs managed by AWS Glue Studio<\/strong> and jobs using custom code<\/strong>. Both will later be called by AWS Step Functions.<\/p>\n\n\n\n

For historical data on S3, we can define Jobs from Glue Studio. For S3 select the following options in order:<\/p>\n\n\n\n

    \n
1. On the Manage Jobs<\/strong> page, choose the \u201cSource and target added to the graph\u201d option. Then, choose S3 for the Source and S3 for the Target.<\/li>\n\n\n\n
  2. Click on the S3 Data source, select the source bucket.<\/li>\n\n\n\n
  3. On the Node Properties tab, enter a name. Choose the Data source properties \u2013 S3 tab in the node details panel. Select your schema from the list of available databases in the Glue Data Catalog. Choose the correct table from the Catalog.<\/li>\n\n\n\n
  4. Verify the mapping is correct.<\/li>\n\n\n\n
5. On the Node S3 Data target, select the output bucket, CSV as the format (Parquet would be better, but we need CSV for Random Cut Forest), and no compression.<\/li>\n<\/ol>\n\n\n\n
    \"Target
    Target node properties<\/em><\/figcaption><\/figure>\n\n\n\n

In order to extract data from our EC2 instance instead, we need a custom job. To create it we need to write a script ourselves; don\u2019t worry, it\u2019s fairly easy! Here are the key points you need to know to create a Spark job with Glue: the ETL process is composed of 6 distinct areas in the script.<\/p>\n\n\n\n

    Import libraries<\/strong><\/p>\n\n\n\n

The basic set of imports needed for a Glue script:<\/p>\n\n\n\n

    import sys\nfrom awsglue.transforms import *\nfrom awsglue.utils import getResolvedOptions\nfrom pyspark.context import SparkContext\nfrom awsglue.context import GlueContext\nfrom awsglue.job import Job\nfrom awsglue.dynamicframe import DynamicFrame\n<\/pre>\n\n\n\n

    Prepare connectors and other variables<\/strong><\/p>\n\n\n\n

    To be used inside the script:<\/p>\n\n\n\n

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])\nsc = SparkContext()\nglueContext = GlueContext(sc)\nspark = glueContext.spark_session\njob = Job(glueContext)\njob.init(args['JOB_NAME'], args)\n<\/pre>\n\n\n\n

    Get Dynamic Frames out of a Glue Catalog obtained by a Crawler<\/strong><\/p>\n\n\n\n

    Use these dynamic frames to perform queries and transform data<\/p>\n\n\n\n

    rooms_temperatures_df = glueContext.create_dynamic_frame.from_catalog(database = \"raw_temperatures\", table_name = \"temperatures\", transformation_ctx = \"temperature_transforms\").toDF()\nrooms_temperatures_df.createOrReplaceTempView(\"TEMPERATURES\")\n<\/pre>\n\n\n\n

The last line registers the resulting DataFrame as a temporary view (TEMPERATURES), so that it can be queried with SQL in the next step.<\/p>\n\n\n\n

    Apply SQL operations<\/strong><\/p>\n\n\n\n

    To extract distinct information<\/p>\n\n\n\n

result = glueContext.sql(\"SELECT * FROM TEMPERATURES WHERE room_id = 'ROOM_ID'\")  # one query per room\n<\/pre>\n\n\n\n

In our case, we needed to generate 3 distinct results, one for each room, using a simple WHERE room_id = <value><\/strong> filter; see the sketch below.<\/p>\n\n\n\n
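Continuing the same script, a minimal sketch of how the three per-room results could be produced (the room ids below are just placeholders) is the following; each resulting DataFrame then goes through the mapping and write steps shown next:<\/p>\n\n\n\n
# Hypothetical room ids: replace them with the values actually present in your data.
room_ids = ['room_a', 'room_b', 'room_c']

# One filtered result per room, all queried from the TEMPERATURES temporary view.
results_by_room = {
    room_id: glueContext.sql("SELECT * FROM TEMPERATURES WHERE room_id = '{}'".format(room_id))
    for room_id in room_ids
}
<\/pre>\n\n\n\n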

    Apply mapping<\/strong><\/p>\n\n\n\n

    To generate a conversion schema<\/p>\n\n\n\n

    dynamicFrameResult = DynamicFrame.fromDF(result, glueContext, \"Result\")\napplymapping = ApplyMapping.apply(frame = dynamicFrameResult, mappings = [(\"temp\", \"bigint\", \"temp\",\"bigint\"), (\"room_id\", \"string\", \"room_id\",\"string\"), (\"timestamp\", \"string\", \"timestamp\",\"string\")])\n<\/pre>\n\n\n\n

    Save back to S3<\/strong><\/p>\n\n\n\n

    To manipulate data later on<\/p>\n\n\n\n

to_be_written = glueContext.write_dynamic_frame.from_options(frame = applymapping, connection_type = \"s3\", connection_options = {\"path\": \"s3:\/\/<path>\", \"partitionKeys\": [\"timestamp\"]}, format = \"csv\", transformation_ctx = \"to_be_written\")\njob.commit()\n<\/pre>\n\n\n\n

Step Functions<\/h2>\n\n\n\n

The Step Functions state machine represents the core logic of our sample solution. Its main purpose is to manage all the ETL jobs, keep them synchronized, and handle errors. One advantage is that we can use Step Functions to regulate the data being injected into the central S3 bucket, which is where we save all the cleaned data.<\/p>\n\n\n\n

To start, this is the state machine schema we used for this example:<\/p>\n\n\n\n

    \"Our
    Our example pipeline<\/em><\/figcaption><\/figure>\n\n\n\n

In our example there are a couple of things we would like to share about Step Functions. First, we have 2 main crawler loops: the first one has parallel branches and runs 2 crawlers (one for the standard S3 source, and one for the EC2 database, which is the custom one); the second one takes all the data retrieved from both historical data sources and the live one (from Kinesis Data Firehose) and extracts per-room datasets in order to use them with Amazon SageMaker.<\/p>\n\n\n\n

As crawlers are asynchronous, we can\u2019t simply wait for them to return, so we created 2 waiting loops, one for each of the execution steps.<\/p>\n\n\n\n

AWS Lambda is used to call the AWS Glue APIs in order to start and monitor the crawlers and jobs we have configured before.<\/p>\n\n\n\n
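As a minimal sketch (function and event field names are placeholders), the Lambda functions that start a crawler and check its state for the wait loop could look like this:<\/p>\n\n\n\n
import boto3

glue = boto3.client('glue')

def start_crawler_handler(event, context):
    # Kick off the crawler whose name is passed in the state machine input.
    glue.start_crawler(Name=event['crawler_name'])
    return event

def check_crawler_handler(event, context):
    # Return the crawler state (READY, RUNNING or STOPPING) so that a Choice state
    # can decide whether to keep waiting or move on.
    state = glue.get_crawler(Name=event['crawler_name'])['Crawler']['State']
    return {**event, 'crawler_state': state}
<\/pre>\n\n\n\n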

To give a hint, here are some interesting parts of the JSON definition of the state machine.<\/p>\n\n\n\n

    \"Type\": \"Parallel\",\n  \"Branches\": [\n        {\n          \"StartAt\": \"Import Raw from EXTERNAL_DB\",\n          \"States\": {\n            \"Import Raw from EXTERNAL_DB\": {\n              \"Type\": \"Task\",\n              \"Resource\": \"arn:aws:states:::glue:startJobRun.sync\",\n<\/pre>\n\n\n\n

In AWS Step Functions, we can launch tasks in parallel (for us, the two historical-data Glue jobs) using \u201cType\u201d: \u201cParallel\u201d and \u201cBranches\u201d. After the \u201cBranches\u201d block, it is also possible to collect the output of the parallel execution:<\/p>\n\n\n\n

    \"ResultPath\": \"$.ParallelExecutionOutput\",\n\"Next\": \"Start LAKE_DATA Crawler\"\n<\/pre>\n\n\n\n

We can run a Glue job defined in the console synchronously by passing the job\u2019s name (note the .sync suffix in the resource ARN), and we can also enable the Glue Data Catalog during the process.<\/p>\n\n\n\n

    \"Parameters\": {\n                \"JobName\": \"EXTERNAL_DB_IMPORT_TO_RAW\",\n                \"Arguments\": {\n                  \"--enable-glue-datacatalog\": \"true\",\n<\/pre>\n\n\n\n

    It is possible to catch exceptions directly in Step Function by moving to an error state using \u201cCatch\u201d:<\/p>\n\n\n\n

    \"Catch\": [\n        {\n          \"ErrorEquals\": [\n            \"States.TaskFailed\"\n          ],\n          \"Next\": \"Data Pipeline Failure\"\n        }\n],\n<\/pre>\n\n\n\n

Because we don\u2019t have a standard way to wait for the crawlers to finish, we use the parallel branches\u2019 output and a Step Functions wait cycle to check whether the operation is done; for that, we use the \u201cWait\u201d state:<\/p>\n\n\n\n

    \"Wait for LAKE_DATA Crawler\": {\n      \"Type\": \"Wait\",\n      \"Seconds\": 5,\n      \"Next\": \"Check LAKE_DATA Crawler\"\n},\n<\/pre>\n\n\n\n

    The rest of the flow is pretty much a repetition of these components.<\/p>\n\n\n\n

The interesting fact is that we can apply some starting conditions to alter the execution of the flow, for example skipping some jobs when they are not needed at the moment, or even running another state machine from a precise step, to modularize the most complicated parts of our example.<\/p>\n\n\n\n
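For example (the state machine ARN and the input flag below are placeholders, to be matched by Choice states in the definition), an execution that skips the historical import could be started like this:<\/p>\n\n\n\n
import json
import boto3

sfn = boto3.client('stepfunctions')

# The input document is inspected by Choice states inside the state machine
# to decide which branches to run; both the ARN and the flag are placeholders.
sfn.start_execution(
    stateMachineArn='arn:aws:states:eu-west-1:123456789012:stateMachine:etl-pipeline',
    input=json.dumps({'skip_historical_import': True}),
)
<\/pre>\n\n\n\n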

Athena and QuickSight<\/h2>\n\n\n\n

Athena can generate tables that can be queried using standard SQL, and not only that: the results of Athena queries can be imported into Amazon QuickSight to rapidly generate charts and reports based on your data.<\/p>\n\n\n\n

In our workflow, it is possible to run Athena queries on the target S3 bucket, which contains both global temperature data and sensor-specific data. Let\u2019s quickly review how to do that:<\/p>\n\n\n\n

      \n
    1. If you have already created a Glue Crawler, you\u2019ll have a Datasource and a table.<\/li>\n\n\n\n
    2. Select the database and table in Athena\u2019s dashboard in the left sidebar (we used temperatures_db and temperatures from our crawlers).<\/li>\n\n\n\n
    3. Create a simple query that can later be used by QuickSight to show a chart, for example, a simple \u201cSELECT * FROM temperatures\u201d. <\/li>\n<\/ol>\n\n\n\n

After these 3 steps, Athena will show the result of the query, as shown below:<\/p>\n\n\n\n

      \"Athena
      Athena sample query<\/em><\/figcaption><\/figure>\n\n\n\n
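The same query can also be launched programmatically, which is handy when you want to automate reporting. Here is a sketch with boto3, where only the S3 output location is a placeholder (temperatures_db and temperatures come from our crawlers):<\/p>\n\n\n\n
import time
import boto3

athena = boto3.client('athena')

# Start the query; Athena writes the results to the given S3 output location (placeholder).
execution = athena.start_query_execution(
    QueryString='SELECT * FROM temperatures',
    QueryExecutionContext={'Database': 'temperatures_db'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'},
)
query_id = execution['QueryExecutionId']

# Poll until the query leaves the QUEUED/RUNNING states, then read the first page of results.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state not in ('QUEUED', 'RUNNING'):
        break
    time.sleep(1)

rows = athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
<\/pre>\n\n\n\n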

      A couple of tips when working with Athena:<\/p>\n\n\n\n