Source code for patcher.client.token_manager

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Dict

from pydantic import ValidationError

from ..models.jamf_client import JamfClient
from ..models.token import AccessToken
from ..utils.exceptions import APIResponseError, CredentialError, PatcherError, TokenError
from ..utils.logger import LogMe
from . import BaseAPIClient
from .config_manager import ConfigManager


[docs] class TokenManager: def __init__(self, config: ConfigManager): """ The ``TokenManager`` class handles all operations related to the token lifecycle, including fetching, saving, and validating the access token. It is initialized with a :class:`~patcher.client.config_manager.ConfigManager` instance, which provides the necessary credentials. :param config: A ``ConfigManager`` instance for managing credentials and configurations. :type config: :class:`~patcher.client.config_manager.ConfigManager` """ self.log = LogMe(self.__class__.__name__) self.config = config self.api_client = BaseAPIClient() self._client = None # lazy load creds self._token = None self.lock = asyncio.Lock() @property def client(self): if not self._client: self.log.debug("Attempting to attach JamfClient.") self._client = self.attach_client() self.log.info(f"JamfClient initialized with base URL: {self._client.base_url}") return self._client @property def token(self) -> AccessToken: if not self._token: self.log.debug("Attempting to load AccessToken.") try: self._token = self.load_token() except CredentialError: self.log.warning("Failed to load token from keychain.") self.log.info( f"Token ending in {self._token.token[-4:]} loaded successfully from JamfClient." ) return self._token
[docs] def load_token(self) -> AccessToken: """ Loads the ``AccessToken`` and its expiration from the keyring. If either the AccessToken string or AccessToken expiration cannot be retrieved, a :exc:`~patcher.utils.exceptions.CredentialError` is raised. :return: An AccessToken object containing the token and its expiration date. :rtype: :class:`~patcher.models.token.AccessToken` """ self.log.debug("Attempting to load token and expiration from keychain.") try: token = self.config.get_credential("TOKEN") or "" expires = ( self.config.get_credential("TOKEN_EXPIRATION") or datetime(1970, 1, 1, tzinfo=timezone.utc).isoformat() ) self.log.info("Token and expiration loaded from keychain") return AccessToken(token=token, expires=expires) # type: ignore except CredentialError as e: self.log.error("Token or expiration is missing, loading failed.") raise TokenError("Unable to load token from keychain.", error_msg=str(e))
[docs] def attach_client(self) -> JamfClient: """ Creates and returns a ``JamfClient`` object using the stored credentials. :return: The ``JamfClient`` object if validation is successful. :rtype: :class:`~patcher.models.jamf_client.JamfClient` :raises PatcherError: If ``JamfClient`` object fails pydantic validation. """ self.log.debug("Attempting to attach JamfClient with stored credentials") try: client = JamfClient( client_id=self.config.get_credential("CLIENT_ID"), client_secret=self.config.get_credential("CLIENT_SECRET"), server=self.config.get_credential("URL"), ) self.log.info(f"JamfClient ending in {client.client_id[-4:]} attached successfully") return client except ValidationError as e: self.log.error(f"Failed attaching JamfClient due to validation error. Details: {e}") raise PatcherError( "Unable to attach JamfClient due to invalid configuration", error_msg=str(e), )
[docs] async def fetch_token(self) -> AccessToken: """ Asynchronously fetches a new access token from the Jamf API. The token is then saved and returned for use. :return: The fetched ``AccessToken`` instance. :rtype: :class:`~patcher.models.token.AccessToken` :raises TokenError: If a token cannot be retrieved from the Jamf API. """ self.log.debug("Attempting to fetch new AccessToken.") url = f"{self.client.base_url}/api/oauth/token" headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { "client_id": self.client.client_id, "grant_type": "client_credentials", "client_secret": self.client.client_secret, } try: response = await self.api_client.fetch_json( url, headers=headers, method="POST", data=data ) self.log.info("Received valid response from Jamf API for AccessToken call.") except APIResponseError as e: self.log.error(f"Failed to fetch a token from {url}. Details: {e}") raise TokenError( "Unable to retrieve AccessToken from Jamf instance.", url=url, error_msg=str(e), ) return self._parse_token_response(response)
def _parse_token_response(self, response: Dict) -> AccessToken: """ Private method that parses the Jamf API Token response and extracts the token string and token expiration. :param response: The API response payload from Jamf. :type response: :py:obj:`~typing.Dict` :return: The extracted ``AccessToken`` object. :rtype: :class:`~patcher.models.token.AccessToken` """ self.log.debug("Attempting to parse API response for AccessToken.") token = response.get("access_token") expires_in = response.get("expires_in") expiration = datetime.now(timezone.utc) + timedelta(seconds=expires_in) access_token = AccessToken(token=token, expires=expiration) self._save_token(token=access_token) self.log.info("New token fetched and saved successfully") return access_token def _save_token(self, token: AccessToken): """ This method stores the access token and its expiration date in the keyring for later retrieval. It also updates the ``JamfClient`` instance with the new token. :param token: The ``AccessToken`` instance containing the token and its expiration date. :type token: :class:`~patcher.models.token.AccessToken` :raises TokenError: If either the token string or expiration could not be saved. """ self.log.debug("Attempting to save retrieved AccessToken object.") try: self.config.set_credential("TOKEN", token.token) self.config.set_credential("TOKEN_EXPIRATION", token.expires.isoformat()) except CredentialError as e: self.log.error(f"Unable to save AccessToken object to keychain. Details: {e}") raise TokenError("Unable to save AccessToken object to keychain.", error_msg=str(e)) self._token = None # clear cache; force reload on next access self.log.info("AccessToken object updated in keychain")
[docs] async def ensure_valid_token(self) -> AccessToken: """ Verifies the current access token is valid (present and not expired). If the token is found to be invalid, a new token is requested and refreshed. .. seealso:: The :meth:`~patcher.utils.decorators.check_token` decorator leverages this method with thread locking to ensure tokens are valid before API calls. :return: The ``AccessToken`` object by way of ``self.token`` property. :rtype: :class:`~patcher.models.token.AccessToken` """ async with self.lock: if self.token.is_expired: self.log.warning("Bearer token is invalid or expired, attempting to refresh...") await self.fetch_token() self.log.info( f"Token ending in ({self.token.token[-4:]}) retrieved successfully. Remaining seconds: {self.token.seconds_remaining}" ) return self.token