# Source code for pyxs.helpers
# -*- coding: utf-8 -*-
"""
pyxs.helpers
~~~~~~~~~~~~
Implements various helpers.
:copyright: (c) 2011 by Selectel, see AUTHORS for more details.
"""
from __future__ import unicode_literals
__all__ = ["validate_path", "validate_watch_path", "validate_perms",
"error"]
import errno
import re
import os
import posixpath
import sys
# Python 2 compatibility shim: make ``bytes`` and ``str`` name the binary
# and text types respectively, as they do on Python 3.  The version is
# compared by value -- identity comparison against an int literal is
# implementation-dependent and triggers a SyntaxWarning on modern CPython.
if sys.version_info[0] < 3:
    bytes, str = str, unicode  # noqa: F821 -- ``unicode`` only exists on Python 2.
from .exceptions import InvalidPath, InvalidPermission, PyXSError
#: Maps symbolic error names (e.g. ``"EINVAL"``) back to their numeric
#: codes -- the inverse of :data:`errno.errorcode`.
_codeerror = dict(zip(errno.errorcode.values(), errno.errorcode.keys()))
def writeall(fd, data):
    """Writes the whole data string to the file descriptor.

    :func:`os.write` may accept only a part of the buffer, so it is
    called in a loop, each time with whatever has not been written
    yet, until nothing is left. Should any of the calls fail, there
    is no way of knowing how much data actually went through.

    :param int fd: file descriptor to write to.
    :param bytes data: data to write.
    """
    total = len(data)
    written = 0
    while written != total:
        # Resume right after the last byte the OS has accepted.
        written += os.write(fd, data[written:])
def readall(fd, length):
    """Reads a data string of a given length from the file descriptor.

    Calls :func:`os.read` repeatedly, until all data is read. If an
    error occurs, it's impossible to tell how much data has been read.

    :param int fd: file descriptor to read from.
    :param int length: exact number of bytes to read.
    :returns: a byte string of exactly `length` bytes.
    :raises EOFError: if the descriptor reaches end-of-file before
                      `length` bytes were read.
    """
    chunks = []
    while length:
        chunk = os.read(fd, length)
        if not chunk:
            # ``os.read`` signals end-of-file with an empty string; without
            # this check ``length`` never decreases and the loop spins
            # forever.
            raise EOFError(
                "end of file reached with {0} byte(s) left to read"
                .format(length))

        chunks.append(chunk)
        length -= len(chunk)

    return b"".join(chunks)
def dict_merge(*dicts):
    """Combines any number of dicts into a single, freshly created one.

    Arguments are applied from left to right, so for a key present in
    several of them the rightmost value wins. None of the arguments
    is modified.

    >>> dict_merge()
    {}
    >>> dict_merge({"foo": "bar", "baz": "boo"})
    {'foo': 'bar', 'baz': 'boo'}
    """
    merged = {}
    for mapping in dicts:
        merged.update(mapping)
    return merged
def error(smth):
    """Returns a :class:`~pyxs.exceptions.PyXSError` matching a given
    errno or error name.

    :param smth: either a numeric ``errno`` value or a symbolic error
                 name, like ``"EINVAL"``; names are accepted both as
                 text and as bytes. A name missing from
                 :data:`errno.errorcode` is mapped to errno ``0``.
    :returns: an *instance* of :class:`~pyxs.exceptions.PyXSError`,
              it's up to the caller to raise it.

    >>> error(22)
    pyxs.exceptions.PyXSError: (22, 'Invalid argument')
    >>> error("EINVAL")
    pyxs.exceptions.PyXSError: (22, 'Invalid argument')
    """
    # ``basestring`` doesn't exist on Python 3, so the check is spelled
    # with ``bytes`` and ``str``, which this module aliases to the binary
    # and text types on Python 2.
    if isinstance(smth, bytes):
        # Keys of the reverse mapping are native strings; "replace" makes
        # sure an odd byte results in an unknown name and not in a
        # ``UnicodeDecodeError``.
        smth = smth.decode("ascii", "replace")

    if isinstance(smth, str):
        smth = _codeerror.get(smth, 0)

    return PyXSError(smth, os.strerror(smth))
