PK!FU>>simple_smartsheet/__init__.pyfrom .smartsheet import Smartsheet __all__ = ("Smartsheet",) PK!c4 00simple_smartsheet/config.pyfrom .utils import is_debug DEBUG = is_debug() PK!J,,simple_smartsheet/constants.pyAPI_ROOT = "https://api.smartsheet.com/2.0" PK!Psimple_smartsheet/crud.pyimport logging from typing import ( TypeVar, Dict, Any, Generic, Optional, cast, Type, List, Iterator, TYPE_CHECKING, Sequence, ) from simple_smartsheet import exceptions from simple_smartsheet.models.base import CoreObject if TYPE_CHECKING: from simple_smartsheet.smartsheet import Smartsheet # noqa: F401 from simple_smartsheet.models.extra import Result logger = logging.getLogger(__name__) TS = TypeVar("TS", bound=CoreObject) # noinspection PyShadowingBuiltins class CRUDRead(Generic[TS]): base_url = "" _get_url: Optional[str] = None _list_url: Optional[str] = None get_include_fields: Optional[Sequence[str]] = None get_exclude_fields: Sequence[str] = () list_include_fields: Optional[Sequence[str]] = None list_exclude_fields: Sequence[str] = () factory: Type[TS] = cast(Type[TS], CoreObject) def __init__(self, smartsheet: "Smartsheet") -> None: self.smartsheet = smartsheet def __iter__(self) -> Iterator[TS]: return iter(self.list()) @property def get_url(self) -> str: return self._get_url or self.base_url + "/{id}" @property def list_url(self) -> str: return self._list_url or self.base_url def get_id(self, name: str) -> int: for obj in self.list(): if obj._name == name: id_ = obj._id if id_ is None: raise ValueError( f"{self.factory.__qualname__} object with the name {name!r} " f"does not have an id" ) return id_ raise exceptions.SmartsheetObjectNotFound( f"{self.factory.__qualname__} object with the name {name!r} " f"has not been found" ) def get( self, name: Optional[str] = None, id: Optional[int] = None, **kwargs: Any ) -> TS: """Fetches a CoreObject by name or id. Args: name: name of the object id: id of the object Returns: CoreObject """ if id: endpoint = self.get_url.format(id=id) obj_data = cast(Dict[str, Any], self.smartsheet.get(endpoint, path=None)) logger.debug( "Creating an object %s from data: %s", repr(self.factory.__qualname__), str(obj_data), ) obj = self.factory.load( obj_data, self.get_include_fields, self.get_exclude_fields, **kwargs ) if not obj.id: obj.id = id obj.smartsheet = self.smartsheet return obj elif name: id_ = self.get_id(name) return self.get(id=id_, **kwargs) raise ValueError(f"To use get method, either name or id must be provided") def list(self) -> List[TS]: """Fetches a list of CoreObject objects. Note: API usually returns an incomplete view of objects. For example: /sheets will return a list of sheets without columns or rows Returns: CoreObject """ result = [] for obj_data in self.smartsheet.get( self.list_url, params={"includeAll": "true"} ): logger.debug( "Creating an object '%s' from data: %s", self.factory.__qualname__, str(obj_data), ) obj_data_ = cast(Dict[str, Any], obj_data) obj = self.factory.load( obj_data_, self.list_include_fields, self.list_exclude_fields ) obj.smartsheet = self.smartsheet result.append(obj) return result # noinspection PyShadowingBuiltins class CRUD(CRUDRead[TS]): _update_url: Optional[str] = None _create_url: Optional[str] = None _delete_url: Optional[str] = None create_include_fields: Optional[Sequence[str]] = None create_exclude_fields: Sequence[str] = () update_include_fields: Optional[Sequence[str]] = None update_exclude_fields: Sequence[str] = () factory: Type[TS] = cast(Type[TS], CoreObject) @property def create_url(self) -> str: return self._create_url or self.base_url @property def update_url(self) -> str: return self._update_url or self.base_url + "/{id}" @property def delete_url(self) -> str: return self._delete_url or self.base_url + "/{id}" def create(self, obj: TS) -> "Result": """Creates CoreObject Args: obj: CoreObject Returns: Result object """ obj.smartsheet = self.smartsheet endpoint = self.create_url.format(obj=obj) result = self.smartsheet.post( endpoint, obj.dump( only=self.create_include_fields, exclude=self.create_exclude_fields ), ) return cast("Result", result) def update(self, obj: TS) -> "Result": """Updates CoreObject Args: obj: CoreObject Returns: Result object """ obj.smartsheet = self.smartsheet endpoint = self.update_url.format(id=obj.id) result = self.smartsheet.put( endpoint, obj.dump( only=self.update_include_fields, exclude=self.update_exclude_fields ), ) return cast("Result", result) def delete(self, name: Optional[str] = None, id: Optional[int] = None) -> "Result": """Deletes CoreObject by name or id Args: name: CoreObject name attribute id: CoreObject id attribute Returns: Result object """ if id: endpoint = self.delete_url.format(id=id) return self.smartsheet.delete(endpoint) elif name: id_ = self.get_id(name) return self.delete(id=id_) raise ValueError(f"To use delete method, either name or id must be provided") PK!`"simple_smartsheet/exceptions.pyimport json from typing import Optional from requests import Response from simple_smartsheet.models.extra import Error class SmartsheetError(Exception): pass class SmartsheetHTTPError(SmartsheetError): def __init__(self, http_response_code: int, error: Error = None, message: str = ""): result_msg_items = [f"HTTP response code {http_response_code}"] if error is not None: result_msg_items.append(f"Error code {error.error_code}") if error.message: message = error.message if message: result_msg_items.append(message) result_msg = " - ".join(result_msg_items) super().__init__(result_msg) self.http_response_code = http_response_code self.message = message self.error = error @classmethod def from_response(cls, response: Response): http_response_code = response.status_code error: Optional[Error] = None message = "" if response.text: try: error = Error.load(response.json()) except json.JSONDecodeError: message = response.text if 400 <= http_response_code < 500: return SmartsheetHTTPClientError(http_response_code, error, message) elif 500 <= http_response_code < 600: return SmartsheetHTTPServerError(http_response_code, error, message) else: return cls(http_response_code, error, message) class SmartsheetHTTPClientError(SmartsheetHTTPError): pass class SmartsheetHTTPServerError(SmartsheetHTTPError): pass class SmartsheetObjectNotFound(SmartsheetError): pass class SmartsheetIndexNotFound(SmartsheetError): pass class SmartsheetIndexNotUnique(SmartsheetError): pass PK!+$simple_smartsheet/models/__init__.py# from . import base, cell, column, extra, row, sheet, report # noqa: F401 from .cell import Cell from .column import Column from .sheet import Sheet # sheet must be before row from .row import Row from .report import Report __all__ = ("Cell", "Column", "Row", "Sheet", "Report") PK!t simple_smartsheet/models/base.pyimport logging import copy from datetime import datetime from typing import ( TypeVar, Dict, Any, Optional, Type, TYPE_CHECKING, Union, ClassVar, Sequence, ) import attr import marshmallow from marshmallow import fields from cattr.converters import Converter from simple_smartsheet import config from simple_smartsheet import utils from simple_smartsheet.types import IndexesType if TYPE_CHECKING: from simple_smartsheet.smartsheet import Smartsheet # noqa: F401 logger = logging.getLogger(__name__) converter = Converter() converter.register_structure_hook(datetime, lambda ts, _: ts) converter.register_structure_hook(IndexesType, lambda x, _: x) converter.register_structure_hook(Union[float, str, datetime, None], lambda ts, _: ts) class Schema(marshmallow.Schema): class Meta: unknown = utils.get_unknown_field_handling(config.DEBUG) @marshmallow.post_dump def remove_none(self, data): return {key: value for key, value in data.items() if value is not None} class CoreSchema(Schema): id = fields.Int() name = fields.Str() T = TypeVar("T", bound="Object") @attr.s(auto_attribs=True, repr=False, kw_only=True) class Object: _schema: ClassVar[Type[Schema]] = Schema @classmethod def load( cls: Type[T], data: Dict[str, Any], only: Optional[Sequence[str]] = None, exclude: Sequence[str] = (), **kwargs: Any, ) -> T: schema = cls._schema(only=only, exclude=exclude) normalized_data = schema.load(data) normalized_data.update(kwargs) obj = converter.structure(normalized_data, cls) return obj def dump( self, only: Optional[Sequence[str]] = None, exclude: Sequence[str] = () ) -> Dict[str, Any]: schema = self._schema(only=only, exclude=exclude) return schema.dump(self) def __repr__(self) -> str: if hasattr(self, "id") and hasattr(self, "name"): attrs = ["name", "id"] elif hasattr(self, "id"): attrs = ["id"] elif hasattr(self, "name"): attrs = ["name"] else: return super().__repr__() return utils.create_repr(self, attrs) def copy(self: T, deep: bool = True) -> T: if deep: return copy.deepcopy(self) else: return copy.copy(self) @attr.s(auto_attribs=True, repr=False, kw_only=True) class CoreObject(Object): name: str id: Optional[int] = None _schema: ClassVar[Type[CoreSchema]] = CoreSchema smartsheet: Optional["Smartsheet"] = attr.ib(default=None, init=False) @property def _id(self) -> Optional[int]: return getattr(self, "id") @property def _name(self) -> str: return getattr(self, "name") PK!QII simple_smartsheet/models/cell.pyimport logging from datetime import datetime, date from typing import Optional, Union, ClassVar, Type, Any, List import attr import marshmallow.utils from marshmallow import fields, post_load from simple_smartsheet import utils from simple_smartsheet.models.base import Schema, Object from simple_smartsheet.models.extra import Hyperlink, HyperlinkSchema logger = logging.getLogger(__name__) # noinspection PyShadowingNames class CellValueField(fields.Field): def _serialize(self, value, attr, obj, **kwargs): if isinstance(value, datetime): return marshmallow.utils.isoformat(value) elif isinstance(value, date): return marshmallow.utils.to_iso_date(value) else: return super()._serialize(value, attr, obj, **kwargs) def _deserialize(self, value, attr, data, **kwargs): column_id_to_type = self.context["column_id_to_type"] if "virtualColumnId" in data: column_id = data["virtualColumnId"] else: column_id = data["columnId"] column_type = column_id_to_type[column_id] if not value: return value if column_type == "DATE": try: return marshmallow.utils.from_iso_date(value) except ValueError: logger.warning("Cell value %r is not a valid date", value) return value elif column_type in ("DATETIME", "ABSTRACT_DATETIME"): try: return marshmallow.utils.from_iso_datetime(value) except ValueError: logger.info("Cell value %r is not a valid datetime ", value) return value return value class CellSchema(Schema): column_id = fields.Int(data_key="columnId") column_type = fields.Str(data_key="columnType") conditional_format = fields.Str(data_key="conditionalFormat") display_value = fields.Str(data_key="displayValue") format = fields.Str() formula = fields.Str() hyperlink = fields.Nested(HyperlinkSchema) image = fields.Field() # TODO: Image object link_in_from_cell = fields.Field(data_key="linkInFromCell") # TODO: CellLink object link_out_to_cells = fields.List( fields.Field(), data_key="linksOutToCells" ) # TODO: CellLink object object_value = fields.Field(data_key="objectValue") # TODO: ObjectValue object override_validation = fields.Bool(data_key="overrideValidation") strict = fields.Bool() value = CellValueField() @post_load def fix_checkbox_value(self, data): column_id_to_type = self.context["column_id_to_type"] if "virtual_column_id" in data: column_id = data["virtual_column_id"] else: column_id = data["column_id"] column_type = column_id_to_type[column_id] if column_type == "CHECKBOX" and "value" not in data: data["value"] = False return data @attr.s(auto_attribs=True, repr=False, kw_only=True) class Cell(Object): column_id: Optional[int] = None column_type: Optional[str] = None conditional_format: Optional[str] = None display_value: Optional[str] = None format: Optional[str] = None formula: Optional[str] = None hyperlink: Optional[Hyperlink] = None image: Optional[Any] = None # TODO: Image object link_in_from_cell: Optional[Any] = None # TODO: CellLink object link_out_to_cells: Optional[List[Any]] = None # TODO: CellLink object object_value: Optional[Any] = None # TODO: ObjectValue object override_validation: Optional[bool] = None strict: bool = True value: Union[float, str, datetime, None] = None _schema: ClassVar[Type[CellSchema]] = CellSchema def __repr__(self) -> str: return utils.create_repr(self, ["column_id", "value"]) @property def _column_id(self) -> Optional[int]: return self.column_id PK!4k "simple_smartsheet/models/column.pyfrom typing import Optional, List import attr from marshmallow import fields, post_load from simple_smartsheet import utils from simple_smartsheet.models.base import Schema, Object from simple_smartsheet.models.extra import AutoNumberFormatSchema, AutoNumberFormat class ContactOptionSchema(Schema): email = fields.Str() name = fields.Str() @attr.s(auto_attribs=True, kw_only=True) class ContactOption(Object): email: str name: Optional[str] = None class ColumnSchema(Schema): id = fields.Int() system_column_type = fields.Str(data_key="systemColumnType") type = fields.Str() # TODO: should be enum auto_number_format = fields.Nested( AutoNumberFormatSchema, data_key="autoNumberFormat" ) contact_options = fields.Nested( ContactOptionSchema, data_key="contactOptions", many=True ) format = fields.Str() hidden = fields.Bool() index = fields.Int() locked = fields.Bool() locked_for_user = fields.Bool(data_key="lockedForUser") options = fields.List(fields.Str()) primary = fields.Bool() symbol = fields.Str() tags = fields.List(fields.Str()) title = fields.Str() validation = fields.Bool() version = fields.Int() width = fields.Int() @property def _id_attr(self): return "id" @post_load def post_load_update_parent_context(self, data): column_id_to_type = self.context["column_id_to_type"] id_ = data[self._id_attr] type_ = data["type"] column_id_to_type[id_] = type_ return data @attr.s(auto_attribs=True, repr=False, kw_only=True) class Column(Object): id: Optional[int] = None system_column_type: Optional[str] = None type: Optional[str] = None auto_number_format: Optional[AutoNumberFormat] = None contact_options: Optional[List[ContactOption]] = None format: Optional[str] = None hidden: Optional[bool] = None index: Optional[int] = None locked: Optional[bool] = None locked_for_user: Optional[bool] = None options: Optional[List[str]] = None primary: Optional[bool] = None symbol: Optional[str] = None tags: Optional[List[str]] = None title: Optional[str] = None validation: Optional[bool] = None version: Optional[int] = None width: Optional[int] = None @property def _id(self) -> Optional[int]: return self.id def __repr__(self) -> str: return utils.create_repr(self, ["id", "title"]) PK!6C||!simple_smartsheet/models/extra.pyfrom typing import Optional, ClassVar, Type, List, Any import attr from marshmallow import fields from simple_smartsheet.models.base import Schema, Object class AutoNumberFormatSchema(Schema): fill = fields.Str() prefix = fields.Str() starting_num = fields.Str(data_key="startingNumber") suffix = fields.Str() @attr.s(auto_attribs=True, kw_only=True) class AutoNumberFormat(Object): fill: Optional[str] = None prefix: Optional[str] = None starting_num: Optional[int] = None suffix: Optional[str] = None class ErrorSchema(Schema): error_code = fields.Int(data_key="errorCode") message = fields.Str() ref_id = fields.Str(data_key="refId") detail = fields.Field() @attr.s(auto_attribs=True, kw_only=True) class Error(Object): error_code: int message: str ref_id: str detail: Optional[Any] = None _schema: ClassVar[Type[ErrorSchema]] = ErrorSchema class ResultSchema(Schema): failed_items = fields.List(fields.Field(), data_key="failedItems") message = fields.Str() result = fields.Field() result_code = fields.Int(data_key="resultCode") version = fields.Int() @attr.s(auto_attribs=True, kw_only=True) class Result(Object): failed_items: List[Any] = attr.Factory(list) message: Optional[str] = None result: Optional[Any] = None result_code: Optional[int] = None version: Optional[int] = None _schema: ClassVar[Type[ResultSchema]] = ResultSchema class HyperlinkSchema(Schema): report_id = fields.Int(data_key="reportId") sheet_id = fields.Int(data_key="sheetId") sight_id = fields.Int(data_key="sightId") url = fields.Str() @attr.s(auto_attribs=True, kw_only=True) class Hyperlink(Object): url: str report_id: Optional[int] = None sheet_id: Optional[int] = None sight_id: Optional[int] = None _schema: ClassVar[Type[HyperlinkSchema]] = HyperlinkSchema PK!J>  "simple_smartsheet/models/report.pyimport logging from typing import Optional, List import attr from marshmallow import fields from simple_smartsheet.crud import CRUDRead from simple_smartsheet.models.cell import Cell, CellSchema from simple_smartsheet.models.column import Column, ColumnSchema from simple_smartsheet.models.row import _RowBase, RowSchema from simple_smartsheet.models.sheet import Sheet, _SheetBase, SheetSchema logger = logging.getLogger(__name__) class ReportCellSchema(CellSchema): virtual_column_id = fields.Int(data_key="virtualColumnId") @attr.s(auto_attribs=True, repr=False, kw_only=True) class ReportCell(Cell): virtual_column_id: Optional[int] = None _schema = ReportCellSchema @property def _column_id(self) -> Optional[int]: return self.virtual_column_id class ReportRowSchema(RowSchema): cells = fields.Nested(ReportCellSchema, many=True) sheet_id = fields.Int(data_key="sheetId") @attr.s(auto_attribs=True, repr=False, kw_only=True) class ReportRow(_RowBase[ReportCell]): sheet_id: Optional[int] = None cells: List[ReportCell] = attr.Factory(list) _schema = ReportRowSchema class ReportColumnSchema(ColumnSchema): virtual_id = fields.Int(data_key="virtualId") sheet_name_column = fields.Bool(data_key="sheetNameColumn") @property def _id_attr(self): return "virtual_id" @attr.s(auto_attribs=True, repr=False, kw_only=True) class ReportColumn(Column): virtual_id: Optional[int] = None sheet_name_column: Optional[bool] = None _schema = ReportColumnSchema @property def _id(self) -> Optional[int]: return self.virtual_id class ReportSchema(SheetSchema): """Marshmallow Schema for Smartsheet Report object Additional details about fields can be found here: http://smartsheet-platform.github.io/api-docs/#reports """ columns = fields.Nested(ReportColumnSchema, many=True) rows = fields.Nested(ReportRowSchema, many=True) source_sheets = fields.Nested(SheetSchema, many=True, data_key="sourceSheets") @attr.s(auto_attribs=True, repr=False, kw_only=True) class Report(_SheetBase[ReportRow, ReportColumn]): """Represents Smartsheet Report object Additional details about fields can be found here: http://smartsheet-platform.github.io/api-docs/#reports """ columns: List[ReportColumn] = attr.Factory(list) rows: List[ReportRow] = attr.Factory(list) source_sheets: List[Sheet] = attr.Factory(list) _schema = ReportSchema class ReportCRUD(CRUDRead[Report]): base_url = "/reports" factory = Report PK!p7simple_smartsheet/models/row.pyimport logging from datetime import datetime from typing import Generic, Optional, List, Dict, Any, ClassVar, Type, TypeVar, cast import attr from marshmallow import fields from simple_smartsheet import utils from simple_smartsheet.models import sheet as sheet_models from simple_smartsheet.models.base import Schema, Object from simple_smartsheet.models.cell import Cell, CellSchema from simple_smartsheet.models.column import Column, ColumnSchema logger = logging.getLogger(__name__) class RowSchema(Schema): id = fields.Int() sheet_id = fields.Int(data_key="sheetId") access_level = fields.Str(data_key="accessLevel") attachments = fields.List(fields.Field()) # TODO: Attachment object conditional_format = fields.Str(data_key="conditionalFormat") created_at = fields.DateTime(data_key="createdAt") created_by = fields.Field(data_key="createdBy") # TODO: User object discussions = fields.List(fields.Field()) # TODO: Discussion object expanded = fields.Bool() filtered_out = fields.Bool(data_key="filteredOut") format = fields.Str() in_critical_path = fields.Bool(data_key="inCriticalPath") locked = fields.Bool() locked_for_user = fields.Bool(data_key="lockedForUser") modified_at = fields.DateTime(data_key="modifiedAt") modified_by = fields.Field(data_key="modifiedBy") # TODO: User object num = fields.Int(data_key="rowNumber") permalink = fields.Str() version = fields.Int() cells = fields.Nested(CellSchema, many=True) columns = fields.Nested(ColumnSchema, many=True) # location-specifier attributes parent_id = fields.Int(data_key="parentId") sibling_id = fields.Int(data_key="siblingId") above = fields.Bool() indent = fields.Int() outdent = fields.Int() to_bottom = fields.Bool(data_key="toBottom") to_top = fields.Bool(data_key="toTop") CellT = TypeVar("CellT", bound=Cell) RowT = TypeVar("RowT", bound="_RowBase[Any]") ColumnT = TypeVar("ColumnT", bound=Column) @attr.s(auto_attribs=True, repr=False, kw_only=True) class _RowBase(Object, Generic[CellT]): id: Optional[int] = None sheet_id: Optional[int] = None access_level: Optional[str] = None attachments: List[Any] = attr.Factory(list) conditional_format: Optional[str] = None created_at: Optional[datetime] = None created_by: Optional[Any] = None discussions: List[Any] = attr.Factory(list) expanded: Optional[bool] = None filtered_out: Optional[bool] = None format: Optional[str] = None in_critical_path: Optional[bool] = None locked: Optional[bool] = None locked_for_user: Optional[bool] = None modified_at: Optional[datetime] = None modified_by: Optional[Any] = None num: Optional[int] = None permalink: Optional[str] = None version: Optional[int] = None cells: List[CellT] = attr.Factory(list) columns: List[Column] = attr.Factory(list) # location-specified attributes parent_id: Optional[int] = None sibling_id: Optional[int] = None above: Optional[bool] = None indent: Optional[int] = None outdent: Optional[int] = None to_bottom: Optional[bool] = None to_top: Optional[bool] = None # index column_title_to_cell: Dict[str, CellT] = attr.Factory(dict) column_id_to_cell: Dict[int, CellT] = attr.Factory(dict) _schema: ClassVar[Type[RowSchema]] = RowSchema def __repr__(self) -> str: return utils.create_repr(self, ["id", "num"]) def _update_cell_lookup( self, sheet: "sheet_models._SheetBase[RowT, ColumnT]" ) -> None: self.column_title_to_cell.clear() self.column_id_to_cell.clear() for cell in self.cells: column_id = cell._column_id if column_id is None: continue column = sheet.get_column(column_id=column_id) column_title = column.title self.column_id_to_cell[column_id] = cell if column_title is not None: self.column_title_to_cell[column_title] = cell def _update_index(self, sheet: "sheet_models._SheetBase[RowT, ColumnT]") -> None: for index_key, index_dict in sheet.indexes.items(): index = index_dict["index"] unique = index_dict["unique"] if isinstance(index_key, str): key = self.get_cell(index_key).value else: key = tuple( self.get_cell(column_title).value for column_title in index_key ) if unique: index[key] = self else: container = cast(List[_RowBase[CellT]], index.setdefault(key, [])) container.append(self) def get_cell( self, column_title: Optional[str] = None, column_id: Optional[int] = None ) -> CellT: if column_title is not None: return self.column_title_to_cell[column_title] elif column_id is not None: return self.column_id_to_cell[column_id] else: raise ValueError( "Either column_title or column_id argument should be provided" ) def as_dict(self) -> Dict[str, Any]: """Returns a dictionary of column title to cell value""" return { column_title: cell.value for column_title, cell in self.column_title_to_cell.items() } @attr.s(auto_attribs=True, repr=False, kw_only=True) class Row(_RowBase[Cell]): cells: List[Cell] = attr.Factory(list) PK!i8)HH!simple_smartsheet/models/sheet.pyimport logging from datetime import datetime from typing import ( Optional, Dict, List, ClassVar, Generic, Type, TypeVar, Sequence, Tuple, Any, Union, cast, ) import attr from marshmallow import fields, pre_load from simple_smartsheet import exceptions from simple_smartsheet.crud import CRUD from simple_smartsheet.types import IndexKeysDict, IndexesType from simple_smartsheet.models.base import Schema, CoreSchema, Object, CoreObject from simple_smartsheet.models.cell import Cell from simple_smartsheet.models.column import Column, ColumnSchema from simple_smartsheet.models.row import Row, RowSchema, _RowBase from simple_smartsheet.models.extra import Result logger = logging.getLogger(__name__) class UserSettingsSchema(Schema): critical_path_enabled = fields.Bool(data_key="criticalPathEnabled") display_summary_tasks = fields.Bool(data_key="displaySummaryTasks") @attr.s(auto_attribs=True, repr=False, kw_only=True) class UserSettings(Object): critical_path_enabled: bool display_summary_tasks: bool class WorkspaceSchema(Schema): id = fields.Int() name = fields.Str() @attr.s(auto_attribs=True, repr=False, kw_only=True) class Workspace(Object): id: int name: str class SheetSchema(CoreSchema): """Marshmallow Schema for Smartsheet Sheet object Additional details about fields can be found here: http://smartsheet-platform.github.io/api-docs/#sheets """ id = fields.Int() name = fields.Str() access_level = fields.Str(data_key="accessLevel") permalink = fields.Str() favorite = fields.Bool() created_at = fields.DateTime(data_key="createdAt") modified_at = fields.DateTime(data_key="modifiedAt") version = fields.Int() total_row_count = fields.Int(data_key="totalRowCount") effective_attachment_options = fields.List( fields.Str(), data_key="effectiveAttachmentOptions" ) gantt_enabled = fields.Bool(data_key="ganttEnabled") dependencies_enabled = fields.Bool(data_key="dependenciesEnabled") resource_management_enabled = fields.Bool(data_key="resourceManagementEnabled") cell_image_upload_enabled = fields.Bool(data_key="cellImageUploadEnabled") user_settings = fields.Nested(UserSettingsSchema, data_key="userSettings") columns = fields.Nested(ColumnSchema, many=True) rows = fields.Nested(RowSchema, many=True) workspace = fields.Nested(WorkspaceSchema) class Meta: ordered = True @pre_load() def update_context(self, data): self.context["column_id_to_type"] = {} return data RowT = TypeVar("RowT", bound=_RowBase[Any]) ColumnT = TypeVar("ColumnT", bound=Column) @attr.s(auto_attribs=True, repr=False, kw_only=True) class _SheetBase(CoreObject, Generic[RowT, ColumnT]): """Represents Smartsheet Sheet object Additional details about fields can be found here: http://smartsheet-platform.github.io/api-docs/#sheets Extra attributes: indexes: contains all built indices """ name: str id: Optional[int] = None access_level: Optional[str] = None permalink: Optional[str] = None favorite: Optional[bool] = None created_at: Optional[datetime] = None modified_at: Optional[datetime] = None version: Optional[int] = None total_row_count: Optional[int] = None effective_attachment_options: List[str] = attr.Factory(list) gantt_enabled: Optional[bool] = None dependencies_enabled: Optional[bool] = None resource_management_enabled: Optional[bool] = None cell_image_upload_enabled: Optional[bool] = None user_settings: Optional[UserSettings] = None columns: List[ColumnT] = attr.Factory(list) rows: List[RowT] = attr.Factory(list) workspace: Optional[Workspace] = None _row_num_to_row: Dict[int, RowT] = attr.ib(attr.Factory(dict), init=False) _row_id_to_row: Dict[int, RowT] = attr.ib(attr.Factory(dict), init=False) _column_title_to_column: Dict[str, ColumnT] = attr.ib( attr.Factory(dict), init=False ) _column_id_to_column: Dict[int, ColumnT] = attr.ib(attr.Factory(dict), init=False) indexes: IndexesType = attr.ib(attr.Factory(dict), init=False) _schema: ClassVar[Type[SheetSchema]] = SheetSchema def __attrs_post_init__(self) -> None: self._update_column_lookup() self._update_row_cell_lookup() def _update_column_lookup(self) -> None: self._column_title_to_column.clear() self._column_id_to_column.clear() for column in self.columns: column_id = column._id if column_id is None: continue self._column_id_to_column[column_id] = column column_title = column.title if column_title is None: continue if column_title in self._column_title_to_column: logger.info( "Column with the title %s is already present in the index", column_title, ) self._column_title_to_column[column_title] = column def _update_row_cell_lookup(self) -> None: self._row_num_to_row.clear() self._row_id_to_row.clear() for row in self.rows: if row.num: self._row_num_to_row[row.num] = row if row.id: self._row_id_to_row[row.id] = row row._update_cell_lookup(self) def build_index(self, indexes: List[IndexKeysDict]) -> None: for index in indexes: columns = index["columns"] unique = index["unique"] self.indexes[columns] = {"index": {}, "unique": unique} for row in self.rows: row._update_index(self) def get_row( self, row_num: Optional[int] = None, row_id: Optional[int] = None, filter: Optional[Dict[str, Any]] = None, ) -> Optional[RowT]: """Returns Row object by row number or ID Either row_num or row_id must be provided Args: row_num: row number row_id: row id filter: a dictionary with column title to value mappings in the same order as index was built. Index must be unique. Returns: Row object """ if row_num is not None: return self._row_num_to_row.get(row_num) elif row_id is not None: return self._row_id_to_row.get(row_id) elif filter is not None: columns, query = zip(*sorted(filter.items())) index_dict = self.indexes.get(columns) if index_dict is None: raise exceptions.SmartsheetIndexNotFound( f"Index {columns} is not found, " f"build it first with build_index method" ) unique = index_dict["unique"] if not unique: raise exceptions.SmartsheetIndexNotUnique( f"Index {columns} is non-unique and lookup will potentially " "return multiple rows, use get_rows method instead" ) index = cast(Dict[Tuple[Any, ...], RowT], index_dict["index"]) return index[query] else: raise ValueError("Either row_num or row_id argument should be provided") def get_rows(self, filter: Dict[str, Any]) -> List[RowT]: """Returns Row objects by index query Args: filter: a dictionary or ordered dictionary with column title to value mappings in the same order as index was built. Index must be non-unique. Returns: Row object """ columns, query = zip(*sorted(filter.items())) index_dict = self.indexes.get(columns) if index_dict is None: raise exceptions.SmartsheetIndexNotFound( f"Index {columns} is not found, " f"build it first with build_index method" ) unique = index_dict["unique"] if unique: unique_index = cast(Dict[Tuple[Any, ...], RowT], index_dict["index"]) result = unique_index.get(query) if result is not None: return [result] else: return [] else: non_unique_index = cast( Dict[Tuple[Any, ...], List[RowT]], index_dict["index"] ) return non_unique_index.get(query, []) def get_column( self, column_title: Optional[str] = None, column_id: Optional[int] = None ) -> ColumnT: """Returns Column object by column title or ID Either column_title or column_id must be provided Args: column_title: column title (case-sensitive) column_id: column id Returns: Column object """ if column_title is not None: return self._column_title_to_column[column_title] elif column_id is not None: return self._column_id_to_column[column_id] else: raise ValueError( "Either column_title or column_id argument should be provided" ) def as_list(self) -> List[Dict[str, Union[float, str, datetime, None]]]: """Returns a list of dictionaries with column titles and cell values""" return [row.as_dict() for row in self.rows] @attr.s(auto_attribs=True, repr=False, kw_only=True) class Sheet(_SheetBase[Row, Column]): """Represents Smartsheet Sheet object Additional details about fields can be found here: http://smartsheet-platform.github.io/api-docs/#sheets Extra attributes: _row_тum_to_row: mapping of row number to Row object _row_id_to_row: mapping of row id to Row object _column_title_to_column: mapping of column title to Column object _column_id_to_column: mapping of column id to Column object _schema: reference to SheetSchema """ columns: List[Column] = cast(List[Column], attr.Factory(list)) rows: List[Row] = attr.Factory(list) def add_rows(self, rows: Sequence[Row]) -> Result: """Adds several rows to the smartsheet. Sheet must have smartsheet attribute set. It is automatically set when method Smartsheet.sheets.get() is used Every row must have either location-specifier attributes or row number set More details: http://smartsheet-platform.github.io/api-docs/#add-rows Args: rows: sequence of Row objects Returns: Result object """ if self.smartsheet is None: raise ValueError("To use this method, smartsheet attribute must be set") include_fields = ( "parent_id", "sibling_id", "above", "indent", "outdent", "to_bottom", "to_top", "expanded", "format", "cells.column_id", "cells.formula", "cells.value", "cells.hyperlink", "cells.link_in_from_cell", "cells.strict", "cells.format", "cells.image", "cells.override_validation", "locked", ) data = [] schema = RowSchema(only=include_fields) for row in rows: new_row = row.copy(deep=False) new_row.cells = [ cell for cell in row.cells if cell.value is not None or cell.formula is not None ] data.append(schema.dump(new_row)) result = cast( "Result", self.smartsheet.post(f"/sheets/{self.id}/rows", data=data) ) return result def add_row(self, row: Row) -> Result: """Adds a single row to the smartsheet. Sheet must have smartsheet attribute set. It is automatically set when method Smartsheet.sheets.get() is used A row must have either location-specifier attributes or row number set More details: http://smartsheet-platform.github.io/api-docs/#add-rows Args: row: Row object Returns: Result object """ return self.add_rows([row]) def update_rows(self, rows: Sequence[Row]) -> Result: """Updates several rows in the Sheet. Sheet must have smartsheet attribute set. It is automatically set when method Smartsheet.sheets.get() is used More details: http://smartsheet-platform.github.io/api-docs/#update-rows Args: rows: sequence of Row objects Returns: Result object """ if self.smartsheet is None: raise ValueError("To use this method, smartsheet attribute must be set") include_fields = ( "id", "parent_id", "above", "indent", "outdent", "to_bottom", "to_top", "expanded", "format", "cells.column_id", "cells.formula", "cells.value", "cells.hyperlink", "cells.link_in_from_cell", "cells.strict", "cells.format", "cells.image", "cells.override_validation", "locked", ) data = [] schema = RowSchema(only=include_fields) for row in rows: new_row = row.copy(deep=False) new_row.cells = [ cell for cell in row.cells if cell.value is not None or cell.formula is not None ] data.append(schema.dump(new_row)) result = cast( "Result", self.smartsheet.put(f"/sheets/{self.id}/rows", data=data) ) return result def update_row(self, row: Row) -> Result: """Updates a single row in the Sheet. Sheet must have smartsheet attribute set. It is automatically set when method Smartsheet.sheets.get() is used More details: http://smartsheet-platform.github.io/api-docs/#update-rows Args: row: Row object Returns: Result object """ return self.update_rows([row]) def delete_rows(self, row_ids: Sequence[int]) -> Result: """Deletes several rows in the Sheet. Rows are identified by ids. Args: row_ids: sequence of row ids Returns: Result object """ if self.smartsheet is None: raise ValueError("To use this method, smartsheet attribute must be set") endpoint = f"/sheets/{self.id}/rows" params = {"ids": ",".join(str(row_id) for row_id in row_ids)} return self.smartsheet.delete(endpoint, params=params) def delete_row(self, row_id: int) -> Result: """Deletes a single row in the Sheet specified by ID. Args: row_id: Row id Returns: Result object """ return self.delete_rows([row_id]) def sort_rows(self, order: List[Dict[str, Any]]) -> "Sheet": """Sorts rows in the sheet with the specified order Args: order: List of dictionaries containing column_title or column_id and (optional) descending bool (default is ascending). Example: [ {"column_title": "Birth date", "descending": True}, {"column_title": "Full Name"} ] Returns: Sheet object """ # TODO: add validation _schema for sorting order normalized_order = [] for item in order: normalized_item = {} if "column_id" in item: normalized_item["columnId"] = item["column_id"] elif "column_title" in item: column_title = item["column_title"] column = self.get_column(column_title) normalized_item["columnId"] = column.id else: raise ValueError( "Sorting key must have either column_id or column_title" ) descending = item.get("descending", False) if descending: normalized_item["direction"] = "DESCENDING" else: normalized_item["direction"] = "ASCENDING" normalized_order.append(normalized_item) data = {"sortCriteria": normalized_order} endpoint = f"/sheets/{self.id}/sort" if not self.smartsheet: raise ValueError("Can't use API because smartsheet attribute is not set") response = cast( Dict[str, Any], self.smartsheet.post(endpoint, data, result_obj=False) ) sheet = self.load(response) sheet.smartsheet = self.smartsheet return sheet def make_cell( self, column_title: str, field_value: Union[float, str, datetime, None] ) -> Cell: """Creates a Cell object for an existing column Args: column_title: title of an existing column field_value: value of the cell Returns: Cell object """ column = self.get_column(column_title) if column is None: raise ValueError( "A column with the title %s does not exist in this sheet", column_title ) cell = Cell(column_id=column.id, value=field_value) return cell def make_cells( self, fields: Dict[str, Union[float, str, datetime, None]] ) -> List[Cell]: """Create a list of Cell objects from dictionary Args: fields: dictionary where key is a column title and value is a cell value Returns: list of Cell objects """ result: List[Cell] = [] for column_title, field_value in fields.items(): result.append(self.make_cell(column_title, field_value)) return result def as_list(self) -> List[Dict[str, Union[float, str, datetime, None]]]: """Returns a list of dictionaries with column titles and cell values""" return [row.as_dict() for row in self.rows] class SheetCRUD(CRUD[Sheet]): base_url = "/sheets" factory = Sheet create_include_fields = ( "name", "columns.primary", "columns.title", "columns.type", "columns.auto_number_format", "columns.options", "columns.symbol", "columns.system_column_type", "columns.width", ) PK!`Kȹsimple_smartsheet/smartsheet.pyfrom typing import Optional, Dict, Any, Union import requests from simple_smartsheet import constants from simple_smartsheet import exceptions from simple_smartsheet.types import JSONType from simple_smartsheet.models.extra import Result from simple_smartsheet.models.report import ReportCRUD from simple_smartsheet.models.sheet import SheetCRUD class Smartsheet: """Smartsheet API class that provides a way to interact with Smartsheet objects. Attributes: token: Smartsheet API token, obtained in Personal Settings -> API access session: requests.Session object which stores headers based on the token sheets: SheetsCRUD object which provides methods to interact with Sheets """ API_HEADERS = {"Content-Type": "application/json", "Accept": "application/json"} def __init__(self, token: str) -> None: self.session = requests.Session() self.token = token self.sheets = SheetCRUD(self) self.reports = ReportCRUD(self) @property def token(self) -> str: return self._token @token.setter def token(self, value) -> None: self._token = value # when the token is changed, update headers too self._update_headers() def _update_headers(self) -> None: """Updates HTTP Headers with the token""" headers = {"Authorization": f"Bearer {self.token}", **self.API_HEADERS} self.session.headers.update(headers) @staticmethod def get_endpoint_url(endpoint: str) -> str: """Build a full API url based on the relative API path. For example: get_endpoint_url('/sheets') -> 'https://api.smartsheet.com/2.0/sheets' """ return constants.API_ROOT + endpoint def get( self, endpoint: str, path: Optional[str] = "data", params: Optional[Dict[str, Any]] = None, ) -> JSONType: """Performs HTTP GET on the endpoint Args: endpoint: relative API endpoint, for example '/sheets' path: in response extract the data under the specific path. Default - 'data'. Specify None if not needed params: HTTP query params dictionary Returns: JSON data from the response, under the specific key. """ url = self.get_endpoint_url(endpoint) response = self.session.get(url, params=params) if response.ok: if response.text: response_json = response.json() if path: if path in response_json: return response_json[path] raise AttributeError( f"Response from GET {url} does not contain key {path!r}" ) else: return response_json else: return {} else: raise exceptions.SmartsheetHTTPError.from_response(response) def post( self, endpoint: str, data: Optional[JSONType] = None, result_obj: bool = True ) -> Union[Result, Dict[str, Any], None]: """Performs HTTP POST on the endpoint Args: endpoint: relative API endpoint, for example '/sheets' data: dictionary or list with data that is going to be sent as JSON result_obj: whether to convert received JSON response to Result object Returns: Result object """ url = self.get_endpoint_url(endpoint) if data: response = self.session.post(url, json=data) else: response = self.session.post(url) if response.ok: if response.text: json_response = response.json() if result_obj: result = Result.load(json_response) return result else: return json_response else: return None else: raise exceptions.SmartsheetHTTPError.from_response(response) def put(self, endpoint: str, data: JSONType) -> Optional[Result]: """Performs HTTP PUT on the endpoint Args: endpoint: relative API endpoint, for example '/sheets' data: dictionary or list with data that is going to be sent as JSON Returns: Result object """ url = self.get_endpoint_url(endpoint) response = self.session.put(url, json=data) if response.ok: result = Result.load(response.json()) return result else: raise exceptions.SmartsheetHTTPError.from_response(response) def delete(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Result: """Performs HTTP DELETE on the endpoint Args: endpoint: relative API endpoint, for example '/sheets' params: HTTP query params dictionary Returns: Result object """ url = self.get_endpoint_url(endpoint) response = self.session.delete(url, params=params) if response.ok: result = Result.load(response.json()) return result else: raise exceptions.SmartsheetHTTPError.from_response(response) PK!u-simple_smartsheet/types.pyfrom typing import Union, Dict, List, Any, Tuple from mypy_extensions import TypedDict JSONType = Union[Dict[str, Any], List[Dict[str, Any]]] IndexKeysDict = TypedDict("IndexKeysDict", {"columns": Tuple[str, ...], "unique": bool}) IndexesKeysType = List[IndexKeysDict] IndexKeyType = Tuple[str, ...] IndexType = TypedDict( "IndexType", {"index": Dict[Tuple[Any, ...], Any], "unique": bool} ) IndexesType = Dict[IndexKeyType, IndexType] PK!7Csimple_smartsheet/utils.pyimport os from typing import Optional, Sequence, Any from marshmallow import EXCLUDE, RAISE def get_unknown_field_handling(debug: bool) -> str: if debug: return RAISE else: return EXCLUDE def is_env_var(env_var: str) -> bool: env_var_str = os.getenv(env_var, "").lower() return env_var_str in ("yes", "true", "y", "1") def is_debug() -> bool: return is_env_var("SIMPLE_SMARTSHEET_DEBUG") def create_repr(obj: Any, attrs: Optional[Sequence[str]] = None): if attrs is None: attrs = obj.__dict__.keys() attrs_repr = ", ".join(f"{attr}={getattr(obj, attr)!r}" for attr in attrs) return f"{obj.__class__.__qualname__}({attrs_repr})" PK!H]--)simple_smartsheet-0.2.0.dist-info/LICENSEMIT License Copyright (c) 2018 Dmitry Figol Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HڽTU'simple_smartsheet-0.2.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HQ5*simple_smartsheet-0.2.0.dist-info/METADATAZmSH_1|@!! %ȅJHRneigj$_wH6ln<~egts/r/I.uZL( y_F^UMqvkPDn|, ~ %Ab W ɦ_Hb *LE&A0i)NìC/I^p޶%, L?3.2Dd&1q >RUUʀ0reɻɭj21,_Y R;w՚$}Jjm{ N' j%7=f4uZń+e v({^Zy͈KDHkJ *$2b6=/^rA@$z.&hV/Wjmа8+4^(ÓIo@g[w7@@@2gP 'κxUg&3i"b|F#HnJ=iP+HՒX. "⇹5d1A]ҵRB% kJM!h{l\.#8#dUst="R~ [3xIN6XJHdPI(WLe`D91\9.^l@e)&HUTڅ.U(| =Jdv ZC*@ cN@ݞ8K k&K dQinn4 q=ﭠ-ի,I4zD-˾Đ .a=hQKKve%F )_P3Pvm3>*SK2IARbMy~JfLq+tm髖g )oRx2^̦ELQiKo^vU`qW Pp2>"MHJc*&c ~ lЦMݩu>SR\:4%$}1:xƔl30#+F䞏s+Dzpk*m^~.j}t䷓^7Bz ~[jc?j,4Ph\h t1kYoᏭ=xk[[cGg'g#v^?}ދxd_ngwΫ;{nwC{a2_|ro/^yX>xuf[Lٖ`%5V `Ƨb[]K'G"Nx~cz=& 9_Oeo grEXX};EwMgs6:B^.GRMmӆ+-d1WP֊5Vdwjd]Zk,\mZieU:]K>Ʀ`Z_; -n1|1PG0ǹ7UW>:^=*k3;$H:M{qnk7p 7y[3 UJ*R3~L3鷻:r1Bpe$xRxN l1 p'sJ6Bift|bE J7nդޝյLKwU=~V jd?SGk<+Zd` r='^ |lBUve ѕݙq~Ի',ǝt_쑐ϑ8B:r2!M)Ǫ^ >`ɃzȰ6*ðv!яѡsfd1(ϤRh}P559V3t#(⿗{6=^вfEtܞħ8/ɜ{mbʑh_<֤4}\PPM\XkmSjDnsX{Le/DAlcX%\HZDge~,v]Ц@P"Ͻ{!\¾rroRO}rdecw-\޽%ND_Oeɕxhى_F+ۈQ;ghS\qC\U ;d6mF\P5?-c:G׀hZCC<>SX"J?! !(DŽPWUb\q߅/ɩc䡲1QNJT$qf_t[eE_6g{&nj=75fkl>NP@u?(OPK!FU>>simple_smartsheet/__init__.pyPK!c4 00ysimple_smartsheet/config.pyPK!J,,simple_smartsheet/constants.pyPK!PJsimple_smartsheet/crud.pyPK!`"?simple_smartsheet/exceptions.pyPK!+$ simple_smartsheet/models/__init__.pyPK!t !simple_smartsheet/models/base.pyPK!QII -simple_smartsheet/models/cell.pyPK!4k "<simple_smartsheet/models/column.pyPK!6C||!Fsimple_smartsheet/models/extra.pyPK!J>  "NNsimple_smartsheet/models/report.pyPK!p7Xsimple_smartsheet/models/row.pyPK!i8)HH!nsimple_smartsheet/models/sheet.pyPK!`Kȹsimple_smartsheet/smartsheet.pyPK!u-simple_smartsheet/types.pyPK!7Csimple_smartsheet/utils.pyPK!H]--)xsimple_smartsheet-0.2.0.dist-info/LICENSEPK!HڽTU'simple_smartsheet-0.2.0.dist-info/WHEELPK!HQ5*simple_smartsheet-0.2.0.dist-info/METADATAPK!H$R(bsimple_smartsheet-0.2.0.dist-info/RECORDPK$K