# coding: utf-8 # $OpenXM: OpenXM/src/jupyter/kernel.py,v 1.3 2019/05/27 23:30:27 takayama Exp $ from __future__ import print_function import codecs import glob import json import logging import os import re import shutil import subprocess import sys import tempfile import uuid from xml.dom import minidom from metakernel import MetaKernel, ProcessMetaKernel, REPLWrapper, u from metakernel.pexpect import which from IPython.display import Image, SVG from . import __version__ STDIN_PROMPT = '__stdin_prompt>' STDIN_PROMPT_REGEX = re.compile(r'\A.+?%s|debug> ' % STDIN_PROMPT) HELP_LINKS = [ { 'text': "Risa/Asir", 'url': "http://www.openxm.org", }, { 'text': "Asir Kernel", 'url': "http://www.openxm.org", }, ] + MetaKernel.help_links def get_kernel_json(): """Get the kernel json for the kernel. """ here = os.path.dirname(__file__) default_json_file = os.path.join(here, 'kernel.json') json_file = os.environ.get('ASIR_KERNEL_JSON', default_json_file) with open(json_file) as fid: data = json.load(fid) data['argv'][0] = sys.executable return data class AsirKernel(ProcessMetaKernel): implementation = 'Asir Kernel' implementation_version = __version__, language = 'asir' help_links = HELP_LINKS kernel_json = get_kernel_json() _asir_engine = None _language_version = None @property def language_version(self): if self._language_version: return self._language_version ver = self.asir_engine.eval('version', silent=True) ver = self._language_version = ver.split()[-1] return ver @property def language_info(self): return {'mimetype': 'text/x-octave', 'name': 'c', 'file_extension': '.rr', 'version': self.language_version, 'help_links': HELP_LINKS} @property def banner(self): msg = 'Asir Kernel v%s running Risa/Asir v%s' return msg % (__version__, self.language_version) @property def asir_engine(self): if self._asir_engine: return self._asir_engine self._asir_engine = AsirEngine(plot_settings=self.plot_settings, error_handler=self.Error, stdin_handler=self.raw_input, stream_handler=self.Print, logger=self.log) return self._asir_engine def makeWrapper(self): """Start an Asir process and return a :class:`REPLWrapper` object. """ return self.asir_engine.repl def do_execute_direct(self, code, silent=False): if code.strip() in ['quit', 'quit()', 'exit', 'exit()']: self._asir_engine = None self.do_shutdown(True) return # f = open('tmptmp.txt','a');f.write(str(code));f.close() ##### # self._asir_engine.logger.debug(str(code)) ##### val = ProcessMetaKernel.do_execute_direct(self, code+';;', silent=silent) if not silent: try: plot_dir = self.asir_engine.make_figures() except Exception as e: self.Error(e) return val if plot_dir: for image in self.asir_engine.extract_figures(plot_dir, True): self.Display(image) return val def get_kernel_help_on(self, info, level=0, none_on_fail=False): obj = info.get('help_obj', '') if not obj or len(obj.split()) > 1: if none_on_fail: return None else: return "" return self.asir_engine.eval('help %s' % obj, silent=True) def Print(self, *args, **kwargs): # Ignore standalone input hook displays. out = [] for arg in args: if arg.strip() == STDIN_PROMPT: return if arg.strip().startswith(STDIN_PROMPT): arg = arg.replace(STDIN_PROMPT, '') out.append(arg) super(AsirKernel, self).Print(*out, **kwargs) def raw_input(self, text): # Remove the stdin prompt to restore the original prompt. text = text.replace(STDIN_PROMPT, '') return super(AsirKernel, self).raw_input(text) def get_completions(self, info): """ Get completions from kernel based on info dict. """ cmd = 'completion_matches("%s")' % info['obj'] val = self.asir_engine.eval(cmd, silent=True) return val and val.splitlines() or [] def handle_plot_settings(self): """Handle the current plot settings""" self.asir_engine.plot_settings = self.plot_settings class AsirEngine(object): def __init__(self, error_handler=None, stream_handler=None, stdin_handler=None, plot_settings=None, logger=None): self.logger = logger self.executable = self._get_executable() self.repl = self._create_repl() self.error_handler = error_handler self.stream_handler = stream_handler self.stdin_handler = stdin_handler self._startup(plot_settings) @property def plot_settings(self): return None return self._plot_settings @plot_settings.setter def plot_settings(self, settings): settings = settings or dict(backend='inline') return None self._plot_settings = settings # Remove "None" keys so we can use setdefault below. keys = ['format', 'backend', 'width', 'height', 'resolution', 'backend', 'name'] for key in keys: if key in settings and settings.get(key, None) is None: del settings[key] if sys.platform == 'darwin': settings.setdefault('format', 'svg') else: settings.setdefault('format', 'png') settings.setdefault('backend', 'inline') settings.setdefault('width', -1) settings.setdefault('height', -1) settings.setdefault('resolution', 0) settings.setdefault('name', 'Figure') cmds = [] if settings['backend'] == 'inline': cmds.append("set(0, 'defaultfigurevisible', 'off');") else: cmds.append("set(0, 'defaultfigurevisible', 'on');") cmds.append("graphics_toolkit('%s');" % settings['backend']) self.eval('\n'.join(cmds)) def eval(self, code, timeout=None, silent=False): """Evaluate code using the engine. """ stream_handler = None if silent else self.stream_handler if self.logger: # self.logger.setLevel(logging.DEBUG) #### self.logger.debug('Asir eval:') self.logger.debug(code) try: resp = self.repl.run_command(code.rstrip(), timeout=timeout, stream_handler=stream_handler, stdin_handler=self.stdin_handler) resp = resp.replace(STDIN_PROMPT, '') if self.logger and resp: self.logger.debug(resp) return resp except KeyboardInterrupt: return self._interrupt(True) except Exception as e: if self.error_handler: self.error_handler(e) else: raise e def make_figures(self, plot_dir=None): """Create figures for the current figures. Parameters ---------- plot_dir: str, optional The directory in which to create the plots. Returns ------- out: str The plot directory containing the files. """ return None settings = self.plot_settings if settings['backend'] != 'inline': self.eval('drawnow("expose");') if not plot_dir: return fmt = settings['format'] res = settings['resolution'] wid = settings['width'] hgt = settings['height'] name = settings['name'] plot_dir = plot_dir or tempfile.mkdtemp() plot_dir = plot_dir.replace(os.path.sep, '/') # Do not overwrite any existing plot files. spec = os.path.join(plot_dir, '%s*' % name) start = len(glob.glob(spec)) make_figs = '_make_figures("%s", "%s", "%s", %d, %d, %d, %d)' make_figs = make_figs % (plot_dir, fmt, name, wid, hgt, res, start) resp = self.eval(make_figs, silent=True) msg = 'Inline plot failed, consider trying another graphics toolkit\n' if resp and 'error:' in resp: resp = msg + resp if self.error_handler: self.error_handler(resp) else: raise Exception(resp) return plot_dir def extract_figures(self, plot_dir, remove=False): """Get a list of IPython Image objects for the created figures. Parameters ---------- plot_dir: str The directory in which to create the plots. remove: bool, optional. Whether to remove the plot directory after saving. """ images = [] spec = os.path.join(plot_dir, '%s*' % self.plot_settings['name']) for fname in reversed(glob.glob(spec)): filename = os.path.join(plot_dir, fname) try: if fname.lower().endswith('.svg'): im = self._handle_svg(filename) else: im = Image(filename) images.append(im) except Exception as e: if self.error_handler: self.error_handler(e) else: raise e if remove: shutil.rmtree(plot_dir, True) return images def _startup(self, plot_settings): return None def _handle_svg(self, filename): """ Handle special considerations for SVG images. """ # Gnuplot can create invalid characters in SVG files. with codecs.open(filename, 'r', encoding='utf-8', errors='replace') as fid: data = fid.read() im = SVG(data=data) try: im.data = self._fix_svg_size(im.data) except Exception: pass return im def _fix_svg_size(self, data): """GnuPlot SVGs do not have height/width attributes. Set these to be the same as the viewBox, so that the browser scales the image correctly. """ # Minidom does not support parseUnicode, so it must be decoded # to accept unicode characters parsed = minidom.parseString(data.encode('utf-8')) (svg,) = parsed.getElementsByTagName('svg') viewbox = svg.getAttribute('viewBox').split(' ') width, height = viewbox[2:] width, height = int(width), int(height) # Handle overrides in case they were not encoded. settings = self.plot_settings if settings['width'] != -1: if settings['height'] == -1: height = height * settings['width'] / width width = settings['width'] if settings['height'] != -1: if settings['width'] == -1: width = width * settings['height'] / height height = settings['height'] svg.setAttribute('width', '%dpx' % width) svg.setAttribute('height', '%dpx' % height) return svg.toxml() def _create_repl(self): cmd = self.executable if 'asir-cli' not in cmd: version_cmd = [self.executable, '--version'] version = subprocess.check_output(version_cmd).decode('utf-8') if 'version 4' in version: cmd += ' --no-gui' # Interactive mode prevents crashing on Windows on syntax errors. # Delay sourcing the "~/.asirrc" file in case it displays a pager. cmd += ' --interactive --quiet --no-init-file ' # Add cli options provided by the user. cmd += os.environ.get('ASIR_CLI_OPTIONS', '') orig_prompt = u('PEXPECT_PROMPT>') change_prompt = u("base_prompt('{0}')") repl = REPLWrapper(cmd, orig_prompt, change_prompt, stdin_prompt_regex=STDIN_PROMPT_REGEX) if os.name == 'nt': repl.child.crlf = '\n' repl.interrupt = self._interrupt # Remove the default 50ms delay before sending lines. repl.child.delaybeforesend = None return repl def _interrupt(self, silent=False): if (os.name == 'nt'): msg = '** Warning: Cannot interrupt Asir on Windows' if self.stream_handler: self.stream_handler(msg) elif self.logger: self.logger.warn(msg) return self._interrupt_expect(silent) return REPLWrapper.interrupt(self.repl) def _interrupt_expect(self, silent): repl = self.repl child = repl.child expects = [repl.prompt_regex, child.linesep] expected = uuid.uuid4().hex repl.sendline('disp("%s");' % expected) if repl.prompt_emit_cmd: repl.sendline(repl.prompt_emit_cmd) lines = [] while True: # Prevent a keyboard interrupt from breaking this up. while True: try: pos = child.expect(expects) break except KeyboardInterrupt: pass if pos == 1: # End of line received line = child.before if silent: lines.append(line) else: self.stream_handler(line) else: line = child.before if line.strip() == expected: break if len(line) != 0: # prompt received, but partial line precedes it if silent: lines.append(line) else: self.stream_handler(line) return '\n'.join(lines) def _get_executable(self): """Find the best asir executable. """ executable = os.environ.get('ASIR_EXECUTABLE', None) if not executable or not which(executable): if which('asir-cli'): executable = 'asir-cli' else: msg = ('asir-cli Executable not found, please add to path or set' '"ASIR_EXECUTABLE" environment variable') raise OSError(msg) executable = executable.replace(os.path.sep, '/') return executable