```python
import boto3
import logging


class KinesisFirehoseDeliveryStreamHandler(logging.StreamHandler):

    def __init__(self):
        # By default, logging.StreamHandler uses sys.stderr if the stream parameter is not specified
        logging.StreamHandler.__init__(self)

        self.__firehose = None
        self.__stream_buffer = []

        try:
            self.__firehose = boto3.client('firehose')
        except Exception:
            print('Firehose client initialization failed.')

        self.__delivery_stream_name = "logging-test"

    def emit(self, record):
        try:
            msg = self.format(record)

            if self.__firehose:
                self.__stream_buffer.append({
                    'Data': msg.encode(encoding="UTF-8", errors="strict")
                })
            else:
                stream = self.stream
                stream.write(msg)
                stream.write(self.terminator)

            self.flush()
        except Exception:
            self.handleError(record)

    def flush(self):
        self.acquire()

        try:
            if self.__firehose and self.__stream_buffer:
                self.__firehose.put_record_batch(
                    DeliveryStreamName=self.__delivery_stream_name,
                    Records=self.__stream_buffer
                )

                self.__stream_buffer.clear()
        except Exception as e:
            print("An error occurred during flush operation.")
            print(f"Exception: {e}")
            print(f"Stream buffer: {self.__stream_buffer}")
        finally:
            if self.stream and hasattr(self.stream, "flush"):
                self.stream.flush()

            self.release()
```

To be more specific, the example above shows a class, KinesisFirehoseDeliveryStreamHandler, that inherits the behavior of the native StreamHandler class. The StreamHandler methods that were customized are emit and flush.
The emit method is responsible for invoking the format method, adding log records to the stream, and invoking the flush method. How log data is formatted depends on the type of formatter configured for the handler. Regardless of how it is formatted, log data is appended to the __stream_buffer list or, in case something went wrong during the Firehose client's initialization, to the default stream, i.e. sys.stderr.
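To make the emit flow concrete, the sketch below builds a log record by hand (outside any logger, for illustration only) and shows the shape the handler buffers for put_record_batch; the names `record`, `msg`, and `buffered` are just local variables for this demo:

```python
import logging

# A plain formatter standing in for whatever formatter the handler is configured with.
formatter = logging.Formatter(
    "%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S%z",
)

# Build a LogRecord by hand, as logger.info("hello firehose") would internally.
record = logging.LogRecord(
    name="demo", level=logging.INFO, pathname="demo.py",
    lineno=1, msg="hello firehose", args=(), exc_info=None,
)
msg = formatter.format(record)

# emit() buffers each formatted record in the shape put_record_batch expects.
buffered = {"Data": msg.encode(encoding="UTF-8", errors="strict")}
```

This is exactly the per-record structure appended to __stream_buffer: a dict with a single `Data` key holding UTF-8 bytes.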
The flush method is responsible for streaming data directly into the Kinesis Data Firehose delivery stream through the put_record_batch API. Once records are streamed to the Cloud, the local __stream_buffer is cleared. The last step of the flush method consists of flushing the default stream.
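One caveat worth noting: put_record_batch accepts at most 500 records per call (a documented Firehose limit), so a production-grade flush may need to split a large buffer into batches. A minimal sketch, where `chunk_records` is a hypothetical helper and not part of the handler above:

```python
def chunk_records(records, max_batch=500):
    """Yield successive slices of the buffer, none larger than
    Firehose's 500-records-per-call limit."""
    for i in range(0, len(records), max_batch):
        yield records[i:i + max_batch]


# Example: a buffer of 1200 records is split into 500 + 500 + 200.
buffer = [{"Data": b"x"}] * 1200
batches = list(chunk_records(buffer))
```

Each batch would then be sent with its own `put_record_batch(DeliveryStreamName=..., Records=batch)` call.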
This is an illustrative yet robust implementation that you are free to copy and tailor to your specific needs.
Once you have included the KinesisFirehoseDeliveryStreamHandler in your codebase, you're ready to add it to the loggers' configuration. Let's see how the previous dictionary configuration changes to introduce the new handler.
```python
config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
        },
        "json": {
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
            "class": "pythonjsonlogger.jsonlogger.JsonFormatter"
        }
    },
    "handlers": {
        "standard": {
            "class": "logging.StreamHandler",
            "formatter": "json"
        },
        "kinesis": {
            "class": "KinesisFirehoseDeliveryStreamHandler.KinesisFirehoseDeliveryStreamHandler",
            "formatter": "json"
        }
    },
    "loggers": {
        "": {
            "handlers": ["standard", "kinesis"],
            "level": logging.INFO
        }
    }
}
```

To include the new custom handler in our configuration, it is enough to add a “kinesis” entry to the “handlers” dictionary and another one to the root logger's “handlers” array.
In the “handlers” dictionary's “kinesis” entry, we specify the custom handler's class and the formatter used by the handler to format log records.
By adding this entry to the root logger's “handlers” array, we are telling the root logger to write log records both to the console and to the Kinesis Data Firehose delivery stream.
PS: the root logger is identified by “” in the “loggers” section.

That's all for the Kinesis Data Firehose log data producer configuration. Let's focus on the infrastructure behind the put_record_batch API, the one used by the KinesisFirehoseDeliveryStreamHandler to stream log records to the Cloud.

Beyond Kinesis Data Firehose's put_record_batch API

The architecture components needed to aggregate your application's log records and make them available and searchable from a centralized dashboard are the following:
- a Kinesis Data Firehose delivery stream;
- an Amazon Elasticsearch Service cluster.
To create a Kinesis Data Firehose delivery stream, we move to the Kinesis dashboard of the AWS Management Console. From the left-side menu, we select Data Firehose. Once selected, we should see a list of the delivery streams present in a specific region of our AWS account. To set up a brand new delivery stream, we click on the Create delivery stream button in the top right corner of the page.

In the Create delivery stream wizard, we'll be asked to configure the delivery stream's source, transformation process, destination, and other settings, like the permissions Kinesis Data Firehose needs to load streaming data to the specified destinations.
Since we're loading data directly from our logger through the boto3 SDK, we have to choose Direct PUT or other sources as the delivery stream's Source.

We'll leave the “transform” and “convert” options disabled since they're not fundamental for the sake of this article.
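As an aside, the same choices made in the console wizard (source, destinations, permissions) can be scripted with the Firehose create_delivery_stream API. The sketch below only builds the request parameters; every ARN, account ID, region, and bucket name is a hypothetical placeholder to substitute with your own, and the actual call is left commented out since it requires valid AWS credentials:

```python
# All ARNs, the account ID and the region below are hypothetical placeholders.
params = {
    "DeliveryStreamName": "logging-test",
    "DeliveryStreamType": "DirectPut",  # "Direct PUT or other sources"
    "ElasticsearchDestinationConfiguration": {
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "DomainARN": "arn:aws:es:eu-west-1:123456789012:domain/logging-test",
        "IndexName": "logging-test",
        "IndexRotationPeriod": "OneDay",             # index rotation frequency
        "RetryOptions": {"DurationInSeconds": 300},  # retry duration
        "S3BackupMode": "AllDocuments",  # keep historical logs in S3 too
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::logging-test-historical-logs",
        },
    },
}

# import boto3
# boto3.client("firehose").create_delivery_stream(**params)
```

This is handy when the delivery stream needs to be reproducible across accounts or environments instead of being clicked together in the console.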
The third step of the wizard asks us to specify the delivery stream's destinations. Assuming that we've already created an Amazon Elasticsearch Service cluster in our AWS account, we set it as our primary destination, specifying the Elasticsearch index name, rotation frequency, mapping type, and retry duration, i.e. how long a failed index request should be retried.

As a secondary destination of our delivery stream, we will set up an S3 bucket. As already mentioned, this bucket will contain the historical logs that are not subject to the Elasticsearch index's rotation logic.

We will leave S3 compression, S3 encryption, and error logging disabled and focus on the permissions. This last section requires us to specify or create an IAM Role with a policy that allows Kinesis Data Firehose to stream data to the specified destinations. By clicking on Create new, we'll be guided through the creation of an IAM Role with the required permissions policy set.

Log records streaming test

Once the delivery stream is created, we can finally test whether code and architecture were correctly integrated. The following scheme illustrates the actors in play:

From our local machine, we're going to simulate an App component that loads log data directly to a Kinesis Data Firehose delivery stream. For this test, we will use the config dictionary that already includes the KinesisFirehoseDeliveryStreamHandler.
```python
import logging.config

config = {...}

logging.config.dictConfig(config)
logger = logging.getLogger(__name__)


def test():
    try:
        raise NameError("fake NameError")
    except NameError as e:
        logger.error(e, exc_info=True)
```

Running this test, a new log record is generated and written both to the console and to the delivery stream.

Here's the console output of the test:
{\"asctime\": \"2020-05-11T14:44:44+0200\", \"name\": \"logging_test5\", \"levelname\": \"ERROR\", \"message\": \"fake NameError\", \"exc_info\": \"Traceback (most recent call last):\\n File \\\"\/Users\/ericvilla\/Projects\/logging-test\/src\/logging_test5.py\\\", line 42, in test\\n raise NameError(\\\"fake NameError\\\")\\nNameError: fake NameError\"}<\/pre>\nWell, nothing new. What we expect in addition to the console output is to find the log record in our Kibana console too.<\/p>\n
To enable search and analysis of log records from our Kibana console, we need to create an Index pattern, which Kibana uses to retrieve data from specific Elasticsearch indexes.
The name we gave to the Elasticsearch index is logging-test. Therefore, indexes will be stored with names starting with logging-test-. To make Kibana retrieve log records from every index whose name starts with logging-test-, we should define the Index pattern logging-test-*. If our KinesisFirehoseDeliveryStreamHandler worked as expected, the Index pattern should match a new index.

To filter log records by time, we can use the asctime field that our JSON formatter added to the log record.

Once the Index pattern is created, we can finally search and analyze our application's log records from the Kibana console!
It is possible to further customize the log records search and analysis experience to debug your application more efficiently, adding filters and creating dashboards.
With all that being said, this concludes our coverage of Python's logging module, best practices, and log aggregation techniques. We hope you've enjoyed reading through all of this information and maybe learned a few tricks.

Until the next article, stay safe 🙂
Read Part 1
Part II: Python logging best practices and how to integrate with Kibana Dashboard through AWS Kinesis Stream and Amazon ElasticSearch Service - Proud2beCloud Blog