Source code for patcher.client.token_manager

import asyncio
from datetime import datetime, timedelta, timezone
from typing import AnyStr, Optional, Tuple

from aiohttp import ClientResponseError, ClientSession, TCPConnector

from ..models.jamf_client import JamfClient
from ..models.token import AccessToken
from ..utils import logger
from .config_manager import ConfigManager


class TokenManager:
    """
    Manages the Bearer Token required for accessing the Jamf API.

    The ``TokenManager`` class handles all operations related to the token lifecycle,
    including fetching, saving, and validating the access token. It is initialized with a
    :class:`~patcher.client.config_manager.ConfigManager` instance, which provides the
    necessary configuration and credentials.

    Token values are never written to the log; only non-secret metadata (client ID,
    expiration, response keys) is logged.
    """

    def __init__(self, config: ConfigManager):
        """
        Initializes the TokenManager with a provided ``ConfigManager`` instance.

        :param config: A ``ConfigManager`` instance for managing credentials and configurations.
        :type config: ConfigManager
        :raises ValueError: If the ``JamfClient`` configuration is invalid
            (``config.attach_client()`` returned a falsy value).
        """
        self.config = config
        self.jamf_client = self.config.attach_client()
        self.log = logger.LogMe(self.__class__.__name__)
        self.log.debug("Initializing TokenManager...")

        if self.jamf_client:
            self.token = self.jamf_client.token
            self.log.info("JamfClient and token successfully attached")
        else:
            self.log.error("Invalid JamfClient configuration was detected and ValueError raised.")
            raise ValueError("Invalid JamfClient configuration detected!")

        # Serializes concurrent ``fetch_token`` calls.
        self.lock = asyncio.Lock()

    def save_token(self, token: AccessToken):
        """
        Saves the access token and its expiration date securely.

        The token string and its ISO-8601 expiration are persisted through
        ``ConfigManager.set_credential``. Both the attached ``JamfClient`` and this
        manager's own ``token`` attribute are updated, so ``token_valid`` always
        evaluates the most recently saved token.

        :param token: The ``AccessToken`` instance containing the token and its expiration date.
        :type token: :class:`~patcher.models.token.AccessToken`
        """
        # Never log the bearer token itself; log files are not a secret store.
        self.log.debug("Saving token (value redacted)")
        self.config.set_credential("TOKEN", token.token)
        self.config.set_credential("TOKEN_EXPIRATION", token.expires.isoformat())
        self.log.info("Bearer token and expiration updated in keyring")
        self.jamf_client.token = token
        # Keep the manager's view in sync; otherwise ``token_valid`` would keep
        # checking a stale token after ``fetch_token``.
        self.token = token

    def token_valid(self) -> bool:
        """
        Determines if the current access token is still valid.

        This method checks if the token has expired by evaluating its ``is_expired``
        attribute. A missing token is reported as invalid rather than raising.

        :return: ``True`` if a token is present and not expired, otherwise ``False``.
        :rtype: bool
        """
        if self.token is None:
            self.log.debug("Token validity check: False (no token present)")
            return False

        valid = not self.token.is_expired
        self.log.debug(f"Token validity check: {valid}")
        return valid

    def get_credentials(self) -> Tuple[AnyStr, AnyStr]:
        """
        Retrieves the client ID and client secret for authentication.

        These credentials are required for generating a new access token.

        :return: A tuple containing the client ID and client secret.
        :rtype: Tuple[AnyStr, AnyStr]
        """
        self.log.debug("Retrieving credentials from JamfClient")
        return self.jamf_client.client_id, self.jamf_client.client_secret

    def update_token(self, token_str: AnyStr, expires_in: int):
        """
        Updates the current access token with a new value and expiration time.

        This method is typically called after receiving a new token from the API.
        The expiration is computed as the current UTC time plus ``expires_in`` seconds.

        :param token_str: The new token string.
        :type token_str: AnyStr
        :param expires_in: The number of seconds until the token expires.
        :type expires_in: int
        """
        # The token value is deliberately omitted from the log message.
        self.log.debug(
            f"Updating token with new value (redacted), expiration: {expires_in} seconds"
        )
        expiration_time = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
        self.token = AccessToken(token=token_str, expires=expiration_time)
        self.save_token(self.token)
        self.log.info("Token updated successfully")

    def check_token_lifetime(self, client: Optional[JamfClient] = None) -> bool:
        """
        Evaluates the remaining lifetime of the access token.

        The remaining lifetime is read from ``client.token.seconds_remaining``:

        - no token, a non-positive lifetime, or less than 1 minute remaining logs an
          error and returns ``False``;
        - between 5 and 10 minutes (inclusive) logs a warning and returns ``True``;
        - any other lifetime logs an informational message and returns ``True``.

        :param client: The ``JamfClient`` instance to check the token for. If ``None``,
            defaults to the client attached to this manager.
        :type client: Optional[JamfClient]
        :return: ``True`` if the token has at least 1 minute of lifetime remaining,
            otherwise ``False``.
        :rtype: bool
        """
        if client is None:
            client = self.jamf_client

        if not client.token:
            self.log.error(f"No token found for JamfClient {client}")
            return False

        lifetime = client.token.seconds_remaining
        self.log.debug(f"Token lifetime in seconds: {lifetime}")
        if lifetime <= 0:
            self.log.error("Token lifetime is invalid")
            return False

        minutes = lifetime / 60
        self.log.debug(f"Token lifetime in minutes: {minutes}")

        if minutes < 1:
            # Previously this case logged an error but still reported success.
            self.log.error("Token life time is less than 1 minute.")
            return False

        if 5 <= minutes <= 10:
            self.log.warning(
                "Token lifetime is between 5-10 minutes, consider increasing duration."
            )
        else:
            # NOTE(review): lifetimes between 1 and 5 minutes also land here and are
            # reported as sufficient -- confirm whether they should warn instead.
            self.log.info(
                f"Token lifetime is sufficient for {client.client_id}. Remaining Lifetime: {lifetime}"
            )
        return True

    async def fetch_token(self) -> Optional[AccessToken]:
        """
        Asynchronously fetches a new access token from the Jamf API.

        A ``client_credentials`` request is POSTed to ``<base_url>/api/oauth/token``.
        On success the token is saved via :meth:`save_token` and returned. The call is
        serialized with ``self.lock`` so concurrent callers do not issue overlapping
        requests.

        :return: The fetched ``AccessToken`` instance, or ``None`` if the server returns
            an error status or a response without a usable token and expiration.
        :rtype: Optional[AccessToken]
        """
        async with self.lock:
            client_id, client_secret = self.get_credentials()
            self.log.debug(f"Using client_id: {client_id}")

            payload = {
                "client_id": client_id,
                "grant_type": "client_credentials",
                "client_secret": client_secret,
            }
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            url = f"{self.jamf_client.base_url}/api/oauth/token"

            connector = TCPConnector(limit=self.jamf_client.max_concurrency)
            async with ClientSession(connector=connector) as session:
                async with session.post(url=url, data=payload, headers=headers) as resp:
                    try:
                        resp.raise_for_status()
                        json_response = await resp.json()
                    except ClientResponseError as e:
                        self.log.error(f"Failed to fetch a token: {e}")
                        return None

            if not isinstance(json_response, dict):
                self.log.error("Received invalid token response")
                return None

            # Log only the field names: the payload itself contains the bearer token.
            self.log.debug(f"Token response received with keys: {sorted(json_response)}")

            token = json_response.get("access_token")
            expires_in = json_response.get("expires_in", 0)

            # Validate types before comparing, so a null or string ``expires_in``
            # is rejected cleanly instead of raising TypeError.
            token_ok = isinstance(token, str) and bool(token)
            expiry_ok = isinstance(expires_in, (int, float)) and expires_in > 0
            if not (token_ok and expiry_ok):
                self.log.error("Received invalid token response")
                return None

            expiration = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
            access_token = AccessToken(token=token, expires=expiration)

            self.save_token(token=access_token)
            self.log.info("New token fetched and saved successfully")
            return access_token