Source code for pyxs.client

# -*- coding: utf-8 -*-
"""
    pyxs.client
    ~~~~~~~~~~~

    This module implements XenStore client, which uses multiple connection
    options for communication: :class:`UnixSocketConnection` and
    :class:`XenBusConnection`. Note however, that the latter one can
    be a bit buggy, when dealing with ``WATCH_EVENT`` packets, so
    using :class:`UnixSocketConnection` is preferable.

    :copyright: (c) 2011 by Selectel, see AUTHORS for more details.
"""

from __future__ import absolute_import, unicode_literals

__all__ = ["Client", "UnixSocketConnection", "XenBusConnection"]

import copy
import errno
import os
import platform
import socket
from collections import deque

from ._internal import Event, Packet, Op
from .exceptions import ConnectionError, UnexpectedPacket, PyXSError
from .helpers import spec


#: A reverse mapping for :data:`errno.errorcode`.
_codeerror = dict((message, code)
                  for code, message in errno.errorcode.iteritems())


class FileDescriptorConnection(object):
    """Abstract XenStore connection, using an fd for I/O operations.

    Subclasses are expected to define :meth:`connect()` and set
    :attr:`fd` and :attr:`path` attributes, where `path` is a human
    readable path to the object, `fd` points to.
    """
    fd = path = None

    def __init__(self):
        raise NotImplemented("__init__() should be overriden by subclasses.")

    def disconnect(self):
        if self.fd is None:
            return

        try:
            os.close(self.fd)
        except OSError:
            pass
        finally:
            self.fd = None

    def send(self, packet):
        if not self.fd:
            self.connect()

        try:
            return os.write(self.fd, str(packet))
        except OSError as e:
            if e.args[0] is errno.EPIPE:
                self.disconnect()

            raise ConnectionError("Error while writing to {0!r}: {1}"
                                  .format(self.path, e.args))

    def recv(self):
        try:
            data = os.read(self.fd, Packet._struct.size)
        except OSError as e:
            if e.args[0] is errno.EPIPE:
                self.disconnect()

            raise ConnectionError("Error while reading from {0!r}: {1}"
                                  .format(self.path, e.args))
        else:
            op, rq_id, tx_id, size = Packet._struct.unpack(data)
            return Packet(op, os.read(self.fd, size), rq_id, tx_id)


