{"id":2625,"date":"2021-02-19T11:05:48","date_gmt":"2021-02-19T10:05:48","guid":{"rendered":"https:\/\/blog.besharp.it\/?p=2625"},"modified":"2021-03-17T15:34:25","modified_gmt":"2021-03-17T14:34:25","slug":"come-orchestrare-una-pipeline-di-data-analytics-e-business-intelligence-via-step-function","status":"publish","type":"post","link":"https:\/\/blog.besharp.it\/it\/come-orchestrare-una-pipeline-di-data-analytics-e-business-intelligence-via-step-function\/","title":{"rendered":"Come orchestrare una pipeline di Data Analytics e Business Intelligence via Step Function"},"content":{"rendered":"\n

Le pipeline di ETL su AWS di solito hanno un comportamento lineare: si inizia da un servizio e si termina con un altro. Questa volta, tuttavia, vorremmo presentare una configurazione pi\u00f9 flessibile, in cui alcuni job ETL potrebbero essere saltati a seconda dei dati. Inoltre, alcuni dei dati trasformati nel nostro datalake verranno interrogati da AWS Athena per generare dashboard di BI in QuickSight, mentre altre partizioni di dati verranno utilizzate per addestrare un rilevamento di anomalie ad-hoc tramite Sagemaker.<\/p>\n\n\n\n

Un potente strumento per orchestrare questo tipo di pipeline ETL \u00e8 il servizio AWS StepFunctions.<\/p>\n\n\n\n

In questo articolo, vogliamo mostrarti alcuni dei passaggi coinvolti nella creazione della pipeline citata e quali servizi AWS per l’analisi dei dati si possano utilizzare in scenari quasi in tempo reale per gestire un volume elevato di dati in modo scalabile.<\/p>\n\n\n\n

In particolare, esamineremo i connettori e i crawler di AWS Glue, AWS Athena, QuickSight, Kinesis Data Firehose e infine una breve spiegazione su come utilizzare SageMaker per creare previsioni a partire dai dati raccolti. Per saperne di pi\u00f9 su Sagemaker puoi anche dare un’occhiata ai nostri altri articoli<\/a>.<\/p>\n\n\n\n

Iniziamo!<\/p>\n\n\n\n

Il nostro setup<\/h2>\n\n\n\n

In questo esempio, configureremo diversi sensori per inviare dati di temperatura e diagnostici alla nostra pipeline ed eseguiremo diverse analisi BI, per verificarne l’efficienza; useremo infine un modello di Sagemaker per ricercare la presenza di anomalie.<\/p>\n\n\n\n

Per mantenere le cose interessanti, vogliamo anche acquisire i dati storici da due posizioni diverse: un bucket S3 e un database che risiede su un’istanza EC2 in una VPC diversa da quella della nostra pipeline ETL.<\/p>\n\n\n\n

Useremo diversi job ETL per recuperare ed estrarre i dati puliti dalle tuple a disposizione e AWS Step Functions per orchestrare tutti i crawler e i job.<\/p>\n\n\n\n

Kinesis Data Firehose recuperer\u00e0 continuamente i dati dei sensori e con AWS Athena interrogheremo le informazioni, dai dati aggregati e per sensore, per mostrare le statistiche grafiche in Amazon Quicksight.<\/p>\n\n\n\n

Ecco un semplice schema che illustra i servizi coinvolti e il flusso completo.<\/p>\n\n\n\n

\"La
La nostra infrastruttura<\/em><\/figcaption><\/figure>\n\n\n\n

Kinesis Data Firehose <\/h2>\n\n\n\n

Kinesis Data Firehose pu\u00f2 essere utilizzato per ottenere dati quasi in tempo reale dai sensori, che sfruttano IoT Core SDK per connettersi ai dispositivi effettivi. Come visto in questo articolo<\/a>, possiamo creare una “Cosa”, generando cos\u00ec un topic<\/strong>. Collegandosi a tale topic<\/strong>, diversi dispositivi possono raccogliere le proprie metriche tramite Firehose inviando messaggi utilizzando il protocollo MQTT<\/a> e, se necessario, IoT Core pu\u00f2 anche gestire l’autenticazione<\/strong> del dispositivo.<\/p>\n\n\n\n

Per iniziare a inviare i dati dei sensori, dobbiamo scaricare il kit di connessione dalla pagina AWS IoT<\/a> seguendo le istruzioni presentate.<\/p>\n\n\n\n

\"Selezione
Seleziona OS e linguaggio di programmazione per scaricare il connection kit<\/em>
<\/figcaption><\/figure>\n\n\n\n

Una volta scaricato, inizializziamo un nuovo progetto Node.js e installiamo AWS-IoT-device-SDK<\/strong>. Dopodich\u00e9, \u00e8 possibile eseguire lo script start.sh<\/strong> incluso, assicurandosi che tutti i certificati, scaricati insieme al kit, siano nella stessa directory. Ora possiamo creare uno script locale per inviare dati a un topic, passando i moduli richiesti e utilizzando device.publish (“<topic>”, payload)<\/strong>:<\/p>\n\n\n\n

const deviceModule = require('aws-iot-device-sdk').device;\nconst cmdLineProcess = require('aws-iot-device-sdk\/examples\/lib\/cmdline');\n\u2026\ndevice.publish('topic', JSON.stringify(payload));\n<\/pre>\n\n\n\n

I dati inviati sono strutturati in formato JSON con la seguente struttura:<\/p>\n\n\n\n

{\t\n   \u201ctimestamp\u201d: \u201cYYYY-MM-DD HH:MM:SS\u201d,\n   \u201croom_id\u201d: \u201cXXXX\u201d,\n   \u201ctemperature\u201d: 99\n}\n<\/pre>\n\n\n\n

Per creare un flusso di consegna di Firehose, andiamo alla dashboard del servizio Kinesis Firehose<\/strong> nella console Web di AWS, facciamo clic su “Crea flusso di consegna”, selezioniamo un nome, quindi “Direct PUT or other sources” come in figura:<\/p>\n\n\n\n

\"Delivery
Creare una nuova delivery stream di Firehose<\/em><\/figcaption><\/figure>\n\n\n\n

Lasciamo \u201cData transformation\u201d e \u201cRecord format conversion\u201d come di default. Selezioniamo una destinazione di S3 come target. Ricordiamoci di definire anche una IoT Rule<\/strong> per inviare i messaggi IoT a Firehose mediante delivery stream.<\/p>\n\n\n\n

Glue crawlers e connettori<\/h2>\n\n\n\n

AWS Glue pu\u00f2 essere utilizzato per estrarre e trasformare dati da una moltitudine di origini dati diverse, grazie alla possibilit\u00e0 di definire diversi tipi di connettori.<\/p>\n\n\n\n

Database su istanza EC2<\/strong><\/p>\n\n\n\n

Vogliamo essere in grado di generare un Glue Data Catalog da un database Microsoft SQL Server, che risiede su un’istanza EC2 in un\u2019 altra VPC. Per fare ci\u00f2, dobbiamo creare una connessione JDBC, che pu\u00f2 essere eseguita facilmente accedendo alla pagina del servizio AWS Glue e aggiungendo una nuova connessione; questa si trova nella sezione “Catalogo dati – Database” del menu della barra laterale.<\/p>\n\n\n\n

Basta aggiungere un nome alla connessione (che verr\u00e0 utilizzata dal relativo Crawler Job), l’URL JDBC, seguendo la giusta convenzione per ORACLE DB, nome utente e password, VPC e sottorete richiesti.<\/p>\n\n\n\n

\"JDBC
JDBC – parametri di connessione<\/em><\/figcaption><\/figure>\n\n\n\n

Per stabilire una connessione glue al database, dobbiamo creare una nuova VPC dedicata che verr\u00e0 utilizzata solo da Glue. La VPC \u00e8 connessa a quella che contiene il data-warehouse tramite peering VPC<\/a>, ma sono possibili anche altre opzioni, ad esempio avremmo potuto utilizzare AWS Transit Gateway. Una volta stabilito il peering, dobbiamo ricordarci di aggiungere le rotte corrette, sia alla sottorete Glue che a quella del DB, in modo che le VPC possano scambiare traffico e di aprire il security group del DB, per consentire il traffico in entrata sulla porta pertinente, al security group di Glue nella nuova VPC.<\/p>\n\n\n\n

Dati su S3<\/strong><\/p>\n\n\n\n

I dati su S3 non richiedono un connettore e possono essere configurati direttamente dalla console di AWS Glue. Creiamo un nuovo crawler, selezionando “data stores” per il tipo di origine del crawler<\/strong>; quindi selezioniamo anche “Crawl all folder”. Dopodich\u00e9 \u00e8 solo questione di impostare il bucket S3, il ruolo IAM corretto e creare un nuovo Schema di GLue per questo crawler. Impostare anche “Run on demand”.<\/p>\n\n\n\n

Glue Job<\/strong><\/p>\n\n\n\n

I Glue Jobs sono i passaggi della pipeline ETL. Consentono di estrarre, trasformare e salvare i dati in un datalake. Nel nostro esempio, vorremmo mostrare due diversi approcci: job gestiti da AWS Glue Studio<\/strong> e mediante l\u2019utilizzo di codice personalizzato<\/strong>. Entrambi i job verranno successivamente richiamati da AWS Step Function.<\/p>\n\n\n\n

Per i dati storici su S3, possiamo definire i job da Glue Studio. Per S3 selezionare le seguenti opzioni nell\u2019ordine:<\/p>\n\n\n\n

  1. Nella pagina Manage Jobs<\/strong>, selezioniamo sorgente e destinazione da aggiungere alle opzioni del nodo. Quindi, scegliamo S3 come Source e comeTarget.<\/li>
  2. Clicchiamo su \u201cS3 Data source\u201d, quindi selezioniamo il bucket di origine.<\/li>
  3. Nella tab \u201cNode Properties\u201d, inseriamo un nome. Clicchiamo poi sulla tab \u201cData source properties \u2013 S3\u201d nel pannello dei dettagli del nodo. Selezioniamo il nostro schema dalla lista di database nel Glue Data Catalog. Selezioniamo quindi la tabella corretta dal Catalogo.<\/li>
  4. Verifichiamo che il mapping sia corretto.<\/li>
  5. Nel Nodo \u201cS3 Data target\u201d, selezioniamo il bucket di destinazione, CSV come formato (parquet \u00e8 meglio, ma abbiamo bisogno di CSV per il Random Cut forest), nessuna compressione.<\/li><\/ol>\n\n\n\n
    \"Propriet\u00e0
    Propriet\u00e0 del nodo di destinazione<\/em><\/figcaption><\/figure>\n\n\n\n

    Per estrarre i dati dalla nostra istanza EC2, invece, abbiamo bisogno di un custom job. Per crearlo, dobbiamo scrivere noi stessi uno script, ma non preoccupatevi: \u00e8 piuttosto semplice! Ecco i punti chiave che si devono conoscere per creare uno Spark Job con Glue: il processo ETL \u00e8 composto da 6 aree distinte nello script:<\/p>\n\n\n\n

    Import delle librerie<\/strong><\/p>\n\n\n\n

    Set base necessario al funzionamento dello script:<\/p>\n\n\n\n

    import sys\nfrom awsglue.transforms import *\nfrom awsglue.utils import getResolvedOptions\nfrom pyspark.context import SparkContext\nfrom awsglue.context import GlueContext\nfrom awsglue.job import Job\nfrom awsglue.dynamicframe import DynamicFrame\n<\/pre>\n\n\n\n

    Prepariamo i connettori e altre variabili<\/strong><\/p>\n\n\n\n

    Da usare all\u2019interno dello script:<\/p>\n\n\n\n

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])\nsc = SparkContext()\nglueContext = GlueContext(sc)\nspark = glueContext.spark_session\njob = Job(glueContext)\njob.init(args['JOB_NAME'], args)\n<\/pre>\n\n\n\n

    Recuperate i Dynamic Frame dal Glue Catalog ottenuto tramite un Crawler<\/strong><\/p>\n\n\n\n

    I dynamic frame vengono utilizzati per fare query e trasformare i dati<\/p>\n\n\n\n

    rooms_temperatures_df = glueContext.create_dynamic_frame.from_catalog(database = \"raw_temperatures\", table_name = \"temperatures\", transformation_ctx = \"temperature_transforms\").toDF()\nrooms_temperatures_df.createOrReplaceTempView(\"TEMPERATURES\")\n<\/pre>\n\n\n\n

    L\u2019ultima linea di codice permette di modificare un dynamic frame.<\/p>\n\n\n\n

    Applichiamo le operazioni di SQL<\/strong><\/p>\n\n\n\n

    Per estrarre informazioni specifiche<\/p>\n\n\n\n

    result = glueContext.sql(\"\u201d)<\/pre>\n\n\n\n

    Nel nostro caso, abbiamo bisogno di generare 3 risultati distinti, uno per ogni room, e per questo usiamo un semplice WHERE room_id = <value><\/strong><\/p>\n\n\n\n

    Applichiamo il nostro mapping<\/strong><\/p>\n\n\n\n

    Per generare uno schema di conversione<\/p>\n\n\n\n

    dynamicFrameResult = DynamicFrame.fromDF(result, glueContext, \"Result\")\napplymapping = ApplyMapping.apply(frame = dynamicFrameResult, mappings = [(\"temp\", \"bigint\", \"temp\",\"bigint\"), (\"room_id\", \"string\", \"room_id\",\"string\"), (\"timestamp\", \"string\", \"timestamp\",\"string\")])\n<\/pre>\n\n\n\n

    Salviamo di nuovo su S3<\/strong><\/p>\n\n\n\n

    Per poter manipolare i dati in seguito<\/p>\n\n\n\n

    to_be_written = glueContext.write_dynamic_frame.from_options(frame = applymapping, connection_type = \"s3\", connection_options = {\"path\": \"s3:\/\/\", \"partitionKeys\": [\"timestamp\"]}, format = \"csv\", transformation_ctx = \"to_be_written\")\njob.commit()\n<\/pre>\n\n\n\n

    Step Function<\/h2>\n\n\n\n

    La Step Function rappresenta il nucleo, la logica della nostra soluzione di esempio. Il suo scopo principale \u00e8 gestire tutti i lavori ETL, mantenerli sincronizzati e gestire gli errori. Un vantaggio \u00e8 che possiamo usare la Step Function per regolare i dati iniettati nel bucket S3 centrale, dove salviamo tutti i valori puliti.<\/p>\n\n\n\n

    Per iniziare, questo \u00e8 lo schema della step function che abbiamo usato per questo esempio:<\/p>\n\n\n\n

    \"La
    La nostra pipeline di esempio<\/em><\/figcaption><\/figure>\n\n\n\n

    Nel nostro esempio ci sono un paio di hint interessanti che vorremmo condividere su Step Function; in primo luogo, abbiamo 2 crawler loop principali: il primo, ha branch e gestisce 2 crawler contemporaneamente (uno standard per S3 e uno per il database EC2 che \u00e8 quello personalizzato); il secondo prende tutti i dati recuperati sia dalle sorgenti di dati storici che da quella in real-time (da Kinesis Firehose) ed estrae i set di dati per room, che verranno poi utilizzati con Amazon SageMaker.<\/p>\n\n\n\n

    Poich\u00e9 i crawler sono asincroni, non possiamo aspettarli, quindi abbiamo dovuto creare 2 cicli di attesa per entrambi gli step di esecuzione.<\/p>\n\n\n\n

    AWS Lambda viene utilizzato per chiamare le API di AWS Glue per avviare i job che abbiamo configurato in precedenza.<\/p>\n\n\n\n

    Per darvi qualche spunto, ecco alcune parti interessanti descritte nel file JSON che rappresenta la macchina a stati.<\/p>\n\n\n\n

    \"Type\": \"Parallel\",\n  \"Branches\": [\n        {\n          \"StartAt\": \"Import Raw from EXTERNAL_DB\",\n          \"States\": {\n            \"Import Raw from EXTERNAL_DB\": {\n              \"Type\": \"Task\",\n              \"Resource\": \"arn:aws:states:::glue:startJobRun.sync\",\n<\/pre>\n\n\n\n

    In AWS Step Function, possiamo avviare attivit\u00e0 in parallelo (per noi, i due processi glue sui dati storici) utilizzando “Type: Parallel” e “Branches”. Inoltre dopo la chiave \u201cBranches\u201d, vediamo come \u00e8 possibile recuperare il risultato dei job in parallelo.<\/p>\n\n\n\n

    \"ResultPath\": \"$.ParallelExecutionOutput\",\n\"Next\": \"Start LAKE_DATA Crawler\"\n<\/pre>\n\n\n\n

    Possiamo eseguire un Glue Job sincrono definito nella console, passando il nome del job stesso e anche abilitando la generazione di un Glue catalog durante il processo.<\/p>\n\n\n\n

    \"Parameters\": {\n                \"JobName\": \"EXTERNAL_DB_IMPORT_TO_RAW\",\n                \"Arguments\": {\n                  \"--enable-glue-datacatalog\": \"true\",\n<\/pre>\n\n\n\n

    Possiamo inoltre risolvere le eccezioni del codice direttamente in Step Function portandoci in uno step di errore mediante la chiave \u201cCatch\u201d:<\/p>\n\n\n\n

    \"Catch\": [\n        {\n          \"ErrorEquals\": [\n            \"States.TaskFailed\"\n          ],\n          \"Next\": \"Data Pipeline Failure\"\n        }\n],\n<\/pre>\n\n\n\n

    Poich\u00e9 non abbiamo un modo standard per attendere il completamento dei lavori, utilizziamo l’output dei lavori paralleli e un ciclo di attesa di Step Functions per verificare se l’operazione \u00e8 stata eseguita; per questo, usiamo la chiave “Wait”:<\/p>\n\n\n\n

    \"Wait for LAKE_DATA Crawler\": {\n      \"Type\": \"Wait\",\n      \"Seconds\": 5,\n      \"Next\": \"Check LAKE_DATA Crawler\"\n},\n<\/pre>\n\n\n\n

    Il resto del flusso \u00e8 praticamente una ripetizione di questi componenti.<\/p>\n\n\n\n

    Il fatto interessante \u00e8 che possiamo applicare alcune condizioni di partenza per alterare l’esecuzione del flusso, come evitare alcuni lavori se non necessari al momento o anche eseguire un’altra macchina a stati da un passo preciso per prendere il nostro esempio e modularizzare le parti pi\u00f9 complicate.<\/p>\n\n\n\n

    Athena e Quicksight<\/h2>\n\n\n\n

    Athena pu\u00f2 generare tabelle che possono essere interrogate utilizzando il linguaggio SQL standard, non solo: i risultati delle query Athena possono essere importati in Amazon QuickSight per generare rapidamente grafici e report, basati sui tuoi dati.<\/p>\n\n\n\n

    Nel nostro flusso di lavoro, \u00e8 possibile eseguire query Athena sul bucket S3 di destinazione che contiene sia i dati della temperatura globale sia quelli specifici dei sensori. Vediamo rapidamente come fare:<\/p>\n\n\n\n

    1. Se abbiamo gi\u00e0 creato un Glue Crawler, avremo un Datasource e una table.<\/li>
    2. Selezioniamo il database e la tabella nella dashboard di Athena, nella sidebar a sinistra (abbiamo utilizzato temperatures_db e temperatures, ottenuti dai nostri crawlers).<\/li>
    3. Creiamo una semplice query che possa essere utilizzata poi in QuickSight per mostrare un grafico, ad esempio, una semplice \u201cSELECT * FROM temperatures\u201d. <\/li><\/ol>\n\n\n\n

      Tramite questi 3 step, Athena generer\u00e0 il risultato della query come mostrato in figura:<\/p>\n\n\n\n

      \"Athena
      Athena – query di esempio<\/em><\/figcaption><\/figure>\n\n\n\n

      Un paio di trucchi interessanti quando si lavora con Athena:<\/p>\n\n\n\n