import boto3\r\nimport logging\r\n\r\n\r\nclass KinesisFirehoseDeliveryStreamHandler(logging.StreamHandler):\r\n\r\n def __init__(self):\r\n # By default, logging.StreamHandler uses sys.stderr if stream parameter is not specified\r\n logging.StreamHandler.__init__(self)\r\n\r\n self.__firehose = None\r\n self.__stream_buffer = []\r\n\r\n try:\r\n self.__firehose = boto3.client('firehose')\r\n except Exception:\r\n print('Firehose client initialization failed.')\r\n\r\n self.__delivery_stream_name = \"logging-test\"\r\n\r\n def emit(self, record):\r\n try:\r\n msg = self.format(record)\r\n\r\n if self.__firehose:\r\n self.__stream_buffer.append({\r\n 'Data': msg.encode(encoding=\"UTF-8\", errors=\"strict\")\r\n })\r\n else:\r\n stream = self.stream\r\n stream.write(msg)\r\n stream.write(self.terminator)\r\n\r\n self.flush()\r\n except Exception:\r\n self.handleError(record)\r\n\r\n def flush(self):\r\n self.acquire()\r\n\r\n try:\r\n if self.__firehose and self.__stream_buffer:\r\n self.__firehose.put_record_batch(\r\n DeliveryStreamName=self.__delivery_stream_name,\r\n Records=self.__stream_buffer\r\n )\r\n\r\n self.__stream_buffer.clear()\r\n except Exception as e:\r\n print(\"An error occurred during flush operation.\")\r\n print(f\"Exception: {e}\")\r\n print(f\"Stream buffer: {self.__stream_buffer}\")\r\n finally:\r\n if self.stream and hasattr(self.stream, \"flush\"):\r\n self.stream.flush()\r\n\r\n self.release()\r\n<\/pre>\nCome si pu\u00f2 notare, andando sullo specifico, l\u2019esempio mostra una classe, KinesisFirehoseDeliveryStreamHandler, che eredita il comportamento nativo della classe StreamHandler. I metodi di StreamHandler modificati per questo esempio sono emit e flush. \nIl metodo emit \u00e8 responsabile dell’invocazione del metodo di format, dell’aggiunta di record di log allo stream e del metodo di flush. La modalit\u00e0 di formattazione dei dati di log dipende dal tipo di formattatore configurato per il gestore. Indipendentemente dalla modalit\u00e0 di formattazione, i dati di log verranno aggiunti all’array __stream_buffer o, nel caso in cui qualcosa sia andato storto durante l’inizializzazione del client Firehose, al flusso predefinito, ovvero sys.stderr.<\/p>\n
Il metodo flush \u00e8 responsabile dello streaming dei dati direttamente nel delivery stream di Kinesis Data Firehose attraverso l’API put_record_batch. Una volta che i record vengono trasmessi in streaming sul Cloud, lo _stream_buffer locale verr\u00e0 cancellato. L’ultimo passaggio del metodo flush consiste nel flushing dello stream di default.<\/p>\n
Questa implementazione \u00e8 puramente illustrativa ma cionondimeno solida per cui ci si senta liberi di copiare e personalizzare lo snippet in base alle proprie esigenze specifiche.<\/p>\n
Dopo aver incluso KinesisFirehoseDeliveryStreamHandler nella propria codebase, si deve poi aggiungerlo alla configurazione dei logger. Vediamo come cambia la configurazione del dizionario precedente nell\u2019introdurre il nuovo gestore.<\/p>\n
config = {\r\n \"version\": 1,\r\n \"disable_existing_loggers\": False,\r\n \"formatters\": {\r\n \"standard\": {\r\n \"format\": \"%(asctime)s %(name)s %(levelname)s %(message)s\",\r\n \"datefmt\": \"%Y-%m-%dT%H:%M:%S%z\",\r\n },\r\n \"json\": {\r\n \"format\": \"%(asctime)s %(name)s %(levelname)s %(message)s\",\r\n \"datefmt\": \"%Y-%m-%dT%H:%M:%S%z\",\r\n \"class\": \"pythonjsonlogger.jsonlogger.JsonFormatter\"\r\n }\r\n },\r\n \"handlers\": {\r\n \"standard\": {\r\n \"class\": \"logging.StreamHandler\",\r\n \"formatter\": \"json\"\r\n },\r\n \"kinesis\": {\r\n \"class\": \"KinesisFirehoseDeliveryStreamHandler.KinesisFirehoseDeliveryStreamHandler\",\r\n \"formatter\": \"json\"\r\n }\r\n },\r\n \"loggers\": {\r\n \"\": {\r\n \"handlers\": [\"standard\", \"kinesis\"],\r\n \"level\": logging.INFO\r\n }\r\n }\r\n}\r\n<\/pre>\nPer includere il nuovo handler personalizzato nella propria configurazione, \u00e8 sufficiente aggiungere una voce “kinesis” al dizionario “handlers” e una voce “kinesis” nell’array “handlers” del logger di root.<\/p>\n
Nella voce “kinesis” del dizionario “handlers” dovremmo specificare la classe del handler personalizzato e il formatter utilizzato da quest\u2019ultimo per formattare i record di log.<\/p>\n
Aggiungendo una voce “kinesis” all’array “handlers” del logger di root, si sta indicando a quest\u2019ultimo di scrivere record di log sia in console che nel delivery stream di Kinesis Data Firehose.<\/p>\n
PS: il logger di root \u00e8 identificato da “” nella sezione “loggers”.<\/p>\n
Questo \u00e8 tutto ci\u00f2 che serve per la configurazione del producer dei dati di log di Kinesis Data Firehose. Concentriamoci ora sull’infrastruttura dietro l’API put_record_batch, quella utilizzata da KinesisFirehoseDeliveryStreamHandler per lo streaming dei record di log sul cloud. \nDietro le quinte dell\u2019API put_record_batch<\/p>\n
I componenti dell’architettura necessari per aggregare i record di log dell’applicazione e renderli disponibili e ricercabili da una dashboard centralizzata sono i seguenti:<\/p>\n
un delivery stream di Kinesis Data Firehose; \nun cluster del servizio Amazon Elasticsearch.<\/p>\n
Per creare un delivery stream di Kinesis Data Firehose, passiamo alla dashboard Kinesis della console di gestione AWS. Dal men\u00f9 a sinistra, selezioniamo Data Firehose. Una volta selezionato, dovremmo visualizzare un elenco di data stream presenti in una regione specifica del proprio account AWS. Per impostare un nuovo delivery stream, faremo clic sul pulsante Create delivery stream nell’angolo in alto a destra della pagina. \n \nNella procedura guidata di Create delivery stream ci verr\u00e0 chiesto di configurare l’origine del delivery stream, il processo di trasformazione, la destinazione e altre impostazioni come le autorizzazioni necessarie a Kinesis Data Firehose per caricare i dati di streaming nelle destinazioni specificate.<\/p>\n
Poich\u00e9 stiamo caricando i dati direttamente dal nostro logger tramite l\u2019SDK di boto3, dobbiamo scegliere Direct PUT or other sources come sorgente del delivery stream. \n \nLasciamo le opzioni \u201ctransform\u201d e \u201cconvert\u201d disabilitate in quanto non fondamentali ai risultati presentati in questo articolo.<\/p>\n
Il terzo step del wizard richiede di specificare le destinazioni del delivery stream. Assumendo che si sia gi\u00e0 creato un cluster di Amazon Elasticsearch nel proprio account AWS, questo verr\u00e0 specificato come destinazione primaria, indicando l\u2019index name di Elasticsearch, la rotation frequency, il mapping type e la retry duration, ovvero per quanto a lungo una richiesta fallita deve essere ritentata. \n \nCome destinazione secondaria del nostro delivery stream, imposteremo un bucket S3. Come gi\u00e0 accennato in precedenza, questo bucket conterr\u00e0 registri storici non soggetti alla logica di rotazione dell’indice di Elasticsearch. \n \nLasceremo disabilitate la compressione S3, la crittografia S3 e la registrazione degli errori e ci concentreremo sulle autorizzazioni. Quest’ultima sezione richiede di specificare o creare un nuovo ruolo IAM con una politica che consenta a Kinesis Data Firehose di trasmettere i dati alle destinazioni specificate. Facendo clic su Create new verremo guidati nella creazione di un ruolo IAM con il set di criteri di autorizzazione richiesto.<\/p>\n
Log record streaming test \nUna volta che il delivery stream \u00e8 creato, possiamo testare finalmente se il codice e l\u2019architettura sono stati correttamente integrati. Il seguente schema illustra gli attori in gioco: \n \nDalla propria macchina locale si andr\u00e0 a simulare una applicazione che andr\u00e0 a caricare i dati direttamente su un Kinesis Data Firehose delivery stream. Per questo test si andr\u00e0 ad utilizzare la configurazione a dizionario che gi\u00e0 include il KinesisFirehoseDeliveryStreamHandler.<\/p>\n
import logging.config\r\n\r\nconfig = {...}\r\n\r\nlogging.config.dictConfig(config)\r\nlogger = logging.getLogger(__name__)\r\n\r\n\r\ndef test():\r\n try:\r\n raise NameError(\"fake NameError\")\r\n except NameError as e:\r\n logger.error(e, exc_info=True)\r\n<\/pre>\nEseguendo il test, un nuovo record di log verr\u00e0 generato e scritto sia in console che sul delivery stream. \nDi seguito l\u2019output della console in fase di test:<\/p>\n
{\"asctime\": \"2020-05-11T14:44:44+0200\", \"name\": \"logging_test5\", \"levelname\": \"ERROR\", \"message\": \"fake NameError\", \"exc_info\": \"Traceback (most recent call last):\\n File \\\"\/Users\/ericvilla\/Projects\/logging-test\/src\/logging_test5.py\\\", line 42, in test\\n raise NameError(\\\"fake NameError\\\")\\nNameError: fake NameError\"}<\/pre>\nBeh, niente di nuovo. Ci\u00f2 che ci si aspetterebbe oltre all’output della console \u00e8 trovare anche il record di log nella nostra console di Kibana.<\/p>\n
Per consentire la ricerca e l’analisi dei record di log direttamente da Kibana, \u00e8 necessario creare un index pattern, utilizzato da Kibana per recuperare dati da specifici indici di Elasticsearch.<\/p>\n
Il nome che abbiamo dato all’indice Elasticsearch \u00e8 logging-test. Pertanto, gli indici verranno archiviati come logging-test-. Fondamentalmente, per far s\u00ec che Kibana recuperi i record di log da ciascun indice che inizi con logging-test-, si dovr\u00e0 definire il pattern di log logging-test- *. Se il nostro KinesisFirehoseDeliveryStreamHandler avr\u00e0 funzionato come previsto, l\u2019index pattern dovrebbe corrispondere ad un nuovo indice. \n \nPer filtrare i record di log per orario, possiamo usare la chiave asctime che il formatter JSON avr\u00e0 aggiunto a tale record. \n \nUna volta che L\u2019index pattern \u00e8 creato, possiamo finalmente ricercare ed analizzare i log direttamente dalla console di Kibana!<\/p>\n
\n\u00c8 possibile personalizzare ulteriormente l’esperienza di ricerca e analisi dei record di log, per eseguire il debug dell’applicazione in modo pi\u00f9 efficiente, aggiungendo filtri e creando dashboard.<\/p>\n
Detto tutto questo, qui si conclude il nostro viaggio alla scoperta del modulo di logging di Python, delle best practices e delle tecniche per aggregare log distribuiti. Speriamo vivamente che vi sia piaciuto leggere questo articolo e che abbiate potuto impare qualche nuovo trucco. Fino al prossimo articolo, state al sicuro \ud83d\ude42<\/p>\n
Leggi la Parte 1<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"In questa seconda parte del nostro viaggio alla scoperta dei segreti e delle best practice del logging in Python (se […]<\/p>\n","protected":false},"author":7,"featured_media":1445,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[477],"tags":[269,261,265,273],"class_list":["post-1443","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-cloud-native-development","tag-amazon-elasticsearch-service","tag-amazon-kinesis-data-firehose","tag-kibana","tag-python"],"yoast_head":"\n
Parte II: Best practice per il logging su Python e come integrarsi con la dashboard di Kibana tramite Amazon Kinesis Data Firehose e Amazon Elasticsearch Service - Proud2beCloud Blog<\/title>\n \n \n \n \n \n \n \n \n \n \n \n \n\t \n\t \n\t \n \n \n \n \n \n \n\t \n\t \n\t \n