Distributed programming is hard. Data needs to be prepared, split up, executed in parallel, and combined into one solution again. Coordination, cluster management, fault tolerance, and optimization make parallel processing complex. Things are simplified when the problem is embarrassingly parallel: sections of the data can be processed without any dependency on other parts. The ideal is a function that has no dependencies on its execution context and is side-effect free. Such a function can easily be memoized, or executed in any order, depending on a specific execution plan.
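As a tiny illustration of why this matters, here is a sketch of memoizing such a pure function (the memoize decorator and square below are illustrative, not part of the library described later):

def memoize(fn):
    # cache results keyed by argument; safe only because fn is pure
    cache = {}
    def wrapper(arg):
        if arg not in cache:
            cache[arg] = fn(arg)
        return cache[arg]
    return wrapper

@memoize
def square(n):
    # no side effects, no context: the result depends only on n
    return n * n

Because square depends only on its argument, its calls can be cached, reordered, or farmed out to different machines without changing the result.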
The aim of distributed programming is to increase scalability when a given set of operations can no longer be scaled on one server just by adding more memory or CPUs. The solution is to split up the work and send it to multiple machines to process. This allows us to scale to processing terabytes or petabytes of data using a cluster of thousands of nodes. That brings us to the latency scale for computing, inspired by the earlier list originally collected by Peter Norvig:

Operation                                  Latency
L1 cache reference                         0.5 ns
Main memory reference                      100 ns
Send 1 KB over a 1 Gbps network            10 µs
Read 1 MB sequentially from memory         250 µs
Round trip within the same datacenter      500 µs
Disk seek                                  10 ms
Read 1 MB sequentially from disk           20 ms
Send a packet CA -> Netherlands -> CA      150 ms
The orders of magnitude of latency shown in the table above must be considered when distributing data over a cluster. Work should only be sent to a node when the cost of transferring the data is less than the cost of computing it locally.
Serverless Functions
Serverless functions allow developers to describe logic that executes in response to a given event, inside a stateless container that can run one or more functions at the same time. Serverless functions do not need to be executed in parallel, but they are easily parallelizable due to their context-free and side-effect-free behavior.
What do serverless functions look like, and how can they be implemented? Let's look at an example. Say we have a Google AppEngine application written in Python with the following lambda-like function that takes a number of zipcodes and looks up their addresses:
def geolocate(zipcodes):
    return [get_address(zipcode) for zipcode in zipcodes]
Now assume that the get_address function itself is implemented as a service call to an external geolocation service, such as the Google Maps API. We now run into the bottleneck of network speed. Depending on the distance between the client and the server, each call realistically costs ~10ms if server and client are in the same region and ~75ms between NYC and Seattle. Crossing the Atlantic Ocean takes an additional 100ms.
If we have to resolve 1,000 addresses, the simple code above takes 10s at best and almost 3 minutes at worst. Typically, APIs are aware of this issue and allow multiple requests to be bundled into one. If such bundling is not available, the solution is to run multiple requests in parallel, either with multiple threads or with separate processes, as sketched below.
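As a rough sketch of the threaded approach (not part of the library described below, and assuming get_address is thread-safe), a thread pool can issue the lookups concurrently; swapping multiprocessing.dummy for multiprocessing would use separate processes instead of threads:

from multiprocessing.dummy import Pool  # thread-backed Pool API

def geolocate_parallel(zipcodes, workers=50):
    # run up to `workers` lookups at a time; each thread mostly
    # waits on the network, so threads help despite the GIL
    pool = Pool(workers)
    try:
        return pool.map(get_address, zipcodes)
    finally:
        pool.close()
        pool.join()

With 50 workers, 1,000 lookups at ~175ms each drop from roughly 3 minutes to a few seconds.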
The above example places the bottleneck in the network. For other calculations, however, the bottleneck might be the CPU, which requires load balancing over a number of servers in a cluster. Managing such a cluster requires extra planning. It would be nice if we did not have to worry about how the calls are made in parallel, and could simply use a decorator as follows:
@serverless.parallel
def geolocate(zipcodes):
    # run in parallel on a cluster
    return [get_address(zipcode) for zipcode in zipcodes]
The decorator splits the data into smaller chunks, sends each chunk to a different server, collates the results as they arrive, and finally returns the combined result. To the reader, this code looks like it runs serially in one thread, but in reality it is highly parallelized.
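To make those semantics concrete, here is a hypothetical local simulation of such a decorator, with a thread pool standing in for the cluster (illustrative only; the real implementation is described below):

from multiprocessing.dummy import Pool  # threads standing in for servers

WORKER_COUNT = 100

def parallel(fn):
    def wrapper(data):
        # split the input into one bucket per worker
        size = max(1, len(data) // WORKER_COUNT)
        buckets = [data[n:n + size] for n in range(0, len(data), size)]
        # fan out: run fn on every bucket concurrently
        pool = Pool(len(buckets))
        try:
            parts = pool.map(fn, buckets)
        finally:
            pool.close()
        # collate the partial results into a single list
        return [item for part in parts for item in part]
    return wrapper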
Before we get to how this is implemented, here is a typical run of the hosted application at simple-serverless.appspot.com:
Number of zipcodes = 3000
Regular duration = 345.57s
Parallel duration = 18.98s
Speedup = 18.2X
Details per pipeline step:
1 - Step "geolocate" took 18.65s for 100 workers and 3000 elements
2 - Step "cleanup" took 0.34s for 100 workers and 3000 elements
3 - Step "sort" took 0.00s for 1 worker and 413 elements
In the above run, we resolved 3,000 zipcodes in ~19 seconds, while regular execution would take almost 6 minutes. The actual speedup depends on how many nodes are currently warmed up; typically, a second run is faster.
Implementation
To implement the above decorator, the easy part is splitting the data into smaller chunks and collating the results after all the work is done. The hard part is finding servers to run the stateless function on, and using an effective load balancer to scale horizontally with elastic demand. It turns out that Google AppEngine was designed to do exactly that.
If we can somehow handle a chunk by making a web service call back to our own domain, using GAE load balancing to dispatch back to the current server or wake up a new one depending on the current load, we effectively piggyback on GAE to create a poor man's serverless lambda implementation. It turns out that this is not hard to do.
Breaking up the original data into multiple chunks looks like this:
bucketSize = max(1, int(len(data) / WORKER_COUNT))
buckets = [
    data[n: n + bucketSize]
    for n in xrange(0, len(data), bucketSize)
]
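For example, with the 3,000 zipcodes and 100 workers from the run above, bucketSize works out to 30, producing 100 buckets of 30 zipcodes each.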
We then convert each chunk into JSON, encode which lambda function we want to run, and invoke it as an RPC call:
# create a non-blocking, asynchronous worker to handle one
# bucket, on our own instance, or on new ones automatically
# launched by appengine
def createWorker(bucket):
    worker = urlfetch.create_rpc(deadline=60)
    scheme = os.environ['wsgi.url_scheme']
    host = os.environ['HTTP_HOST']
    url = scheme + '://' + host + URL
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        SECRET_KEY: SECRET,
    }
    payload = urllib.urlencode({
        'module': method.__module__,
        'className': className,
        'isclass': isclass,
        'method': method.__name__,
        'data': json.dumps(bucket),
        'args': json.dumps(args),
        'argv': json.dumps(argv),
    })
    return urlfetch.make_fetch_call(worker, url, payload,
                                    'POST', headers)
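Note that method, className, isclass, args, and argv are not defined in this snippet; they are presumably captured from the enclosing decorator's scope, which knows which function is being parallelized and with which arguments. The snippet also relies on module-level imports of os, urllib, json, and GAE's urlfetch.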
Note: We use a secret that is known only to the application to prevent external calls to our worker functions. This security by obscurity is not recommended for production applications; a stronger authentication model should be used instead.
The receiving worker route is set up in the client code as follows:
app = webapp2.WSGIApplication([
    ('/', ZipCodeHandler),
    serverless.init('/serverless_route', SERVERLESS_SECRET),
])
We initialize serverless with an arbitrary route name and a secret of our choosing. When a chunk of data is sent to that route as a POST request, it is unpacked, the requested lambda function is invoked, and the result is returned in JSON format.
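A minimal sketch of what such a handler might look like (hypothetical code; the actual handler lives inside the serverless library, and the class-method case is ignored here):

import importlib
import json

import webapp2

class ServerlessHandler(webapp2.RequestHandler):
    def post(self):
        # reject requests that do not carry the shared secret
        if self.request.headers.get(SECRET_KEY) != SECRET:
            self.abort(403)
        # look up the requested lambda function by module and name
        module = importlib.import_module(self.request.get('module'))
        method = getattr(module, self.request.get('method'))
        # run it on this chunk and return the result as JSON
        data = json.loads(self.request.get('data'))
        self.response.write(json.dumps(method(data)))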
The results from each of the workers are collated as follows:
workers = map(createWorker, buckets)
result = list(itertools.chain(*map(getResult, workers)))
The workers are created and invoke their lambda functions asynchronously using the createWorker function shown above. The results are collected by the getResult handler, which blocks until each result is received:
def getResult(worker):
    response = worker.get_result().content
    try:
        return json.loads(response)
    except Exception as e:
        logging.error('Error: %s' % e)
        return [e]
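Note that this pattern relies on Python 2's eager map: the first map fires off all the non-blocking RPCs before the second map starts blocking on results. Under Python 3's lazy map, each RPC would only be created when its result is requested, effectively serializing the calls.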
An additional utility is offered to handle workflow processing in the form of a pipeline:
pipeline = serverless.Pipeline(geolocate, cleanup, sort)
This sets up a serverless pipeline where data streams from geolocate, to cleanup, to sort. The pipeline is invoked using its run method:
zipcodes = [
    random.randint(10000, 99999)
    for n in xrange(ZIPCODE_COUNT)
]
addresses = pipeline.run(zipcodes)
The cleanup and sort functions look similar to the geolocate function. The key is that they are fully "functional": they depend only on the values of the data they process and are completely context free. The cleanup function is executed in parallel:
@serverless.parallel
def cleanup(addresses):
    # drop empty results, i.e., zipcodes that failed to resolve
    return filter(None, addresses)
The sort function is executed sequentially, in the current thread, on all the collated results from the previous step in the pipeline:
@serverless.sequential
def sort(addresses):
    return sorted(addresses)
The entire implementation of this simple serverless library is just 230 lines of Python, slightly more than the length of this README file.
Check out the GitHub repo with the source for simple-serverless.
Disclaimer: This work is a personal weekend project and is unrelated to Google Cloud Functions, a serverless functions solution based on Node.js, and to Google Dataflow for Python, a more comprehensive solution for data-driven distributed programming pipelines.