Monday, July 08, 2019

Kinesis/Firehose/Athena - Creating queryable data from logs

Amazon has a data transformation pipeline that allows log data to be queried with a SQL like syntax. This can be useful to gain insight that is buried in log data generally thought of as temporary. When was the last time you went over 6 months of logs? Right, just what I thought.

Wading through logs is painful and with the growth of data all the more so. No wonder that when confronted with the task of gleaning information from past data, engineers build specialized table structures with relational queries in mind or provision specialized map/reduce jobs to crunch over the data for detailed answers to specific questions.

But this time consuming exercise can be done away with by using the Amazon Kinesis pipeline. The flow looks something like this -> The application writes a JSON formatted record that captures a particular item of interest to a Kinesis data stream.  A Firehose instance is attached to the output of the data stream. This Firehose instance converts the data to a "JSON like" format and writes them into a S3 bucket at a specified folder. Another Amazon service Glue provides a crawler that can then process new files that get uploaded to the S3 bucket. The Glue crawler infers the schema from the JSON it finds in the S3 files and creates a Glue table. To query the data, Amazon provides yet another service - Athena, which sports a SQL syntax and a user friendly query console. Phew, yeah, it is quite the mother of all pipelines.

This is all pretty straightforward to set up starting from Kinesis console itself. You should start with the Data streams tab in Kinesis, create a data stream, then create a Kinesis Firehose with source equal to the data stream you just created. Specify that firehose data will be written with the API like so:



Since we are writing JSON to Kinesis, there is no need to convert record format, and we will use the data as is without transformation to Firehose, well more on this later, but we can leave the default settings for Source record transformation and Record format conversion

Finally you need to specify where you want this data to live, in our case S3:


Now head over to Glue and add a crawler. Specify the S3 folder that you used above for the "include path". For simplicity, I would start with a single schema for all S3 records, under "Grouping Behaviors".



Now head over to your favorite editor and let's write some code - finally!
It's up to you how you want to structure the code to do this. In the application I'm building, it is literally the logs that I want sent over to Kinesis. Anticipating this, I wrote a function that the app calls for writing logs, and this function was the ideal place to add in the write to Kinesis. It looks something like this:



That will be all to it, except there is an annoying bug in the pipeline that we need to work around. The issue is that Firehose writes "JSON like" data to S3 that is all a single line. The Glue crawler expects each record to exist in a single line. So when all the records are squished into a single line in S3, the crawler processes the first and throws away the rest. Imagine my surprise when only 1 out of 17 of my log records appeared in the Athena queries.

The workaround is to write a Lambda function with a Kinesis trigger. What this does is that every time a Kinesis record is written, the Lambda gets triggered. Well, that is not strictly true - Kinesis will batch a bunch of records and invoke the lambda once per batch. The batch size (or time for trigger) can be specified from the console.

Or if you are using serverless, this can be specified in the serverless.yml like so:



Without further ado, here's the Lambda that adds the newline:



This is written in node.js, and I used the serverless framework with the node.js template to write it. I'm exporting a single function named newlines. This is triggered when there is a batch of records in the Kinesis data stream. We map over the records, transforming each record by adding a new line. This is done in the add_new_line function.

To let the node engine know what we did, we use the callback. It is standard node.js to pass an error object for errors and null when there are no errors (we succeeded).

firehose.putBatchRecord is for efficiency - we could just as well have used firehose.PutRecord and the results would be the same besides throughput.

Monday, June 04, 2018

JSON to objects in a few languages

When working with data services, we often have a need to convert JSON strings to objects that model our data. Here is a list of code snippets in different languages that convert this Facebook Graph JSON data.

The list is presented in ascending order of verbosity. Predictably, Javascript most succinctly expresses its wishes, whereas Java uses a copious amount of code. Scala avoids some of that verbosity by using case classes that remove boilerplate code for constructors. Jackson (Java) requires getters and setters to identify which attributes of the object are to be serialized, causing code bloat.

JSON:
Javascript:
Ruby:
Python:
Scala:
Java:

Friday, April 20, 2018

Goldbach Conjecture

In the 18th century, two mathematicians came up with a conjecture - known by its original creator - named Goldbach conjecture. It says that any even number greater than 2 can be expressed as a sum of two primes. There is no theoretical proof for this yet, but it is said to hold up to 400 trillion.


