Source code for cate.util.cache

# The MIT License (MIT)
# Copyright (c) 2021 by the ESA CCI Toolbox development team and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


"""
Description
===========

This module defines the :py:class:`Cache` class which represents a general-purpose cache.
A cache is configured by a :py:class:`CacheStore` which is responsible for storing and reloading cached items.

The default cache stores are

* :py:class:`MemoryCacheStore`
* :py:class:`FileCacheStore`

Every cache has a capacity in physical units defined by the :py:class:`CacheStore`. When the cache capacity is exceeded,
a replacement policy for cached items is applied until the cache size falls below a given ratio of the total capacity.

The default replacement policies are

* :py:data:`POLICY_LRU`
* :py:data:`POLICY_MRU`
* :py:data:`POLICY_LFU`
* :py:data:`POLICY_RR`

This package is independent of other ``cate.*`` packages and can therefore be used stand-alone.
An illustrative usage sketch can be found at the end of this module.

Components
==========
"""

import os
import os.path
import sys
import time
from abc import ABCMeta, abstractmethod
from threading import RLock

from .misc import is_debug_mode

__author__ = "Norman Fomferra (Brockmann Consult GmbH)"

_DEBUG_CACHE = is_debug_mode()


class CacheStore(metaclass=ABCMeta):
    """
    Represents a store into which cached values can be stored and from which they can be restored.
    """

    @abstractmethod
    def can_load_from_key(self, key) -> bool:
        """
        Test whether a stored value representation can be loaded from the given key.

        :param key: the key
        :return: True, if so
        """
        pass

    @abstractmethod
    def load_from_key(self, key):
        """
        Load a stored value representation of the value and its size from the given key.

        :param key: the key
        :return: a 2-element sequence containing the stored representation of the value and its size
        """
        pass

    @abstractmethod
    def store_value(self, key, value):
        """
        Store a value and return its stored representation and size in any unit, e.g. in bytes.

        :param key: the key
        :param value: the value
        :return: a 2-element sequence containing the stored representation of the value and its size
        """
        pass

    @abstractmethod
    def restore_value(self, key, stored_value):
        """
        Restore a value from its stored representation.

        :param key: the key
        :param stored_value: the stored representation of the value
        :return: the restored value
        """
        pass

    @abstractmethod
    def discard_value(self, key, stored_value):
        """
        Discard a value from its storage.

        :param key: the key
        :param stored_value: the stored representation of the value
        """
        pass
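

# Illustrative sketch, not part of the original module: a minimal CacheStore
# implementation that keeps stored representations in a plain dict. It only
# demonstrates the interface contract -- store_value() and load_from_key()
# both return a 2-element sequence (stored representation, size). The name
# `_DictCacheStore` and the use of sys.getsizeof() as the size measure are
# assumptions made for this example only.
class _DictCacheStore(CacheStore):
    """Example store keeping each stored representation in a dict, keyed by the cache key."""

    def __init__(self):
        self._entries = {}

    def can_load_from_key(self, key) -> bool:
        return key in self._entries

    def load_from_key(self, key):
        # Return the (stored representation, size) pair recorded by store_value()
        return self._entries[key]

    def store_value(self, key, value):
        entry = value, sys.getsizeof(value)
        self._entries[key] = entry
        return entry

    def restore_value(self, key, stored_value):
        # The stored representation is the value itself
        return stored_value

    def discard_value(self, key, stored_value):
        self._entries.pop(key, None)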


class MemoryCacheStore(CacheStore):
    """
    Simple memory store.
    """

    def can_load_from_key(self, key) -> bool:
        # This store type does not maintain key-value pairs on its own
        return False

    def load_from_key(self, key):
        raise NotImplementedError()

    def store_value(self, key, value):
        """
        Store the given value in memory.

        :param key: the key
        :param value: the original value
        :return: the tuple (stored value, size) where stored value is the sequence [key, value]
        """
        return [key, value], _compute_object_size(value)

    def restore_value(self, key, stored_value):
        """
        :param key: the key
        :param stored_value: the stored representation of the value
        :return: the original value
        """
        if key != stored_value[0]:
            raise ValueError('key does not match stored value')
        return stored_value[1]

    def discard_value(self, key, stored_value):
        """
        Clear the value held by the given stored_value.

        :param key: the key
        :param stored_value: the stored representation of the value
        """
        if key != stored_value[0]:
            raise ValueError('key does not match stored value')
        stored_value[1] = None


class FileCacheStore(CacheStore):
    """
    Simple file store for values which can be written and read as bytes, e.g. encoded PNG images.
    """

    def __init__(self, cache_dir: str, ext: str):
        self.cache_dir = cache_dir
        self.ext = ext

    def can_load_from_key(self, key) -> bool:
        path = self._key_to_path(key)
        return os.path.exists(path)

    def load_from_key(self, key):
        path = self._key_to_path(key)
        return path, os.path.getsize(path)

    def store_value(self, key, value):
        path = self._key_to_path(key)
        dir_path = os.path.dirname(path)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        with open(path, 'wb') as fp:
            fp.write(value)
        return path, os.path.getsize(path)

    def restore_value(self, key, stored_value):
        path = self._key_to_path(key)
        with open(path, 'rb') as fp:
            return fp.read()

    def discard_value(self, key, stored_value):
        path = self._key_to_path(key)
        try:
            os.remove(path)
            # TODO (forman): also remove empty directories up to self.cache_dir
        except IOError:
            pass

    def _key_to_path(self, key):
        return os.path.join(self.cache_dir, str(key) + self.ext)
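

# Illustrative sketch, not part of the original module: a round trip through
# the FileCacheStore API. The function name `_example_file_store_roundtrip`
# is a hypothetical example; pass any writable directory as `cache_dir`.
def _example_file_store_roundtrip(cache_dir):
    store = FileCacheStore(cache_dir, '.png')
    # store_value() writes the bytes to '<cache_dir>/tile-1.png' and returns
    # the stored representation (the file path) together with the file size
    stored_value, size = store.store_value('tile-1', b'\x89PNG...')
    assert store.can_load_from_key('tile-1')
    data = store.restore_value('tile-1', stored_value)  # reads the file back
    store.discard_value('tile-1', stored_value)         # deletes the file again
    return data, size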


def _policy_lru(item):
    return item.access_time


def _policy_mru(item):
    return -item.access_time


def _policy_lfu(item):
    return item.access_count


def _policy_rr(item):
    return item.access_count % 2


#: Discard Least Recently Used items first
POLICY_LRU = _policy_lru

#: Discard Most Recently Used items first
POLICY_MRU = _policy_mru

#: Discard Least Frequently Used items first
POLICY_LFU = _policy_lfu

#: Discard items by Random Replacement
POLICY_RR = _policy_rr

_T0 = time.perf_counter()
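

# Illustrative sketch, not part of the original module: a replacement policy is
# just a function mapping a Cache.Item to a sort key; Cache.trim() discards the
# items that sort first. The policy below, which evicts the largest items
# first, is a hypothetical example and not one of the predefined policies.
def _policy_largest_first(item):
    return -item.stored_size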


class Cache:
    """
    An implementation of a cache.
    See https://en.wikipedia.org/wiki/Cache_algorithms
    """

    class Item:
        """ Cache-private class representing an item in the cache. """

        def __init__(self):
            self.key = None
            self.stored_value = None
            self.stored_size = 0
            self.creation_time = 0
            self.access_time = 0
            self.access_count = 0

        @staticmethod
        def load_from_key(store, key):
            if not store.can_load_from_key(key):
                return None
            item = Cache.Item()
            item._load_from_key(store, key)
            return item

        def store(self, store, key, value):
            self.key = key
            self.access_count = 0
            self._access()
            stored_value, stored_size = store.store_value(key, value)
            self.stored_value = stored_value
            self.stored_size = stored_size

        def restore(self, store, key):
            self._access()
            return store.restore_value(key, self.stored_value)

        def discard(self, store, key):
            store.discard_value(key, self.stored_value)
            self.__init__()

        def _load_from_key(self, store, key):
            self.key = key
            self.access_count = 0
            self._access()
            stored_value, stored_size = store.load_from_key(key)
            self.stored_value = stored_value
            self.stored_size = stored_size

        def _access(self):
            self.access_time = time.perf_counter() - _T0
            self.access_count += 1

    def __init__(self, store=MemoryCacheStore(), capacity=1000, threshold=0.75, policy=POLICY_LRU,
                 parent_cache=None):
        """
        Constructor.

        :param store: the cache store, see CacheStore interface
        :param capacity: the size capacity in units used by the store's store_value() method
        :param threshold: a number greater than zero and less than one
        :param policy: cache replacement policy.
            This is a function that maps a :py:class:`Cache.Item` to a numerical value.
            See :py:data:`POLICY_LRU`, :py:data:`POLICY_MRU`, :py:data:`POLICY_LFU`, :py:data:`POLICY_RR`
        :param parent_cache: an optional parent cache to which evicted values are handed over
        """
        self._store = store
        self._capacity = capacity
        self._threshold = threshold
        self._policy = policy
        self._parent_cache = parent_cache
        self._size = 0
        self._max_size = self._capacity * self._threshold
        self._item_dict = {}
        self._item_list = []
        self._lock = RLock()

    @property
    def policy(self):
        return self._policy

    @property
    def store(self):
        return self._store

    @property
    def capacity(self):
        return self._capacity

    @property
    def threshold(self):
        return self._threshold

    @property
    def size(self):
        return self._size

    @property
    def max_size(self):
        return self._max_size

    def get_value(self, key):
        self._lock.acquire()
        item = self._item_dict.get(key)
        value = None
        restored = False
        if item:
            value = item.restore(self._store, key)
            restored = True
            if _DEBUG_CACHE:
                _debug_print('restored value for key "%s" from cache' % key)
        elif self._parent_cache:
            item = self._parent_cache.get_value(key)
            if item:
                value = item.restore(self._parent_cache.store, key)
                restored = True
                if _DEBUG_CACHE:
                    _debug_print('restored value for key "%s" from parent cache' % key)
        if not restored:
            item = Cache.Item.load_from_key(self._store, key)
            if item:
                self._add_item(item)
                value = item.restore(self._store, key)
                if _DEBUG_CACHE:
                    _debug_print('restored value for key "%s" from cache' % key)
        self._lock.release()
        return value

    def put_value(self, key, value):
        self._lock.acquire()
        if self._parent_cache:
            # remove value from parent cache, because this cache will now take over
            self._parent_cache.remove_value(key)
        item = self._item_dict.get(key)
        if item:
            self._remove_item(item)
            item.discard(self._store, key)
            if _DEBUG_CACHE:
                _debug_print('discarded value for key "%s" from cache' % key)
        else:
            item = Cache.Item()
        item.store(self._store, key, value)
        if _DEBUG_CACHE:
            _debug_print('stored value for key "%s" in cache' % key)
        self._add_item(item)
        self._lock.release()

    def remove_value(self, key):
        self._lock.acquire()
        if self._parent_cache:
            self._parent_cache.remove_value(key)
        item = self._item_dict.get(key)
        if item:
            self._remove_item(item)
            item.discard(self._store, key)
            if _DEBUG_CACHE:
                _debug_print('discarded value for key "%s" from cache' % key)
        self._lock.release()

    def _add_item(self, item):
        self._item_dict[item.key] = item
        self._item_list.append(item)
        if self._size + item.stored_size > self._max_size:
            self.trim(item.stored_size)
        self._size += item.stored_size

    def _remove_item(self, item):
        self._item_dict.pop(item.key)
        self._item_list.remove(item)
        self._size -= item.stored_size

    def trim(self, extra_size=0):
        if _DEBUG_CACHE:
            _debug_print('trimming...')
        self._lock.acquire()
        self._item_list.sort(key=self._policy)
        keys = []
        size = self._size
        max_size = self._max_size
        for item in self._item_list:
            if size + extra_size > max_size:
                keys.append(item.key)
                size -= item.stored_size
        # release lock to give another thread a chance, then acquire the lock again
        self._lock.release()
        self._lock.acquire()
        for key in keys:
            if self._parent_cache:
                # Before discarding the item fully, put its value into the parent cache
                value = self.get_value(key)
                self.remove_value(key)
                if value:
                    self._parent_cache.put_value(key, value)
            else:
                self.remove_value(key)
        self._lock.release()

    def clear(self, clear_parent=True):
        self._lock.acquire()
        if self._parent_cache and clear_parent:
            self._parent_cache.clear(clear_parent)
        keys = list(self._item_dict.keys())
        self._lock.release()
        for key in keys:
            if self._parent_cache and not clear_parent:
                value = self.get_value(key)
                if value:
                    self._parent_cache.put_value(key, value)
            self.remove_value(key)


def _debug_print(msg):
    print("cate.util.cache.Cache:", msg)


def _compute_object_size(obj):
    if hasattr(obj, 'nbytes'):
        # A numpy ndarray instance
        return obj.nbytes
    elif hasattr(obj, 'size') and hasattr(obj, 'mode'):
        # A PIL Image instance
        w, h = obj.size
        m = obj.mode
        return w * h * (4 if m in ('RGBA', 'RGBx', 'I', 'F')
                        else 3 if m in ('RGB', 'YCbCr', 'LAB', 'HSV')
                        else 1. / 8. if m == '1'
                        else 1)
    else:
        return sys.getsizeof(obj)
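

# Illustrative sketch, not part of the original module: a two-level setup in
# which values evicted from a memory cache are handed over to a file-backed
# parent cache (see Cache.trim()). The variable names and the '/tmp/cate-cache'
# directory are hypothetical examples.
if __name__ == '__main__':
    parent = Cache(store=FileCacheStore('/tmp/cate-cache', '.bin'),
                   capacity=64 * 1024 * 1024,  # capacity in bytes (file sizes)
                   policy=POLICY_LRU)
    cache = Cache(store=MemoryCacheStore(),
                  capacity=16 * 1024 * 1024,   # capacity in bytes (object sizes)
                  threshold=0.75,
                  policy=POLICY_LRU,
                  parent_cache=parent)
    cache.put_value('tile-0', b'\x00' * 1024)
    assert cache.get_value('tile-0') == b'\x00' * 1024
    cache.clear()  # also clears the parent cache by default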