Throttling in Distributed Systems

In the public cloud, throttling, or rate limiting, is ubiquitous – if you run a multi-tenant service you have to make sure no single tenant can abruptly claim so many resources that it affects the user experience of other tenants. However, throttling in a distributed context is inherently hard. To obtain accurate and reliable real-time concurrency or requests-per-second (RPS) numbers you need to offload the tracking to a centralized, standalone service, which is both hard to implement and tricky to get right. So many systems go live without a centralized traffic tracking system, yet still manage to provide a throttling mechanism. In this article I’ll walk through some intuitive approaches that achieve this goal.

The first idea that comes to mind is to let individual servers handle throttling based on their share of the concurrency/RPS allocated to a particular tenant. For example, if you have a server fleet of size 10 and the allocated RPS for a tenant is 100, then, if the load is shared perfectly evenly among the servers, each server can assume it should handle no more than 100 / 10 = 10 RPS for that tenant.
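To make the idea concrete, here is a minimal sketch of such a local throttler (all names are illustrative, and the one-second counter reset is assumed to happen elsewhere):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// A sketch of naive "local share" throttling, assuming a perfectly balanced fleet.
final class LocalShareThrottler {
    private final int fleetSize;                          // e.g. 10 servers
    private final Map<String, Integer> tenantRpsLimits;   // e.g. "tenantA" -> 100 RPS
    private final Map<String, AtomicInteger> usedThisSecond = new ConcurrentHashMap<>();

    LocalShareThrottler(int fleetSize, Map<String, Integer> tenantRpsLimits) {
        this.fleetSize = fleetSize;
        this.tenantRpsLimits = tenantRpsLimits;
    }

    // True if this server should accept the request under its local share
    // (tenant limit / fleet size). A background task is assumed to clear
    // usedThisSecond at every one-second boundary. Note how a fractional
    // share (e.g. 100 RPS spread over a 1000-server fleet) already breaks down.
    boolean tryAcquire(String tenant) {
        double localShare = (double) tenantRpsLimits.getOrDefault(tenant, 0) / fleetSize;
        int used = usedThisSecond.computeIfAbsent(tenant, t -> new AtomicInteger()).incrementAndGet();
        return used <= localShare;
    }
}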

However, the tricky part in the above approach is “if the load is shared perfectly evenly among the servers”. It requires a load balancer that round-robins really well, which is often not the case. In addition, the enforced RPS limit is a function of the fleet size, which means servers must learn about fleet size changes in a timely fashion, adding implementation complexity. The problem gets even worse with super-sized server fleets, which are common for giant cloud providers. Say you have a fleet of 1000 servers but the RPS limit for a particular tenant is just 100: each server is then expected to handle only 0.1 RPS, and any imperfection in the load balancer’s round-robining poses a serious threat to throttling accuracy.

A natural improvement to local throttling is to find a way to direct all traffic for a tenant to just one server, which is where “consistent hashing” comes in. In this scheme, upon receiving a request, a server first figures out which server is the designated server according to the consistent hashing algorithm, and forwards the request to that designated server. Now we don’t need to worry about imperfect round-robining, because ultimately all requests from the same tenant are processed by the same server. The designated server’s local view of the tenant’s traffic is therefore the global view, and throttling becomes easy.
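A minimal hash-ring sketch of that forwarding decision might look like this (Guava’s Hashing is used for convenience; the virtual-node count and names are my assumptions). Since every server builds the same ring, they all agree on who the designated server is for a tenant:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class DesignatedServerPicker {
    private final NavigableMap<Long, String> ring = new TreeMap<>();

    DesignatedServerPicker(List<String> servers, int virtualNodesPerServer) {
        for (String server : servers) {
            for (int i = 0; i < virtualNodesPerServer; i++) {
                ring.put(hash(server + "#" + i), server);   // place virtual nodes on the ring
            }
        }
    }

    // The first ring position at or after the tenant's hash owns the tenant;
    // a request received anywhere else is forwarded to this server.
    String designatedServer(String tenant) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(tenant));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    private static long hash(String key) {
        return Hashing.murmur3_128().hashString(key, StandardCharsets.UTF_8).asLong();
    }
}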

Needless to say, any reader with production experience on large distributed systems knows that “consistent hashing” may suffer from “hot shard” problems, either due to a poorly designed hashing algorithm or simply organic traffic growth. Some servers receive more traffic than others and become less performant, hurting customer experience as a result. To tackle this problem, we can add capacity to the “shards”. In plain consistent hashing, each “shard” essentially contains only one server; if we let each shard have two servers, the processing power doubles. But this comes at a cost – if we provision shard capacity based on the traffic volume of the hottest shard, resources are wasted on the cooler shards.

An improved approach over “consistent hashing” is “rendezvous hashing”. The basic idea is that we select K (rather than just one) servers to handle a tenant’s traffic, and K can be dynamically adjusted in response to the tenant’s real-time traffic. After the K servers are selected, we further reduce the choice down to one server, either by random choice or by round-robining (there it is again). Then how do we enforce throttling – should the ultimate request processor do the job, or the selector? Apparently the request processor is a better fit, since the number of processors (K) is usually far smaller than the number of selectors (the entire fleet). Note that the problem of “how to make the local view of traffic reflect the true global view” now comes back again.
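Here is a sketch of the top-K selection in rendezvous (highest-random-weight) hashing, with Guava’s Hashing again standing in for whatever hash function is actually used:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class RendezvousSelector {
    private final List<String> servers;   // the entire fleet

    RendezvousSelector(List<String> servers) {
        this.servers = servers;
    }

    // Score every server against the tenant and keep the K highest. Growing or
    // shrinking K only adds or removes servers at the tail of the ranking, so the
    // selection stays stable as K is adjusted to the tenant's real-time traffic.
    List<String> topK(String tenant, int k) {
        return servers.stream()
                .sorted(Comparator.comparingLong((String s) -> score(tenant, s)).reversed())
                .limit(k)
                .collect(Collectors.toList());
    }

    private static long score(String tenant, String server) {
        return Hashing.murmur3_128()
                .hashString(tenant + "|" + server, StandardCharsets.UTF_8)
                .asLong();
    }
}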

Let’s discuss in a bit more detail how to use the K request processor servers to do throttling. One method is to preset an RPS limit per server per tenant, which essentially specifies at initial setup “as a server you are only allowed to process this many RPS per tenant”. Say the tenant is allowed to send 400 RPS and the RPS limit per server per tenant is 100: the selector then knows it should have K = 400 / 100 = 4 and round-robin through the 4 processors. This approach is simple and effective, but has the limitation that the tenant RPS limit can only be set in increments of the per-server-per-tenant limit. For example, a tenant RPS limit of 80 is not achievable in the above example. Another method is to let the selector server propagate the per-server-per-tenant RPS limit to the request processor servers. For instance, if the tenant RPS limit is 320 and K = 4, each request processor server is told to handle at most 80 RPS for the tenant. This, of course, adds considerable complexity to the system.
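The two sizing methods boil down to a little arithmetic; this small snippet (values taken from the examples above) is only meant to make the trade-off explicit:

// Method 1: K follows from a fixed per-server limit, so the tenant limit is only
// enforceable in multiples of perServerRps (400 / 100 -> K = 4; a limit of 80 is
// not expressible). Method 2: K is chosen first and each processor is told its
// share (320 RPS with K = 4 -> 80 RPS per processor), at the cost of propagating
// that number to the processors.
final class ThrottleSizing {
    static int kFromPerServerLimit(int tenantRps, int perServerRps) {
        return (int) Math.ceil((double) tenantRps / perServerRps);
    }

    static int perProcessorShare(int tenantRps, int k) {
        return tenantRps / k;
    }
}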

Up to now we have a seemingly decent throttling mechanism, but it is far from perfect. There are many issues that may be specific to your system which we have omitted from the discussion. For example, with “rendezvous hashing” each tenant essentially gets their own randomly chosen K processor servers, and these sets of K servers can of course overlap. If some servers happen to serve more heavy tenants than others, we still have a “hot server” issue, and it can be hard to mitigate because the nature of “rendezvous hashing” is human-touch-free selection, so human-driven heat management is not viable. Sharp readers may have noticed that even though this article is about throttling in distributed systems, we have dedicated quite a bit of content to other relevant issues, like heat management. In the distributed world there is hardly any problem you can solve in isolation, nor are there any ultimately perfect solutions – the best approach is always to understand your requirements and weigh the trade-offs carefully.

Debugging Go Program Memory Leak

I was debugging a memory leak issue in our Go application recently. The complexity of the service is moderate, and what I originally intended to do was just a bake test – throwing requests at the service at a constant concurrency level and observing the memory footprint of the application over time. To my surprise, the application’s resident memory grew almost linearly with time – after running for 5 minutes, the memory size was around 160 MB; after 10 minutes it was almost 320 MB. This was in sharp contrast to the 40 MB from the author’s original test results.

Apparently memory was leaking, and the leak most likely happened along with requests. But where? The application has a mechanism to dump the memory profile into a file, which is what the service’s author used to conveniently capture the in-use memory footprint during development. The memory profile in a visual format looks like the following.

I tried to dig something out of the graph, only to find that in the 320 MB memory profile almost every allocation point was occupying twice the amount of memory as its corresponding point in the 160 MB profile – which means the leak was everywhere!

Strictly speaking, it is not easy for a garbage-collected language such as Go to leak memory the way C/C++ does. Basically you need to keep references to objects so they never go out of scope, and keep doing so all the time, which is hard to do without noticing. The memory profile report can tell me “line X in file Y has allocated 80 MB worth of objects that are in use at the instant you took the snapshot”, which you know is not right, but it cannot tell you why they are being kept and who is (annoyingly) keeping them.

I spent hours tracking down the usage of a few suspicious allocation points but got no clue about where the leak could be. Having made quite a few random changes and had no luck rerunning the test, I came across a post saying “goroutines can pile up if they hang and cause memory leaking”. Sounds reasonable. So I added this line to the code, which prints the stack traces of all existing goroutines to the console when the application exits.

pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) // runtime/pprof: dump the stacks of all live goroutines to stdout

I reran the test, was able to see the stack traces this time, and the culprit was immediately clear – there were thousands of clones of one goroutine, all blocked on the same line of code, writing to a channel! Further analysis showed that, because of an oversight, the goroutine that is supposed to read from that channel almost always returns earlier than the goroutine writing to it, and this happens for every request. So a thousand requests leave behind a thousand hanging goroutines, and each such goroutine holds on to a variety of objects. That’s why, as mentioned above, it looked like the leak was everywhere.

Diving deeper, I found that the application’s memory footprint at the designed concurrency level is just around 10 MB, far smaller than the 40 MB claimed by the original author. A likely explanation is that he was testing without knowing about the memory leak, but terminated his test after fewer requests.

The whole process was tiresome, but fun. Only when you finally find the root cause can you connect the dots and realize all the traces were pointing to it, like figuring out all the mysteries at the end of a thriller movie.

Understanding asynchronous programming with an example in Guava

It is not easy to understand asynchronous programming. Even after working in Java for many years, quite a few junior and even mid-level software engineers are still confused about how to use it and what the word asynchronous really implies (the literal definitions are easy to memorize, but not useful unless you deeply understand the inner workings). Here is a snippet of code (see the bottom of this post) I wrote with Guava to demonstrate the difference between synchronous and asynchronous programming.

First we have SquareRpcService, inside which there is a method squareRPC: you give it an integer, and it computes the square of that integer and returns the result in the form of a future. The future is fulfilled after 1 second; the 1-second delay is achieved by thread sleeping and has no real-life meaning. This method is a good stand-in for any asynchronous API that does not carry out its underlying work on the calling thread and is also a little slow to produce a substantial result.
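The full snippet is referenced at the bottom of the post; as a rough reconstruction (the internal pool that plays the “remote” side of the RPC is my own assumption), the service could look like this:

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;

class SquareRpcService {
    // These threads play the role of the remote server, so the caller's thread is never used.
    private final ListeningExecutorService remoteSide =
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    // Returns a future that is fulfilled with n * n roughly one second later.
    ListenableFuture<Integer> squareRPC(int n) {
        return remoteSide.submit(() -> {
            Thread.sleep(1000);   // the sleep just simulates a slow RPC
            return n * n;
        });
    }
}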

Next, the SyncAsyncCompare class is where you will find the main function. There are two private methods. One is called doCalcSync: inside it we call squareRPC, do a blocking wait to get the result, add 1 to the result from squareRPC, and return the final result. This method blocks whatever thread doCalcSync is invoked on. The other is called doCalcAsync: inside it we also call squareRPC, but instead of doing a blocking wait for the result, we chain a callback onto the resulting future and return that future.
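A reconstruction of the two methods could look like the following (whether the original chains the callback with Futures.transform or addCallback, the idea is the same; the constructor is my own scaffolding):

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

class SyncAsyncCompare {
    private final SquareRpcService rpc = new SquareRpcService();
    private final ListeningExecutorService executorService;   // the 1-thread pool created in main

    SyncAsyncCompare(ListeningExecutorService executorService) {
        this.executorService = executorService;
    }

    // Blocks the calling thread: waits ~1 second on get(), then adds 1.
    int doCalcSync(int n) throws Exception {
        int square = rpc.squareRPC(n).get();   // blocking wait for the RPC result
        return square + 1;
    }

    // Never blocks: chains the "+ 1" step onto the RPC future and returns the new future.
    ListenableFuture<Integer> doCalcAsync(int n) {
        return Futures.transform(rpc.squareRPC(n), square -> square + 1, executorService);
    }
}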

Now look inside the main function to see how we use doCalcSync and doCalcAsync to calculate the results for 10 integers “concurrently”. Both doCalcSync and doCalcAsync are invoked through a thread pool named executorService, which contains just 1 thread under the hood. We use a stopwatch to measure the elapsed time for each approach.
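A driver along these lines would exercise both paths (again a reconstruction, not the author’s exact main, which lives inside SyncAsyncCompare; the per-task log lines in the output come from print statements inside doCalcSync and doCalcAsync that this sketch leaves out):

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SyncAsyncDriver {
    public static void main(String[] args) throws Exception {
        // The single-thread pool both variants run on.
        ListeningExecutorService executorService =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        SyncAsyncCompare compare = new SyncAsyncCompare(executorService);

        // Sync mode: each submitted task blocks the lone worker thread for ~1 second.
        Stopwatch watch = Stopwatch.createStarted();
        List<ListenableFuture<Integer>> syncResults = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            syncResults.add(executorService.submit(() -> compare.doCalcSync(n)));
        }
        Futures.allAsList(syncResults).get();
        System.out.println("Sync Mode: total elapsed time "
                + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");

        // Async mode: the lone worker only fires the RPCs and later runs the callbacks.
        watch.reset().start();
        List<ListenableFuture<Integer>> asyncResults = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            asyncResults.add(Futures.submitAsync(() -> compare.doCalcAsync(n), executorService));
        }
        Futures.allAsList(asyncResults).get();
        System.out.println("Async Mode: total elapsed time "
                + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");

        executorService.shutdown();
    }
}

Running the actual snippet produces the following output.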

Finished sync calc on thread 9, result is 1, start time is 101191, end time is 101192 s
Finished sync calc on thread 9, result is 2, start time is 101192, end time is 101193 s
Finished sync calc on thread 9, result is 5, start time is 101193, end time is 101194 s
Finished sync calc on thread 9, result is 10, start time is 101194, end time is 101195 s
Finished sync calc on thread 9, result is 17, start time is 101195, end time is 101196 s
Finished sync calc on thread 9, result is 26, start time is 101196, end time is 101197 s
Finished sync calc on thread 9, result is 37, start time is 101197, end time is 101198 s
Finished sync calc on thread 9, result is 50, start time is 101198, end time is 101199 s
Finished sync calc on thread 9, result is 65, start time is 101199, end time is 101200 s
Finished sync calc on thread 9, result is 82, start time is 101200, end time is 101201 s
Sync Mode: total elapsed time 10201 ms

Finished async calc on thread 9, result is 1, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 2, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 5, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 10, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 17, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 26, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 37, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 50, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 65, start time is 101201, end time is 101202 s
Finished async calc on thread 9, result is 82, start time is 101201, end time is 101202 s
Async Mode: total elapsed time 1025 ms

You will find that the doCalcSync approach takes 10 seconds while doCalcAsync takes only 1 second. Note that both approaches carry out all the work with the same executorService. So where does the difference come from?

The difference is entirely due to the blocking wait of ListenableFuture::get. In the doCalcSync case, this blocks the thread for 1 second, leaving it essentially idle yet unable to move on to the next task even though tasks are already waiting in the thread pool queue. And because we only have 1 thread in the pool, the execution of these 10 tasks becomes essentially sequential, so it takes roughly 10 * 1 = 10 seconds. In the doCalcAsync case, there is no blocking wait; we just chain a callback onto each future, which basically tells the thread

Please move on, and as soon as that future is fulfilled, please remember to execute the callback in the same thread pool.

So all 10 tasks, each of which calls squareRPC once, are fired off in a blink by one thread, and after 1 second that same thread is used to harvest the 10 fulfilled futures in another blink. The overall elapsed time is therefore roughly just one second.

To make the two approaches similarly fast, we would need executorService to contain as many threads as there are concurrent tasks, which is clearly not a scalable option.

I have been asked what the magic is behind “as soon as that future is fulfilled, please remember to execute the callback in the same thread pool”. What is really being asked is: who monitors the fulfillment of a future? It looks like magic, but it is not so magical once you look at the implementation of the future class: a future object maintains a lot of internal state. When you call addListener to chain your callback, you are registering your function and its executor inside the future object. On the other end, when the producer later fulfills the future and calls complete(), all the registered callbacks are submitted to their executors. That’s how the blocking wait is avoided: nobody is monitoring or waiting; it’s the future’s fulfiller that pulls the trigger.
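A minimal sketch of that mechanism (not Guava’s real implementation, just the core idea of callbacks living inside the future) looks like this:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

final class TinyListenableFuture<T> {
    private static final class Listener {
        final Runnable callback;
        final Executor executor;
        Listener(Runnable callback, Executor executor) {
            this.callback = callback;
            this.executor = executor;
        }
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean done;
    private T value;

    // Called by the consumer: either run right away (already complete) or park the
    // callback and its executor inside the future object.
    synchronized void addListener(Runnable callback, Executor executor) {
        if (done) {
            executor.execute(callback);
        } else {
            listeners.add(new Listener(callback, executor));
        }
    }

    // Called by the producer: nobody was polling or blocking; completing the future
    // is what hands every registered callback to its executor.
    synchronized void complete(T result) {
        value = result;
        done = true;
        for (Listener l : listeners) {
            l.executor.execute(l.callback);
        }
        listeners.clear();
    }

    synchronized T getNow() {
        return value;
    }
}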

A tale of two storage types: block store vs. social graph store

Having worked on two storage teams, I find it very interesting to walk through the two different storage architectures in one article. Warning: this article only gives you a 30,000-foot overview and barely scratches the surface of two very complicated systems.

The first team is the graph storage team at a company famous for its endless newsworthy content. One of the most notable graphs I worked on is the social graph – if you think of every user as a vertex in this graph, the edges are the following (and followed-by) relationships. There are some interesting characteristics that make this social graph’s topology and read/write pattern fundamentally different from its peers.

The vertex degree can be extremely high. It’s already rare for a user to have 1000 connections on a friend-oriented platform, or 1000 connections on a professional-oriented platform, but it’s pretty common for celebrities to have tens of millions of followers on this platform.

This social graph is read-heavy and write-light, because every time you publish something all of your followers need to be read for content delivery (a jaw-dropping 10 million read QPS was what I observed as of 2016), while writes only happen when you follow/unfollow a user. Reads not only have high QPS, they also need to be lightning fast, because there is a ton of content delivery whenever a celebrity publishes (or several celebrities publish at the same time).

Even though it’s technically a “graph”, it is not the conventional graph you might imagine. Take graph traversal, for example: on a friend-oriented platform, it is quite a common ask to get all friends of your friends so the user-engagement team can recommend new friends to you; on a professional-oriented platform, it is interesting to compute how many connection hops sit between you and someone so you can ask for potential references when you want to connect with an important user. But here, we don’t really care who the followers of all the followers of the President of the United States are (we are still interested in who is following both popular Singer A and popular Singer B simultaneously, though).

MySQL is used to store the social graph, at least it was for many years until I left (we can hold off on the discussion of why a row-based relational database is used to store a social graph). As you would expect, it is a distributed cluster that spans more than one data center, with sharding to increase throughput and data replication to increase durability/availability. If we simplify the picture to a single data center, every piece of data is stored in four replicas.

The above figure shows a simplified piece of the storage system: there is a so-called app server tier sitting between the client and the databases. Every app server has the same functionality but is not “entirely” stateless. Based on some load-balancing metrics, the client chooses a random app server to use. For reads, the app server reads from one random database host among the four that contain the data (the app server caches the topology) and returns the data to the client. For writes, the app server persists the request to its local disk-backed log and acknowledges the client immediately. At this point the client thinks the write has succeeded, but it actually has not yet. On the other end, the requests in the log are serviced one by one and written to all four databases. If some or all of the databases are down at that moment, the app server puts the write request back into the queue.
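In pseudocode-ish form, that write path looks roughly like this (DiskBackedLog, GraphDb and Edge are stand-ins of mine; the real system is of course far more involved):

import java.util.List;

final class AppServerWritePath {
    private final DiskBackedLog log;        // this app server's local, disk-backed queue
    private final List<GraphDb> replicas;   // the four database replicas

    AppServerWritePath(DiskBackedLog log, List<GraphDb> replicas) {
        this.log = log;
        this.replicas = replicas;
    }

    // Client-facing path: persist locally, then acknowledge immediately.
    // The replicas have not seen the write yet at this point.
    void write(Edge edge) {
        log.append(edge);
    }

    // Background path: drain the log and apply each write to all four replicas.
    // If any replica is down, the write simply stays queued and is retried later.
    void drainOnce() {
        Edge edge = log.peekOldest();
        if (edge == null) {
            return;
        }
        boolean allApplied = true;
        for (GraphDb db : replicas) {
            allApplied &= db.tryApply(edge);
        }
        if (allApplied) {
            log.removeOldest();
        }
    }

    // Minimal stand-ins so the sketch is self-contained.
    interface DiskBackedLog { void append(Edge e); Edge peekOldest(); void removeOldest(); }
    interface GraphDb { boolean tryApply(Edge e); }
    record Edge(long followerId, long followeeId) {}
}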

This scheme is simple and effective, notably bringing very high write availability – no matter how many copies of the data exist at the backend, availability is unaffected, since the client just needs to deliver the write to one app server; it is the app server’s responsibility to deliver the data to all database replicas later on. The shortcomings are easy to spot. There is no read-after-write consistency, which is acceptable – suppose you click the “follow” button for some user and it takes a little while for his/her latest content to appear on your timeline; no big deal, and we call it eventual consistency. In a very unfortunate case, though, the “write success” acknowledgement the client gets could be a complete or partial lie – if the app server’s disk crashes before all replicas get the data, the data is either totally lost or there are discrepancies between the replicas. But this is still within an acceptable range – suppose you clicked the button to follow your favorite celebrity, and a day later you notice you are not seeing his/her content, so you go to the person’s profile page and click “follow” again. User experience is impacted, but it is not catastrophic by any means.

After some time, I moved on to another storage team at a leading cloud computing company, which provides block storage as a service. Simply put, our work is to physically decouple block devices from virtual computing instances, realized by using a cluster of storage nodes and converting disk I/O into network I/O. In the customer’s eyes, he is using a traditional block device without feeling any difference, but under the hood the block device lives away from the physical machine his computing instance runs on, and a single logical block device may even be served by multiple physical devices. The most common use case is to set up a file system on top of it and deploy mission-critical relational databases on that file system.

The famous CAP theorem states that in a partitioned system, consistency and availability cannot both be achieved. Yet we desire both availability and consistency to a very high degree. Availability should not need much explanation, since block devices (local disks) are already relatively reliable in a computer, and our team advertises that our kind of “virtual” block device has an even lower failure rate than a commodity block device.

Why do all replicas need to be strictly consistent? In my previous graph storage scenario, it is occasionally (or even permanently!) acceptable that when you query whether you are following some person, database A replies yes and database B replies no. It is also easily fixable by sending the write again to overwrite the inconsistency. For block storage, however, a one-byte inconsistency between replica A and replica B could mean you have two different views of a file system! A single byte’s contamination can turn a terabyte device into junk. So, if you cannot guarantee consistency, availability is meaningless.

The figure below shows a simplified piece of the block storage service architecture, which looks rather trivial at first glance. We store a volume as two replicas (on two storage nodes). Low latency is highly desired, so there is no app-server-like proxy involved; the client talks to the storage nodes directly. Network connections are a non-negligible overhead and we don’t want the client to jump between two replicas, so we assign (through some means) one replica to be master and one to be slave. The client only talks to the master (more accurately, whichever replica the client is talking to holds the master status). For reads, the master grabs the data from its local disk and returns immediately. For writes, the master replicates them to the slave first; only if that succeeds does the master write to its own disk and acknowledge the client.
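Sketched out, the master’s read and write rules are as simple as this (LocalDisk and Replica are illustrative stand-ins; failure handling is the subject of the next paragraph):

final class MasterReplica {
    private final LocalDisk disk;     // this storage node's own disk
    private final Replica slave;      // the peer replica of the volume

    MasterReplica(LocalDisk disk, Replica slave) {
        this.disk = disk;
        this.slave = slave;
    }

    // Reads are served straight from the master's local disk.
    byte[] read(long offset, int length) {
        return disk.read(offset, length);
    }

    // Writes go to the slave first; only when that succeeds does the master
    // persist locally and acknowledge the client.
    boolean write(long offset, byte[] data) {
        if (!slave.replicate(offset, data)) {
            return false;             // replication failed: no ack to the client
        }
        disk.write(offset, data);
        return true;                  // safe to acknowledge the client
    }

    // Minimal stand-ins so the sketch is self-contained.
    interface LocalDisk { byte[] read(long offset, int length); void write(long offset, byte[] data); }
    interface Replica { boolean replicate(long offset, byte[] data); }
}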

Now consider some failure cases. When the slave fails, should the master stop serving requests? Obviously not. So the master “quickly” accepts the status quo and acts as a “solo” master, which means it no longer waits for replication before acknowledging the client. In the meantime, the master tries to provision a new slave, copies all existing and incoming data to it, and recovers the master-slave topology. When the master fails, the client flips to the slave and the slave is promoted to master. Likewise, the new master is now a “solo” master and should provision a new slave to recover the master-slave topology as soon as possible.

If you are familiar with distributed storage, you might already have noticed that I used a lot of vague terms above without much clarification. Lots of details are also omitted. For example, how do we update the storage node software? If a storage node is temporarily brought down for a software update, from the client’s perspective it is a failing storage node. Does that mean the peer takes over, provisioning should happen, and a whole device’s worth of data needs to be copied? In the early days that was indeed what life looked like. Then people thought it would be better to wait for the “failing” node for a while: if it recovers, no full device copy is required, and the master can just send the incremental changes that happened while the node was in the “failing” state. We call this “catch-up” recovery. Needless to say, the software now has to be more complicated, because we need to keep track of what these incremental changes are and where they live. Wait – if you think further, what if a network partition happens? As illustrated in the figure below, suppose B is put into network isolation for some unknown reason for a while, and A becomes a solo serving master while waiting for the “failing” slave to recover. If during this period A unfortunately enters network isolation itself and B comes out of isolation, the client flips to B thinking it contains the most up-to-date data – but it does not! This is essentially a split-brain scenario, introduced by the wait-for-the-failing-peer-to-recover mechanism. Now we need an extra mechanism to guard against a risk introduced by a performance improvement!

In its infancy, the storage node software was trivial and the system architecture much simpler. Over the years many features were added to improve efficiency, but they also introduced lots of complications and even “bugs”. Many of these “bugs” are hard to find during development because they only happen when a series of coincidental failures chains together sequentially (and they do happen at the scale we operate). As of January 2018, the storage node software contained a staggering 300,000 lines of C++ code, and it is safe to say that a great portion of it deals with very rare scenarios. Today, when an engineer submits a code change to improve the system architecture, even senior and experienced engineers can hardly ascertain whether it is a safe change. Extra validation tools are needed to simulate tens of millions of combinatorial scenarios to make sure all preset consistency constraints are still met. I would say the root cause of this situation is that the team wants to push both availability and consistency to the extreme.

To summarize, the graph storage aims at unprecedented availability at the cost of consistency, while the block storage treats availability and consistency as equally important. This leads to very different system architectures. You might conclude that the latter is harder, but that is not entirely (or at all) true, as I will discuss in later articles.

Using MoreExecutors.directExecutor()

In the world of Java async programming, ListenableFuture is a very useful class. One very nice feature is that you can attach a callback to an instance and chain as many callbacks as you want. When adding a callback you implicitly or explicitly specify which executor service should execute it. When you do not want much hassle, MoreExecutors.directExecutor() seems like a good choice; however, by using it you lose control over which thread will actually run the callback.
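The snippet itself is not reproduced here, but a sketch along the following lines shows the shape of it (class and variable names are mine): three tasks go to a three-thread pool, two finish quickly, one lingers, and all three callbacks are added with directExecutor() after a short pause.

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class DirectExecutorDemo {

    private static void log(String what) {
        Thread t = Thread.currentThread();
        System.out.println(what + ", " + t.getName() + ", " + t.getId());
    }

    private static ListenableFuture<Void> submitTask(
            ListeningExecutorService pool, String name, long sleepMillis) {
        Callable<Void> task = () -> {
            log("In run of " + name);
            Thread.sleep(sleepMillis);   // future1/future2 finish quickly, future3 lingers
            return null;
        };
        return pool.submit(task);
    }

    private static void addLoggingCallback(ListenableFuture<Void> future, String name) {
        Futures.addCallback(future, new FutureCallback<Void>() {
            @Override public void onSuccess(Void result) { log("In callback of " + name); }
            @Override public void onFailure(Throwable t) { log("Failure in " + name); }
        }, MoreExecutors.directExecutor());   // no say in which thread runs the callback
    }

    public static void main(String[] args) throws Exception {
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
        log("Before submit");

        ListenableFuture<Void> future1 = submitTask(pool, "future1", 100);
        ListenableFuture<Void> future2 = submitTask(pool, "future2", 100);
        ListenableFuture<Void> future3 = submitTask(pool, "future3", 3000);

        Thread.sleep(1000);   // by now future1 and future2 are done, future3 is not

        addLoggingCallback(future1, "future1");
        addLoggingCallback(future2, "future2");
        addLoggingCallback(future3, "future3");

        pool.shutdown();
    }
}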

Here is the output from one run of such a snippet.

Before submit, main, 1
In run of future1, pool-1-thread-1, 10
In run of future2, pool-1-thread-2, 11
In run of future3, pool-1-thread-3, 12
In callback of future1, main, 1
In callback of future2, main, 1
In callback of future3, pool-1-thread-3, 12

The takeaway is: if the future is already complete by the time you add the callback, the thread that calls addCallback runs the callback right away; if the future is not yet complete, the thread that later completes the future runs it. But really, you should not make any assumptions about which thread will execute the callback, so keep the callback as lightweight as possible.