Concurrent priority queue with mutable priorities [closed]

Here is what I’d like to achieve, as part of an existing Golang project:

  • Concurrent clients making requests to an HTTP server increase their hit counts in an in-memory record associated with their User ID – this happens very frequently and must be as fast as possible (we have an SLO on HTTP timings).
  • Background threads continuously pop the record with the highest hit count, perform a slow task involving a database, and insert the record back into the priority queue (modified by the task, and with its priority decreased) – there could be many threads, as we want to keep the task frequency for each User ID high (e.g. every 100–500 ms).

I had naively started with a Fibonacci heap, given its O(1) decrease-key operation, but it is inherently non-concurrent. So I have been looking at heap-based concurrent priority queues with mutable priorities, such as CHAMP (published by Orr Tamir et al.) [1] and CoMPiQ (published by Ivan Walulya) [2]. The first is lock-based and riddled with concurrency bugs, which I’ve been slowly trying to address; the second is essentially impossible to implement from the paper alone, since the pseudo-code was only published internally and no public implementation is available.

I’ve read about highly efficient concurrent skip lists, but by nature they do not support DecreaseKey/IncreaseKey, and therefore do not sound like a good fit – we’d have to locate the old record, mark it as “deleted”, and allocate/insert a new record. We’d be extracting orders of magnitude more “deleted” records than actionable ones, and might even run out of memory if we weren’t extracting fast enough to clear all of the older/lower-priority records, which would all be concentrated at the bottom of the queue.

Any suggestions? Thank you in advance!

  • Sorry, stupid question: if you want to process each user at a high frequency, then isn’t it counterproductive to try to process them in a non–round-robin order? (If you always prioritize the ones with the highest hit counts, doesn’t that mean that rarely-hit users are likely to be starved?)

  • BTW, if you have a lot of threads, you may find it easier to parallelize by partitioning: if you have a few dozen Fibonacci queues and distribute users among them in some way, then you may be able to keep contention low enough even if each queue is single-threaded. (Java’s ConcurrentHashMap uses this principle, but for a hash-map rather than a priority queue.)

  • @ruakh Yes, absolutely, although age can be a factor in priority scoring. For this specific use-case, we want to process users with the highest hit rates at a higher frequency than quiet users, whom we can take longer to get to – e.g. seconds. We can monitor ages in the queue to make sure we aren’t completely ignoring anyone.

  • I use Redis sorted sets for similar purposes, with time-based scores that I scale depending on the prioritization complexity. If an out-of-process solution is an option for you, you may want to give it a chance.

It sounds like your use-case doesn’t require very precise prioritization; users with more hits should get processed sooner, but it’s not essential that users be processed in exact order by number of hits, because either way you’ll be processing everyone.

So, I’ll give three approaches that exploit that flexibility to give you better parallelism at the expense of precise prioritization, without requiring the degree of complexity of CHAMP or CoMPiQ.


Approach #1: Fuzzy comparison.

One approach is to define a set of ranges, and to write your Fibonacci heap’s comparison function so that it only prioritizes one user over another if their hit-counts are in different ranges. With this approach, the ‘decrease key’ operation will usually be O(1), because incrementing a hit-count will never cause a heap invariant violation (that would have to be rectified) except when the hit-count crosses a range threshold. (The ranges could be something like 0–15, 16–31, 32–63, …, 2^20–∞.)
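
For concreteness, here is a minimal Go sketch of such a comparison; the record type, the helper names, and the exact range boundaries are just illustrative assumptions:

    package fuzzycmp

    import "math/bits"

    // record is a hypothetical per-user entry stored in the heap.
    type record struct {
        userID string
        hits   int64
    }

    // rank maps a hit-count to its range: 0–15 → 0, 16–31 → 1, 32–63 → 2,
    // and so on, with everything from 2^20 upward collapsed into one top range.
    func rank(hits int64) int {
        if hits < 16 {
            return 0
        }
        r := bits.Len64(uint64(hits)) - 4 // 16–31 → 1, 32–63 → 2, ...
        if r > 17 {
            r = 17 // the 2^20–∞ range
        }
        return r
    }

    // hotter is the heap's comparison: a outranks b only when its hit-count
    // falls in a strictly higher range, so same-range increments never
    // violate the heap invariant.
    func hotter(a, b *record) bool {
        return rank(a.hits) > rank(b.hits)
    }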

Furthermore, if you’re careful about it, you can actually avoid even locking the heap when you increment a hit-count without crossing a range threshold. (Of course, you’ll need to either lock the individual user record, or else use atomic.Int64 or similar; but you don’t need to lock the whole heap, so you can concurrently update many users at once.)
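
A sketch of that increment path, reusing the rank bucketing above (repeated here so the snippet stands alone); fixUp is just a stand-in for whatever “this record’s priority changed” operation your heap exposes:

    package fuzzyheap

    import (
        "math/bits"
        "sync"
        "sync/atomic"
    )

    type userRecord struct {
        userID string
        hits   atomic.Int64
    }

    func rank(hits int64) int { // same bucketing idea as above (top-range cap omitted)
        if hits < 16 {
            return 0
        }
        return bits.Len64(uint64(hits)) - 4
    }

    type fuzzyHeap struct {
        mu sync.Mutex
        // ... heap storage elided ...
    }

    // fixUp stands in for the heap's "priority increased" operation
    // (e.g. decrease-key on a min-ordered Fibonacci heap).
    func (h *fuzzyHeap) fixUp(r *userRecord) { /* ... */ }

    // Hit records one hit for r. The heap lock is only taken when the new
    // count lands in a higher range than the old one.
    func (h *fuzzyHeap) Hit(r *userRecord) {
        n := r.hits.Add(1)
        if rank(n) == rank(n-1) {
            return // same range: no heap invariant to restore
        }
        h.mu.Lock()
        h.fixUp(r)
        h.mu.Unlock()
    }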

By the way, you could also do this with a heap implementation that doesn’t support a ‘decrease key’ operation. You’ve already rejected an approach that involves marking records as ‘deleted’ and re-inserting them with new priorities, but if you only have to do this when a record crosses a range threshold, then you won’t have so many excess records. So that might let you use one of the concurrent priority queue implementations that you had to reject because it didn’t support mutable priorities.


Approach #2: Partitioning into multiple heaps.

Another approach is, instead of having a single Fibonacci heap with a high degree of contention, to have k heaps, where k is a parameter that you tune experimentally. You assign each user to one heap and each background thread processes one heap, and you don’t worry about whether one heap’s highest-hit user has fewer hits than a different heap’s highest-hit user, because you know that both will be processed soon enough.

This approach still requires locking (because the Fibonacci heap doesn’t support concurrency), but the contention for these locks is greatly reduced, because there’s only a 1/k chance that two threads will need the same heap, provided that everything is distributed evenly.
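
A rough sketch of that partitioning, assuming some single-threaded heap type (fibHeap here is just a placeholder) guarded by a per-shard mutex:

    package sharded

    import (
        "hash/fnv"
        "sync"
        "time"
    )

    // fibHeap stands in for any single-threaded priority queue
    // (Fibonacci heap, pairing heap, container/heap, ...).
    type fibHeap struct{ /* ... */ }

    func (h *fibHeap) IncreaseHits(userID string)       { /* ... */ }
    func (h *fibHeap) PopMax() (userID string, ok bool) { /* ... */ return "", false }

    type shard struct {
        mu   sync.Mutex
        heap fibHeap
    }

    type shardedQueue struct {
        shards []*shard // len(shards) == k, tuned experimentally
    }

    func newShardedQueue(k int) *shardedQueue {
        q := &shardedQueue{shards: make([]*shard, k)}
        for i := range q.shards {
            q.shards[i] = &shard{}
        }
        return q
    }

    // shardFor maps a user to a fixed shard, so all of that user's hits
    // land in the same heap.
    func (q *shardedQueue) shardFor(userID string) *shard {
        h := fnv.New32a()
        h.Write([]byte(userID))
        return q.shards[h.Sum32()%uint32(len(q.shards))]
    }

    // Hit runs on the HTTP path; it only contends with operations that
    // happen to hash to the same shard.
    func (q *shardedQueue) Hit(userID string) {
        s := q.shardFor(userID)
        s.mu.Lock()
        s.heap.IncreaseHits(userID)
        s.mu.Unlock()
    }

    // Each background worker owns one shard and loops over it; the slow
    // database work happens outside the lock.
    func (q *shardedQueue) worker(i int, process func(userID string)) {
        s := q.shards[i]
        for {
            s.mu.Lock()
            userID, ok := s.heap.PopMax()
            s.mu.Unlock()
            if !ok {
                time.Sleep(10 * time.Millisecond) // shard is empty; back off briefly
                continue
            }
            process(userID)
        }
    }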

This approach can actually be combined with approach #1: this approach reduces contention by letting different operations usually operate on different heaps, while approach #1 reduces contention by letting operations usually avoid locking the heap even when operating on the same one. (And again, if you combine this with approach #1, you could do this with a different heap implementation.)

By the way, if you have some users who represent such a large fraction of your traffic that you can’t distribute traffic evenly among your partitions (or can’t easily make sure that it’ll be distributed evenly), then a more complicated variant of this approach is to give users multiple records, with these records being distributed across separate heaps; for each hit, you can choose one record to increment, either randomly or in round-robin fashion. But I don’t particularly recommend this; for one thing, it means each background thread will end up processing a subset of each user’s hits, which would probably affect your business logic in unpleasant ways.


Approach #3: Bucketing into unsorted ranges.

Similarly to approach #1, you can define a set of ranges within which hit-counts are considered interchangeable . . . and then dispense with the heap altogether, in favor of just having an unsorted container for each range. When a user’s hit-count crosses from one range to the next, you remove the record from the old container and move it to the new one. The background threads check the container for the largest defined range, then the next-largest, etc., until they find a user to process.

One version of this approach is to lock a container when adding or removing a user, relying on the fact that users tend to stay in one container for a long time and so you won’t get much contention. (Merely incrementing a user’s hit-count doesn’t require a lock, because you can use an atomic integer for that; you only need to lock when you find that the old and new values belong to different ranges.) This is closely analogous to the Fibonacci-heap approach, but I think it’s likely to be more efficient (and therefore involve less lock contention), because you can use a simpler data structure for the containers.
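
Here’s a sketch of that locked-container version in Go, using the same ranges as approach #1. It deliberately glosses over a couple of benign races (e.g. a record being popped while another goroutine is moving it between buckets), which your logic would need to tolerate or handle:

    package rangebuckets

    import (
        "math/bits"
        "sync"
        "sync/atomic"
    )

    const numRanges = 18 // 0–15, 16–31, 32–63, ..., 2^20–∞, as in approach #1

    func rank(hits int64) int {
        if hits < 16 {
            return 0
        }
        r := bits.Len64(uint64(hits)) - 4
        if r >= numRanges {
            r = numRanges - 1
        }
        return r
    }

    type userRecord struct {
        userID string
        hits   atomic.Int64
    }

    // bucket is an unsorted container of records whose hit-counts all
    // currently fall in the same range.
    type bucket struct {
        mu      sync.Mutex
        records map[*userRecord]struct{}
    }

    type rangeQueue struct {
        buckets [numRanges]bucket
    }

    func newRangeQueue() *rangeQueue {
        q := &rangeQueue{}
        for i := range q.buckets {
            q.buckets[i].records = make(map[*userRecord]struct{})
        }
        return q
    }

    // Hit bumps the atomic counter; buckets are only locked when the count
    // crosses from one range to the next.
    func (q *rangeQueue) Hit(r *userRecord) {
        n := r.hits.Add(1)
        oldRank, newRank := rank(n-1), rank(n)
        if oldRank == newRank {
            return
        }
        src, dst := &q.buckets[oldRank], &q.buckets[newRank]
        src.mu.Lock()
        delete(src.records, r)
        src.mu.Unlock()
        dst.mu.Lock()
        dst.records[r] = struct{}{}
        dst.mu.Unlock()
    }

    // PopHighest scans from the hottest range downward and removes one
    // record for a background worker to process.
    func (q *rangeQueue) PopHighest() (*userRecord, bool) {
        for i := numRanges - 1; i >= 0; i-- {
            b := &q.buckets[i]
            b.mu.Lock()
            for r := range b.records {
                delete(b.records, r)
                b.mu.Unlock()
                return r, true
            }
            b.mu.Unlock()
        }
        return nil, false
    }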

A more complex version, that completely avoids locks, is to have these containers be lockless queues of special holders, where the holder holds (or simply is) an atomic pointer to the record and the record holds an atomic pointer to the holder. To remove a record from a container, you nil out the pointer from its holder, and to add it to a different container, you create a new holder. (Or, if you’re not comfortable with keeping excess holders around — even if they’re only created when a hit-count crosses a range threshold — then you can use an intrusive doubly-linked list of these holders, with atomic prev and next pointers, and after you’ve nil’d out the userData pointer, you can use compare-and-swap operations to update the linked list to fully remove the holder. I find this a bit messy and tricky to get right, but it’s absolutely doable.)
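
For the lockless variant, a minimal sketch of just the holder indirection (the per-range lock-free queue itself is left abstract here, and this ignores the rare case of two goroutines moving the same record at once):

    package lockless

    import "sync/atomic"

    type userRecord struct {
        userID string
        hits   atomic.Int64
        holder atomic.Pointer[holder] // the holder currently representing this record
    }

    // holder is what actually sits in a range's lock-free queue; a nil
    // record pointer means "already moved elsewhere, skip me".
    type holder struct {
        record atomic.Pointer[userRecord]
    }

    // move detaches r from its current holder and enqueues a fresh holder
    // into the container for its new range; enqueue stands in for whatever
    // lock-free queue you use per range.
    func move(r *userRecord, enqueue func(*holder)) {
        if old := r.holder.Load(); old != nil {
            old.record.Store(nil) // the old container discards this holder lazily
        }
        h := &holder{}
        h.record.Store(r)
        r.holder.Store(h)
        enqueue(h)
    }

    // A consumer draining a container simply skips dead holders:
    //
    //     for h, ok := dequeue(); ok; h, ok = dequeue() {
    //         if r := h.record.Load(); r != nil {
    //             process(r)
    //         }
    //     }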

This approach could also be combined with approach #2, though I suspect that you won’t find much extra benefit from that.
