Wednesday, January 10, 2024

Ruby lazy collections and with_index


Lazy collections don't quite give us a way to use `with_index`, but we can use `each_with_index`.

It took me a moment to figure out the problem was with the collection being lazy.
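A minimal sketch of the `each_with_index` approach on a lazy enumerator. The infinite range and the multiplication are made up for illustration; the point is only that the chain stays lazy and still hands the block an index.

```ruby
# An infinite range, evaluated lazily so only the needed elements are computed.
numbers = (1..Float::INFINITY).lazy

# Calling each_with_index without a block on a lazy enumerator keeps the chain lazy
# and yields [element, index] pairs to the next step.
products = numbers.each_with_index.map { |n, i| n * i }

# Only the first three pairs are ever evaluated.
first_three = products.first(3)
puts first_three.inspect
```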

Thursday, August 25, 2022

Using Redis for web request counting



Problem Statement

With the growth of our user base and increasing traffic to the web servers, we wanted to come up with some realtime counters to measure traffic.

We wanted the system to give us an idea of:

  • The total web requests the site is receiving each minute
  • These totals aggregated hourly and displayed for the day (24 entries)
  • The counts broken up by the specific request path
  • The number of unique visitors broken up by the minute, hour and day

Limitations of available solutions

We use Heroku for deploying the app. The Heroku dashboard provides us a rough idea of the request counts, but it is difficult to incorporate that into our own operational dashboards. Also Heroku doesn't break down the counts by the request path, which was important to us in understanding the source of traffic and how we could scale resources based on the source. Unique visitor counts are similarly an important metric not readily available.

Counters help scale the app

Breaking down the traffic by the source was important in coming up with a way to efficiently scale our service. Currently most of our application sits in a single service. So scaling the app means we add machines to this single fleet, even though only requests from one or two sources really benefit from this added capacity.

We have 3 main sources of traffic:
  1. Our users hit the service for various features provided from their mobile app
  2. Our users send us their geo location from the mobile app
  3. We receive near real-time data of our users' gig work from a third party

Our primary application performance goal is providing a sub-second experience to users as they use our mobile app, so we mainly want to optimize backend resourcing with a focus on 1.

However, we get far more traffic from 2. and 3., which consume most of the server bandwidth. Serving all three sources from a single service degrades the experience for the user.

Mind you, 2. and 3. do not have a real time processing requirement. While a web server is needed to accept these requests, the actual processing is handled by an asynchronous worker outside of the web server.

Still, because there are so many of these web requests, the few milliseconds each of them sits on the web server take bandwidth away from the user requests.

Why Redis?

Redis provides ideal data structures for counting requests with minimal overhead. (And must I say that it is fast, as in really, really fast.) A few keys can be used to keep a running total for each minute per request type, then a sorted set can be used to aggregate the last N minutes of requests. (For our purposes, we decided to keep 60 counts, thus providing a picture of activity for the last hour, but you can choose to keep counts for longer than that.)

The same idea can be extended to measure days' worth of traffic broken down by the hour.

Choice of the Sorted Set

Why did we decide on the sorted set for aggregating the counters? Well, the sorted set allows us to have the counters sorted by time. This way, we can quickly get the list of counts for the last hour ordered from minute 1 down. To be fair, it is a bit overkill to use a set for this, as we are never going to be mutating an element (since the timestamp is always increasing), but it does suit our purposes just fine!

Before going any further, let us briefly recap the salient features of the sorted set. It allows us to insert elements along with a score, and the elements are kept sorted by the score at all times. It scales really well even for millions (or more) of elements, as each insert operation takes O(log(n)) time, much like a balanced binary tree. While we do not need that level of scale, one can think of keeping extremely granular counts for a long period of time, which could come in handy for debugging bizarre performance problems after the fact!

We can use the timestamp as the score. Redis will then always sort the set by the timestamp. This has the advantage that if you wanted to change the counter later (imagine a design where you quickly provide a rough estimate of the count, but later do a second pass to provide exact counts), you can simply add a new count to the set with the same timestamp and the position of the element will not change.

The counters need to be reset at the start of each minute. I first made the mistake of setting the key's expiry time to 1 minute, but realized that this introduces a race at the point of aggregating the count onto the sorted set: if we are unlucky, Redis expires the key just before the aggregation runs, resulting in a substantial undercount in the set. (This was a difficult bug to track down, and of course the cause was the most obvious one. I had a face-palm moment, as you can imagine.)

There is a slight difficulty we need to work around here with respect to the sorted set. If we keep the bare count as the element in the set, a count that happens to be the same as one already stored will replace the previous entry (with the score modified). Since we are using the timestamp as the score, this essentially removes the entry we had for the earlier timestamp. This is how sets work after all: the data structure is suited for keeping unique members. But we can easily overcome this by appending the timestamp to the count and storing that as the element of the set. To read the count, we merely have to split the element by the delimiter (we used the colon as the delimiter, which is somewhat of a standard in Redis for splitting keys) and use the first part.
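The member layout can be sketched in plain Ruby. This assumes the `count:timestamp` layout used by the aggregation code later in this post; the helper names `encode_member` and `decode_member`, the sample counts and timestamps, and the Hash standing in for the sorted set are all made up for illustration.

```ruby
# Build a sorted-set member that stays unique even when two minutes share a count.
# Layout assumed here: "<count>:<timestamp>".
def encode_member(count, timestamp)
  "#{count.to_i}:#{timestamp}"
end

# Recover the count and the timestamp from a stored member.
def decode_member(member)
  count, timestamp = member.split(':', 2)
  { count: count.to_i, timestamp: timestamp.to_i }
end

# A Hash keyed by member stands in for the sorted set (member => score).
sorted_set = {}
[[42, 1_661_400_000], [42, 1_661_400_060]].each do |count, ts|
  sorted_set[encode_member(count, ts)] = ts
end

# Both minutes had the same count, yet both entries survive.
puts sorted_set.size
puts decode_member(sorted_set.keys.last).inspect
```

Had the bare count `42` been used as the member, the second write would have overwritten the first.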

A look at the running counters

A ZRANGE command can retrieve the counts like so:

Counting the unique visitors is only slightly more involved. First we need to keep a regular Redis set and update it for each request. In our case, the user id is encoded in the request header; we decode it and add it to the Redis set. If the same user visits the site again, the set will not add the id a second time, so we still have just one element for that user. This way, we can take the size of the set at any point and know how many unique visitors we have had since we started writing to the set.

The only thing left to do is to create the set at the start of the time interval we want to measure and reset it at the end of that interval. So we can set this up to reset every minute for a minute-by-minute count of unique visitors. Then we can use the infrastructure we built above to aggregate the count over to the sorted set, so we have a running count of unique visitors for the past N minutes.

(You may have a different technique for figuring out the ID of the user making the request. Once the ID is extracted, you can use a Redis set to keep track of the visit.)
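The set semantics can be sketched in plain Ruby, with the standard library `Set` standing in for the Redis set. This is an assumption for illustration only: `add` plays the role of SADD, `size` the role of SCARD, and the user ids are made up.

```ruby
require 'set'

# Stand-in for the Redis set that tracks visitors within one interval.
visitors = Set.new

# Five requests from three distinct users; repeat visits do not grow the set.
%w[user-1 user-2 user-1 user-3 user-2].each { |id| visitors.add(id) }

# The size of the set is the unique visitor count for the interval.
unique_count = visitors.size
puts unique_count

# At the end of the interval the set is reset so the next interval starts from zero.
visitors.clear
```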

Here is how we see the unique visitor count changing dynamically:







We can just as easily use another excellent Redis command to see all the user ids in this set. Here is a snippet from our case:





Implementation

We implemented the counters using Ruby, with the redis gem as our client. This involves several steps:
  1. Initializing the counters
  2. Resetting the counters at minute, hour, day intervals
  3. Incrementing the appropriate counters for each request
  4. Aggregating the count onto the set
The first two steps can be combined. We used a scheduler that sits within the app via the ruby clock gem. Heroku allows provisioning a single process that runs a scheduler based on the schedule we set via the ruby clock. This is pretty similar to how one would use cron on a Unix machine to schedule a task.

Heroku does provide a scheduler as well. We did not use it as it does not have the same reliability guarantees as the ruby clock gem. I have seen cases where the Heroku scheduler does not fire or fires very late, both documented behaviors.

Since we use Rails for our app, we utilized its framework built on top of controllers to track request counts.

A controller encapsulates serving requests for a specific web route (think of this as having one controller for yoursite.com/user/login and another controller for yoursite.com/reports/account). Now each controller is a subclass of a class we implement called ApplicationController which itself is a subclass of the Rails class ActionController::Base.

Rails allows us to hook all requests at the ApplicationController level with a simple before_action hook. We implemented the request counting using this hook, and it looks something like this:

    class ApplicationController < ActionController::Base
      before_action :update_usage_counters

      def update_usage_counters
        PerfCounters.increment(user_id: current_user&.id, request_path: request&.fullpath)
      end
    end

Now each request goes through update_usage_counters, which delegates the work to the PerfCounters class we wrote. request is an object provided by the Rails routing framework, and request.fullpath contains the URL. The method current_user (not shown) extracts the logged in user's ID from the request headers.

I will reproduce pieces of a simplified version of PerfCounters that will illustrate the logic:

The incrementing logic looks like this:

    class PerfCounters
      def self.increment(user_id:, request_path:)
        $redis.pipelined do |pipeline|
          if user_id.present?
            pipeline.incr('USER_REQUESTS_MINUTE')
            pipeline.sadd('UNIQUE_VISITORS_MINUTE', user_id)
            if request_path&.include?("/geo/send")
              pipeline.incr('GEO_REQUESTS_MINUTE')
            end
          else
            pipeline.incr('OTHER_REQUESTS_MINUTE')
          end
        end
      end
    end

Notice that a request made on behalf of a logged in user will have user_id parameter set. The request_path is the path of the URL, and we use it here to separate the counts made to track the location of the user.

Another neat Redis feature we use here is pipelining. The idea is that if we need to make a number of independent requests to Redis, we can send them all over one connection without waiting for each reply. The Redis server returns an array of replies, in order, after it processes all the requests. This is more efficient than paying a network round trip for each separate request. It is not without cost: the server has to queue the replies in memory while it works through the pipeline. The rule of thumb is to make sure that each request is fast (O(1) would be ideal) and to not pipeline too many requests in a single call. As with everything, you must test this against all the other traffic you serve and compromise if you must!

Also notice that we are demonstrating the use of three counters USER_REQUESTS_MINUTE, GEO_REQUESTS_MINUTE and OTHER_REQUESTS_MINUTE, alongside a set called UNIQUE_VISITORS_MINUTE. This last one actually keeps the user ids of all visitors. The sadd command adds the visitor id to the set upon the first time we see them.

The ruby clock gem takes its inputs via a file named Clockfile. This is in fact a file that uses Ruby syntax, i.e. it is evaluated by the Ruby interpreter. All we do is define the aggregator to run every minute, like so:

    schedule.cron '* * * * *' do
      PerfCounters.aggregate_minute
    end

This is what the minute aggregation looks like:

    def self.aggregate_minute
      tm_obj = Time.current - 1.minute # aggregate last minute's stats
      tm = tm_obj.to_i

      # get all current minute counters, add them to the hour list before zeroing them out
      user_rpm, other_rpm, geo_rpm, unique_visitors_last_minute = $redis.pipelined do |pipeline|
        pipeline.get('USER_REQUESTS_MINUTE')
        pipeline.get('OTHER_REQUESTS_MINUTE')
        pipeline.get('GEO_REQUESTS_MINUTE')
        pipeline.scard('UNIQUE_VISITORS_MINUTE')
      end

      $redis.pipelined do |pipeline|
        # ZADD key score value : keep timestamp as score so we get the counters sorted by time
        # append the timestamp to the counter to make sure entries don't overwrite.
        pipeline.zadd('USER_REQUESTS_LAST_HOUR', tm, "#{user_rpm}:#{tm}")
        pipeline.zadd('OTHER_REQUESTS_LAST_HOUR', tm, "#{other_rpm}:#{tm}")
        pipeline.zadd('GEO_REQUESTS_LAST_HOUR', tm, "#{geo_rpm}:#{tm}")
        pipeline.zadd('UNIQUE_VISITORS_LAST_HOUR', tm, "#{unique_visitors_last_minute}:#{tm}")
        pipeline.del('USER_REQUESTS_MINUTE')
        pipeline.del('OTHER_REQUESTS_MINUTE')
        pipeline.del('GEO_REQUESTS_MINUTE')
        pipeline.del('UNIQUE_VISITORS_MINUTE')
      end
    end

As you can see, there are two types of counters. One tracks the count for each minute; the other aggregates those counts for the hour. Take USER_REQUESTS_MINUTE, for example. It is incremented for each request made on behalf of a logged in user. Then, at the start of each minute, its value is added to the sorted set USER_REQUESTS_LAST_HOUR and the counter is immediately deleted.

You should trim the aggregations every M hours, since otherwise these sets will keep growing, eventually taking up all of Redis's memory! I won't show that code, but it should be fairly straightforward to implement.
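For reference, here is a minimal sketch of one way the trimming could be done, assuming the scores are Unix timestamps as in the aggregation code. The helper name `trim_sorted_set` and its parameters are hypothetical; `zremrangebyscore` is the redis gem's wrapper for the ZREMRANGEBYSCORE command, and the `(` prefix makes the upper bound exclusive.

```ruby
# Remove entries older than max_age_seconds from a time-scored sorted set.
# `redis` is any client that responds to zremrangebyscore(key, min, max),
# as the redis gem's client does. Returns whatever the client returns
# (for Redis, the number of removed members).
def trim_sorted_set(redis, key, max_age_seconds, now: Time.now)
  cutoff = now.to_i - max_age_seconds
  # Scores are Unix timestamps, so everything scored below the cutoff is stale.
  redis.zremrangebyscore(key, '-inf', "(#{cutoff}")
end

# Possible usage from the scheduler, keeping one hour of entries:
#   trim_sorted_set($redis, 'USER_REQUESTS_LAST_HOUR', 3600)
```

Running this from the same schedule as the aggregation keeps each set bounded to roughly the window you care about.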

After having implemented this solution and writing this article, I have come to see that there are other ways to implement counting using Redis. Redis provides such a versatile set of data structures and algorithms that there is always a simpler or more elegant technique somewhere!

For example, when we use a container like a sorted set or a list, we must set its bounds, clearing it at certain time intervals and thus restricting its memory usage. But if you use Redis Stack, there is an excellent data structure, the Redis time series, that does much of this bookkeeping for you. Basically, you can configure the time series to expire old entries (something that most other Redis data structures do not do for you: you can expire the complete key or nothing at all). Besides that, it has commands very similar to those of the set or the sorted set.

Another advantage of a time series over a sorted set is the trivially simple management of a "rolling" window of counts. In performance monitoring you typically want the "last 72 hours" or the "last 30 days" of performance data, which is more useful than data "for all of today" or "for the current hour".

I leave this as an exercise to the reader. Maybe I can talk about this in greater detail in a future article as well!

Friday, December 18, 2020

Set GIT Hash on Heroku Deploys


There are several ways to set the GIT hash for heroku. I prefer setting this just before pushing the main branch to the heroku remote.

Also, I prefer to automate this with a `pre-push` hook.

One issue I ran into was that it was not straightforward to know if there is anything to push. Meaning, if the remote was up-to-date or not. This is important, as we don't want heroku deploying a version when there are no changes to be deployed (which it will if we set the APP_VERSION).

The simplest way to know if the remote was already up-to-date seemed to be to do a `--dry-run` of `git push`.

But of course that runs the hook again, so this is an endless loop situation.

Git does not seem to pass the push arguments through to the hook. If it did, we could explicitly skip the hook on a dry run.

But there is a `--no-verify` option that bypasses the hooks, and can be used when we do our dry run.

Here is the complete script:




Saturday, November 30, 2019

Rails database migrations on a cluster with AWS CodeDeploy

A typical Ruby / Rails solution will consist of a number of web servers behind a load balancer. Each of the web servers will read from and write to a central database. As new features are added to the application, the database schema goes through changes, which we refer to as "migrations".

When new code is deployed, the migrations needed for the new code need to be run first. If AWS CodeDeploy is used for deployment of new code, we can set up the AfterInstall hooks to run the migrations before restarting the web server.

So the usual flow in a deployment goes something like this:

  1. Stop the web server
  2. Migrate the database
  3. Start the web server

However, our application is hosted on a number of web servers. We don't want to bring down all servers at once. A typical blue/green deployment will have us deploy to just one third of the server fleet at once.

So if we have 27 web servers, we will be running the above steps on 9 of them at the same time. The main problem with this is that when the Rails migrate runs on multiple servers at once, it is likely to fail on a number of them. This is because Rails takes an advisory lock on the database and throws an exception on concurrent migrations. You can read more about the advisory locking here as well as a way to work around the problem.

But the solution is not without its drawbacks. If you prevent the migrations from running on all but one machine, it is possible that the new code will be deployed on the other machines before the migration has finished. This is especially true for long-running migrations. Then there is potential for new code to be running against an old database schema. New features that depend on the new schema will likely fail.

A better solution would be:
  1. Run the migration on a single instance - this could be one of the web servers, or a dynamically provisioned EC2 instance that can access the database.
  2. For all web servers:
       2.1 Stop the server
       2.2 Deploy new code to it
       2.3 Start the server

The advantages of this solution are:
  1. We side-step the concurrent migration issue. We run the migrations on a single instance and then do the rest of the deployment without incurring any database issues.
  2. We bring up the new code only after the database is migrated, so the new features work reliably from the start.

This does mean the new database schema changes need to be backward compatible. But this is a general constraint we have anyway, since in a blue/green deployment some servers still run the old code and will be hitting the new database.

While this solution is pretty straightforward, it requires some effort to implement this in the AWS CodeDeploy environment.

What I ended up doing was to use a new Deployment Group (called staging) to bring up a single EC2 instance, change the start up code to only run the migration on that deployment group. Then I hooked this deployment group right after the deployment to a test instance, but before the code is deployed to the production servers.

In the startup code, we can check the current deployment group via ENV['DEPLOYMENT_GROUP_NAME']. In our scripts, we set the RAILS_ENV equal to the Deployment Group. This allows code to take different paths based on where it runs (in a local dev environment, a staging server or like in this case on a migrator server).

This is what our migrate script now looks like:


It is important to set the inequality, as we want the migrations to run on our test servers - we just don't want them running on the production web servers.
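As a rough Ruby sketch of that inequality check: the group name `production` and the helper `run_migrations?` are assumptions for illustration, and the actual deploy hook may well be a shell script rather than Ruby.

```ruby
# Hypothetical name of the deployment group that holds the production web servers.
PRODUCTION_GROUP = 'production'

# Migrations run everywhere except on the production web servers,
# so test servers and the staging (migrator) instance still migrate.
def run_migrations?(deployment_group)
  deployment_group != PRODUCTION_GROUP
end

group = ENV['DEPLOYMENT_GROUP_NAME']
if run_migrations?(group)
  # A real hook would invoke the migration here, for example via `rake db:migrate`.
  puts "would run migrations for group #{group.inspect}"
else
  puts 'skipping migrations on the production web servers'
end
```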

We add this to our database.yml, notice the environment is staging, to match the deployment group. Notice the database is the production instance.


In our case, we read credentials from AWS Secrets Manager. You don't have to.

This is what our staging step in CodePipeline looks like:


On the last CodeDeploy step, hit the edit button and set the Application Name and the Deployment Group correctly.



Now before the code is deployed to the production servers, the database migration has been completed on the staging instance. If the migration fails, CodeDeploy won't advance to the deploy stage. When the production servers start with new code, they will all use the new database schema.

After the migration has finished and before code is deployed, the old code will start using the new database schema. As long as the new schema is backward compatible, this will not cause a problem.

You may have to run the release pipeline a few times until AWS co-operates with the new changes. But it should eventually start working.

Monday, July 08, 2019

Kinesis/Firehose/Athena - Creating queryable data from logs

Amazon has a data transformation pipeline that allows log data to be queried with a SQL-like syntax. This can be useful for gaining insight that is buried in log data generally thought of as temporary. When was the last time you went over 6 months of logs? Right, just what I thought.

Wading through logs is painful and with the growth of data all the more so. No wonder that when confronted with the task of gleaning information from past data, engineers build specialized table structures with relational queries in mind or provision specialized map/reduce jobs to crunch over the data for detailed answers to specific questions.

But this time-consuming exercise can be done away with by using the Amazon Kinesis pipeline. The flow looks something like this: the application writes a JSON formatted record that captures a particular item of interest to a Kinesis data stream. A Firehose instance is attached to the output of the data stream. This Firehose instance converts the data to a "JSON like" format and writes it into an S3 bucket at a specified folder. Another Amazon service, Glue, provides a crawler that can then process new files that get uploaded to the S3 bucket. The Glue crawler infers the schema from the JSON it finds in the S3 files and creates a Glue table. To query the data, Amazon provides yet another service, Athena, which sports a SQL syntax and a user-friendly query console. Phew, yeah, it is quite the mother of all pipelines.

This is all pretty straightforward to set up starting from Kinesis console itself. You should start with the Data streams tab in Kinesis, create a data stream, then create a Kinesis Firehose with source equal to the data stream you just created. Specify that firehose data will be written with the API like so:



Since we are writing JSON to Kinesis, there is no need to convert the record format, and we will pass the data to Firehose as is, without transformation (well, more on this later). So we can leave the default settings for Source record transformation and Record format conversion.

Finally you need to specify where you want this data to live, in our case S3:


Now head over to Glue and add a crawler. Specify the S3 folder that you used above for the "include path". For simplicity, I would start with a single schema for all S3 records, under "Grouping Behaviors".



Now head over to your favorite editor and let's write some code - finally!
It's up to you how you want to structure the code to do this. In the application I'm building, it is literally the logs that I want sent over to Kinesis. Anticipating this, I wrote a function that the app calls for writing logs, and this function was the ideal place to add in the write to Kinesis. It looks something like this:



That would be all there is to it, except there is an annoying bug in the pipeline that we need to work around. The issue is that Firehose writes "JSON like" data to S3 all on a single line, while the Glue crawler expects each record on its own line. So when all the records are squished into a single line in S3, the crawler processes the first and throws away the rest. Imagine my surprise when only 1 out of 17 of my log records appeared in the Athena queries.

The workaround is to write a Lambda function with a Kinesis trigger. What this does is that every time a Kinesis record is written, the Lambda gets triggered. Well, that is not strictly true - Kinesis will batch a bunch of records and invoke the lambda once per batch. The batch size (or time for trigger) can be specified from the console.

Or if you are using serverless, this can be specified in the serverless.yml like so:



Without further ado, here's the Lambda that adds the newline:



This is written in node.js, and I used the serverless framework with the node.js template to write it. I'm exporting a single function named newlines. This is triggered when there is a batch of records in the Kinesis data stream. We map over the records, transforming each record by adding a new line. This is done in the add_new_line function.

To let the node engine know what we did, we use the callback. It is standard node.js to pass an error object for errors and null when there are no errors (we succeeded).

firehose.putRecordBatch is used for efficiency - we could just as well have used firehose.putRecord, and the results would be the same apart from throughput.
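The transformation itself is small. Here is a sketch of it in Ruby rather than node.js, to match the other examples on this blog. It assumes the usual shape of a Kinesis-triggered Lambda event, where each record's payload arrives base64-encoded under `kinesis.data`. The helper `to_firehose_records` and the sample log payloads are made up for illustration, and the actual call to Firehose is left out.

```ruby
require 'json'

# Decode one Kinesis record and make sure its payload ends with a newline,
# so every record lands on its own line in S3.
def add_new_line(record)
  payload = record['kinesis']['data'].unpack1('m') # base64 decode
  payload.end_with?("\n") ? payload : "#{payload}\n"
end

# Map a whole batch into a list of { data: ... } hashes, one per record,
# ready to be handed to a batch put on the Firehose delivery stream.
def to_firehose_records(event)
  event['Records'].map { |record| { data: add_new_line(record) } }
end

# A made-up event with two log records, encoded the way Kinesis delivers them.
event = {
  'Records' => [
    { 'kinesis' => { 'data' => [{ level: 'info', msg: 'a' }.to_json].pack('m0') } },
    { 'kinesis' => { 'data' => [{ level: 'warn', msg: 'b' }.to_json].pack('m0') } }
  ]
}

batch = to_firehose_records(event)
puts batch.map { |r| r[:data] }.join
```

With the newline in place, each JSON document sits on its own line, which is what the Glue crawler needs in order to see every record.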

Monday, June 04, 2018

JSON to objects in a few languages

When working with data services, we often have a need to convert JSON strings to objects that model our data. Here is a list of code snippets in different languages that convert this Facebook Graph JSON data.

The list is presented in ascending order of verbosity. Predictably, Javascript most succinctly expresses its wishes, whereas Java uses a copious amount of code. Scala avoids some of that verbosity by using case classes that remove boilerplate code for constructors. Jackson (Java) requires getters and setters to identify which attributes of the object are to be serialized, causing code bloat.

JSON:
Javascript:
Ruby:
Python:
Scala:
Java:

Friday, April 20, 2018

Goldbach Conjecture

In the 18th century, two mathematicians, Christian Goldbach and Leonhard Euler, arrived at a conjecture in their correspondence, known today by the name of its originator as the Goldbach conjecture. It says that any even number greater than 2 can be expressed as a sum of two primes. There is no proof of this yet, but it has been verified computationally for even numbers up to 4 × 10^18.


A program to test the Goldbach conjecture for a given integer:

This program demonstrates two well-known algorithms.

  1. The sieve of Eratosthenes to calculate all primes up to a given number
  2. A linear algorithm to find if two numbers in a list sum to a given number.


To verify the Goldbach conjecture for a given n, we use the sieve to find all prime numbers up to n, then use the linear algorithm to find two primes from this list that sum to n.
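A minimal Ruby sketch of the two steps follows. The function names `primes_up_to` and `goldbach_pair` are made up here, and the linear step is done with a two-pointer scan over the sorted primes, which is one common way to do it and not necessarily the one the original program used.

```ruby
# Sieve of Eratosthenes: all primes <= n, in ascending order.
def primes_up_to(n)
  return [] if n < 2

  is_prime = Array.new(n + 1, true)
  is_prime[0] = is_prime[1] = false
  (2..Integer.sqrt(n)).each do |i|
    next unless is_prime[i]

    # Cross out every multiple of i, starting at i * i.
    (i * i).step(n, i) { |multiple| is_prime[multiple] = false }
  end
  (2..n).select { |i| is_prime[i] }
end

# Two-pointer scan over the sorted primes: linear in the number of primes.
# Returns a pair [p, q] with p + q == n, or nil if no such pair exists.
def goldbach_pair(n)
  unless n.is_a?(Integer) && n > 2 && n.even?
    raise ArgumentError, 'n must be an even integer greater than 2'
  end

  primes = primes_up_to(n)
  lo = 0
  hi = primes.length - 1
  while lo <= hi # lo == hi allows the same prime twice, e.g. 4 = 2 + 2
    sum = primes[lo] + primes[hi]
    return [primes[lo], primes[hi]] if sum == n

    if sum < n
      lo += 1
    else
      hi -= 1
    end
  end
  nil
end

p goldbach_pair(28) # => [5, 23]
```

Because the primes come out of the sieve already sorted, the scan needs no extra sorting or hashing.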