PKILRsecretly/__init__.py""" Perform an asynchronous task with a secret. """ from ._impl import secretly __all__ = [ "secretly", ] __version__ = '0.2' PK7IL/ / secretly/_impl.pyfrom __future__ import unicode_literals, print_function import os import sys import getpass import attr import keyring from twisted.python.procutils import which from twisted.protocols.basic import LineReceiver from twisted.internet.defer import ( Deferred, maybeDeferred, inlineCallbacks, returnValue) from twisted.internet.endpoints import ProcessEndpoint from twisted.internet.protocol import Factory from twisted.internet.task import react, deferLater from twisted.internet.utils import getProcessOutputAndValue @attr.s class AssuanResponse(object): """ Record encapsulating a response from pinentry. """ data = attr.ib() debugInfo = attr.ib() class AssuanError(Exception): """ Record encapsulating a problem from pinentry (probably: the user hit 'cancel'). """ class SimpleAssuan(LineReceiver, object): """ Simple Assuan protocol speaker. """ delimiter = b'\n' def __init__(self): self._ready = False self._dq = [] self._bufferedData = [] def connectionMade(self): """ Connection established; work around https://twistedmatrix.com/trac/ticket/6606 gfdi. """ self.transport.disconnecting = False def issueCommand(self, command, *args): """ Issue the given Assuan command and return a Deferred that will fire with the response. """ result = Deferred() self._dq.append(result) self.sendLine(b" ".join([command] + list(args))) return result def _currentResponse(self, debugInfo): """ Pull the current response off the queue. """ bd = b''.join(self._bufferedData) self._bufferedData = [] return AssuanResponse(bd, debugInfo) def lineReceived(self, line): """ A line was received. """ if line.startswith(b"#"): # ignore it return if line.startswith(b"OK"): # if no command issued, then just 'ready' if self._ready: self._dq.pop(0).callback(self._currentResponse(line)) else: self._ready = True if line.startswith(b"D "): self._bufferedData.append(line[2:].replace(b"%0A", b"\r") .replace(b"%0D", b"\n") .replace(b"%25", b"%")) if line.startswith(b"ERR"): self._dq.pop(0).errback(AssuanError(line)) class PinentryNotFound(Exception): """ Raised when a C{pinentry} program does not exist. """ @attr.s(frozen=True) class Pinentry(object): """ A C{pinentry} that can prompt you for a password. @ivar _name: The name of the C{pinentry} program. Must be in C{PATH} or an absolute path to an executable. @type _name: L{str} @ivar _argumentFactory: A callable that accepts no arguments and returns C{argv[1:]}. Can raise L{PinentryNotFound} or L{OSError} to cause this C{pinentry} to be ignored. @type _argumentFactory: L{callable} """ _name = attr.ib() _argumentFactory = attr.ib(default=lambda: ttynameArgument()) def argv(self, _which=which): """ Return an argv list suitable for passing to L{ProcessEndpoint}. @return: An argv L{list} suitable for passing to L{ProcessEndpoint} @raises: L{PinentryNotFound} if the requested pinentry program """ argv = _which(self._name)[:1] if not argv: raise PinentryNotFound(self._name) argv.extend(self._argumentFactory()) return argv @inlineCallbacks def askForPassword(self, reactor, prompt, title, description): """ The documentation appears to be here only: https://github.com/gpg/pinentry/blob/287d40e879f767dbcb3d19b3629b872c08d39cf4/pinentry/pinentry.c#L1444-L1464 TODO: multiple backends for password-prompting. """ argv = self.argv() assuan = yield (ProcessEndpoint(reactor, argv[0], argv, os.environ.copy()) .connect(Factory.forProtocol(SimpleAssuan))) try: yield assuan.issueCommand(b"SETPROMPT", prompt.encode("utf-8")) yield assuan.issueCommand(b"SETTITLE", title.encode("utf-8")) yield assuan.issueCommand(b"SETDESC", description.encode("utf-8")) response = yield assuan.issueCommand(b"GETPIN") finally: assuan.issueCommand(b"BYE") returnValue(response.data.decode("utf-8")) @attr.s class GetPassAsker(object): """ Fallback implementation of L{Pinentry} that just blocks. """ @inlineCallbacks def askForPassword(self, reactor, prompt, title, description): returnValue( (yield getpass.getpass('\n'.join([description, prompt + " "]))) ) def ttynameArgument( _stdout=sys.stdout.fileno(), ): """ C{pinentry-curses} requires C{--ttyname} be set to the process' controlling terminal so it can draw its dialogs. This function either returns a list that sets C{--ttyname} to the terminal that underlies the calling process' stdout or raises L{OSError} if stdout has no terminal. """ return ['--ttyname', os.ttyname(_stdout)] @inlineCallbacks def call(exe, *argv): """ Run a command, returning its output, or None if it fails. """ exes = which(exe) if not exes: returnValue(None) stdout, stderr, value = yield getProcessOutputAndValue( exes[0], argv, env=os.environ.copy() ) if value: returnValue(None) returnValue(stdout.decode('utf-8').rstrip('\n')) @inlineCallbacks def choosePinentry(): """ Choose a C{pinentry} that can prompt you for a secret. @return: An argv list suitable for passing to L{ProcessEndpoint} @raises: L{RuntimeError} if no C{pinentry} is available. @see: U{https://www.gnupg.org/documentation/manuals/gnupg/Common-Problems.html} """ pinentries = [] if 'PINENTRY' in os.environ: pinentries.append(Pinentry(os.environ['PINENTRY'])) else: mgrd = yield call('launchctl', 'managername') if mgrd == 'Aqua': pinentries.extend([ Pinentry( '/usr/local/MacGPG2/libexec/pinentry-mac.app' '/Contents/MacOS/pinentry-mac'), Pinentry('pinentry-mac'), ]) if 'DISPLAY' in os.environ: pinentries.extend([ Pinentry('pinentry-gnome3'), Pinentry('pinentry-x11') ]) pinentries.extend([ Pinentry('pinentry-curses'), Pinentry('pinentry'), ]) for pinentry in pinentries: try: pinentry.argv() except (PinentryNotFound, OSError): continue else: return pinentry else: return GetPassAsker() @inlineCallbacks def secretly(reactor, action, system=None, username=None, prompt="Password:"): """ Call the given C{action} with a secret value. @return: a L{Deferred} that fires with C{action}'s result, or L{NoSecretError} if no secret can be retrieved. """ if system is None: system = action.__module__ if system == '__main__': system = os.path.abspath(sys.argv[0]) if username is None: username = getpass.getuser() while True: secret = keyring.get_password(system, username) if secret is not None: break pinentry = yield choosePinentry() keyring.set_password( system, username, (yield pinentry.askForPassword( reactor, prompt, "Enter Password", "Password Prompt for {username}@{system}" .format(system=system, username=username))) ) yield maybeDeferred(action, secret) if __name__ == '__main__': @react def main(reactor): return secretly(reactor, lambda pw: deferLater(reactor, 3.0, print, 'pw:', pw)) PKIL*ugGGsecretly/test/test_impl.pyimport attr import attr.validators import os import sys from twisted.trial import unittest from .. import _impl class PinentryTests(unittest.SynchronousTestCase): """ Tests for L{_impl.Pinentry}. """ def setUp(self): self.pinentry = _impl.Pinentry(name="pinentry") def test_argvRaisesWhenNotFound(self): """ Calling L{_impl.Pinentry.argv} raises L{_impl.PinentryNotFound} when the C{pinentry} does not exist. """ self.assertRaises( _impl.PinentryNotFound, self.pinentry.argv, _which=lambda arg: [], ) def test_argvReturnsListWhenFoundWithDefaultArgumentFactory(self): """ Calling L{_impl.Pinentry.argv} returns a list containing the path to its C{pinentry} program with the default C{argumentFactory}. """ path = ["/path/to/pinentry"] self.assertEqual(self.pinentry.argv(_which=lambda arg: path), path) def test_argvReturnsListWhenFoundWithCustomArgumentFactory(self): """ Calling L{_impl.Pinentry.argv} returns a list containing the first available path to its C{pinentry} program and the arguments provided by the C{argumentFactory}. """ def argumentFactory(): return ["a", "b"] path = "/path/to/pin/entry" def which(arg): return [path, "ignored"] pinentry = _impl.Pinentry( name="pinentry", argumentFactory=argumentFactory ) self.assertEqual( pinentry.argv(_which=which), [path] + argumentFactory(), ) class TTYNameArgumenIntegrationTests(unittest.SynchronousTestCase): """ Integration tests for L{_impl.ttynameArgument}. These are integration tests because they exercise L{_impl.ttynameArgument}'s real implementation. """ def test_stdoutNotTTY(self): """ L{_impl.ttynameArgument} raises L{OSError} when stdout is not a tty. """ read, write = os.pipe() self.addCleanup(os.close, read) self.addCleanup(os.close, write) self.assertRaises( OSError, _impl.ttynameArgument, _stdout=write, ) def test_ttyPathReturned(self): """ The path to stdout's tty is returned as the argument to C{--ttyname}. """ master, slave = os.openpty() self.addCleanup(os.close, master) self.addCleanup(os.close, slave) argv = _impl.ttynameArgument(_stdout=master) self.assertEqual(len(argv), 2) argument, value = argv self.assertEqual("--ttyname", argument) with open(value, "rb") as tty: self.assertTrue(os.isatty(tty.fileno())) @attr.s class PinEntryArgvRaises(object): """ An orchestrator for L{FakePinentry}s that causes its L{FakePinentry.argv} method to raise an exception. """ _raises = attr.ib(attr.validators.instance_of( (_impl.PinentryNotFound, OSError))) def _argv(self): """ Raise our exception. """ raise self._raises @attr.s class PinEntryArgvReturns(object): """ An orchestrator for L{FakePinentry}s that causes its L{FakePinentry.argv} method to return a value. """ _returns = attr.ib() @_returns.validator def check(self, attribute, value): listOfStrs = isinstance(value, list) and all( isinstance(el, str) for el in value) if not listOfStrs: raise ValueError("Must be list of strings.") def _argv(self): """ Raise our exception. @return: The L{list} passed to our initializer. """ return self._returns @attr.s class FakePinentry(object): """ A fake L{Pinentry}. """ _orchestrator = attr.ib(validator=attr.validators.instance_of( (PinEntryArgvRaises, PinEntryArgvReturns))) def argv(self): """ Delegates to our orchestrator's argv. @return: Whatever the orchestrator returns. """ return self._orchestrator._argv() class VerifyFakePinentryIntegrationTests(unittest.SynchronousTestCase): """ Test that L{FakePinentry}'s behavior matches L{_impl.Pinentry}. These are integration tests because they exercise L{_impl.Pinentry}'s real implementation. """ def test_pinentryNotFound(self): """ L{FakePinentry.argv} simulates L{Pinentry.argv}'s behavior when a C{pinentry} is not found. """ missing = "missing" fake = FakePinentry( PinEntryArgvRaises(_impl.PinentryNotFound(missing))) real = _impl.Pinentry(missing) fakeException = self.assertRaises(_impl.PinentryNotFound, fake.argv) self.patch(os, "environ", {}) realException = self.assertRaises(_impl.PinentryNotFound, real.argv) self.assertEqual(str(fakeException), str(realException)) def test_argumentFactoryRaisesOSError(self): """ L{FakePinentry.argv} simulates L{Pinentry.argv}'s behavior when the argument factory raises L{OSError}. """ error = OSError("failed", 11) fake = FakePinentry(PinEntryArgvRaises(error)) def raisesOSError(): raise error real = _impl.Pinentry(sys.executable, argumentFactory=raisesOSError) fakeException = self.assertRaises(OSError, fake.argv) realException = self.assertRaises(OSError, real.argv) self.assertEqual(str(fakeException), str(realException)) def test_returnsList(self): """ L{FakePinentry.argv} simulates L{Pinentry.argv} when the executable is found. """ fake = FakePinentry(PinEntryArgvReturns([sys.executable])) real = _impl.Pinentry(sys.executable) self.assertEqual(fake.argv(), real.argv()) def test_canOnlyReturnListOfStrings(self): """ L{FakePinentry.argv} can only return a list of strings. """ self.assertRaises(ValueError, PinEntryArgvReturns, 1) self.assertRaises(ValueError, PinEntryArgvReturns, [1]) class ChoosePinEntryTests(unittest.SynchronousTestCase): """ Tests for L{_impl.choosePinentry}. """ def test_noPinentriesFound(self): """ L{RuntimeError} is raised if no C{pinentry} programs are found. """ self.assertRaises(RuntimeError, _impl.choosePinentry, []) def test_skipPinentryNotFound(self): """ L{PinEntry}s that whose L{PinEntry.argv} method raises L{PinentryNotFound} are skipped. """ argv = ["argv0", "argv1"] pinentries = [ FakePinentry(PinEntryArgvRaises(_impl.PinentryNotFound)), FakePinentry(PinEntryArgvReturns(argv)), ] self.assertEqual(_impl.choosePinentry(pinentries), argv) def test_skipOSError(self): """ L{PinEntry}s that whose L{PinEntry.argv} method raises L{OSError} are skipped. """ argv = ["argv0", "argv1"] pinentries = [ FakePinentry(PinEntryArgvRaises(OSError)), FakePinentry(PinEntryArgvReturns(argv)), ] self.assertEqual(_impl.choosePinentry(pinentries), argv) def test_returnsFirstFound(self): """ The first found L{PinEntry}s is returned. """ argv = ["argv0", "argv1"] otherArgv = ["otherArgv0", "otherArgv1"] pinentries = [ FakePinentry(PinEntryArgvReturns(argv)), FakePinentry(PinEntryArgvReturns(otherArgv)), ] self.assertEqual(_impl.choosePinentry(pinentries), argv) PKfJ$/00secretly-0.2.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2017 Glyph Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HxyUbsecretly-0.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,Q034 /, (-JLR()*M ILR(4KM̫#DPK!HZBFsecretly-0.2.dist-info/METADATA]MO0 8 HcuqD^њ[=)*;~5 6(X|P`㝆В:Im {,.x# hs-ډ ˲͍ޖm<{659|VjArM 7{'GBE 42}fo(M@6,!jF^ꕾ Ci@>P Ƶv\ )naO308УSPK!HQ1W secretly-0.2.dist-info/RECORDun@@ѽ>a\t TtP A(/׷im˓ˢY7}ZUDZ4f҇{a|U 19z`5݈.ѱ\Zϖ-K fM€,W;m xmv Z%_}v_Xdn9Mo&Ѿ2aR˹f,&/!i#X FOLN=T*q5(!@q*2],9nO'cYxǺ:BJg!'f2φl~$'*ۀ^75r8S\J2snzK!d]e"B?۳(rS cwha PKILRsecretly/__init__.pyPK7IL/ / secretly/_impl.pyPKIL*ugGG!secretly/test/test_impl.pyPKfJ$/00?secretly-0.2.dist-info/LICENSEPK!HxyUbCsecretly-0.2.dist-info/WHEELPK!HZBFDsecretly-0.2.dist-info/METADATAPK!HQ1W Esecretly-0.2.dist-info/RECORDPKSG