PKz[J>[2[2jupyter_kernel_test/__init__.py"""Machinery for testing Jupyter kernels via the messaging protocol. """ from unittest import TestCase, SkipTest from queue import Empty from jupyter_client.manager import start_new_kernel from .messagespec import validate_message, MimeBundle TIMEOUT = 15 __version__ = '0.3' class KernelTests(TestCase): kernel_name = "" @classmethod def setUpClass(cls): cls.km, cls.kc = start_new_kernel(kernel_name=cls.kernel_name) @classmethod def tearDownClass(cls): cls.kc.stop_channels() cls.km.shutdown_kernel() def flush_channels(self): for channel in (self.kc.shell_channel, self.kc.iopub_channel): while True: try: msg = channel.get_msg(block=True, timeout=0.1) except Empty: break else: validate_message(msg) language_name = "" file_extension = "" def test_kernel_info(self): self.flush_channels() msg_id = self.kc.kernel_info() reply = self.kc.get_shell_msg(timeout=TIMEOUT) validate_message(reply, 'kernel_info_reply', msg_id) if self.language_name: self.assertEqual(reply['content']['language_info']['name'], self.language_name) if self.file_extension: self.assertEqual(reply['content']['language_info']['file_extension'], self.file_extension) self.assertTrue(reply['content']['language_info']['file_extension'].startswith(".")) def execute_helper(self, code, timeout=TIMEOUT, silent=False, store_history=True, stop_on_error=True): msg_id = self.kc.execute(code=code, silent=silent, store_history=store_history, stop_on_error=stop_on_error) reply = self.kc.get_shell_msg(timeout=timeout) validate_message(reply, 'execute_reply', msg_id) busy_msg = self.kc.iopub_channel.get_msg(timeout=1) validate_message(busy_msg, 'status', msg_id) self.assertEqual(busy_msg['content']['execution_state'], 'busy') output_msgs = [] while True: msg = self.kc.iopub_channel.get_msg(timeout=0.1) validate_message(msg, msg['msg_type'], msg_id) if msg['msg_type'] == 'status': self.assertEqual(msg['content']['execution_state'], 'idle') break elif msg['msg_type'] == 'execute_input': self.assertEqual(msg['content']['code'], code) continue output_msgs.append(msg) return reply, output_msgs code_hello_world = "" def test_execute_stdout(self): if not self.code_hello_world: raise SkipTest self.flush_channels() reply, output_msgs = self.execute_helper(code=self.code_hello_world) self.assertEqual(reply['content']['status'], 'ok') self.assertGreaterEqual(len(output_msgs), 1) for msg in output_msgs: if (msg['msg_type'] == 'stream') and (msg['content']['name'] == 'stdout'): self.assertIn('hello, world', msg['content']['text']) break else: self.assertTrue(False, "Expected one output message of type 'stream' and 'content.name'='stdout'") code_stderr = "" def test_execute_stderr(self): if not self.code_stderr: raise SkipTest self.flush_channels() reply, output_msgs = self.execute_helper(code=self.code_stderr) self.assertEqual(reply['content']['status'], 'ok') self.assertGreaterEqual(len(output_msgs), 1) for msg in output_msgs: if (msg['msg_type'] == 'stream') and (msg['content']['name'] == 'stderr'): break else: self.assertTrue(False, "Expected one output message of type 'stream' and 'content.name'='stderr'") completion_samples = [] def test_completion(self): if not self.completion_samples: raise SkipTest for sample in self.completion_samples: with self.subTest(text=sample['text']): msg_id = self.kc.complete(sample['text']) reply = self.kc.get_shell_msg() validate_message(reply, 'complete_reply', msg_id) if 'matches' in sample: self.assertEqual(set(reply['content']['matches']), set(sample['matches'])) complete_code_samples = [] incomplete_code_samples = [] invalid_code_samples = [] def check_is_complete(self, sample, status): msg_id = self.kc.is_complete(sample) reply = self.kc.get_shell_msg() validate_message(reply, 'is_complete_reply', msg_id) if reply['content']['status'] != status: msg = "For code sample\n {!r}\nExpected {!r}, got {!r}." raise AssertionError(msg.format(sample, status, reply['content']['status'])) def test_is_complete(self): if not (self.complete_code_samples or self.incomplete_code_samples or self.invalid_code_samples): raise SkipTest self.flush_channels() with self.subTest(status="complete"): for sample in self.complete_code_samples: self.check_is_complete(sample, 'complete') with self.subTest(status="incomplete"): for sample in self.incomplete_code_samples: self.check_is_complete(sample, 'incomplete') with self.subTest(status="invalid"): for sample in self.invalid_code_samples: self.check_is_complete(sample, 'invalid') code_page_something = "" def test_pager(self): if not self.code_page_something: raise SkipTest self.flush_channels() reply, output_msgs = self.execute_helper(self.code_page_something) self.assertEqual(reply['content']['status'], 'ok') payloads = reply['content']['payload'] self.assertEqual(len(payloads), 1) self.assertEqual(payloads[0]['source'], 'page') mimebundle = payloads[0]['data'] # Validate the mimebundle MimeBundle().data = mimebundle self.assertIn('text/plain', mimebundle) code_generate_error = "" def test_error(self): if not self.code_generate_error: raise SkipTest self.flush_channels() reply, output_msgs = self.execute_helper(self.code_generate_error) self.assertEqual(reply['content']['status'], 'error') self.assertEqual(len(output_msgs), 1) self.assertEqual(output_msgs[0]['msg_type'], 'error') code_execute_result = [] def test_execute_result(self): if not self.code_execute_result: raise SkipTest for sample in self.code_execute_result: with self.subTest(code=sample['code']): self.flush_channels() reply, output_msgs = self.execute_helper(sample['code']) self.assertEqual(reply['content']['status'], 'ok') self.assertGreaterEqual(len(output_msgs), 1) self.assertEqual(output_msgs[0]['msg_type'], 'execute_result') self.assertIn('text/plain', output_msgs[0]['content']['data']) self.assertEqual(output_msgs[0]['content']['data']['text/plain'], sample['result']) code_display_data = [] def test_display_data(self): if not self.code_display_data: raise SkipTest for sample in self.code_display_data: with self.subTest(code=sample['code']): self.flush_channels() reply, output_msgs = self.execute_helper(sample['code']) self.assertEqual(reply['content']['status'], 'ok') self.assertGreaterEqual(len(output_msgs), 1) self.assertEqual(output_msgs[0]['msg_type'], 'display_data') self.assertIn(sample['mime'], output_msgs[0]['content']['data']) # this should match one of the values in code_execute_result code_history_pattern = "" supported_history_operations = () def history_helper(self, execute_first, timeout=TIMEOUT, **histargs): self.flush_channels() for code in execute_first: reply, output_msgs = self.execute_helper(code) self.flush_channels() msg_id = self.kc.history(**histargs) reply = self.kc.get_shell_msg(timeout=timeout) validate_message(reply, 'history_reply', msg_id) return reply def test_history(self): if not self.code_execute_result: raise SkipTest codes = [s['code'] for s in self.code_execute_result] results = [s['result'] for s in self.code_execute_result] n = len(codes) session = start = None with self.subTest(hist_access_type="tail"): if 'tail' not in self.supported_history_operations: raise SkipTest reply = self.history_helper(codes, output=False, raw=True, hist_access_type="tail", n=n) self.assertEqual(len(reply['content']['history']), n) self.assertEqual(len(reply['content']['history'][0]), 3) self.assertEqual(codes, [h[2] for h in reply['content']['history']]) session, start = reply['content']['history'][0][0:2] with self.subTest(output=True): reply = self.history_helper(codes, output=True, raw=True, hist_access_type="tail", n=n) self.assertEqual(len(reply['content']['history'][0][2]), 2) with self.subTest(hist_access_type="range"): if 'range' not in self.supported_history_operations: raise SkipTest if session is None: raise SkipTest reply = self.history_helper(codes, output=False, raw=True, hist_access_type="range", session=session, start=start, stop=start+1) self.assertEqual(len(reply['content']['history']), 1) self.assertEqual(reply['content']['history'][0][0], session) self.assertEqual(reply['content']['history'][0][1], start) with self.subTest(hist_access_type="search"): if not self.code_history_pattern: raise SkipTest if 'search' not in self.supported_history_operations: raise SkipTest with self.subTest(subsearch="normal"): reply = self.history_helper(codes, output=False, raw=True, hist_access_type="search", pattern=self.code_history_pattern) self.assertGreaterEqual(len(reply['content']['history']), 1) with self.subTest(subsearch="unique"): reply = self.history_helper(codes, output=False, raw=True, hist_access_type="search", pattern=self.code_history_pattern, unique=True) self.assertEqual(len(reply['content']['history']), 1) with self.subTest(subsearch="n"): reply = self.history_helper(codes, output=False, raw=True, hist_access_type="search", pattern=self.code_history_pattern, n=3) self.assertEqual(len(reply['content']['history']), 3) code_inspect_sample = "" def test_inspect(self): if not self.code_inspect_sample: raise SkipTest self.flush_channels() msg_id = self.kc.inspect(self.code_inspect_sample) reply = self.kc.get_shell_msg(timeout=TIMEOUT) validate_message(reply, 'inspect_reply', msg_id) self.assertEqual(reply['content']['status'], 'ok') self.assertTrue(reply['content']['found']) self.assertGreaterEqual(len(reply['content']['data']), 1) code_clear_output = "" def test_clear_output(self): if not self.code_clear_output: raise SkipTest self.flush_channels() reply, output_msgs = self.execute_helper(code=self.code_clear_output) self.assertEqual(reply['content']['status'], 'ok') self.assertGreaterEqual(len(output_msgs), 1) self.assertEqual(output_msgs[0]['msg_type'], 'clear_output') PK^HP`T"jupyter_kernel_test/messagespec.py"""The v5 message specification. This can be used in the tests to verify messages.""" # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. import sys import nose.tools as nt from traitlets import ( Bool, Unicode, Dict, Integer, List, Enum, ) from .messagespec_common import Reference, MimeBundle, Version # message and header formats specific to verion 5 class RMessage(Reference): msg_id = Unicode() msg_type = Unicode() header = Dict() parent_header = Dict() content = Dict() def check(self, d): super(RMessage, self).check(d) RHeader().check(self.header) if self.parent_header: RHeader().check(self.parent_header) class RHeader(Reference): msg_id = Unicode() msg_type = Unicode() session = Unicode() username = Unicode() version = Version(min='5.0') # version shell replies class ExecuteReply(Reference): execution_count = Integer() status = Enum((u'ok', u'error')) def check(self, d): Reference.check(self, d) if d['status'] == 'ok': ExecuteReplyOkay().check(d) elif d['status'] == 'error': ExecuteReplyError().check(d) class ExecuteReplyOkay(Reference): payload = List(Dict) user_expressions = Dict() class ExecuteReplyError(Reference): ename = Unicode() evalue = Unicode() traceback = List(Unicode) class InspectReply(MimeBundle): found = Bool() class ArgSpec(Reference): args = List(Unicode) varargs = Unicode() varkw = Unicode() defaults = List() class Status(Reference): execution_state = Enum((u'busy', u'idle', u'starting')) class CompleteReply(Reference): matches = List(Unicode) cursor_start = Integer() cursor_end = Integer() status = Unicode() class LanguageInfo(Reference): name = Unicode() version = Unicode(sys.version.split()[0]) class KernelInfoReply(Reference): protocol_version = Version(min='5.0') implementation = Unicode() implementation_version = Version() language_info = Dict() banner = Unicode() def check(self, d): Reference.check(self, d) LanguageInfo().check(d['language_info']) class IsCompleteReply(Reference): status = Enum((u'complete', u'incomplete', u'invalid', u'unknown')) def check(self, d): Reference.check(self, d) if d['status'] == 'incomplete': IsCompleteReplyIncomplete().check(d) class IsCompleteReplyIncomplete(Reference): indent = Unicode() # IOPub messages class ExecuteInput(Reference): code = Unicode() execution_count = Integer() Error = ExecuteReplyError class Stream(Reference): name = Enum((u'stdout', u'stderr')) text = Unicode() class DisplayData(MimeBundle): pass class ExecuteResult(MimeBundle): execution_count = Integer() class HistoryReply(Reference): history = List(List()) class ClearOutput(Reference): wait = Bool() """ Specifications of `content` part of the reply messages. """ references = { 'execute_reply' : ExecuteReply(), 'inspect_reply' : InspectReply(), 'status' : Status(), 'complete_reply' : CompleteReply(), 'kernel_info_reply': KernelInfoReply(), 'is_complete_reply': IsCompleteReply(), 'execute_input' : ExecuteInput(), 'execute_result' : ExecuteResult(), 'history_reply' : HistoryReply(), 'error' : Error(), 'stream' : Stream(), 'display_data' : DisplayData(), 'header' : RHeader(), 'clear_output' : ClearOutput(), } #validation specific to this version of the message specification def validate_message(msg, msg_type=None, parent=None): """validate a message If msg_type and/or parent are given, the msg_type and/or parent msg_id are compared with the given values. """ RMessage().check(msg) if msg_type: nt.assert_equal(msg['msg_type'], msg_type) if parent: nt.assert_equal(msg['parent_header']['msg_id'], parent) content = msg['content'] ref = references[msg['msg_type']] ref.check(content) PKbZJaI)jupyter_kernel_test/messagespec_common.py""" The common code for checking message specification. This can be used in the tests to verify messages. These will remain same between different versions of message specification. """ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. import re from distutils.version import LooseVersion as V import nose.tools as nt from traitlets import ( HasTraits, TraitError, Unicode, Dict, observe ) string_types = (str, type(u'')) #----------------------------------------------------------------------------- # Message Spec References #----------------------------------------------------------------------------- class Reference(HasTraits): """ Base class for message spec specification testing. This class is the core of the message specification test. The idea is that child classes implement trait attributes for each message keys, so that message keys can be tested against these traits using :meth:`check` method. """ def check(self, d): """validate a dict against our traits""" for key in self.trait_names(): nt.assert_in(key, d) # FIXME: always allow None, probably not a good idea if d[key] is None: continue try: setattr(self, key, d[key]) except TraitError as e: assert False, str(e) class Version(Unicode): def __init__(self, *args, **kwargs): self.min = kwargs.pop('min', None) self.max = kwargs.pop('max', None) kwargs['default_value'] = self.min super(Version, self).__init__(*args, **kwargs) def validate(self, obj, value): if self.min and V(value) < V(self.min): raise TraitError("bad version: %s < %s" % (value, self.min)) if self.max and (V(value) > V(self.max)): raise TraitError("bad version: %s > %s" % (value, self.max)) mime_pat = re.compile(r'^[\w\-\+\.]+/[\w\-\+\.]+$') class MimeBundle(Reference): metadata = Dict() data = Dict() @observe('data') def _data_changed(self, change): for k,v in change['new'].items(): assert mime_pat.match(k) nt.assert_is_instance(v, string_types) PKwa/H" !  ,jupyter_kernel_test-0.3.dist-info/COPYING.md# Licensing terms This project is licensed under the terms of the Modified BSD License (also known as New or Revised or 3-Clause BSD), as follows: - Copyright (c) 2015-, Jupyter Development Team All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. Neither the name of the Jupyter Development Team nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ## About the Jupyter Development Team The Jupyter Development Team is the set of all contributors to the Jupyter project. This includes all of the Jupyter subprojects. The core team that coordinates development on GitHub can be found here: https://github.com/jupyter/. ## Our Copyright Policy Jupyter uses a shared copyright model. Each contributor maintains copyright over their contributions to Jupyter. But, it is important to note that these contributions are typically only changes to the repositories. Thus, the Jupyter source code, in its entirety is not the copyright of any single person or institution. Instead, it is the collective copyright of the entire Jupyter Development Team. If individual contributors want to maintain a record of what changes/contributions they have specific copyright on, they should indicate their copyright in the commit message of the change, when they commit the change to one of the Jupyter repositories. With this in mind, the following banner should be used in any source code file to indicate the copyright and license terms: # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. PK!H;@QP'jupyter_kernel_test-0.3.dist-info/WHEEL1 0 RZq+D-Dv;_[*7Fp ܦpv/fݞoL(*IPK!HZ*jupyter_kernel_test-0.3.dist-info/METADATAWm6_1``˹nrInEH\yH1>CXM{/׽'s} i<νyW6یGmSK^XZ(=FJٶl4^~l~yeeL VImfCwA@=O\ M m)E`{"ʉfjw{ۨ6셓'c1ʇ 'T2_o x_ؒ~sy:'<' \V#T60*71mQbVSaKI,֒0% jAH| =7E{d&ɪ{kz|HNy71_ I\fl@T R mH0+X;$ M/([BVi8 Qn "⤖;U"(F8`oUC77/(yB!w}jaHjUnL[_Bq@ىRzSOI!$zݖqsɜVEꫜ8q+(:ݶu2OreC;P"ҹz;a_ j_yr@ft񎹚Bʠ.mlq-D[. =}d{$+U ^hR8 4uwȖ򲌮heM͎ wN;xg8Nugˢ cf lw c[[UV2_ nVa7y^иg2|HƄx)/'K`F=ݮEݛ5DX|vq:(P61>Jm:8ex8f_+~hJ'>_ /)mjc^*8}fe?1oft\]\D7Rh&7j4_'[ӪnȋnyiG±빱{$V|]k >M\lR-DLMs x=PKz[J>[2[2jupyter_kernel_test/__init__.pyPK^HP`T"2jupyter_kernel_test/messagespec.pyPKbZJaI)Bjupyter_kernel_test/messagespec_common.pyPKwa/H" !  ,Ljupyter_kernel_test-0.3.dist-info/COPYING.mdPK!H;@QP'pWjupyter_kernel_test-0.3.dist-info/WHEELPK!HZ*Xjupyter_kernel_test-0.3.dist-info/METADATAPK!HMmg( _jupyter_kernel_test-0.3.dist-info/RECORDPKQ`