# Source code for memoize.statuses

"""
[Internal use only] Encapsulates update state management.
"""
import datetime
import logging
from asyncio import Future
from typing import Optional, Dict, Awaitable

from memoize import coerced
from memoize.entry import CacheKey, CacheEntry


class UpdateStatuses:
    """Tracks which cache keys currently have an update in progress.

    Each in-progress update is represented by a future stored under its key.
    The future is resolved with the refreshed entry on success, or with
    ``None`` when the update is aborted or the update lock times out.
    """

    def __init__(self, update_lock_timeout: datetime.timedelta = datetime.timedelta(minutes=5)) -> None:
        """Creates an empty registry of in-progress updates.

        :param update_lock_timeout: time after which a started update is given up on
            (the key is released and waiters are notified with ``None``).
        """
        self.logger = logging.getLogger(__name__)
        self._update_lock_timeout = update_lock_timeout
        # type declaration should not be in comment once we drop py35 support
        self._updates_in_progress = {}  # type: Dict[CacheKey, Future]

    def is_being_updated(self, key: CacheKey) -> bool:
        """Checks if update for given key is in progress.

        Obtained info is valid until control gets back to IO-loop."""
        return key in self._updates_in_progress

    def mark_being_updated(self, key: CacheKey) -> None:
        """Informs that update has been started.

        Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost).
        Calls to 'is_being_updated' will return True until 'mark_updated' will be called.

        :raises ValueError: if the key is already being updated.
        """
        if key in self._updates_in_progress:
            raise ValueError('Key {} is already being updated'.format(key))

        future = coerced._future()
        self._updates_in_progress[key] = future

        def complete_on_timeout_passed():
            # Identity check: the key may have been released and then locked again by a newer
            # update, in which case that newer update (and its own timeout) must be left alone.
            if self._updates_in_progress.get(key) is not future:
                return
            # Always release the key once the timeout passed - even when the future has already
            # been completed/cancelled from the outside - otherwise the key would stay locked.
            self._updates_in_progress.pop(key)
            if not future.done():
                self.logger.debug('Update task timed out - notifying clients awaiting for key %s', key)
                future.set_result(None)

        # NOTE(review): presumably schedules the callback on the IO-loop after the given timeout
        coerced._call_later(self._update_lock_timeout, complete_on_timeout_passed)

    def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None:
        """Informs that update has been finished.

        Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.

        :raises ValueError: if the key is not being updated.
        """
        self._complete_update(key, entry)

    def mark_update_aborted(self, key: CacheKey) -> None:
        """Informs that update failed to complete.

        Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.

        :raises ValueError: if the key is not being updated.
        """
        self._complete_update(key, None)

    def await_updated(self, key: CacheKey) -> Awaitable[Optional[CacheEntry]]:
        """Waits (asynchronously) until update in progress has been finished.

        Returns updated entry or None if update failed/timed-out.
        Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost).

        :raises ValueError: if the key is not being updated.
        """
        if not self.is_being_updated(key):
            raise ValueError('Key {} is not being updated'.format(key))
        return self._updates_in_progress[key]

    def _complete_update(self, key: CacheKey, result: Optional[CacheEntry]) -> None:
        """Releases the key and resolves its future with given result (None means failure)."""
        if key not in self._updates_in_progress:
            raise ValueError('Key {} is not being updated'.format(key))
        update = self._updates_in_progress.pop(key)
        # The future is handed out by 'await_updated', so it may already be done (e.g. cancelled
        # together with an awaiting task); setting a result on a done future would raise.
        if not update.done():
            update.set_result(result)