PK DF documint/__init__.pyPK YF" documint/mediumbox.py# From lp:~glyph/+junk/amphacks """ An argument type for sending medium-sized strings (more than 64k, but small enough that they still fit into memory and don't require streaming). """ from cStringIO import StringIO from itertools import count from twisted.protocols.amp import AMP, Argument, Command CHUNK_MAX = 0xffff class BigString(Argument): def fromBox(self, name, strings, objects, proto): value = StringIO() value.write(strings.get(name)) for counter in count(2): chunk = strings.get("%s.%d" % (name, counter)) if chunk is None: break value.write(chunk) objects[name] = value.getvalue() def toBox(self, name, strings, objects, proto): value = StringIO(objects[name]) firstChunk = value.read(CHUNK_MAX) strings[name] = firstChunk counter = 2 while True: nextChunk = value.read(CHUNK_MAX) if not nextChunk: break strings["%s.%d" % (name, counter)] = nextChunk counter += 1 class Send(Command): arguments = [('big', BigString())] class Example(AMP): @Send.responder def gotBig(self, big): print 'Got a big input', len(big) f = file("OUTPUT", "wb") f.write(big) f.close() return {} def main(argv): from twisted.internet import reactor from twisted.internet.protocol import Factory, ClientCreator if argv[1] == 'client': filename = argv[2] def connected(result): result.callRemote(Send, big=file(filename).read()) ClientCreator(reactor, AMP).connectTCP("localhost", 4321).addCallback( connected) reactor.run() elif argv[1] == 'server': f = Factory() f.protocol = Example reactor.listenTCP(4321, f) reactor.run() else: print "Specify 'client' or 'server'." if __name__ == '__main__': from sys import argv as arguments main(arguments) PK YF documint/util.py""" I{Documint} utility functions. """ from StringIO import StringIO from lxml import etree from twisted.internet.defer import maybeDeferred def embedStylesheets(markup, stylesheets, removeStylesheets=False): """ Embed external stylesheets in I{XHTML} markup. @type markup: L{str} @param markup: I{XHTML} markup UTF-8 byte data. @type stylesheets: I{iterable} of L{str} @param stylesheets: Iterable of stylesheet UTF-8 byte data to embed. @rtype: L{str} @return: I{XHTML} UTF-8 byte data with embedded stylesheets. """ tree = etree.parse(StringIO(markup)) namespaces = {'xhtml': 'http://www.w3.org/1999/xhtml'} head = tree.xpath('//xhtml:head', namespaces=namespaces)[0] for stylesheet in stylesheets: e = etree.SubElement( head, '{http://www.w3.org/1999/xhtml}style', attrib=dict(type='text/css')) e.text = stylesheet.decode('utf-8') if removeStylesheets: stylesheetLinks = head.xpath( 'xhtml:link[@rel="stylesheet"]', namespaces=namespaces) for e in stylesheetLinks: head.remove(e) return etree.tostring(tree, encoding='utf-8') def defertee(result, func, *a, **kw): """ Call C{func}, with positional and keyword arguments, as a side effect, and return C{result}. Useful in combination with C{Deferred.addCallback} when you wish to perform an operation in the callback chain but wish to retain the result for subsequent callbacks. """ d = maybeDeferred(func, result, *a, **kw) d.addCallback(lambda ignored: result) return d __all__ = ['embedStylesheets'] PK XF(.~? ? documint/client.py""" I{Documint} I{AMP} client. """ from twisted.internet import task from twisted.internet.endpoints import clientFromString, connectProtocol from twisted.protocols.amp import AMP from documint.commands import Certify, Render def render(protocol, markup, stylesheets): """ Execute the L{Render} AMP command. """ return protocol.callRemote( Render, markup=markup, stylesheets=stylesheets) def certify(protocol, data, contentType, reason, location): """ Execute the L{Certify} AMP command. """ return protocol.callRemote( Certify, data=data, contentType=contentType, reason=reason, location=location) __all__ = ['render', 'certify'] if __name__ == '__main__': import sys def main(reactor, markup, styles): def _readFile(name): with file(name, 'rb') as fd: return fd.read() def _writeResponse(response): sys.stdout.write(response['data']) sys.stdout.flush() endpoint = clientFromString(reactor, 'tcp:host=127.0.0.1:port=8750') d = connectProtocol(endpoint, AMP()) d.addCallback(render, _readFile(markup), map(_readFile, styles)) d.addCallback(_writeResponse) return d task.react(main, (sys.argv[1], sys.argv[2:])) PK XFV6 documint/commands.py""" Documint I{AMP} command and protocol definitions. """ from lxml import etree from twisted.internet.defer import succeed from twisted.protocols import amp from twisted.python.failure import Failure from documint.errors import ( ExternalProcessError, RemoteExternalProcessError, UnsupportedContentType, XMLSyntaxError) from documint.extproc.css2xslfo import renderXHTML from documint.extproc.neon import failingPDFSign from documint.mediumbox import BigString from documint.util import embedStylesheets class Render(amp.Command): """ Render I{XHTML} markup and I{CSS} documents to a I{PDF} document. Accepts the following arguments: * C{markup}, a L{str}, that is the I{XHTML} markup byte data; * C{stylesheets}, a L{list} of L{str}, that is a list of stylesheet byte data. Returns a L{dict} mapping C{'data'} to a C{str} of rendered byte data, and C{'contentType'} to the content type of the byte data. """ arguments = [ ('markup', BigString()), ('stylesheets', amp.ListOf(amp.String()))] response = [ ('data', BigString()), ('contentType', amp.String())] errors = { XMLSyntaxError: 'XML_SYNTAX_ERROR', RemoteExternalProcessError: 'EXTERNAL_PROCESS_ERROR'} class Certify(amp.Command): """ Sign a I{PDF} document with the configured keystore. """ arguments = [ ('data', BigString()), ('contentType', amp.String()), ('reason', amp.String()), ('location', amp.String())] response = [ ('data', BigString()), ('contentType', amp.String())] errors = { UnsupportedContentType: 'UNSUPPORTED_CONTENT_TYPE', RemoteExternalProcessError: 'EXTERNAL_PROCESS_ERROR'} class Minter(amp.CommandLocator): """ Documint command locator. This implementation is for performing the full range of tasks that produce PDF documents. """ def __init__(self, signPDF=failingPDFSign): """ @param signPDF: Something like L{documint.extproc.neon.signPDF} with the keystore, keystore password and private key parameters already partially applied. """ self._signPDF = signPDF def embedStylesheets(self, markup, stylesheets): """ Embed external stylesheets in I{XHTML} markup. @type markup: L{str} @param markup: I{XHTML} markup byte data. @type stylesheets: I{iterable} of L{str} @param stylesheets: Iterable of stylesheet byte data to embed. @rtype: L{str} @return: I{XHTML} markup with embedded stylesheets. """ try: return embedStylesheets(markup, stylesheets) except etree.XMLSyntaxError, e: raise XMLSyntaxError(e) def _handleExternalProcessError(self, f): """ Convert L{ExternalProcessError} into L{RemoteExternalProcessError}. """ f.trap(ExternalProcessError) return Failure(RemoteExternalProcessError(f.getErrorMessage())) def _dataResult(self, (data, contentType)): """ Convert a 2-tuple of data and content type into a dict. """ return dict(data=data, contentType=contentType) def _renderXHTML(self, markup, stylesheets): """ Render I{XHMTL} markup and I{CSS} to a I{PDF}. @type markup: L{str} @param markup: I{XHTML} UTF-8 byte data. @type stylesheets: I{iterable} of L{str} @param stylesheets: Iterable of stylesheet UTF-8 byte data to embed. @rtype: L{Deferred} firing with C{(str, str)} @return: Deferred that fires with the generated I{PDF} byte data and content type. """ d = renderXHTML(self.embedStylesheets(markup, stylesheets)) d.addErrback(self._handleExternalProcessError) d.addCallback(lambda data: (data, 'application/pdf')) return d @Render.responder def render(self, markup, stylesheets): d = self._renderXHTML(markup, stylesheets) d.addCallback(self._dataResult) return d def _certifyDocument(self, data, contentType, reason, location): """ Sign a I{PDF} document. """ if contentType != 'application/pdf': raise UnsupportedContentType( 'Only PDF content can be certified, got: {!r}'.format(contentType)) d = self._signPDF(data=data, reason=reason, location=location) d.addErrback(self._handleExternalProcessError) d.addCallback(lambda data: (data, 'application/pdf')) return d @Certify.responder def certify(self, data, contentType, reason, location): d = self._certifyDocument(data, contentType, reason, location) d.addCallback(self._dataResult) return d class SimpleMinter(Minter): """ Documint command locator. The main difference between L{SimpleMinter} and L{Minter} is that L{SimpleMinter} will output I{XHTML}. """ def _renderXHTML(self, markup, stylesheets): """ Embed I{CSS} in I{XHMTL} markup @type markup: L{str} @param markup: I{XHTML} UTF-8 byte data. @type stylesheets: I{iterable} of L{str} @param stylesheets: Iterable of stylesheet UTF-8 byte data to embed. @rtype: L{Deferred} firing with C{(str, str)} @return: Deferred that fires with the generated I{XHTML}, including embedded I{CSS} byte data, and the content type. """ return succeed( (self.embedStylesheets(markup, stylesheets), 'text/html')) __all__ = ['Render', 'Minter', 'SimpleMinter', 'Certify'] PK XF documint/errors.py""" I{Documint} error types. """ class XMLSyntaxError(Exception): """ Wrapper around L{lxml.etree.XMLSyntaxError} that requires no additional arguments. """ class ExternalProcessError(RuntimeError): """ An external process returned an exit status indicating failure. @type binary: C{str} @ivar binary: Path to the binary to spawn. @type arguments: C{sequence} of C{str} @ivar arguments: Arguments to pass when spawning L{binary}. @type code: C{int} @ivar code: Exit status. @type stdout: C{str} @ivar stdout: Standard output data. @type stderr: C{str} @ivar stderr: Standard error data. """ def __init__(self, binary, arguments, code, (stdout, stderr)): RuntimeError.__init__(self, (binary, arguments, code, (stdout, stderr))) self.binary = binary self.arguments = arguments self.code = code self.stdout = stdout self.stderr = stderr class RemoteExternalProcessError(Exception): """ AMP-friendly description of an external process error. """ class NoSuchFile(IOError): """ The specified file could not be found. """ class UnsupportedContentType(ValueError): """ The specified content type is not supported. """ PK DF documint/test/__init__.pyPK YF2 documint/test/test_util.py""" Tests for L{documint.util}. """ from StringIO import StringIO from lxml import etree from twisted.trial.unittest import TestCase from documint.util import embedStylesheets class EmbedStylesheetsTests(TestCase): """ Tests for L{documint.util.embedStylesheets}. """ def test_removeStylesheetLinks(self): """ L{documint.util.embedStylesheets} removes links with the C{'stylesheet'} relationship. """ markup = embedStylesheets('''
''', [], removeStylesheets=True) tree = etree.parse(StringIO(markup)) namespaces = {'xhtml': 'http://www.w3.org/1999/xhtml'} elems = tree.findall('//xhtml:head/xhtml:link', namespaces=namespaces) self.assertEquals(1, len(elems)) self.assertEquals('something', elems[0].get('rel')) def test_embedStylesheets(self): """ L{documint.util.embedStylesheets} embeds stylesheets in I{style} elements. """ stylesheets = [ 'div {color:red;}', 'span {color:blue;}'] markup = embedStylesheets(''' ''', stylesheets) tree = etree.parse(StringIO(markup)) namespaces = {'xhtml': 'http://www.w3.org/1999/xhtml'} elems = tree.findall('//xhtml:head/xhtml:style', namespaces=namespaces) self.assertEquals(2, len(elems)) for elem, stylesheet in zip(elems, stylesheets): self.assertEquals(stylesheet, elem.text) PK YFyvD D documint/test/test_css2xslfo.pyfrom twisted.internet.defer import fail, succeed from twisted.python.filepath import FilePath from twisted.trial.unittest import TestCase from documint.errors import ExternalProcessError from documint.extproc.css2xslfo import css2xslfo, findCSS2XSLFO, renderXHTML class CSS2XSLFOTests(TestCase): """ Tests for L{documint.util.css2xslfo}. """ try: findCSS2XSLFO() except RuntimeError: skip = 'css2xslfo unavailable' def setUp(self): self.dataPath = FilePath(__file__).sibling('data') def test_missing(self): """ Invoking I{css2xslfo} with a missing input raises an L{ExternalProcessError}. """ def checkException(e): self.assertIn('(No such file or directory)', e.stderr) outputPath = FilePath(self.mktemp()) outputPath.touch() d = css2xslfo(self.dataPath.child('missing.html'), outputPath) d = self.assertFailure(d, ExternalProcessError) d.addCallback(checkException) return d def test_broken(self): """ Invoking I{css2xslfo} with a broken input raises an L{ExternalProcessError}. """ def checkException(e): self.assertIn( 'element type "div" must be terminated by the matching end-tag', e.stderr) outputPath = FilePath(self.mktemp()) outputPath.touch() d = css2xslfo(self.dataPath.child('broken.html'), outputPath) d = self.assertFailure(d, ExternalProcessError) d.addCallback(checkException) return d class RenderXHMTLTests(TestCase): """ Tests for L{documint.util.renderXHTML}. """ def mkdtemp(self): """ Create a temporary directory. @rtype: L{FilePath} """ tempDir = FilePath(self.mktemp()) if not tempDir.exists(): tempDir.makedirs() return tempDir def test_renderXHTML(self): """ L{renderXHTML} invokes L{css2xslfo} then L{fop}, removes the temporary directory, and returns I{fop}'s result. """ def mockCSS2XSLFO(xhtmlPath, xslfoPath): return succeed(None) def mockFop(xslfoPath, pdfPath, configFile=None): return succeed('pdf') def cb(pdfData): self.assertIdentical(str, type(pdfData)) self.assertEquals('pdf', pdfData) self.assertFalse(tempDir.exists()) tempDir = self.mkdtemp() d = renderXHTML( 'markup', tempDir=tempDir, css2xslfo=mockCSS2XSLFO, fop=mockFop) d.addCallback(cb) return d def test_renderXHTMLCSS2XSLFOFails(self): """ If L{renderXHTML} fails invoking L{css2xslfo}, L{fop} is not invoked and the temporary directory is not removed. """ def mockCSS2XSLFO(xslfoPath, xhtmlPath): return fail(RuntimeError(1)) def mockFop(xslfoPath, pdfPath, configFile=None): self.fail('Never get here') def cb(e): self.assertEquals('1', str(e)) self.assertTrue(tempDir.exists()) tempDir = self.mkdtemp() d = renderXHTML( 'markup', tempDir=tempDir, css2xslfo=mockCSS2XSLFO, fop=mockFop) d = self.assertFailure(d, RuntimeError) d.addCallback(cb) return d def test_renderXHTMLFopFails(self): """ If L{renderXHTML} fails invoking L{fop}, the temporary directory is not removed. """ def mockCSS2XSLFO(xhtmlPath, xslfoPath): return succeed(None) def mockFop(xslfoPath, pdfPath, configFile=None): return fail(RuntimeError(2)) def cb(e): self.assertEquals('2', str(e)) self.assertTrue(tempDir.exists()) tempDir = self.mkdtemp() d = renderXHTML( 'markup', tempDir=tempDir, css2xslfo=mockCSS2XSLFO, fop=mockFop) self.assertFailure(d, RuntimeError) d.addCallback(cb) return d PK XF;i documint/test/test_commands.py""" Tests for L{documint.commands}. """ from twisted.internet.defer import succeed from twisted.trial.unittest import TestCase from documint.commands import Minter, SimpleMinter class MinterTests(TestCase): """ Tests for L{documint.commands.Minter}. """ def test_render(self): """ L{Minter.render}, a responder for L{documint.commands.Render}, invokes L{Minter.renderXHTML}. """ minter = Minter() minter._renderXHTML = lambda *a: succeed((a, 'application/pdf')) d = minter.render('markup', ['css1', 'css2']) d.addCallback( self.assertEquals, {'data': ('markup', ['css1', 'css2']), 'contentType': 'application/pdf'}) return d class SimpleMinterTests(TestCase): """ Tests for L{documint.commands.SimpleMinter}. """ def test_renderXHTML(self): """ L{SimpleMinter.renderXHTML} invokes L{Minter.embedStylesheets}. """ minter = SimpleMinter() minter.embedStylesheets = lambda *a: a d = minter._renderXHTML('markup', ['css1', 'css2']) d.addCallback( self.assertEquals, (('markup', ['css1', 'css2']), 'text/html')) return d PK aF6ׁ documint/test/test_neon.pyfrom twisted.python.filepath import FilePath from twisted.trial.unittest import TestCase from documint.errors import ExternalProcessError from documint.extproc.neon import _neonBinary, signPDF class NeonTests(TestCase): """ Tests for L{documint.extproc.neon}. """ try: _neonBinary() except RuntimeError: skip = 'clj-neon unavailable.' def setUp(self): self.keystore = FilePath( __file__).sibling('data').child('keystore.jks') self.keystorePassword = u'tQ4i4RJKyX6J4Lq1' self.privateKeyPassword = u'tQ4i4RJKyX6J4Lq1' def signPDF(self, unsignedPDF): return signPDF( data=unsignedPDF, keystorePath=self.keystore, keystorePassword=self.keystorePassword, reason='Test reason', location='Test location', privateKeyPassword=self.privateKeyPassword) def assertValidPDF(self, data): """ Assert that C{data} is valid PDF data. """ self.assertNotEqual(len(data), 0) self.assertTrue(data.startswith('%PDF-')) def test_success(self): """ Neon generates a valid document when invoked with valid data. """ unsignedPDF = FilePath(__file__).sibling('data').child('test.pdf') d = self.signPDF(unsignedPDF.getContent()) d.addCallback(self.assertValidPDF) return d def test_failure(self): """ Invoking Neon with invalid data raises L{documint.error.ExternalProcessError}. """ d = self.signPDF('garbage') return self.assertFailure(d, ExternalProcessError) PK FF documint/test/data/broken.html