A program to test Golbach conjecture for a given integer:

This program demonstrates two algorithms that are well known.


  1. The sieve of Eratosthenes to calculate all primes upto a given number 
  2. A linear algorithm to find if two numbers in a list sum to a given number.


To prove the Goldbach conjecture for a given n, we use the sieve to find all prime numbers up to n, then use the linear algorithm to find two primes from this list that sums up to n.


Friday, April 06, 2018

Timing with Jupyter notebook

Pieces of code can be timed within the Jupyter notebook using the %timeit magic.

Here is an example where a grid walk algorithm is implemented three times with progressively better run time, timed with %timeit and graphed using bokeh:

Code:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def num_paths(n):
    M = [[0] * n for i in range(n)]
    for i in range(n):
        M[n-1][i] = 1

    for r in range(n-2, -1, -1):
        for c in range(n-r-1, n):
            M[r][c] = M[r][c-1] + M[r+1][c]
    return M[0][n-1]

def num_paths_from(r, c, n, M):
    if M[r][c] > 0:
        return M[r][c]
    if r == 0 and c == n-1:
        return 1
    paths = ([(x,y) for (x,y) in 
              [(r-1, c), (r, c+1)] if y >= n-x-1 
                                   and y<n])
    npaths = 0
    for x,y in paths:
        npaths += num_paths_from(x,y,n,M)
    M[r][c] = npaths
    return npaths

def num_pathz_from(r, c, n):
    if r == 0 and c == n-1:
        return 1
    paths = ([(x,y) for (x,y) in 
              [(r-1, c), (r, c+1)] if y >= n-x-1 
                                   and y<n])
    npaths = 0
    for x,y in paths:
        npaths += num_pathz_from(x,y,n)
    return npaths

def num_paths_slow(n):
    M = [[0] * n for i in range(n)]
    return num_paths_from(n-1, 0, n, M)

def num_paths_super_slow(n):
    return num_pathz_from(n-1, 0, n)


for sz in range(5,15):
    %timeit num_paths(sz)
    %timeit num_paths_slow(sz)
    %timeit num_paths_super_slow(sz)

Timing:



 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
100000 loops, best of 3: 7.74 µs per loop
10000 loops, best of 3: 26.2 µs per loop
10000 loops, best of 3: 62.1 µs per loop
100000 loops, best of 3: 9.27 µs per loop
10000 loops, best of 3: 32.9 µs per loop
10000 loops, best of 3: 200 µs per loop
100000 loops, best of 3: 11.3 µs per loop
10000 loops, best of 3: 43 µs per loop
1000 loops, best of 3: 615 µs per loop
100000 loops, best of 3: 13.9 µs per loop
10000 loops, best of 3: 56.9 µs per loop
100 loops, best of 3: 2.05 ms per loop
100000 loops, best of 3: 16.6 µs per loop
10000 loops, best of 3: 70.9 µs per loop
100 loops, best of 3: 6.67 ms per loop
100000 loops, best of 3: 19.4 µs per loop
10000 loops, best of 3: 97.4 µs per loop
10 loops, best of 3: 23.7 ms per loop
10000 loops, best of 3: 22.1 µs per loop
10000 loops, best of 3: 105 µs per loop
10 loops, best of 3: 80.2 ms per loop
10000 loops, best of 3: 25.6 µs per loop
10000 loops, best of 3: 135 µs per loop
1 loop, best of 3: 287 ms per loop
10000 loops, best of 3: 29.8 µs per loop
10000 loops, best of 3: 149 µs per loop
1 loop, best of 3: 1.05 s per loop
10000 loops, best of 3: 32.7 µs per loop
10000 loops, best of 3: 171 µs per loop
1 loop, best of 3: 3.78 s per loop

Chart:


Code for the plot:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from bokeh.palettes import Spectral11
from bokeh.plotting import figure, show, output_file

p = figure(plot_width=300, plot_height=300)
slowest = [62,200,615,2050,6670,23700,80200,287000,1050000,3780000]
slower = [26,32,43,56,70,97,105,135,149,171]
fast = [7,9,11,13,16,19,22,25,29,32]
st = 5
end = 8
mypalette=Spectral11[0:3]
p.multi_line(xs=[list(range(st,end)), list(range(st,end)), list(range(st,end))], 
             ys=[slowest[:end-st], 
                 slower[:end-st],
                 fast[:end-st]
                ],
             line_color=mypalette,
             line_width=5
             )

