Monday, July 08, 2019

Kinesis/Firehose/Athena - Creating queryable data from logs

Amazon has a data transformation pipeline that allows log data to be queried with a SQL-like syntax. This can be useful for gaining insights that are buried in log data generally thought of as temporary. When was the last time you went over 6 months of logs? Right, just what I thought.

Wading through logs is painful, and with the growth of data, all the more so. No wonder that when confronted with the task of gleaning information from past data, engineers build specialized table structures with relational queries in mind, or provision specialized map/reduce jobs to crunch over the data for detailed answers to specific questions.

But this time-consuming exercise can be done away with by using the Amazon Kinesis pipeline. The flow looks something like this -> The application writes a JSON formatted record that captures a particular item of interest to a Kinesis data stream. A Firehose instance is attached to the output of the data stream. This Firehose instance converts the data to a "JSON like" format and writes it into an S3 bucket at a specified folder. Another Amazon service, Glue, provides a crawler that can then process new files as they get uploaded to the S3 bucket. The Glue crawler infers the schema from the JSON it finds in the S3 files and creates a Glue table. To query the data, Amazon provides yet another service - Athena, which sports a SQL syntax and a user-friendly query console. Phew, yeah, it is quite the mother of all pipelines.
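
For concreteness, a record flowing through the pipeline might look something like the snippet below. The fields are just an illustration, not a required schema - the point is that a flat JSON record lets the Glue crawler turn each field into a column that Athena can later filter and aggregate on.

{
  "timestamp": "2019-07-08T14:23:05Z",
  "level": "info",
  "message": "user signed in",
  "request_id": "d3adb33f",
  "duration_ms": 42
}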

This is all pretty straightforward to set up starting from the Kinesis console itself. You should start with the Data streams tab in Kinesis, create a data stream, then create a Kinesis Firehose with its source set to the data stream you just created, and specify that Firehose data will be written with the API.



Since we are writing JSON to Kinesis, there is no need to convert the record format, and we will pass the data to Firehose as-is without transformation (well, more on this later), so we can leave the default settings for Source record transformation and Record format conversion.

Finally, you need to specify where you want this data to live - in our case, S3:


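If you would rather script these console steps, a rough node.js sketch with the aws-sdk is below. The stream names, bucket and role ARNs are placeholders, and because the newline Lambda later in this post pushes records into the delivery stream with the Firehose API, I am assuming a "Direct PUT" source here.

// setup.js - a sketch of the console steps above; names, ARNs and IAM roles are placeholders
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();
const firehose = new AWS.Firehose();

async function createPipeline() {
  // The data stream the application writes its log records to
  await kinesis.createStream({ StreamName: 'app-logs', ShardCount: 1 }).promise();

  // The Firehose delivery stream that lands records in S3.
  // DirectPut, because the newline Lambda later in the post pushes records in with putRecordBatch.
  await firehose.createDeliveryStream({
    DeliveryStreamName: 'app-logs-to-s3',
    DeliveryStreamType: 'DirectPut',
    ExtendedS3DestinationConfiguration: {
      BucketARN: 'arn:aws:s3:::my-log-bucket',
      Prefix: 'logs/',
      RoleARN: 'arn:aws:iam::123456789012:role/firehose-delivery-role',
    },
  }).promise();
}

createPipeline().catch(console.error);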
Now head over to Glue and add a crawler. Specify the S3 folder that you used above for the "include path". For simplicity, I would start with a single schema for all S3 records, under "Grouping Behaviors".



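The crawler can also be created programmatically. A sketch with the aws-sdk follows - the crawler name, IAM role, database and S3 path are all placeholders, and the Configuration blob is what I believe the single-schema grouping option corresponds to.

// crawler.js - a sketch of the Glue crawler setup; names, role and S3 path are placeholders
const AWS = require('aws-sdk');
const glue = new AWS.Glue();

glue.createCrawler({
  Name: 'app-logs-crawler',
  Role: 'arn:aws:iam::123456789012:role/glue-crawler-role',
  DatabaseName: 'app_logs',
  Targets: { S3Targets: [{ Path: 's3://my-log-bucket/logs/' }] },
  // Combine compatible schemas into a single table instead of one table per S3 path
  Configuration: JSON.stringify({
    Version: 1.0,
    Grouping: { TableGroupingPolicy: 'CombineCompatibleSchemas' },
  }),
}, (err) => {
  if (err) console.error('failed to create crawler', err);
});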
Now head over to your favorite editor and let's write some code - finally!
It's up to you how you want to structure the code to do this. In the application I'm building, it is literally the logs that I want sent over to Kinesis. Anticipating this, I wrote a function that the app calls for writing logs, and this function was the ideal place to add in the write to Kinesis. It looks something like this:



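A stripped-down node.js sketch of that idea is below - the stream name is a placeholder, and your real log function will of course look different.

// logger.js - a sketch of a log helper that also pushes each record to Kinesis;
// the stream name is a placeholder
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

function writeLog(entry) {
  const record = JSON.stringify(entry);

  // Keep the normal log behavior
  console.log(record);

  // ...and also push the same record into the Kinesis data stream
  kinesis.putRecord({
    StreamName: 'app-logs',               // placeholder
    PartitionKey: String(Date.now()),     // any reasonably distributed key will do
    Data: record,
  }, (err) => {
    if (err) console.error('failed to write log record to Kinesis', err);
  });
}

module.exports = { writeLog };

The app then calls writeLog({ level: 'info', message: 'user signed in' }) wherever it used to log directly.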
That would be all there is to it, except there is an annoying bug in the pipeline that we need to work around. The issue is that Firehose writes "JSON like" data to S3 all on a single line, while the Glue crawler expects each record to sit on its own line. So when all the records are squished into a single line in S3, the crawler processes the first and throws away the rest. Imagine my surprise when only 1 out of 17 of my log records appeared in the Athena queries.

The workaround is to write a Lambda function with a Kinesis trigger: every time a Kinesis record is written, the Lambda gets triggered. Well, that is not strictly true - Kinesis will batch a bunch of records and invoke the Lambda once per batch. The batch size (or the trigger window) can be specified from the console.

Or if you are using serverless, this can be specified in the serverless.yml like so:



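The exact values below are just placeholders (swap in your own stream ARN and tune the batch size) - the relevant part is the stream event attached to the function:

# serverless.yml - the relevant bit; the ARN and batch size are placeholders
functions:
  newlines:
    handler: handler.newlines
    events:
      - stream:
          type: kinesis
          arn: arn:aws:kinesis:us-east-1:123456789012:stream/app-logs
          batchSize: 100
          startingPosition: LATEST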
Without further ado, here's the Lambda that adds the newline:



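A sketch of such a handler goes something like this (the delivery stream name is a placeholder, and this assumes the pre-v3 aws-sdk that the node.js Lambda runtime bundled at the time):

// handler.js - a sketch of the newline Lambda; the delivery stream name is a placeholder
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

// Kinesis hands the Lambda base64-encoded payloads; decode each one and tack a newline on the end
const add_new_line = (record) => {
  const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
  return { Data: payload + '\n' };
};

module.exports.newlines = (event, context, callback) => {
  const records = event.Records.map(add_new_line);

  firehose.putRecordBatch({
    DeliveryStreamName: 'app-logs-to-s3',   // placeholder
    Records: records,
  }, (err) => {
    if (err) {
      callback(err);                        // something went wrong - hand the error back
    } else {
      callback(null, `Processed ${records.length} records.`);
    }
  });
};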
This is written in node.js, and I used the serverless framework with the node.js template to write it. I'm exporting a single function named newlines. This is triggered when there is a batch of records in the Kinesis data stream. We map over the records, transforming each record by adding a new line. This is done in the add_new_line function.

To let the Lambda runtime know how we did, we use the callback. It is standard node.js to pass an error object when something fails and null when there are no errors (we succeeded).

firehose.putRecordBatch is used for efficiency - we could just as well have called firehose.putRecord once per record and the results would be the same, apart from throughput.
