{"id":2625,"date":"2021-02-19T11:05:48","date_gmt":"2021-02-19T10:05:48","guid":{"rendered":"https:\/\/blog.besharp.it\/?p=2625"},"modified":"2021-03-17T15:34:25","modified_gmt":"2021-03-17T14:34:25","slug":"come-orchestrare-una-pipeline-di-data-analytics-e-business-intelligence-via-step-function","status":"publish","type":"post","link":"https:\/\/blog.besharp.it\/it\/come-orchestrare-una-pipeline-di-data-analytics-e-business-intelligence-via-step-function\/","title":{"rendered":"Come orchestrare una pipeline di Data Analytics e Business Intelligence via Step Function"},"content":{"rendered":"\n
Le pipeline di ETL su AWS di solito hanno un comportamento lineare: si inizia da un servizio e si termina con un altro. Questa volta, tuttavia, vorremmo presentare una configurazione pi\u00f9 flessibile, in cui alcuni job ETL potrebbero essere saltati a seconda dei dati. Inoltre, alcuni dei dati trasformati nel nostro datalake verranno interrogati da AWS Athena per generare dashboard di BI in QuickSight, mentre altre partizioni di dati verranno utilizzate per addestrare un rilevamento di anomalie ad-hoc tramite Sagemaker.<\/p>\n\n\n\n
Un potente strumento per orchestrare questo tipo di pipeline ETL \u00e8 il servizio AWS StepFunctions.<\/p>\n\n\n\n
In questo articolo, vogliamo mostrarti alcuni dei passaggi coinvolti nella creazione della pipeline citata e quali servizi AWS per l’analisi dei dati si possano utilizzare in scenari quasi in tempo reale per gestire un volume elevato di dati in modo scalabile.<\/p>\n\n\n\n
In particolare, esamineremo i connettori e i crawler di AWS Glue, AWS Athena, QuickSight, Kinesis Data Firehose e infine una breve spiegazione su come utilizzare SageMaker per creare previsioni a partire dai dati raccolti. Per saperne di pi\u00f9 su Sagemaker puoi anche dare un’occhiata ai nostri altri articoli<\/a>.<\/p>\n\n\n\n Iniziamo!<\/p>\n\n\n\n In questo esempio, configureremo diversi sensori per inviare dati di temperatura e diagnostici alla nostra pipeline ed eseguiremo diverse analisi BI, per verificarne l’efficienza; useremo infine un modello di Sagemaker per ricercare la presenza di anomalie.<\/p>\n\n\n\n Per mantenere le cose interessanti, vogliamo anche acquisire i dati storici da due posizioni diverse: un bucket S3 e un database che risiede su un’istanza EC2 in una VPC diversa da quella della nostra pipeline ETL.<\/p>\n\n\n\n Useremo diversi job ETL per recuperare ed estrarre i dati puliti dalle tuple a disposizione e AWS Step Functions per orchestrare tutti i crawler e i job.<\/p>\n\n\n\n Kinesis Data Firehose recuperer\u00e0 continuamente i dati dei sensori e con AWS Athena interrogheremo le informazioni, dai dati aggregati e per sensore, per mostrare le statistiche grafiche in Amazon Quicksight.<\/p>\n\n\n\n Ecco un semplice schema che illustra i servizi coinvolti e il flusso completo.<\/p>\n\n\n\n Kinesis Data Firehose pu\u00f2 essere utilizzato per ottenere dati quasi in tempo reale dai sensori, che sfruttano IoT Core SDK per connettersi ai dispositivi effettivi. Come visto in questo articolo<\/a>, possiamo creare una “Cosa”, generando cos\u00ec un topic<\/strong>. Collegandosi a tale topic<\/strong>, diversi dispositivi possono raccogliere le proprie metriche tramite Firehose inviando messaggi utilizzando il protocollo MQTT<\/a> e, se necessario, IoT Core pu\u00f2 anche gestire l’autenticazione<\/strong> del dispositivo.<\/p>\n\n\n\n Per iniziare a inviare i dati dei sensori, dobbiamo scaricare il kit di connessione dalla pagina AWS IoT<\/a> seguendo le istruzioni presentate.<\/p>\n\n\n\n Una volta scaricato, inizializziamo un nuovo progetto Node.js e installiamo AWS-IoT-device-SDK<\/strong>. Dopodich\u00e9, \u00e8 possibile eseguire lo script start.sh<\/strong> incluso, assicurandosi che tutti i certificati, scaricati insieme al kit, siano nella stessa directory. Ora possiamo creare uno script locale per inviare dati a un topic, passando i moduli richiesti e utilizzando device.publish (“<topic>”, payload)<\/strong>:<\/p>\n\n\n\n I dati inviati sono strutturati in formato JSON con la seguente struttura:<\/p>\n\n\n\n Per creare un flusso di consegna di Firehose, andiamo alla dashboard del servizio Kinesis Firehose<\/strong> nella console Web di AWS, facciamo clic su “Crea flusso di consegna”, selezioniamo un nome, quindi “Direct PUT or other sources” come in figura:<\/p>\n\n\n\n Lasciamo \u201cData transformation\u201d e \u201cRecord format conversion\u201d come di default. Selezioniamo una destinazione di S3 come target. Ricordiamoci di definire anche una IoT Rule<\/strong> per inviare i messaggi IoT a Firehose mediante delivery stream.<\/p>\n\n\n\n AWS Glue pu\u00f2 essere utilizzato per estrarre e trasformare dati da una moltitudine di origini dati diverse, grazie alla possibilit\u00e0 di definire diversi tipi di connettori.<\/p>\n\n\n\n Database su istanza EC2<\/strong><\/p>\n\n\n\n Vogliamo essere in grado di generare un Glue Data Catalog da un database Microsoft SQL Server, che risiede su un’istanza EC2 in un\u2019 altra VPC. Per fare ci\u00f2, dobbiamo creare una connessione JDBC, che pu\u00f2 essere eseguita facilmente accedendo alla pagina del servizio AWS Glue e aggiungendo una nuova connessione; questa si trova nella sezione “Catalogo dati – Database” del menu della barra laterale.<\/p>\n\n\n\n Basta aggiungere un nome alla connessione (che verr\u00e0 utilizzata dal relativo Crawler Job), l’URL JDBC, seguendo la giusta convenzione per ORACLE DB, nome utente e password, VPC e sottorete richiesti.<\/p>\n\n\n\n Per stabilire una connessione glue al database, dobbiamo creare una nuova VPC dedicata che verr\u00e0 utilizzata solo da Glue. La VPC \u00e8 connessa a quella che contiene il data-warehouse tramite peering VPC<\/a>, ma sono possibili anche altre opzioni, ad esempio avremmo potuto utilizzare AWS Transit Gateway. Una volta stabilito il peering, dobbiamo ricordarci di aggiungere le rotte corrette, sia alla sottorete Glue che a quella del DB, in modo che le VPC possano scambiare traffico e di aprire il security group del DB, per consentire il traffico in entrata sulla porta pertinente, al security group di Glue nella nuova VPC.<\/p>\n\n\n\n Dati su S3<\/strong><\/p>\n\n\n\n I dati su S3 non richiedono un connettore e possono essere configurati direttamente dalla console di AWS Glue. Creiamo un nuovo crawler, selezionando “data stores” per il tipo di origine del crawler<\/strong>; quindi selezioniamo anche “Crawl all folder”. Dopodich\u00e9 \u00e8 solo questione di impostare il bucket S3, il ruolo IAM corretto e creare un nuovo Schema di GLue per questo crawler. Impostare anche “Run on demand”.<\/p>\n\n\n\n Glue Job<\/strong><\/p>\n\n\n\n I Glue Jobs sono i passaggi della pipeline ETL. Consentono di estrarre, trasformare e salvare i dati in un datalake. Nel nostro esempio, vorremmo mostrare due diversi approcci: job gestiti da AWS Glue Studio<\/strong> e mediante l\u2019utilizzo di codice personalizzato<\/strong>. Entrambi i job verranno successivamente richiamati da AWS Step Function.<\/p>\n\n\n\n Per i dati storici su S3, possiamo definire i job da Glue Studio. Per S3 selezionare le seguenti opzioni nell\u2019ordine:<\/p>\n\n\n\n Per estrarre i dati dalla nostra istanza EC2, invece, abbiamo bisogno di un custom job. Per crearlo, dobbiamo scrivere noi stessi uno script, ma non preoccupatevi: \u00e8 piuttosto semplice! Ecco i punti chiave che si devono conoscere per creare uno Spark Job con Glue: il processo ETL \u00e8 composto da 6 aree distinte nello script:<\/p>\n\n\n\n Import delle librerie<\/strong><\/p>\n\n\n\n Set base necessario al funzionamento dello script:<\/p>\n\n\n\n Prepariamo i connettori e altre variabili<\/strong><\/p>\n\n\n\n Da usare all\u2019interno dello script:<\/p>\n\n\n\n Recuperate i Dynamic Frame dal Glue Catalog ottenuto tramite un Crawler<\/strong><\/p>\n\n\n\n I dynamic frame vengono utilizzati per fare query e trasformare i dati<\/p>\n\n\n\n L\u2019ultima linea di codice permette di modificare un dynamic frame.<\/p>\n\n\n\n Applichiamo le operazioni di SQL<\/strong><\/p>\n\n\n\n Per estrarre informazioni specifiche<\/p>\n\n\n\n Nel nostro caso, abbiamo bisogno di generare 3 risultati distinti, uno per ogni room, e per questo usiamo un semplice WHERE room_id = <value><\/strong><\/p>\n\n\n\n Applichiamo il nostro mapping<\/strong><\/p>\n\n\n\n Per generare uno schema di conversione<\/p>\n\n\n\n Salviamo di nuovo su S3<\/strong><\/p>\n\n\n\n Per poter manipolare i dati in seguito<\/p>\n\n\n\n La Step Function rappresenta il nucleo, la logica della nostra soluzione di esempio. Il suo scopo principale \u00e8 gestire tutti i lavori ETL, mantenerli sincronizzati e gestire gli errori. Un vantaggio \u00e8 che possiamo usare la Step Function per regolare i dati iniettati nel bucket S3 centrale, dove salviamo tutti i valori puliti.<\/p>\n\n\n\n Per iniziare, questo \u00e8 lo schema della step function che abbiamo usato per questo esempio:<\/p>\n\n\n\n Nel nostro esempio ci sono un paio di hint interessanti che vorremmo condividere su Step Function; in primo luogo, abbiamo 2 crawler loop principali: il primo, ha branch e gestisce 2 crawler contemporaneamente (uno standard per S3 e uno per il database EC2 che \u00e8 quello personalizzato); il secondo prende tutti i dati recuperati sia dalle sorgenti di dati storici che da quella in real-time (da Kinesis Firehose) ed estrae i set di dati per room, che verranno poi utilizzati con Amazon SageMaker.<\/p>\n\n\n\n Poich\u00e9 i crawler sono asincroni, non possiamo aspettarli, quindi abbiamo dovuto creare 2 cicli di attesa per entrambi gli step di esecuzione.<\/p>\n\n\n\n AWS Lambda viene utilizzato per chiamare le API di AWS Glue per avviare i job che abbiamo configurato in precedenza.<\/p>\n\n\n\n Per darvi qualche spunto, ecco alcune parti interessanti descritte nel file JSON che rappresenta la macchina a stati.<\/p>\n\n\n\n In AWS Step Function, possiamo avviare attivit\u00e0 in parallelo (per noi, i due processi glue sui dati storici) utilizzando “Type: Parallel” e “Branches”. Inoltre dopo la chiave \u201cBranches\u201d, vediamo come \u00e8 possibile recuperare il risultato dei job in parallelo.<\/p>\n\n\n\n Possiamo eseguire un Glue Job sincrono definito nella console, passando il nome del job stesso e anche abilitando la generazione di un Glue catalog durante il processo.<\/p>\n\n\n\n Possiamo inoltre risolvere le eccezioni del codice direttamente in Step Function portandoci in uno step di errore mediante la chiave \u201cCatch\u201d:<\/p>\n\n\n\n Poich\u00e9 non abbiamo un modo standard per attendere il completamento dei lavori, utilizziamo l’output dei lavori paralleli e un ciclo di attesa di Step Functions per verificare se l’operazione \u00e8 stata eseguita; per questo, usiamo la chiave “Wait”:<\/p>\n\n\n\n Il resto del flusso \u00e8 praticamente una ripetizione di questi componenti.<\/p>\n\n\n\n Il fatto interessante \u00e8 che possiamo applicare alcune condizioni di partenza per alterare l’esecuzione del flusso, come evitare alcuni lavori se non necessari al momento o anche eseguire un’altra macchina a stati da un passo preciso per prendere il nostro esempio e modularizzare le parti pi\u00f9 complicate.<\/p>\n\n\n\n Athena pu\u00f2 generare tabelle che possono essere interrogate utilizzando il linguaggio SQL standard, non solo: i risultati delle query Athena possono essere importati in Amazon QuickSight per generare rapidamente grafici e report, basati sui tuoi dati.<\/p>\n\n\n\n Nel nostro flusso di lavoro, \u00e8 possibile eseguire query Athena sul bucket S3 di destinazione che contiene sia i dati della temperatura globale sia quelli specifici dei sensori. Vediamo rapidamente come fare:<\/p>\n\n\n\n Tramite questi 3 step, Athena generer\u00e0 il risultato della query come mostrato in figura:<\/p>\n\n\n\n Un paio di trucchi interessanti quando si lavora con Athena:<\/p>\n\n\n\n Quicksight pu\u00f2 leggere query di Athena e presentare grafici e diagrammi da esse. \u00c8 molto semplice: andiamo alla pagina del servizio Quicksight e seguiamo uno dei tanti tutorial<\/a> a riguardo, tenendo presente alcune cose importanti:<\/p>\n\n\n\n Se non vogliamo, o non possiamo, utilizzare Quicksight, possiamo sempre chiamare direttamente le API di Athena e creare la nostra dashboard customizzata da zero.<\/p>\n\n\n\n L’algoritmo di apprendimento automatico che esploreremo in questo articolo si chiama Random Cut Forest. L’algoritmo prende un insieme di data point casuali (Random), li taglia allo stesso numero di punti<\/strong> e crea alberi (Cut). Infine, controlla tutti gli alberi insieme (Forest) per verificare se un particolare data point deve essere considerato un’anomalia.<\/p>\n\n\n\n In generale, un albero \u00e8 un modo ordinato di memorizzare dati numerici e, per crearlo, suddividiamo casualmente i data point fino a quando \u00e8 possibile isolare il punto che stiamo testando per determinare se si tratta di un’anomalia. Ogni volta che suddividiamo i punti viene creato un nuovo livello dell’albero.<\/p>\n\n\n\n SageMaker offre un’implementazione managed di Random Cut Forest che accetta data points in formato CSV. Possiamo recuperare facilmente i dataset con:<\/p>\n\n\n\n I dati contengono un timestamp<\/strong>, il valore della temperatura<\/strong> in C \u00b0 e un room_id<\/strong>, che identifica una particolare stanza in cui \u00e8 stato installato il sensore. Abbiamo gi\u00e0 utilizzato la nostra Step Function per dividere i dati provenienti da stanze diverse in modo da poter passare direttamente il CSV all’Estimator.<\/p>\n\n\n\n Abbiamo fatto riferimento a questo articolo<\/a> per verificare come i dati possano essere passati all\u2019Estimator. Stando alla documentazione ufficiale, dobbiamo passare 3 iperparametri principali:<\/p>\n\n\n\n L\u2019Estimator \u00e8 definito in questo modo:<\/p>\n\n\n\n Alcune informazioni da tenere in considerazione sono che generiamo execution_role<\/strong> e sagemaker_session<\/strong> utilizzando i metodi incorporati di SageMaker. Per il nostro training utilizziamo un’istanza ml.m4xlarge<\/strong>, mentre per l\u2019inferenza abbiamo utilizzato una ml.c5.xlarge <\/strong>come suggerito dalla documentazione. Non sprechiamo crediti per le istanze GPU poich\u00e9 l’algoritmo RCF non tiene conto della GPU.<\/p>\n\n\n\n Per il deploy possiamo utilizzare l’approccio standard:<\/p>\n\n\n\n E cos\u00ec ci siamo! Abbiamo raggiunto la fine di questo workflow. Vediamo alcune referenze e riassumiamo quanto analizzato fin\u2019ora.<\/p>\n\n\n\n In questo articolo, abbiamo visto molti servizi di AWS perfettamente adatti per l’analisi dei dati quando si tratta di scenari quasi in tempo reale. Abbiamo discusso di AWS Step Function e di come pu\u00f2 essere utilizzata per orchestrare flussi di lavoro non lineari, offrendo agli sviluppatori la possibilit\u00e0 di avere pi\u00f9 scelte nella manipolazione ed estrazione dei dati per diversi tipi di analisi.<\/p>\n\n\n\n AWS Glue si \u00e8 dimostrato sufficientemente flessibile da prendersi cura di origini di dati residenti in luoghi diversi: istanze EC2, S3 e in account diversi. \u00c8 stata una scelta perfetta, anche per la semplicit\u00e0 di impostare Spark Job. Abbiamo visto in particolare come connettersi a un’origine dati utilizzando una connessione JDBC.<\/p>\n\n\n\n Athena ha dimostrato di essere lo strumento perfetto per estrarre i risultati ETL per la fruizione da parte della Business Intelligence e Quicksight \u00e8 la scelta pi\u00f9 ovvia per mostrare i risultati, poich\u00e9 \u00e8 nativamente compatibile con le query di Athena.<\/p>\n\n\n\n Come in molti altri scenari che abbiamo affrontato, Kinesis Data Firehose \u00e8 stato utilizzato anche per trasferire dati quasi in tempo reale a S3 da una fonte non AWS.<\/p>\n\n\n\n Abbiamo anche visto come Amazon S3 sia sempre un must quando si tratta di flussi di lavoro di big data, problemi di machine learning e creazione di data lake. I suoi standard di durabilit\u00e0, oltre alla compatibilit\u00e0 con qualsiasi altro servizio AWS, lo rendono la scelta perfetta sia per l’archiviazione a lungo termine che per il buffer intermedio.<\/p>\n\n\n\n Per concludere, abbiamo fornito alcuni suggerimenti su come manipolare i dati in SageMaker per eseguire inferenze per il rilevamento di anomalie.<\/p>\n\n\n\n Questo conclude il nostro viaggio di oggi, come sempre sentitevi liberi di commentare e raggiungerci per discutere qualsiasi domanda, dubbio o idea che vi venga in mente. Saremo lieti di rispondere il prima possibile!<\/p>\n\n\n\nIl nostro setup<\/h2>\n\n\n\n
Kinesis Data Firehose <\/h2>\n\n\n\n
<\/figcaption><\/figure>\n\n\n\nconst deviceModule = require('aws-iot-device-sdk').device;\nconst cmdLineProcess = require('aws-iot-device-sdk\/examples\/lib\/cmdline');\n\u2026\ndevice.publish('topic', JSON.stringify(payload));\n<\/pre>\n\n\n\n
{\t\n \u201ctimestamp\u201d: \u201cYYYY-MM-DD HH:MM:SS\u201d,\n \u201croom_id\u201d: \u201cXXXX\u201d,\n \u201ctemperature\u201d: 99\n}\n<\/pre>\n\n\n\n
Glue crawlers e connettori<\/h2>\n\n\n\n
import sys\nfrom awsglue.transforms import *\nfrom awsglue.utils import getResolvedOptions\nfrom pyspark.context import SparkContext\nfrom awsglue.context import GlueContext\nfrom awsglue.job import Job\nfrom awsglue.dynamicframe import DynamicFrame\n<\/pre>\n\n\n\n
args = getResolvedOptions(sys.argv, ['JOB_NAME'])\nsc = SparkContext()\nglueContext = GlueContext(sc)\nspark = glueContext.spark_session\njob = Job(glueContext)\njob.init(args['JOB_NAME'], args)\n<\/pre>\n\n\n\n
rooms_temperatures_df = glueContext.create_dynamic_frame.from_catalog(database = \"raw_temperatures\", table_name = \"temperatures\", transformation_ctx = \"temperature_transforms\").toDF()\nrooms_temperatures_df.createOrReplaceTempView(\"TEMPERATURES\")\n<\/pre>\n\n\n\n
result = glueContext.sql(\"
dynamicFrameResult = DynamicFrame.fromDF(result, glueContext, \"Result\")\napplymapping = ApplyMapping.apply(frame = dynamicFrameResult, mappings = [(\"temp\", \"bigint\", \"temp\",\"bigint\"), (\"room_id\", \"string\", \"room_id\",\"string\"), (\"timestamp\", \"string\", \"timestamp\",\"string\")])\n<\/pre>\n\n\n\n
to_be_written = glueContext.write_dynamic_frame.from_options(frame = applymapping, connection_type = \"s3\", connection_options = {\"path\": \"s3:\/\/
Step Function<\/h2>\n\n\n\n
\"Type\": \"Parallel\",\n \"Branches\": [\n {\n \"StartAt\": \"Import Raw from EXTERNAL_DB\",\n \"States\": {\n \"Import Raw from EXTERNAL_DB\": {\n \"Type\": \"Task\",\n \"Resource\": \"arn:aws:states:::glue:startJobRun.sync\",\n<\/pre>\n\n\n\n
\"ResultPath\": \"$.ParallelExecutionOutput\",\n\"Next\": \"Start LAKE_DATA Crawler\"\n<\/pre>\n\n\n\n
\"Parameters\": {\n \"JobName\": \"EXTERNAL_DB_IMPORT_TO_RAW\",\n \"Arguments\": {\n \"--enable-glue-datacatalog\": \"true\",\n<\/pre>\n\n\n\n
\"Catch\": [\n {\n \"ErrorEquals\": [\n \"States.TaskFailed\"\n ],\n \"Next\": \"Data Pipeline Failure\"\n }\n],\n<\/pre>\n\n\n\n
\"Wait for LAKE_DATA Crawler\": {\n \"Type\": \"Wait\",\n \"Seconds\": 5,\n \"Next\": \"Check LAKE_DATA Crawler\"\n},\n<\/pre>\n\n\n\n
Athena e Quicksight<\/h2>\n\n\n\n
SageMaker: Random Cut Forest anomaly detection<\/h2>\n\n\n\n
data_location = f\u201ds3:\/\/{bucket}\/{key}\u201d\ndf=pd.read_csv(data_location,delimiter=\u2019,\u2019)\n<\/pre>\n\n\n\n
<\/figcaption><\/figure>\n\n\n\nimport sagemaker\nfrom sagemaker import RandomCutForest\n \nexecution_role = sagemaker.get_execution_role()\nsagemaker_session = sagemaker.Session()\nbucket = \u201c
rcf.deploy(initial_instance_count=1, instance_type=\"ml.m4.xlarge\")\n<\/pre>\n\n\n\n
Referenze<\/h2>\n\n\n\n
\n
Takeaways<\/h2>\n\n\n\n