Source code for patcher.client.api_client

import json
from datetime import datetime
from typing import Dict, List, Optional

from ..models.patch import PatchTitle
from ..utils import exceptions, logger
from ..utils.decorators import check_token
from . import BaseAPIClient
from .config_manager import ConfigManager
from .token_manager import TokenManager


[docs] class ApiClient(BaseAPIClient): """ Provides methods for interacting with the Jamf API, specifically fetching patch data, device information, and OS versions. The ``ApiClient`` manages authentication and session handling, ensuring efficient and secure communication with the Jamf API. """ def __init__(self, config: ConfigManager, concurrency: int): """ Initializes the ApiClient with the provided :class:`~patcher.client.config_manager.ConfigManager`. This sets up the API client with necessary credentials and session parameters for interacting with the Jamf API. .. seealso:: :mod:`~patcher.models.jamf_client` :param config: Instance of ConfigManager for loading and storing credentials. :type config: ConfigManager :raises ValueError: If the JamfClient configuration is invalid. """ self.log = logger.LogMe(self.__class__.__name__) self.config = config self.token_manager = TokenManager(config) # Use for check_token decorator # Creds can be loaded here as ApiClient objects can only exist after successful JamfClient creation. self.jamf_client = config.attach_client() self.jamf_url = self.jamf_client.base_url super().__init__(max_concurrency=concurrency) def _convert_tz(self, utc_time_str: str) -> Optional[str]: """ Converts a UTC time string to a formatted string without timezone information. :param utc_time_str: UTC time string in ISO 8601 format (e.g., "2023-08-09T12:34:56+0000"). :type utc_time_str: str :return: Formatted date string (e.g., "Aug 09 2023") or None if the input format is invalid. :rtype: Optional[str] """ try: utc_time = datetime.strptime(utc_time_str, "%Y-%m-%dT%H:%M:%S%z") return utc_time.strftime("%b %d %Y") except ValueError as e: self.log.error(f"Invalid time format provided. Details: {e}") raise
[docs] @check_token async def get_policies(self) -> Optional[List]: """ Retrieves a list of patch software title IDs from the Jamf API. This function requires a valid authentication token, which is managed automatically. :return: A list of software title IDs or None if an error occurs. :rtype: Optional[List] """ url = f"{self.jamf_url}/api/v2/patch-software-title-configurations" response = await self.fetch_json(url=url, headers=self.jamf_client.headers) # Verify response is list type as expected if not isinstance(response, list): self.log.error( f"Unexpected response format: expected a list, received {type(response)} instead." ) raise TypeError( f"Unexpected response format: expected a list, received {type(response)} instead." ) # Check if all elements in the list are dictionaries if not all(isinstance(item, dict) for item in response): self.log.error("Unexpected response format: all items should be dictionaries.") return None self.log.info("Patch policies obtained as expected.") return [title.get("id") for title in response]
[docs] @check_token async def get_summaries(self, policy_ids: List) -> Optional[List[PatchTitle]]: """ Retrieves patch summaries for the specified policy IDs from the Jamf API. This function fetches data asynchronously and compiles the results into a list of ``PatchTitle`` objects. :param policy_ids: List of policy IDs to retrieve summaries for. :type policy_ids: List :return: List of ``PatchTitle`` objects containing patch summaries or None if an error occurs. :rtype: Optional[List[PatchTitle]] """ urls = [ f"{self.jamf_url}/api/v2/patch-software-title-configurations/{policy}/patch-summary" for policy in policy_ids ] summaries = await self.fetch_batch(urls, headers=self.jamf_client.headers) patch_titles = [ PatchTitle( title=summary.get("title"), released=self._convert_tz(summary.get("releaseDate")), hosts_patched=summary.get("upToDate"), missing_patch=summary.get("outOfDate"), latest_version=summary.get("latestVersion"), ) for summary in summaries if summary ] self.log.info(f"Successfully obtained policy summaries for {len(patch_titles)} policies.") return patch_titles
[docs] @check_token async def get_device_ids(self) -> Optional[List[int]]: """ Asynchronously fetches the list of mobile device IDs from the Jamf Pro API. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :return: A list of mobile device IDs or None on error. :rtype: Optional[List[int]] """ url = f"{self.jamf_url}/api/v2/mobile-devices" response = await self.fetch_json(url=url, headers=self.jamf_client.headers) if not response: self.log.error(f"Error fetching device IDs from {url}") raise exceptions.APIResponseError(f"Error fetching device IDs from {url}") devices = response.get("results") if not devices: self.log.error("Received empty data set when trying to obtain device IDs.") raise exceptions.SummaryFetchError( "Received empty data set when trying to obtain device IDs." ) self.log.info(f"Received {len(devices)} device IDs successfully.") return [device.get("id") for device in devices if device]
[docs] @check_token async def get_device_os_versions( self, device_ids: List[int], ) -> Optional[List[Dict[str, str]]]: """ Asynchronously fetches the OS version and serial number for each device ID provided. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :param device_ids: A list of mobile device IDs to retrieve information for. :type device_ids: List[int] :return: A list of dictionaries containing the serial numbers and OS versions, or None on error. :rtype: Optional[List[Dict[str, str]]] """ urls = [f"{self.jamf_url}/api/v2/mobile-devices/{device}/detail" for device in device_ids] subsets = await self.fetch_batch(urls, headers=self.jamf_client.headers) if not subsets: self.log.error("Device OS API call returned an empty response.") raise exceptions.APIResponseError("Device OS API call returned an empty response.") devices = [ { "SN": subset.get("serialNumber"), "OS": subset.get("osVersion"), } for subset in subsets if subset ] self.log.info(f"Successfully obtained OS versions for {len(devices)} devices.") return devices
[docs] async def get_sofa_feed(self) -> Optional[List[Dict[str, str]]]: """ Fetches iOS Data feeds from SOFA and extracts latest OS version information. To limit the amount of possible SSL verification checks, this method utilizes a subprocess call instead. This method is only called if the :ref:`iOS <ios>` option is passed to the CLI. :return: A list of dictionaries containing base OS versions, latest iOS versions and release dates, or None on error. :rtype: Optional[List[Dict[str, str]]] """ # Call can be made directly as no additional headers or payloads need to be added command = ["/usr/bin/curl", "-s", "https://sofafeed.macadmins.io/v1/ios_data_feed.json"] result = await self.execute(command) # Convert to JSON for proper parsing result_json = json.loads(result) # Get OS version from response os_versions = result_json.get("OSVersions", []) # Iterate over versions to obtain iOS 16 & iOS 17 datasets latest_versions = [] for version in os_versions: version_info = version.get("Latest", {}) latest_versions.append( { "OSVersion": version.get("OSVersion"), "ProductVersion": version_info.get("ProductVersion"), "ReleaseDate": self._convert_tz(version_info.get("ReleaseDate")), } ) return latest_versions