Source code for patcher.client.api_client

import asyncio
import json
import ssl
import subprocess
from datetime import datetime
from typing import AnyStr, Dict, List, Optional

import aiohttp

from patcher.utils.wrappers import check_token

from ..models.patch import PatchTitle
from ..utils import logger
from .config_manager import ConfigManager
from .token_manager import TokenManager


[docs] class ApiClient: """ Provides methods for interacting with the Jamf API, specifically fetching patch data, device information, and OS versions. The ``ApiClient`` manages authentication and session handling, ensuring efficient and secure communication with the Jamf API. """ def __init__(self, config: ConfigManager): """ Initializes the ApiClient with the provided :class:`~patcher.client.config_manager.ConfigManager`. This sets up the API client with necessary credentials and session parameters for interacting with the Jamf API. .. seealso:: :mod:`~patcher.models.jamf_client` :param config: Instance of ConfigManager for loading and storing credentials. :type config: ConfigManager :raises ValueError: If the JamfClient configuration is invalid. """ self.log = logger.LogMe(self.__class__.__name__) self.log.debug("Initializing ApiClient") self.config = config self.jamf_client = config.attach_client() if self.jamf_client: self.token = self.jamf_client.token self.log.info("JamfClient and token successfully attached") else: self.log.error("Invalid JamfClient configuration detected!") raise ValueError("Invalid JamfClient configuration detected!") self.jamf_url = self.jamf_client.base_url self.headers = { "Accept": "application/json", "Authorization": f"Bearer {self.token}", } self.token_manager = TokenManager(config) self.max_concurrency = self.jamf_client.max_concurrency self.ssl_context = ssl.create_default_context(cafile=self.jamf_client.cafile)
[docs] def convert_timezone(self, utc_time_str: AnyStr) -> Optional[AnyStr]: """ Converts a UTC time string to a formatted string without timezone information. :param utc_time_str: UTC time string in ISO 8601 format (e.g., "2023-08-09T12:34:56+0000"). :type utc_time_str: AnyStr :return: Formatted date string (e.g., "Aug 09 2023") or None if the input format is invalid. :rtype: Optional[AnyStr] :example: .. code-block:: python formatted_date = api_client.convert_timezone("2023-08-09T12:34:56+0000") print(formatted_date) # Outputs: "Aug 09 2023" .. note:: This function is primarily used 'privately' by methods and classes and is not designed to be called explicitly. """ try: utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%S%z") time_str = utc_time.strftime("%b %d %Y") return time_str except ValueError as e: self.log.error(f"Invalid time format provided. Details: {e}") return None
[docs] async def fetch_json(self, url: AnyStr, session: aiohttp.ClientSession) -> Optional[Dict]: """ Asynchronously fetches JSON data from a specified URL using a session. :param url: URL to fetch the JSON data from. :type url: AnyStr :param session: An aiohttp.ClientSession instance used to make the request. :type session: aiohttp.ClientSession :return: JSON data as a dictionary or None if an error occurs. :rtype: Optional[Dict] :example: .. code-block:: python async with aiohttp.ClientSession() as session: json_data = await api_client.fetch_json("https://api.example.com/data", session) if json_data: print(json_data) """ self.log.debug(f"Fetching JSON data from URL: {url}") try: async with session.get(url, headers=self.headers, ssl=self.ssl_context) as response: response.raise_for_status() json_data = await response.json() self.log.info(f"Successfully fetched JSON data from {url}") return json_data except aiohttp.ClientResponseError as e: self.log.error(f"Received a client error while fetching JSON from {url}: {e}") except Exception as e: self.log.error(f"Error fetching JSON: {e}") return None
[docs] async def fetch_batch(self, urls: List[AnyStr]) -> List[Optional[Dict]]: """ Fetches JSON data in batches to respect the concurrency limit. Data is fetched from each URL in the provided list, ensuring that no more than ``max_concurrency`` requests are sent concurrently. :param urls: List of URLs to fetch data from. :type urls: List[AnyStr] :return: A list of JSON dictionaries or None for URLs that fail to retrieve data. :rtype: List[Optional[Dict]] """ results = [] async with aiohttp.ClientSession() as session: for i in range(0, len(urls), self.max_concurrency): batch = urls[i : i + self.max_concurrency] tasks = [self.fetch_json(url, session) for url in batch] batch_results = await asyncio.gather(*tasks) results.extend(batch_results) return results
[docs] @check_token async def get_policies(self) -> Optional[List]: """ Retrieves a list of patch software title IDs from the Jamf API. This function requires a valid authentication token, which is managed automatically. :return: A list of software title IDs or None if an error occurs. :rtype: Optional[List] """ async with aiohttp.ClientSession() as session: url = f"{self.jamf_url}/api/v2/patch-software-title-configurations" response = await self.fetch_json(url=url, session=session) # Verify response is list type as expected if not isinstance(response, list): self.log.error( f"Unexpected response format: expected a list, received {type(response)} instead." ) return None # Check if all elements in the list are dictionaries if not all(isinstance(item, dict) for item in response): self.log.error("Unexpected response format: all items should be dictionaries.") return None self.log.info("Patch policies obtained as expected.") return [title.get("id") for title in response]
[docs] @check_token async def get_summaries(self, policy_ids: List) -> Optional[List[PatchTitle]]: """ Retrieves patch summaries for the specified policy IDs from the Jamf API. This function fetches data asynchronously and compiles the results into a list of ``PatchTitle`` objects. :param policy_ids: List of policy IDs to retrieve summaries for. :type policy_ids: List :return: List of ``PatchTitle`` objects containing patch summaries or None if an error occurs. :rtype: Optional[List[PatchTitle]] """ urls = [ f"{self.jamf_url}/api/v2/patch-software-title-configurations/{policy}/patch-summary" for policy in policy_ids ] summaries = await self.fetch_batch(urls) policy_summaries = [ PatchTitle( title=summary.get("title"), released=self.convert_timezone(summary.get("releaseDate")), hosts_patched=summary.get("upToDate"), missing_patch=summary.get("outOfDate"), ) for summary in summaries if summary ] self.log.info( f"Successfully obtained policy summaries for {len(policy_summaries)} policies." ) return policy_summaries
[docs] @check_token async def get_device_ids(self) -> Optional[List[int]]: """ Asynchronously fetches the list of mobile device IDs from the Jamf Pro API. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :return: A list of mobile device IDs or None on error. :rtype: Optional[List[int]] """ url = f"{self.jamf_url}/api/v2/mobile-devices" try: async with aiohttp.ClientSession() as session: response = await self.fetch_json(url=url, session=session) except aiohttp.ClientError as e: self.log.error(f"Error fetching device IDs: {e}") return None if not response: self.log.error(f"API call to {url} was unsuccessful.") return None devices = response.get("results") if not devices: self.log.error("Received empty data set when trying to obtain device IDs.") return None self.log.info(f"Received {len(devices)} device IDs successfully.") return [device.get("id") for device in devices if device]
[docs] @check_token async def get_device_os_versions( self, device_ids: List[int], ) -> Optional[List[Dict[AnyStr, AnyStr]]]: """ Asynchronously fetches the OS version and serial number for each device ID provided. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :param device_ids: A list of mobile device IDs to retrieve information for. :type device_ids: List[int] :return: A list of dictionaries containing the serial numbers and OS versions, or None on error. :rtype: Optional[List[Dict[AnyStr, AnyStr]]] """ if not device_ids: self.log.error("No device IDs provided!") return None urls = [f"{self.jamf_url}/api/v2/mobile-devices/{device}/detail" for device in device_ids] subsets = await self.fetch_batch(urls) if not subsets: self.log.error("Received empty response obtaining device OS information.") return None devices = [ { "SN": subset.get("serialNumber"), "OS": subset.get("osVersion"), } for subset in subsets if subset ] self.log.info(f"Successfully obtained OS versions for {len(devices)} devices.") return devices
[docs] def get_sofa_feed(self) -> Optional[List[Dict[AnyStr, AnyStr]]]: """ Fetches iOS Data feeds from SOFA and extracts latest OS version information. To limit the amount of possible SSL verification checks, this method utilizes a subprocess call instead. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :return: A list of dictionaries containing base OS versions, latest iOS versions and release dates, or None on error. :rtype: Optional[List[Dict[AnyStr, AnyStr]]] """ # Utilize curl to avoid SSL Verification errors for end-users on managed devices command = "curl -s 'https://sofafeed.macadmins.io/v1/ios_data_feed.json'" try: result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True) except (subprocess.CalledProcessError, aiohttp.ClientResponseError) as e: self.log.error(f"Encountered error executing subprocess command: {e}") return None try: data = json.loads(result.stdout) except json.JSONDecodeError as e: self.log.error(f"Error decoding JSON data: {e}") return None os_versions = data.get("OSVersions", []) latest_versions = [] for version in os_versions: version_info = version.get("Latest", {}) latest_versions.append( { "OSVersion": version.get("OSVersion"), "ProductVersion": version_info.get("ProductVersion"), "ReleaseDate": self.convert_timezone(version_info.get("ReleaseDate")), } ) return latest_versions