PK!]0 flake8_pie.pyfrom typing import Optional, List, NamedTuple, Iterable from functools import partial import ast class ErrorLoc(NamedTuple): """ location of the lint infraction """ lineno: int col_offset: int message: str type: "Flake8PieCheck" class Flake8PieVisitor(ast.NodeVisitor): def __init__(self) -> None: self.errors: List[ErrorLoc] = [] def visit_FunctionDef(self, node: ast.FunctionDef) -> None: """ run checker function and track error if found """ error = is_assign_and_return(node) if error: self.errors.append(error) error = is_celery_task_missing_name(node) if error: self.errors.append(error) self.generic_visit(node) def visit_Call(self, node: ast.Call) -> None: error = is_loose_crontab_call(node) if error: self.errors.append(error) error = is_celery_apply_async_missing_expires(node) if error: self.errors.append(error) self.generic_visit(node) def visit_Dict(self, node: ast.Dict) -> None: error = is_celery_task_missing_expires(node) if error: self.errors.append(error) self.generic_visit(node) def visit_JoinedStr(self, node: ast.JoinedStr) -> None: error = is_pointless_f_string(node) if error: self.errors.append(error) self.generic_visit(node) def __repr__(self) -> str: return f"<{self.__class__.__name__}: errors={self.errors}>" def is_pointless_f_string(node: ast.JoinedStr) -> Optional[ErrorLoc]: for value in node.values: if isinstance(value, ast.FormattedValue): return None return PIE782(lineno=node.lineno, col_offset=node.col_offset) def get_assign_target_id(stmt: ast.stmt) -> Optional[str]: """ We can have two types of assignments statements: - ast.Assign: usual assignment - ast.AnnAssign: assignment with a type hint Here we check accordingly and return the `id`. """ if ( isinstance(stmt, ast.Assign) and len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name) ): return stmt.targets[0].id elif isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name): return stmt.target.id return None def is_assign_and_return(func: ast.FunctionDef) -> Optional[ErrorLoc]: """ check a FunctionDef for assignment and return where a user assigns to a variable and returns that variable instead of just returning """ # assign and return can only occur with at least two statements if len(func.body) >= 2: return_stmt = func.body[-1] if isinstance(return_stmt, ast.Return) and isinstance( return_stmt.value, ast.Name ): assign_stmt = func.body[-2] assign_id = get_assign_target_id(assign_stmt) if return_stmt.value.id == assign_id: return PIE781( lineno=return_stmt.lineno, col_offset=return_stmt.col_offset ) return None def has_name_kwarg(dec: ast.Call) -> bool: return all(k.arg != "name" for k in dec.keywords) def is_celery_task_missing_name(func: ast.FunctionDef) -> Optional[ErrorLoc]: """ check if a Celery task definition is missing an explicit name. """ if func.decorator_list: for dec in func.decorator_list: if isinstance(dec, ast.Call): if isinstance(dec.func, ast.Name): if dec.func.id == "shared_task": if has_name_kwarg(dec): return PIE783(lineno=dec.lineno, col_offset=dec.col_offset) if isinstance(dec.func, ast.Attribute): if dec.func.attr == "task": if has_name_kwarg(dec): return PIE783(lineno=dec.lineno, col_offset=dec.col_offset) return None # from: github.com/celery/celery/blob/0736cff9d908c0519e07babe4de9c399c87cb32b/celery/schedules.py#L403 CELERY_ARG_MAP = dict(minute=0, hour=1, day_of_week=2, day_of_month=3, month_of_year=4) CELERY_LS = ["minute", "hour", "day_of_week", "day_of_month", "month_of_year"] def is_invalid_celery_crontab(*, kwargs: List[ast.keyword]) -> bool: keyword_args = {k.arg for k in kwargs if k.arg is not None} if not keyword_args: return True largest_index = max( (CELERY_ARG_MAP[k] for k in keyword_args if CELERY_ARG_MAP.get(k)), default=0 ) for key in CELERY_LS[:largest_index]: if key not in keyword_args: return True return False def is_loose_crontab_call(call: ast.Call) -> Optional[ErrorLoc]: """ require that a user pass all time increments that are smaller than the highest one they specify. e.g., user passes day_of_week, then they must pass hour and minute """ if isinstance(call.func, ast.Name): if call.func.id == "crontab": if is_invalid_celery_crontab(kwargs=call.keywords): return PIE784(lineno=call.lineno, col_offset=call.col_offset) return None def is_celery_dict_task_definition(dict_: ast.Dict) -> bool: """ determine whether the Dict is a Celery task definition """ celery_task_dict_target_keys = {"task", "schedule"} # We are looking for the `task` and `schedule` keys that all celery tasks # configured via a Dict have if len(dict_.keys) >= 2: for key in dict_.keys: if isinstance(key, ast.Str): if key.s in celery_task_dict_target_keys: celery_task_dict_target_keys.remove(key.s) if not celery_task_dict_target_keys: return True return len(celery_task_dict_target_keys) == 0 CELERY_OPTIONS_KEY = "options" CELERY_EXPIRES_KEY = "expires" def is_celery_task_missing_expires(dict_: ast.Dict) -> Optional[ErrorLoc]: """ ensure that celery tasks have an `expires` arg """ if is_celery_dict_task_definition(dict_): for key, value in zip(dict_.keys, dict_.values): if isinstance(key, ast.Str) and key.s == CELERY_OPTIONS_KEY: # check that options value, a dict, has `expires` key if isinstance(value, ast.Dict): for k in value.keys: if isinstance(k, ast.Str) and k.s == CELERY_EXPIRES_KEY: return None return PIE785(lineno=value.lineno, col_offset=value.col_offset) return PIE785(lineno=dict_.lineno, col_offset=dict_.col_offset) return None CELERY_APPLY_ASYNC = "apply_async" def is_celery_apply_async_missing_expires(node: ast.Call) -> Optional[ErrorLoc]: """ ensure foo.apply_async() is given an expiration """ if isinstance(node.func, ast.Attribute) and node.func.attr == CELERY_APPLY_ASYNC: for k in node.keywords: if k.arg == CELERY_EXPIRES_KEY: return None return PIE785(lineno=node.lineno, col_offset=node.col_offset) return None class Flake8PieCheck: name = "flake8-pie" version = "0.4.1" def __init__(self, tree: ast.Module) -> None: self.tree = tree def run(self) -> Iterable[ErrorLoc]: visitor = Flake8PieVisitor() visitor.visit(self.tree) yield from visitor.errors PIE781 = partial( ErrorLoc, message="PIE781: You are assigning to a variable and then returning. Instead remove the assignment and return.", type=Flake8PieCheck, ) PIE782 = partial( ErrorLoc, message="PIE782: Unnecessary f-string. You can safely remove the `f` prefix.", type=Flake8PieCheck, ) PIE783 = partial( ErrorLoc, message="PIE783: Celery tasks should have explicit names.", type=Flake8PieCheck, ) PIE784 = partial( ErrorLoc, message="PIE784: Celery crontab is missing explicit arguments.", type=Flake8PieCheck, ) PIE785 = partial( ErrorLoc, message="PIE785: Celery tasks should have expirations.", type=Flake8PieCheck, ) PK!H[,2+flake8_pie-0.4.1.dist-info/entry_points.txtNINK(I+ϋ tdZ\\PK!HڽTU flake8_pie-0.4.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H;x#flake8_pie-0.4.1.dist-info/METADATAWmo_1XIKNQ58(B#\vw[%yS"v}yf^)Ӯ,9>Fb#_ǝe vX$D˖E“j:-zGryBZwgȸwnuNrӤn]u}rXoR/r[9%.z_k/%QU+0PzE~(U0;(++]|5@}*6T$+kZ΅V+43Zk{! %\ҦCdXwtuCurT~U[ Q^ ٱ_/~eMeEӰ{V=*cs埔K4;и1ڔ~+3t\oJo2߯Saey#]nU-C||3t@RSwSmE{(?^*ky'<*xO%x=vCuTS%VR.Q&=|[X=tE?w@L>iD d,UV /- xwϻR<`kzE(B(P(Q80zqU /\ti s7){D흏54 gyP(6U >>4 : G7p=h6kM?!&:;޸1vf>sԪEx 5(Rmi(Vm#,I [C'ځ֦1<}ߦ9cz`b1$xj,˺0iQ!K*9Y""zsZ {:leT|Iw?{h?2 Q8z]ǐG@k{13OmQz4$^p,T!Pߣ%LO"}T~;j} .aBU،V?~FjہvsZ9UP3 s ,a',j$T0 \ +&qNI~ ~]7y|'Q=?¿w+JG~زgZtÌ: g'~(~ \-Y-(r}gvD+/L ]w؜b~N${~ryzvnCm@D0ڨUU=֘vI|; @czxt35B 7;4)r F-QZՃ4Vy}3!k$*^E'G7S dNT8mo٧ 1!0 -ha}Bi\@5rmLC 鈲B Δw[)7rݬJ 9̅gcfa7y{Zbv Cȋ]b( lj#27㦌[g$TrqB3X#_EIa:8` -6.L%y b\i52Ps^_8BM}Ǒ3Te&Mt01$ȩ&򄘶T*A.KMznhsR($ 9`^q)f{ؒЄP >0%vhќ?r @z5E{b+)?+1eKC@"'4/y} A#y-]3pKmā8P鑟̤j#ޏp>]'!{l_)y&, 9ŜxhKq+@cRUJˑ! hmL