PK!E None: self.url = url self.channel = channel self.username = username self.icon_emoji = icon_emoji self.timeout = timeout def __repr__(self): return ('Alerter for channel {}' .format(self.channel if self.channel else '(default)')) def critical(self, message: str, title: str = None, title_link: str = None) -> None: """Preformated critical message.""" self._send(attachments( attachment(fallback='Critical: {}'.format(message), text=message, title=title, title_link=title_link, color='danger') )) def warning(self, message: str, title: str = None, title_link: str = None) -> None: """Preformated warning message.""" self._send(attachments( attachment(fallback='Warning: {}'.format(message), text=message, title=title, title_link=title_link, color='warning') )) def info(self, message: str, title: str = None, title_link: str = None) -> None: """Preformated warning message.""" self._send(attachments( attachment(fallback='Info: {}'.format(message), text=message, title=title, title_link=title_link, color='#CCE5FF') )) def good(self, message: str, title: str = None, title_link: str = None) -> None: """Preformated ok message.""" self._send(attachments( attachment(fallback='Good: {}'.format(message), text=message, title=title, title_link=title_link, color='good') )) def custom(self, *args) -> None: """Custom message with elements.""" self._send(attachments(*args)) def _send(self, *args: Dict[str, str]) -> None: """Send the request to the Slack webhook.""" try: payload = {} payload.update({'channel': self.channel} if self.channel else {}) payload.update( {'username': self.username} if self.username else {}) payload.update( {'icon_emoji': self.icon_emoji} if self.icon_emoji else {}) payload = merge_dicts(payload, *args) r = requests.post(self.url, json=payload, timeout=self.timeout) except InvalidPayload: raise InvalidPayload('One of the args is not a valid dictionary') except (requests.exceptions.RequestException, requests.exceptions.ConnectionError, requests.exceptions.HTTPError, requests.exceptions.URLRequired, requests.exceptions.TooManyRedirects, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout, requests.exceptions.Timeout) as e: raise CouldNotSendAlert('Request errored: {}' .format(e)) if not r.text == 'ok': raise CouldNotSendAlert('Invalid response: {}' .format(r.text)) PK!nslack_alerts/elements.pyfrom typing import Dict, List, Union def text(message: str) -> Dict[str, str]: """Simple text element""" return {'text': message} def attachments(*args): """Simple attachments base element containing multiple attachment""" return {'attachments': [*args]} def attachment(fallback: str, color: str = None, pretext: str = None, author_name: str = None, author_link: str = None, author_icon: str = None, title: str = None, title_link: str = None, text: str = None, fields: List[Dict[str, str]] = None, image_url: str = None, thumb_url: str = None, footer: str = None, footer_icon: str = None, ts: int = None ) -> Dict[str, Union[str, List[Dict[str, str]]]]: """Attachement element""" element = {} element.update({'fallback': fallback} if fallback else {}) element.update({'color': color} if color else {}) element.update({'author_name': author_name} if author_name else {}) element.update({'author_link': author_link} if author_link else {}) element.update({'author_icon': author_icon} if author_icon else {}) element.update({'title': title} if title else {}) element.update({'title_link': title_link} if title_link else {}) element.update({'text': text} if text else {}) element.update({'fields': fields} if fields else {}) element.update({'image_url': image_url} if image_url else {}) element.update({'thumb_url': thumb_url} if thumb_url else {}) element.update({'footer': footer} if footer else {}) element.update({'footer_icon': footer_icon} if footer_icon else {}) element.update({'ts': ts} if ts else {}) return element PK!4%slack_alerts/exceptions.pyclass CouldNotSendAlert(Exception): """ Raised when sending the alert failed. """ class InvalidPayload(Exception): """ Raised when the payload is not valid. """ PK!?k-__slack_alerts/utils.pyfrom typing import Dict from slack_alerts.exceptions import InvalidPayload def merge_dicts(*args: Dict[str, str]) -> Dict[str, str]: merged_dict = {} try: for arg in args: merged_dict.update(arg) return merged_dict except (ValueError, TypeError): raise InvalidPayload('Could not merge dictionaries') PK!HnHTU"slack_alerts-1.0.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H#Q %slack_alerts-1.0.0.dist-info/METADATAVmo6_qE9N-)tlN%FǰE0X%R#)__#%sѥ^xwИƔRC4c-)M2!YFղ)' I ȵ̘&3n"#p&eʤ`31h5-uEDy(7+תR Wze-xbd&raّ1io`[pi'1z"ZG Ms%,=\1MaT)ҋ&H)5L>h?)9S4ˬWT ,دK8}]rmZpɴѰ9me ,2xЂKLs%S1[T׽Y\ɘRs#;4K#sKл [o1= l|X> B >[; 7^wO' 99C/;`az}7A|h*GAc؁ڦ(ة6."K51x6b3I%*H5$j:1L03Н#|URsYjT5S!+%?iR},g~__ʠPi1 h_XMb5 sZW(li8Z} :=QBޣ R\4ByFψ; 1ChOR2__-2|W =聈 " ,N|nѵc2.Վ x^nG1& v |uP0>@5)\@sؖ>tpGmT}FVS(Z]x^mCV̪- V>t~tVF2F4LV"l *I, !FNX\?%`UdT E;"{Փ|0x,0EE Jކ(Vj W̱@!Qmn8m ^C̨P8IKi <:>}ɢӺtO5E\D(cNEqSf ˈT3=}vIka*B_Y+j@4LY4cЙEF|*غezŴڴMsbP0h2˵>>:Ns۝L.MtS C16m:<%rqm.XJJTeTHt<Ĭm"pspqҷre4)e>k "Ku Bb l X`X3!)2aeAXڱ ז ʊsZڗ ZKnQV~ H}aSh-st/p%0 }_ dMHK$@$Ͼ{$;6Q0W@fMd]ߊsx2|X"8Ƥ' wu׉*Ap:k1ʀz);XwY9XR4uSIf,I* $@|D*랱Nyצi;׳}g$gYÄS\O0ɸ)αw |_v> D-nj.]q걋}Ĺ% ]L}PK!E