[BACK]Return to kernel.py CVS log [TXT][DIR] Up to [local] / OpenXM / src / jupyter

File: [local] / OpenXM / src / jupyter / kernel.py (download)

Revision 1.3, Mon May 27 23:30:27 2019 UTC (4 years, 11 months ago) by takayama
Branch: MAIN
CVS Tags: HEAD
Changes since 1.2: +25 -25 lines

Octave --> Asir

# coding: utf-8
# $OpenXM: OpenXM/src/jupyter/kernel.py,v 1.3 2019/05/27 23:30:27 takayama Exp $
from __future__ import print_function

import codecs
import glob
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import uuid
from xml.dom import minidom

from metakernel import MetaKernel, ProcessMetaKernel, REPLWrapper, u
from metakernel.pexpect import which
from IPython.display import Image, SVG

from . import __version__


STDIN_PROMPT = '__stdin_prompt>'
STDIN_PROMPT_REGEX = re.compile(r'\A.+?%s|debug> ' % STDIN_PROMPT)
HELP_LINKS = [
    {
        'text': "Risa/Asir",
        'url': "http://www.openxm.org",
    },
    {
        'text': "Asir Kernel",
        'url': "http://www.openxm.org",
    },

] + MetaKernel.help_links


def get_kernel_json():
    """Get the kernel json for the kernel.
    """
    here = os.path.dirname(__file__)
    default_json_file = os.path.join(here, 'kernel.json')
    json_file = os.environ.get('ASIR_KERNEL_JSON', default_json_file)
    with open(json_file) as fid:
        data = json.load(fid)
    data['argv'][0] = sys.executable
    return data


class AsirKernel(ProcessMetaKernel):
    implementation = 'Asir Kernel'
    implementation_version = __version__,
    language = 'asir'
    help_links = HELP_LINKS
    kernel_json = get_kernel_json()

    _asir_engine = None
    _language_version = None

    @property
    def language_version(self):
        if self._language_version:
            return self._language_version
        ver = self.asir_engine.eval('version', silent=True)
        ver = self._language_version = ver.split()[-1]
        return ver

    @property
    def language_info(self):
        return {'mimetype': 'text/x-octave',
                'name': 'c',
                'file_extension': '.rr',
                'version': self.language_version,
                'help_links': HELP_LINKS}

    @property
    def banner(self):
        msg = 'Asir Kernel v%s running Risa/Asir v%s'
        return msg % (__version__, self.language_version)

    @property
    def asir_engine(self):
        if self._asir_engine:
            return self._asir_engine
        self._asir_engine = AsirEngine(plot_settings=self.plot_settings,
                                           error_handler=self.Error,
                                           stdin_handler=self.raw_input,
                                           stream_handler=self.Print,
                                           logger=self.log)
        return self._asir_engine

    def makeWrapper(self):
        """Start an Asir process and return a :class:`REPLWrapper` object.
        """
        return self.asir_engine.repl

    def do_execute_direct(self, code, silent=False):
        if code.strip() in ['quit', 'quit()', 'exit', 'exit()']:
            self._asir_engine = None
            self.do_shutdown(True)
            return
#        f = open('tmptmp.txt','a');f.write(str(code));f.close()  #####
#        self._asir_engine.logger.debug(str(code))  #####
        val = ProcessMetaKernel.do_execute_direct(self, code+';;', silent=silent)
        if not silent:
            try:
                plot_dir = self.asir_engine.make_figures()
            except Exception as e:
                self.Error(e)
                return val
            if plot_dir:
                for image in self.asir_engine.extract_figures(plot_dir, True):
                    self.Display(image)
        return val

    def get_kernel_help_on(self, info, level=0, none_on_fail=False):
        obj = info.get('help_obj', '')
        if not obj or len(obj.split()) > 1:
            if none_on_fail:
                return None
            else:
                return ""
        return self.asir_engine.eval('help %s' % obj, silent=True)

    def Print(self, *args, **kwargs):
        # Ignore standalone input hook displays.
        out = []
        for arg in args:
            if arg.strip() == STDIN_PROMPT:
                return
            if arg.strip().startswith(STDIN_PROMPT):
                arg = arg.replace(STDIN_PROMPT, '')
            out.append(arg)
        super(AsirKernel, self).Print(*out, **kwargs)

    def raw_input(self, text):
        # Remove the stdin prompt to restore the original prompt.
        text = text.replace(STDIN_PROMPT, '')
        return super(AsirKernel, self).raw_input(text)

    def get_completions(self, info):
        """
        Get completions from kernel based on info dict.
        """
        cmd = 'completion_matches("%s")' % info['obj']
        val = self.asir_engine.eval(cmd, silent=True)
        return val and val.splitlines() or []

    def handle_plot_settings(self):
        """Handle the current plot settings"""
        self.asir_engine.plot_settings = self.plot_settings