show(p)

This shows how the algorithm with exponential time complexity deteriorates for higher values of n:

Now that I've shown you a bunch of performance numbers and visualization, if you are curious about the algorithm, it is a contrived example of finding the number of paths from one corner of a grid to another, here the squares to the north of the diagonal from top right to bottom left are out of bounds - that is, the path is restricted to the right of the diagonal. In this image, we show the problem for n = 5.



The exponential algorithm recursively finds the number of paths from each point to the end point (the top right corner). But since you can reach a single point by a number of paths (and this number increases exponentially with n), the same computation of finding the number of paths from this point to the grid corner is repeated, causing the slowdown.

The next improvement is to remember the number of paths once calculated. Say if we are on [4,2], we will calculate the path to the grid end from here and mark it in M[4][2]. Next time we are at [4,2], we no longer need to calculate again, as the result can be looked up from M[4][2].

The last algorithm uses dynamic programming to do even less work. It works based on the simple observation that a cell (i,j) can only be reached from just 2 cells. Those are the cell to its immediate left, (i,j-1) and the cell right below it, (i+1,j). Then there is just a single path from these two to (i,j). So if we know the number of paths to those two cells, we can add them up to find the number of paths to (i,j). Then we can keep calculating the paths to each cell, walking from bottom row up, going right on the columns and eventually, we will fill the cell at the top right (0, n -1).

Wednesday, April 04, 2018

Pandas snippets

Here are some useful snippets that can come in handy when cleaning data with pandas. This was useful for me in completing the coursework for python data science course.

Extract a subset of columns from the dataframe based on a regular expression:
Code:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
persona1 = pd.Series({
                        'Last Post On': '02/04/2017',
                        'Friends-2015': 10,
                        'Friends-2016': 20,
                        'Friends-2017': 300
})

persona2 = pd.Series({
                        'Last Post On': '02/04/2018',
                        'Friends-2015': 100,
                        'Friends-2016': 240,
                        'Friends-2017': 560
})

persona3 = pd.Series({
                        'Last Post On': '02/04/2014',
                        'Friends-2015': 120,
                        'Friends-2016': 120,
                        'Friends-2017': 120
})

df = pd.DataFrame([persona1, persona2, persona3], 
                  index=['Chris', 'Bella', 'Laura'])
df.filter(regex=("Friends-\d{4}"))

Output:
Friends-2015 Friends-2016 Friends-2017
Chris 10 20 300
Bella 100 240 560
Laura 120 120 120

Set a column based on the value of both the current row and adjacent rows:

For this example, we define regulars to the gym as those who have gone to the gym last year at least 3 months in a row:
Code:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import datetime
df = pd.DataFrame({'Month': 
                   [datetime.date(2008, i, 1).strftime('%B')
                             for i in range(1,13)] * 3, 
                   'visited': [False]*36},
                   index=['Alice']*12 + 
                         ['Bob']*12 + 
                         ['Bridgett']*12)

df = df.reset_index()

def make_regular(r, name):
    r['visited'] = (r['visited'] or (r['index'] == name) and 
                  ((r['Month'] == 'February') or
                   (r['Month'] == 'March') or
                   (r['Month'] == 'April')))
    return r

df = df.apply(make_regular, axis=1, args=('Alice',))
df = df.apply(make_regular, axis=1, args=('Bob',))
regular = ((df['visited'] == True) & 
          (df['visited'].shift(-1) == True) & 
          (df['visited'].shift(-2) == True))
df[regular]['index'].values .tolist()

Output:
1
['Alice', 'Bob']


Friday, March 23, 2018

Pushing your code to pypi



Here is a good document that describes how to push your code to the Pypi repository.

A URL has changed slightly. In your ~/.pypirc set the URL as follows:


[pypitest]
repository=https://test.pypi.org/legacy/

The register step is no longer required. All you need to do is upload the files.

python setup.py sdist upload -r pypitest

Each time you initiate an upload, you'd need to change the version number and the URL.

While this uploaded the package to test.pypi.org, the upload steps had changed for pypi.org:


thushara@ figleaf (master)$ python setup.py sdist upload -r pypi
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'install_requires'
  warnings.warn(msg)
