Thursday, January 19, 2017

Simple serverless services using Google AppEngine

Distributed programming is hard. Data needs to be prepared, split up, processed in parallel, and combined into one solution again. Coordination, cluster management, fault tolerance, and optimization make parallel processing complex. Things are simpler when the problem is embarrassingly parallel. This is the case when sections of the data can be processed without any dependency on other parts. The ideal case is a function that has no dependencies on its execution context and is free of side effects. Such a function can be easily memoized, or executed in any order, depending on a specific execution plan.

The aim of distributed programming is to increase scalability when a given set of operations can no longer be scaled on one server just by adding more memory or CPUs. The solution is to split up the work and send it to multiple machines to process. This allows us to scale to the processing of terabytes or petabytes of data using a cluster of thousands of nodes. That brings us to the latency scale for computing, inspired by the list originally compiled by Peter Norvig:

[image: latency scale table]

The orders of magnitude for latency shown in the table above should be considered when distributing data over a cluster. Sending work to another node only pays off when the cost of transferring the data is less than the cost of computing it locally.

Serverless Functions

Serverless functions allow developers to describe logic that runs in response to a given event, inside a stateless container that can run one or more functions at the same time. Serverless functions do not need to be executed in parallel, but they are easily parallelizable due to their context-free and side-effect-free behavior.

What do serverless functions look like and how can they be implemented? Let’s look at an example. Say we have a Google AppEngine application written in Python with the following lambda-like function that takes a number of zipcodes and looks up their addresses:

def geolocate(zipcodes):
    return [get_address(zipcode) for zipcode in zipcodes]

Now assume that the get_address function itself is implemented as a service call to an external geolocation service, such as the Google Maps API. We now run into the bottleneck of network speed. Depending on the distance between the client and the server, each call realistically costs ~10ms if server and client are in the same region and ~75ms between NYC and Seattle. Crossing the Atlantic Ocean takes an additional 100ms.

If we have to resolve 1,000 addresses, the above simple code takes 10s at best and 3 minutes at worst. Typically, APIs are aware of this issue and allow for bundling of requests into one. If such bundling is not available, the solution would be to run multiple requests in parallel, either by using multiple threads or separate processes.
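The thread-based option can be sketched as follows. This is a minimal illustration in Python 3 (the snippets in this post are Python 2), and get_address here is a hypothetical stand-in that sleeps for 10ms instead of calling a real geolocation service:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def get_address(zipcode):
    # Hypothetical stand-in for a remote lookup: sleep to simulate ~10ms latency.
    time.sleep(0.01)
    return "address for %s" % zipcode


def geolocate_serial(zipcodes):
    # One request after the other: total time grows linearly with the input.
    return [get_address(zipcode) for zipcode in zipcodes]


def geolocate_threaded(zipcodes, max_workers=20):
    # Threads overlap the waiting time; pool.map keeps results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(get_address, zipcodes))


zipcodes = list(range(10000, 10050))

start = time.perf_counter()
serial = geolocate_serial(zipcodes)
serial_duration = time.perf_counter() - start

start = time.perf_counter()
threaded = geolocate_threaded(zipcodes)
threaded_duration = time.perf_counter() - start

print("serial: %.2fs, threaded: %.2fs" % (serial_duration, threaded_duration))
```

Threads help here because the work is dominated by waiting on the network, not by computation.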

The above example places the bottleneck in the network. However, for different calculations, the bottleneck might be the CPU, which requires balancing the load over a number of servers in a cluster. Managing such a cluster requires extra planning. It would be nice if we did not have to worry about how the calls are made in parallel, and could simply use a decorator as follows:

@serverless.parallel
def geolocate(zipcodes):
    # run in parallel on a cluster
    return [get_address(zipcode) for zipcode in zipcodes]

In this case, the decorator splits the data into smaller chunks, sends each chunk to a different server, collates the results as they arrive in parallel, and finally returns the combined result. To the reader, this code looks like it runs serially in one thread, but in reality it runs highly parallelized.
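To make the split, dispatch, and collate steps concrete, here is a local sketch in Python 3. It is not the library's implementation: threads stand in for remote servers, and the parallel decorator, WORKER_COUNT value, and get_address stub are all assumptions made for illustration:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from itertools import chain

WORKER_COUNT = 4  # hypothetical number of parallel workers


def parallel(func):
    """Split the input into chunks, run func on each chunk in its own
    thread, and concatenate the partial results in input order."""
    @wraps(func)
    def wrapper(data):
        data = list(data)
        size = max(1, len(data) // WORKER_COUNT)
        chunks = [data[n:n + size] for n in range(0, len(data), size)]
        with ThreadPoolExecutor(max_workers=WORKER_COUNT) as pool:
            partials = pool.map(func, chunks)
            return list(chain.from_iterable(partials))
    return wrapper


def get_address(zipcode):
    # Hypothetical stand-in for the remote lookup.
    return "address for %s" % zipcode


@parallel
def geolocate(zipcodes):
    return [get_address(zipcode) for zipcode in zipcodes]


addresses = geolocate([10001, 10002, 10003, 10004, 10005])
print(addresses)
```

The caller still sees a plain function call that takes a list and returns a list.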

Before we get to how this is implemented, here is a typical run of the hosted application at simple-serverless.appspot.com:

Number of zipcodes = 3000
Regular duration = 345.57s
Parallel duration = 18.98s
Speedup = 18.2X
Details per pipeline step:
  1 - Step "geolocate" took 18.65s for 100 workers and 3000 elements
  2 - Step "cleanup" took 0.34s for 100 workers and 3000 elements
  3 - Step "sort" took 0.00s for 1 worker and 413 elements

In the above run, we resolved 3,000 zipcodes in ~19 seconds, while normal execution would take almost 6 minutes. Actual speedup depends on how many nodes are currently warmed up. Typically, a second run runs faster.

Implementation

To implement the above decorator, the easy part is splitting the data into smaller chunks and collating the results after all the work is done. The hard part is finding servers to run the stateless function on and using an effective load balancer to scale horizontally to meet elastic demand. It turns out that Google AppEngine was designed to do exactly that.

If we can somehow handle a chunk by making a web service call back to our own domain, using GAE load balancing to dispatch back to the current server, or wake a new one depending on current load, we effectively piggyback on GAE to create a poor-man's serverless lambda implementation. It turns out that is not that hard to do.

Breaking up the original data into multiple chunks looks like this:

bucketSize = max(1, int(len(data) / WORKER_COUNT))
buckets = [
    data[n: n + bucketSize]
    for n in xrange(0, len(data), bucketSize)
]
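For readers on Python 3, the same arithmetic can be tried out as below. The function name split_into_buckets is made up for this sketch. Note that when the data does not divide evenly, the remainder ends up in one extra bucket, so there can be more buckets than WORKER_COUNT:

```python
def split_into_buckets(data, worker_count):
    # Same arithmetic as the snippet above, with // for integer division.
    bucket_size = max(1, len(data) // worker_count)
    return [
        data[n:n + bucket_size]
        for n in range(0, len(data), bucket_size)
    ]


# 10 items over 3 workers: the bucket size is 3 and the last bucket
# holds the single remaining item.
buckets = split_into_buckets(list(range(10)), 3)
print(buckets)
```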

We then convert the chunk into JSON, encode what lambda function we want to run, and invoke it as an RPC call:

# create a non-blocking, asynchronous worker to handle one
# bucket, on our own instance, or on new ones automatically
# launched by appengine
def createWorker(bucket):
    worker = urlfetch.create_rpc(deadline=60)
    scheme = os.environ['wsgi.url_scheme']
    host = os.environ['HTTP_HOST']
    url = scheme + '://' + host + URL
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        SECRET_KEY: SECRET,
    }
    payload = urllib.urlencode({
        'module': method.__module__,
        'className': className,
        'isclass': isclass,
        'method': method.__name__,
        'data': json.dumps(bucket),
        'args': json.dumps(args),
        'argv': json.dumps(argv),
    })
    return urlfetch.make_fetch_call(worker, url, payload,
                                    'POST', headers)

Note: We use a secret that is known only to the application to prevent external calls to our worker functions. A single shared secret is not recommended for production applications, where a stronger authentication model should be used.

The receiving worker route is set up in the client code as follows:

app = webapp2.WSGIApplication([
    ('/', ZipCodeHandler),
    serverless.init('/serverless_route', SERVERLESS_SECRET),
])

We initialize serverless with a route name and a secret of our choosing. When a chunk of data is sent to that route as a POST, it is unpacked, the requested lambda function is invoked, and the result is returned in JSON format.
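The receiving side can be pictured roughly as follows. This is a sketch in Python 3 under several assumptions: handle_chunk and the SECRET value are hypothetical, the form is a plain dict in place of a real POST request, and the className, isclass, args, and argv fields from the payload are ignored. The actual library may resolve the function differently:

```python
import importlib
import json

SECRET = "not-a-real-secret"  # hypothetical shared secret


def handle_chunk(form, secret):
    """Unpack one posted chunk, run the named function on it, and
    return the result as JSON text."""
    if secret != SECRET:
        raise PermissionError("unknown caller")
    # Look up the function by the module and method names in the payload.
    module = importlib.import_module(form["module"])
    func = getattr(module, form["method"])
    chunk = json.loads(form["data"])
    return json.dumps(func(chunk))


# Use the built-in sorted as a stand-in for a worker function.
form = {
    "module": "builtins",
    "method": "sorted",
    "data": json.dumps([3, 1, 2]),
}
response = handle_chunk(form, SECRET)
print(response)
```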

The results from the workers are collated as follows:

workers = map(createWorker, buckets)
result = list(itertools.chain(*map(getResult, workers)))

The workers are created and will invoke their lambda functions asynchronously using the createWorker function shown above. The result is collected in the getResult handler, which blocks until the result is received:

def getResult(worker):
    response = worker.get_result().content
    try:
        return json.loads(response)
    except Exception as e:
        logging.error('Error: %s' % e)
        return [e]

An additional utility is offered to handle workflow processing in the form of a pipeline:

pipeline = serverless.Pipeline(geolocate, cleanup, sort)

This sets up a serverless pipeline where data streams from geolocate, to cleanup, to sort. The pipeline is invoked using its run method:

zipcodes = [
    random.randint(10000, 99999)
    for n in xrange(ZIPCODE_COUNT)
]
addresses = pipeline.run(zipcodes)

The cleanup and sort functions look similar to the geolocate function. The key is that they are fully "functional": they depend only on the values of the data they process and are completely context free. The cleanup function is executed in parallel:

@serverless.parallel
def cleanup(addresses):
    return filter(None, addresses)

The sort function is executed sequentially, in the current thread, on all the collated results from the previous step in the pipeline:

@serverless.sequential
def sort(addresses):
    return sorted(addresses)
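Conceptually, the pipeline feeds the output of each step into the next one. The sketch below shows that idea in Python 3 with plain, undecorated functions. The Pipeline class here is a simplified stand-in, not the library's code, and the geolocate stub that resolves only even zipcodes is invented for the example:

```python
class Pipeline(object):
    """Run each step on the output of the previous step."""

    def __init__(self, *steps):
        self.steps = steps

    def run(self, data):
        for step in self.steps:
            data = step(data)
        return data


def geolocate(zipcodes):
    # Hypothetical lookup: only even zipcodes resolve to an address.
    return ["address %d" % z if z % 2 == 0 else None for z in zipcodes]


def cleanup(addresses):
    # Drop the entries that could not be resolved.
    return [address for address in addresses if address]


def sort(addresses):
    return sorted(addresses)


pipeline = Pipeline(geolocate, cleanup, sort)
addresses = pipeline.run([10004, 10001, 10002])
print(addresses)
```

Because each step only depends on its input, a step can be swapped between parallel and sequential execution without touching the others.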

The entire implementation of this simple serverless library is just 230 lines of Python, slightly more than the length of this README file.

Check the GitHub repo for the simple-serverless source.

 

Disclaimer: This work is a personal weekend project and is unrelated to Google Cloud Functions, a serverless functions solution based on Node.js, and to Google Dataflow for Python, a more comprehensive solution for data-driven distributed programming pipelines.

Friday, January 6, 2017

The 3Sum Problem

For a given list of numbers, the 3Sum problem asks whether there is at least one combination of three numbers in the list that sums to zero. The same problem is also listed at LeetCode, with a small twist: all solutions are to be returned, with duplicates removed.

A few examples:

[0, 0, 0] ==> [[0, 0, 0]]
[-1, 0, 1, 2, -1, -4] ==> [[-1, -1, 2], [-1, 0, 1]]
[-2, 0, -2, 1, 0, 4, 0, -1, -2, 0, -2, 1, 0, 4, 0, -1] ==> [[-1, 0, 1], [-2, -2, 4], [0, 0, 0], [-2, 1, 1]]

The naive, brute force, yet Pythonic approach would be to use itertools.combinations to generate a collection of all possible triplets. From those triplets, we keep the ones that sum up to zero. We then sort each triplet and add it to a set to remove the duplicates. We need to temporarily convert each triplet to a tuple, as lists are not hashable in Python. Finally, we convert the set of unique results into a list of lists, as requested by LeetCode:


import itertools


def threesum_brute_force(L):
    triples = itertools.combinations(L, 3)  # O(n^3)
    zeroes = [t for t in triples if sum(t) == 0]
    return map(list, set([tuple(sorted(t)) for t in zeroes]))


That approach is O(n^3) because we compare all possible combinations. It may look like we create them all ahead of time before doing any further analysis, but itertools.combinations returns an iterator, so we only produce one triplet at a time, when needed. For a list with 800 elements, that is C(800, 3) = 85,013,600 triples; the 510,081,600 comparisons listed for brute force in the results table further below equal 800 × 799 × 798, the number of ordered triples.

A similar implementation uses three loops to create each of the combinations. This is still O(n^3), more clearly showing now as we iterate over i, j, and k. This version runs much faster than the previous one, as we avoid creating all the intermediate tuples themselves. We are still doing a lot of repetitive work.


def threesum_triple_loop(L):
    result = set()
    for i in xrange(len(L) - 2):
        for j in xrange(i + 1, len(L) - 1):
            for k in xrange(j + 1, len(L)):  # O(n^3)
                if L[i] + L[j] + L[k] == 0:
                    result.add(tuple(sorted((L[i], L[j], L[k]))))
    return map(list, result)


Rather than comparing all triples, producing O(n^3) time complexity, there is a way to solve this problem in O(n^2). The insight is to first sort all the numbers. Then we loop from left to right, so that at each step the next number is at least as large as the one before. Rather than have two nested loops to find the next two elements of the triple, we maintain a region bounded by j and k.

While we go over each number, we sum this number with the two numbers on the edges of the region. If the sum is greater than zero, we are too far to the right and we shrink the region by moving k to the left. If the sum is less than zero, we are too far to the left and we move j to the right. The region remembers where the last comparison left off, which provides a good starting point to find the next zero-sum triple for the current number. We refer to this as the Dynamic Programming (DP) solution, rather than brute force, although the pattern is more commonly known as the two-pointer technique.

A final optimization is to stop once the number at i reaches zero. After that point every remaining candidate is non-negative, so no new zero-sum triple can be found and we can stop iterating.


def threesum(L):
    L = sorted(L)
    result = set()
    for i in xrange(len(L) - 2):
        j = i + 1
        k = len(L) - 1
        while j<k:
            s = L[i] + L[j] + L[k]
            if s == 0:
                result.add(tuple(sorted((L[i], L[j], L[k]))))
            if s > 0:
                k -= 1
            else:
                j += 1
        if L[i] >= 0:
            break
    return map(list, result)


Using DP, we narrowed down the search space dramatically. However, in the above solution, each time we increment i, we pick the next k by selecting the last number in the list, which is also the maximum number. Now, once i gets closer and closer to zero, this k will be less appropriate and we will end up doing a linear search from k towards i to get a smaller positive number that will give us a zero sum. The optimal k can be found more efficiently with a binary search, which makes the overall algorithm more efficient again.

A similar argument applies to j. If we consider the maximum number, it may make less sense to make j become i+1 for its first candidate. Namely if i+j+k==0, then j should be -i + -k. For example, assume the number at i is -30 and the maximum for the list is 24. In that case, rather than make j point at a number such as -29, we can make j skip all the way ahead to 6, as that would be the first candidate to yield zero. Again, using binary search will give us a starting point for j more effectively than doing a linear search, if we are dealing with a large number of elements:


def threesum_binarysearch(L):
    L = sorted(L)
    n = len(L)
    result = set()
    i = 0
    while i < n - 2:
        j = binarySearch(L, i + 1, n - 2, -(L[i] + L[-1]))
        k = binarySearch(L, j + 1, n - 1, -(L[i] + L[j]))
        while j<k and k<n:
            s = L[i] + L[j] + L[k]
            if s == 0:
                result.add(tuple(sorted((L[i], L[j], L[k]))))
                k -= 1
                j += 1
            elif s > 0:
                k -= 1
            else:
                j += 1
        if L[i] == 0:
            break
        i += 1
    return map(list, result)

The binary search makes the search for j and k possible in O(log n), rather than O(n):


def binarySearch(L, min, max, target):
    while True:
        if max < min:
            return min
        mid = (min + max) // 2
        if L[mid] < target:
            min = mid + 1
        elif L[mid] > target:
            max = mid - 1
        else:
            return mid
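The standard library offers a similar insertion-point search in the bisect module. The sketch below, in Python 3, reproduces the example from the text, where the number at i is -30 and the maximum is 24; the other list values are made up. One difference to keep in mind: the upper bound of bisect_left is exclusive, while the max argument of binarySearch above is inclusive, and with duplicates bisect_left always returns the leftmost match:

```python
import bisect

L = sorted([-30, -29, -12, 3, 6, 11, 24])
i = 0

# Smallest useful value for L[j]: L[i] + L[j] + max(L) == 0.
target = -(L[i] + L[-1])

# Search positions i + 1 up to and including len(L) - 2.
j = bisect.bisect_left(L, target, i + 1, len(L) - 1)
print(target, j, L[j])
```

Here j skips past -29 and -12 and lands directly on 6.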


To give you an idea of the performance of each algorithm, here is a test run with 800 numbers, showing the number of i+j+k==0 comparisons performed by each algorithm and the total time needed:

Algorithm            Comparisons    Time (s)
Brute force          510,081,600    21.09800005
Three loops          636,804        0.1089999676
DP                   195,048        0.07100009918
DP + binary search   133,383        0.03600001335
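One way to collect such counts is to add a counter next to each sum test. The sketch below does this in Python 3 for the three-loop and the sorted-region versions on a small random list. The function names, list size, and value range are choices made for this illustration, and the counts it produces are not comparable to the table above, which depends on the original input and on how comparisons were tallied:

```python
import random


def count_triple_loop(L):
    found, comparisons = set(), 0
    n = len(L)
    for i in range(n - 2):
        for j in range(i + 1, n - 1):
            for k in range(j + 1, n):
                comparisons += 1  # one sum test per triple
                if L[i] + L[j] + L[k] == 0:
                    found.add(tuple(sorted((L[i], L[j], L[k]))))
    return found, comparisons


def count_sorted_region(L):
    L = sorted(L)
    found, comparisons = set(), 0
    for i in range(len(L) - 2):
        j, k = i + 1, len(L) - 1
        while j < k:
            s = L[i] + L[j] + L[k]
            comparisons += 1  # one sum test per region step
            if s == 0:
                # L is sorted and i < j < k, so the triple is already ordered.
                found.add((L[i], L[j], L[k]))
            if s > 0:
                k -= 1
            else:
                j += 1
        if L[i] >= 0:
            break
    return found, comparisons


random.seed(0)  # fixed seed so repeated runs use the same input
numbers = [random.randint(-50, 50) for _ in range(60)]
loop_found, loop_count = count_triple_loop(numbers)
region_found, region_count = count_sorted_region(numbers)
print(loop_count, region_count)
```

Both versions should report the same set of triples, with far fewer sum tests for the sorted-region version.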

In short, when solving problems like 3Sum, avoid comparing all elements. This can be done by reducing the search space and ignoring candidates that won't lead to a solution anyway. Aside from reducing the search space, if we can order it in some form, we have even more optimization opportunities. In that case, we can typically avoid linear search and use binary search.

Check out many more algorithms with visualizations at PyAlgoViz.

