Disco + EC2 spot instance = WIN

tl;dr version: This is how I spent a Saturday evening.

<blink>Warning: If you like to write Java, stop reading now. Go back to using Hadoop. It's a much more mature project.</blink>

As a part of my job, I do a lot of number processing. Over the course of the last few weeks, I shifted to doing most of it as MapReduce jobs using Disco. It's a wonderful approach to processing big data: the time to process grows with the quantity of data and shrinks with the amount of hardware you throw at it, and the amount of data to be processed can (in theory) be unlimited. While I don't do anything at Google scale, I deal with Small Big Data; my datasets for an individual job would probably not exceed 1 GB. I can currently afford to keep not using MapReduce, but as my data set grows I will have to do distributed computing, so better to start early.

Getting started with Disco

If you, like me, had given up on MapReduce in the past after trying to deal with administering Hadoop, now is a great time to look into Disco. Installation is pretty easy: follow the docs. Within 5 minutes I was writing jobs in Python to process data; it would have been faster still if I had known beforehand that the SSH daemon should be listening on port 22.
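To give a sense of what writing a Disco job feels like, here is (roughly) the word-count example from the Disco tutorial, which counts words in a sample text file hosted on discoproject.org:

from disco.core import Job, result_iterator

def fun_map(line, params):
    # Emit (word, 1) for every word in the line
    for word in line.split():
        yield word, 1

def fun_reduce(iter, params):
    # Sum the counts for each word
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=fun_map,
                    reduce=fun_reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print word, count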

Python for user scripts + Erlang for backend == match made in heaven

Enter disposable Disco

I made a set of Python scripts to launch and manage Disco clusters on EC2 for workloads where no data needs to be stored on the cluster itself. In my use case, the input is read from Amazon S3 and the output goes back into S3.

There are some issues with running Disco on EC2.

disposabledisco takes care of these issues and more. Everything needed to run the cluster is defined in a config file. First, generate a sample config file:

python create_config.py > config.json

This creates a new file with some pre-populated values. In my case the config file looks like this (some info masked):

{
    "AWS_SECRET": "SNIPPED", 
    "ADDITIONAL_PACKAGES": [
        "git", 
        "libwww-perl", 
        "mongodb-clients", 
        "python-numpy", 
        "python-scipy", 
        "libzmq-dev", 
        "s3cmd", 
        "ntp", 
        "libguess1", 
        "python-dnspython", 
        "python-dateutil", 
        "pigz"
    ], 
    "SLAVE_MULTIPLIER": 1, 
    "PIP_REQUIREMENTS": [
        "iso8601",
        "pygeoip"
    ], 
    "MASTER_MULTIPLIER": 1, 
    "MGMT_KEY": "ssh-rsa SNIPPED\n", 
    "SECURITY_GROUPS": ["disco"], 
    "BASE_PACKAGES": [
        "python-pip", 
        "python-dev", 
        "lighttpd"
    ], 
    "TAG_KEY": "disposabledisco", 
    "NUM_SLAVES": 30, 
    "KEY_NAME": "SNIPPED", 
    "AWS_ACCESS": "SNIPPED", 
    "INSTANCE_TYPE": "c1.medium", 
    "AMI": "ami-6d3f9704", 
    "MAX_BID": "0.04",
    "POST_INIT": "echo \"[default]\naccess_key = SNIPPED\nsecret_key = SNIPPED\" > /tmp/s3cfg\ncd /tmp\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPASNum.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIP.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoLiteCity.dat.gz\ns3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPRegion.dat.gz\ngunzip *.gz\nchown disco:disco *.dat\n\n"
}

This tells disposabledisco that I want a cluster with 1 master and 30 slaves, all of type c1.medium, using ami-6d3f9704 as the starting point. It lists the packages to be installed via apt-get and the Python dependencies to be installed using pip. A pip requirement can point at an external tarball, a git repo, etc.: basically anything pip accepts after pip install.

The POST_INIT portion is a bash script that runs as root after the rest of the install. In my case it downloads and uncompresses various GeoIP databases archived in an S3 bucket, for use from within Disco jobs.
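Unescaped for readability, that POST_INIT string is just this short script:

echo "[default]
access_key = SNIPPED
secret_key = SNIPPED" > /tmp/s3cfg
cd /tmp
s3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPASNum.dat.gz
s3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIP.dat.gz
s3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoLiteCity.dat.gz
s3cmd -c /tmp/s3cfg get s3://SNIPPED/GeoIPRegion.dat.gz
gunzip *.gz
chown disco:disco *.dat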

Once the config file is ready, run the following command many times. The output is fairly verbose.

python create_cluster.py config.json

Why many times? Because the script itself stores no state; all state is managed using EC2 tags. On each run the script inspects the tags, works out how far along the cluster is, and performs the next step.

[Screenshot: CloudWatch showing 31 instances]

Many of the steps involve waiting on EC2: provisioning spot instances, waiting for instances to initialize, and so on, which is why the command needs to be run repeatedly.
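As a rough illustration of the tag-driven, stateless approach, the sketch below shows how cluster members could be discovered and missing slaves topped up with boto. It is not the actual disposabledisco code; the role tag values and the region are assumptions, while the bid, AMI, instance type and counts are just the values from my config above.

# Sketch only: tag-based cluster state with boto 2. Role tag values and
# region are assumptions; the other values come from the config above.
import boto.ec2

conn = boto.ec2.connect_to_region("us-east-1",
                                  aws_access_key_id="SNIPPED",
                                  aws_secret_access_key="SNIPPED")

# Find running instances already tagged as part of the cluster
reservations = conn.get_all_instances(filters={
    "tag-key": "disposabledisco",
    "instance-state-name": "running",
})
members = [i for r in reservations for i in r.instances]
slaves = [i for i in members if i.tags.get("disposabledisco") == "slave"]

# If the cluster is short of slaves, bid for more spot instances;
# a later run of the script will find and tag them once they are up
missing = 30 - len(slaves)
if missing > 0:
    conn.request_spot_instances(price="0.04",
                                image_id="ami-6d3f9704",
                                count=missing,
                                key_name="SNIPPED",
                                security_groups=["disco"],
                                instance_type="c1.medium")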

To help with shipping output to S3, I made some output classes for Disco.

Both of these functions can be configured to gzip the contents before uploading.
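I won't paste the actual classes here, but the rough shape is this: the job is handed an output-stream callback, and whatever that callback returns is what the reduce output gets written to, so close() is a natural place to push the buffered (optionally gzipped) lines to S3 with boto. Below is a minimal sketch only; the callback signature (stream, partition, url, params), the returned write()/close() object, and the key naming are my assumptions, not the real s3lineoutput.py.

# Sketch only: an S3 line-output stream for Disco. The callback signature,
# key naming and gzip handling are assumptions, not the real s3lineoutput.py.
import gzip
from cStringIO import StringIO
from boto.s3.connection import S3Connection
from boto.s3.key import Key

class S3LineOutput(object):
    def __init__(self, partition, params):
        self.buf = StringIO()      # lines are buffered in memory until close()
        self.partition = partition
        self.params = params

    def write(self, data):
        self.buf.write(data)

    def close(self):
        body = self.buf.getvalue()
        if self.params.get("gzip"):
            # Compress the whole buffer before uploading
            zipped = StringIO()
            gz = gzip.GzipFile(fileobj=zipped, mode="wb")
            gz.write(body)
            gz.close()
            body = zipped.getvalue()
        conn = S3Connection(self.params["AWS_KEY"], self.params["AWS_SECRET"])
        bucket = conn.get_bucket(self.params["BUCKET_NAME"])
        suffix = ".gz" if self.params.get("gzip") else ""
        key = Key(bucket, "output/part-%05d%s" % (self.partition, suffix))
        key.set_contents_from_string(body)

def s3_line_output_stream(stream, partition, url, params):
    # Ignore the default stream and write straight to S3 on close()
    return S3LineOutput(partition, params)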

As far as input is concerned, I send the job a list of signed S3 URLs. (Side note: it seems Disco cannot handle https inputs at the moment, so I use http.) A sample job run might look like this:

from boto.s3.connection import S3Connection

# S3 bucket holding the gzipped inputs (credentials and bucket name masked)
bucket = S3Connection("SNIP", "SNIP").get_bucket("SNIP")

def get_urls():
    # Signed URLs for every .gz object under the "processed" prefix;
    # Disco can't fetch https inputs at the moment, hence the scheme swap
    urls = []
    for k in bucket.list(prefix="processed"):
        if k.name.endswith("gz"):
            urls += [k.generate_url(3660).replace("https", "http")]
    return urls

MyExampleJob().run(
    input=get_urls(),
    params={
        "AWS_KEY": "SNIP",
        "AWS_SECRET": "SNIP",
        "BUCKET_NAME": "SNIP",
        "gzip": True
    },
    partitions=10,
    required_files=["s3lineoutput.py"],
    reduce_output_stream=s3_line_output_stream
).wait()


Bonus - MagicList - a memory-efficient way to store and process potentially infinite lists.
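I won't reproduce MagicList here either; the idea is a list-like object that stores iterables rather than materialized items, so huge sequences can be appended and streamed through once without ever being held in memory. A minimal sketch of the concept (purely illustrative, not the actual implementation):

import itertools

class LazyList(object):
    # List-like wrapper that keeps iterables, not items, in memory.
    # Purely illustrative; the real MagicList in disposabledisco may differ.
    def __init__(self):
        self.sources = []

    def append(self, iterable):
        # Store the iterable itself; nothing is consumed yet
        self.sources.append(iterable)

    def __iter__(self):
        # Chain the stored iterables lazily, one item at a time
        return itertools.chain.from_iterable(self.sources)

# Usage: append two large sources, then stream a few items
lazy = LazyList()
lazy.append(xrange(10 ** 9))
lazy.append(n * n for n in xrange(10 ** 9))
for item in itertools.islice(lazy, 5):
    print item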

We used Disco to compute numbers for a series of blog posts on CDN Planet. For that analysis it was a painful process to launch Disco clusters manually, which is what led me to create these helper scripts.

Shameless plug

Turbobytes, multi-CDN made easy

Have your static content delivered by 6 global content delivery networks, not just 1. Turbobytes' platform closely monitors CDN performance and makes sure your content is always delivered by the fastest CDN, automatically.

Tags: disco mapreduce Python
Categories: Linux Python