{"id":2517,"date":"2021-02-04T12:49:38","date_gmt":"2021-02-04T11:49:38","guid":{"rendered":"https:\/\/blog.besharp.it\/?p=2517"},"modified":"2021-03-17T15:33:44","modified_gmt":"2021-03-17T14:33:44","slug":"deploy-di-una-pipeline-di-real-time-data-ingestion-e-analytics-con-aws-iot-core-amazon-kinesis-e-amazon-sagemaker","status":"publish","type":"post","link":"https:\/\/blog.besharp.it\/it\/deploy-di-una-pipeline-di-real-time-data-ingestion-e-analytics-con-aws-iot-core-amazon-kinesis-e-amazon-sagemaker\/","title":{"rendered":"Deploy di una pipeline di real-time Data Ingestion e Analytics con AWS IoT Core, Amazon Kinesis e Amazon SageMaker"},"content":{"rendered":"\n
Il Machine Learning sta rapidamente entrando a far parte della nostra vita quotidiana. Sempre pi\u00f9 software e dispositivi sono oggi in grado di connettersi ad internet e di gestire autonomamente routine e attivit\u00e0 di tutti i giorni senza l’intervento umano. Si pensi ad esempio alla domotica, alle luci e ai riscaldamenti smart <\/em>o ai robot che puliscono i pavimenti in autonomia senza difficolt\u00e0 alcuna anche in ambienti domestici complessi pieni di ostacoli.<\/p>\n\n\n\n In questo contesto, le informazioni che possiamo raccogliere dai dispositivi connessi sono infinite. Il costo contenuto di acquisizione del dato e della potenza di calcolo necessaria ad elaborare grandi quantit\u00e0 di informazioni hanno reso accessibile l\u2019applicazione del Machine Learning ai pi\u00f9 diversi casi d\u2019uso. Uno dei pi\u00f9 interessanti riguarda sicuramente l\u2019ingestion e l\u2019analisi real-time dei dati provenienti da dispositivi connessi.<\/p>\n\n\n\n In questo articolo, descriveremo una soluzione basata sui servizi gestiti di AWS per l\u2019elaborazione in tempo reale di elevati volumi di dati provenienti da uno o pi\u00f9 dispositivi connessi IoT e mostreremo come realizzare una pipeline completa di real-time Data Ingestion e Analytics. <\/p>\n\n\n\n Esploreremo alcuni concetti chiave relativi all’apprendimento automatico, all\u2019ETL, alla pulizia dei dati e alla preparazione del data lake.<\/p>\n\n\n\n Prima di passare alla progettazione del codice e dell’infrastruttura, per\u00f2, facciamo un breve riepilogo su alcuni concetti chiave relativi a Machine Learning, ETL, pulizia e preparazione dei dati, creazione dei data lake ed IoT. Partiamo!<\/p>\n\n\n\n Internet of Things (IoT) \u00e8 la definizione utilizzata per descrivere un insieme di dispositivi fisici – “things” – interconnessi e dotati di sensori in grado di inviare dati e scambiare informazioni via internet.<\/p>\n\n\n\n L’IoT si \u00e8 evoluto rapidamente grazie alla diminuzione dei costi dei sensori intelligenti e alla diffusione di metodologie come analisi in tempo reale, apprendimento automatico e sistemi integrati.<\/p>\n\n\n\n Naturalmente, anche i settori tradizionali dei sistemi embedded, delle reti di sensori wireless, dei sistemi di controllo e dell’automazione contribuiscono al mondo dell’IoT.<\/p>\n\n\n\n Il Machine Learning \u00e8 nato come un\u2019evoluzione dell’intelligenza artificiale<\/strong>. Il Machine Learning tradizionale richiede ai programmatori di scrivere euristiche complesse e difficili da mantenere per eseguire un compito tradizionalmente umano (ad esempio il riconoscimento del testo nelle immagini) utilizzando un computer.<\/p>\n\n\n\n Grazie al ML, \u00e8 il sistema stesso che impara le relazioni tra i dati.<\/p>\n\n\n\n Per esempio, in un\u2019ipotetica partita di scacchi, baster\u00e0 fornire un set di dati di caratteristiche riguardanti le partite di scacchi e il modello imparer\u00e0 a giocare da solo. In una pipeline di Machine Learning, i dati devono essere uniformi, ovvero standardizzati. Le differenze nei dati possono derivare dalla loro provenienza da fonti eterogenee, da \u201cdatabase schema\u201d differenti o flussi di importazione dei dati diversi.<\/p>\n\n\n\n La trasformazione dei dati o flusso di ETL (Estrazione, Trasformazione, Caricamento) \u00e8 quindi un passaggio essenziale in tutte le pipeline di ML. I dati standardizzati non sono solo essenziali nell’addestramento del modello di ML, ma sono anche molto pi\u00f9 facili da analizzare e visualizzare nella fase preliminare di data discovery<\/strong>.<\/p>\n\n\n\n Per le attivit\u00e0 di pulizia e formattazione del dato sono generalmente utilizzate librerie come Scipy Pandas o simili.<\/p>\n\n\n\n – NumPy<\/strong>: <\/em>libreria utilizzata per la gestione di array multidimensionali. Generalmente utilizzata per le fasi di import e lettura di un dataset.<\/p>\n\n\n\n – Pandas<\/strong> Dataframe<\/strong>: libreria utilizzata per la gestione di dati in formato tabulare. Colleziona data point da file di tipo CSV<\/strong>, JSON<\/strong>, Excel<\/strong>, e pickle <\/strong>e li trasforma in tabelle.<\/p>\n\n\n\n – SciKit-Learn<\/strong>: libreria utilizzata per la manipolazione e il training finale dei dati.<\/p>\n\n\n\n Pulire e formattare i dati \u00e8 essenziale per ottenere un modello performante in grado di convergere <\/strong>alla soluzione che si vuole ottenere.<\/p>\n\n\n\n Per la soluzione che andremo a realizzare faremo largo uso dei servizi gestiti messi a disposizione da AWS. Ecco un semplice schema infrastrutturale raffigurante gli attori principali nella nostra Pipeline di ML:<\/p>\n\n\n\n Entriamo nel merito di ciascun servizio. <\/p>\n\n\n\n La pipeline sar\u00e0 organizzata in 5 fasi principali: ingestion<\/strong>, preparazione del data lake<\/strong>, trasformazione<\/strong>, training<\/strong> e inferenza<\/strong>. Per mandare le informazioni al nostro data lake su Amazon S3, utilizzeremo Amazon Kinesis Data Firehose<\/a> e la feature che permette la lettura di messaggi IoT Core.<\/p>\n\n\n\n Per trasformare i dati e renderli disponibili per Amazon SageMaker, utilizzeremo invece AWS Glue<\/a>, il servizio di ETL managed in grado di trovare, preparare e combinare tra di loro i dati, per l\u2019analisi, il machine learning e il deploy dell\u2019applicativo. Mettendo a disposizione tutti questi strumenti, esso permette di analizzare grandi moli di dati in pochi minuti, anzich\u00e9 in mesi. \u00c8 il momento di connettere i nostri dispositivi di test attraverso le feature di AWS IoT Core.<\/p>\n\n\n\n Accediamo al nostro account AWS ed entriamo nella pagina del servizio. Clicchiamo su \u201cGet started\u201d e poi procediamo con \u201cOnboard a device\u201d.<\/p>\n\n\n\n Seguiamo i passaggi descritti nel wizard per connettere i dispositivi. <\/p>\n\n\n\n Gli obiettivi di questa fase sono:<\/p>\n\n\n\n Stabilire una connessione con AWS \u00e8 importante anche per permettere a Kinesis Firehose di leggere i messaggi mandati da AWS IoT Core. Ricordiamo che il dispositivo che stiamo connettendo necessiter\u00e0 di una connessione TCP pubblica sulla porta 8883.<\/p>\n\n\n\n Dal wizard, selezioniamo Linux come sistema operativo e un SDK (nel nostro caso Node.js):<\/p>\n\n\n\n A questo punto, diamo un nome al nostro dispositivo e otteniamo il nostro kit di connessione contenente:<\/p>\n\n\n\n Una volta scaricato il kit, inizializziamo un nuovo progetto Node.js e installiamo AWS-IoT-device-SDK.<\/strong> In questo modo, i node module richiesti verranno installati. Dopodich\u00e9 sar\u00e0 possibile lanciare lo script start.sh<\/strong> incluso, aggiungendo tutti i certificati scaricati nel kit nella stessa directory del progetto.<\/p>\n\n\n\n Abbiamo sviluppato il nostro esempio partendo dal codice di device-example.js<\/strong> come semplice base per capire come connettere un dispositivo ad AWS IoT:<\/p>\n\n\n\n Importiamo i moduli di Node.js necessari a connettere i nostri dispositivi ad AWS e di pubblicare su un canale a noi rilevante. \u00e8 possibile leggere i dati dai sensori dei dispositivi in qualunque modo, ad esempio, nel caso in cui un device possa scrivere le informazioni in una specifica destinazione sul disco, baster\u00e0 leggere e rendere i dati una stringa utilizzando device.publish(‘<YOUR_TOPIC>’, JSON.stringify(payload))<\/strong>.<\/p>\n\n\n\n L\u2019ultima parte di codice chiama semplicemente la funzione principale al fine di mandare le informazioni alla console.<\/p>\n\n\n\n Per eseguire lo script, utilizziamo lo script start.sh incluso nel development kit assicurandoci di puntare al nostro codice e non al codice di esempio fornito da AWS<\/strong><\/p>\n\n\n\n Nota: per la natura esemplificativa dell\u2019articolo, il codice del dispositivo che utilizziamo \u00e8 semplificato. Consigliamo di non utilizzarlo per un ambiente di produzione.<\/em> \u00e8 il momento di connettere Kinesis Firehose per cominciare a inviare i dati ad Amazon S3.<\/p>\n\n\n\n Trasferire i dati raccolti dai dispositivi, arricchire il data lake e migliorare il modello \u00e8 estremamente importante per evitare il problema chiamato Concept Drift<\/a>,<\/strong> un problema che si verifica al graduale disallineamento del modello deployato rispetto ai dati reali<\/strong>. Questo succede in quando i dati storici non sarebbero in grado di rappresentare un problema nel frattempo evoluto.<\/p>\n\n\n\n Per risolvere il problema dobbiamo assicurare un logging efficiente e capire quando intervenire sul modello, ad esempio effettuando nuovamente il training o aggiornando la versione per poi rideployarla. Definiamo quindi una \u201caction\u201d di Kinesis Firehose specifica per registrare automaticamente e trasportare ciascun messaggio MQTT distribuito dal dispositivo, direttamente su Amazon S3, in modo da alimentare il nostro data lake con dati sempre aggiornati.<\/p>\n\n\n\n Per creare lo stream di Firehose, cerchiamo \u201cKinesis firehose\u201d nella search bar, selezioniamolo e clicchiamo su \u201cCreate delivery stream\u201d, come mostrato in figura:<\/p>\n\n\n\n Per utilizzare lo stream creato, occorre prima connetterlo con AWS IoT tramite una IoT Rule<\/strong>. l\u2019IoT Rule autorizzer\u00e0 Kinesis a ricevere i messaggi e a scriverli nel bucket S3. Per configurare AWS IoT per mandare messaggi a Firehose abbiamo eseguito i seguenti passaggi:<\/p>\n\n\n\n Ecco un esempio di come apparir\u00e0 la regola che andreamo a creare:<\/p>\n\n\n\n Se avremo svolto correttamente tutti i passaggi, cominceremo a veder comparire i dati nel bucket:<\/p>\n\n\n\n Amazon S3 \u00e8 il servizio di storage ideale per costruire data lake<\/a>. Con una possibilit\u00e0 di scalare pressoch\u00e9 illimitata, un data lake basato su Amazon S3 per l\u2019analisi dei big data, presenta diversi benefici. <\/p>\n\n\n\n L’architettura dati centralizzata di S3 semplifica la creazione di un ambiente multi-tenant in cui pi\u00f9 utenti possono utilizzare il proprio strumento di analisi di Big Data su un insieme comune di dati.<\/p>\n\n\n\n Inoltre, S3 si integra perfettamente con altri servizi Amazon come Amazon Athena, Amazon Redshift e, come nel caso presentato, AWS Glue.<\/p>\n\n\n\n S3 consente inoltre di separare lo storage dall’elaborazione dei dati per ottimizzare i costi e i flussi di lavoro, oltre a mantenere la soluzione dry, scalabile e gestibile.<\/p>\n\n\n\n Inoltre, S3 consente di archiviare qualsiasi tipo di dati strutturati, semi-strutturati o anche non strutturati nel suo formato nativo. Nel nostro caso siamo semplicemente interessati nel salvataggio di dati \u201cmoccati\u201d da un device di test per eseguire semplici algoritmi di forecasting.<\/p>\n\n\n\n Anche se i dati vengono salvati su Amazon S3 quasi in tempo reale, non sono ancora sufficienti per consentirci di gestire un modello Amazon SageMaker. Come abbiamo spiegato nell’introduzione, infatti, i dati devono essere preparati e quando si tratta di algoritmi AWS SageMaker predefiniti, <\/strong>\u00e8 necessario tenere presente alcune impostazioni di default.<\/p>\n\n\n\n Ad esempio SageMaker non accetta headers e, nel caso in cui volessimo definire un training supervisionato<\/strong>, dobbiamo anche mettere la \u201cground truth\u201d come prima colonna del dataset.<\/p>\n\n\n\n In questo semplice esempio abbiamo utilizzato Glue Studio per trasformare i dati grezzi nel bucket S3 di sorgente in file di parquet strutturati da salvare in un Bucket di output dedicato. Il bucket di output verr\u00e0 utilizzato da Sagemaker come origine dati.<\/p>\n\n\n\n Un AWS Glue job consiste di almeno 3 nodi principali, che sono source<\/strong>, transform<\/strong>, e target<\/strong>. Per farlo, ecco qui gli step che abbiamo seguito:<\/p>\n\n\n\n 3. Premiamo il pulsante \u201cCreate\u201d per avviare il processo di creazione del lavoro.<\/p>\n\n\n\n Ora vedremo un grafico a tre nodi che rappresenta i passaggi coinvolti nel processo ETL. Quando AWS Glue viene istruito a leggere da un’origine dati S3, creer\u00e0 anche uno schema interno, chiamato Glue Data Catalog<\/strong>.<\/p>\n\n\n\n Per configurare il source node, clicchiamo su di esso nel grafico:<\/p>\n\n\n\n La stessa cosa pu\u00f2 essere fatta per il nodo di trasformazione: cliccando su di esso, \u00e8 possibile definire quale tipo di trasformazione si vuole applicare ai dati di input. Qui puoi anche verificare che il JSON sia stato importato correttamente:<\/p>\n\n\n\n Infine, possiamo selezionare il nodo di destinazione, specificando di nuovo S3 come destinazione e utilizzando .parquet come formato di output.<\/p>\n\n\n\n Ora dobbiamo impostare i parametri del lavoro ETL dato il grafico dei nodi appena creato. Andiamo nella scheda “Job details” alla destra di quella del grafico, assegniamo un nome e selezioniamo un ruolo in grado di gestire i dati e di eseguire nuovamente il deploy su S3. <\/p>\n\n\n\n Lasciamo il resto come predefinito.<\/p>\n\n\n\n Tieniamo presente che dobbiamo avere questo snippet nella scheda “Trust Relationship” del ruolo per far s\u00ec che venga assunto da Glue:<\/p>\n\n\n\n Se tutto \u00e8 definito correttamente, il job partir\u00e0, e contestualmente, inizier\u00e0 anche la conversione dei dati in formato parquet. I file verranno inseriti nella directory di nostra scelta all\u2019interno del bucket.<\/p>\n\n\n\n Abbiamo scelto di utilizzare .parquet invece di .csv per il dataset di destinazione. Inoltre rispetto al file archiviato in formato .csv abbiamo questi vantaggi in termini di risparmio sui costi:<\/p>\n\n\n\n Amazon SageMaker offre 17 algoritmi pronti all’uso che coprono una pletora di argomenti relativi ai problemi di Machine Learning. Nel nostro caso, volevamo semplificare lo sviluppo di un modello per fare previsioni sui dati recuperati dal nostro dispositivo, quindi, invece di mostrare il paradigma bring your own algorithm<\/strong>, come nel nostro articolo precedente<\/a>, questa volta ne useremo uno gi\u00e0 pronto.<\/p>\n\n\n\n Come spiegato in precedenza, oltre alla pulizia dei dati, il nostro processo ETL \u00e8 stato eseguito per trasformare i dati in modo che fossero compatibili con gli algoritmi SageMaker gi\u00e0 pronti.<\/p>\n\n\n\n SageMaker API e la libreria di Sklearn offrono metodi per recuperare i dati, chiamare il metodo di training, salvare il modello e distribuirlo in produzione per inferenze real-time o batch.<\/p>\n\n\n\n Iniziamo andando alla pagina di SageMaker e creiamo una nuova istanza notebook, per questo articolo scegliamo una ml.t3.medium<\/strong>. Aggiungiamo un nome e creiamo un nuovo ruolo IAM<\/strong>.<\/p>\n\n\n\n Lasciamo il resto come predefinito e clicchiamo su “Create notebook”.<\/p>\n\n\n\n L\u2019accesso \u00e8 possibile da Jupiter o Jupiter Lab, noi scegliamo il secondo. Nota: il codice \u00e8 realizzato esclusivamente per questo articolo e non \u00e8 pensato per un ambiente di produzione in quanto non vi \u00e8 alcuna indagine preliminare sui dati e nessuna convalida dei risultati. Tuttavia, tutto il codice presentato \u00e8 testato e utilizzabile per casi d’uso simili a quello presentato.<\/em><\/p>\n\n\n\n Iniziamo importando tutte le librerie necessarie:<\/p>\n\n\n\n Abbiamo anche impostato le basi per i nostri generatori casuali per garantire la riproducibilit\u00e0. Dopodich\u00e9, dobbiamo recuperare i nostri file parquet<\/strong> da S3<\/strong> e ottenere da loro un Pandas Dataframe.<\/p>\n\n\n\n Inizialmente, prepariamo tutti i percorsi di S3 che verranno utilizzati nel Notebook, generiamo una sessione SageMaker<\/strong> e un ruolo IAM<\/strong> valido con get_execution_role()<\/strong>. Come possiamo vedere SageMaker si prende cura di questi aspetti per noi.<\/p>\n\n\n\n Nel passaggio precedente abbiamo recuperato il nostro forecasting Estimator, DeepAR<\/strong>. Un estimator \u00e8 una classe in SageMaker in grado di generare, apprendere e testare un modello che verr\u00e0 poi salvato su S3.<\/p>\n\n\n\n Prima di iniziare a leggere i file parquet aggiungiamo anche un paio di costanti per il nostro esperimento:<\/p>\n\n\n\n Con freq<\/strong> (frequenza) diciamo che vogliamo analizzare la TimeSeries con metriche orarie. Abbiamo creato due metodi di supporto per leggere dai file parquet:<\/p>\n\n\n\n Quindi leggiamo effettivamente i datasets:<\/p>\n\n\n\n Qui manipoliamo il dataset per renderlo utilizzabile con DeepAR che ha il suo formato proprietario. Usiamo df.iloc[:, :8]<\/span> per mantenere solo le colonne originali senza quelle generate da Glue Schema. Generiamo una nuova colonna hour<\/strong> per velocizzare le cose, infine, dividiamo il set di dati in proporzioni 80\/20 per l'addestramento e il test.<\/p>\n\n\n\n Quindi riscriviamo temporaneamente i dati su S3 come richiesto da DeepAR, creando file JSON con serie al loro interno.<\/p>\n\n\n\n Generiamo un JSON in un formato simile a questo:<\/p>\n\n\n\n Dopodich\u00e9, possiamo scrivere i nostri file JSON su S3.<\/p>\n\n\n\n Usiamo sagemaker_session.upload_data ()<\/strong> per questo, passando il percorso di output. Successivamente, possiamo definire lo stimatore:<\/p>\n\n\n\n Passiamo la sessione SageMaker, l'immagine dell'algoritmo, il tipo di istanza e il percorso di output del modello. Abbiamo anche bisogno di configurare alcuni Iperparametri:<\/p>\n\n\n\n Questi valori sono presi direttamente dagli esempi AWS ufficiali su DeepAR. Dobbiamo anche passare i due canali, training e test, allo stimatore per avviare il \u201cprocesso di adattamento\u201d (fitting process<\/strong>).<\/p>\n\n\n\n Dopo il training e il test di un modello, \u00e8 possibile distribuirlo utilizzando un Real-time Predictor.<\/strong><\/p>\n\n\n\n Il predictor genera un endpoint visibile anche dalla console AWS.<\/p>\n\n\n\n L'endpoint pu\u00f2 essere chiamato da qualsiasi applicazione abilitata REST che passa una richiesta con un formato come quello di seguito:<\/p>\n\n\n\n I \u201ctargets\u201d sono dei valori di esempio a partire dal periodo impostato in \u201cstart\u201d dal quale si vuole generare la previsione.<\/p>\n\n\n\n Infine, se non abbiamo pi\u00f9 bisogno dell'endpoint, possiamo eliminarlo con:<\/p>\n\n\n\n L'inferenza in tempo reale si riferisce alla previsione fornita in tempo reale da alcuni modelli. Questo \u00e8 il tipico caso d'uso di molti sistemi di raccomandazione o generalmente quando la previsione \u00e8 ad uso singolo. Viene utilizzata quando:<\/p>\n\n\n\n In genere \u00e8 un p\u00f2 pi\u00f9 complessa da gestire rispetto a ci\u00f2 che abbiamo fatto nel Notebook ed \u00e8 tipicamente definita in una pipeline separata, a causa della sua natura di alta disponibilit\u00e0 e tempi di risposta rapidi.<\/p>\n\n\n\n Quando deployamo utilizzando l'API SageMaker \u00e8 possibile creare un processo di distribuzione molto simile a come viene rilasciata o aggiornata un'applicazione web, tenendo conto di cose come il reindirizzamento del traffico e le tecniche di distribuzione come Blue\/Green o Canary. Nota: attraverso le <\/em>production variants possiamo implementare diverse strategie di Deploy come A\/B e BLUE\/GREEN.<\/em><\/p>\n\n\n\n Deploy Blue \/ Green<\/strong><\/p>\n\n\n\n Reindirizziamo il traffico su Green. Se Green \u00e8 ok, con un altro UpdateEndpointApi<\/strong> cancelliamo il vecchio modello.<\/p>\n\n\n\n Deploy A \/ B<\/strong><\/p>\n\n\n\n Da utilizzare specificatamente se si vuole misurare le performance tra modelli rispetto ad una metrica di alto livello.<\/p>\n\n\n\n Alla fine escludiamo 1 o pi\u00f9 modelli (in questo caso 50\/50 uno dei due).<\/p>\n\n\n\n Nota: la propriet\u00e0 multi-modello per endpoint consente di gestire pi\u00f9 modelli contemporaneamente, la memoria della macchina viene gestita automaticamente in base al traffico. Questo approccio pu\u00f2 far risparmiare denaro grazie all'uso ottimizzato delle risorse.<\/em><\/p>\n\n\n\n In questo articolo abbiamo visto come sviluppare una pipeline utilizzando le risorse AWS, per acquisire dati da un dispositivo connesso all'ecosistema AWS tramite le funzionalit\u00e0 IoT Core. Per eseguire ETL abbiamo scelto AWS Glue Studio, dimostrando quanto facilmente possa essere configurato per creare un crawler per leggere, trasformare e reinserire i dati in S3, pronti per essere utilizzati per la definizione del modello.<\/p>\n\n\n\n Abbiamo visto come l'utilizzo di un set di dati archiviato in parquet sia migliore di uno in semplice formato CSV. Soprattutto ci siamo focalizzati sulle sue maggiori performance in fase di import\/export, per le query Athena e di come sia molto pi\u00f9 conveniente, in termini di prezzo di AWS S3, grazie alle dimensioni ridotte dei suoi file.<\/p>\n\n\n\n Abbiamo parlato di come SageMaker pu\u00f2 essere utilizzato out-of-the-box con il suo set di algoritmi preconfigurati, in particolare, abbiamo visto come implementare la previsione su un set di dati costituito da informazioni sull'inquinamento e sull'ambiente.<\/p>\n\n\n\n Infine, abbiamo visto come mettere in produzione un modello pronto per essere utilizzato, sfruttando l'API di SageMaker per creare una pipeline di distribuzione che tenga conto del problema Concept Drift, permettendo cos\u00ec frequenti aggiornamenti del modello in base all'evoluzione del set di dati nel tempo. Ci\u00f2 \u00e8 particolarmente vero per le serie temporali e i modelli di previsione, che migliorano man mano che il set di dati aumenta.<\/p>\n\n\n\n Siamo finalmente giunti alla fine del viaggio, sperando di farvi divertire e, naturalmente, di partire con qualcosa di utile su cui iniziare a lavorare. Come sempre sentiti libero di commentare dandoci le tue opinioni e idee. E i tuoi casi d'uso? Che tipo di dispositivi usi? Connettiti con noi e parlane!<\/p>\n\n\n\nIoT, Machine Learning e Data Transformation: concetti chiave<\/h2>\n\n\n\n
IoT<\/h3>\n\n\n\n
Machine Learning<\/h3>\n\n\n\n
Tutto ci\u00f2 acquista ancora pi\u00f9 rilevanza se lo si pensa in un contesto distribuito<\/strong> dove la previsione dovr\u00e0 scalare<\/strong>.<\/p>\n\n\n\nData Transformation<\/h3>\n\n\n\n
La Pipeline<\/h2>\n\n\n\n
<\/figure>\n\n\n\n
Per la fase di ingestion<\/strong>, i dati saranno raccolti dai dispositivi connessi utilizzando AWS IoT Core<\/strong>, un servizio che permette di connettere i dispositivi ad AWS senza dover gestire server o complessit\u00e0 di comunicazione<\/a>. I dati collezionati saranno poi inviati utilizzando il protocollo MQTT<\/a> per minimizzare il code da scrivere e la banda richiesta. Con IoT Core \u00e8 possibile anche gestire l\u2019autenticazione dei device<\/strong>.<\/p>\n\n\n\n
Infine, vedremo come utilizzare gli algoritmi di Amazon SageMaker, in particolare DeepAR,<\/strong> per \u201cistruire\u201d e deployare il modello per l\u2019inferenza.<\/p>\n\n\n\nIngestion: da IoT Core a Kinesis Firehose<\/h2>\n\n\n\n
AWS IoT Core<\/h3>\n\n\n\n
<\/figure>\n\n\n\n
const deviceModule = require('aws-iot-device-sdk').device;\nconst cmdLineProcess = require('aws-iot-device-sdk\/examples\/lib\/cmdline');\n\nprocessPollutionData = (args) => {\n\n \/\/ Device properties which are needed\n const device = deviceModule({\n keyPath: args.privateKey,\n certPath: args.clientCert,\n caPath: args.caCert,\n clientId: args.clientId,\n region: args.region,\n baseReconnectTimeMs: args.baseReconnectTimeMs,\n keepalive: args.keepAlive,\n protocol: args.Protocol,\n port: args.Port,\n host: args.Host,\n debug: args.Debug\n });\n\n const minimumDelay = 250; \/\/ ms\n const interval = Math.max(args.delay, minimumDelay);\n\n \/\/ Send device information\n setInterval(function() {\n \/\/ Prepare Data to be sent by the device\n const payload = {\n ozone: Math.round(Math.random() * 100),\n particullate_matter: Math.round(Math.random() * 100),\n carbon_monoxide: Math.round(Math.random() * 100),\n sulfure_dioxide: Math.round(Math.random() * 100),\n nitrogen_dioxide: Math.round(Math.random() * 100),\n longitude: 10.250786139881143,\n latitude: 56.20251117218925,\n timestamp: new Date()\n };\n\n device.publish('
Per testare il funzionamento di ci\u00f2 che abbiamo realizzato fin qui, accediamo alla console AWS IoT, entriamo nella sezione Test<\/strong> dalla sidebar sulla sinistra e inseriamo il nome del nostro topic. Clicchiamo su \u201cSubscribe to topic\u201d e, se tutto \u00e8 correttamente setuppato, dovremmo vedere qualcosa di simile a questo screenshot:<\/p>\n\n\n\n<\/figure>\n\n\n\n
Kinesis Firehose<\/h3>\n\n\n\n
Creiamo lo stream di Firehose<\/h3>\n\n\n\n
<\/figure>\n\n\n\n
Selezioniamo un nome valido in \u201cDelivery stream name\u201d, \u201cDirect PUT or other sources\u201d nella sezione \u201cSources\u201d e, nella pagina successiva, lasciamo tutto come da default. Convertiremo i dati in S3 pi\u00f9 tardi. Infine, nell\u2019ultima pagina, selezioniamo S3<\/strong> come destinazione e aggiungiamo eventualmente un prefisso ai dati inseriti nel bucket. Clicchiamo su \u201cNext\u201d per creare lo stream.<\/p>\n\n\n\nCreiamo la IoT Rule<\/h3>\n\n\n\n
<\/figure>\n\n\n\n
{\n \"topicRulePayload\": {\n \"sql\": \"SELECT * FROM '
<\/figure>\n\n\n\n
Apriamo uno dei file caricati nel bucket e\u2026 ecco i file generati dai nostri device!<\/p>\n\n\n\n<\/figure>\n\n\n\n
Datalake: S3<\/h2>\n\n\n\nProcesso di ETL: AWS Glue<\/h2>\n\n\n\n
<\/figure>\n\n\n\n
Attiviamo il Crawler appena creato, cliccando su \u201cRun crawler\u201d.<\/p>\n\n\n\n<\/figure>\n\n\n\n
Il prossimo step \u00e8 configurare un job di Glue Studio, utilizzando il \u201cCatalog\u201d come sorgente d\u2019ingresso dei dati.<\/p>\n\n\n\nETL job<\/h3>\n\n\n\n
Abbiamo bisogno di configurare tutti e tre i nodi per definire un crawler<\/strong> in grado di leggere e trasformare dati al volo.<\/p>\n\n\n\n<\/figure>\n\n\n\n
Il valore che inseriamo sar\u00e0 utilizzato come etichetta per il data source node nel grafico.
Scegliamo \u201cData source properties – S3\u201d nella scheda dettagli del nodo.<\/li>{ \n \"Version\": \"2012-10-17\", \n \"Statement\": [ \n { \n \"Effect\": \"Allow\", \n \"Principal\": { \"Service\": \"glue.amazonaws.com\" }, \n \"Action\": \"sts:AssumeRole\" \n } \n ]\n}\n<\/pre>\n\n\n\n
Ottimizzazione del dataset: perch\u00e9 parquet rispetto al CSV<\/h2>\n\n\n\n
Il parquet \u00e8 un formato colonnare altamente compresso, che utilizza l’algoritmo di distruzione e assemblaggio dei record, molto superiore al semplice appiattimento di namespace annidati. Esso presenta i seguenti vantaggi:<\/p>\n\n\n\nLa fase di machine learning: previsione con Amazon SageMaker<\/h2>\n\n\n\n
Siamo riusciti a mettere in piedi un semplice notebook, che illustra tutti i passaggi coinvolti nell’utilizzo di un algoritmo DeepAR preimpostato da AWS.<\/p>\n\n\n\nimport time\nimport io\nimport math\nimport random\nimport numpy as np\nimport pandas as pd\nimport JSON\nimport matplotlib.pyplot as plt\nimport boto3\nimport sagemaker\nfrom sagemaker import get_execution_role\n\n# set random seeds for reproducibility\nnp.random.seed(42)\nrandom.seed(42)\n<\/pre>\n\n\n\n
bucket = \"
from sagemaker.amazon.amazon_estimator import get_image_uri\nimage_uri = get_image_uri(boto3.Session().region_name, \"forecasting-deepar\")\n<\/pre>\n\n\n\n
freq = \"H\"\nprediction_length = 24\ncontext_length = 24 # usually prediction and context are set equal or similar\n<\/pre>\n\n\n\n
La previsione e la durata del contesto sono impostate su 1 giorno e indicano rispettivamente quante ore vogliamo prevedere in futuro e quante ore in passato utilizzeremo per la previsione.
Di solito, questi valori sono definiti in termini di giorni poich\u00e9 il dataset \u00e8 molto pi\u00f9 grande.<\/p>\n\n\n\n# Read single parquet file from S3\ndef pd_read_s3_parquet(key, bucket, s3_client=None, **args):\n if not s3_client:\n s3_client = boto3.client('s3')\n obj = s3_client.get_object(Bucket=bucket, Key=key)\n return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)\n\n# Read multiple parquets from a folder on S3 generated by spark\ndef pd_read_s3_multiple_parquets(filepath, bucket, **args):\n if not filepath.endswith('\/'):\n filepath = filepath + '\/' # Add '\/' to the end\n \n s3_client = boto3.client('s3') \n s3 = boto3.resource('s3')\n s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)\n if item.key.endswith('.parquet')]\n if not s3_keys:\n print('No parquet found in', bucket, filepath)\n \n dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) \n for key in s3_keys]\n return pd.concat(dfs, ignore_index=True)\n<\/pre>\n\n\n\n
# get all retrieved parquet in a single dataframe with helpers functions\ndf = pd_read_s3_multiple_parquets(data, bucket)\ndf = df.iloc[:, :8] # get only relevant columns\ndf['hour'] = pd.to_datetime(df['timestamp']).dt.hour #add hour column for the timeseries format\n\n# split in test and training\nmsk = np.random.rand(len(df)) < 0.8 # 80% mask\n\n# Dividing in test and training\ntraining_df = df[msk]\ntest_df = df[~msk]\n<\/pre>\n\n\n\n
# We need to resave our data in JSON because this is how DeepAR works\n# Note: we know this is redundant but is for the article to show how many ways \n# there are to transform dataset back and forth from when data is acquired\n\ntrain_key = 'deepar_training.json'\ntest_key = 'deepar_test.json'\n\n# Write data in DeepAR format\ndef writeDataset(filename, data): \n file=open(filename,'w')\n previous_hour = -1\n for hour in data['hour']:\n if not math.isnan(hour):\n if hour != previous_hour:\n previous_hour = hour\n # One JSON sample per line\n line = f\"\\\"start\\\":\\\"2021-02-05 {int(hour)}:00:00\\\",\\\"target\\\":{data[data['hour'] == hour]['ozone'].values.tolist()}\"\n file.write('{'+line+'}\\n')\n<\/pre>\n\n\n\n
{\"start\":\"2021-02-05 13:00:00\",\"target\":[69.0, 56.0, 2.0, \u2026]}<\/pre>\n\n\n\n
writeDataset(train_key, training_df) \nwriteDataset(test_key, test_df)\n\ntrain_prefix = 'model\/train'\ntest_prefix = 'model\/test'\n\ntrain_path = sagemaker_session.upload_data(train_key, bucket=bucket, key_prefix=train_prefix)\ntest_path = sagemaker_session.upload_data(test_key, bucket=bucket, key_prefix=test_prefix)\n<\/pre>\n\n\n\n
estimator = sagemaker.estimator.Estimator(\n sagemaker_session=sagemaker_session,\n image_uri=image_uri,\n role=role,\n instance_count=1,\n instance_type=\"ml.c4.xlarge\",\n base_job_name=\"pollution-deepar\",\n output_path=f\"s3:\/\/{s3_output_path}\",\n)\n<\/pre>\n\n\n\n
hyperparameters = {\n \"time_freq\": freq,\n \"context_length\": str(context_length),\n \"prediction_length\": str(prediction_length),\n \"num_cells\": \"40\",\n \"num_layers\": \"3\",\n \"likelihood\": \"gaussian\",\n \"epochs\": \"20\",\n \"mini_batch_size\": \"32\",\n \"learning_rate\": \"0.001\",\n \"dropout_rate\": \"0.05\",\n \"early_stopping_patience\": \"10\",\n}\n\nestimator.set_hyperparameters(**hyperparameters)\n<\/pre>\n\n\n\n
data_channels = {\"train\": train_path, \"test\": test_path}\nestimator.fit(inputs=data_channels)\n<\/pre>\n\n\n\n
# Deploy for real time prediction\njob_name = estimator.latest_training_job.name\n\nendpoint_name = sagemaker_session.endpoint_from_job(\n job_name=job_name,\n initial_instance_count=1,\n instance_type='ml.m4.xlarge',\n role=role\n)\n\npredictor = sagemaker.predictor.RealTimePredictor(\n endpoint_name, \n sagemaker_session=sagemaker_session, \n content_type=\"application\/json\")\n<\/pre>\n\n\n\n
<\/figure>\n\n\n\n
{\n \"instances\": [ \n {\n \"start\": \"2021-02-05 00:00:00\",\n \"target\": [88.3, 85.4, ...]\n }\n ],\n \"configuration\": {\n \"output_types\": [\"mean\", \"quantiles\", \"samples\"],\n \"quantiles\": [\"0.1\", \"0.9\"], \n \"num_samples\": 100\n }\n}\n<\/pre>\n\n\n\n
sagemaker_session.delete_endpoint(endpoint_name)<\/pre>\n\n\n\n
Inferenza in tempo reale: dall'idea alla produzione<\/h2>\n\n\n\n
Vogliamo condividere con voi una guida riassuntiva per entrambi i metodi da provare da soli!<\/p>\n\n\n\nCome deployare<\/h3>\n\n\n\n
<\/li><\/ul><\/li>
L\u2019infrastruttura Green<\/strong> \u00e8 aggiunta, qui \u00e8 dove possiamo fare synthetic testing<\/a>.<\/li><\/ol>\n\n\n\nReferenze<\/h2>\n\n\n\n
\n
Facciamo il punto<\/h2>\n\n\n\n
Abbiamo visto come leggere e archiviare in modo efficiente i dati mentre vengono elaborati dal dispositivo utilizzando Kinesis Data Firehose, che agisce come un flusso quasi in tempo reale, per generare il nostro datalake su S3.<\/p>\n\n\n\n