class AsirEngine(object):

    def __init__(self, error_handler=None, stream_handler=None,
                 stdin_handler=None, plot_settings=None,
                 logger=None):
        self.logger = logger
        self.executable = self._get_executable()
        self.repl = self._create_repl()
        self.error_handler = error_handler
        self.stream_handler = stream_handler
        self.stdin_handler = stdin_handler
        self._startup(plot_settings)
    
    @property
    def plot_settings(self):
        return None
        return self._plot_settings

    @plot_settings.setter
    def plot_settings(self, settings):
        settings = settings or dict(backend='inline')
        return None
        self._plot_settings = settings

        # Remove "None" keys so we can use setdefault below.
        keys = ['format', 'backend', 'width', 'height', 'resolution',
                'backend', 'name']
        for key in keys:
            if key in settings and settings.get(key, None) is None:
                del settings[key]

        if sys.platform == 'darwin':
            settings.setdefault('format', 'svg')
        else:
            settings.setdefault('format', 'png')

        settings.setdefault('backend', 'inline')
        settings.setdefault('width', -1)
        settings.setdefault('height', -1)
        settings.setdefault('resolution', 0)
        settings.setdefault('name', 'Figure')

        cmds = []
        if settings['backend'] == 'inline':
            cmds.append("set(0, 'defaultfigurevisible', 'off');")
        else:
            cmds.append("set(0, 'defaultfigurevisible', 'on');")
            cmds.append("graphics_toolkit('%s');" % settings['backend'])
        self.eval('\n'.join(cmds))

    def eval(self, code, timeout=None, silent=False):
        """Evaluate code using the engine.
        """
        stream_handler = None if silent else self.stream_handler
        if self.logger:
