I was in the same situation and was forced to implement it by myself.
There were also a few other issues with the python 3.x implementation:
- The main issues is not enabling a separate cache for each instance (in case the function being cached is an instance method). Meaning that if I set a maxsize of 100 to the cache, and I have 100 instances, if all are equally active - the caching will effectively do nothing.
- Also, if you run clear_cache - it clears the cache for all instances.
- The second main thing, is that I wanted a timeout feature to clear the cache every X seconds.
Function lru_cache implementation for python 2.7:
import time
import functools
import collections
def lru_cache(maxsize = 255, timeout = None):
"""lru_cache(maxsize = 255, timeout = None) --> returns a decorator which returns an instance (a descriptor).
Purpose - This decorator factory will wrap a function / instance method and will supply a caching mechanism to the function.
For every given input params it will store the result in a queue of maxsize size, and will return a cached ret_val
if the same parameters are passed.
Params - maxsize - int, the cache size limit, anything added above that will delete the first values enterred (FIFO).
This size is per instance, thus 1000 instances with maxsize of 255, will contain at max 255K elements.
- timeout - int / float / None, every n seconds the cache is deleted, regardless of usage. If None - cache will never be refreshed.
Notes - If an instance method is wrapped, each instance will have it's own cache and it's own timeout.
- The wrapped function will have a cache_clear variable inserted into it and may be called to clear it's specific cache.
- The wrapped function will maintain the original function's docstring and name (wraps)
- The type of the wrapped function will no longer be that of a function but either an instance of _LRU_Cache_class or a functool.partial type.
On Error - No error handling is done, in case an exception is raised - it will permeate up.
"""
class _LRU_Cache_class(object):
def __init__(self, input_func, max_size, timeout):
self._input_func = input_func
self._max_size = max_size
self._timeout = timeout
# This will store the cache for this function, format - {caller1 : [OrderedDict1, last_refresh_time1], caller2 : [OrderedDict2, last_refresh_time2]}.
# In case of an instance method - the caller is the instance, in case called from a regular function - the caller is None.
self._caches_dict = {}
def cache_clear(self, caller = None):
# Remove the cache for the caller, only if exists:
if caller in self._caches_dict:
del self._caches_dict[caller]
self._caches_dict[caller] = [collections.OrderedDict(), time.time()]
def __get__(self, obj, objtype):
""" Called for instance methods """
return_func = functools.partial(self._cache_wrapper, obj)
return_func.cache_clear = functools.partial(self.cache_clear, obj)
# Return the wrapped function and wraps it to maintain the docstring and the name of the original function:
return functools.wraps(self._input_func)(return_func)
def __call__(self, *args, **kwargs):
""" Called for regular functions """
return self._cache_wrapper(None, *args, **kwargs)
# Set the cache_clear function in the __call__ operator:
__call__.cache_clear = cache_clear
def _cache_wrapper(self, caller, *args, **kwargs):
# Create a unique key including the types (in order to differentiate between 1 and '1'):
kwargs_key = "".join(map(lambda x : str(x) + str(type(kwargs[x])) + str(kwargs[x]), sorted(kwargs)))
key = "".join(map(lambda x : str(type(x)) + str(x) , args)) + kwargs_key
# Check if caller exists, if not create one:
if caller not in self._caches_dict:
self._caches_dict[caller] = [collections.OrderedDict(), time.time()]
else:
# Validate in case the refresh time has passed:
if self._timeout != None:
if time.time() - self._caches_dict[caller][1] > self._timeout:
self.cache_clear(caller)
# Check if the key exists, if so - return it:
cur_caller_cache_dict = self._caches_dict[caller][0]
if key in cur_caller_cache_dict:
return cur_caller_cache_dict[key]
# Validate we didn't exceed the max_size:
if len(cur_caller_cache_dict) >= self._max_size:
# Delete the first item in the dict:
cur_caller_cache_dict.popitem(False)
# Call the function and store the data in the cache (call it with the caller in case it's an instance function - Ternary condition):
cur_caller_cache_dict[key] = self._input_func(caller, *args, **kwargs) if caller != None else self._input_func(*args, **kwargs)
return cur_caller_cache_dict[key]
# Return the decorator wrapping the class (also wraps the instance to maintain the docstring and the name of the original function):
return (lambda input_func : functools.wraps(input_func)(_LRU_Cache_class(input_func, maxsize, timeout)))
Unittesting code:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import random
import unittest
import lru_cache
class Test_Decorators(unittest.TestCase):
def test_decorator_lru_cache(self):
class LRU_Test(object):
"""class"""
def __init__(self):
self.num = 0
@lru_cache.lru_cache(maxsize = 10, timeout = 3)
def test_method(self, num):
"""test_method_doc"""
self.num += num
return self.num
@lru_cache.lru_cache(maxsize = 10, timeout = 3)
def test_func(num):
"""test_func_doc"""
return num
@lru_cache.lru_cache(maxsize = 10, timeout = 3)
def test_func_time(num):
"""test_func_time_doc"""
return time.time()
@lru_cache.lru_cache(maxsize = 10, timeout = None)
def test_func_args(*args, **kwargs):
return random.randint(1,10000000)
# Init vars:
c1 = LRU_Test()
c2 = LRU_Test()
m1 = c1.test_method
m2 = c2.test_method
f1 = test_func
# Test basic caching functionality:
self.assertEqual(m1(1), m1(1))
self.assertEqual(c1.num, 1) # c1.num now equals 1 - once cached, once real
self.assertEqual(f1(1), f1(1))
# Test caching is different between instances - once cached, once not cached:
self.assertNotEqual(m1(2), m2(2))
self.assertNotEqual(m1(2), m2(2))
# Validate the cache_clear funcionality only on one instance:
prev1 = m1(1)
prev2 = m2(1)
prev3 = f1(1)
m1.cache_clear()
self.assertNotEqual(m1(1), prev1)
self.assertEqual(m2(1), prev2)
self.assertEqual(f1(1), prev3)
# Validate the docstring and the name are set correctly:
self.assertEqual(m1.__doc__, "test_method_doc")
self.assertEqual(f1.__doc__, "test_func_doc")
self.assertEqual(m1.__name__, "test_method")
self.assertEqual(f1.__name__, "test_func")
# Test the limit of the cache, cache size is 10, fill 15 vars, the first 5 will be overwritten for each and the other 5 are untouched. Test that:
c1.num = 0
c2.num = 10
m1.cache_clear()
m2.cache_clear()
f1.cache_clear()
temp_list = map(lambda i : (test_func_time(i), m1(i), m2(i)), range(15))
for i in range(5, 10):
self.assertEqual(temp_list[i], (test_func_time(i), m1(i), m2(i)))
for i in range(0, 5):
self.assertNotEqual(temp_list[i], (test_func_time(i), m1(i), m2(i)))
# With the last run the next 5 vars were overwritten, now it should have only 0..4 and 10..14:
for i in range(5, 10):
self.assertNotEqual(temp_list[i], (test_func_time(i), m1(i), m2(i)))
# Test different vars don't collide:
self.assertNotEqual(test_func_args(1), test_func_args('1'))
self.assertNotEqual(test_func_args(1.0), test_func_args('1.0'))
self.assertNotEqual(test_func_args(1.0), test_func_args(1))
self.assertNotEqual(test_func_args(None), test_func_args('None'))
self.assertEqual(test_func_args(test_func), test_func_args(test_func))
self.assertEqual(test_func_args(LRU_Test), test_func_args(LRU_Test))
self.assertEqual(test_func_args(object), test_func_args(object))
self.assertNotEqual(test_func_args(1, num = 1), test_func_args(1, num = '1'))
# Test the sorting of kwargs:
self.assertEqual(test_func_args(1, aaa = 1, bbb = 2), test_func_args(1, bbb = 2, aaa = 1))
self.assertNotEqual(test_func_args(1, aaa = '1', bbb = 2), test_func_args(1, bbb = 2, aaa = 1))
# Sanity validation of values
c1.num = 0
c2.num = 10
m1.cache_clear()
m2.cache_clear()
f1.cache_clear()
self.assertEqual((f1(0), m1(0), m2(0)), (0, 0, 10))
self.assertEqual((f1(0), m1(0), m2(0)), (0, 0, 10))
self.assertEqual((f1(1), m1(1), m2(1)), (1, 1, 11))
self.assertEqual((f1(2), m1(2), m2(2)), (2, 3, 13))
self.assertEqual((f1(2), m1(2), m2(2)), (2, 3, 13))
self.assertEqual((f1(3), m1(3), m2(3)), (3, 6, 16))
self.assertEqual((f1(3), m1(3), m2(3)), (3, 6, 16))
self.assertEqual((f1(4), m1(4), m2(4)), (4, 10, 20))
self.assertEqual((f1(4), m1(4), m2(4)), (4, 10, 20))
# Test timeout - sleep, it should refresh cache, and then check it was cleared:
prev_time = test_func_time(0)
self.assertEqual(test_func_time(0), prev_time)
self.assertEqual(m1(4), 10)
self.assertEqual(m2(4), 20)
time.sleep(3.5)
self.assertNotEqual(test_func_time(0), prev_time)
self.assertNotEqual(m1(4), 10)
self.assertNotEqual(m2(4), 20)
if __name__ == '__main__':
unittest.main()
functools32
andrepoze.lru
are available in 2.7 and work well. Better to use standard decorators than recipes. – Ashburn