def force_unicode(value):
    """Converts a given value to text.

    Byte strings are decoded as UTF-8, everything else is passed
    through ``str`` (which this module aliases to :func:`unicode`
    on Python 2).

    >>> force_unicode(b"foo")
    u'foo'
    >>> force_unicode(None)
    u'None'
    """
    if not isinstance(value, bytes):
        return str(value)

    return value.decode("utf-8")
def validate_path(path):
    """Checks if a given path is valid, see
    :exc:`~pyxs.exceptions.InvalidPath` for details.

    A valid path consists of ASCII alphanumerics and ``-/_@`` only,
    optionally followed by a single NUL terminator, is at most 3072
    characters long if absolute and 2048 if relative, contains no
    ``//`` and has no trailing ``/``, unless it's the root path.

    :param str path: path to check.
    :returns: the same `path`, unchanged.
    :raises pyxs.exceptions.InvalidPath: when path fails to validate.
    """
    # Paths longer than 3072 bytes are forbidden; clients specifying
    # relative paths should keep them to within 2048 bytes. Note, that
    # ``posixpath.abspath()`` can't be used to tell the two apart -- it
    # always returns a non-empty (truthy) string.
    max_len = 3072 if posixpath.isabs(path) else 2048

    # ``\Z`` is used instead of ``$``, because the latter also matches
    # right before a trailing newline and would let "/foo\n" through.
    if not (re.match(r"^[a-zA-Z0-9\-/_@]+\x00?\Z", path) and
            len(path) <= max_len):
        raise InvalidPath(path)

    # A path is not allowed to have a trailing /, except for the
    # root path and shouldn't have double //'s. The optional NUL
    # terminator is ignored here, otherwise it would hide a trailing /.
    bare = path[:-1] if path.endswith("\x00") else path
    if (len(bare) > 1 and bare.endswith("/")) or "//" in bare:
        raise InvalidPath(path)

    return path
def validate_watch_path(wpath):
    """Checks if a given watch path is valid -- it should either be a
    valid path or a special, starting with ``@`` character.

    The only special paths accepted are ``@introduceDomain`` and
    ``@releaseDomain``, optionally followed by a NUL terminator.

    :param str wpath: watch path to check.
    :returns: the same `wpath`, unchanged.
    :raises pyxs.exceptions.InvalidPath: when path fails to validate.
    """
    # ``\Z`` is used instead of ``$``, because the latter also matches
    # right before a trailing newline, i.e. "@introduceDomain\n".
    if (wpath.startswith("@") and not
            re.match(r"^@(?:introduceDomain|releaseDomain)\x00?\Z", wpath)):
        raise InvalidPath(wpath)

    # Special paths go through the generic check as well -- ``@`` is
    # within the character set it allows.
    validate_path(wpath)
    return wpath
def validate_perms(perms):
    """Checks if a given list of permissions follows the format described
    in :meth:`~pyxs.client.Client.get_permissions`.

    Each permission should be a single letter, one of ``w``, ``r``,
    ``b`` or ``n``, followed by one or more decimal digits, and nothing
    else.

    :param list perms: permissions to check.
    :returns: the same `perms` list, unchanged.
    :raises pyxs.exceptions.InvalidPermission:
        when any of the permissions fail to validate.
    """
    for perm in perms:
        # The pattern is anchored at both ends, because ``re.match()``
        # alone only anchors the start and would accept "w0garbage";
        # ``\Z`` (unlike ``$``) also rejects a trailing newline.
        if not re.match(r"^[wrbn][0-9]+\Z", perm):
            raise InvalidPermission(perm)

    return perms