{"id":2584,"date":"2021-02-04T12:49:38","date_gmt":"2021-02-04T11:49:38","guid":{"rendered":"https:\/\/blog.besharp.it\/?p=2584"},"modified":"2023-03-24T18:30:16","modified_gmt":"2023-03-24T17:30:16","slug":"iot-ingestion-and-ml-analytics-pipeline-with-aws-iot-kinesis-and-sagemaker","status":"publish","type":"post","link":"https:\/\/blog.besharp.it\/iot-ingestion-and-ml-analytics-pipeline-with-aws-iot-kinesis-and-sagemaker\/","title":{"rendered":"IoT ingestion and ML analytics pipeline with AWS IoT, Kinesis and SageMaker"},"content":{"rendered":"\n
Machine Learning is rapidly becoming part of our daily life: it lets software and devices manage routines without human intervention and gives us the ability to automate, standardize, and simplify many daily tasks. One interesting example is home automation, where it is now possible to have intelligent lights, smart heating, and autonomous robots that clean floors even in complex home layouts filled with obstacles. <\/p>\n\n\n\n
Generally speaking, the information retrievable from connected devices is nearly infinite. The low cost of data acquisition, together with the computational power needed to manage big data, has made Machine Learning accessible to many use cases. One of the most interesting is the ingestion and real-time analysis of data from IoT-connected devices.<\/p>\n\n\n\n
In this article, we would like to share a solution that takes advantage of AWS managed services to handle high volumes of real-time data coming from one or more IoT-connected devices. We\u2019ll show in detail how to set up a pipeline that gives users access to near real-time forecasting results based on the received IoT data. <\/p>\n\n\n\n
The solution will also explore some key concepts related to Machine Learning, ETL jobs, Data Cleaning, and data lake preparation.<\/p>\n\n\n\n
But before jumping into code and infrastructure design, a little recap on ML, IoT, and ETL is needed. Let\u2019s dive together into it!<\/p>\n\n\n\n
The Internet of Things (IoT) is a common way to describe a set of interconnected physical devices \u2014 \u201cthings\u201d \u2014 fitted with sensors, that exchange data with each other and over the Internet.<\/p>\n\n\n\n
IoT has evolved rapidly due to the decreasing cost of smart sensors, and to the convergence of multiple technologies like real-time analytics, machine learning, and embedded systems.<\/p>\n\n\n\n
Of course, traditional fields of embedded systems, wireless sensor networks, control systems, and automation, also contribute to the IoT world.<\/p>\n\n\n\n
ML was born as an evolution of Artificial Intelligence<\/strong>. Before Machine Learning, programmers had to write complex and difficult-to-maintain heuristics in order to carry out a traditionally human task (e.g. text recognition in images) using a computer.<\/p>\n\n\n\n With Machine Learning it is the system itself that learns relationships between data.<\/p>\n\n\n\n For example, in a chess game, there is no longer a hand-written algorithm that plays chess: by providing a dataset of features concerning chess games, the model learns to play by itself. <\/p>\n\n\n\n Machine Learning also makes sense in a distributed context<\/strong> where the prediction must scale<\/strong>.<\/p>\n\n\n\n In a Machine Learning pipeline, the data must be uniform, i.e. standardized. Differences in the data can result from heterogeneous sources, such as different DB table schemas, or different data ingestion workflows. <\/p>\n\n\n\n Transformation (ETL: Extract, transform, load) of data is thus an essential step in all ML pipelines. Standardized data are not only essential for training the ML model but are also much easier to analyse and visualize in the preliminary data discovery<\/strong> step.<\/p>\n\n\n\n In general, for data cleaning and formatting, NumPy, Pandas, and similar libraries are usually used:<\/p>\n\n\n\n – NumPy<\/strong>: <\/em> a library for the management of multidimensional arrays, mainly used in the importing and reading phase of a dataset.<\/p>\n\n\n\n – Pandas<\/strong> Dataframe<\/strong>: a library for managing data in table format. It takes data points from CSV<\/strong>, JSON<\/strong>, Excel<\/strong>, and pickle<\/strong> files and transforms them into tables. <\/p>\n\n\n\n – SciKit-Learn<\/strong>: a library for final data manipulation and training. To achieve our result, we will make extensive use of what AWS gives us in terms of managed services. 
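As a minimal illustration of this cleaning step, the snippet below (a sketch with made-up sensor columns, not code from the pipeline described later) uses Pandas to fill a missing reading and standardize a feature:

```python
import pandas as pd

# Toy readings with one missing value (hypothetical sensor columns)
df = pd.DataFrame({
    "ozone": [69.0, 56.0, None, 88.0],
    "carbon_monoxide": [12.0, 15.0, 11.0, 14.0],
})

# Fill gaps so downstream training doesn't fail on NaNs
df["ozone"] = df["ozone"].fillna(df["ozone"].mean())

# Standardize to zero mean and unit variance
df["ozone_std"] = (df["ozone"] - df["ozone"].mean()) / df["ozone"].std()
print(df)
```

In a real pipeline the same transformations must be applied consistently at training and inference time.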
Here is a simple sketch, showing the main actors involved in our Machine Learning pipeline.<\/p>\n\n\n\n Let\u2019s take a look at the purpose of each component before going into the details of each one of them.<\/p>\n\n\n\n The pipeline is organized into 5 main phases: ingestion<\/strong>, datalake preparation<\/strong>, transformation<\/strong>, training<\/strong>, inference<\/strong>.<\/p>\n\n\n\n The ingestion phase <\/strong>will receive data from our connected devices using AWS IoT Core<\/strong>, which allows connecting them with AWS services without managing servers and communication complexities<\/a>. Data from the devices will be sent using the MQTT protocol<\/a> to minimize code footprint and network bandwidth. Should you need it, AWS IoT Core can also manage device authentication<\/strong>.<\/p>\n\n\n\n To send information to our Amazon S3 data lake we will use Amazon Kinesis Data Firehose<\/a>, which comes with a built-in action for reading AWS IoT Core messages. Finally, to train and then deploy our model for online inference, we will show how to leverage built-in algorithms from Amazon SageMaker, in particular DeepAR<\/strong>.<\/p>\n\n\n\n To connect our test device to AWS we used AWS IoT Core capabilities. In the following, we assume that the reader already has an AWS account ready. <\/p>\n\n\n\n Go to your account and search for \u201cIoT Core\u201d; then, on the service page, in the sidebar menu, choose \u201cGet started\u201d and select \u201cOnboard a device\u201d. <\/p>\n\n\n Follow the wizard to connect a device as we did. The purpose is to:<\/p>\n\n\n\n This is important because we also connect Amazon Kinesis Data Firehose to read the messages sent from AWS IoT Core. 
As a side note, remember that you need access to the device and that the device must have a TCP connection to the public internet on port 8883.<\/p>\n\n\n\n Following the wizard, select Linux as the OS and an SDK (in our case Node.js):<\/p>\n\n\n After that, we gave a name to the new \u201cthing\u201d and got the connection kit which contains:<\/p>\n\n\n\n Once downloaded, initialize a new Node.js project and install the AWS-IoT-device-SDK<\/strong>. This will install the required node modules; after that, it is possible to run the included start.sh<\/strong> script, by including all the certificates downloaded with the kit in the same project directory. We required the Node.js modules necessary to connect the device to AWS and to publish to a relevant topic. You can read data from your sensor in any way you want; for example, if the device can write sensor data to a specific location, just read and stringify that data using device.publish('<YOUR_TOPIC>', JSON.stringify(payload))<\/strong>.<\/p>\n\n\n\n The last part of the code just calls the main function to start sending information to the console.<\/p>\n\n\n\n To run the script use the start.sh script included in the development kit; be sure to point to your code and not the example one from AWS<\/strong>. Leave the certificates and client ID as they are because they were generated from your previous setup.<\/p>\n\n\n\n Note: for the sake of this article the device code is oversimplified; don\u2019t use it like this in production environments.<\/em><\/p>\n\n\n\n To test that everything is working as intended, access the AWS IoT console, go to the Test<\/strong> section in the left sidebar and, when asked, type the name of your topic and click \u201cSubscribe to topic\u201d. 
If everything is set up correctly you should see something like the screenshot below:<\/p>\n\n\n\n Keeping the data lake up to date with the data sent by the device is extremely important to avoid a problem called Concept Drift, which happens when there is a gradual misalignment of the deployed model with respect to the world of real data<\/strong>; this happens because the historical data can no longer represent the problem, which has evolved.\u00a0<\/p>\n\n\n\n To overcome this problem we must ensure efficient logging and the means to understand when to intervene on the model, e.g. by retraining it or by upgrading and redeploying an updated version. To help with this kind of \u201cproblem\u201d we define an Amazon Kinesis Data Firehose action, specifically to automatically register and transport every MQTT message delivered from the device directly to Amazon S3, so as to keep our data lake supplied with fresh data.<\/p>\n\n\n\n To create a Firehose stream search for \u201cKinesis firehose\u201d in the service search bar, select it, then \u201cCreate delivery stream\u201d like in the figure:<\/p>\n\n\n\n To use the stream we must connect it with AWS IoT by means of an IoT Rule<\/strong>; this rule will allow Amazon Kinesis Data Firehose to receive messages and write them to an Amazon S3 bucket. To configure AWS IoT to send to Firehose we followed these steps:<\/p>\n\n\n\n This is an example of how such a rule will then be created:<\/p>\n\n\n\n If everything is ok, you\u2019ll start seeing data showing up in your bucket soon, like in the example below:<\/p>\n\n\n\n Amazon Simple Storage Service is an object storage service ideal for building a datalake<\/a>. With almost unlimited scalability, an Amazon S3 datalake provides many benefits when developing analytics for Big Data. 
<\/p>\n\n\n\n The centralized data architecture of Amazon S3 makes it simple to build a multi-tenant environment where multiple users can bring their own Big Data analytics tool to analyze a common set of data.\u00a0<\/p>\n\n\n\n Moreover, Amazon S3 integrates seamlessly with other Amazon Web Services such as Amazon Athena, Amazon Redshift, and, like in the case presented, AWS Glue.\u00a0<\/p>\n\n\n\n Amazon S3 also enables storage to be decoupled from compute and data processing to optimize costs and data processing workflows, as well as to keep the solution DRY, scalable, and maintainable.<\/p>\n\n\n\n Additionally, Amazon S3 allows you to store any type of structured, semi-structured, or even unstructured data in its native format. In our case, we are simply interested in saving mocked data from a test device to make some simple forecasting predictions.\u00a0<\/p>\n\n\n\n Even if the data is saved on Amazon S3 on a near-real-time basis, that alone is not enough to allow us to train an Amazon SageMaker model. In fact, as we explained in the introduction, the data must be prepared, and when dealing with predefined Amazon SageMaker algorithms<\/strong> some defaults must be kept in mind.<\/p>\n\n\n\n For example, Amazon SageMaker doesn\u2019t accept headers, and in case we want to define a supervised training<\/strong>, we also need to put the ground truth as the first column of the dataset.<\/p>\n\n\n\n In this simple example we used AWS Glue Studio to transform the raw data in the input Amazon S3 bucket into structured parquet files to be saved in a dedicated output bucket. The output bucket will be used by Amazon SageMaker as the data source.<\/p>\n\n\n\n At first, we need a Crawler to read from the source bucket\u00a0to generate an AWS Glue Schema. 
To create it, go to the AWS Glue page, select Crawlers under \u201cGlue console\u201d, and add a new crawler by simply giving it a name and selecting the source Amazon S3 bucket and the root folder created by Amazon Kinesis Data Firehose. A new schema will be created from this information. Leave all other options as default.<\/p>\n\n\n Activate the Crawler once created, by clicking on \u201cRun crawler\u201d.<\/p>\n\n\n The next step is to set up an AWS Glue Studio job using the Catalog as the data source.<\/p>\n\n\n\n An AWS Glue Studio job consists of at least 3 main nodes, which are source<\/strong>, transform<\/strong>, and target<\/strong>. We need to configure all three nodes to define a job<\/strong> capable of reading and transforming data on the fly.<\/p>\n\n\n\n To do so, here are the steps we followed:<\/p>\n\n\n\n Now you\u2019ll see a three-node graph displayed that represents the steps involved in the ETL process. When AWS Glue is instructed to read from an Amazon S3 data source, it will also create an internal schema, called the Glue Data Catalog<\/strong>.<\/p>\n\n\n To configure the source node, click on it in the graph:<\/p>\n\n\n\n The same can be done for the transform node: by clicking on it, it is possible to define what kind of transformation we want to apply to the input data. Here you can also verify that the JSON data is imported correctly:<\/p>\n\n\n Finally, we can select the target node, specifying, again, Amazon S3 as the target, and using .parquet as the output format.<\/p>\n\n\n Now we need to set the ETL job parameters for the workflow just created. Go to the \u201cJob details\u201d tab on the right, give it a name, and select a role capable of managing data and deploying again on Amazon S3.\u00a0<\/p>\n\n\n\n Leave the rest as default. 
<\/p>\n\n\n\n Note that you must have this snippet on the \u201cTrust Relationship\u201d tab of the role to let it be assumed by AWS Glue:<\/p>\n\n\n\n If everything is defined correctly, the job will start and begin converting your data into parquet format. The files will be put in your output directory in the bucket of your choice.<\/p>\n\n\n <\/p>\n\n\n\n We chose to use parquet instead of the CSV data format for the target dataset. Parquet is a highly compressed columnar format which uses the record shredding and assembly algorithm, vastly superior to the simple flattening of nested namespaces. It has the following advantages:<\/p>\n\n\n\n Also, compared to files stored in .csv format, we have these advantages in terms of cost savings:<\/p>\n\n\n\n Amazon SageMaker offers 17 prebuilt algorithms out-of-the-box that cover a plethora of topics related to Machine Learning problems. In our case, we wanted to simplify the development of a forecasting model for the data retrieved from our device, so instead of showing how to bring your own algorithm, like in our previous article, this time we\u2019ll be using a pre-cooked one.<\/p>\n\n\n\n As explained before, apart from cleaning the data, our ETL process was done to transform the data to be compatible with ready-made Amazon SageMaker algorithms.<\/p>\n\n\n\n The Amazon SageMaker API and the Sklearn library offer methods to retrieve the data, call the training method, save the model, and deploy it to production for online or batch inference.<\/p>\n\n\n\n Start by going to the Amazon SageMaker page and creating a new notebook instance; for this article we chose an ml.t3.medium<\/strong>. Add a name and create a new IAM role<\/strong>.<\/p>\n\n\n\n Leave the rest as default and click \u201cCreate notebook\u201d.<\/p>\n\n\n\n Access it via either Jupyter or JupyterLab; we chose the latter. 
We managed to put up a simple notebook illustrating all the steps involved in using the pre-baked DeepAR algorithm by Amazon SageMaker.\u00a0<\/p>\n\n\n\n Note: the code is made solely for this article and is not meant for production, as there is no preliminary investigation on the data and no validation of the results. Still, all the code presented is tested and usable for use cases similar to the one presented.<\/em><\/p>\n\n\n\n We start by importing all the necessary libraries:<\/p>\n\n\n\n We also set the seed for our random methods to ensure reproducibility. After that, we need to recover our parquet<\/strong> files from Amazon S3<\/strong> and obtain a Pandas Dataframe from them.<\/p>\n\n\n\n At first, we prepare all the Amazon S3 \u201cpaths\u201d that will be used in the notebook, and we generate an Amazon SageMaker Session<\/strong> and a valid IAM<\/strong> Role<\/strong> with get_execution_role()<\/strong>. As you can see, Amazon SageMaker takes care of all these aspects for us.<\/p>\n\n\n\n In the step above we recover our forecasting Estimator, DeepAR. <\/strong>An estimator is a class in Amazon SageMaker capable of generating and testing a model, which will then be saved on Amazon S3.<\/p>\n\n\n\n Before starting to read the parquet files we also add a couple of constants to our experiment:<\/p>\n\n\n\n With freq <\/strong>(frequency) we say that we want to analyze the TimeSeries by hourly metrics. Prediction and context length are set to 1 day; they are, respectively, how many hours we want to predict in the future and how many in the past we\u2019ll use for the prediction. Usually, these values are defined in terms of days, as the dataset is much bigger.<\/p>\n\n\n\n We created two helper methods to read from parquet files:<\/p>\n\n\n\n Then we actually read the datasets:<\/p>\n\n\n\n Here we manipulate the dataset to make it usable with DeepAR, which has its own proprietary input format. 
We use df.iloc[:, :8]<\/mark> to keep only the original columns, without the ones produced by the AWS Glue Schema. We generate a new hour<\/strong> column to speed things up; finally, we split the dataset in an 80\/20 proportion for training and testing.<\/p>\n\n\n\n We then write data back to Amazon S3 temporarily, as required by DeepAR, by building JSON files with the series in them.<\/p>\n\n\n\n The generated JSON documents are structured in a format like this:<\/p>\n\n\n\n After that, we can write our JSON files to Amazon S3.<\/p>\n\n\n\n We use sagemaker_session.upload_data()<\/strong> for that, passing the output location. Now we can finally define the estimator:<\/p>\n\n\n\n We\u2019ll pass the Amazon SageMaker session, the algorithm image, the instance type, and the model output path to the estimator. We also need to configure some hyperparameters:<\/p>\n\n\n\n These values are taken directly from the official AWS examples on DeepAR. We also need to pass the two channels, training and test, to the estimator to start the fitting process<\/strong>.<\/p>\n\n\n\n After training and testing a model, we can deploy it using a Real-time Predictor<\/strong>.<\/p>\n\n\n\n The predictor generates an endpoint that is visible from the AWS console.<\/p>\n\n\n\n The endpoint can be called by any REST-enabled application passing a request with a format like the one below:<\/p>\n\n\n\n The \u201ctargets\u201d are some sample values starting from the period set in \u201cstart\u201d from which we want to generate the prediction.<\/p>\n\n\n\n Finally, if we don\u2019t need the endpoint anymore, we can delete it with:<\/p>\n\n\n\n Real-time inference refers to the prediction given in real-time by some models. This is the typical use case of many recommendation systems, or generally when the prediction concerns a single use. 
It is used when:<\/p>\n\n\n\n It\u2019s a bit more complex to manage than what we have done in the notebook and is usually defined in a separate pipeline, due to its requirements of high availability and fast response times.<\/p>\n\n\n\n When deploying using the Amazon SageMaker API it is possible to create a deployment process that is very similar to how a web application is deployed or upgraded, taking into account things like traffic rerouting and deployment techniques like Blue\/Green or Canary. We want to share with you a summary guide for both methods so that you can try them by yourself!<\/p>\n\n\n\n Note: through production variants we can implement different deployment strategies such as A\/B and Blue\/Green.<\/em><\/p>\n\n\n\n Deploy Blue \/ Green<\/strong><\/p>\n\n\n\n Deploy A \/ B<\/strong><\/p>\n\n\n\n To be used if we want to measure the performance of different models with respect to a high-level metric.<\/p>\n\n\n\n In the end, one or more models can be excluded.<\/p>\n\n\n\n Note: the multi-model endpoint property allows for managing multiple models at the same time; the machine memory is managed automatically based on traffic. This approach can save money through the optimized use of resources.<\/em><\/p>\n\n\n\nData Transformation<\/h3>\n\n\n\n
Cleaning and formatting the data is essential to ensure the best chance for the model to converge well<\/strong> to the desired solution.<\/p>\n\n\n\nThe Pipeline<\/h2>\n\n\n\n
<\/figure>\n\n\n\n
To transform data and make it available for Amazon SageMaker we will use AWS Glue<\/a>: a serverless data integration service that makes it easy to find, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing and using data in minutes rather than months.<\/p>\n\n\n\nIngestion: AWS IoT Core to Amazon Kinesis Data Firehose<\/h2>\n\n\n\n
AWS IoT Core<\/h3>\n\n\n\n
\n
<\/figure><\/div>\n\n\n
\n
We developed our example using device-example.js<\/strong> as a simple base to understand how to connect a device to AWS IoT. <\/p>\n\n\n\nconst deviceModule = require('aws-iot-device-sdk').device;\nconst cmdLineProcess = require('aws-iot-device-sdk\/examples\/lib\/cmdline');\nprocessPollutionData = (args) => {\n \/\/ Device properties which are needed\n const device = deviceModule({\n keyPath: args.privateKey,\n certPath: args.clientCert,\n caPath: args.caCert,\n clientId: args.clientId,\n region: args.region,\n baseReconnectTimeMs: args.baseReconnectTimeMs,\n keepalive: args.keepAlive,\n protocol: args.Protocol,\n port: args.Port,\n host: args.Host,\n debug: args.Debug\n });\n const minimumDelay = 250; \/\/ ms\n const interval = Math.max(args.delay, minimumDelay);\n \/\/ Send device information\n setInterval(function() {\n \/\/ Prepare Data to be sent by the device\n const payload = {\n ozone: Math.round(Math.random() * 100),\n particullate_matter: Math.round(Math.random() * 100),\n carbon_monoxide: Math.round(Math.random() * 100),\n sulfure_dioxide: Math.round(Math.random() * 100),\n nitrogen_dioxide: Math.round(Math.random() * 100),\n longitude: 10.250786139881143,\n latitude: 56.20251117218925,\n timestamp: new Date()\n };\n device.publish('
<\/figure>\n\n\n\n
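Independently of the SDK you use on the device, the payload handed to device.publish() is just a serialized JSON document. Here is a hedged Python sketch of the same idea (the field names are illustrative, mirroring the mock reading in the Node.js example; no AWS connection is made):

```python
import json
import random
from datetime import datetime, timezone

def build_payload():
    """Build one mock air-quality reading, analogous to the Node.js payload."""
    return {
        "ozone": random.randint(0, 100),
        "carbon_monoxide": random.randint(0, 100),
        "nitrogen_dioxide": random.randint(0, 100),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

# An MQTT client (AWS IoT Device SDK, paho-mqtt, ...) would publish this string:
message = json.dumps(build_payload())
print(message)
```

The publishing call itself is the only SDK-specific part; the serialization logic stays the same.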
Now we need to connect Amazon Kinesis Data Firehose to start sending data to Amazon S3.<\/p>\n\n\n\nAmazon Kinesis Data Firehose<\/h3>\n\n\n\n
Create the Firehose stream<\/h3>\n\n\n\n
<\/figure>\n\n\n\n
Select a valid name under \u201cDelivery stream name\u201d, choose \u201cDirect PUT or other sources\u201d under \u201cSources\u201d, and then, on the next page, leave everything as it is (we will convert data in Amazon S3 later). Finally, on the last page, select Amazon S3<\/strong> as the destination and optionally add a prefix for the data inserted in the bucket. Click \u201cNext\u201d and create the stream.<\/p>\n\n\n\nCreate the IoT Rule<\/h3>\n\n\n\n
\n
<\/figure>\n\n\n\n
\n
{\n \"topicRulePayload\": {\n \"sql\": \"SELECT * FROM '
<\/figure>\n\n\n\n
And opening one of these will show the data generated from our device!<\/p>\n\n\n\n<\/figure>\n\n\n\n
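The same IoT Rule can also be created programmatically. Below is a hedged boto3 sketch: the topic, stream, role ARN, and rule name are placeholders, not the resources created in the console above, and the live call is left commented out because it requires AWS credentials.

```python
# Placeholders: replace with your topic, Firehose stream, and IAM role ARN
TOPIC = "pollution/data"
STREAM = "pollution-delivery-stream"
ROLE_ARN = "arn:aws:iam::123456789012:role/iot-firehose-role"

topic_rule_payload = {
    "sql": f"SELECT * FROM '{TOPIC}'",
    "awsIotSqlVersion": "2016-03-23",
    "actions": [{
        "firehose": {
            "deliveryStreamName": STREAM,
            "roleArn": ROLE_ARN,
            "separator": "\n",  # newline-delimit the records written to S3
        }
    }],
}

# To actually create the rule (requires boto3 and valid AWS credentials):
#   import boto3
#   boto3.client("iot").create_topic_rule(
#       ruleName="pollution_to_firehose",
#       topicRulePayload=topic_rule_payload,
#   )
```

The newline separator keeps one message per line in S3, which simplifies the Glue crawling step later.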
Datalake: Amazon S3<\/h2>\n\n\n\nETL process: AWS Glue<\/h2>\n\n\n\n
AWS Glue Crawler<\/h3>\n\n\n\n
ETL job<\/h3>\n\n\n\n
\n
<\/figure>\n\n\n\n
\n
\n
{ \n \"Version\": \"2012-10-17\", \n \"Statement\": [ \n { \n \"Effect\": \"Allow\", \n \"Principal\": { \"Service\": \"glue.amazonaws.com\" }, \n \"Action\": \"sts:AssumeRole\" \n } \n ]\n}\n<\/pre>\n\n\n\n
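If you prefer scripting the role setup, the same trust policy can be attached at role-creation time. A hedged boto3 sketch (the role name is a placeholder, and the live call is commented out since it needs AWS credentials):

```python
import json

# Trust policy allowing AWS Glue to assume the role (same document as above)
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

policy_document = json.dumps(trust_policy)

# To actually create the role (requires boto3 and valid AWS credentials):
#   import boto3
#   boto3.client("iam").create_role(
#       RoleName="glue-etl-job-role",  # placeholder name
#       AssumeRolePolicyDocument=policy_document,
#   )
```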
Dataset optimization: why parquet over CSV<\/h2>\n\n\n\n
\n
\n
\n
\n
\n
\n
<\/li>\n<\/ul>\n\n\n\nThe Machine Learning step: Forecasting with Amazon SageMaker<\/h2>\n\n\n\n
import time\nimport io\nimport math\nimport random\nimport numpy as np\nimport pandas as pd\nimport json\nimport matplotlib.pyplot as plt\nimport boto3\nimport sagemaker\nfrom sagemaker import get_execution_role\n\n# set random seeds for reproducibility\nnp.random.seed(42)\nrandom.seed(42)\n<\/pre>\n\n\n\n
bucket = \"
from sagemaker.amazon.amazon_estimator import get_image_uri\nimage_uri = get_image_uri(boto3.Session().region_name, \"forecasting-deepar\")\n<\/pre>\n\n\n\n
freq = \"H\"\nprediction_length = 24\ncontext_length = 24 # usually prediction and context are set equal or similar\n<\/pre>\n\n\n\n
# Read single parquet file from S3\ndef pd_read_s3_parquet(key, bucket, s3_client=None, **args):\n if not s3_client:\n s3_client = boto3.client('s3')\n obj = s3_client.get_object(Bucket=bucket, Key=key)\n return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)\n\n# Read multiple parquets from a folder on S3 generated by spark\ndef pd_read_s3_multiple_parquets(filepath, bucket, **args):\n if not filepath.endswith('\/'):\n filepath = filepath + '\/' # Add '\/' to the end\n \n s3_client = boto3.client('s3') \n s3 = boto3.resource('s3')\n s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)\n if item.key.endswith('.parquet')]\n if not s3_keys:\n print('No parquet found in', bucket, filepath)\n \n dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) \n for key in s3_keys]\n return pd.concat(dfs, ignore_index=True)\n<\/pre>\n\n\n\n
# get all retrieved parquet in a single dataframe with helpers functions\ndf = pd_read_s3_multiple_parquets(data, bucket)\ndf = df.iloc[:, :8] # get only relevant columns\ndf['hour'] = pd.to_datetime(df['timestamp']).dt.hour #add hour column for the timeseries format\n# split in test and training\nmsk = np.random.rand(len(df)) < 0.8 # 80% mask\n# Dividing in test and training\ntraining_df = df[msk]\ntest_df = df[~msk]\n<\/pre>\n\n\n\n
# We need to resave our data in JSON because this is how DeepAR works\n# Note: we know this is redundant, but it shows how many ways\n# there are to transform the dataset back and forth from when data is acquired\n\ntrain_key = 'deepar_training.json'\ntest_key = 'deepar_test.json'\n\n# Write data in DeepAR format\ndef writeDataset(filename, data):\n    with open(filename, 'w') as file:  # context manager flushes and closes the file\n        previous_hour = -1\n        for hour in data['hour']:\n            if not math.isnan(hour) and hour != previous_hour:\n                previous_hour = hour\n                # One JSON sample per line\n                line = f\"\\\"start\\\":\\\"2021-02-05 {int(hour)}:00:00\\\",\\\"target\\\":{data[data['hour'] == hour]['ozone'].values.tolist()}\"\n                file.write('{' + line + '}\\n')\n<\/pre>\n\n\n\n
{\"start\":\"2021-02-05 13:00:00\",\"target\":[69.0, 56.0, 2.0, \u2026]}\n<\/pre>\n\n\n\n
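Before uploading, it can be worth sanity-checking the generated files: each line must parse as a standalone JSON object with a start timestamp and a numeric target series. A small hedged helper (not part of the original notebook):

```python
import json

def validate_deepar_file(path):
    """Return the number of valid series lines; raise if a line is malformed."""
    count = 0
    with open(path) as f:
        for line in f:
            sample = json.loads(line)  # each line must be valid JSON on its own
            assert "start" in sample and "target" in sample
            assert all(isinstance(v, (int, float)) for v in sample["target"])
            count += 1
    return count
```

Calling validate_deepar_file('deepar_training.json') before the upload catches formatting mistakes earlier than a failed training job would.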
writeDataset(train_key, training_df) \nwriteDataset(test_key, test_df)\n\ntrain_prefix = 'model\/train'\ntest_prefix = 'model\/test'\n\ntrain_path = sagemaker_session.upload_data(train_key, bucket=bucket, key_prefix=train_prefix)\ntest_path = sagemaker_session.upload_data(test_key, bucket=bucket, key_prefix=test_prefix)\n<\/pre>\n\n\n\n
estimator = sagemaker.estimator.Estimator(\n sagemaker_session=sagemaker_session,\n image_uri=image_uri,\n role=role,\n instance_count=1,\n instance_type=\"ml.c4.xlarge\",\n base_job_name=\"pollution-deepar\",\n output_path=f\"s3:\/\/{s3_output_path}\",\n)\n<\/pre>\n\n\n\n
hyperparameters = {\n \"time_freq\": freq,\n \"context_length\": str(context_length),\n \"prediction_length\": str(prediction_length),\n \"num_cells\": \"40\",\n \"num_layers\": \"3\",\n \"likelihood\": \"gaussian\",\n \"epochs\": \"20\",\n \"mini_batch_size\": \"32\",\n \"learning_rate\": \"0.001\",\n \"dropout_rate\": \"0.05\",\n \"early_stopping_patience\": \"10\",\n}\n\nestimator.set_hyperparameters(**hyperparameters)\n<\/pre>\n\n\n\n
data_channels = {\"train\": train_path, \"test\": test_path}\nestimator.fit(inputs=data_channels)\n<\/pre>\n\n\n\n
# Deploy for real time prediction\njob_name = estimator.latest_training_job.name\n\nendpoint_name = sagemaker_session.endpoint_from_job(\n job_name=job_name,\n initial_instance_count=1,\n instance_type='ml.m4.xlarge',\n role=role\n)\n\npredictor = sagemaker.predictor.RealTimePredictor(\n endpoint_name, \n sagemaker_session=sagemaker_session, \n content_type=\"application\/json\")\n<\/pre>\n\n\n\n
<\/figure>\n\n\n\n
{\n \"instances\": [ \n {\n \"start\": \"2021-02-05 00:00:00\",\n \"target\": [88.3, 85.4, ...]\n }\n ],\n \"configuration\": {\n \"output_types\": [\"mean\", \"quantiles\", \"samples\"],\n \"quantiles\": [\"0.1\", \"0.9\"], \n \"num_samples\": 100\n }\n}\n<\/pre>\n\n\n\n
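From outside the notebook, the deployed endpoint can be invoked with the plain AWS SDK rather than the SageMaker Predictor class. A hedged sketch of building such a request (the endpoint name is a placeholder, and the live call is commented out since it needs AWS credentials):

```python
import json

request = {
    "instances": [
        {"start": "2021-02-05 00:00:00", "target": [88.3, 85.4, 90.1]}
    ],
    "configuration": {
        "output_types": ["mean", "quantiles"],
        "quantiles": ["0.1", "0.9"],
        "num_samples": 100,
    },
}
body = json.dumps(request)

# To actually invoke the endpoint (requires boto3 and valid AWS credentials):
#   import boto3
#   response = boto3.client("sagemaker-runtime").invoke_endpoint(
#       EndpointName="pollution-deepar-endpoint",  # placeholder
#       ContentType="application/json",
#       Body=body,
#   )
#   print(json.loads(response["Body"].read()))
```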
sagemaker_session.delete_endpoint(endpoint_name)<\/pre>\n\n\n\n
Real-Time Inference: from concept to production<\/h2>\n\n\n\n
\n
How to deploy<\/h3>\n\n\n\n
\n
\n\n
\n
\n
\n
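The A\/B setup described above maps to SageMaker production variants. Below is a hedged boto3 sketch of an endpoint configuration splitting traffic between two models; all names are placeholders, and the live call is commented out because it requires AWS credentials:

```python
# Two production variants sharing one endpoint; InitialVariantWeight
# controls the traffic split (here 50/50 for an A/B test).
variants = [
    {
        "VariantName": "model-a",
        "ModelName": "pollution-deepar-a",  # placeholder model name
        "InitialInstanceCount": 1,
        "InstanceType": "ml.m4.xlarge",
        "InitialVariantWeight": 0.5,
    },
    {
        "VariantName": "model-b",
        "ModelName": "pollution-deepar-b",  # placeholder model name
        "InitialInstanceCount": 1,
        "InstanceType": "ml.m4.xlarge",
        "InitialVariantWeight": 0.5,
    },
]

# To actually create the config (requires boto3 and valid AWS credentials):
#   import boto3
#   boto3.client("sagemaker").create_endpoint_config(
#       EndpointConfigName="pollution-deepar-ab",  # placeholder
#       ProductionVariants=variants,
#   )
```

Shifting the weights (e.g. 0.9\/0.1) turns the same mechanism into a Canary rollout, while swapping a variant in and out implements Blue\/Green.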
References<\/h2>\n\n\n\n
\n