#            self.logger.setLevel(logging.DEBUG)  ####
            self.logger.debug('Asir eval:')
            self.logger.debug(code)
        try:
            resp = self.repl.run_command(code.rstrip(),
                                         timeout=timeout,
                                         stream_handler=stream_handler,
                                         stdin_handler=self.stdin_handler)
            resp = resp.replace(STDIN_PROMPT, '')
            if self.logger and resp:
                self.logger.debug(resp)
            return resp
        except KeyboardInterrupt:
            return self._interrupt(True)
        except Exception as e:
            if self.error_handler:
                self.error_handler(e)
            else:
                raise e

    def make_figures(self, plot_dir=None):
        """Create figures for the current figures.

        Parameters
        ----------
        plot_dir: str, optional
            The directory in which to create the plots.

        Returns
        -------
        out: str
            The plot directory containing the files.
        """
        return None
        settings = self.plot_settings
        if settings['backend'] != 'inline':
            self.eval('drawnow("expose");')
            if not plot_dir:
                return
        fmt = settings['format']
        res = settings['resolution']
        wid = settings['width']
        hgt = settings['height']
        name = settings['name']
        plot_dir = plot_dir or tempfile.mkdtemp()
        plot_dir = plot_dir.replace(os.path.sep, '/')

        # Do not overwrite any existing plot files.
        spec = os.path.join(plot_dir, '%s*' % name)
        start = len(glob.glob(spec))

        make_figs = '_make_figures("%s", "%s", "%s", %d, %d, %d, %d)'
        make_figs = make_figs % (plot_dir, fmt, name, wid, hgt, res, start)
        resp = self.eval(make_figs, silent=True)
        msg = 'Inline plot failed, consider trying another graphics toolkit\n'
        if resp and 'error:' in resp:
            resp = msg + resp
            if self.error_handler:
                self.error_handler(resp)
            else:
                raise Exception(resp)
        return plot_dir

    def extract_figures(self, plot_dir, remove=False):
        """Get a list of IPython Image objects for the created figures.

        Parameters
        ----------
        plot_dir: str
            The directory in which to create the plots.
        remove: bool, optional.
            Whether to remove the plot directory after saving.
        """
        images = []
        spec = os.path.join(plot_dir, '%s*' % self.plot_settings['name'])
        for fname in reversed(glob.glob(spec)):
            filename = os.path.join(plot_dir, fname)
            try:
                if fname.lower().endswith('.svg'):
                    im = self._handle_svg(filename)
                else:
                    im = Image(filename)
                images.append(im)
            except Exception as e:
                if self.error_handler:
                    self.error_handler(e)
                else:
                    raise e
        if remove:
            shutil.rmtree(plot_dir, True)
        return images

    def _startup(self, plot_settings):
        return None

    def _handle_svg(self, filename):
        """
        Handle special considerations for SVG images.
        """
        # Gnuplot can create invalid characters in SVG files.
        with codecs.open(filename, 'r', encoding='utf-8',
                         errors='replace') as fid:
            data = fid.read()
        im = SVG(data=data)
        try:
            im.data = self._fix_svg_size(im.data)
        except Exception:
            pass
        return im

    def _fix_svg_size(self, data):
        """GnuPlot SVGs do not have height/width attributes.  Set
        these to be the same as the viewBox, so that the browser
        scales the image correctly.
        """
        # Minidom does not support parseUnicode, so it must be decoded
        # to accept unicode characters
        parsed = minidom.parseString(data.encode('utf-8'))
        (svg,) = parsed.getElementsByTagName('svg')

        viewbox = svg.getAttribute('viewBox').split(' ')
        width, height = viewbox[2:]
        width, height = int(width), int(height)

        # Handle overrides in case they were not encoded.
        settings = self.plot_settings
        if settings['width'] != -1:
            if settings['height'] == -1:
                height = height * settings['width'] / width
            width = settings['width']
        if settings['height'] != -1:
            if settings['width'] == -1:
                width = width * settings['height'] / height
            height = settings['height']

        svg.setAttribute('width', '%dpx' % width)
        svg.setAttribute('height', '%dpx' % height)
        return svg.toxml()

    def _create_repl(self):
        cmd = self.executable
        if 'asir-cli' not in cmd:
            version_cmd = [self.executable, '--version']
            version = subprocess.check_output(version_cmd).decode('utf-8')
            if 'version 4' in version:
                cmd += ' --no-gui'
        # Interactive mode prevents crashing on Windows on syntax errors.
        # Delay sourcing the "~/.asirrc" file in case it displays a pager.
        cmd += ' --interactive --quiet --no-init-file '

        # Add cli options provided by the user.
        cmd += os.environ.get('ASIR_CLI_OPTIONS', '')

        orig_prompt = u('PEXPECT_PROMPT>')
        change_prompt = u("base_prompt('{0}')")

        repl = REPLWrapper(cmd, orig_prompt, change_prompt,
                           stdin_prompt_regex=STDIN_PROMPT_REGEX)
        if os.name == 'nt':
            repl.child.crlf = '\n'
        repl.interrupt = self._interrupt
        # Remove the default 50ms delay before sending lines.
        repl.child.delaybeforesend = None
        return repl

    def _interrupt(self, silent=False):
        if (os.name == 'nt'):
            msg = '** Warning: Cannot interrupt Asir on Windows'
            if self.stream_handler:
                self.stream_handler(msg)
            elif self.logger:
                self.logger.warn(msg)
            return self._interrupt_expect(silent)
        return REPLWrapper.interrupt(self.repl)

    def _interrupt_expect(self, silent):
        repl = self.repl
        child = repl.child
        expects = [repl.prompt_regex, child.linesep]
        expected = uuid.uuid4().hex
        repl.sendline('disp("%s");' % expected)
        if repl.prompt_emit_cmd:
            repl.sendline(repl.prompt_emit_cmd)
        lines = []
        while True:
            # Prevent a keyboard interrupt from breaking this up.
            while True:
                try:
                    pos = child.expect(expects)
                    break
                except KeyboardInterrupt:
                    pass
            if pos == 1:  # End of line received
                line = child.before
                if silent:
                    lines.append(line)
                else:
                    self.stream_handler(line)
            else:
                line = child.before
                if line.strip() == expected:
                    break
                if len(line) != 0:
                    # prompt received, but partial line precedes it
                    if silent:
                        lines.append(line)
                    else:
                        self.stream_handler(line)
        return '\n'.join(lines)

    def _get_executable(self):
        """Find the best asir executable.
        """
        executable = os.environ.get('ASIR_EXECUTABLE', None)
        if not executable or not which(executable):
            if which('asir-cli'):
                executable = 'asir-cli'
            else:
                msg = ('asir-cli Executable not found, please add to path or set'
                       '"ASIR_EXECUTABLE" environment variable')
                raise OSError(msg)
        executable = executable.replace(os.path.sep, '/')
        return executable