[docs]class UnixSocketConnection(FileDescriptorConnection): """XenStore connection through Unix domain socket. :param str path: path to XenStore unix domain socket, if not provided explicitly is restored from process environment -- similar to what ``libxs`` does. :param float socket_timeout: see :func:`socket.settimeout` for details. """ def __init__(self, path=None, socket_timeout=None): if path is None: path = ( os.getenv("XENSTORED_PATH") or os.path.join(os.getenv("XENSTORED_RUNDIR", "/var/run/xenstored"), "socket") ) self.path = path self.socket_timeout = None def __copy__(self): return self.__class__(self.path, self.socket_timeout) def connect(self): if self.fd: return try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.socket_timeout) sock.connect(self.path) except socket.error as e: raise ConnectionError("Error connecting to {0!r}: {1}" .format(self.path, e.args)) else: self.fd = os.dup(sock.fileno())
[docs]class XenBusConnection(FileDescriptorConnection): """XenStore connection through XenBus. :param str path: path to XenBus block device; a predefined OS-specific constant is used, if a value isn't provided explicitly. """ def __init__(self, path=None): if path is None: # .. note:: it looks like OCaml-powered ``xenstored`` # simply ignores the posibility of being launched on a # platform, different from Linux, but ``libxs`` has those # constants in-place. system = platform.system() if system == "Linux": path = "/proc/xen/xenbus" elif system == "NetBSD": path = "/kern/xen/xenbus" else: path = "/dev/xen/xenbus" self.path = path def __copy__(self): return self.__class__(self.path) def connect(self): if self.fd: return try: self.fd = os.open(self.path, os.O_RDWR) except OSError as e: raise ConnectionError("Error while opening {0!r}: {1}" .format(self.path, e.args))
[docs]class Client(object): """XenStore client -- <useful comment>. :param str xen_bus_path: path to XenBus device, implies that :class:`XenBusConnection` is used as a backend. :param str unix_socket_path: path to XenStore Unix domain socket, usually something like ``/var/run/xenstored/socket`` -- implies that :class:`UnixSocketConnection` is used as a backend. :param float socket_timeout: see :func:`socket.settimeout` for details. :param bool transaction: if ``True`` :meth:`transaction_start` will be issued right after connection is established. .. note:: :class:`UnixSocketConnection` is used as a fallback value, if backend cannot be determined from arguments given. Here's a quick example: >>> with Client() as c: ... c.write("/foo/bar", "baz") ... c.read("/foo/bar") 'OK' 'baz' """ def __init__(self, unix_socket_path=None, socket_timeout=None, xen_bus_path=None, connection=None, transaction=None): if connection: self.connection = connection elif unix_socket_path or not xen_bus_path: self.connection = UnixSocketConnection( unix_socket_path, socket_timeout=socket_timeout) else: self.connection = XenBusConnection(xen_bus_path) self.tx_id = 0 self.events = deque() if transaction: # Requesting a new transaction id. self.tx_id = self.transaction_start() def __enter__(self): self.connection.connect() return self def __exit__(self, *exc_info): if not any(exc_info) and self.tx_id: self.transaction_end(commit=True) self.connection.disconnect() # Private API. # ............ def communicate(self, op, *args, **kwargs): kwargs["tx_id"] = self.tx_id # Forcing ``tx_id`` here. self.connection.send(Packet(op, "".join(args), **kwargs)) # If we have any watched paths `XenStore` will send watch events # mixed with replies to other operations, so we loop untill we # recieve a packet with an expected operation type. while True: packet = self.connection.recv() # According to ``xenstore.txt`` erroneous responses start with # a capital E and end with ``NULL``-byte. if packet.op is Op.ERROR: error = _codeerror.get(packet.payload[:-1], 0) raise PyXSError(error, os.strerror(error)) # Incoming packet should either be a watch event or have the # same operation type as the packet sent. elif packet.op is Op.WATCH_EVENT: self.events.append(packet) elif packet.op is not op: raise UnexpectedPacket(packet) # Making sure sent and recieved packets are within the same # transaction -- not relevant for # `XenBusConnection`, for # some reason it sometimes returns *random* values of tx_id # and rq_id. elif (not isinstance(self.connection, XenBusConnection) and packet.tx_id is not self.tx_id): raise UnexpectedPacket(packet) else: break return packet def command(self, *args): return self.communicate(*args).payload def ack(self, *args): if self.command(*args) != "OK\x00": raise PyXSError("Ooops ...") # Public API. # ........... @spec("<path>|")
[docs] def read(self, path): """Reads data from a given path. :param str path: a path to read from. """ return self.command(Op.READ, path)
@spec("<path>|", "<value|>")
[docs] def write(self, path, value): """Writes data to a given path. :param value: data to write (can be of any type, but will be coerced to :func:`bytes` eventually). :param str path: a path to write to. """ self.ack(Op.WRITE, path, value)
@spec("<path>|")
[docs] def mkdir(self, path): """Ensures that a given path exists, by creating it and any missing parents with empty values. If `path` or any parent already exist, its value is left unchanged. :param str path: path to directory to create. """ self.ack(Op.MKDIR, path)
@spec("<path>|")
[docs] def rm(self, path): """Ensures that a given does not exist, by deleting it and all of its children. It is not an error if `path` doesn't exist, but it **is** an error if `path`'s immediate parent does not exist either. :param str path: path to directory to remove. """ self.ack(Op.RM, path)
@spec("<path>|")
[docs] def directory(self, path): """Returns a list of names of the immediate children of `path`. The resulting children are each named as ``<path>/<child-leaf-name>``. :param str path: path to list. """ return self.command(Op.DIRECTORY, path).rstrip("\x00").split("\x00")
@spec("<path>|")
[docs] def get_perms(self, path): """Returns a list of permissions for a given `path`, see :exc:`~pyxs.exceptions.InvalidPermission` for details on permission format. :param str path: path to get permissions for. """ return self.command(Op.GET_PERMS, path).rstrip("\x00").split("\x00")
@spec("<path>|", "<perms>|+")
[docs] def set_perms(self, path, perms): """Sets a access permissions for a given `path`, see :exc:`~pyxs.exceptions.InvalidPermission` for details on permission format. :param str path: path to set permissions for. :param list perms: a list of permissions to set. """ self.ack(Op.SET_PERMS, path, *perms)
@spec("<wpath>|", "<token>|")
[docs] def watch(self, wpath, token): """Adds a watch. When a `path` is modified (including path creation, removal, contents change or permissions change) this generates an event on the changed `path`. Changes made in transactions cause an event only if and when committed. :param str wpath: path to watch. :param str token: watch token, returned in watch notification. """ self.ack(Op.WATCH, wpath, token)
@spec("<wpath>|", "<token>|")
[docs] def unwatch(self, wpath, token): """Removes a previously added watch. :param str wpath: path to unwatch. :param str token: watch token, passed to :meth:`watch`. """ self.ack(Op.UNWATCH, wpath, token)
[docs] def wait(self): """Waits for any of the watched paths to generate an event, which is a ``(path, token)`` pair, where the first element is event path, i.e. the actual path that was modified and second element is a token, passed to the :meth:`watch`. """ if self.events: return self.events.popleft() while True: packet = self.connection.recv() if packet.op is Op.WATCH_EVENT: return Event(*packet.payload.rstrip("\x00").split("\x00"))
@spec("<domid>|")
[docs] def get_domain_path(self, domid): """Returns the domain's base path, as is used for relative transactions: ex: ``"/local/domain/<domid>"``. If a given `domid` doesn't exists the answer is undefined. :param int domid: domain to get base path for. """ return self.command(Op.GET_DOMAIN_PATH, domid)
@spec("<domid>|")
[docs] def is_domain_introduced(self, domid): """Returns ``True` if ``xenstored`` is in communication with the domain; that is when `INTRODUCE` for the domain has not yet been followed by domain destruction or explicit `RELEASE`; and ``False`` otherwise. :param int domid: domain to check status for. """ return { "T": True, "F": False }.get(self.command(Op.IS_DOMAIN_INTRODUCED, domid).rstrip("\x00"))
@spec("<domid>|", "<mfn>|", "<eventchn>|")
[docs] def introduce(self, domid, mfn, eventchn): """Tells ``xenstored`` to communicate with this domain. :param int domid: a real domain id, (``0`` is forbidden). :param long mfn: address of xenstore page in `domid`. :param int eventch: an unbound event chanel in `domid`. """ self.ack(Op.INTRODUCE, domid, mfn, eventchn)
@spec("<domid>|")
[docs] def release(self, domid): """Manually requests ``xenstored`` to disconnect from the domain. :param int domid: domain to disconnect. .. note:: ``xenstored`` will in any case detect domain destruction and disconnect by itself. .. todo:: make sure it's only executed from Dom0. """ self.ack(Op.RELEASE, domid)
@spec("<domid>|")
[docs] def resume(self, domid): """Tells ``xenstored`` to clear its shutdown flag for a domain. This ensures that a subsequent shutdown will fire the appropriate watches. :param int domid: domain to resume. .. todo:: make sure it's only executed from Dom0. """ self.ack(Op.RESUME, domid)
@spec("<domid>|", "<tdomid>|")
[docs] def set_target(self, domid, target): """Tells ``xenstored`` that a domain is targetting another one, so it should let it tinker with it. This grants domain `domid` full access to paths owned by `target`. Domain `domid` also inherits all permissions granted to `target` on all other paths. :param int domid: domain to set target for. :param int target: target domain (yours truly, Captain). .. todo:: make sure it's only executed from Dom0. """ self.ack(Op.SET_TARGET, domid, target)
[docs] def transaction_start(self): """Starts a new transaction and returns transaction handle, which is simply an int. .. warning:: Currently ``xenstored`` has a bug that after 2^32 transactions it will allocate id 0 for an actual transaction. """ return int(self.command(Op.TRANSACTION_START, "\x00") .rstrip("\x00"))
[docs] def transaction_end(self, commit=True): """End a transaction currently in progress; if no transaction is running no command is sent to XenStore. """ if self.tx_id: self.ack(Op.TRANSACTION_END, ["F", "T"][commit] + "\x00") self.tx_id = 0
[docs] def transaction(self): """Returns a new :class:`Client` instance, operating within a new transaction; can only be used only when no transaction is running. Here's an example: >>> with Client().transaction() as t: ... t.do_something() ... t.transaction_end(commit=True) However, the last line is completely optional, since the default behaviour is to commit everything on context manager exit. :raises pyxs.exceptions.PyXSError: if this client is linked to and active transaction. """ if self.tx_id: raise PyXSError(errno.EALREADY, os.strerror(errno.EALREADY)) return Client(connection=copy.copy(self.connection), transaction=True)