PK!h__cornac/__init__.pyimport os.path from warnings import filterwarnings from flask import Flask # psycopg2 and psycopg2-binary is a mess, because you can't define OR # dependency in Python. Just globally ignore this for now. filterwarnings("ignore", message="The psycopg2 wheel package will be renamed") # noqa def create_app(): app = Flask(__name__) app.config.from_object(__name__ + '.default_config') if 'CORNAC_SETTINGS' in os.environ: path = os.path.realpath(os.environ['CORNAC_SETTINGS']) app.config.from_pyfile(path) from .core.model import db db.init_app(app) return app PK!Aܨ cornac/cli.py# # Main cornac CLI. # # Implements few commands for cornac initialization and maintainance. Running # the webservice is distinct since Flask already provide a good CLI for # development and production should use WSGI entrypoint. # import logging import os import pdb import sys from textwrap import dedent from urllib.parse import urlparse import click from flask import current_app from flask.cli import FlaskGroup from .core.model import DBInstance, db, connect from .core.schema import Migrator from .iaas import IaaS from .operator import BasicOperator logger = logging.getLogger(__name__) class KnownError(Exception): def __init__(self, message, exit_code=os.EX_SOFTWARE): super(KnownError, self).__init__(message) self.exit_code = exit_code def create_app(): # Fake factory for Flask app. from . import create_app as real_create_app app = real_create_app() from .web import rds, fallback app.register_blueprint(rds) app.errorhandler(404)(fallback) return app # Root group of CLI. @click.group(cls=FlaskGroup, create_app=create_app) def root(argv=sys.argv[1:]): pass @root.command(help=dedent( """ Provision guest and Postgres database for cornac itself. Initialize database with schema and data. """)) @click.option('--pgversion', default='11', help="Postgres engine version to deploy.", show_default=True, metavar='VERSION') @click.option('--size', default=5, type=click.IntRange(5, 300), help="Allocated storage size in gigabytes.", show_default=True, metavar='SIZE_GB',) @click.pass_context def bootstrap(ctx, pgversion, size): connstring = current_app.config['SQLALCHEMY_DATABASE_URI'] pgurl = urlparse(connstring) command = dict( AllocatedStorage=size, DBInstanceIdentifier=pgurl.path.lstrip('/'), Engine='postgres', EngineVersion=pgversion, MasterUserPassword=pgurl.password, MasterUsername=pgurl.username, ) logger.info("Creating instance %s.", command['DBInstanceIdentifier']) with IaaS.connect(current_app.config['IAAS'], current_app.config) as iaas: operator = BasicOperator(iaas, current_app.config) operator.create_db_instance(command) logger.info("Initializing schema.") ctx.invoke(migratedb, dry=False) logger.info("Registering instance to inventory.") instance = DBInstance() instance.identifier = command['DBInstanceIdentifier'] instance.status = 'running' # Drop master password before saving command in database. instance.create_params = dict(command, MasterUserPassword=None) db.session.add(instance) db.session.commit() logger.debug("Done") @root.command(help="Migrate schema and database of cornac database.") @click.option('--dry/--no-dry', default=True, help="Whether to effectively apply migration script.") def migratedb(dry): migrator = Migrator() migrator.inspect_available_versions() with connect(current_app.config['SQLALCHEMY_DATABASE_URI']) as conn: migrator.inspect_current_version(conn) if migrator.current_version: logger.info("Database version is %s.", migrator.current_version) else: logger.info("Database is not initialized.") versions = migrator.missing_versions for version in versions: if dry: logger.info("Would apply %s.", version) else: logger.info("Applying %s.", version) with conn: # Wraps in a transaction. migrator.apply(conn, version) if versions: logger.info("Check terminated." if dry else "Database updated.") else: logger.info("Database already uptodate.") def entrypoint(): logging.basicConfig( level=logging.DEBUG, format='%(levelname)1.1s: %(message)s', ) try: exit(root()) except (pdb.bdb.BdbQuit, KeyboardInterrupt): logger.info("Interrupted.") except KnownError as e: logger.critical("%s", e) exit(e.exit_code) except Exception: logger.exception('Unhandled error:') if sys.stdout.isatty(): logger.debug("Dropping in debugger.") pdb.post_mortem(sys.exc_info()[2]) else: logger.error( "Please report at " "https://github.com/dalibo/cornac/issues/new with full log.", ) exit(os.EX_SOFTWARE) PK!cornac/core/__init__.py# # Core package shares objects and functions used by every pieces of cornac: web # handlers, CLI scripts, background workers. # PK!8C--cornac/core/model.pyfrom contextlib import contextmanager import psycopg2 from flask_sqlalchemy import SQLAlchemy from sqlalchemy.dialects.postgresql import ( ENUM, JSONB, ) db = SQLAlchemy() @contextmanager def connect(connstring): # Manager for connecting to psycopg2 without flask nor SQLAlchemy. conn = psycopg2.connect(connstring) try: yield conn finally: conn.close() DBInstanceStatus = ENUM( 'creating', 'running', 'stopped', name='db_instance_status', ) class DBInstance(db.Model): __tablename__ = 'db_instances' id = db.Column(db.Integer, primary_key=True) identifier = db.Column(db.String) status = db.Column(DBInstanceStatus) status_message = db.Column(db.String) create_command = db.Column(JSONB) attributes = db.Column(JSONB) PK!cornac/core/schema/000-init.sqlCREATE TABLE IF NOT EXISTS schema_migration_log ( version TEXT UNIQUE NOT NULL PRIMARY KEY, cdate TIMESTAMP DEFAULT NOW() ); PK!(wP$cornac/core/schema/001-instances.sqlCREATE TYPE "db_instance_status" AS ENUM ( 'creating', 'running', 'stopped', 'failed' ); CREATE TABLE db_instances ( id BIGSERIAL PRIMARY KEY, identifier TEXT UNIQUE, status db_instance_status NOT NULL, status_message TEXT, create_command JSONb, attributes JSONb ); PK!^cornac/core/schema/__init__.py# # cornac.core.schema package groups code and data to manage cornac's database # schemas and data. See cornac.core.models for objects to query database using # SQLAlchemy. # from .migrator import Migrator __all__ = ['Migrator'] PK!dsscornac/core/schema/migrator.py# # The migrator manage schema and data migration across cornac versions. # # A migration is a file in versions/ directory. Filename *is* the version # identifier. Versions are sorted using POSIX C sort collation. The # migration_log table keep track of latest version. Only files sorting after # the latest version are required. This allow to squash versions to speed # bootstrap at the cost of dropping intermediate migration. # # See cornac migratedb command for usage. # import logging import os from glob import glob from textwrap import dedent import psycopg2.errorcodes logger = logging.getLogger(__name__) class Migrator(object): versionsdir = os.path.dirname(__file__) def __init__(self): self.current_version = None self.versions = [] @property def target_version(self): return self.versions[-1] @property def missing_versions(self): try: i = self.versions.index(self.current_version) except ValueError: return self.versions else: return self.versions[i + 1:] def inspect_current_version(self, conn): try: with conn.cursor() as cur: cur.execute(dedent("""\ SELECT version FROM schema_migration_log ORDER BY version DESC LIMIT 1; """)) row = cur.fetchone() self.current_version = row[0] except psycopg2.Error as e: conn.rollback() if e.pgcode != psycopg2.errorcodes.UNDEFINED_TABLE: raise return self.current_version def inspect_available_versions(self): self.versions = sorted( [os.path.basename(f) for f in glob(self.versionsdir + "/*.sql")] ) def apply(self, pg, version): path = os.path.join(self.versionsdir, version) with open(path) as fo: sql = fo.read() with pg.cursor() as cur: cur.execute(sql) cur.execute( "INSERT INTO schema_migration_log (version) VALUES (%s);", (version,), ) PK!~i,,cornac/default_config.py# # D E F A U L T C O N F I G U R A T I O N # import os # Domain suffix to resolve guest IP from DNS. DNS_DOMAIN = '' # IAAS URL, starting with provider prefix, + sign and provider specific URL. # e.g. libvirt, vcenter+https://me:password@vcenter.acmi.lan/?no_verify=1 IAAS = None # Provider specific guest network. # # For vSphere, use absolute path e.g. 'datacenter1/network/Guest Network' NETWORK = None # Provider specific name of the template machine to clone. You must install # Postgres and other tools. See appliance/ for how to maintain this template # with Ansible. # # For vSphere, must be a full path to the VM. e.g. # datacenter1/vm/templates/base-cornac. ORIGINAL_MACHINE = 'base-cornac' # DSN to Postgres database. SQLALCHEMY_DATABASE_URI = os.environ.get('CORNAC_DATABASE') # Provider-specific name of the storage pool (or datastore in vSphere). STORAGE_POOL = 'default' # SSH Public key used for deployement and maintainance of guests. ROOT_PUBLIC_KEY = None # vCenter specific resource pool where to place guests. Could be a host or a # cluster resource pool. e.g. 'datacenter1/host/esxi1/Resources VCENTER_RESOURCE_POOL = None # # I N T E R N A L S # # Here cornac configures Flask and extensions. You should not overload this # settings. SQLALCHEMY_TRACK_MODIFICATIONS = False PK!5`k--cornac/iaas/__init__.py# # IaaS object manages machine, disk and networking. # class IaaS(object): registry = { # IaaS loads provider class lazily to avoid importing irrelevant # third-party library. The module path has the same format of # setuptools entrypoint. 'libvirt': __name__ + '.libvirt:LibVirtIaaS', 'vcenter': __name__ + '.vcenter:vCenter', } @classmethod def load_iaas(cls, name): modname, clsname = cls.registry[name].split(':') mod = __import__(modname, fromlist=[clsname], level=0) return getattr(mod, clsname) @classmethod def connect(cls, url, config): provider, _, url = url.partition('+') iaas_cls = cls.load_iaas(provider) # Let's provider class analyze URL. return iaas_cls.connect(url, config) # By inheriting this class, IaaS provider implementation gains context # management to properly close resources. def __enter__(self): return self def __exit__(self, *_): self.close() def close(self): pass PK!ZZcornac/iaas/libvirt.py# Implementation of IaaS based on libvirt tools. # # Uses libvirt binding, virt-manager and guestfs tools to manage VM. # Current purpose is PoC or development. import logging import os from copy import deepcopy from string import ascii_lowercase from xml.etree import ElementTree as ET import libvirt from . import IaaS from ..ssh import logged_cmd logger = logging.getLogger(__name__) _1G = 1024 * 1024 * 1024 class LibVirtIaaS(IaaS): @classmethod def connect(cls, url, config): return cls(libvirt.open(), config) def __init__(self, conn, config): self.conn = conn self.config = config # Configuration Keys: # # ROOT_PUBLIC_KEY: SSH public key to inject to access root account # on new machines. # DNS_DOMAIN: DNS domain to build FQDN of machine on the IaaS. def attach_disk(self, domain, disk): xml = domain.XMLDesc() path = disk.path() disk.machine = self if path in xml: logger.debug("Disk %s already attached to %s.", path, disk.name()) return xml = ET.fromstring(xml) xdevices = xml.find('./devices') xdisk0 = xdevices.find('./disk') xdisk = deepcopy(xdisk0) xsrc = xdisk.find('./source') xsrc.attrib['file'] = path xscsitargets = xml.findall(".//disk/target[@bus='scsi']") devs = [e.attrib['dev'] for e in xscsitargets] xtarget = xdisk.find('./target') xtarget.attrib['dev'] = 'sd' + ascii_lowercase[len(devs)] xtarget.tail = xtarget.tail[:-2] # Remove one indent level. xdisk.remove(xdisk.find('./address')) # Try to place disk after first one. xdevices.insert(1 + len(devs), xdisk) xml = ET.tostring(xml, encoding="unicode") logger.debug("Attaching disk %s.", path) self.conn.defineXML(xml) def close(self): self.conn.close() def create_disk(self, pool, name, size_gb): pool = self.conn.storagePoolLookupByName(pool) try: disk = pool.storageVolLookupByName(name) except libvirt.libvirtError: pass else: logger.debug("Reusing disk %s.", name) return disk # For now, just clone definition of first disk found in pool. vol0 = pool.listAllVolumes()[0] xvol = ET.fromstring(vol0.XMLDesc()) xvol.find('./name').text = name xkey = xvol.find('./key') xkey.text = os.path.dirname(xkey.text) + "/" + name + ".qcow2" xvol.find('./target/path').text = xkey.text xvol.find('./capacity').text = "%d" % (size_gb * _1G) # Prallocate 256K, for partition, PV metadata and mkfs. xvol.find('./allocation').text = "%d" % (256 * 1024,) xvol.remove(xvol.find('./physical')) logger.debug("Creating disk %s.", name) return pool.createXML(ET.tostring(xvol, encoding='unicode')) def create_machine(self, name, storage_pool, data_size_gb, **kw): # The PoC reuses ressources until we have persistence of objects. try: domain = self.conn.lookupByName(name) except libvirt.libvirtError: clone_cmd = [ "virt-clone", "--original", self.config['ORIGINAL_MACHINE'], "--name", name, "--auto-clone", ] logger.debug("Allocating machine.") logged_cmd(clone_cmd) domain = self.conn.lookupByName(name) else: logger.debug("Reusing VM %s.", name) state, _ = domain.state() if libvirt.VIR_DOMAIN_SHUTOFF == state: prepare_cmd = [ "virt-sysprep", "--domain", name, "--hostname", name, "--selinux-relabel", ] if self.config['ROOT_PUBLIC_KEY']: prepare_cmd.extend([ "--ssh-inject", f"root:string:{self.config['ROOT_PUBLIC_KEY']}", ]) logger.debug("Preparing machine.") logged_cmd(prepare_cmd) disk = self.create_disk(storage_pool, f'{name}-data', data_size_gb) self.attach_disk(domain, disk) return domain def endpoint(self, domain): # Let's DNS resolve machine IP for now. return domain.name() + self.config['DNS_DOMAIN'] def guess_data_device_in_guest(self, machine): # Guess /dev/disk/by-path/… device file from XML. xml = ET.fromstring(machine.XMLDesc()) name = f'{machine.name()}-data' for xdisk in xml.findall(".//disk"): if name in xdisk.find('./source').attrib['file']: xdiskaddress = xdisk.find('./address') break else: raise Exception(f"Can't find disk {name} in VM.") xcontrolleraddress = xml.find( ".//controller[@type='scsi']/address[@type='pci']") pci_path = 'pci-{domain:04x}:{bus:02x}:{slot:02x}.{function}'.format( bus=int(xcontrolleraddress.attrib['bus'], base=0), domain=int(xcontrolleraddress.attrib['domain'], base=0), function=int(xcontrolleraddress.attrib['function'], base=0), slot=int(xcontrolleraddress.attrib['slot'], base=0), ) # cf. # https://cgit.freedesktop.org/systemd/systemd/tree/src/udev/udev-builtin-path_id.c#n405 scsi_path = 'scsi-{controller}:{bus}:{target}:{unit}'.format( **xdiskaddress.attrib) return f'/dev/disk/by-path/{pci_path}-{scsi_path}' def start(self, domain): state, _ = domain.state() if libvirt.VIR_DOMAIN_SHUTOFF == state: logger.debug("Starting %s.", domain.name()) domain.create() state, _ = domain.state() if libvirt.VIR_DOMAIN_RUNNING != state: raise Exception("%s is not running" % domain.name()) PK!Uٕcornac/iaas/vcenter.py# # IaaS provider on top of vSphere API # # cf. https://code.vmware.com/apis/358/vsphere # import logging from pathlib import Path from urllib.parse import ( parse_qs, urlparse, ) from pyVim.connect import ( Disconnect, SmartConnect, SmartConnectNoSSL, ) from pyVmomi import ( vim, vmodl, ) from . import IaaS from ..ssh import RemoteShell logger = logging.getLogger(__name__) class vCenter(IaaS): @classmethod def connect(cls, url, config): url = urlparse(url) args = parse_qs(url.query) no_verify = args.get('no_verify', ['0']) == ['1'] connector = SmartConnectNoSSL if no_verify else SmartConnect si = connector( host=url.hostname, user=url.username, pwd=url.password, port=url.port or 443, ) return cls(si, config) def __init__(self, si, config): self.config = config # si stands for ServiceInstance. self.si = si def close(self): logger.debug("Disconnecting from vSphere.") Disconnect(self.si) def create_machine( self, name, storage_pool, data_size_gb=None, **kw): logger.debug("Creating %s specification.", name) datastore = self.find(storage_pool) origin = self.find(self.config['ORIGINAL_MACHINE']) clonespec = vim.vm.CloneSpec() clonespec.powerOn = True # Let's power on the VM for sysprep. clonespec.config = vim.vm.ConfigSpec() clonespec.config.deviceChange.append( build_data_disk_spec(origin, datastore, name, data_size_gb) ) clonespec.customization = build_customization_spec() clonespec.location = locspec = vim.vm.RelocateSpec() locspec.datastore = datastore locspec.pool = self.find(self.config['VCENTER_RESOURCE_POOL']) locspec.deviceChange.append(build_nic_spec(self.config['NETWORK'])) logger.debug("Cloning %s as %s.", origin.name, name) task = origin.Clone(folder=origin.parent, name=name, spec=clonespec) machine = self.wait_task(task) self.sysprep(machine) return machine def endpoint(self, machine): return machine.name + self.config['DNS_DOMAIN'] def find(self, path): obj = self.si.content.searchIndex.FindByInventoryPath(path) if obj is None: raise KeyError(path) return obj def guess_data_device_in_guest(self, machine): # It's quite hard to determine exact device inside linux guest from VM # spec. Let's assume things are simple and reproducible. cf. # https://communities.vmware.com/thread/298072 return "/dev/sdb" def start(self, machine): return self.wait_task(machine.PowerOn()) def stop(self, machine): return self.wait_task(machine.PowerOff()) def sysprep(self, machine): endpoint = self.endpoint(machine) logger.debug("Waiting for %s to come up.", endpoint) ssh = RemoteShell('root', endpoint) ssh.wait() vhelper = str(Path(__file__).parent / 'vhelper.sh') ssh.copy(vhelper, "/usr/local/bin/vhelper.sh") logger.debug("Preparing system") ssh(["/usr/local/bin/vhelper.sh", "sysprep"]) self.stop(machine) def wait_task(self, task): # From pyvmomi samples. collector = self.si.content.propertyCollector obj_specs = [vmodl.query.PropertyCollector.ObjectSpec(obj=task)] property_spec = vmodl.query.PropertyCollector.PropertySpec( type=vim.Task, pathSet=[], all=True) filter_spec = vmodl.query.PropertyCollector.FilterSpec() filter_spec.objectSet = obj_specs filter_spec.propSet = [property_spec] pcfilter = collector.CreateFilter(filter_spec, True) try: version = state = None while True: update = collector.WaitForUpdates(version) for filter_set in update.filterSet: for obj_set in filter_set.objectSet: task = obj_set.obj for change in obj_set.changeSet: if change.name == 'info': state = change.val.state elif change.name == 'info.state': state = change.val else: continue if state == vim.TaskInfo.State.success: return task.info.result elif state == vim.TaskInfo.State.error: raise task.info.error version = update.version finally: pcfilter.Destroy() def build_customization_spec(): # To automatically set hostname, we need to create the minimal # customization spec which is no less than: spec = vim.vm.customization.Specification() spec.globalIPSettings = vim.vm.customization.GlobalIPSettings() nicsetting = vim.vm.customization.AdapterMapping() nicsetting.adapter = ipsettings = vim.vm.customization.IPSettings() ipsettings.ip = vim.vm.customization.DhcpIpGenerator() spec.nicSettingMap.append(nicsetting) spec.identity = ident = vim.vm.customization.LinuxPrep() ident.hwClockUTC = True ident.timeZone = 'Europe/Paris' # … here you are, we can tell vCenter to set hostname according to VM # name. \o/ ident.hostName = vim.vm.customization.VirtualMachineNameGenerator() return spec def build_data_disk_spec(origin, datastore, name, size_gb): spec = vim.vm.device.VirtualDeviceSpec() spec.fileOperation = 'create' spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add spec.device = disk = vim.vm.device.VirtualDisk() disk.capacityInKB = size_gb * 1024 * 1024 scsi_controllers = list(filter( lambda d: hasattr(d, 'scsiCtlrUnitNumber'), origin.config.hardware.device )) disk.controllerKey = scsi_controllers[0].key disk.unitNumber = 1 disks = list(filter( lambda d: hasattr(d.backing, 'diskMode'), origin.config.hardware.device )) disk.key = disks[-1].key + 1 disk.unitNumber = disks[-1].unitNumber + 1 disk.backing = backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo() # noqa backing.fileName = f'[{datastore.name}] {name}/{name}-data.vmdk' backing.thinProvisioned = False backing.diskMode = 'persistent' return spec def build_nic_spec(network): spec = vim.vm.device.VirtualDeviceSpec() spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add spec.device = nic = vim.vm.device.VirtualVmxnet3() nic.addressType = 'assigned' nic.backing = vim.vm.device.VirtualEthernetCard.NetworkBackingInfo() nic.backing.useAutoDetect = False nic.backing.deviceName = network nic.connectable = vim.vm.device.VirtualDevice.ConnectInfo() nic.connectable.startConnected = True return spec PK!Rl/cornac/iaas/vhelper.sh#!/bin/bash -eu # #: Usage: vhelper.sh [ARG ...] #: #: Set of scripts to manage VM. # # See helper.sh help for more information # export LC_ALL=en_US.utf8 _log() { echo "$@" >&2 } help() { #: Show this message. sed -n '/^#:/{s,#: \?,,;p}' $0 >&2 local commands=$(grep -Po '^([^_].*)(?=\(\) \{)' $0 | sort) _log _log "AVAILABLE COMMANDS:" for comm in $commands ; do _log _log $comm $(grep -Po "$comm\(\\) \{ #: \K.+" $0) sed -n /^$comm/,/^\}/p $0 | sed -n '/\t#:/{s,#: \?,,;p}' >&2 done } pwgen() { #: Generate a random password. od -vN 16 -An -tx1 /dev/urandom | tr -d ' \n' } sysprep() { #: Refresh system after clone. # Inspired by virt-sysprep # (https://github.com/libguestfs/libguestfs/tree/master/sysprep) _log "Cleaning files and logs." rm -rf \ /etc/machine-id \ /etc/ssh/*_host_* \ /root/.bash_history /home/*/.bash_history \ /var/lib/rpm/__db* \ /var/lib/yum/uuid \ /var/log/anaconda \ /var/log/audit/audit.log \ /var/log/boot.log* \ /var/log/cron \ /var/log/dmesg* \ /var/log/grubby_prune_debug \ /var/log/maillog \ /var/log/messages \ /var/log/secure \ /var/log/wtmp \ /var/log/tuned/tuned.log \ /var/log/vmware-network*.log \ /var/log/yum.log \ ${NULL-} _log "Restarting SSHd" systemctl restart sshd _log "Randomizing root password." echo "root:$(pwgen)" | chpasswd } cmd=help if [ -n "${1-}" ] ; then cmd=$1; shift fi if type -t ${cmd} | grep -q function ; then $cmd "$@" else echo "Unknown command $1." >&2 exit 1 fi PK!*{2??cornac/operator/__init__.pyfrom .basic import BasicOperator __all__ = ['BasicOperator'] PK!ɤݨ cornac/operator/basic.py# Apply actions on infrastructure. # # The concept of Operator is borrowed from K8S. # import logging import pdb import sys from pathlib import Path from ..iaas import IaaS from ..ssh import RemoteShell logger = logging.getLogger(__name__) class BasicOperator(object): # Implementation using pghelper.sh def __init__(self, iaas, config): self.iaas = iaas self.config = config # Configuration keys: # # original_machine: name of the template machine with Postgres. def create_db_instance(self, command): name = f"cornac-{command['DBInstanceIdentifier']}" machine = self.iaas.create_machine( name=name, storage_pool=self.config['STORAGE_POOL'], data_size_gb=command['AllocatedStorage'], ) self.iaas.start(machine) address = self.iaas.endpoint(machine) shell = RemoteShell('root', address) shell.wait() logger.debug("Sending helper script.") local_helper = str(Path(__file__).parent / 'pghelper.sh') helper = '/usr/local/bin/pghelper.sh' shell.copy(local_helper, helper) # Formatting disk try: # Check whether Postgres VG is configured. shell(["test", "-d", "/dev/Postgres"]) except Exception: dev = self.iaas.guess_data_device_in_guest(machine) shell([helper, "prepare-disk", dev]) shell([ helper, "create-instance", command['EngineVersion'], command['DBInstanceIdentifier'], ]) shell([helper, "start"]) else: logger.debug("Reusing Postgres instance.") # Master user master = command['MasterUsername'] shell([ helper, "create-masteruser", master, command['MasterUserPassword'], ]) # Creating database bases = shell([helper, "psql", "-l"]) dbname = command['DBInstanceIdentifier'] if f"\n {dbname} " in bases: logger.debug("Reusing database %s.", dbname) else: logger.debug("Creating database %s.", dbname) shell([helper, "create-database", dbname, master]) return dict( Endpoint=dict(Address=address, Port=5432), DBInstanceIdentifier=dbname, ) def test_main(): # Hard coded real test case, for PoC development. from .. import config # What aws would send to REST API. command = { 'DBInstanceIdentifier': 'cli0', 'AllocatedStorage': 5, 'DBInstanceClass': 'db.t2.micro', 'Engine': 'postgres', 'EngineVersion': '11', 'MasterUsername': 'postgres', 'MasterUserPassword': 'C0nfidentiel', } with IaaS.connect(config['IAAS'], config) as iaas: operator = BasicOperator(iaas, config) response = operator.create_db_instance(command) logger.info( " psql -h %s -p %s -U %s -d %s", response['Endpoint']['Address'], response['Endpoint']['Port'], command['MasterUsername'], command['DBInstanceIdentifier'], ) if "__main__" == __name__: logging.basicConfig( format="%(levelname)5.5s %(message)s", level=logging.DEBUG, ) try: test_main() except pdb.bdb.BdbQuit: pass except Exception: logger.exception("Uhandled error:") pdb.post_mortem(sys.exc_info()[2]) PK! -cornac/operator/pghelper.sh#!/bin/bash -eu # #: Usage: helper.sh [ARG ...] #: #: Opinionated helper to manage Postgres. # # Manage a single instance host with hardcoded value. # # The instance is named `Managed Postgres`. It's data are all in a disk with # data and wal isolated in logical volumes. Volumes are mounted at # ~postgres/managed/{data,wal}_mnt/. A systemd service called `postgres-managed` # is configured. # # See helper.sh help for more information # export LC_ALL=en_US.utf8 _log() { echo "$@" >&2 } clean-disk() { #: Unmount and destroy volumes. Reset disk partition table. local dev=$(readlink -e $1); shift if ! [ -b "$dev" ] ; then _log "Disk $dev not found." return 1 fi for volume in /dev/mapper/Postgres-{DATA,LOG,WAL} ; do if mount | grep -q $volume ; then umount $volume fi done sed -i~ /Postgres-/d /etc/fstab if [ -d /dev/Postgres ] ; then vgremove -qq --force Postgres fi dd if=/dev/zero of=$dev bs=513 count=64 rm -rf ~postgres/managed _log "$dev cleaned." } create-database() { #: [CREATEDB_ARG ...] #: Create database with some defaults. local name=$1; shift local owner=$1; shift sudo -u postgres createdb --locale en_US.UTF-8 -O "${owner}" "$@" "${name}" } create-masteruser() { #: [CREATEUSER_ARG ...] #: Create or update master user. Assign all databases to it. local name=$1; shift local password=$1; shift # Naïve escape of parameters printf -v name_e "%q" "${name}" printf -v password_e "%q" "${password}" if ! psql -tc "SELECT 'EXISTS' FROM pg_roles WHERE rolname = '${name_e}';" | grep -q EXISTS ; then sudo -iu postgres createuser --no-createdb --superuser "$@" "$name" fi psql <<-EOF ALTER ROLE "${name_e}" WITH PASSWORD '${password_e}'; EOF for dbname in $(psql -tc 'SELECT datname FROM pg_database WHERE datistemplate IS FALSE;'); do echo "ALTER DATABASE "'"'"${dbname}"'"'" OWNER TO "'"'"${name_e}"'"'";" done | psql } create-instance() { #: #: Initialize and enable the managed Postgres instance. local pgversion=$1; shift local bindir=/usr/pgsql-$pgversion/bin if ! [ -d $bindir ] ; then _log "Unknown version $pgversion." return 1 fi PGDATA=$(readlink -m ~postgres/managed/data_mnt/data) PGWAL=$(readlink -m ~postgres/managed/wal_mnt/wal) mkdir --parent $PGDATA $PGWAL chown -R postgres: ~postgres/managed sudo -u postgres $bindir/initdb \ --auth-local peer \ --auth-host md5 \ --pgdata $PGDATA \ --locale en_US.UTF-8 \ --waldir $PGWAL \ ${NULL-} cat >> $PGDATA/postgresql.conf <<-EOF include_dir 'conf.d' EOF _log "Accepting connection from world." sed -i s,127.0.0.1/32,0.0.0.0/0,g $PGDATA/pg_hba.conf sudo -u postgres mkdir -p $PGDATA/conf.d sudo -u postgres tee $PGDATA/conf.d/00-managed.conf >/dev/null <<-EOF # # Managed file. Don't edit manually. # archive_command = '/bin/true' archive_mode = on checkpoint_completion_target = 0.9 default_transaction_isolation = 'read committed' hot_standby = on lc_messages = C listen_addresses = '*' log_autovacuum_min_duration = 0 log_checkpoints = on log_connections = on log_destination = 'syslog' log_disconnections = on log_line_prefix = '[%p]: [%l-1] db=%d,user=%u,app=%a,client=%h ' log_lock_waits = on log_min_duration_statement = 3s log_min_messages = notice log_statement = ddl log_temp_files = 0 logging_collector = off pg_stat_statements.max = 10000 pg_stat_statements.track = all shared_preload_libraries = pg_stat_statements syslog_facility = 'local0' wal_level = hot_standby EOF _log "Setting up systemd service." cat >/etc/systemd/system/postgres-managed.service <<-EOF [Unit] Description=Managed PostgreSQL $pgversion database server Documentation=https://www.postgresql.org/docs/$pgversion/static/ After=syslog.target After=network.target [Service] Type=notify User=postgres Group=postgres # Note: avoid inserting whitespace in these Environment= lines, or you may # break postgresql-setup. # Location of database directory Environment=PGDATA=${PGDATA} # Where to send early-startup messages from the server (before the logging # options of postgresql.conf take effect) # This is normally controlled by the global default set by systemd # StandardOutput=syslog # Disable OOM kill on the postmaster OOMScoreAdjust=-1000 Environment=PG_OOM_ADJUST_FILE=/proc/self/oom_score_adj Environment=PG_OOM_ADJUST_VALUE=0 ExecStartPre=$bindir/postgresql-$pgversion-check-db-dir \${PGDATA} ExecStart=$bindir/postmaster -D \${PGDATA} ExecReload=/bin/kill -HUP \$MAINPID KillMode=mixed KillSignal=SIGINT [Install] WantedBy=multi-user.target EOF systemctl daemon-reload systemctl enable postgres-managed.service _log "Opening port 5432." firewall-cmd --quiet --zone public --add-port "5432/tcp" --permanent firewall-cmd --quiet --reload _log "Managed instance created." } delete-db-instance() { #: Reset managed instance service and data. All data are lost. if systemctl is-active postgres-managed.service >&/dev/null ; then _log "Deleting systemd unit." systemctl stop postgres-managed.service systemctl disable postgres-managed.service systemctl reset-failed postgres-managed.service ||: fi rm -f /etc/systemd/system/postgres-managed.service \ systemctl daemon-reload _log "Deleting files and directory." rm -rf ~postgres/managed/*_mnt/{data,wal} _log "Managed instance deleted." } help() { #: Show this message. sed -n '/^#:/{s,#: \?,,;p}' $0 >&2 local commands=$(grep -Po '^([^_].*)(?=\(\) \{)' $0 | sort) _log _log "AVAILABLE COMMANDS:" for comm in $commands ; do _log _log $comm $(grep -Po "$comm\(\\) \{ #: \K.+" $0) sed -n /^$comm/,/^\}/p $0 | sed -n '/\t#:/{s,#: \?,,;p}' >&2 done } prepare-disk() { #: #: Partition, format and mount disk for managed instance. local dev=$(readlink -m $1); shift if ! [ -b "$dev" ] ; then _log "Disk $dev not found." return 1 fi if sfdisk --dump $dev | grep -q . ; then _log "Disk already partitionned." return 1 fi total_sectors=$(blockdev --getsz $dev) part_sectors=$((total_sectors - 63)) _log "Writing partition table." # Force because we are using virtual disk. sfdisk --force $dev >&2 <<-EOF unit: sectors /dev/vdb1 : start= 63, size= ${part_sectors}, Id=8e EOF _log "Waiting for block device to appear." sleep 1 part=${dev}1 if ! [ -b "$part" ] ; then _log "Failed to get partition." return 1 fi _log "Creating logical volumes." pvcreate $part vgcreate Postgres $part lvcreate --yes --name "DATA" --extents 90%VG Postgres lvcreate --yes --name "WAL" --extents 10%VG Postgres _log "Formatting logical volumes." mkfs.ext4 -q -m 1 -U $(uuidgen) -L "Postgres Data" /dev/mapper/Postgres-DATA mkfs.ext4 -q -m 1 -U $(uuidgen) -L "Postgres WAL" /dev/mapper/Postgres-WAL local home=$(readlink -e ~postgres) _log "Mounting volumes in $home/managed" mkdir -p --mode 0750 $home/managed/{data,wal}_mnt/ chown -R postgres: ~postgres/managed cat >>/etc/fstab <<-EOF /dev/mapper/Postgres-DATA $home/managed/data_mnt ext4 defaults 0 0 /dev/mapper/Postgres-WAL $home/managed/wal_mnt ext4 defaults 0 0 EOF mount $home/managed/data_mnt mount $home/managed/wal_mnt } psql() { #: [PSQL_ARG ...] #: Execute psql on managed instance. sudo -iu postgres psql --no-password --set ON_ERROR_STOP=1 --quiet "$@" } start() { #: Starts Postgres managed instance systemctl start postgres-managed.service systemctl status postgres-managed.service } cmd=help if [ -n "${1-}" ] ; then cmd=$1; shift fi if type -t ${cmd} | grep -q function ; then $cmd "$@" else echo "Unknown command $1." >&2 exit 1 fi PK!X cornac/ssh.pyimport logging import shlex import socket import subprocess import tenacity logger = logging.getLogger(__name__) def logged_cmd(cmd, *a, **kw): logger.debug("Running %s", ' '.join([shlex.quote(i) for i in cmd])) child = subprocess.Popen( cmd, *a, **kw, stderr=subprocess.PIPE, stdout=subprocess.PIPE, ) err = [] for line in child.stderr: line = line.strip().decode('utf-8') err.append(line) logger.debug("<<< %s", line) out = child.stdout.read().decode('utf-8') returncode = child.wait() if returncode != 0: raise subprocess.CalledProcessError( returncode=returncode, cmd=cmd, output=out, stderr='\n'.join(err), ) return out remote_retry = tenacity.retry( wait=tenacity.wait_chain(*[ tenacity.wait_fixed(i) for i in range(12, 1, -1) ]), stop=tenacity.stop_after_delay(300), reraise=True) @remote_retry def wait_machine(address, port=22): address = socket.gethostbyname(address) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((address, port)) sock.close() class RemoteShell(object): ssh_options = [ # For now, just accept any key from remote hosts. "-o", "UserKnownHostsFile=/dev/null", "-o", "StrictHostKeyChecking=no", ] def __init__(self, user, host): self.ssh = ["ssh", "-q", "-l", user, host] self.scp_target_prefix = f"{user}@{host}:" def __call__(self, command): try: return logged_cmd( self.ssh + self.ssh_options + [shlex.quote(i) for i in command], ) except subprocess.CalledProcessError as e: # SSH shows stderr in stdout. raise Exception(e.stdout) def copy(self, src, dst): try: return logged_cmd( ["scp"] + self.ssh_options + [src, self.scp_target_prefix + dst] ) except subprocess.CalledProcessError as e: raise Exception(e.stderr) @remote_retry def wait(self): # Just ping with true to trigger SSH. This method allows Host rewrite # in ssh_config. self(["true"]) PK!c cornac/web.pyimport functools import logging import pdb import sys from concurrent.futures import ThreadPoolExecutor from textwrap import dedent from uuid import uuid4 from flask import Blueprint, abort, current_app, make_response, request from jinja2 import Template from .core.model import DBInstance, db from .iaas import IaaS from .operator import BasicOperator logger = logging.getLogger(__name__) rds = Blueprint('rds', __name__) def fallback(e): # By default, log awscli requests. current_app.logger.info( "%s %s %s", request.method, request.path, dict(request.form)) return make_response('Not Found', 404) @rds.route("/rds", methods=["POST"]) def main(): # Bridge RDS service and Flask routing. RDS actions are not RESTful. payload = dict(request.form) action_name = payload.pop('Action') try: action = getattr(RDS, action_name) except AttributeError: logger.warning("Unknown RDS action: %s.", action_name) logger.debug("payload=%r", payload) abort(400) return make_response_xml( action=action_name, result=action(payload), requestid=uuid4(), ) WORKERPOOL = ThreadPoolExecutor(max_workers=4) def task(func): # Wraps a function to be executed in a concurrent executor for logging and # error handling. @functools.wraps(func) def task_wrapper(app, *a, **kw): logger.info("Running task %s.", func.__name__) try: with app.app_context(): ret = func(*a, **kw) logger.info("Task %s done.", func.__name__) return ret except pdb.bdb.BdbQuit: pass except Exception: logger.exception("Unhandled exception in background task:") if False: pdb.post_mortem(sys.exc_info()[2]) raise def send_task(*a, **kw): # Get effective current app, and pass it to wrapper. The wrapper will # set app as current app for the worker thread. app = current_app._get_current_object() WORKERPOOL.submit(task_wrapper, app, *a, **kw) task_wrapper.send = send_task return task_wrapper @task def create_db_task(instance_id): # Background task to trigger operator and update global in-memory database. instance = DBInstance.query.filter(DBInstance.id == instance_id).one() with IaaS.connect(current_app.config['IAAS'], current_app.config) as iaas: operator = BasicOperator(iaas, current_app.config) response = operator.create_db_instance(instance.create_command) instance.status = 'running' instance.attributes = response db.session.commit() class RDS(object): # RDS-like service. # # Each method corresponds to a well-known RDS action, returning result as # XML snippet. default_create_command = dict( EngineVersion='11', ) @classmethod def CreateDBInstance(cls, command): command = dict(cls.default_create_command, **command) command['AllocatedStorage'] = int(command['AllocatedStorage']) instance = DBInstance() instance.identifier = command['DBInstanceIdentifier'] instance.status = 'creating' instance.create_command = command db.session.add(instance) db.session.commit() create_db_task.send(instance.id) return InstanceEncoder(instance).as_xml() INSTANCE_LIST_TMPL = Template(dedent("""\ {% for instance in instances %} {{ instance.as_xml() | indent(2) }} {% endfor %} """), trim_blocks=True) @classmethod def DescribeDBInstances(cls, command): instances = DBInstance.query.all() return cls.INSTANCE_LIST_TMPL.render( instances=[InstanceEncoder(i) for i in instances]) RESPONSE_TMPL = Template("""\ <{{ action }}Response xmlns="http://rds.amazonaws.com/doc/2014-10-31/"> <{{ action }}Result> {{ result | indent(4) }} {{ requestid }} """) def make_response_xml(action, requestid, result): # Wraps result XML snippet in response XML envelope. xml = RESPONSE_TMPL.render(**locals()) response = make_response(xml) response.content_type = 'text/xml; charset=utf-8' return response class InstanceEncoder: # Adapt DBInstance object to RDS XML response. XML_SNIPPET_TMPL = Template(dedent("""\ {{ status }} {{ identifier }} postgres {% if endpoint_address %}
{{ endpoint_address }}
5432
{% endif %} postgres
"""), trim_blocks=True) def __init__(self, instance): self.instance = instance def as_xml(self): return self.XML_SNIPPET_TMPL.render(**self.instance.__dict__) PK!Hk-0)pgcornac-0.0.1.dist-info/entry_points.txtN+I/N.,()J/KLPz9Vy%Ey%\\PK!HnHTUpgcornac-0.0.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H?)]!pgcornac-0.0.1.dist-info/METADATAMN09 5VBAVA DJ#z "zŘ OY}3 G5x7JA`u)3 // BkpKfY,!B6%P:lG(ѥd v^74n< k>GExNdltcԠ*per<)R$BxJ||p'Yޫ= n+6kg,~0SgKZ+S1rAۂUߖ热+"UN`l\~)m]6q /ΕJ's|t'ZDo *z= 鯩 -PK!Hrjpgcornac-0.0.1.dist-info/RECORDuɖJ}? VC2/z! $2 3PGGJJH g>g_ϤL(ʩAKi컢o:5+c)GxDg\nU;D3!`pUf廅k7&U::st`fyj `kH~(]Еނv盜PfleXjd.2例_%XcT'nCR5?q\VK|<>>y߄jN9}z#cv]qb.nF pZƦSn:ɦ-:w A )㵓[ӓa"Q|1L7l]svK=APoc¥ܕ~ 1>42N:U86buu7ϧW.>8"oWDqpg'OͶ+k}h'p%C5Dg+l[|f^Nɮ;s;[)?NQ}h߰%K@PK!h__cornac/__init__.pyPK!Aܨ cornac/cli.pyPK!bcornac/core/__init__.pyPK!8C--cornac/core/model.pyPK!wcornac/core/schema/000-init.sqlPK!(wP$5cornac/core/schema/001-instances.sqlPK!^cornac/core/schema/__init__.pyPK!dsscornac/core/schema/migrator.pyPK!~i,,i$cornac/default_config.pyPK!5`k--)cornac/iaas/__init__.pyPK!ZZ-.cornac/iaas/libvirt.pyPK!UٕEcornac/iaas/vcenter.pyPK!Rl/큄acornac/iaas/vhelper.shPK!*{2??gcornac/operator/__init__.pyPK!ɤݨ Lhcornac/operator/basic.pyPK! -*vcornac/operator/pghelper.shPK!X fcornac/ssh.pyPK!c lcornac/web.pyPK!Hk-0)pgcornac-0.0.1.dist-info/entry_points.txtPK!HnHTUpgcornac-0.0.1.dist-info/WHEELPK!H?)]!pgcornac-0.0.1.dist-info/METADATAPK!Hrj"pgcornac-0.0.1.dist-info/RECORDPKe