PK!Rsimple_ado/__init__.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO API wrapper.""" import logging from typing import Dict, Optional, Tuple from simple_ado.builds import ADOBuildClient from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException from simple_ado.git import ADOGitClient from simple_ado.http_client import ADOHTTPClient, ADOResponse from simple_ado.pull_requests import ADOPullRequestClient from simple_ado.security import ADOSecurityClient from simple_ado.user import ADOUserClient from simple_ado.workitems import ADOWorkItemsClient class ADOClient: """Wrapper class around the ADO API. :param str username: The username for the user who is accessing the API :param str tenant: The ADO tenant to connect to :param str project_id: The identifier for the project :param str repository_id: The identifier for the repository :param Tuple[str,str] credentials: The credentials to use for the API connection :param str status_context: The context for any statuses placed on a PR or commit :param Optional[Dict[str,str]] extra_headers: Any extra headers which should be sent with the API requests :param Optional[logging.Logger] log: The logger to use for logging (a new one will be used if one is not supplied) """ # pylint: disable=too-many-instance-attributes log: logging.Logger _context: ADOContext _http_client: ADOHTTPClient git: ADOGitClient builds: ADOBuildClient security: ADOSecurityClient user: ADOUserClient workitems: ADOWorkItemsClient def __init__( self, *, username: str, tenant: str, project_id: str, repository_id: str, credentials: Tuple[str, str], status_context: str, extra_headers: Optional[Dict[str, str]] = None, log: Optional[logging.Logger] = None, ) -> None: """Construct a new client object.""" if log is None: self.log = logging.getLogger("ado") else: self.log = log.getChild("ado") self._context = ADOContext( username=username, repository_id=repository_id, status_context=status_context ) self._http_client = ADOHTTPClient( tenant=tenant, project_id=project_id, credentials=credentials, log=self.log, extra_headers=extra_headers, ) self.git = ADOGitClient(self._context, self._http_client, self.log) self.builds = ADOBuildClient(self._context, self._http_client, self.log) self.security = ADOSecurityClient(self._context, self._http_client, self.log) self.user = ADOUserClient(self._context, self._http_client, self.log) self.workitems = ADOWorkItemsClient(self._context, self._http_client, self.log) def verify_access(self) -> bool: """Verify that we have access to ADO. :returns: True if we have access, False otherwise """ request_url = f"{self._http_client.base_url()}/git/repositories?api-version=1.0" try: response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) self._http_client.extract_value(response_data) except ADOException: return False return True def pull_request(self, pull_request_id: int) -> ADOPullRequestClient: """Get an ADOPullRequestClient for the PR identifier. :param pull_request_id: The ID of the pull request to create the client for :returns: A new ADOPullRequest client for the pull request specified """ return ADOPullRequestClient(self._context, self._http_client, self.log, pull_request_id) def list_all_pull_requests(self, *, branch_name: Optional[str] = None) -> ADOResponse: """Get the pull requests for a branch from ADO. :param branch_name: The name of the branch to fetch the pull requests for. :type branch_name: Optional[str] :returns: The ADO Response with the pull request data """ self.log.debug("Fetching PRs") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}/pullRequests?" if branch_name is not None: if not branch_name.startswith("refs/heads/"): branch_name = "refs/heads/" + branch_name request_url += f"sourceRefName={branch_name}&" request_url += "api-version=3.0-preview" response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) return self._http_client.extract_value(response_data) PK!asimple_ado/base_client.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """Base ADO Client.""" import logging from simple_ado.context import ADOContext from simple_ado.http_client import ADOHTTPClient class ADOBaseClient: """Base context for ADO API. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ log: logging.Logger http_client: ADOHTTPClient context: ADOContext def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: """Construct a new base client object.""" self.log = log self._context = context self._http_client = http_client PK!6w simple_ado/builds.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO build API wrapper.""" import json import logging from typing import Dict, Optional from simple_ado.base_client import ADOBaseClient from simple_ado.context import ADOContext from simple_ado.http_client import ADOHTTPClient, ADOResponse from simple_ado.types import TeamFoundationId class ADOBuildClient(ADOBaseClient): """Wrapper class around the ADO Build APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: super().__init__(context, http_client, log.getChild("build")) def queue_build( self, definition_id: int, source_branch: str, variables: Dict[str, str], requesting_identity: Optional[TeamFoundationId] = None ) -> ADOResponse: """Queue a new build. :param definition_id: The identity of the build definition to queue (can be a string) :param source_branch: The source branch for the build :param variables: A dictionary of variables to pass to the definition :param requesting_identity: The identity of the user who requested the build be queued :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url()}/build/builds?api-version=4.1" variable_json = json.dumps(variables) self.log.debug(f"Queueing build ({definition_id}): {variable_json}") body = { "parameters": variable_json, "definition": {"id": definition_id}, "sourceBranch": source_branch, } if requesting_identity: body["requestedFor"] = requesting_identity response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def build_info(self, build_id: int) -> ADOResponse: """Get the info for a build. :param int build_id: The identifier of the build to get the info for :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url()}/build/builds/{build_id}?api-version=4.1" response = self._http_client.get(request_url) return self._http_client.decode_response(response) PK!D6J<<simple_ado/comments.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO comment utilities""" import enum from typing import Any, Dict, Optional, Union class ADOCommentStatus(enum.Enum): """Possible values of comment statuses.""" ACTIVE: int = 1 FIXED: int = 2 WONT_FIX: int = 3 CLOSED: int = 4 BY_DESIGN: int = 5 PENDING: int = 6 class ADOCommentProperty: """The properties a comment can take.""" SUPPORTS_MARKDOWN = "Microsoft.TeamFoundation.Discussion.SupportsMarkdown" # This is just a unique property value to show that a comment was created by # this library. It allows for easy cleanup later. COMMENT_IDENTIFIER = "3533F9EC-9336-4290-85F7-6A6A51AD1861" @staticmethod def create_value(type_name: str, value: Union[int, str]) -> Dict[str, Any]: """Create a new property value. :param type_name: The type of property :param value: The value of the property :returns: A dictionary containing the raw API data """ return {"type": type_name, "value": value} @staticmethod def create_string(value: str) -> Dict[str, Any]: """Create a new string value. :param value: The value of the property :returns: A dictionary containing the raw API data """ return ADOCommentProperty.create_value("System.String", value) @staticmethod def create_int(value: int) -> Dict[str, Any]: """Create a new integer value. :param value: The value of the property :returns: A dictionary containing the raw API data """ return ADOCommentProperty.create_value("System.Int32", value) @staticmethod def create_bool(value: bool) -> Dict[str, Any]: """Create a new boolean value. :param value: The value of the property :returns: A dictionary containing the raw API data """ return ADOCommentProperty.create_int(1 if value else 0) class ADOCommentLocation: """Represents the location of a comment in a PR. :param str file_path: The location of the file, relative to the repository. :param int line: The line the comment should start on. :param start_index: The location on the line that the comment should start :type start_index: int or None """ file_path: str line: int start_index: Optional[int] def __init__(self, file_path: str, line: int, start_index: Optional[int] = None) -> None: """Construct a new comment location.""" self.file_path = file_path self.line = line self.start_index = start_index if not self.file_path.startswith("/"): # On some machines this is missing, some it's there self.file_path = "/" + self.file_path def generate_representation(self) -> Dict[str, Any]: """Generate the ADO API representation of a comment location. :returns: A dictionary containing the raw API data """ return { "filePath": self.file_path, "leftFileStart": None, "leftFileEnd": None, "rightFileStart": { "line": self.line, "offset": 0 if self.start_index is None else self.start_index, }, "rightFileEnd": { "line": self.line, "offset": 9999, # We have no way to get the actual length, so let's just pick a high number. }, } class ADOComment: """Represents a ADO comment.""" content: str location: Optional[ADOCommentLocation] parent_id: int def __init__( self, content: str, location: Optional[ADOCommentLocation] = None, parent_id: int = 0 ) -> None: """Construct a new comment. :param str content: The message which should be on the comment :param ADOCommentLocation location: The location to place the comment :param int parent_id: The ID of the parent comment (0 if a root comment) """ self.content = content self.parent_id = parent_id self.location = location def generate_representation(self) -> Dict[str, Any]: """Generate the ADO API representation of this comment. :returns: A dictionary containing the raw API data """ return { "parentCommentId": self.parent_id, "content": self.content, "commentType": 1, # Always set to 1 according to documentation } def __str__(self) -> str: """Generate and return the string representation of the object. :return: A string representation of the object """ details = self.generate_representation() if self.location is None: details["location"] = None else: details["location"] = self.location.generate_representation() return str(details) PK!tsimple_ado/context.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO Context object.""" class ADOContext: """Context information for the ADO Client. :param str username: The username of the bot account :param str repository_id: The ID of the repository :param str status_context: The name of the status context to use for build status notifications """ username: str repository_id: str status_context: str def __init__(self, *, username: str, repository_id: str, status_context: str) -> None: """Construct a new client object.""" self.username = username self.repository_id = repository_id self.status_context = status_context PK!ACsimple_ado/exceptions.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO exceptions.""" class ADOException(Exception): """All ADO exceptions inherit from this or instantiate it.""" class ADOHTTPException(ADOException): """All ADO HTTP exceptions use this class. :param message: The message for the exception :param status_code: The HTTP status code causing this exception :param text: The text of the response causing this exception """ message: str status_code: int text: str def __init__(self, message: str, status_code: int, text: str) -> None: super().__init__() self.message = message self.status_code = status_code self.text = text def __str__(self) -> str: """Generate and return the string representation of the object. :return: A string representation of the object """ return f"{self.message}, status_code={self.status_code}, text={self.text}" PK! [qsimple_ado/git.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO Git API wrapper.""" import enum import logging import os from typing import Optional import urllib.parse import requests from simple_ado.base_client import ADOBaseClient from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException, ADOHTTPException from simple_ado.http_client import ADOHTTPClient, ADOResponse class ADOGitStatusState(enum.Enum): """Possible values of git status states.""" NOT_SET: str = "notSet" NOT_APPLICABLE: str = "notApplicable" PENDING: str = "pending" SUCCEEDED: str = "succeeded" FAILED: str = "failed" ERROR: str = "error" class ADOGitClient(ADOBaseClient): """Wrapper class around the ADO Git APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: super().__init__(context, http_client, log.getChild("git")) def all_repositories(self) -> ADOResponse: """Get a list of repositories in the project. :returns: The ADO response with the data in it """ self.log.debug("Getting repositories") request_url = f"{self._http_client.base_url()}/git/repositories/?api-version=1.0" response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) return self._http_client.extract_value(response_data) def get_status(self, sha: str) -> ADOResponse: """Set a status on a PR. :param str sha: The SHA of the commit to add the status to. :returns: The ADO response with the data in it :raises ADOException: If the SHA is not the full version """ self.log.debug(f"Getting status for sha: {sha}") if len(sha) != 40: raise ADOException("The SHA for a commit must be the full 40 character version") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}/commits/{sha}/" request_url += "statuses?api-version=2.1" response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) return self._http_client.extract_value(response_data) def set_status( self, sha: str, state: ADOGitStatusState, identifier: str, description: str, *, target_url: Optional[str] = None, ) -> ADOResponse: """Set a status on a PR. :param str sha: The SHA of the commit to add the status to. :param ADOGitStatusState state: The state to set the status to. :param str identifier: A unique identifier for the status (so it can be changed later) :param str description: The text to show in the status :param target_url: An optional URL to set which is opened when the description is clicked. :type target_url: str or None :returns: The ADO response with the data in it :raises ADOException: If the SHA is not the full version, or the state is set to NOT_SET """ self.log.debug(f"Setting status ({state}) on sha ({sha}): {identifier} -> {description}") if len(sha) != 40: raise ADOException("The SHA for a commit must be the full 40 character version") if state == ADOGitStatusState.NOT_SET: raise ADOException("The NOT_SET state cannot be used for statuses on commits") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}/commits/{sha}/" request_url += "statuses?api-version=2.1" body = { "state": state.value, "description": description, "context": {"name": self._context.status_context, "genre": identifier}, } if target_url is not None: body["targetUrl"] = target_url response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def diff_between_commits(self, base_commit: str, target_commit: str) -> ADOResponse: """Get the diff between two commits. :param base_commit: The full hash of the base commit to perform the diff against. :param target_commit: The full hash of the commit to perform the diff of. :type base_commit: str :type target_commit: str :returns: The ADO response with the data in it """ self.log.debug(f"Fetching commit diff: {base_commit}..{target_commit}") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}/diffs/commits?" request_url += f"api-version=1.0" request_url += f"&baseVersionType=commit" request_url += f"&baseVersion={base_commit}" request_url += f"&targetVersionType=commit" request_url += f"&targetVersion={target_commit}" response = self._http_client.get(request_url) return self._http_client.decode_response(response) def download_zip(self, branch: str, output_path: str) -> None: """Download the zip of the branch specified. :param str branch: The name of the branch to download. :param str output_path: The path to write the output to. :raises ADOException: If the output path already exists :raises ADOHTTPException: If we fail to fetch the zip for any reason """ self.log.debug(f"Downloading branch: {branch}") request_url = ( f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}/Items?" ) parameters = { "path": "/", "versionDescriptor[versionOptions]": "0", "versionDescriptor[versionType]": "0", "versionDescriptor[version]": branch, "resolveLfs": "true", "$format": "zip", "api-version": "5.0-preview.1", } request_url += urllib.parse.urlencode(parameters) if os.path.exists(output_path): raise ADOException("The output path already exists") with requests.get( request_url, auth=self._http_client.credentials, headers=self._http_client.construct_headers(), stream=True, ) as response: chunk_size = 1024 * 16 if response.status_code < 200 or response.status_code >= 300: raise ADOHTTPException("Failed to fetch zip", response.status_code, response.text) with open(output_path, "wb") as output_file: content_length_string = response.headers.get("content-length", "0") total_size = int(content_length_string) total_downloaded = 0 for data in response.iter_content(chunk_size=chunk_size): total_downloaded += len(data) output_file.write(data) if total_size != 0: progress = int((total_downloaded * 100.0) / total_size) self.log.info(f"Download progress: {progress}%") PK!f*&2&2simple_ado/http_client.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO HTTP API wrapper.""" import logging import os import time from typing import Any, Callable, Dict, Optional, Tuple import requests from simple_ado.exceptions import ADOException, ADOHTTPException # pylint: disable=invalid-name ADOThread = Dict[str, Any] ADOResponse = Any # pylint: enable=invalid-name def exception_retry( max_attempts: int = 3, initial_delay: float = 5, backoff_factor: int = 2, should_retry: Optional[Callable[[Exception], bool]] = None, ) -> Callable: """Retry an API call at many times as required with a backoff factor. :param max_attempts: The maximum number of times we should attempt to retry :param initial_delay: How long we should wait after the first failure :param backoff_factor: What factor should we increase the delay by for each failure :param should_retry: A function which will be called with the exception to determine if we should retry or not :returns: A wrapped function :raises ValueError: If any of the inputs are invalid """ if max_attempts < 1: raise ValueError("max_attempts must be 1 or more") if initial_delay < 0: raise ValueError("initial_delay must be a positive (or 0) float") if backoff_factor < 1: raise ValueError("backoff_factor must be a postive integer greater than or equal to 1") def decorator(function: Callable) -> Callable: """Decorator function :param function: The function to wrap :returns: The wrapped function """ def wrapper(*args: Any, **kwargs: Any) -> Any: """The wrapper around the function. :param args: The unnamed arguments to the function :param kwargs: The named arguments to the function :returns: Whatever value the original function returns on success :raises ADOHTTPException: Any exception that the wrapped function raises :raises Exception: Any exception that the wrapped function raises """ # Check for a log argument to the function first log = kwargs.get("log") if log is not None: log = log.getChild("retry") # If we still don't have one, check if the function is on an object # and use the log variable from that if possible if log is None: try: # This works around a mypy issue any_self: Any = args[0] log = any_self.log except AttributeError: pass # If we still have nothing, create a new logger if log is None: log = logging.getLogger("ado.retry") remaining_attempts = max_attempts current_delay = initial_delay while remaining_attempts > 1: exception_reference: Optional[Exception] = None try: return function(*args, **kwargs) except ADOHTTPException as ex: # We can't retry non-server errors if ex.status_code < 500: raise if should_retry is not None and not should_retry(ex): log.error("ADO API call failed. Not retrying.") raise exception_reference = ex except Exception as ex: if should_retry is not None and not should_retry(ex): log.error(f"ADO API call failed. Not retrying.") raise exception_reference = ex time.sleep(current_delay) remaining_attempts -= 1 current_delay *= backoff_factor log.warning( f"ADO API call failed. Retrying in {current_delay} seconds: {exception_reference}" ) return function(*args, **kwargs) return wrapper return decorator class ADOHTTPClient: """Base class that actually makes API calls to Azure DevOps. :param str tenant: The name of the ADO tenant to connect to :param str project_id: The ID of the ADO project to connect to :param Dict[str,str] extra_headers: Any extra headers which should be added to each request :param Tuple[str,str] credentials: The credentials which should be used for authentication :param logging.Logger log: The logger to use for logging """ log: logging.Logger tenant: str project_id: str extra_headers: Dict[str, str] credentials: Tuple[str, str] def __init__( self, *, tenant: str, project_id: str, credentials: Tuple[str, str], log: logging.Logger, extra_headers: Optional[Dict[str, str]] = None, ) -> None: """Construct a new client object.""" self.log = log.getChild("http") self.tenant = tenant self.project_id = project_id self.credentials = credentials if extra_headers is None: self.extra_headers = {} else: self.extra_headers = extra_headers def base_url( self, *, is_default_collection: bool = True, is_project: bool = True, is_internal: bool = False, ) -> str: """Generate the base url for all API calls (this varies depending on the API). :param bool is_default_collection: Whether this URL should start with the path "/DefaultCollection" :param bool is_project: Whether this URL should scope down to include `project_id` :param bool is_internal: Whether this URL should use internal API endpoint "/_api" :returns: The constructed base URL """ url = f"https://{self.tenant}.visualstudio.com" if is_default_collection: url += "/DefaultCollection" if is_project: url += f"/{self.project_id}" if is_internal: url += "/_api" else: url += "/_apis" return url @exception_retry( should_retry=lambda exception: isinstance(exception, ADOHTTPException) and exception.status_code not in range(400, 500) ) def get( self, request_url: str, *, additional_headers: Optional[Dict[str, str]] = None ) -> requests.Response: """Issue a GET request with the correct credentials and headers. :param str request_url: The URL to issue the request to :param Optional[Dict[str,str]] additional_headers: Any additional headers to add to the request :returns: The raw response object from the API """ headers = self.construct_headers(additional_headers=additional_headers) return requests.get(request_url, auth=self.credentials, headers=headers) @exception_retry( should_retry=lambda exception: "Operation timed out" in str(exception) or "Connection aborted." in str(exception) or "bad handshake: " in str(exception) ) def post( self, request_url: str, *, additional_headers: Optional[Dict[str, str]] = None, json_data: Optional[Dict[str, Any]] = None, ) -> requests.Response: """Issue a POST request with the correct credentials and headers. :param str request_url: The URL to issue the request to :param Optional[Dict[str,str]] additional_headers: Any additional headers to add to the request :param Optional[Dict[str,Any]] json_data: The JSON data to send with the request :returns: The raw response object from the API """ headers = self.construct_headers(additional_headers=additional_headers) return requests.post(request_url, auth=self.credentials, headers=headers, json=json_data) def patch( self, request_url: str, json_data: Optional[Any] = None, *, additional_headers: Optional[Dict[str, Any]] = None, ) -> requests.Response: """Issue a PATCH request with the correct credentials and headers. :param str request_url: The URL to issue the request to :param Optional[Dict[str,str]] additional_headers: Any additional headers to add to the request :param Optional[Dict[str,Any]] json_data: The JSON data to send with the request :returns: The raw response object from the API """ headers = self.construct_headers(additional_headers=additional_headers) return requests.patch(request_url, auth=self.credentials, headers=headers, json=json_data) def delete( self, request_url: str, *, additional_headers: Optional[Dict[str, Any]] = None ) -> requests.Response: """Issue a DELETE request with the correct credentials and headers. :param str request_url: The URL to issue the request to :param Optional[Dict[str,str]] additional_headers: Any additional headers to add to the request :returns: The raw response object from the API """ headers = self.construct_headers(additional_headers=additional_headers) return requests.delete(request_url, auth=self.credentials, headers=headers) def post_file( self, request_url: str, file_path: str, *, additional_headers: Optional[Dict[str, Any]] = None, ) -> requests.Response: """POST a file to the URL with the given file name. :param str request_url: The URL to issue the request to :param str file_path: The path to the file to be posted :param Optional[Dict[str,str]] additional_headers: Any additional headers to add to the request :returns: The raw response object from the API""" file_size = os.path.getsize(file_path) headers = self.construct_headers(additional_headers=additional_headers) headers["Content-Length"] = str(file_size) headers["Content-Type"] = "application/json" request = requests.Request("POST", request_url, headers=headers, auth=self.credentials) prepped = request.prepare() # Send the raw content, not with "Content-Disposition", etc. with open(file_path, "rb") as file_handle: prepped.body = file_handle.read(file_size) session = requests.Session() response: requests.Response = session.send(prepped) return response def decode_response(self, response: requests.models.Response) -> ADOResponse: """Decode the response from ADO, checking for errors. :param response: The response to check and parse :returns: The JSON data from the ADO response :raises ADOHTTPException: Raised if the request returned a non-200 status code :raises ADOException: Raise if the response was not JSON """ self.log.debug("Fetching response from ADO") if response.status_code < 200 or response.status_code >= 300: raise ADOHTTPException( f"ADO returned a non-200 status code, configuration={self}", response.status_code, response.text, ) try: content: ADOResponse = response.json() except: raise ADOException("The response did not contain JSON") return content def extract_value(self, response_data: ADOResponse) -> ADOResponse: """Extract the "value" from the raw JSON data from an API response :param response_data: The raw JSON data from an API response :returns: The ADO response with the data in it :raises ADOException: If the response is invalid (does not support value extraction) """ self.log.debug("Extracting value") try: value: ADOResponse = response_data["value"] return value except: raise ADOException("The response was invalid (did not contain a value).") def construct_headers( self, *, additional_headers: Optional[Dict[str, str]] = None ) -> Dict[str, str]: """Contruct the headers used for a request, adding anything additional. :param Optional[Dict[str,str]] additional_headers: A dictionary of the additional headers to add. :returns: A dictionary of the headers for a request """ headers = {"Accept": "application/json"} for header_name, header_value in self.extra_headers: headers[header_name] = header_value if additional_headers is None: return headers for header_name, header_value in additional_headers: headers[header_name] = header_value return headers PK!ȱ_Y00simple_ado/pull_requests.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO Pull Request API wrapper.""" import logging from typing import Any, Dict, List, Optional import requests from simple_ado.base_client import ADOBaseClient from simple_ado.comments import ( ADOComment, ADOCommentLocation, ADOCommentProperty, ADOCommentStatus, ) from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException from simple_ado.git import ADOGitStatusState from simple_ado.http_client import ADOHTTPClient, ADOResponse, ADOThread class ADOPullRequestClient(ADOBaseClient): """Wrapper class around the ADO Pull Request APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use :param pull_request_id: The ID of the pull request """ pull_request_id: int def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger, pull_request_id: int, ) -> None: self.pull_request_id = pull_request_id super().__init__(context, http_client, log.getChild(f"pr.{pull_request_id}")) def details(self) -> ADOResponse: """Get the details for the PR from ADO. :returns: The ADO response with the data in it """ self.log.debug(f"Getting PR: {self.pull_request_id}") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" request_url += f"/pullRequests/{self.pull_request_id}?api-version=3.0-preview" response = self._http_client.get(request_url) return self._http_client.decode_response(response) def workitems(self) -> ADOResponse: """Get the workitems associated with the PR from ADO. :returns: The ADO response with the data in it """ self.log.debug(f"Getting workitems: {self.pull_request_id}") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" request_url += f"/pullRequests/{self.pull_request_id}/workitems?api-version=5.0" response = self._http_client.get(request_url) return self._http_client.decode_response(response) def get_threads(self, *, include_deleted: bool = False) -> List[ADOThread]: """Get the comments on the PR from ADO. :param bool include_deleted: Set to True if deleted threads should be included. :returns: A list of ADOThreads that were found """ self.log.debug(f"Getting threads: {self.pull_request_id}") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" request_url += f"/pullRequests/{self.pull_request_id}/threads?api-version=3.0-preview" response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) comments: List[ADOThread] = self._http_client.extract_value(response_data) if include_deleted: return comments return [comment for comment in comments if comment["isDeleted"] is False] def create_comment_with_text( self, comment_text: str, *, comment_location: Optional[ADOCommentLocation] = None, status: Optional[ADOCommentStatus] = None, ) -> ADOResponse: """Create a thread using a single root comment. :param str comment_text: The text to set in the comment. :param Optional[ADOCommentLocation] comment_location: The location to place the comment. :param Optional[ADOCommentStatus] status: The status of the comment :returns: The ADO response with the data in it """ self.log.debug(f"Creating comment: ({self.pull_request_id}) {comment_text}") comment = ADOComment(comment_text, comment_location) return self.create_comment(comment, status=status) def create_comment( self, comment: ADOComment, *, status: Optional[ADOCommentStatus] = None ) -> ADOResponse: """Create a thread using a single root comment. :param ADOComment comment: The comment to add. :param Optional[ADOCommentStatus] status: The status of the comment :returns: The ADO response with the data in it """ self.log.debug(f"Creating comment: ({self.pull_request_id}) {comment}") return self.create_thread( [comment.generate_representation()], thread_location=comment.location, status=status ) def create_thread( self, comments: List[Dict[str, Any]], *, thread_location: Optional[ADOCommentLocation] = None, status: Optional[ADOCommentStatus] = None, comment_identifier: str = "", ) -> ADOResponse: """Create a thread on a PR. :param List[Dict[str,Any]] comments: The comments to add to the thread. :param Optional[ADOCommentLocation] thread_location: The location the thread should be added :param Optional[ADOCommentStatus] status: The status of the comment :param Optional[ADOCommentStatus] comment_identifier: A unique identifier for the comment that can be used for identification at a later date :returns: The ADO response with the data in it """ self.log.debug(f"Creating thread ({self.pull_request_id})") request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" request_url += f"/pullRequests/{self.pull_request_id}/threads?api-version=3.0-preview" properties = { ADOCommentProperty.SUPPORTS_MARKDOWN: ADOCommentProperty.create_bool(True), ADOCommentProperty.COMMENT_IDENTIFIER: ADOCommentProperty.create_string(comment_identifier), } body = { "comments": comments, "properties": properties, "status": status.value if status is not None else ADOCommentStatus.ACTIVE.value, } if thread_location is not None: body["threadContext"] = thread_location.generate_representation() response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def delete_thread(self, thread: ADOThread) -> None: """Delete a comment thread from a pull request. :param thread: The thread to delete """ thread_id = thread["id"] self.log.debug(f"Deleting thread: ({self.pull_request_id}) {thread_id}") for comment in thread["comments"]: comment_id = comment["id"] self.log.debug(f"Deleting comment: {comment_id}") request_url = ( f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" ) request_url += f"/pullRequests/{self.pull_request_id}/threads/{thread_id}" request_url += f"/comments/{comment_id}?api-version=3.0-preview" requests.delete( request_url, auth=self._http_client.credentials, headers=self._http_client.construct_headers(), ) def create_thread_list(self, threads: List[ADOComment]) -> None: """Create a list of threads :param List[ADOComment] threads: The threads to create :raises ADOException: If a thread is not an ADO comment """ self.log.debug(f"Setting threads on PR: {self.pull_request_id}") # Check the type of the input for thread in threads: if not isinstance(thread, ADOComment): raise ADOException("Thread was not an ADOComment: " + str(thread)) for thread in threads: self.log.debug("Adding thread") self.create_comment(thread) def set_status( self, state: ADOGitStatusState, identifier: str, description: str, *, iteration: Optional[int] = None, target_url: Optional[str] = None, ) -> ADOResponse: """Set a status on a PR. :param ADOGitStatusState state: The state to set the status to. :param str identifier: A unique identifier for the status (so it can be changed later) :param str description: The text to show in the status :param Optional[int] iteration: The iteration of the PR to set the status on :param Optional[str] target_url: An optional URL to set which is opened when the description is clicked. :returns: The ADO response with the data in it """ self.log.debug( f"Setting PR status ({state}) on PR ({self.pull_request_id}): {identifier} -> {description}" ) request_url = f"{self._http_client.base_url()}/git/repositories/{self._context.repository_id}" request_url += f"/pullRequests/{self.pull_request_id}/statuses?api-version=4.0-preview" body = { "state": state.value, "description": description, "context": {"name": self._context.status_context, "genre": identifier}, } if iteration is not None: body["iterationId"] = iteration if target_url is not None: body["targetUrl"] = target_url response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def _thread_matches_identifier(self, thread: ADOThread, identifier: str) -> bool: """Check if the ADO thread matches the user and identifier :param thread: The thread to check :param identifier: The identifier to check against :returns: True if the thread matches, False otherwise :raises ADOException: If we couldn't find the author or the properties """ try: # Deleted threads can stay around if they have other comments, so we # check if it was deleted before we check anything else. if thread["comments"][0]["isDeleted"]: return False except Exception: # If it's not there, it's not set pass try: # It wasn't one of the specified users comments if thread["comments"][0]["author"]["uniqueName"] != self._context.username: return False except: raise ADOException( "Could not find comments.0.author.uniqueName in thread: " + str(thread) ) try: properties = thread["properties"] except: raise ADOException("Could not find properties in thread: " + str(thread)) if properties is None: return False comment_identifier = properties.get(ADOCommentProperty.COMMENT_IDENTIFIER) if comment_identifier is None: return False value = comment_identifier.get("$value") if value == identifier: return True return False def threads_with_identifier(self, identifier: str) -> List[ADOThread]: """Get the threads on a PR which begin with the prefix specified. :param str identifier: The identifier to look for threads with :returns: The list of threads matching the identifier :raises ADOException: If the response is in an unexpected format """ self.log.debug( f'Fetching threads with identifier "{identifier}" on PR {self.pull_request_id}' ) matching_threads = [] for thread in self.get_threads(): self.log.debug("Handling thread...") if self._thread_matches_identifier(thread, identifier): matching_threads.append(thread) return matching_threads def delete_threads_with_identifier(self, identifier: str) -> None: """Delete the threads on a PR which begin with the prefix specified. :param str identifier: The identifier property value to look for threads matching """ self.log.debug( f'Deleting threads with identifier "{identifier}" on PR {self.pull_request_id}' ) for thread in self.threads_with_identifier(identifier): self.log.debug(f"Deleting thread: {thread}") self.delete_thread(thread) PK!nT :0:0simple_ado/security.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO security API wrapper.""" import enum import json import logging from typing import ClassVar, Dict, List import urllib.parse from simple_ado.base_client import ADOBaseClient from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException from simple_ado.http_client import ADOHTTPClient, ADOResponse from simple_ado.types import TeamFoundationId class ADOBranchPermission(enum.IntEnum): """Possible types of git branch permissions.""" ADMINISTER: int = 2 ** 0 READ: int = 2 ** 1 CONTRIBUTE: int = 2 ** 2 FORCE_PUSH: int = 2 ** 3 CREATE_BRANCH: int = 2 ** 4 CREATE_TAG: int = 2 ** 5 MANAGE_NOTES: int = 2 ** 6 BYPASS_PUSH_POLICIES: int = 2 ** 7 CREATE_REPOSITORY: int = 2 ** 8 DELETE_REPOSITORY: int = 2 ** 9 RENAME_REPOSITORY: int = 2 ** 10 EDIT_POLICIES: int = 2 ** 11 REMOVE_OTHERS_LOCKS: int = 2 ** 12 MANAGE_PERMISSIONS: int = 2 ** 13 CONTRIBUTE_TO_PULL_REQUESTS: int = 2 ** 14 BYPASS_PULL_REQUEST_POLICIES: int = 2 ** 15 class ADOBranchPermissionLevel(enum.IntEnum): """Possible values of git branch permissions.""" NOT_SET: int = 0 ALLOW: int = 1 DENY: int = 2 class ADOBranchPolicy(enum.Enum): """Possible types of git branch protections.""" APPROVAL_COUNT: str = "fa4e907d-c16b-4a4c-9dfa-4906e5d171dd" BUILD: str = "0609b952-1397-4640-95ec-e00a01b2c241" REQUIRED_REVIEWERS: str = "fd2167ab-b0be-447a-8ec8-39368250530e" CASE_ENFORCEMENT: str = "7ed39669-655c-494e-b4a0-a08b4da0fcce" MAXIMUM_BLOB_SIZE: str = "2e26e725-8201-4edd-8bf5-978563c34a80" MERGE_STRATEGY: str = "fa4e907d-c16b-4a4c-9dfa-4916e5d171ab" WORK_ITEM: str = "40e92b44-2fe1-4dd6-b3d8-74a9c21d0c6e" class ADOSecurityClient(ADOBaseClient): """Wrapper class around the undocumented ADO Security APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ GIT_PERMISSIONS_NAMESPACE: ClassVar[str] = "2e9eb7ed-3c0a-47d4-87c1-0ffdd275fd87" def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: super().__init__(context, http_client, log.getChild("security")) def add_branch_build_policy(self, branch: str, *, build_definition_id: int) -> ADOResponse: """Adds a new build policy for a given branch. :param str branch: The git branch to set the build policy for :param int build_definition_id: The build definition to use when creating the build policy :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url(is_project=True)}/policy/Configurations?api-version=5.1-preview.1" body = { "type": {"id": ADOBranchPolicy.BUILD.value}, "revision": 1, "isDeleted": False, "isBlocking": True, "isEnabled": True, "settings": { "buildDefinitionId": build_definition_id, "displayName": None, "queueOnSourceUpdateOnly": False, "manualQueueOnly": False, "validDuration": 0, "scope": [ { "refName": f"refs/heads/{branch}", "matchKind": "Exact", "repositoryId": self._context.repository_id, } ], }, } response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def add_branch_required_reviewers_policy( self, branch: str, *, identities: List[str] ) -> ADOResponse: """Adds required reviewers when opening PRs against a given branch. :param str branch: The git branch to set required reviewers for :param List[str] identities: A list of identities to become required reviewers (should be team foundation IDs) :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url(is_project=True)}/policy/Configurations?api-version=5.1-preview.1" body = { "type": {"id": ADOBranchPolicy.REQUIRED_REVIEWERS.value}, "revision": 1, "isDeleted": False, "isBlocking": True, "isEnabled": True, "settings": { "requiredReviewerIds": identities, "filenamePatterns": [], "addedFilesOnly": False, "ignoreIfSourceIsInScope": False, "message": None, "scope": [ { "refName": f"refs/heads/{branch}", "matchKind": "Exact", "repositoryId": self._context.repository_id, } ], }, } response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def set_branch_approval_count_policy( self, branch: str, *, minimum_approver_count: int, creator_vote_counts: bool = False, reset_on_source_push: bool = False, ) -> ADOResponse: """Set minimum number of reviewers for a branch. :param str branch: The git branch to set minimum number of reviewers on :param int minimum_approver_count: The minimum number of approvals required :param bool creator_vote_counts: Allow users to approve their own changes :param bool reset_on_source_push: Reset reviewer votes when there are new changes :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url(is_project=True)}/policy/Configurations?api-version=5.1-preview.1" body = { "type": {"id": ADOBranchPolicy.APPROVAL_COUNT.value}, "revision": 2, "isDeleted": False, "isBlocking": True, "isEnabled": True, "settings": { "minimumApproverCount": minimum_approver_count, "creatorVoteCounts": creator_vote_counts, "resetOnSourcePush": reset_on_source_push, "scope": [ { "refName": f"refs/heads/{branch}", "matchKind": "Exact", "repositoryId": self._context.repository_id, } ], }, } response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def set_branch_work_item_policy(self, branch: str, *, required: bool = True) -> ADOResponse: """Set the work item policy for a branch. :param str branch: The git branch to set the work item policy on :param bool required: Whether or not linked work items should be mandatory :returns: The ADO response with the data in it """ request_url = f"{self._http_client.base_url(is_project=True)}/policy/Configurations?api-version=5.1-preview.1" body = { "type": {"id": ADOBranchPolicy.WORK_ITEM.value}, "revision": 2, "isDeleted": False, "isBlocking": required, "isEnabled": True, "settings": { "scope": [ { "refName": f"refs/heads/{branch}", "matchKind": "Exact", "repositoryId": self._context.repository_id, } ] }, } response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def set_branch_permissions( self, branch: str, *, identity: TeamFoundationId, permissions: Dict[ADOBranchPermission, ADOBranchPermissionLevel], ) -> ADOResponse: """Set permissions for an identity on a branch. :param str branch: The git branch to set permissions on :param TeamFoundationId identity: The identity to set permissions for (should be team foundation ID) :param Dict[ADOBranchPermission,ADOBranchPermissionLevel] permissions: A dictionary of permissions to set :returns: The ADO response with the data in it """ descriptor_info = self._get_descriptor_info(branch, identity) request_url = self._http_client.base_url(is_project=True, is_internal=True) request_url += "/_security/ManagePermissions?__v=5" updates = [] for permission, level in permissions.items(): updates.append( { "PermissionId": level, "PermissionBit": permission, "NamespaceId": ADOSecurityClient.GIT_PERMISSIONS_NAMESPACE, "Token": self._generate_updates_token(branch), } ) package = { "IsRemovingIdentity": False, "TeamFoundationId": identity, "DescriptorIdentityType": descriptor_info["type"], "DescriptorIdentifier": descriptor_info["id"], "PermissionSetId": ADOSecurityClient.GIT_PERMISSIONS_NAMESPACE, "PermissionSetToken": self._generate_permission_set_token(branch), "RefreshIdentities": False, "Updates": updates, "TokenDisplayName": None, } body = {"updatePackage": json.dumps(package)} response = self._http_client.post(request_url, json_data=body) return self._http_client.decode_response(response) def _get_descriptor_info(self, branch: str, team_foundation_id: TeamFoundationId) -> Dict[str, str]: """Fetch the descriptor identity information for a given identity. :param str branch: The git branch of interest :param TeamFoundationId team_foundation_id: the unique Team Foundation GUID for the identity :returns: The raw descriptor info :raises ADOException: If we can't determine the descriptor info from the response """ request_url = f"{self._http_client.base_url(is_project=True, is_internal=True)}/_security/DisplayPermissions?" parameters = { "tfid": team_foundation_id, "permissionSetId": ADOSecurityClient.GIT_PERMISSIONS_NAMESPACE, "permissionSetToken": self._generate_permission_set_token(branch), "__v": "5", } request_url += urllib.parse.urlencode(parameters) response = self._http_client.get(request_url) response_data = self._http_client.decode_response(response) try: descriptor_info = { "type": response_data["descriptorIdentityType"], "id": response_data["descriptorIdentifier"], } except: raise ADOException( "Could not determine descriptor info for team_foundation_id: " + str(team_foundation_id) ) return descriptor_info def _generate_permission_set_token(self, branch: str) -> str: """Generate the token required for reading identity details and writing permissions. :param str branch: The git branch of interest :returns: The permission token """ encoded_branch = branch.replace("/", "^") return f"repoV2/{self._http_client.project_id}/{self._context.repository_id}/refs^heads^{encoded_branch}/" def _generate_updates_token(self, branch: str) -> str: """Generate the token required for updating permissions. :param str branch: The git branch of interest :returns: The update token """ # Encode each node in the branch to hex encoded_branch_nodes = [node.encode("utf-16le").hex() for node in branch.split("/")] encoded_branch = "/".join(encoded_branch_nodes) return f"repoV2/{self._http_client.project_id}/{self._context.repository_id}/refs/heads/{encoded_branch}/" PK!simple_ado/types.py"""Custom types for the library.""" # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. from typing import NewType # pylint: disable=invalid-name TeamFoundationId = NewType('TeamFoundationId', str) PK! M M simple_ado/user.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO user API wrapper.""" import logging from typing import cast from simple_ado.base_client import ADOBaseClient from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException from simple_ado.http_client import ADOHTTPClient from simple_ado.types import TeamFoundationId class ADOUserClient(ADOBaseClient): """Wrapper class around the ADO user APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: super().__init__(context, http_client, log.getChild("user")) def get_team_foundation_id(self, identity: str) -> TeamFoundationId: """Fetch the unique Team Foundation GUID for a given identity. :param str identity: The identity to fetch for (should be email for users and display name for groups) :returns: The team foundation ID :raises ADOException: If we can't get the identity from the response """ request_url = self._http_client.base_url(is_default_collection=False, is_project=False) request_url += "/IdentityPicker/Identities?api-version=5.1-preview.1" body = { "query": identity, "identityTypes": ["user", "group"], "operationScopes": ["ims"], "properties": ["DisplayName", "Mail"], "filterByAncestorEntityIds": [], "filterByEntityIds": [], } response = self._http_client.post(request_url, json_data=body) response_data = self._http_client.decode_response(response) try: result = response_data["results"][0]["identities"][0] except: raise ADOException("Could not resolve identity: " + identity) if result["entityType"] == "User" and identity.lower() == result["mail"].lower(): return cast(TeamFoundationId, str(result["localId"])) if result["entityType"] == "Group" and identity.lower() == result["displayName"].lower(): return cast(TeamFoundationId, str(result["localId"])) raise ADOException("Could not resolve identity: " + identity) PK!'3%T%Tsimple_ado/workitems.py#!/usr/bin/env python3 # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """ADO work items API wrapper.""" import datetime import enum import logging import os from typing import Any, Dict, List, Optional, Union from simple_ado.base_client import ADOBaseClient from simple_ado.context import ADOContext from simple_ado.exceptions import ADOException, ADOHTTPException from simple_ado.http_client import ADOHTTPClient, ADOResponse class BatchRequest: """The base type for a batch request. :param method: The HTTP method to use for the batch request :param uri: The URI for the batch request :param headers: The headers to be sent with the batch request """ method: str uri: str headers: Dict[str, str] def __init__(self, method: str, uri: str, headers: Dict[str, str]) -> None: self.method = method self.uri = uri self.headers = headers def body(self) -> Dict[str, Any]: """Generate the body of the request to be used in the API call. :returns: A dictionary with the raw API data for the request """ return {"method": self.method, "uri": self.uri, "headers": self.headers} class DeleteBatchRequest(BatchRequest): """A deletion batch request. :param uri: The URI for the batch request :param headers: The headers to be sent with the batch request """ def __init__(self, uri: str, headers: Optional[Dict[str, str]] = None) -> None: if headers is None: headers = {} if headers.get("Content-Type") is None: headers["Content-Type"] = "application/json-patch+json" super().__init__("DELETE", uri, headers) class WorkItemRelationType(enum.Enum): """Defines the various relationship types between work items.""" produces_for = "System.LinkTypes.Remote.Dependency-Forward" consumes_from = "System.LinkTypes.Remote.Dependency-Reverse" duplicate = "System.LinkTypes.Duplicate-Forward" duplicate_of = "System.LinkTypes.Duplicate-Reverse" blocked_by = "Microsoft.VSTS.BlockingLink-Forward" blocking = "Microsoft.VSTS.BlockingLink-Reverse" referenced_by = "Microsoft.VSTS.TestCase.SharedParameterReferencedBy-Forward" references = "Microsoft.VSTS.TestCase.SharedParameterReferencedBy-Reverse" tested_by = "Microsoft.VSTS.Common.TestedBy-Forward" tests = "Microsoft.VSTS.Common.TestedBy-Reverse" test_case = "Microsoft.VSTS.TestCase.SharedStepReferencedBy-Forward" shared_steps = "Microsoft.VSTS.TestCase.SharedStepReferencedBy-Reverse" successor = "System.LinkTypes.Dependency-Forward" predecessor = "System.LinkTypes.Dependency-Reverse" child = "System.LinkTypes.Hierarchy-Forward" parent = "System.LinkTypes.Hierarchy-Reverse" remote_related = "System.LinkTypes.Remote.Related" related = "System.LinkTypes.Related" attached_file = "AttachedFile" hyperlink = "Hyperlink" artifact_link = "ArtifactLink" class WorkItemField(enum.Enum): """Defines the various fields available on a work item. This does not include custom fields. """ area_path = "/fields/System.AreaPath" assigned_to = "/fields/System.AssignedTo" changed_date = "/fields/System.ChangedDate" closed_date = "/fields/Microsoft.VSTS.Common.ClosedDate" created_by = "/fields/System.CreatedBy" created_date = "/fields/System.CreatedDate" description = "/fields/System.Description" duplicate_id = "/fields/Office.Common.DuplicateID" history = "/fields/System.History" priority = "/fields/Microsoft.VSTS.Common.Priority" resolved_date = "/fields/Microsoft.VSTS.Common.ResolvedDate" resolved_reason = "/fields/Microsoft.VSTS.Common.ResolvedReason" state = "/fields/System.State" substatus = "/fields/Office.Common.SubStatus" tags = "/fields/System.Tags" title = "/fields/System.Title" relation = "/relations/-" class WorkItemFieldOperationType(enum.Enum): """Define the field operation types.""" add = "add" copy = "copy" move = "move" remove = "remove" replace = "replace" test = "test" class WorkItemFieldOperation: """Defines a base patch operation for sending via the ADO API. :param operation: The operation type :param path: The path to apply the operation to :param value: The value (if any) to use :param from_path: The original path (if required) """ def __init__( self, operation: WorkItemFieldOperationType, path: Union[WorkItemField, str], value: Optional[str], from_path: Optional[str] = None, ) -> None: self.operation = operation self.path = path self.value = value self.from_path = from_path def raw(self) -> Dict[str, Any]: """Generate the raw representation that is sent to the API. :returns: A dictionary with the raw API data for the request """ if isinstance(self.value, datetime.datetime): self.value = ( self.value.strftime("%Y-%m-%dT%H:%M:%S.") + str(int(self.value.microsecond / 10000)) + "Z" ) if isinstance(self.value, enum.Enum): self.value = self.value.value raw_dict = {"op": self.operation.value, "value": self.value, "from": self.from_path} if isinstance(self.path, str): raw_dict["path"] = self.path else: raw_dict["path"] = self.path.value return raw_dict def __str__(self) -> str: """Generate and return the string representation of the object. :return: A string representation of the object """ return str(self.raw()) class WorkItemFieldOperationAdd(WorkItemFieldOperation): """Defines an add operation for sending via the ADO API. :param field: The field to add :param value: The value to set for the new field """ def __init__(self, field: WorkItemField, value: Any) -> None: super().__init__(WorkItemFieldOperationType.add, field, value) class WorkItemFieldOperationDelete(WorkItemFieldOperation): """Defines a delete operation for sending via the ADO API. :param field: The field to delete """ def __init__(self, field: WorkItemField) -> None: super().__init__(WorkItemFieldOperationType.remove, field, None) class ADOWorkItemsClient(ADOBaseClient): """Wrapper class around the ADO work items APIs. :param context: The context information for the client :param http_client: The HTTP client to use for the client :param log: The logger to use """ def __init__( self, context: ADOContext, http_client: ADOHTTPClient, log: logging.Logger ) -> None: super().__init__(context, http_client, log.getChild("workitems")) def get(self, identifier: str) -> ADOResponse: """Get the data about a work item. :param identifier: The identifier of the work item :returns: The ADO response with the data in it """ self.log.debug(f"Getting work item: {identifier}") request_url = ( f"{self._http_client.base_url()}/wit/workitems/{identifier}?api-version=4.1&$expand=all" ) response = self._http_client.get(request_url) return self._http_client.decode_response(response) def get_work_item_types(self) -> ADOResponse: """Get the types of work items supported by the project. :returns: The ADO response with the data in it """ self.log.debug("Getting work item types") request_url = f"{self._http_client.base_url()}/wit/workitemtypes?api-version=4.1" response = self._http_client.get(request_url) return self._http_client.decode_response(response) def add_property( self, identifier: str, field: WorkItemField, value: Any, *, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Add a property value to a work item. :param identifier: The identifier of the work item :param field: The field to add :param value: The value to set the field to :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ self.log.debug(f"Add field '{field.value}' to ticket {identifier}") operation = WorkItemFieldOperationAdd(field, value) request_url = f"{self._http_client.base_url()}/wit/workitems/{identifier}" request_url += f"?bypassRules={bypass_rules}" request_url += f"&suppressNotifications={supress_notifications}" request_url += f"&api-version=4.1" response = self.http_client.patch( request_url, [operation.raw()], additional_headers={"Content-Type": "application/json-patch+json"}, ) return self._http_client.decode_response(response) def add_attachment( self, identifier: str, path_to_attachment: str, *, filename: Optional[str] = None, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Add an attachment to a work item. :param identifier: The identifier of the work item :param path_to_attachment: The path to the attachment on disk :param Optional[str] filename: The new file name of the attachment :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it :raises ADOException: If we can't get the url from the response """ self.log.debug(f"Adding attachment to {identifier}: {path_to_attachment}") if filename is None: filename = os.path.basename(path_to_attachment) filename = filename.replace("#", "_") # Upload the file request_url = ( f"{self._http_client.base_url()}/wit/attachments?fileName={filename}&api-version=1.0" ) response = self.http_client.post_file(request_url, path_to_attachment) response_data = self._http_client.decode_response(response) url = response_data.get("url") if url is None: raise ADOException(f"Failed to get url from response: {response_data}") # Attach it to the ticket operation = WorkItemFieldOperationAdd( WorkItemField.relation, {"rel": "AttachedFile", "url": url, "attributes": {"comment": ""}}, ) request_url = f"{self._http_client.base_url()}/wit/workitems/{identifier}" request_url += f"?bypassRules={bypass_rules}" request_url += f"&suppressNotifications={supress_notifications}" request_url += f"&api-version=4.1" response = self.http_client.patch( request_url, [operation.raw()], additional_headers={"Content-Type": "application/json-patch+json"}, ) return self._http_client.decode_response(response) def _add_link( self, *, parent_identifier: str, child_url: str, relation_type: WorkItemRelationType, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Add a link between a parent work item and another resource. :param str parent_identifier: The identifier of the parent work item :param str child_url: The URL of the child item to link to :param WorkItemRelationType relation_type: The relationship type between the parent and the child :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ self.log.debug(f"Adding link {parent_identifier} -> {child_url} ({relation_type})") operation = WorkItemFieldOperationAdd( WorkItemField.relation, {"rel": relation_type.value, "url": child_url, "attributes": {"comment": ""}}, ) request_url = f"{self._http_client.base_url()}/wit/workitems/{parent_identifier}" request_url += f"?bypassRules={bypass_rules}" request_url += f"&suppressNotifications={supress_notifications}" request_url += f"&api-version=4.1" response = self.http_client.patch( request_url, [operation.raw()], additional_headers={"Content-Type": "application/json-patch+json"}, ) return self._http_client.decode_response(response) def link_tickets( self, parent_identifier: str, child_identifier: str, relationship: WorkItemRelationType, *, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Add a link between a parent and child work item. :param parent_identifier: The identifier of the parent work item :param child_identifier: The identifier of the child work item :param WorkItemRelationType relationship: The relationship type between the two work items :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ child_url = ( f"{self._http_client.base_url(is_project=False)}/wit/workitems/{child_identifier}" ) return self._add_link( parent_identifier=parent_identifier, child_url=child_url, relation_type=relationship, bypass_rules=bypass_rules, supress_notifications=supress_notifications, ) def add_hyperlink( self, identifier: str, hyperlink: str, *, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Add a hyperlink link to a work item. :param identifier: The identifier of the work item :param hyperlink: The hyperlink to add to the work item :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ return self._add_link( parent_identifier=identifier, child_url=hyperlink, relation_type=WorkItemRelationType.hyperlink, bypass_rules=bypass_rules, supress_notifications=supress_notifications, ) def create( self, item_type: str, operations: List[WorkItemFieldOperationAdd], *, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Create a new work item. :param item_type: The type of work item to create :param operations: The list of add operations to use to create the ticket :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ self.log.debug(f"Creating a new {item_type}") request_url = f"{self._http_client.base_url()}/wit/workitems/${item_type}" request_url += f"?bypassRules={bypass_rules}" request_url += f"&suppressNotifications={supress_notifications}" request_url += f"&api-version=4.1" response = self.http_client.post( request_url, [operation.raw() for operation in operations], additional_headers={"Content-Type": "application/json-patch+json"}, ) return self._http_client.decode_response(response) def update( self, identifier: str, operations: List[WorkItemFieldOperation], *, bypass_rules: bool = False, supress_notifications: bool = False, ) -> ADOResponse: """Update a work item. :param identifier: The identifier of the work item :param operations: The list of operations to use to update the ticket :param bool bypass_rules: Set to True if we should bypass validation rules, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it """ self.log.debug(f"Updating {identifier}") request_url = f"{self._http_client.base_url()}/wit/workitems/{identifier}" request_url += f"?bypassRules={bypass_rules}" request_url += f"&suppressNotifications={supress_notifications}" request_url += f"&api-version=4.1" response = self.http_client.patch( request_url, [operation.raw() for operation in operations], additional_headers={"Content-Type": "application/json-patch+json"}, ) return self._http_client.decode_response(response) def execute_query(self, query_string: str) -> ADOResponse: """Execute a WIQL query. :param query_string: The WIQL query string to execute :returns: The ADO response with the data in it """ self.log.debug(f"Executing query: {query_string}") request_url = f"{self._http_client.base_url()}/wit/wiql?api-version=4.1" response = self.http_client.post(request_url, {"query": query_string}) return self._http_client.decode_response(response) def delete( self, identifier: str, *, permanent: bool = False, supress_notifications: bool = False ) -> ADOResponse: """Delete a work item. :param identifier: The identifier of the work item :param bool permanent: Set to True if we should permanently delete the work item, False otherwise :param bool supress_notifications: Set to True if notifications for this change should be supressed, False otherwise :returns: The ADO response with the data in it :raises ADOHTTPException: Raised if the response code is not 204 (No Content) """ self.log.debug(f"Deleting {identifier}") request_url = f"{self._http_client.base_url()}/wit/workitems/{identifier}" request_url += f"?suppressNotifications={supress_notifications}" request_url += f"&destroy={str(permanent).lower()}" request_url += f"&api-version=4.1" response = self.http_client.delete( request_url, additional_headers={"Content-Type": "application/json-patch+json"} ) if response.status_code != 204: raise ADOHTTPException( f"Failed to delete '{identifier}'", response.status_code, response.text ) return self._http_client.decode_response(response) def batch(self, operations: List[BatchRequest]) -> ADOResponse: """Run a batch operation. :param operations: The list of batch operations to run :returns: The ADO response with the data in it :raises ADOException: Raised if we try and run more than 200 batch operations at once """ if len(operations) >= 200: raise ADOException("Cannot perform more than 200 batch operations at once") self.log.debug("Running batch operation") full_body = [] for operation in operations: full_body.append(operation.body()) request_url = f"{self._http_client.base_url(is_project=False)}/wit/$batch" response = self.http_client.post(request_url, full_body) return self._http_client.decode_response(response) PK!AwN"simple_ado-0.4.0.dist-info/LICENSE MIT License Copyright (c) Microsoft Corporation. All rights reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE PK!HnHTU simple_ado-0.4.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hy&#simple_ado-0.4.0.dist-info/METADATAU]6|XY) $isr&ITH>w8Hܛ$Rs R '\OYaTM8^?bVt]'HgZC(Z0|ZC~w#~NEϦӝ EIsv}޿QX{ ߬kiƽm|¸ u$wDi4ej(%?ơu_Lh40Yw 3Cb|P |%zyƃdI@[`u k!8,1jig Nb24mH(6dKb)c=H@54hՖJa]8%@Aᆠ'ڛ =k:i1OӏXRUjW0Iow;.sKR\olaK:Hamr>8ǠQұ!&)WǙ]'TesIN)Z/N8S3)"_S|x UOihƊ>3)n* 4ȲOĀ% }k%Q=Q h'։jۂa4>E6*d蛿[以q>Q4߈@!a>ڲ>V)c5 .$:)t -!/Bzbd+84&>*{vM~O{ .V0TQݪ?GZY^ƚ5ukwEx"[q:{Șڻ;~<&CAHm,!'u=/PK!HTs 5!simple_ado-0.4.0.dist-info/RECORD}ɒHɼL←$Ao7uOߏࣻ%o& l$_݂0?񴸃x8Ԥ~kj+aͱ^PdwX4F\EF' SWi]j sOpmR=S0G~b=Fm{jZh0]匧N"{$Ѵ3'ṯ*hП\36 V$/)ꔨ8i}ǧ%;&>Gްt6Nm=$M/5r\AČ[}S۽1ƈ~{įz9,Z'Mn zEgɻ1@䖚ھCؖL*G: ezd@F"dG:b 1Fwncbd-_C4)ۿ!Ǧ# 4-8ݺ58Ѭ+Ǫ~!SёY?8Wh>S\|\M:3o9@l0x7"|ݫVh8!UY"E[of;q /PK!Rsimple_ado/__init__.pyPK!asimple_ado/base_client.pyPK!6w !simple_ado/builds.pyPK!D6J<< simple_ado/comments.pyPK!t큏3simple_ado/context.pyPK!AC큣6simple_ado/exceptions.pyPK! [q:simple_ado/git.pyPK!f*&2&2큡Wsimple_ado/http_client.pyPK!ȱ_Y00simple_ado/pull_requests.pyPK!nT :0:0simple_ado/security.pyPK!asimple_ado/types.pyPK! M M qsimple_ado/user.pyPK!'3%T%Tsimple_ado/workitems.pyPK!AwN"HJsimple_ado-0.4.0.dist-info/LICENSEPK!HnHTU 'Osimple_ado-0.4.0.dist-info/WHEELPK!Hy&#Osimple_ado-0.4.0.dist-info/METADATAPK!HTs 5!Ssimple_ado-0.4.0.dist-info/RECORDPK?W