




# ACCORDING TO PASSED maxsize ARGUMENT _lru_cache_wrapper

def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo):
    # Constants shared by all lru cache instances:
    sentinel = object()      # unique object used to signal cache misses

    cache = {}                                # RESULTS SAVES HERE
    cache_get = cache.get    # bound method to lookup a key or return None

    # ... maxsize is None:

    def wrapper(*args, **kwds):
        # Simple caching without ordering or size limit
        nonlocal hits, misses
        key = make_key(args, kwds, typed)     # BUILD A KEY FROM ARGUMENTS
        result = cache_get(key, sentinel)     # TRYING TO GET PREVIOUS CALLS RESULT
        if result is not sentinel:            # ALREADY CALLED WITH PASSED ARGS
            hits += 1
            return result                     # RETURN SAVED RESULT
                                              # WITHOUT ACTUALLY CALLING FUNCTION
        misses += 1
        result = user_function(*args, **kwds) # FUNCTION CALL - if cache[key] empty
        cache[key] = result                   # SAVE RESULT

        return result
    # ...

    return wrapper

Python 3.9 LRU缓存源代码:


def fib(n):
    if n == 0:
        return 0
    if n == 1:
        return 1
    return fib(n - 1) + fib(n - 2)

LRU缓存装饰器检查一些基本情况,然后使用wrapper _lru_cache_wrapper包装用户函数。在wrapper内部,将项目添加到缓存的逻辑,LRU逻辑即将新项目添加到循环队列中,并从循环队列中删除项目。

