PK!2nppmemory_tempfile/__init__.pyfrom memory_tempfile.memory_tempfile import MemoryTempfile, MEM_BASED_FS, SUITABLE_PATHS __version__ = '0.1.0' PK!*\ܰ"memory_tempfile/memory_tempfile.pyimport os import sys import tempfile import platform from collections import OrderedDict MEM_BASED_FS = ['tmpfs', 'ramfs'] SUITABLE_PATHS = ['/tmp', '/run/user/{uid}', '/run/shm', '/dev/shm'] class MemoryTempfile: def __init__(self, preferred_paths=None, remove_paths=None, additional_paths=None, filesystem_types: list = None, fallback=None): self.os_tempdir = tempfile.gettempdir() if isinstance(fallback, bool): self.fallback = self.os_tempdir if fallback else None else: self.fallback = fallback self.tempdir = self.fallback if platform.platform() == "Linux": self.filesystem_types = filesystem_types if filesystem_types is not None else MEM_BASED_FS preferred_paths = [] if preferred_paths is None else preferred_paths remove_paths = [] if remove_paths is None else remove_paths additional_paths = [] if additional_paths is None else additional_paths self.suitable_paths = list(preferred_paths) \ + list(self.os_tempdir) \ + [i for i in SUITABLE_PATHS if i not in remove_paths] \ + list(additional_paths) uid = os.geteuid() with open('/proc/self/mountinfo', 'r') as file: mnt_info = {i[2]: i for i in [line.split() for line in file]} self.usable_paths = OrderedDict() for path in self.suitable_paths: path = path.replace('{uid}', str(uid)) # We may have repeated if self.usable_paths.get(path) is not None: continue self.usable_paths[path] = False try: dev = os.stat(path).st_dev major, minor = os.major(dev), os.minor(dev) mp = mnt_info.get("{}:{}".format(major, minor)) if mp and mp[8] in self.filesystem_types: self.usable_paths[path] = mp except FileNotFoundError: pass for k, v in self.usable_paths: if not v: del self.usable_paths[k] if len(self.usable_paths) > 0: self.tempdir = self.usable_paths.keys()[0] else: if fallback: self.tempdir = self.fallback else: raise RuntimeError('No memory temporary dir found and fallback is disabled.') def found_mem_tempdir(self): return len(self.usable_paths) > 0 def using_mem_tempdir(self): return self.tempdir in self.usable_paths def get_usable_mem_tempdir_paths(self): return self.usable_paths.copy() def gettempdir(self): return self.tempdir def gettempdirb(self): return self.tempdir.encode(sys.getfilesystemencoding(), 'surrogateescape') def mkdtemp(self, suffix=None, prefix=None, dir=None): return tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir) def mkstemp(self, suffix=None, prefix=None, dir=None, text=False): return tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir, text=text) def TemporaryDirectory(self, suffix=None, prefix=None, dir=None): return tempfile.TemporaryDirectory(suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir) def SpooledTemporaryFile(self, max_size=0, mode='w+b', buffering=None, encoding=None, newline=None, suffix=None, prefix=None, dir=None): return tempfile.SpooledTemporaryFile(max_size=max_size, mode=mode, buffering=buffering, encoding=encoding, newline=newline, suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir) def NamedTemporaryFile(self, mode='w+b', buffering=None, encoding=None, newline=None, suffix=None, prefix=None, dir=None, delete=True): return tempfile.NamedTemporaryFile(mode=mode, buffering=buffering, encoding=encoding, newline=newline, suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir, delete=delete) def TemporaryFile(self, mode='w+b', buffering=None, encoding=None, newline=None, suffix=None, prefix=None, dir=None): return tempfile.TemporaryFile(mode=mode, buffering=buffering, encoding=encoding, newline=newline, suffix=suffix, prefix=prefix, dir=self.tempdir if not dir else dir) def gettempprefix(self): return tempfile.gettempdir() def gettempprefixb(self): return tempfile.gettempprefixb() PK!X#'memory_tempfile-2.1.0.dist-info/LICENSECC0 LICENSE =========== To the extent possible under law, Marcelo Bello has waived all copyright and related or neighboring rights to template-regexp-processor. This work is published from: Brasil. PK!HڽTU%memory_tempfile-2.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H7xl{(memory_tempfile-2.1.0.dist-info/METADATAX]oF}篸@$"8+:v7-oXHSvfhY({ÊEXyι\rT͔NɿUTںutɾIvuuzJujGE,{ L`5&kjU(=نB L=TXG]hUj:+sڄP 5-VAY:mi!~:,M(y몲B7GϏS.^Y6?G7ハbNչI&5T쒳.֡M1յ2t[] ɭ3NfchO#Jyo ἯR*oW#YЅE.tn׍KSfIWYv/;)_>'~H`^jJnwB3-&ljs]5I򆾠YOFK;(@h<)j4 :S^=i<4р?3WK\!v &|F_j m'nM@V.,ţg9ʃ'Rk Kcv M}p,!hVʡ#GG&i+g~\Q\_ $Q] MA@p3@zI.}W2tM]jQehC" c88%- hI[ەp5xzB̰I['jmt,CÏÚ:Eј?I(H6 $/8y%flgt)%hcaw$"uSaSP$e@o}tޟnRNd> Nњۧ[V8xy)J>ڴ n'm4dzO?3!`%7}ps6|u5#@S4O'_d=C̑ ^@RLb JqC̒˂z$yٴar9L4s0cK7 +@+.{9:|R[xxHHmL?9I:i`7zVM&҃xVn-yq OUKG7OpXH| VyȪH)a\.RU?J.Pu9j.p?Y6yZ`>Ղ;YZ=U[xQf{EXU!9,pև~LBl,f"{h'xC$FQv/ ˯p3m,f,f\X$=˲ɞɾ߮Xع|K"Tjr3W?^ #y",Xfk<`ONcLuN~ʏ_k7@끧6kwΦrp`ӹxPH$ "BrQ0\\ ~*Zn~Yi ̶xPQ? qY< NY ŭlGGmF-C Mb[=?d?DY]]2U`Qhkr #?@LKd(xKΛtH&#| b2G241h N0{ˆaL%؛CtLfXSY핰.8,Ѽ_K; Z!HŽ,N:n"f J `z'J\fPK!2nppmemory_tempfile/__init__.pyPK!*\ܰ"memory_tempfile/memory_tempfile.pyPK!X#'memory_tempfile-2.1.0.dist-info/LICENSEPK!HڽTU%memory_tempfile-2.1.0.dist-info/WHEELPK!H7xl{(8memory_tempfile-2.1.0.dist-info/METADATAPK!HA*&hmemory_tempfile-2.1.0.dist-info/RECORDPK