PKlfMY)YYpytest_docker_tools/__init__.py''' An opionated set of helpers for defining Docker integration test environments with py.test fixtures. ''' from .factories import build, container, fetch, image, image_or_build, network, volume __version__ = '0.0.12' __all__ = [ 'build', 'container', 'fetch', 'image', 'image_or_build', 'network', 'volume', ] PKmLK__pytest_docker_tools/builder.pyimport textwrap from .templates import find_fixtures_in_params, resolve_fixtures_in_params def build_fixture_function(callable, scope, wrapper_class, kwargs): name = callable.__name__ docstring = getattr(callable, '__doc__', '').format(**kwargs) fixtures = find_fixtures_in_params(kwargs).union(set(('request', 'docker_client'))) fixtures_str = ','.join(fixtures) template = textwrap.dedent(f''' import pytest @pytest.fixture(scope=scope) def {name}({fixtures_str}): \'\'\' {docstring} \'\'\' real_kwargs = resolve_fixtures_in_params(request, kwargs) return _{name}(request, docker_client, wrapper_class=wrapper_class, **real_kwargs) ''') globals = { 'resolve_fixtures_in_params': resolve_fixtures_in_params, f'_{name}': callable, 'kwargs': kwargs, 'scope': scope, 'wrapper_class': wrapper_class, } exec(template, globals) return globals[name] def fixture_factory(scope='function'): wrapper_class = None def inner(callable): def factory(*, scope=scope, wrapper_class=wrapper_class, **kwargs): return build_fixture_function(callable, scope, wrapper_class, kwargs) factory.__name__ = callable.__name__ factory.__doc__ = getattr(callable, '__doc__', '') return factory return inner PK`fM+44!pytest_docker_tools/exceptions.pyclass TimeoutError(Exception): pass class ContainerError(Exception): def __init__(self, container, *args, **kwargs): self._container = container super().__init__(*args, **kwargs) class ContainerFailed(ContainerError): pass class ContainerNotReady(ContainerError): pass PK`fM^݆pytest_docker_tools/plugin.pyimport docker import pytest from .exceptions import ContainerError from .wrappers import Container @pytest.fixture(scope='session') def docker_client(request): ''' A Docker client configured from environment variables ''' return docker.from_env() @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): ''' This hook allows Docker containers to contribute their logs to the py.test report. ''' outcome = yield rep = outcome.get_result() if not rep.failed: return if call.excinfo and isinstance(call.excinfo.value, ContainerError): container = call.excinfo.value._container rep.sections.append((container.name, container.logs())) if 'request' not in item.funcargs: return for name, fixturedef in item.funcargs['request']._fixture_defs.items(): if not hasattr(fixturedef, 'cached_result'): continue fixture = fixturedef.cached_result[0] if isinstance(fixture, Container): rep.sections.append(( name + ': ' + fixture.name, fixture.logs(), )) PKLs~33 pytest_docker_tools/templates.py''' # Template handling Fixture factories can take static strings: ```python redis = container( image='redis:latest', ) ``` But that is not useful when building multiple containers that need to reference one another or you need to parameterize the fixture. This module provides two facilities: The ability to reference fixtures using python string template notation and the ability to know what fixtures this will fetch in at test collection (and generation) time. For example: ``` def test_simple_resolve(request): # These are parameters declared at import time - they can be reevaluated in the context of multiple tests kwargs = { 'somekey': ['{pytestconfig.getoption("verbose")}'], } # This can be used in a fixture factory to fill in the templates resolved = resolve_fixtures_in_params(request, kwargs) # And then the test can access them pytestconfig = request.getfixturevalue('pytestconfig') assert resolved['somekey'][0] == str(pytestconfig.getoption("verbose")) } ``` In order to make fixtures generated by a fixture factory more seamless we need to know a fixtures dependencies at collection time. We have a helper to find them: def test_simple_find(): # These are parameters declared at import time - they can be reevaluated in the context of multiple tests kwargs = { 'somekey': ['{pytestconfig.getoption("verbose")}'], } dependencies = find_fixtures_in_params(kwargs) assert dependencies = set('pytestconfig') ''' import inspect from string import Formatter __all__ = [ 'find_fixtures_in_params', 'resolve_fixtures_in_params', ] class FixtureFormatter(Formatter): def __init__(self, request): self.request = request def get_value(self, key, args, kwargs): return self.request.getfixturevalue(key) class Renderer(object): def __init__(self, request): self.request = request def visit_value(self, val): if isinstance(val, str): return FixtureFormatter(self.request).format(val) elif callable(val): return val(*[self.request.getfixturevalue(f) for f in inspect.getargspec(val)[0]]) return val def visit_list(self, val): return [self.visit(v) for v in val] def visit_dict(self, mapping): return {self.visit(k): self.visit(v) for (k, v) in mapping.items()} def visit(self, value): if isinstance(value, dict): return self.visit_dict(value) elif isinstance(value, list): return self.visit_list(value) elif value: return self.visit_value(value) class FixtureFinder(object): def visit_value(self, val): if isinstance(val, str): for literal_text, format_spec, conversion, _ in Formatter().parse(val): if format_spec: yield format_spec.split('.')[0].split('[')[0] elif callable(val): yield from inspect.getargspec(val)[0] def visit_list(self, val): for v in val: yield from self.visit(v) def visit_dict(self, mapping): for k, v in mapping.items(): yield from self.visit(k) yield from self.visit(v) def visit(self, value): if isinstance(value, dict): yield from self.visit_dict(value) elif isinstance(value, list): yield from self.visit_list(value) elif value: yield from self.visit_value(value) def find_fixtures_in_params(value): ''' Walk an object and identify fixtures references in templates in strings. ''' finder = FixtureFinder() return set(finder.visit(value)) def resolve_fixtures_in_params(request, value): ''' Walk an object and resolve fixture values referenced in template strings. ''' renderer = Renderer(request) return renderer.visit(value) PKojfMY;;pytest_docker_tools/utils.pyimport os import sys import time from .exceptions import TimeoutError def wait_for_callable(message, callable, timeout=30): ''' Runs a callable once a second until it returns True or we hit the timeout. ''' sys.stdout.write(message) try: for i in range(timeout): sys.stdout.write('.') sys.stdout.flush() if callable(): return time.sleep(1) finally: sys.stdout.write('\n') raise TimeoutError(f'Timeout of {timeout}s exceeded') def tests_inside_container(): ''' Returns True if tests are running inside a Linux container ''' if not os.path.exists('/proc/1/sched'): return False with open('/proc/1/sched', 'r') as fp: line1 = fp.read().split('\n')[0] # Right now this file contains a header like this which leaks the actual pid # systemd (1, #threads: 1) # If its not '1' we have detected containment init, info = line1.split(' ', 1) pid, threads = info.strip('(').rstrip(')').split(', ', 1) return pid != '1' PKFL./pytest_docker_tools/contexts/scratch/Dockerfile# For volume seeding we want a near-empty volune, You can't 'docker create scratch' # and you can have an iamge with just `FROM scratch`. FROM scratch CMD ["/bin/sh"] PKcfMֶJJ)pytest_docker_tools/factories/__init__.pyfrom .build import build from .container import container from .fetch import fetch from .image import image from .image_or_build import image_or_build from .network import network from .volume import volume __all__ = [ 'build', 'container', 'fetch', 'image', 'image_or_build', 'network', 'volume', ] PKdLOm&pytest_docker_tools/factories/build.pyimport sys from pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def build(request, docker_client, wrapper_class, **kwargs): ''' Docker image: built from "{path}" ''' # The docker build command now defaults to --rm=true, but docker-py doesnt # Let's do what docker build does by default kwargs.setdefault('rm', True) sys.stdout.write(f'Building {kwargs["path"]}') try: image, logs = docker_client.images.build(**kwargs) for line in logs: sys.stdout.write('.') sys.stdout.flush() finally: sys.stdout.write('\n') # request.addfinalizer(lambda: docker_client.images.remove(image.id)) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PK`fM+zz*pytest_docker_tools/factories/container.pyfrom pytest_docker_tools.builder import fixture_factory from pytest_docker_tools.exceptions import ContainerNotReady, TimeoutError from pytest_docker_tools.utils import wait_for_callable from pytest_docker_tools.wrappers import Container @fixture_factory() def container(request, docker_client, wrapper_class, **kwargs): ''' Docker container: image={image} ''' kwargs.update({'detach': True}) raw_container = docker_client.containers.run(**kwargs) request.addfinalizer(lambda: raw_container.remove(force=True) and raw_container.wait(timeout=10)) wrapper_class = wrapper_class or Container container = wrapper_class(raw_container) try: wait_for_callable('Waiting for container to be ready', container.ready) except TimeoutError: raise ContainerNotReady(container, 'Timeout while waiting for container to be ready') return container PKaLL"&pytest_docker_tools/factories/fetch.pyimport sys from pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def fetch(request, docker_client, wrapper_class, **kwargs): ''' Docker image: Fetched from {repository} ''' sys.stdout.write(f'Fetching {kwargs["repository"]}\n') image = docker_client.images.pull(**kwargs) # request.addfinalizer(lambda: docker_client.images.remove(image.id)) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PKhfM:ff&pytest_docker_tools/factories/image.pyfrom pytest_docker_tools.builder import fixture_factory @fixture_factory(scope='session') def image(request, docker_client, wrapper_class, **kwargs): ''' Docker image: named "{name}" (already available) ''' image = docker_client.images.get(kwargs['name']) wrapper_class = wrapper_class or (lambda image: image) return wrapper_class(image) PKhfMde/pytest_docker_tools/factories/image_or_build.pyimport os from .build import build from .image import image def image_or_build(environ_key, **kwargs): if environ_key in os.environ: return image(name=os.environ[kwargs['environ_key']]) return build(**kwargs) PK]L{u(pytest_docker_tools/factories/network.pyimport uuid from pytest_docker_tools.builder import fixture_factory @fixture_factory() def network(request, docker_client, wrapper_class, **kwargs): ''' Docker network ''' name = kwargs.pop('name', 'pytest-{uuid}').format(uuid=str(uuid.uuid4())) print(f'Creating network {name}') network = docker_client.networks.create(name, **kwargs) request.addfinalizer(lambda: network.remove()) wrapper_class = wrapper_class or (lambda network: network) return wrapper_class(network) PKXLc'pytest_docker_tools/factories/volume.pyimport io import os import tarfile import uuid from pytest_docker_tools.builder import fixture_factory def _populate_volume(docker_client, volume, seeds): fp = io.BytesIO() tf = tarfile.open(mode="w:gz", fileobj=fp) for path, contents in seeds.items(): ti = tarfile.TarInfo(path) if contents is None: ti.type = tarfile.DIRTYPE tf.addfile(ti) else: ti.size = len(contents) tf.addfile(ti, io.BytesIO(contents)) tf.close() fp.seek(0) image, logs = docker_client.images.build( path=os.path.join(os.path.dirname(__file__), '..', 'contexts/scratch'), rm=True, ) list(logs) container = docker_client.containers.create( image=image.id, volumes={ f'{volume.name}': {'bind': '/data'}, }, ) try: container.put_archive('/data', fp) finally: container.remove(force=True) @fixture_factory() def volume(request, docker_client, wrapper_class, **kwargs): ''' Docker volume ''' name = kwargs.pop('name', 'pytest-{uuid}').format(uuid=str(uuid.uuid4())) seeds = kwargs.pop('initial_content', {}) print(f'Creating volume {name}') volume = docker_client.volumes.create(name, **kwargs) request.addfinalizer(lambda: volume.remove(True)) if seeds: _populate_volume(docker_client, volume, seeds) wrapper_class = wrapper_class or (lambda volume: volume) return wrapper_class(volume) PKU_L'kBB(pytest_docker_tools/wrappers/__init__.pyfrom .container import Container __all__ = [ 'Container', ] PKifMra99)pytest_docker_tools/wrappers/container.py''' This module contains a wrapper that adds some helpers to a Docker Container object that are useful for integration testing. ''' import io import tarfile from pytest_docker_tools.exceptions import ( ContainerFailed, ContainerNotReady, TimeoutError, ) from pytest_docker_tools.utils import tests_inside_container, wait_for_callable class _Map(object): def __init__(self, container): self._container = container def values(self): return [self[k] for k in self.keys()] def items(self): return [(k, self[k]) for k in self.keys()] def __iter__(self): return iter(self.keys()) class IpMap(_Map): @property def primary(self): return next(iter(self.values())) def keys(self): return self._container.attrs['NetworkSettings']['Networks'].keys() def __getitem__(self, key): if not isinstance(key, str): key = key.name networks = self._container.attrs['NetworkSettings']['Networks'] if key not in networks: raise KeyError(f'Unknown network: {key}') return networks[key]['IPAddress'] class PortMap(_Map): def __init__(self, container): self._container = container def keys(self): return self._container.attrs['NetworkSettings']['Ports'].keys() def __getitem__(self, key): ports = self._container.attrs['NetworkSettings']['Ports'] if key not in ports: raise KeyError(f'Unknown port: {key}') if not ports[key]: return [] return [int(p['HostPort']) for p in ports[key]] class Container(object): def __init__(self, container): self._container = container self.ips = IpMap(container) self.ports = PortMap(container) def ready(self): self._container.reload() if self.status == 'exited': raise ContainerFailed(self, f'Container {self.name} has already exited before we noticed it was ready') if self.status != 'running': return False networks = self._container.attrs['NetworkSettings']['Networks'] for name, network in networks.items(): if not network['IPAddress']: return False # If a user has exposed a port then wait for LISTEN socket to show up in netstat ports = self._container.attrs['NetworkSettings']['Ports'] for port, listeners in ports.items(): if not listeners: continue port, proto = port.split('/') assert proto in ('tcp', 'udp') if proto == 'tcp' and port not in self.get_open_tcp_ports(): return False if proto == 'udp' and port not in self.get_open_udp_ports(): return False return True @property def attrs(self): return self._container.attrs @property def id(self): return self._container.id @property def name(self): return self._container.name @property def env(self): kv_pairs = map(lambda v: v.split('=', 1), self._container.attrs['Config']['Env']) return {k: v for k, v in kv_pairs} @property def status(self): return self._container.status def exec_run(self, *args, **kwargs): return self._container.exec_run(*args, **kwargs) def reload(self): return self._container.reload() def restart(self, timeout=10): self._container.restart(timeout=timeout) try: wait_for_callable('Waiting for container to be ready after restart', self.ready) except TimeoutError: raise ContainerNotReady(self, 'Timeout while waiting for container to be ready after restart') def kill(self, signal=None): return self._container.kill(signal) def remove(self, *args, **kwargs): raise RuntimeError('Do not remove this container manually. It will be removed automatically by py.test after the test finishes.') def logs(self): return self._container.logs().decode('utf-8') def get_files(self, path): ''' Retrieve files from a container at a given path. This is meant for extracting log files from a container where it is not using the docker logging capabilities. ''' archive_iter, _ = self._container.get_archive(path) archive_stream = io.BytesIO() [archive_stream.write(chunk) for chunk in archive_iter] archive_stream.seek(0) archive = tarfile.TarFile(fileobj=archive_stream) files = {} for info in archive.getmembers(): if not info.isfile(): files[info.name] = None continue reader = archive.extractfile(info.name) files[info.name] = reader.read() return files def get_text(self, path): text = {} for path, bytes in self.get_files(path).items(): if bytes is None: text[path] = None continue text[path] = bytes.decode('utf-8') return text def get_open_tcp_ports(self): ''' Gets all TCP sockets in the LISTEN state ''' netstat = self._container.exec_run('cat /proc/net/tcp')[1].decode('utf-8').strip() ports = [] for line in netstat.split('\n'): # Not interested in empty lines if not line: continue line = line.split() # Only interested in listen sockets if line[3] != '0A': continue ports.append(str(int(line[1].split(':', 1)[1], 16))) return ports def get_open_udp_ports(self): ''' Gets all UDP sockets in the LISTEN state ''' netstat = self._container.exec_run('cat /proc/net/udp')[1].decode('utf-8').strip() ports = [] for line in netstat.split('\n'): # Not interested in empty lines if not line: continue line = line.split() # Only interested in listen sockets if line[3] != '0A': continue ports.append(str(int(line[1].split(':', 1)[1], 16))) return ports def get_addr(self, port): if tests_inside_container(): return (self.ips.primary, int(port.split('/')[0])) else: return ('127.0.0.1', self.ports[port][0]) PK!HG^(45pytest_docker_tools-0.0.12.dist-info/entry_points.txt.,I-.14JON-/)#䔦gqqPK!H>*RQ*pytest_docker_tools-0.0.12.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UrPK!H\qk-pytest_docker_tools-0.0.12.dist-info/METADATA=isǕ+VmLC*n蘉{S `@sezV(ɎU6>^~{z8xG]YS(:<3}um=NٕuQvDEeq>V*Jx:QVתNKhE=7 L^EE4Jצ*LU+S/a뺩+/e3fE>@1ЏL:_>ഩEuR,suWA"͢P,ٚ;@dFzB 3@E25 D{ ZFWxn#1yb/M2dEҤhN_ rRbX@j ٘!iffjӸdpi]<"- 7 H}d_Lj]u0h pdŕ2s~eRB!CYW]j `U;eaP,>z+]OqJ <BQGԐjsШ9n0c'kzy+v[ ʸ^ Av[w w;~ hГ$IB?|pzCN<|>>{d<>Y-Nq> μ?KM,kC9`SP-N^zDQY_!(?E€`JPㆩ`9qDмlax4 g4evDf2nZ |;f0eGDAnB㬟6Sr[YʯEQ+X%NG>UH](h@,} 63Reb|Ƣ)ةyMĺYY[سoXG$WƸB|.pc3ڈ3ZXmLsLشXoQmm2R$%65_^oD9컠@$N8ٔp 6NzSqFAͷ&EIShbx 5& ^$8- !h_Lǹ%>h@\kpU h )|Ln&/AL ȯ,HC-)66 D @ADY,)J2m&8Jn1c39$,]n1K;Y0)!aejգ 8MH`FyxLi}.9)g=HK4C:V{9$ %z{<–dW1YS NmX}y&E3 ۓ4z}}뒪!;%Z*:1ձm ]X3XGG.2וN`0Cz4XKWj_-V/_߿\ 8!_ _OT׫pQp0DDX׀o]w!V|?9Ee:)h" A&"M5n06%ɣ0$?vAP.X>:Q(mND0)C]SEt4eY\rQSJ Eڟ 0\nI!\~>k/] E^SnRE*Ǭ"F[*J! gD榲uy@>ڨUl)ӢvB s) J Q&XR3!`&VQeOFA =/T8r{24)} pIFv7E fDQJ|ށ%7AHc ~%9(`R& 1- ig 9>n \I:"hà1Jوf>\%Q#gI`cܙ¿:/"n'n$tڥ7;/wfk'E$.q1.;R0p1lة-SvxjE61N}^U@B UgwsDW5LSFMIdbZ27|3AMsU!:n_^WDb$ZOfkr,)+̎d\-.@U18u60HYexdǖ,!k {n)߸xCeuDoY\2Nf֕ i B-VNbU.^TBk P2Q*vΫИd?Pbk6X%^t>;%{q-@ 1hQ˹{6>ФEm Ew1,<#D)lCWHW@bD(%gk] n ;U( 6AD!ZNĆޞ`3& HA@lT+^އ(F7 PQX9uct8ܡaމaX[[|yC$$4k+ڟֈT/'ѫu钞:,Eȥ'A{_*0M/#0Y,4V4ymRWe7Q;ێT+F>)CS`0Wz!`=Ò1= ܝZzr/+./7;G!J'=rsw5';rHO2@IAID:j2Q20[(wdt{]﯑}.g\,N. XJ}\WWx@1(1c _1+V Sa>5Hm Z Ot >@Գ) +ziKAkw,Ǒ {6!m?e,lJB'Qzo&5Qu5ge'mHQ[ &ԞtU<[Sy @8wb\GgiO<\f.NdH!B*qbX2;aPxtx{6 =A_HK(}}OAe-dT %$]M"b'Wf;GDR3jѥ1%rj:]+qO|6>>l9B.YYx,|Ep.HhR#Fb5 /||MK^tkYӝA+\o vҗ.\c64%Žb),1ef (4Wv;WU CMu305yzĘőIw?CQqpu<&2 C:ՁIIsI. tv=a4OؙTa5脩?/yn∖\ q}M`Kje)L5'x(Ɋ& P2d" !oڰ7uF#$N8!l Ax\KG>(:*:{~_<%]L0A;T_- SɤJ z%طU5߷6H+f̓KvIzcɜ `c ?kjHḬ)ڇ\T''Jpg c#4gKM.9ѓ] kAkL|6= :%`ꠈA E+mFZF&[Bb so>cǜ<i⋀-|y`+8px#^|-a ohA'w5צ!padvf<*RQ*Rpytest_docker_tools-0.0.12.dist-info/WHEELPK!H\qk-Spytest_docker_tools-0.0.12.dist-info/METADATAPK!H*%+ppytest_docker_tools-0.0.12.dist-info/RECORDPKt