Author Topic: A Ring Cache Implementation  (Read 1367 times)

0 Members and 1 Guest are viewing this topic.

Offline Lanny

  • Zealot
  • ****
  • Posts: 1,123
    • View Profile
A Ring Cache Implementation
« on: November 12, 2014, 11:43:38 pm »
People are always going on about "generating content" and shit so here.

So at work I've been having this problem where we have a stream of data with a lot of variability in volume per time unit and processing it has several potentially expensive operations that we cache. The stream feeds into a single queue and there are multiple concurrent processes consuming. The issue is that we were using fixed time caching, so like we'd put something in the cache and set it to expire in a day. When volume is high though we flood the cache, which is backed by redis and thus in memory. If the OOM killer didn't nix the redis server then it would die when it forks to save (that's actually fixable but it doesn't change the fact that we were using too much RAM in the first place). The solution was a circular cache, as an item is added the oldest is removed. The result is some level of thrashing during high intake but a bounded memory footprint which is a win all in all. My first idea was just a simple bounded linked list, but the problem there was that lookup became a linear time operation. So I ended up using a hashtable and a linked list together to manage the cache, and actually was able to get constant  time get and set. Values are stored in the hashtable and the list is used to determine which keys need to be ejected. Here's the code:

Code: [Select]
class RingCache(object):
    """
    Implements a cache with O(1) lookup and append time and which contains at
    most size items. The first items set are the first deleted.
    """
    def __init__(self, name, size=100, conn=None):
        self.hash_key = 'ring-cache:%s:hash' % name
        self.list_key = 'ring-cache:%s:list' % name
        self.size = size
        self.r = conn or get_redis_conn()

    def get(self, sub_key):
        return self.r.hget(self.hash_key, sub_key)

    def set(self, sub_key, sub_value):
        """
        Enter the new item into the hashtable and push it into the list. If
        that puts us over capacity, pop an item off the other end of the list
        and remove the corrosponding key from the hashtable.
        """

        with self.r.pipeline() as pipe:
            pipe.hset(self.hash_key, sub_key, sub_value)
            pipe.rpush(self.list_key, sub_key)
            pipe.execute()

            # We have to make sure our pops happen atomically, if we know that
            # then we know no two processes have the same key and we can buffer
            # our hdels or execute them non-atomicly
            popped_keys = []
            while 1:
                try:
                    pipe.watch(self.list_key)
                    overage = pipe.llen(self.list_key) - self.size

                    for _ in range(overage):
                        popped_keys.append(self.r.lpop(self.list_key))

                    pipe.multi()

                    for key in popped_keys:
                        pipe.hdel(self.hash_key, key)

                    pipe.execute()
                    break

                except redis.WatchError:
                    continue

    def _get_keys(self):
        return self.r.hkeys(self.hash_key)

I was kinda surprised it worked. An improvement would be if I could find a way to move an item back to the front of the list every time it's accessed, so it works like LRU instead of FIFO, but I think it's fundamentally impossible to implement LRU with constant time lookup.

Offline aldra

  • Arch Disciple
  • ***
  • Posts: 623
  • albrecht drais
    • View Profile
Re: A Ring Cache Implementation
« Reply #1 on: November 12, 2014, 11:48:00 pm »
can't go through it right now but which language is that?

Offline Lanny

  • Zealot
  • ****
  • Posts: 1,123
    • View Profile
Re: A Ring Cache Implementation
« Reply #2 on: November 12, 2014, 11:50:21 pm »
Python, and get_redis_conn just fetches a pyredis (the defacto standard redis binding for python) connection from a pool.