PK"NiswXXpytest_docker_tools/__init__.py''' An opionated set of helpers for defining Docker integration test environments with py.test fixtures. ''' from .factories import build, container, fetch, image, image_or_build, network, volume __version__ = '0.2.0' __all__ = [ 'build', 'container', 'fetch', 'image', 'image_or_build', 'network', 'volume', ] PKmLK__pytest_docker_tools/builder.pyimport textwrap from .templates import find_fixtures_in_params, resolve_fixtures_in_params def build_fixture_function(callable, scope, wrapper_class, kwargs): name = callable.__name__ docstring = getattr(callable, '__doc__', '').format(**kwargs) fixtures = find_fixtures_in_params(kwargs).union(set(('request', 'docker_client'))) fixtures_str = ','.join(fixtures) template = textwrap.dedent(f''' import pytest @pytest.fixture(scope=scope) def {name}({fixtures_str}): \'\'\' {docstring} \'\'\' real_kwargs = resolve_fixtures_in_params(request, kwargs) return _{name}(request, docker_client, wrapper_class=wrapper_class, **real_kwargs) ''') globals = { 'resolve_fixtures_in_params': resolve_fixtures_in_params, f'_{name}': callable, 'kwargs': kwargs, 'scope': scope, 'wrapper_class': wrapper_class, } exec(template, globals) return globals[name] def fixture_factory(scope='function'): wrapper_class = None def inner(callable): def factory(*, scope=scope, wrapper_class=wrapper_class, **kwargs): return build_fixture_function(callable, scope, wrapper_class, kwargs) factory.__name__ = callable.__name__ factory.__doc__ = getattr(callable, '__doc__', '') return factory return inner PK`fM+44!pytest_docker_tools/exceptions.pyclass TimeoutError(Exception): pass class ContainerError(Exception): def __init__(self, container, *args, **kwargs): self._container = container super().__init__(*args, **kwargs) class ContainerFailed(ContainerError): pass class ContainerNotReady(ContainerError): pass PK`fM^݆pytest_docker_tools/plugin.pyimport docker import pytest from .exceptions import ContainerError from .wrappers import Container @pytest.fixture(scope='session') def docker_client(request): ''' A Docker client configured from environment variables ''' return docker.from_env() @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): ''' This hook allows Docker containers to contribute their logs to the py.test report. ''' outcome = yield rep = outcome.get_result() if not rep.failed: return if call.excinfo and isinstance(call.excinfo.value, ContainerError): container = call.excinfo.value._container rep.sections.append((container.name, container.logs())) if 'request' not in item.funcargs: return for name, fixturedef in item.funcargs['request']._fixture_defs.items(): if not hasattr(fixturedef, 'cached_result'): continue fixture = fixturedef.cached_result[0] if isinstance(fixture, Container): rep.sections.append(( name + ': ' + fixture.name, fixture.logs(), )) PKLs~33 pytest_docker_tools/templates.py''' # Template handling Fixture factories can take static strings: ```python redis = container( image='redis:latest', ) ``` But that is not useful when building multiple containers that need to reference one another or you need to parameterize the fixture. This module provides two facilities: The ability to reference fixtures using python string template notation and the ability to know what fixtures this will fetch in at test collection (and generation) time. For example: ``` def test_simple_resolve(request): # These are parameters declared at import time - they can be reevaluated in the context of multiple tests kwargs = { 'somekey': ['{pytestconfig.getoption("verbose")}'], } # This can be used in a fixture factory to fill in the templates resolved = resolve_fixtures_in_params(request, kwargs) # And then the test can access them pytestconfig = request.getfixturevalue('pytestconfig') assert resolved['somekey'][0] == str(pytestconfig.getoption("verbose")) } ``` In order to make fixtures generated by a fixture factory more seamless we need to know a fixtures dependencies at collection time. We have a helper to find them: def test_simple_find(): # These are parameters declared at import time - they can be reevaluated in the context of multiple tests kwargs = { 'somekey': ['{pytestconfig.getoption("verbose")}'], } dependencies = find_fixtures_in_params(kwargs) assert dependencies = set('pytestconfig') ''' import inspect from string import Formatter __all__ = [ 'find_fixtures_in_params', 'resolve_fixtures_in_params', ] class FixtureFormatter(Formatter): def __init__(self, request): self.request = request def get_value(self, key, args, kwargs): return self.request.getfixturevalue(key) class Renderer(object): def __init__(self, request): self.request = request def visit_value(self, val): if isinstance(val, str): return FixtureFormatter(self.request).format(val) elif callable(val): return val(*[self.request.getfixturevalue(f) for f in inspect.getargspec(val)[0]]) return val def visit_list(self, val): return [self.visit(v) for v in val] def visit_dict(self, mapping): return {self.visit(k): self.visit(v) for (k, v) in mapping.items()} def visit(self, value): if isinstance(value, dict): return self.visit_dict(value) elif isinstance(value, list): return self.visit_list(value) elif value: return self.visit_value(value) class FixtureFinder(object): def visit_value(self, val): if isinstance(val, str): for literal_text, format_spec, conversion, _ in Formatter().parse(val): if format_spec: yield format_spec.split('.')[0].split('[')[0] elif callable(val): yield from inspect.getargspec(val)[0] def visit_list(self, val): for v in val: yield from self.visit(v) def visit_dict(self, mapping): for k, v in mapping.items(): yield from self.visit(k) yield from self.visit(v) def visit(self, value): if isinstance(value, dict): yield from self.visit_dict(value) elif isinstance(value, list): yield from self.visit_list(value) elif value: yield from self.visit_value(value) def find_fixtures_in_params(value): ''' Walk an object and identify fixtures references in templates in strings. ''' finder = FixtureFinder() return set(finder.visit(value)) def resolve_fixtures_in_params(request, value): ''' Walk an object and resolve fixture values referenced in template strings. ''' renderer = Renderer(request) return renderer.visit(value) PKojfMY;;pytest_docker_tools/utils.pyimport os import sys import time from .exceptions import TimeoutError def wait_for_callable(message, callable, timeout=30): ''' Runs a callable once a second until it returns True or we hit the timeout. ''' sys.stdout.write(message) try: for i in range(timeout): sys.stdout.write('.') sys.stdout.flush() if callable(): return time.sleep(1) finally: sys.stdout.write('\n') raise TimeoutError(f'Timeout of {timeout}s exceeded') def tests_inside_container(): ''' Returns True if tests are running inside a Linux container ''' if not os.path.exists('/proc/1/sched'): return False with open('/proc/1/sched', 'r') as fp: line1 = fp.read().split('\n')[0] # Right now this file contains a header like this which leaks the actual pid # systemd (1, #threads: 1) # If its not '1' we have detected containment init, info = line1.split(' ', 1) pid, threads = info.strip('(').rstrip(')').split(', ', 1) return pid != '1' PKFL./pytest_docker_tools/contexts/scratch/Dockerfile# For volume seeding we want a near-empty volune, You can't 'docker create scratch' # and you can have an iamge with just `FROM scratch`. FROM scratch CMD ["/bin/sh"] PKcfMֶJJ)pytest_docker_tools/factories/__init__.pyfrom .build import build from .container import container from .fetch import fetch from .image import image from .image_or_build import image_or_build from .network import network from .volume import volume __all__ = [ 'build', 'container', 'fetch', 'image', 'image_or_build', 'network', 'volume', ] PKdLOm&pytest_docker_tools/factories/build.pyimport sys from pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def build(request, docker_client, wrapper_class, **kwargs): ''' Docker image: built from "{path}" ''' # The docker build command now defaults to --rm=true, but docker-py doesnt # Let's do what docker build does by default kwargs.setdefault('rm', True) sys.stdout.write(f'Building {kwargs["path"]}') try: image, logs = docker_client.images.build(**kwargs) for line in logs: sys.stdout.write('.') sys.stdout.flush() finally: sys.stdout.write('\n') # request.addfinalizer(lambda: docker_client.images.remove(image.id)) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PK`fM+zz*pytest_docker_tools/factories/container.pyfrom pytest_docker_tools.builder import fixture_factory from pytest_docker_tools.exceptions import ContainerNotReady, TimeoutError from pytest_docker_tools.utils import wait_for_callable from pytest_docker_tools.wrappers import Container @fixture_factory() def container(request, docker_client, wrapper_class, **kwargs): ''' Docker container: image={image} ''' kwargs.update({'detach': True}) raw_container = docker_client.containers.run(**kwargs) request.addfinalizer(lambda: raw_container.remove(force=True) and raw_container.wait(timeout=10)) wrapper_class = wrapper_class or Container container = wrapper_class(raw_container) try: wait_for_callable('Waiting for container to be ready', container.ready) except TimeoutError: raise ContainerNotReady(container, 'Timeout while waiting for container to be ready') return container PKaLL"&pytest_docker_tools/factories/fetch.pyimport sys from pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def fetch(request, docker_client, wrapper_class, **kwargs): ''' Docker image: Fetched from {repository} ''' sys.stdout.write(f'Fetching {kwargs["repository"]}\n') image = docker_client.images.pull(**kwargs) # request.addfinalizer(lambda: docker_client.images.remove(image.id)) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PKhfM:ff&pytest_docker_tools/factories/image.pyfrom pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def image(request, docker_client, wrapper_class, **kwargs): ''' Docker image: named "{name}" (already available) ''' image = docker_client.images.get(kwargs['name']) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PKnfM~/n/pytest_docker_tools/factories/image_or_build.pyimport os from .build import build from .image import image def image_or_build(environ_key, **kwargs): if environ_key in os.environ: return image(name=os.environ[environ_key]) return build(**kwargs) PK]L{u(pytest_docker_tools/factories/network.pyimport uuid from pytest_docker_tools.builder import fixture_factory @fixture_factory() def network(request, docker_client, wrapper_class, **kwargs): ''' Docker network ''' name = kwargs.pop('name', 'pytest-{uuid}').format(uuid=str(uuid.uuid4())) print(f'Creating network {name}') network = docker_client.networks.create(name, **kwargs) request.addfinalizer(lambda: network.remove()) wrapper_class = wrapper_class or (lambda network: network) return wrapper_class(network) PKXLc'pytest_docker_tools/factories/volume.pyimport io import os import tarfile import uuid from pytest_docker_tools.builder import fixture_factory def _populate_volume(docker_client, volume, seeds): fp = io.BytesIO() tf = tarfile.open(mode="w:gz", fileobj=fp) for path, contents in seeds.items(): ti = tarfile.TarInfo(path) if contents is None: ti.type = tarfile.DIRTYPE tf.addfile(ti) else: ti.size = len(contents) tf.addfile(ti, io.BytesIO(contents)) tf.close() fp.seek(0) image, logs = docker_client.images.build( path=os.path.join(os.path.dirname(__file__), '..', 'contexts/scratch'), rm=True, ) list(logs) container = docker_client.containers.create( image=image.id, volumes={ f'{volume.name}': {'bind': '/data'}, }, ) try: container.put_archive('/data', fp) finally: container.remove(force=True) @fixture_factory() def volume(request, docker_client, wrapper_class, **kwargs): ''' Docker volume ''' name = kwargs.pop('name', 'pytest-{uuid}').format(uuid=str(uuid.uuid4())) seeds = kwargs.pop('initial_content', {}) print(f'Creating volume {name}') volume = docker_client.volumes.create(name, **kwargs) request.addfinalizer(lambda: volume.remove(True)) if seeds: _populate_volume(docker_client, volume, seeds) wrapper_class = wrapper_class or (lambda volume: volume) return wrapper_class(volume) PKU_L'kBB(pytest_docker_tools/wrappers/__init__.pyfrom .container import Container __all__ = [ 'Container', ] PKg"Nl)pytest_docker_tools/wrappers/container.py''' This module contains a wrapper that adds some helpers to a Docker Container object that are useful for integration testing. ''' import io import tarfile from pytest_docker_tools.exceptions import ( ContainerFailed, ContainerNotReady, TimeoutError, ) from pytest_docker_tools.utils import tests_inside_container, wait_for_callable class _Map(object): def __init__(self, container): self._container = container def values(self): return [self[k] for k in self.keys()] def items(self): return [(k, self[k]) for k in self.keys()] def __iter__(self): return iter(self.keys()) class IpMap(_Map): @property def primary(self): return next(iter(self.values())) def keys(self): return self._container.attrs['NetworkSettings']['Networks'].keys() def __getitem__(self, key): if not isinstance(key, str): key = key.name networks = self._container.attrs['NetworkSettings']['Networks'] if key not in networks: raise KeyError(f'Unknown network: {key}') return networks[key]['IPAddress'] class PortMap(_Map): def __init__(self, container): self._container = container def keys(self): return self._container.attrs['NetworkSettings']['Ports'].keys() def __getitem__(self, key): ports = self._container.attrs['NetworkSettings']['Ports'] if key not in ports: raise KeyError(f'Unknown port: {key}') if not ports[key]: return [] return [int(p['HostPort']) for p in ports[key]] class Container(object): def __init__(self, container): self._container = container self.ips = IpMap(container) self.ports = PortMap(container) def ready(self): self._container.reload() if self.status == 'exited': raise ContainerFailed(self, f'Container {self.name} has already exited before we noticed it was ready') if self.status != 'running': return False networks = self._container.attrs['NetworkSettings']['Networks'] for name, network in networks.items(): if not network['IPAddress']: return False # If a user has exposed a port then wait for LISTEN socket to show up in netstat ports = self._container.attrs['NetworkSettings']['Ports'] for port, listeners in ports.items(): if not listeners: continue port, proto = port.split('/') assert proto in ('tcp', 'udp') if proto == 'tcp' and port not in self.get_open_tcp_ports(): return False if proto == 'udp' and port not in self.get_open_udp_ports(): return False return True @property def attrs(self): return self._container.attrs @property def id(self): return self._container.id @property def name(self): return self._container.name @property def env(self): kv_pairs = map(lambda v: v.split('=', 1), self._container.attrs['Config']['Env']) return {k: v for k, v in kv_pairs} @property def status(self): return self._container.status def exec_run(self, *args, **kwargs): return self._container.exec_run(*args, **kwargs) def reload(self): return self._container.reload() def restart(self, timeout=10): self._container.restart(timeout=timeout) try: wait_for_callable('Waiting for container to be ready after restart', self.ready) except TimeoutError: raise ContainerNotReady(self, 'Timeout while waiting for container to be ready after restart') def kill(self, signal=None): return self._container.kill(signal) def remove(self, *args, **kwargs): raise RuntimeError('Do not remove this container manually. It will be removed automatically by py.test after the test finishes.') def logs(self): return self._container.logs().decode('utf-8') def get_files(self, path): ''' Retrieve files from a container at a given path. This is meant for extracting log files from a container where it is not using the docker logging capabilities. ''' archive_iter, _ = self._container.get_archive(path) archive_stream = io.BytesIO() [archive_stream.write(chunk) for chunk in archive_iter] archive_stream.seek(0) archive = tarfile.TarFile(fileobj=archive_stream) files = {} for info in archive.getmembers(): if not info.isfile(): files[info.name] = None continue reader = archive.extractfile(info.name) files[info.name] = reader.read() return files def get_text(self, path): text = {} for path, bytes in self.get_files(path).items(): if bytes is None: text[path] = None continue text[path] = bytes.decode('utf-8') return text def get_open_tcp_ports(self): ''' Gets all TCP sockets in the LISTEN state ''' netstat = self._container.exec_run('cat /proc/net/tcp /proc/net/tcp6')[1].decode('utf-8').strip() ports = [] for line in netstat.split('\n'): # Not interested in empty lines if not line: continue line = line.split() # Only interested in listen sockets if line[3] != '0A': continue ports.append(str(int(line[1].split(':', 1)[1], 16))) return ports def get_open_udp_ports(self): ''' Gets all UDP sockets in the LISTEN state ''' netstat = self._container.exec_run('cat /proc/net/udp /proc/net/udp6')[1].decode('utf-8').strip() ports = [] for line in netstat.split('\n'): # Not interested in empty lines if not line: continue line = line.split() # If we are listening on a UDP port it will appear in /proc/net/udp # and state will be '07' if line[3] != '07': continue ports.append(str(int(line[1].split(':', 1)[1], 16))) return ports def get_addr(self, port): if tests_inside_container(): return (self.ips.primary, int(port.split('/')[0])) else: return ('127.0.0.1', self.ports[port][0]) PK!HG^(44pytest_docker_tools-0.2.0.dist-info/entry_points.txt.,I-.14JON-/)#䔦gqqPK!H>*RQ)pytest_docker_tools-0.2.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UrPK!HKĶk,pytest_docker_tools-0.2.0.dist-info/METADATA=isǕ+VmLC*n蘉{S `@sezV(Ɏ*L_n=uu)cuƙ>Vֶ'JW(R;`ɲZ\%k(kUR%4WR FS&kpkSyڪ0}DouTFL`[uiвF"jacfs Otpˢ:V):J,6G3x&D({'U޻\0c_X僳4̍k%W㺱X}Է%ϫPeqhؘ4UQ]{E罠>{qN˲*aiϖZ(*k~ {<ƾU x~ -[;J?MMO_U|`@mߋF-kV*VejfLT#Ja;-ឰ+x(lbaGA"͢P,ٚ;@dFzB 3@E25 D{ ZFWxn#1yb/M2dEҤhN_ rRbX@j ٘!iffjӸdpi]<"- 7 H}d_Lj]u0h pdŕ2s~eRB!CYW]j `U;eaP,>z+]WqJ <BQGԐjsШ9n0c'kzy+v[ ~ʸ^ Av[w w;~ hГ$IB?|pz?CN<|>>{d<>9oq4YtQn^Bo`X7h`GHdJA2o_$ 85n Fc?9GW/^QJpy~FaZ[`~o;HlV IȐ_jZzdMHPE Dj@XQmZWĿF;X($;P(>F;'1NWoGw-`_XښuP6]o2t?Q=IbEQAr_By:6yRc6u 6qO[V5"d`Mؚ3qxL~(R|ނ32<%VH]Lp +,|ktD*ٵ}_;oL/sL80Йvr Ӽumbت5tWdE9ïC&@5Dd0%] v"{!ߋ]G'Km̮f@Pc.A[:uuV7kƶ^Vh` A9xVݑX Kxְi$iF"a$$sJAèԳ9LT>X l{~@P:Bx?7# ;Kҟ- `$%4 7@-bru@$Ȼv5R|V>79Yof\+\\4 ^ׁF~79`6s&7>il`i7C?fh:D;[?r a8uZ\og5 zԲҳ$:Oܯh0n'?8~͔VvtԊ$V=ӑOF'Rj%,K߂ lEY&(믾qF vuFS#uVV:%;I<ߕ1~K m6VD$6-Va/-(l[-AMMaw.(+Ir6e$T@yQ3FI$8€f 1z=Åq%, NC5qnɃsa<4!+ׁfp%< 2ɼK8+ 'cqanrʽMAj:sV59~RL;=c[L# d"K[Πb?LFHXZs9&:Cb0'v=P Wa(0EW?פcp_Q/7G*@}\#7)9(5`)`]6zbD>|J{0,Z* z`!tb3l9"S#]juީt$21-LD G&@e+Y"1-S59mfG2 zL:ZE~xѬ2TJM~46i@S:'، @;Rs=u! fͽ6x|VqX2w$}w"c~"D> Jj5"UKI7wj]bKrIfo0jh`Le *M^Uٍ`Ԏ#ʀad*#E-ժ^شo6btl>vчA?q;;o]&I$Yj}鋓}4%l0_~NkhFUUʿ٭gܔÇ2 ;xeEn}|~g g/zs3:s)9\ͷT?REa&Ѡ)g |Mۋ*<$НWRuNEV44bWp]p+ߦu^ԇ֝P=;y/h0F95s=[0cf[[^ka~V.YGb.m+oWLmws#8v|ƙT.Qgc"_Td"uXMƽmbLήȈh֚|Vad .' '|\x)KAS ]s#q[yߧC3z V<ѱpX*Ơv*7G~%:y7ElibO^{N>W@h-MR 7éo;Gn8ų㿞~g?_v=}bY: 4|!ЇgX:'#SK@o^xf(D䖴s[.u.\F7dG Y=)8 AG V&JfƵt |.Ln5/e,rgB.TZvWbeJqM&ȺG@!GxFw*LQ*,ceոB##BIcr (tt;ПuzL+fv4!bx ;,V߇?\K΃__Sܧz66Ec^qE/mi6w%82d& :Z_)CBgFtY>.NVNy&h7w9)!Y\„>>-^s񊇒hNn,C&°: {ySg^0B gʵyïï˻'[1;9qIҀ@>LW}\Q}k~tn<4jߑ7ɰ <83&`f˪ݮ}XEur~wfȹ=I p9O=9ߥP;{h㾉؃.SQzQ8PݼfenΓu._t¿S_UFt0tb7F0 _3lԹ]ӽyrg"7XKXWx""UMS`lЙ  j,3VRpC@pʞwW" gE:IU{"}uZTq/u-nЖKDF,QPj1:3"}uA;XM1fmk,Rh"4j<\}$U/?uEM[ZtK'EUTFv+ n2s9)wG `6r(daYT$V-'-\r){8Nk]w[=zWb?M$C]2ŕfd֗nZiL~3A[ㅛXqC'X)<LKx X@)jݖ[jYX5 nX7ň6ɥ|grJ-_+1;BƶBtsO+:(oe:9xQfTެj]i_ A4`HQ,瓚0F ϊlae&d~5t{&bįse}Sz(cFJ㮹@}6ɁA=`-+ձvmQ]~}5b;vaĈwOMܭx;Srv9hd@N3˙eԾOnjY2P cuKTwan GmٿioR11W{گcg  :\]Wdc/  N# Fe:-K*J+y1YYDc>25sʋOzt/]PbWhQ'!>x]G>'\J5m=PpN+00`V|$Q}=6]* WOzKQDjbXKr }f}I1i:l5hkW&EތF9&gK=#/Aa +|f?=0O׸OTE_ZTgνֹ #Vlmu:wuFیPi9vw7g>:7 7cAo2)(Sd.{sSXUIx WK-"ܣzE˞-Y/Aq֖(qdv8.itXt@/ P%m]6++5McIB*q3Q01[^J%%X^q"Oqgט\ksM|/:9$7Gȶam)ѢGqwǃ^S*xw:,}c%Q;U3q /zox~>.ݥ ǃ欘3-&Z1/O/Q^Z۰F]S1vt3Ǎ 8Foi!3jauohB'7%m۹T7yǏz .K7L\z]͈8*RQ)