"""
Caching module which leverages the use of diskcache python library. Essential methods mandatory to any Cache class: [set, get, delete, clear]
"""
import os
import time
import uuid
import shutil
import string
import datetime
import random
import diskcache
from pathlib import Path
from typing import Any
from collections import deque, OrderedDict
from functools import lru_cache
class CacheBase:
    """
    Base class for caching.
    """

    def set(self, key: str, value: Any, expiry: int | float | None = None):
        """
        Set a value in the cache.
        """
        raise NotImplementedError("Implement this method for setting keys.")

    def get(self, key: str) -> Any:
        """
        Get a value from the cache. Returns None if the key is not found.
        """
        raise NotImplementedError("Implement this method to retrieve values.")

    def delete(self, key: str):
        """
        Delete a value from the cache.
        """
        raise NotImplementedError("Implement this method for deleting keys.")

    def clear(self):
        """
        Clear all values from the cache.
        """
        raise NotImplementedError("Implement this method for clearing key pairs.")
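# Illustrative sketch (not part of the module's API): a minimal subclass
# showing the contract CacheBase expects. A plain dict backs the store; the
# real implementations below add eviction, expiry, and persistence.
#
#     class DictCache(CacheBase):
#         def __init__(self):
#             self._store: dict[str, Any] = {}
#
#         def set(self, key: str, value: Any, expiry: int | float | None = None):
#             self._store[key] = value  # expiry ignored in this sketch
#
#         def get(self, key: str) -> Any:
#             return self._store.get(key)
#
#         def delete(self, key: str):
#             self._store.pop(key, None)
#
#         def clear(self):
#             self._store.clear()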
class InMemoryCache(CacheBase):
    """
    In-memory cache with LRU eviction and optional per-key expiry.
    """

    def __init__(self, maxkeys: int | None = None, *_):
        self.expiry_map = {}
        self.cache = OrderedDict()
        # Non-int values (e.g. a cache path passed positionally for API
        # compatibility with the file-backed caches) mean "no key limit".
        self.maxkeys = maxkeys if isinstance(maxkeys, int) else None

    def set(self, key: str, value: Any, expiry: int | float | None = None):
        """
        Set a value in the cache, optionally expiring after `expiry` seconds.
        """
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if expiry:
            self.expiry_map[key] = datetime.datetime.now() + datetime.timedelta(seconds=expiry)
        else:
            # Overwriting without an expiry clears any stale expiry entry.
            self.expiry_map.pop(key, None)
        if self.maxkeys and len(self.cache) > self.maxkeys:
            oldest_key, _ = self.cache.popitem(last=False)  # evict LRU entry
            self.expiry_map.pop(oldest_key, None)

    def get(self, key: str, default: Any = None, pop: bool = False) -> Any:
        """
        Get a value from the cache. Expired keys are dropped lazily here.
        """
        if key in self.expiry_map:
            expiry_date = self.expiry_map[key]
            if datetime.datetime.now() >= expiry_date:
                self.cache.pop(key, None)
                self.expiry_map.pop(key, None)
                return default
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as recently used
        return self.cache.pop(key, default) if pop else self.cache.get(key, default)

    def has(self, key: str):
        return key in self.cache

    def delete(self, key: str):
        self.cache.pop(key, None)
        self.expiry_map.pop(key, None)

    def clear(self):
        self.cache.clear()
        self.expiry_map.clear()

    def close(self):
        self.clear()
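# Example usage (illustrative): with maxkeys=2 the least-recently-used key is
# evicted on overflow, and expired keys disappear on the next get().
#
#     cache = InMemoryCache(2)
#     cache.set("a", 1)
#     cache.set("b", 2, expiry=60)  # expires 60 seconds from now
#     cache.get("a")                # -> 1; "a" is now most recently used
#     cache.set("c", 3)             # evicts "b", the LRU entry
#     cache.has("b")                # -> False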
class PersistentFileCache(CacheBase):
    """
    Disk-backed cache implemented with the `diskcache` library.
    """

    def __init__(self, path: str, cache_size: int | None = None):
        if os.path.isfile(path):
            raise FileExistsError(f"Path should be a directory, not a file: {path}")
        self.path = path
        self.cache_size = cache_size
        self._closed = False
        # `timeout` is diskcache's SQLite connection timeout in seconds.
        settings = {"size_limit": cache_size} if cache_size else {}
        self._cache = diskcache.Cache(path, timeout=30, **settings)

    @property
    def closed(self):
        """
        Whether the cache has been closed.
        """
        return self._closed

    def set(self, key: str, _obj: Any, expiry: int | float | None = None):
        """
        Set a value in the cache with an optional expiry in seconds.
        """
        if not isinstance(key, str):
            raise KeyError(f"Key should be an instance of str, not {type(key)}")
        self._cache.set(key, _obj, expire=expiry)

    def get(self, key: str):
        """
        Retrieve the value for `key` from the cache.
        """
        if not isinstance(key, str):
            raise KeyError(f"Key should be an instance of str, not {type(key)}")
        return self._cache.get(key)

    def delete(self, key: str):
        """
        Delete a key from the cache.
        """
        self._cache.delete(key)

    def clear(self):
        """
        Clear all data from the cache.
        """
        self._cache.clear()

    def close(self):
        """
        Close the cache.
        """
        self._closed = True
        self._cache.close()
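# Example usage (illustrative): entries survive process restarts because
# diskcache stores them in SQLite under the given directory.
#
#     cache = PersistentFileCache("/tmp/demo-cache", cache_size=10**8)  # ~100 MB
#     cache.set("token", "abc123", expiry=300)  # expires in 5 minutes
#     cache.get("token")                        # -> "abc123"
#     cache.close()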
class DynamicFileCache(CacheBase):
    """
    Manages a pool of cache directories, dynamically creating a new one
    whenever the current directory reaches its size limit.
    """

    def __init__(
        self,
        cache_dir: str,
        cache_limit: float = 1e9,
        cached_objs_limit: int = 128,
    ):  # default directory limit is 1 GB; up to 128 open cache objects
        self.cache_dir = cache_dir
        self.cache_limit = cache_limit
        if not os.path.isdir(self.cache_dir):
            raise FileNotFoundError(f"Directory {cache_dir} not found")
        self._loaded_cache_objs = deque(maxlen=cached_objs_limit)
        self._reload_cache_files()

    def _reload_cache_files(self):
        """
        Reload the list of existing cache directories from cache_dir.
        """
        self._cache_files = [
            Path(dir_entry.path) for dir_entry in os.scandir(self.cache_dir)
            if dir_entry.is_dir()
        ]
        self._cache_files.sort()

    def _get_cache_path(self) -> str:
        """
        Return the path of a cache directory that is below the size limit,
        creating a new one if every existing directory is full.
        """
        for dir_entry in self._cache_files:
            size = sum(f.stat().st_size for f in dir_entry.iterdir())
            if size < self.cache_limit:
                return str(dir_entry)
        new_path = self._create_new_cache_path()
        self._cache_files.append(Path(new_path))
        return new_path

    def _create_new_cache_path(self):
        """
        Create a new cache directory with a unique, uuid-based name.
        """
        name = f"{len(self._cache_files)}-{uuid.uuid4().hex[:5]}"
        path = os.path.join(self.cache_dir, name)
        os.makedirs(path, exist_ok=True)
        return path

    @property
    def cache_obj(self):
        """
        Return the Cache object for the current cache directory that is not
        yet at its size limit.
        """
        current_cache_path = self._get_cache_path()
        for obj in self._loaded_cache_objs:
            if os.path.samefile(obj.path, current_cache_path):
                return obj
        prev_obj = self._loaded_cache_objs[-1] if self._loaded_cache_objs else None
        if prev_obj and not prev_obj.closed:
            # Close the previous cache object before rolling over.
            prev_obj.close()
        cache_obj = self.get_cache_obj(current_cache_path)
        self._loaded_cache_objs.append(cache_obj)  # track the new cache object
        return cache_obj

    def set(self, key: str, data: Any, expiry: float | int | None = None):
        """
        Set cache data with persistence.
        """
        self.cache_obj.set(key, data, expiry=expiry)

    def get(self, key: str) -> Any:
        """
        Retrieve cache data, falling back to older cache directories when the
        key is not found in the current one.
        """
        current = self.cache_obj
        data = current.get(key)
        if data is not None:
            return data
        for dir_entry in reversed(self._cache_files):
            if dir_entry.samefile(current.path):
                continue
            cache = self.get_cache_obj(str(dir_entry))
            data = cache.get(key)
            if data is not None:
                return data
        return None

    @staticmethod
    @lru_cache(maxsize=128)
    def get_cache_obj(path: str) -> PersistentFileCache:
        return PersistentFileCache(path)

    def delete(self, key: str):
        """
        Delete a key pair from every cache directory.
        """
        for p in self._cache_files:
            try:
                self.get_cache_obj(str(p)).delete(key)
            except Exception:
                pass

    def clear(self):
        """
        Clear all data from the cache.
        """
        for p in self._cache_files:
            try:
                self.get_cache_obj(str(p)).clear()
            except Exception:
                pass

    def close(self):
        """
        Close the cache.
        """
        for p in self._cache_files:
            try:
                self.get_cache_obj(str(p)).close()
            except Exception:
                pass
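# Example usage (illustrative): writes roll over into uniquely named
# subdirectories once each reaches cache_limit bytes; reads check the current
# directory first, then fall back to the others via get().
#
#     os.makedirs("/tmp/dyn-cache", exist_ok=True)
#     cache = DynamicFileCache("/tmp/dyn-cache", cache_limit=50 * 1024**2)  # 50 MB per dir
#     cache.set("report", b"payload")
#     cache.get("report")  # -> b"payload"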
class KeyAsFolderCache(CacheBase):
    """
    Caching which stores each key's data in a folder named after the key.
    """

    def __init__(self, cache_dir: str):
        self.cache_dir = cache_dir
        if not os.path.isdir(self.cache_dir):
            raise FileNotFoundError(f"Directory {cache_dir} not found")

    def set(self, key: str, data: Any, expiry: int | float | None = None):
        """
        Set some cache data.
        """
        cache_data_path = os.path.join(self.cache_dir, key)
        cache_obj = self.get_cache_obj(cache_data_path)
        cache_obj.set(key, data, expiry=expiry)

    @staticmethod
    @lru_cache(maxsize=128)
    def get_cache_obj(path: str) -> PersistentFileCache:
        return PersistentFileCache(path)

    def get(self, key: str) -> Any:
        """
        Look up a folder in cache_dir named after the given key and return
        the cached data, or None when the key is missing or expired.
        """
        cache_data_path = os.path.join(self.cache_dir, key or "")
        if not os.path.isdir(cache_data_path):
            # no cache data with the provided key
            return None
        cache_obj = self.get_cache_obj(cache_data_path)
        cache_data = cache_obj.get(key)
        if cache_data is None:
            # remove the cache data folder because the key might have expired
            try:
                shutil.rmtree(cache_data_path)
            except OSError:
                pass
            return None
        return cache_data

    @staticmethod
    @lru_cache
    def get_cache_files(d: str):
        """
        Return the cache directories inside cache_dir.
        """
        return [
            Path(dir_entry.path) for dir_entry in os.scandir(d)
            if dir_entry.is_dir()
        ]

    def delete(self, key: str):
        """
        Delete a key pair from the cache.
        """
        key_cache_dir = os.path.join(self.cache_dir, key)
        if not os.path.isdir(key_cache_dir):
            return
        try:
            # Each folder holds a single key, so clearing it removes the pair.
            self.get_cache_obj(key_cache_dir).clear()
        except Exception:
            pass

    def clear(self):
        """
        Clear all data from the cache.
        """
        for p in self.get_cache_files(self.cache_dir):
            try:
                self.get_cache_obj(str(p)).clear()
            except Exception:
                pass

    def close(self):
        """
        Close the cache.
        """
        for p in self.get_cache_files(self.cache_dir):
            try:
                self.get_cache_obj(str(p)).close()
            except Exception:
                pass
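# Example usage (illustrative): each key gets its own subdirectory, so the
# filesystem itself acts as the key index.
#
#     os.makedirs("/tmp/kv-cache", exist_ok=True)
#     cache = KeyAsFolderCache("/tmp/kv-cache")
#     cache.set("session-42", {"user": "alice"}, expiry=3600)  # creates /tmp/kv-cache/session-42
#     cache.get("session-42")  # -> {"user": "alice"}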
class CacheSpeedTest:
    """
    Runs speed tests against the Cache classes.
    """

    instances = [
        InMemoryCache,
        DynamicFileCache,
        KeyAsFolderCache,
    ]

    def __init__(self, repeat: int = 1):
        self.repeat = repeat
        self.key = self.generate_random_string(32)
        self.results = {}  # store results for comparison

    @staticmethod
    def generate_random_string(length):
        letters = string.ascii_lowercase
        return "".join(random.choice(letters) for _ in range(length))

    def test_create(self, cache_cls):
        start = time.time()
        cache = cache_cls("./test")
        elapse = time.time() - start
        cache.clear()  # cleanup
        return elapse

    def test_set(self, cache):
        data = self.generate_random_string(1024)
        start = time.time()
        cache.set(self.key, data)
        return time.time() - start

    def test_get(self, cache):
        start = time.time()
        cache.get(self.key)
        return time.time() - start

    def test_del(self, cache):
        start = time.time()
        cache.delete(self.key)
        return time.time() - start

    def test_clear(self, cache):
        start = time.time()
        cache.clear()
        return time.time() - start

    def run_test(self, cache_cls):
        create_t = set_t = get_t = del_t = clear_t = 0.0
        for _ in range(self.repeat):
            create_t += self.test_create(cache_cls)
            # Reuse one instance per round so get/delete/clear operate on the
            # key written by set (a fresh InMemoryCache would otherwise be empty).
            cache = cache_cls("./test")
            set_t += self.test_set(cache)
            get_t += self.test_get(cache)
            del_t += self.test_del(cache)
            clear_t += self.test_clear(cache)
            self.key = self.generate_random_string(32)
        # Store the average time per operation
        self.results[cache_cls.__name__] = {
            "create": create_t / self.repeat,
            "set": set_t / self.repeat,
            "get": get_t / self.repeat,
            "delete": del_t / self.repeat,
            "clear": clear_t / self.repeat,
        }

    def execute_all(self):
        print("Running caching speed tests...")
        os.makedirs("./test", exist_ok=True)
        for cache_cls in self.instances:
            self.run_test(cache_cls)
        self.print_summary()
        self.compare_performance()

    def print_summary(self):
        print("\nOverall Performance Summary:")
        for instance_name, result in self.results.items():
            print(f"\n[{instance_name}]")
            print(f"Create (avg over {self.repeat} run(s)): {result['create']:.6f} seconds")
            print(f"Set (avg over {self.repeat} run(s)): {result['set']:.6f} seconds")
            print(f"Get (avg over {self.repeat} run(s)): {result['get']:.6f} seconds")
            print(f"Delete (avg over {self.repeat} run(s)): {result['delete']:.6f} seconds")
            print(f"Clear (avg over {self.repeat} run(s)): {result['clear']:.6f} seconds")
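# Minimal sketch of running the benchmark directly (assumes the module is
# executed from a directory where "./test" can be created):
#
#     if __name__ == "__main__":
#         os.makedirs("./test", exist_ok=True)
#         tester = CacheSpeedTest(repeat=5)
#         for cache_cls in CacheSpeedTest.instances:
#             tester.run_test(cache_cls)
#         tester.print_summary()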