Saturday, November 30, 2019

Rails database migrations on a cluster with AWS CodeDeploy

A typical Ruby / Rails solution comprises a number of web servers behind a load balancer. Each web server reads from and writes to a central database. As new features are added to the application, the database schema goes through changes, which we refer to as "migrations".

When new code is deployed, the migrations that the new code needs must be run first. If AWS CodeDeploy is used for deployment, we can set up the AfterInstall hooks to run the migrations before restarting the web server.

So the usual flow in a deployment goes something like this:

  1. Stop the web server
  2. Migrate the database
  3. Start the web server

However, our application is hosted on a number of web servers, and we don't want to bring down all of them at once. A typical blue/green deployment has us deploy to just one third of the server fleet at a time.

So if we have 27 web servers, we will be running the above steps on 9 of them at the same time. The main problem with this is that when the Rails migration runs on multiple servers at once, it is likely to fail on some of them. This is because Rails takes an advisory lock on the database and throws an exception on concurrent migrations. You can read more about the advisory locking here, as well as a way to work around the problem.

But that workaround is not without its drawbacks. If you prevent the migrations from running on all but one machine, the new code may finish deploying on the other machines before the migration has completed. This is especially true for long-running migrations. Then there is the potential for new code to run against an old database schema, and new features that depend on the new schema will likely fail.

A better solution would be:
  1. Run the migration on a single instance - this could be one of the web servers, or a dynamically provisioned EC2 instance that can access the database.
  2. For each web server:
              2.1 Stop the server
              2.2 Deploy new code to it
              2.3 Start the server

The advantages of this solution are:
  1. We side-step the concurrent migration issue. We run the migrations on a single instance and then do the rest of the deployment without incurring any database issues.
  2. We bring up the new code only after the database is migrated, so the new features work reliably from the start.

The new database schema changes do need to be backward compatible. But this is a general constraint we have anyway, since in a blue/green deployment some of the running code is old and will be hitting the new database.

While this solution is pretty straightforward, it takes some effort to implement in the AWS CodeDeploy environment.

What I ended up doing was to use a new Deployment Group (called staging) to bring up a single EC2 instance, and to change the startup code so that the migration only runs in that deployment group. Then I hooked this deployment group in right after the deployment to a test instance, but before the code is deployed to the production servers.

In the startup code, we can check the current deployment group via ENV['DEPLOYMENT_GROUP_NAME']. In our scripts, we set RAILS_ENV equal to the Deployment Group. This allows code to take different paths based on where it runs (in a local dev environment, on a staging server, or, as in this case, on a migrator server).

This is what our migrate script now looks like:


It is important that the check is an inequality, as we want the migrations to run on our test servers - we just don't want them running on the production web servers.
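In outline, the check works like this Python sketch (the group name "production" and the rake invocation here are my assumptions, not the script's actual contents):

```python
import os
import subprocess

def should_migrate(deployment_group):
    # Note the inequality: every group except the production web servers
    # (test, staging/migrator) runs the migrations.
    return deployment_group != 'production'

def migrate_if_needed():
    group = os.environ.get('DEPLOYMENT_GROUP_NAME', '')
    if should_migrate(group):
        # Hypothetical invocation of the Rails migration task.
        subprocess.run(['bundle', 'exec', 'rake', 'db:migrate'], check=True)
```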

We add this to our database.yml. Notice that the environment is staging, to match the deployment group, while the database is the production instance.


In our case, we read credentials from AWS Secrets Manager. You don't have to.

This is how our staging step in CodePipeline looks:


On the last CodeDeploy step, hit the edit button and set the Application Name and the Deployment Group correctly.



Now, before the code is deployed to the production servers, the database migration has been completed on the staging instance. If the migration fails, the pipeline won't advance to the production deploy stage. When the production servers start with the new code, they will all use the new database schema.

After the migration has finished and before the new code is deployed, the old code will be running against the new database schema. As long as the new schema is backward compatible, this will not cause a problem.

You may have to run the release pipeline a few times until AWS cooperates with the new changes. But it should eventually start working.

Monday, July 08, 2019

Kinesis/Firehose/Athena - Creating queryable data from logs

Amazon has a data transformation pipeline that allows log data to be queried with a SQL-like syntax. This can be useful for gaining insight that is buried in log data generally thought of as temporary. When was the last time you went over 6 months of logs? Right, just what I thought.

Wading through logs is painful, and with the growth of data, all the more so. No wonder that when confronted with the task of gleaning information from past data, engineers build specialized table structures with relational queries in mind, or provision specialized map/reduce jobs to crunch over the data for detailed answers to specific questions.

But this time-consuming exercise can be done away with by using the Amazon Kinesis pipeline. The flow looks something like this: the application writes a JSON-formatted record that captures a particular item of interest to a Kinesis data stream. A Firehose instance is attached to the output of the data stream. This Firehose instance converts the data to a "JSON like" format and writes it into an S3 bucket under a specified folder. Another Amazon service, Glue, provides a crawler that can then process new files that get uploaded to the S3 bucket. The Glue crawler infers the schema from the JSON it finds in the S3 files and creates a Glue table. To query the data, Amazon provides yet another service - Athena, which sports a SQL syntax and a user-friendly query console. Phew, yeah, it is quite the mother of all pipelines.

This is all pretty straightforward to set up from the Kinesis console itself. Start with the Data streams tab in Kinesis, create a data stream, then create a Kinesis Firehose with its source set to the data stream you just created. Specify that Firehose data will be written with the API like so:



Since we are writing JSON to Kinesis, there is no need to convert the record format, and we pass the data to Firehose as-is without transformation (well, more on this later). So we can leave the default settings for Source record transformation and Record format conversion.

Finally, you need to specify where you want this data to live - in our case, S3:


Now head over to Glue and add a crawler. Specify the S3 folder that you used above for the "include path". For simplicity, I would start with a single schema for all S3 records, under "Grouping Behaviors".



Now head over to your favorite editor and let's write some code - finally!
It's up to you how you want to structure the code to do this. In the application I'm building, it is literally the logs that I want sent over to Kinesis. Anticipating this, I had written a function that the app calls for writing logs, and this function was the ideal place to add in the write to Kinesis. It looks something like this:
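In outline - sketched here with boto3 in Python, where the stream name and record shape are my assumptions rather than the app's actual code - the idea is:

```python
import json

def build_log_record(message, **fields):
    """Serialize one structured log entry as JSON."""
    return json.dumps({'message': message, **fields})

def log_event(message, **fields):
    data = build_log_record(message, **fields)
    print(data)  # the normal log write
    # Mirror the record to the Kinesis data stream feeding Firehose.
    import boto3  # imported here so the pure helper above has no AWS dependency
    kinesis = boto3.client('kinesis')
    kinesis.put_record(
        StreamName='app-logs',        # hypothetical stream name
        Data=data,
        PartitionKey=message[:256],   # Kinesis requires a partition key
    )
```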



That would be all there is to it, except there is an annoying bug in the pipeline that we need to work around. The issue is that Firehose writes its "JSON like" data to S3 all on a single line, while the Glue crawler expects each record to exist on its own line. So when all the records are squished into a single line in S3, the crawler processes the first and throws away the rest. Imagine my surprise when only 1 out of 17 of my log records appeared in the Athena queries.

The workaround is to write a Lambda function with a Kinesis trigger. Every time a Kinesis record is written, the Lambda gets triggered. Well, that is not strictly true - Kinesis will batch a bunch of records and invoke the Lambda once per batch. The batch size (or the trigger interval) can be specified from the console.

Or if you are using serverless, this can be specified in the serverless.yml like so:



Without further ado, here's the Lambda that adds the newline:



This is written in node.js, and I used the serverless framework with the node.js template to write it. I'm exporting a single function named newlines, which is triggered when there is a batch of records in the Kinesis data stream. We map over the records, transforming each record by adding a newline. This is done in the add_new_line function.

To let the node engine know what we did, we use the callback. It is standard node.js to pass an error object for errors and null when there are no errors (we succeeded).

firehose.putRecordBatch is used for efficiency - we could just as well have called firehose.putRecord per record, and the results would be the same apart from throughput.
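The Lambda itself is node.js; for illustration, an equivalent handler sketched in Python would look something like this (the delivery stream name and wiring are assumptions):

```python
import base64

def add_new_line(data):
    """Append the newline the Glue crawler needs between records."""
    return data + b'\n'

def newlines(event, context):
    """Invoked with a batch of Kinesis records; forwards them to Firehose."""
    records = [
        {'Data': add_new_line(base64.b64decode(r['kinesis']['data']))}
        for r in event['Records']
    ]
    import boto3  # deferred so the transform above stays testable offline
    firehose = boto3.client('firehose')
    # The batched call; a per-record put_record would give the same result
    # with lower throughput.
    firehose.put_record_batch(
        DeliveryStreamName='app-logs-delivery',  # hypothetical name
        Records=records,
    )
```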

Monday, June 04, 2018

JSON to objects in a few languages

When working with data services, we often need to convert JSON strings to objects that model our data. Here is a list of code snippets in different languages that convert this Facebook Graph JSON data.

The list is presented in ascending order of verbosity. Predictably, Javascript expresses its wishes most succinctly, whereas Java uses a copious amount of code. Scala avoids some of that verbosity by using case classes, which remove the boilerplate constructor code. Jackson (Java) requires getters and setters to identify which attributes of the object are to be serialized, causing code bloat.

JSON:
Javascript:
Ruby:
Python:
Scala:
Java:
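As one concrete point of comparison, a Python version might look like this, assuming a pared-down Graph payload with just id and name fields:

```python
import json
from dataclasses import dataclass

@dataclass
class GraphUser:
    id: str
    name: str

def parse_user(payload):
    """Convert a JSON string into a typed object modeling the data."""
    data = json.loads(payload)
    return GraphUser(id=data['id'], name=data['name'])
```

For example, parse_user('{"id": "4", "name": "Mark"}') yields GraphUser(id='4', name='Mark').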

Friday, April 20, 2018

Goldbach Conjecture

In the 18th century, two mathematicians came up with a conjecture, named for its original proposer as the Goldbach conjecture. It says that any even number greater than 2 can be expressed as the sum of two primes. There is no proof of this yet, but it is said to hold for all even numbers up to 400 trillion.


A program to test the Goldbach conjecture for a given integer:

This program demonstrates two well-known algorithms:


  1. The sieve of Eratosthenes, to calculate all primes up to a given number.
  2. A linear-time algorithm to find whether two numbers in a sorted list sum to a given number.


To test the Goldbach conjecture for a given n, we use the sieve to find all primes up to n, then use the linear algorithm to find two primes from this list that sum to n.
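A self-contained sketch of the two algorithms (a minimal version, not necessarily the original program):

```python
def primes_up_to(n):
    """Sieve of Eratosthenes: returns all primes <= n, in order."""
    sieve = [True] * (n + 1)
    sieve[0:2] = [False, False]
    for p in range(2, int(n ** 0.5) + 1):
        if sieve[p]:
            # Cross out every multiple of p, starting at p*p.
            for multiple in range(p * p, n + 1, p):
                sieve[multiple] = False
    return [i for i, is_prime in enumerate(sieve) if is_prime]

def goldbach_pair(n):
    """Two-pointer scan over the sorted prime list: linear in its length."""
    primes = primes_up_to(n)
    lo, hi = 0, len(primes) - 1
    while lo <= hi:
        total = primes[lo] + primes[hi]
        if total == n:
            return primes[lo], primes[hi]
        if total < n:
            lo += 1
        else:
            hi -= 1
    return None  # a None for an even n > 2 would disprove the conjecture
```

For example, goldbach_pair(28) finds (5, 23).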


Friday, April 06, 2018

Timing with Jupyter notebook

Pieces of code can be timed within the Jupyter notebook using the %timeit magic.

Here is an example where a grid walk algorithm is implemented three times with progressively better run time, timed with %timeit and graphed using bokeh:

Code:


def num_paths(n):
    M = [[0] * n for i in range(n)]
    for i in range(n):
        M[n-1][i] = 1

    for r in range(n-2, -1, -1):
        for c in range(n-r-1, n):
            M[r][c] = M[r][c-1] + M[r+1][c]
    return M[0][n-1]

def num_paths_from(r, c, n, M):
    if M[r][c] > 0:
        return M[r][c]
    if r == 0 and c == n-1:
        return 1
    paths = ([(x,y) for (x,y) in 
              [(r-1, c), (r, c+1)] if y >= n-x-1 
                                   and y<n])
    npaths = 0
    for x,y in paths:
        npaths += num_paths_from(x,y,n,M)
    M[r][c] = npaths
    return npaths

def num_pathz_from(r, c, n):
    if r == 0 and c == n-1:
        return 1
    paths = ([(x,y) for (x,y) in 
              [(r-1, c), (r, c+1)] if y >= n-x-1 
                                   and y<n])
    npaths = 0
    for x,y in paths:
        npaths += num_pathz_from(x,y,n)
    return npaths

def num_paths_slow(n):
    M = [[0] * n for i in range(n)]
    return num_paths_from(n-1, 0, n, M)

def num_paths_super_slow(n):
    return num_pathz_from(n-1, 0, n)


for sz in range(5,15):
    %timeit num_paths(sz)
    %timeit num_paths_slow(sz)
    %timeit num_paths_super_slow(sz)

Timing:



100000 loops, best of 3: 7.74 µs per loop
10000 loops, best of 3: 26.2 µs per loop
10000 loops, best of 3: 62.1 µs per loop
100000 loops, best of 3: 9.27 µs per loop
10000 loops, best of 3: 32.9 µs per loop
10000 loops, best of 3: 200 µs per loop
100000 loops, best of 3: 11.3 µs per loop
10000 loops, best of 3: 43 µs per loop
1000 loops, best of 3: 615 µs per loop
100000 loops, best of 3: 13.9 µs per loop
10000 loops, best of 3: 56.9 µs per loop
100 loops, best of 3: 2.05 ms per loop
100000 loops, best of 3: 16.6 µs per loop
10000 loops, best of 3: 70.9 µs per loop
100 loops, best of 3: 6.67 ms per loop
100000 loops, best of 3: 19.4 µs per loop
10000 loops, best of 3: 97.4 µs per loop
10 loops, best of 3: 23.7 ms per loop
10000 loops, best of 3: 22.1 µs per loop
10000 loops, best of 3: 105 µs per loop
10 loops, best of 3: 80.2 ms per loop
10000 loops, best of 3: 25.6 µs per loop
10000 loops, best of 3: 135 µs per loop
1 loop, best of 3: 287 ms per loop
10000 loops, best of 3: 29.8 µs per loop
10000 loops, best of 3: 149 µs per loop
1 loop, best of 3: 1.05 s per loop
10000 loops, best of 3: 32.7 µs per loop
10000 loops, best of 3: 171 µs per loop
1 loop, best of 3: 3.78 s per loop

Chart:


Code for the plot:

from bokeh.palettes import Spectral11
from bokeh.plotting import figure, show, output_file

p = figure(plot_width=300, plot_height=300)
slowest = [62,200,615,2050,6670,23700,80200,287000,1050000,3780000]
slower = [26,32,43,56,70,97,105,135,149,171]
fast = [7,9,11,13,16,19,22,25,29,32]
st = 5
end = 15
mypalette=Spectral11[0:3]
p.multi_line(xs=[list(range(st,end)), list(range(st,end)), list(range(st,end))], 
             ys=[slowest[:end-st], 
                 slower[:end-st],
                 fast[:end-st]
                ],
             line_color=mypalette,
             line_width=5
             )

show(p)

This shows how the algorithm with exponential time complexity deteriorates for higher values of n:

Now that I've shown you a bunch of performance numbers and a visualization, if you are curious about the algorithm: it is a contrived example of finding the number of paths from one corner of a grid to another, where the squares to the north of the diagonal from top right to bottom left are out of bounds - that is, the path is restricted to the right of the diagonal. In this image, we show the problem for n = 5.



The exponential algorithm recursively finds the number of paths from each point to the end point (the top right corner). But since you can reach a single point by a number of paths (and this number increases exponentially with n), the same computation of finding the number of paths from this point to the grid corner is repeated, causing the slowdown.

The next improvement is to remember the number of paths once calculated. Say we are at [4,2]: we calculate the number of paths from here to the grid end and record it in M[4][2]. The next time we are at [4,2], we no longer need to calculate it again, as the result can be looked up from M[4][2].

The last algorithm uses dynamic programming to do even less work. It is based on the simple observation that a cell (i,j) can be reached from just 2 cells: the cell to its immediate left, (i,j-1), and the cell right below it, (i+1,j). From each of those two cells there is exactly one move into (i,j). So if we know the number of paths to those two cells, we can add them up to get the number of paths to (i,j). We keep calculating the paths to each cell, walking from the bottom row up and moving right across the columns, until we eventually fill the cell at the top right, (0, n-1).

Wednesday, April 04, 2018

Pandas snippets

Here are some useful snippets that can come in handy when cleaning data with pandas. They were useful to me in completing the coursework for a Python data science course.

Extract a subset of columns from the dataframe based on a regular expression:
Code:
persona1 = pd.Series({
                        'Last Post On': '02/04/2017',
                        'Friends-2015': 10,
                        'Friends-2016': 20,
                        'Friends-2017': 300
})

persona2 = pd.Series({
                        'Last Post On': '02/04/2018',
                        'Friends-2015': 100,
                        'Friends-2016': 240,
                        'Friends-2017': 560
})

persona3 = pd.Series({
                        'Last Post On': '02/04/2014',
                        'Friends-2015': 120,
                        'Friends-2016': 120,
                        'Friends-2017': 120
})

df = pd.DataFrame([persona1, persona2, persona3], 
                  index=['Chris', 'Bella', 'Laura'])
df.filter(regex=r"Friends-\d{4}")

Output:
Friends-2015 Friends-2016 Friends-2017
Chris 10 20 300
Bella 100 240 560
Laura 120 120 120

Set a column based on the value of both the current row and adjacent rows:

For this example, we define regulars to the gym as those who have gone to the gym last year at least 3 months in a row:
Code:
import datetime
df = pd.DataFrame({'Month': 
                   [datetime.date(2008, i, 1).strftime('%B')
                             for i in range(1,13)] * 3, 
                   'visited': [False]*36},
                   index=['Alice']*12 + 
                         ['Bob']*12 + 
                         ['Bridgett']*12)

df = df.reset_index()

def make_regular(r, name):
    r['visited'] = (r['visited'] or (r['index'] == name) and 
                  ((r['Month'] == 'February') or
                   (r['Month'] == 'March') or
                   (r['Month'] == 'April')))
    return r

df = df.apply(make_regular, axis=1, args=('Alice',))
df = df.apply(make_regular, axis=1, args=('Bob',))
regular = ((df['visited'] == True) & 
          (df['visited'].shift(-1) == True) & 
          (df['visited'].shift(-2) == True))
df[regular]['index'].values.tolist()

Output:
['Alice', 'Bob']


Friday, March 23, 2018

Pushing your code to pypi



Here is a good document that describes how to push your code to the Pypi repository.

A URL has changed slightly. In your ~/.pypirc set the URL as follows:


[pypitest]
repository=https://test.pypi.org/legacy/

The register step is no longer required. All you need to do is upload the files.

python setup.py sdist upload -r pypitest

Each time you initiate an upload, you'd need to change the version number and the URL.

While this uploaded the package to test.pypi.org, the upload steps had changed for pypi.org:


thushara@ figleaf (master)$ python setup.py sdist upload -r pypi
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'install_requires'
  warnings.warn(msg)
running sdist
running check
warning: sdist: manifest template 'MANIFEST.in' does not exist (using default file list)

warning: sdist: standard file not found: should have one of README, README.txt

writing manifest file 'MANIFEST'
creating figleaf-0.2
creating figleaf-0.2/figleaf
making hard links in figleaf-0.2...
hard linking setup.cfg -> figleaf-0.2
hard linking setup.py -> figleaf-0.2
hard linking figleaf/__init__.py -> figleaf-0.2/figleaf
hard linking figleaf/graph.py -> figleaf-0.2/figleaf
Creating tar archive
removing 'figleaf-0.2' (and everything under it)
running upload
Submitting dist/figleaf-0.2.tar.gz to https://pypi.python.org/pypi
Upload failed (410): Gone (This API has been deprecated and removed from legacy PyPI in favor of using the APIs available in the new PyPI.org implementation of PyPI (located at https://pypi.org/). For more information about migrating your use of this API to PyPI.org, please see https://packaging.python.org/guides/migrating-to-pypi-org/#uploading. For more information about the sunsetting of this API, please see https://mail.python.org/pipermail/distutils-sig/2017-June/030766.html)
error: Upload failed (410): Gone (This API has been deprecated and removed from legacy PyPI in favor of using the APIs available in the new PyPI.org implementation of PyPI (located at https://pypi.org/). For more information about migrating your use of this API to PyPI.org, please see https://packaging.python.org/guides/migrating-to-pypi-org/#uploading. For more information about the sunsetting of this API, please see https://mail.python.org/pipermail/distutils-sig/2017-June/030766.html)

To upload to pypi I used twine. Installing that on MacOS High Sierra required the removal of SIP.

In ~/.pypirc, I removed the repository line under [pypi]


python setup.py sdist

Remove old tars under dist, and

twine upload dist/*

Now I could see the project under pypi.