running sdist
running check
warning: sdist: manifest template 'MANIFEST.in' does not exist (using default file list)

warning: sdist: standard file not found: should have one of README, README.txt

writing manifest file 'MANIFEST'
creating figleaf-0.2
creating figleaf-0.2/figleaf
making hard links in figleaf-0.2...
hard linking setup.cfg -> figleaf-0.2
hard linking setup.py -> figleaf-0.2
hard linking figleaf/__init__.py -> figleaf-0.2/figleaf
hard linking figleaf/graph.py -> figleaf-0.2/figleaf
Creating tar archive
removing 'figleaf-0.2' (and everything under it)
running upload
Submitting dist/figleaf-0.2.tar.gz to https://pypi.python.org/pypi
Upload failed (410): Gone (This API has been deprecated and removed from legacy PyPI in favor of using the APIs available in the new PyPI.org implementation of PyPI (located at https://pypi.org/). For more information about migrating your use of this API to PyPI.org, please see https://packaging.python.org/guides/migrating-to-pypi-org/#uploading. For more information about the sunsetting of this API, please see https://mail.python.org/pipermail/distutils-sig/2017-June/030766.html)
error: Upload failed (410): Gone (This API has been deprecated and removed from legacy PyPI in favor of using the APIs available in the new PyPI.org implementation of PyPI (located at https://pypi.org/). For more information about migrating your use of this API to PyPI.org, please see https://packaging.python.org/guides/migrating-to-pypi-org/#uploading. For more information about the sunsetting of this API, please see https://mail.python.org/pipermail/distutils-sig/2017-June/030766.html)

To upload to pypi I used twine. Installing that on MacOS High Sierra required the removal of SIP.

In ~/.pypirc, I removed the repository line under [pypi]


python setup.py sdist

Remove old tars under dist, and

twine upload dist/*

Now I could see the project under pypi

Installing Twine on MacOS High Sierra


thushara@ wildhops (master)*$ sudo -H pip install twine
Password:
Collecting twine
  Downloading twine-1.11.0-py2.py3-none-any.whl
Collecting pkginfo>=1.4.2 (from twine)
  Downloading pkginfo-1.4.2-py2.py3-none-any.whl
Requirement already satisfied: setuptools>=0.7.0 in /System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python (from twine)
Collecting tqdm>=4.14 (from twine)
  Downloading tqdm-4.19.8-py2.py3-none-any.whl (52kB)
    100% |████████████████████████████████| 61kB 2.1MB/s 
Collecting requests-toolbelt>=0.8.0 (from twine)
  Downloading requests_toolbelt-0.8.0-py2.py3-none-any.whl (54kB)
    100% |████████████████████████████████| 61kB 1.6MB/s 
Requirement already satisfied: requests!=2.15,!=2.16,>=2.5.0 in /Library/Python/2.7/site-packages (from twine)
Installing collected packages: pkginfo, tqdm, requests-toolbelt, twine
Exception:
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/pip/basecommand.py", line 215, in main
    status = self.run(options, args)
  File "/Library/Python/2.7/site-packages/pip/commands/install.py", line 342, in run
    prefix=options.prefix_path,
  File "/Library/Python/2.7/site-packages/pip/req/req_set.py", line 784, in install
    **kwargs
  File "/Library/Python/2.7/site-packages/pip/req/req_install.py", line 851, in install
    self.move_wheel_files(self.source_dir, root=root, prefix=prefix)
  File "/Library/Python/2.7/site-packages/pip/req/req_install.py", line 1064, in move_wheel_files
    isolated=self.isolated,
  File "/Library/Python/2.7/site-packages/pip/wheel.py", line 377, in move_wheel_files
    clobber(source, dest, False, fixer=fixer, filter=filter)
  File "/Library/Python/2.7/site-packages/pip/wheel.py", line 316, in clobber
    ensure_dir(destdir)
  File "/Library/Python/2.7/site-packages/pip/utils/__init__.py", line 83, in ensure_dir
    os.makedirs(path)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 150, in makedirs
    makedirs(head, mode)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/os.py", line 157, in makedirs
    mkdir(name, mode)
OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/man'

The only way to get write access under /System is to boot into Recovery Mode and run this command on the Terminal:

csrutil disable


Reboot, install again