Logging millions of requests every day and what it takes


HackerEarth’s web servers handle millions of requests every day. These request
logs can be analyzed to mine highly useful insights as well as metrics critical
for the business, for example, the number of views per day, the number of views
per sub-product, and the most popular user navigation flows.

Initial Thoughts

HackerEarth uses Django as its primary web development framework and a host of
other components which have been customized for performance and scalability.
During normal operations, our servers handle 80-90 requests/sec on average,
and this surges to 200-250 requests/sec when multiple contests overlap in time.
We needed a system that could easily scale to a peak traffic of 500 requests/sec,
add minimal processing overhead to the web servers, and store the collected data
for crunching and offline processing.


The diagram above shows a high-level architecture of our request log
collection system. The solid lines represent the data flow between
components and the dotted lines represent the communication between them. The
whole architecture is message based and stateless, so individual components
can easily be removed or replaced without any downtime.

Below is a more detailed explanation of each component, in the order of data flow.

Web Servers

On the web servers, we employ a Django middleware that asynchronously retrieves
the required data for a given request and then forwards it to the transporter
cluster. This is done in a separate thread, and the middleware adds an
overhead of about 2 milliseconds to the Django request/response cycle.

import datetime

from django.conf import settings

# run_async, log_request_async and log are in-house helpers (the import path
# below is illustrative): run_async spawns a thread that runs log_request_async,
# which collects the request data and forwards it to the transporter cluster.
from .utils import run_async, log_request_async, log


class RequestLoggerMiddleware(object):
    """Logs data from requests."""

    def process_request(self, request):
        if settings.LOCAL or settings.DEBUG:
            return None

        request.META['IS_AJAX'] = request.is_ajax()

        before = datetime.datetime.now()

        DISALLOWED_USER_AGENTS = ["ELB-HealthChecker/1.0"]

        http_user_agent = request.environ.get('HTTP_USER_AGENT', '')

        if http_user_agent in DISALLOWED_USER_AGENTS:
            return None

        # this creates a thread which collects required data and forwards
        # it to the transporter cluster
        run_async(log_request_async, request)
        after = datetime.datetime.now()

        log("TotalTimeTakenByMiddleware %s" % ((after - before).total_seconds()))
        return None
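
The run_async helper referenced above is not shown in the post; a minimal sketch of such a helper, assuming it simply fires a fire-and-forget daemon thread so the request/response cycle is never blocked, could look like this:

import threading

def run_async(func, *args, **kwargs):
    # Fire-and-forget: run func(*args, **kwargs) in a daemon thread and return
    # immediately; any result or exception from the worker is discarded.
    thread = threading.Thread(target=func, args=args, kwargs=kwargs)
    thread.daemon = True
    thread.start()
    return thread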

Transporter Cluster

The transporter cluster is an array of non-blocking Thrift servers whose sole purpose is to receive data from the web servers and route it to other components such as MongoDB, RabbitMQ, or Kafka. Where a given message should be routed is specified in the message itself by the web servers. The communication from the web servers to the transporter servers is one way, which saves the time otherwise spent waiting for the Thrift servers to acknowledge message reception. We may lose some request logs because of this, but we can afford to. The request logs are currently routed to the Kafka cluster.
The communication between the web servers and the transporter servers takes 1-2 milliseconds on average, and the cluster can be scaled horizontally to handle an increase in load.

Following is a part of the Thrift config file. It defines a DataTransporter service with a method marked oneway, which means the RPC call returns immediately without waiting for the server.

service DataTransporter {
    oneway void transport(1:map<string, string> message)
}
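
On the web-server side, a oneway call to this service might look roughly like the sketch below. The generated module name, host, and port are placeholders (the post does not show the client code); a framed transport is used because non-blocking Thrift servers expect framed messages.

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol

from data_transporter import DataTransporter  # hypothetical generated module

def send_to_transporter(message, host='transporter.internal', port=9090):
    # Non-blocking Thrift servers require a framed transport on the client side
    transport = TTransport.TFramedTransport(TSocket.TSocket(host, port))
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = DataTransporter.Client(protocol)

    transport.open()
    # oneway RPC: returns as soon as the message is written, no acknowledgement
    client.transport(message)
    transport.close()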

Kafka Cluster

Kafka is a high-throughput distributed messaging system that supports the publish/subscribe messaging pattern. This messaging infrastructure enables us to build other pipelines that depend on this stream of request logs. Our Kafka cluster retains the last 15 days' worth of logs, so any new consumer we implement can start processing data from up to 15 days in the past.

Useful reference for setting up a Kafka cluster.
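
As an illustration of starting a new consumer back in time, here is a minimal sketch using the kafka-python library; the library choice, topic name, and broker addresses are assumptions rather than details from the post.

import json

from kafka import KafkaConsumer

# 'request_log' and the broker list are placeholders for the real topic/cluster
consumer = KafkaConsumer(
    'request_log',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    auto_offset_reset='earliest',   # start from the oldest retained message
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    request_log = message.value
    # process the request log entry, e.g. stage it into MongoDB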

Pipeline Manager Server

This server manages the consumption of request log messages from the Kafka topics, storing them in MongoDB and later moving them to Amazon S3 as well as Amazon Redshift.
MongoDB acts merely as a staging area for the data consumed from the Kafka topics, and this data is transferred to S3 at hourly intervals. Every file saved in S3 is loaded into Amazon Redshift, a data warehouse solution that can scale to petabytes of data, which we use for analytics and metrics calculation on the request log data.
The server works in conjunction with a RabbitMQ cluster, which it uses to communicate task initiation and completion.
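
The hourly MongoDB-to-S3 transfer is not shown in the post; below is a minimal sketch of what it could look like, assuming pymongo and boto3 and using placeholder database, collection, field, and bucket names.

import json
import datetime

import boto3
from pymongo import MongoClient

def dump_last_hour_to_s3(bucket='bigdata-bucket'):
    # Pull the request logs staged in MongoDB during the previous hour
    now = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    start = now - datetime.timedelta(hours=1)

    collection = MongoClient()['request_logs']['staging']
    docs = collection.find({'logged_at': {'$gte': start, '$lt': now}}, {'_id': 0})

    # Serialize as newline-delimited JSON so Redshift's COPY ... json can ingest it
    body = '\n'.join(json.dumps(doc, default=str) for doc in docs)

    key = 'request_log/delta/{0}.json'.format(start.strftime('%Y%m%d%H'))
    boto3.client('s3').put_object(Bucket=bucket, Key=key, Body=body.encode('utf-8'))
    return key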

Here is the script that loads data from S3 into Redshift. Because Redshift does not enforce uniqueness, the script handles duplicates by loading the new data into a staging table, deleting any rows in the main table that also appear in the staging table, and then inserting the staged rows.

import subprocess

from django.conf import settings


def load_s3_delta_into_redshift(s3_delta_file_path):
    """s3_delta_file_path is the path of the delta file after the bucket name."""
    bigdata_bucket = settings.BIGDATA_S3_BUCKET

    attrs = {
        'bigdata_bucket': bigdata_bucket,
        's3_delta_file_path': s3_delta_file_path,
    }

    complete_delta_file_path = "s3://{bigdata_bucket}/{s3_delta_file_path}".format(**attrs)

    schema_file_path = "s3://{bigdata_bucket}/request_log/s3_col_schema.json".format(**attrs)

    data = {
        'AWS_ACCESS_KEY_ID': settings.AWS_ACCESS_KEY_ID,
        'AWS_SECRET_ACCESS_KEY': settings.AWS_SECRET_ACCESS_KEY,
        'LOG_FILE': complete_delta_file_path,
        'schema_file_path': schema_file_path,
    }

    # COPY the delta file from S3 into the staging table using the JSONPaths
    # schema stored alongside the data
    S3_REDSHIFT_COPY_COMMAND = " ".join([
        "copy requestlog_staging from '{LOG_FILE}'",
        "CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}'",
        "json '{schema_file_path}';",
    ]).format(**data)

    # Run everything in one transaction: create the staging table, copy the
    # delta into it, delete rows that are being re-imported, then insert
    LOADDATA_COMMAND = " ".join([
        "begin transaction;",
        "create temp table if not exists requestlog_staging(like requestlog);",
        S3_REDSHIFT_COPY_COMMAND,
        "delete from requestlog using requestlog_staging where requestlog.row_id=requestlog_staging.row_id;",
        "insert into requestlog select * from requestlog_staging;",
        "drop table requestlog_staging;",
        "end transaction;",
        # "vacuum;"  # sorts newly added data
    ])

    redshift_conn_args = {
        'host': settings.REDSHIFT_HOST,
        'port': settings.REDSHIFT_PORT,
        'username': settings.REDSHIFT_DB_USERNAME,
    }

    REDSHIFT_CONNECT_CMD = 'psql -U {username} -h {host} -p {port}'.format(**redshift_conn_args)

    # run the SQL through psql; the password is picked up from ~/.pgpass or PGPASSWORD
    PSQL_LOADDATA_CMD = '%s -c "%s"' % (REDSHIFT_CONNECT_CMD, LOADDATA_COMMAND)

    returncode = subprocess.call(PSQL_LOADDATA_CMD, shell=True)
    if returncode != 0:
        raise Exception("Unable to load s3 delta file into redshift: %s"
                        % complete_delta_file_path)

What’s next

Data is like gold for any web application. The insights it can provide and the
growth it can drive are amazing when done the right way. There are dozens of
features and insights that can be built on top of the request logs, including a
recommendation engine, better content delivery, and overall product
improvements. All of this is a step towards making HackerEarth better each and
every day for our users.

If you have any queries or wish to talk more about this architecture or any of the technologies involved, you can mail me at [email protected]

Posted by Praveen Kumar.

Source: HackerEarth