Democratize data access through a self-service Data Platform – Part 1
11 February 2025 - 8 min. read

Matteo Goretti
DevOps Engineer
```python
import boto3
import logging


class KinesisFirehoseDeliveryStreamHandler(logging.StreamHandler):

    def __init__(self):
        # By default, logging.StreamHandler uses sys.stderr if stream parameter is not specified
        logging.StreamHandler.__init__(self)

        self.__firehose = None
        self.__stream_buffer = []

        try:
            self.__firehose = boto3.client('firehose')
        except Exception:
            print('Firehose client initialization failed.')

        self.__delivery_stream_name = "logging-test"

    def emit(self, record):
        try:
            msg = self.format(record)

            if self.__firehose:
                self.__stream_buffer.append({
                    'Data': msg.encode(encoding="UTF-8", errors="strict")
                })
            else:
                stream = self.stream
                stream.write(msg)
                stream.write(self.terminator)

            self.flush()
        except Exception:
            self.handleError(record)

    def flush(self):
        self.acquire()

        try:
            if self.__firehose and self.__stream_buffer:
                self.__firehose.put_record_batch(
                    DeliveryStreamName=self.__delivery_stream_name,
                    Records=self.__stream_buffer
                )

                self.__stream_buffer.clear()
        except Exception as e:
            print("An error occurred during flush operation.")
            print(f"Exception: {e}")
            print(f"Stream buffer: {self.__stream_buffer}")
        finally:
            if self.stream and hasattr(self.stream, "flush"):
                self.stream.flush()

            self.release()
```

More specifically, the example shows a class, KinesisFirehoseDeliveryStreamHandler, that inherits the behavior of the native StreamHandler class and customizes two of its methods: emit and flush.

The emit method is responsible for invoking the format method, adding log records to the stream, and invoking the flush method. How log data is formatted depends on the type of formatter configured for the handler. Regardless of how it is formatted, log data will be appended to the __stream_buffer list or, in case something went wrong during the Firehose client's initialization, to the default stream, i.e. sys.stderr.

The flush method is responsible for streaming data directly into the Kinesis Data Firehose delivery stream through the put_record_batch API. Once records are streamed to the Cloud, the local __stream_buffer is cleared. The last step of the flush method consists of flushing the default stream.

This is an illustrative yet robust implementation that you are free to copy and tailor to your specific needs.
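Before wiring the handler into a dictionary-based configuration, it can be useful to see it working in isolation. The following is a minimal, hedged sketch: the logger name and messages are illustrative, the module path is assumed to match the one used in the configuration shown next, and the "logging-test" delivery stream hard-coded in the handler must already exist in your account.

```python
import logging

from KinesisFirehoseDeliveryStreamHandler import KinesisFirehoseDeliveryStreamHandler

# Illustrative standalone usage: attach the custom handler to a logger by hand.
logger = logging.getLogger("firehose-demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(KinesisFirehoseDeliveryStreamHandler())

# Each call formats the record, buffers it, and triggers flush(), which sends
# the buffered records to the delivery stream via put_record_batch.
logger.info("application started")
logger.error("something went wrong")
```

Once you have included the KinesisFirehoseDeliveryStreamHandler in your codebase, you're ready to add it to the loggers' configuration. Let's see how the previous dictionary configuration changes to introduce the new handler.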
```python
config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
        },
        "json": {
            "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            "datefmt": "%Y-%m-%dT%H:%M:%S%z",
            "class": "pythonjsonlogger.jsonlogger.JsonFormatter"
        }
    },
    "handlers": {
        "standard": {
            "class": "logging.StreamHandler",
            "formatter": "json"
        },
        "kinesis": {
            "class": "KinesisFirehoseDeliveryStreamHandler.KinesisFirehoseDeliveryStreamHandler",
            "formatter": "json"
        }
    },
    "loggers": {
        "": {
            "handlers": ["standard", "kinesis"],
            "level": logging.INFO
        }
    }
}
```

To include the new custom handler in our configuration, it is enough to add a "kinesis" entry to the "handlers" dictionary and another one to the root logger's "handlers" array.

In the "handlers" dictionary's "kinesis" entry we specify the custom handler's class and the formatter used by the handler to format log records.

By adding this entry to the root logger's "handlers" array, we are telling the root logger to write log records both to the console and to the Kinesis Data Firehose delivery stream.

PS: the root logger is identified by "" in the "loggers" section.

That's all for the Kinesis Data Firehose log data producer configuration. Let's now focus on the infrastructure behind the put_record_batch API, the one used by the KinesisFirehoseDeliveryStreamHandler to stream log records to the Cloud.

Beyond Kinesis Data Firehose's put_record_batch API

The architecture components needed to aggregate your application's log records and make them available and searchable from a centralized dashboard are the following:

- a Kinesis Data Firehose delivery stream;
- an Amazon Elasticsearch Service cluster.

To create a Kinesis Data Firehose delivery stream, we move to the Kinesis dashboard of the AWS Management Console. From the left side menu, we select Data Firehose. Once selected, we should see the list of delivery streams present in a specific region of our AWS account. To set up a brand new delivery stream, we click on the Create delivery stream button in the top right corner of the page.
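If you prefer to script this step instead of using the console wizard, the delivery stream can also be created programmatically with boto3. The sketch below is illustrative, not a definitive setup: the IAM role, Elasticsearch domain, and S3 backup bucket ARNs are placeholders you would replace with resources from your own account; only the stream name, logging-test, has to match the one used by the handler.

```python
import boto3

firehose = boto3.client('firehose')

# Hypothetical ARNs: replace with the IAM role, Elasticsearch domain,
# and S3 backup bucket from your own account.
response = firehose.create_delivery_stream(
    DeliveryStreamName='logging-test',  # same name hard-coded in the handler
    DeliveryStreamType='DirectPut',
    ElasticsearchDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-to-es-role',
        'DomainARN': 'arn:aws:es:eu-west-1:123456789012:domain/logging-test',
        'IndexName': 'logging-test',
        'IndexRotationPeriod': 'OneMonth',
        'BufferingHints': {'IntervalInSeconds': 60, 'SizeInMBs': 1},
        'S3BackupMode': 'FailedDocumentsOnly',
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose-to-es-role',
            'BucketARN': 'arn:aws:s3:::logging-test-backup'
        }
    }
)
```

However the delivery stream is created, the handler only needs its name to start batching records to it. With the stream pointing at the Amazon Elasticsearch Service cluster, we can test the whole chain with a few lines of code.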
```python
import logging.config

config = {...}

logging.config.dictConfig(config)

logger = logging.getLogger(__name__)


def test():
    try:
        raise NameError("fake NameError")
    except NameError as e:
        logger.error(e, exc_info=True)
```

Running this test, a new log record will be generated and written both to the console and to the delivery stream.

Here's the console output of the test:
{"asctime": "2020-05-11T14:44:44+0200", "name": "logging_test5", "levelname": "ERROR", "message": "fake NameError", "exc_info": "Traceback (most recent call last):\n File \"/Users/ericvilla/Projects/logging-test/src/logging_test5.py\", line 42, in test\n raise NameError(\"fake NameError\")\nNameError: fake NameError"}Well, nothing new. What we expect in addition to the console output is to find the log record in our Kibana console too.To enable search and analysis of log records from our Kibana console, we need to create an Index pattern, used by Kibana to retrieve data from specific Elasticsearch Indexes.The name we gave to the Elasticsearch index is logging-test. Therefore, indexes will be stored as logging-test-. Basically, to make Kibana retrieve log records from each Index that starts with logging-test-, we should define the Index pattern logging-test-*. If our KinesisFirehoseDeliveryStreamHandler worked as expected, the Index pattern should match a new Index.