def lru_cache(maxsize=128, typed=False):
    if isinstance(maxsize, int):
        # Negative maxsize is treated as 0
        if maxsize < 0:
            maxsize = 0
    elif callable(maxsize) and isinstance(typed, bool):
        # The user_function was passed in directly via the maxsize argument
        user_function, maxsize = maxsize, 128
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)
    elif maxsize is not None:
        raise TypeError(
         'Expected first argument to be an integer, a callable, or None')

    def decorating_function(user_function):
        wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo)
        wrapper.cache_parameters = lambda : {'maxsize': maxsize, 'typed': typed}
        return update_wrapper(wrapper, user_function)

    return decorating_function


  • Lru Cache wrapper has few book keeping variables.

     sentinel = object()          # unique object used to signal cache misses
     make_key = _make_key         # build a key from the function arguments
     PREV, NEXT, KEY, RESULT = 0, 1, 2, 3   # names for the link fields
     cache = {}
     hits = misses = 0
     full = False
     cache_get = cache.get    # bound method to lookup a key or return None
     cache_len = cache.__len__  # get cache size without calling len()
     lock = RLock()           # because linkedlist updates aren't threadsafe
     root = []                # root of the circular doubly linked list
     root[:] = [root, root, None, None]     # initialize by pointing to self
  • The wrapper acquires the lock before performing any operation.

  • A few important variables - root list contains all the items adhering to maxsize value. The important concept to remember root is self-referencing itself (root[:] = [root, root, None, None]) in the previous (0) and next position (1)


  • The first case, when maxsize is 0, that means no cache functionality, the wrapper wraps the user function without any caching ability. The wrapper increments cache miss count and return the result.

     def wrapper(*args, **kwds):
         # No caching -- just a statistics update
         nonlocal misses
         misses += 1
         result = user_function(*args, **kwds)
         return result
  • The second case. when maxsize is None. In the section, there is no limit on the number of elements to store in the cache. So the wrapper checks for the key in the cache(dictionary). When the key is present, the wrapper returns the value and updates the cache hit info. And when the key is missing, the wrapper calls the user function with user passed arguments, updates the cache, updates the cache miss info, and returns the result.

     def wrapper(*args, **kwds):
         # Simple caching without ordering or size limit
         nonlocal hits, misses
         key = make_key(args, kwds, typed)
         result = cache_get(key, sentinel)
         if result is not sentinel:
             hits += 1
             return result
         misses += 1
         result = user_function(*args, **kwds)
         cache[key] = result
         return result
  • The third case, when maxsize is a default value (128) or user passed integer value. Here is the actual LRU cache implementation. The entire code in the wrapper in a thread-safe way. Before performing any operation, read/write/delete from the cache, the wrapper obtains RLock.


  • The value in the cache is stored as a list of four items(remember root). The first item is the reference to the previous item, the second item is the reference to the next item, the third item is the key for the particular function call, the fourth item is a result. Here is an actual value for Fibonacci function argument 1 [[[...], [...], 1, 1], [[...], [...], 1, 1], None, None] . [...] means the reference to the self(list).

  • The first check is for the cache hit. If yes, the value in the cache is a list of four values.

     nonlocal root, hits, misses, full
     key = make_key(args, kwds, typed)
     with lock:
         link = cache_get(key)
          if link is not None:
              # Move the link to the front of the circular queue
              print(f'Cache hit for {key}, {root}')
              link_prev, link_next, _key, result = link
              link_prev[NEXT] = link_next
              link_next[PREV] = link_prev
              last = root[PREV]
              last[NEXT] = root[PREV] = link
              link[PREV] = last
              link[NEXT] = root
              hits += 1
              return result

    When the item is already in the cache, there is no need to check whether the circular queue is full or pop the item from the cache. Rather change the positions of the items in the circular queue. Since the recently used item is always on the top, the code moves to recent value to the top of the queue and the previous top item becomes next of the current item last[NEXT] = root[PREV] = link and link[PREV] = last and link[NEXT] = root. NEXT and PREV are initialized in the top which points to appropriate positions in the list PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields. Finally, increment the cache hit info and return the result.

  • When it is cache miss, update the misses info and the code checks for three cases. All three operations happen after obtaining the RLock. Three cases in the source code in the following order - after acquiring the lock key is found in the cache, the cache is full, and the cache can take new items. For demonstration, let's follow the order, when the cache is not full, the cache is full, and the key is available in the cache after acquiring the lock.


        # Put result in a new link at the front of the queue.
        last = root[PREV]
        link = [last, root, key, result]
        last[NEXT] = root[PREV] = cache[key] = link
        # Use the cache_len bound method instead of the len() function
        # which could potentially be wrapped in an lru_cache itself.
        full = (cache_len() >= maxsize)
  • 当缓存未满时,准备最近的result(link=[last, root, key, result]),包含根节点的前一个引用、根节点、键和计算结果。

  • 然后将最近的结果(link)指向循环队列的顶部(root[PREV] = link),将根节点的上一个项的下一个指向最近的结果(last[NEXT]=link),并将最近的结果添加到缓存中(cache[key] = link)。

  • 最后,检查缓存是否已满(cache_len() >= maxsize and cache_len = cache.__len__ is declared in the top),并将状态设置为已满。

  • 对于斐波那契数列的示例,当函数收到第一个值1时,根节点为空,根值为[[...], [...], None, None],在将结果添加到循环队列后,根值为[[[...], [...], 1, 1], [[...], [...], 1, 1], None, None]。前一个和后一个都指向键1的结果。对于下一个值0,插入后根值为

    [[[[...], [...], 1, 1], [...], 0, 0], [[...], [[...], [...], 0, 0], 1, 1], None, None]。前一个是[[[[...], [...], None, None], [...], 1, 1], [[...], [[...], [...], 1, 1], None, None], 0, 0],后一个是[[[[...], [...], 0, 0], [...], None, None], [[...], [[...], [...], None, None], 0, 0], 1, 1]


    elif full:
        # Use the old root to store the new key and result.
        oldroot = root
        oldroot[KEY] = key
        oldroot[RESULT] = result
        # Empty the oldest link and make it the new root.
        # Keep a reference to the old key and old result to
        # prevent their ref counts from going to zero during the
        # update. That will prevent potentially arbitrary object
        # clean-up code (i.e. __del__) from running while we're
        # still adjusting the links.
        root = oldroot[NEXT]
        oldkey = root[KEY]
        oldresult = root[RESULT]
        root[KEY] = root[RESULT] = None
        # Now update the cache dictionary.
        del cache[oldkey]
        # Save the potentially reentrant cache[key] assignment
        # for last, after the root and links have been put in
        # a consistent state.
        cache[key] = oldroot
  • 当缓存已满时,将根节点用旧根节点(oldroot=root)替换,并更新键和结果。
  • 然后将旧根节点的下一个项目作为新的根节点(root=oldroot[NEXT]),复制新的根节点键和结果(oldkey = root[KEY] and oldresult = root[RESULT])。
  • 将新的根节点键和结果设置为None(root[KEY] = root[RESULT] = None)。
  • 从缓存中删除旧键的项(del cache[oldkey])并将计算出的结果添加到缓存中(cache[key] = oldroot)。
  • 对于斐波那契示例,当缓存已满且键为2时,根值为[[[[...], [...], 1, 1], [...], 0, 0], [[...], [[...], [...], 0, 0], 1, 1], None, None],块结束时的新根是[[[[...], [...], 0, 0], [...], 2, 1], [[...], [[...], [...], 2, 1], 0, 0], None, None]。可以看到,键1被替换为键2


    if key in cache:
        # Getting here means that this same key was added to the
        # cache while the lock was released.  Since the link
        # update is already done, we need only return the
        # computed result and update the count of misses.




cache = {}
cache_get = cache.get
make_key = _make_key         # build a key from the function arguments
key = make_key(args, kwds, typed)
result = cache_get(key, sentinel)


elif full:

    oldroot = root
    oldroot[KEY] = key
    oldroot[RESULT] = result

    # update the linked list to pop out the least recent function call information        
    root = oldroot[NEXT]
    oldkey = root[KEY]
    oldresult = root[RESULT]
    root[KEY] = root[RESULT] = None

