# (Repository web-viewer residue: "Blob Blame History Raw" -- not part of the module.)
# ET is 80's!
#import elementtree as etree
# LXML is 00's!
from lxml import etree
from lxml.etree import tostring
#from dateutil.parser import parse as parse_date
from datetime import datetime
import uuid
import cgi
import copy

# Names exported by ``from <module> import *``; 'E' is appended further
# down, once the ElementMaker has been created.
__all__ = [
    'ATOM', 'atom_ns', 'Element', 'tostring']

# XML namespace URIs used throughout this module.
ATOM_NAMESPACE = atom_ns = 'http://www.w3.org/2005/Atom'
app_ns = 'http://www.w3.org/2007/app'
xhtml_ns = 'http://www.w3.org/1999/xhtml'

# Prefix -> namespace mapping with Atom as the default (empty) prefix.
# NOTE(review): not referenced anywhere else in the visible source.
nsmap = {'': atom_ns, 'app': app_ns}

# Pre-compiled XPath selecting child <atom:link> elements that are
# "alternate" links, i.e. have no rel attribute or rel="alternate".
# NOTE(review): not referenced elsewhere in the visible source.
_rel_alternate_xpath = etree.XPath(
    "./atom:link[not(@rel) or @rel = 'alternate']",
    namespaces=dict(atom=atom_ns))
# Pre-compiled XPath selecting child <atom:link> elements whose rel
# attribute equals the ``$rel`` XPath variable supplied at call time.
# NOTE(review): not referenced elsewhere in the visible source.
_rel_other_xpath = etree.XPath(
    "./atom:link[@rel = $rel]",
    namespaces=dict(atom=atom_ns))



class AtomLookup(etree.CustomElementClassLookup):
    """
    Element class lookup that maps Atom and APP tags to the element
    classes registered in ``_elements`` / ``_app_elements``.
    """
    # Local tag name -> element class.  Both registries are class-level
    # and are filled in after the element classes have been defined.
    _elements = {}
    _app_elements = {}

    def lookup(self, node_type, document, namespace, name):
        """
        Return the class to instantiate for a node, or None to let lxml
        fall back to its normal lookup for anything that is not an
        element.
        """
        if node_type != 'element':
            # Otherwise normal lookup
            return None
        if namespace == atom_ns:
            return self._elements.get(name, AtomElement)
        if namespace == app_ns:
            return self._app_elements.get(name, APPElement)
        ## FIXME: is this default good?
        return AtomElement

# Shared parser: every element it creates (by parsing or through
# makeelement()) is instantiated with the class chosen by AtomLookup.
atom_parser = etree.XMLParser()
# set_element_class_lookup() is the supported spelling; the camelCase
# setElementClassLookup() used previously is only a deprecated alias.
atom_parser.set_element_class_lookup(AtomLookup())

def parse(input):
    """
    Parse ``input`` with the Atom-aware parser and return the result of
    ``etree.parse`` (whatever source types ``etree.parse`` accepts).
    """
    tree = etree.parse(input, atom_parser)
    return tree

def ATOM(atom):
    """
    Parse an Atom document given as a string and return its root
    element, built with the Atom-aware parser.
    """
    root = etree.XML(atom, atom_parser)
    return root

def Element(tag, *args, **kw):
    """
    Build a new element through the Atom-aware parser.

    A ``tag`` that carries no ``{namespace}`` part is placed in the
    Atom namespace; extra arguments are handed to ``makeelement``
    untouched.
    """
    # A '{' anywhere in the tag means the caller already qualified it.
    qualified = tag if '{' in tag else '{%s}%s' % (atom_ns, tag)
    return atom_parser.makeelement(qualified, *args, **kw)

def _strftime(d):
    """
    Format a datetime as an RFC 3339 timestamp in UTC, the form Atom
    date constructs use (``YYYY-MM-DDTHH:MM:SSZ``).

    Naive datetimes are taken to be UTC already, which matches the
    previous output for them.  Aware datetimes are converted to UTC
    first; the old format string appended both ``Z`` and the numeric
    offset (``...Z+0100``), which is not a valid timestamp.
    """
    offset = d.utcoffset()
    if offset is not None:
        # Shift to UTC and drop tzinfo so the literal 'Z' is truthful.
        d = d.replace(tzinfo=None) - offset
    return d.strftime('%Y-%m-%dT%H:%M:%SZ')

## try:
##     from lxml import builder
## except ImportError:
##     pass
## else:
##     E = builder.ElementMaker(parser=atom_parser,
##                              typemap={datetime: lambda e, v: _strftime(v)})
from lxml import builder
# Element factory (E.tag(...)).  The typemap tells the ElementMaker to
# turn datetime arguments into text via _strftime().  The parser
# argument is deliberately left commented out here, so this maker is
# not bound to atom_parser.
E = builder.ElementMaker(#parser=atom_parser,
                         typemap={datetime: lambda e, v: _strftime(v)})
# Export the factory alongside the names listed at the top of the file.
__all__.append('E')

class NoDefault:
    """
    Sentinel class meaning "no default was supplied"; it is compared by
    identity and never instantiated in this module.
    """

class _LiveList(list):
    """
    A list that reports every membership change to optional callbacks.

    ``on_add(lst, item)`` is called once for each item that enters the
    list and ``on_remove(lst, item)`` once for each item that leaves
    it; in both cases the underlying list has already been updated.
    ``name`` only affects ``repr()``.  All three are keyword-only
    constructor arguments; everything else is passed on to ``list``.
    """
    on_add = on_remove = None
    name = None

    def __init__(self, *args, **kw):
        # Pull our own keywords out before list.__init__ sees them.
        on_add = kw.pop('on_add', None)
        on_remove = kw.pop('on_remove', None)
        name = kw.pop('name', None)
        list.__init__(self, *args, **kw)
        self.on_add = on_add
        self.on_remove = on_remove
        self.name = name

    def _make_list(self, obj):
        """Materialise one-shot iterables so they can be walked twice."""
        if not isinstance(obj, (list, tuple)):
            obj = list(obj)
        return obj

    def _do_add(self, items):
        """Fire ``on_add`` for each of ``items`` (if a callback is set)."""
        if self.on_add is not None:
            for item in items:
                self.on_add(self, item)

    def _do_remove(self, items):
        """Fire ``on_remove`` for each of ``items`` (if a callback is set)."""
        if self.on_remove is not None:
            for item in items:
                self.on_remove(self, item)

    def __setitem__(self, index, value):
        # Handles both ``lst[i] = x`` and ``lst[a:b] = xs``.  Python 3
        # routes every slice assignment here; Python 2 only extended
        # slices (plain ones go through __setslice__ below).
        if isinstance(index, slice):
            value = self._make_list(value)
            old = self[index]
            list.__setitem__(self, index, value)
            self._do_remove(old)
            self._do_add(value)
        else:
            old = self[index]
            list.__setitem__(self, index, value)
            self._do_remove([old])
            self._do_add([value])

    def __delitem__(self, index):
        old = self[index]
        if not isinstance(index, slice):
            old = [old]
        list.__delitem__(self, index)
        self._do_remove(old)

    def __setslice__(self, i, j, other):
        # Python 2 only: plain slice assignment.
        self.__setitem__(slice(i, j), other)

    def __delslice__(self, i, j):
        # Python 2 only: plain slice deletion.
        self.__delitem__(slice(i, j))

    def __iadd__(self, other):
        # Snapshot first so ``lst += lst`` reports only the new items.
        other = list(other)
        list.__iadd__(self, other)
        self._do_add(other)
        # In-place operators must return the object, otherwise
        # ``lst += x`` rebinds the caller's name to None.
        return self

    def __imul__(self, n):
        before = list(self)
        list.__imul__(self, n)
        if n <= 0:
            # Multiplying by zero or a negative number empties the list.
            self._do_remove(before)
        else:
            # n copies in total means n - 1 newly added copies.
            self._do_add(before * (n - 1))
        return self

    def append(self, item):
        list.append(self, item)
        self._do_add([item])

    def insert(self, i, item):
        list.insert(self, i, item)
        self._do_add([item])

    def pop(self, i=-1):
        item = list.pop(self, i)
        self._do_remove([item])
        return item

    def remove(self, item):
        list.remove(self, item)
        self._do_remove([item])

    def extend(self, other):
        # Iterate over a copy so ``lst.extend(lst)`` terminates.
        for item in list(other):
            self.append(item)

    def __repr__(self):
        name = self.name
        if name is None:
            name = '_LiveList'
        return '%s(%s)' % (name, list.__repr__(self))

class _findall_property(object):
    """
    Descriptor exposing every element with a given tag as a live list.

    Reading the attribute yields a ``_LiveList``; appending to or
    removing from that list appends to / removes from the owning
    element.  Assigning to the attribute replaces the list contents.
    """

    def __init__(self, tag, ns=atom_ns):
        self.tag = tag
        self.ns = ns
        self.__doc__ = 'Return live list of all the <atom:%s> element' % self.tag

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self

        def _on_add(lst, item):
            # FIXME: shouldn't just be an append
            obj.append(item)

        def _on_remove(lst, item):
            obj.remove(item)

        matches = obj._atom_iter(self.tag, ns=self.ns)
        list_name = 'live_%s_list' % self.tag
        return _LiveList(matches, on_add=_on_add, on_remove=_on_remove,
                         name=list_name)

    def __set__(self, obj, value):
        # Slice assignment on the live list fires the remove/add hooks.
        self.__get__(obj)[:] = value

class _text_element_property(object):
    """
    Creates an attribute that returns the text content of the given
    subelement.  E.g., ``title = _text_element_property('title')``
    will make ``obj.title`` return the contents of the ``<title>``.
    Similarly setting the attribute sets the text content of the
    subelement (creating it when missing), and deleting the attribute
    removes the subelement.

    With ``strip`` true (the default) surrounding whitespace is removed
    and a missing element reads as ``''``; with ``strip`` false the raw
    text, or ``None`` when there is none, is returned.
    """

    def __init__(self, tag, strip=True):
        self.tag = tag
        self.strip = strip
        self.__doc__ = 'Access the <atom:%s> element as text' % self.tag

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        v = obj._atom_findtext(self.tag)
        if self.strip:
            if v is None:
                return ''
            v = v.strip()
        return v

    def __set__(self, obj, value):
        el = obj._get_or_create(self.tag)
        el.text = value

    def __delete__(self, obj):
        el = obj._atom_get(self.tag)
        # Compare against None explicitly: an lxml element without
        # children is falsy, so a plain ``if el:`` never removed
        # text-only elements such as <title>.
        if el is not None:
            # FIXME: should it be an error if it doesn't exist?
            obj.remove(el)

class _element_property(object):
    """
    Returns a single subelement based on tag.  Setting the attribute
    replaces the existing element in place (or appends the new one when
    there is none).  Deleting it removes the element, and is a no-op
    when the element does not exist.
    """

    def __init__(self, tag):
        self.tag = tag
        self.__doc__ = 'Get the <atom:%s> element' % self.tag

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        return obj._atom_get(self.tag)

    def __set__(self, obj, value):
        el = obj._atom_get(self.tag)
        if el is not None:
            # Swap in place so the new element keeps the old position.
            parent = el.getparent()
            index = parent.index(el)
            parent[index] = value
        else:
            obj.append(value)

    def __delete__(self, obj):
        # The descriptor protocol passes the owning instance; without
        # the ``obj`` parameter ``del instance.attr`` could not work.
        el = obj._atom_get(self.tag)
        if el is not None:
            obj.remove(el)

class _attr_element_property(object):
    """
    Descriptor mapping a Python attribute onto an XML attribute of the
    element itself.

    Reading a missing XML attribute returns ``default`` when one was
    given and raises ``AttributeError`` otherwise.  Assigning ``None``
    is the same as deleting; deleting a missing attribute is a no-op.
    """

    def __init__(self, attr, default=NoDefault):
        self.attr = attr
        self.default = default
        self.__doc__ = 'Access the %s attribute' % self.attr

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        try:
            value = obj.attrib[self.attr]
        except KeyError:
            if self.default is NoDefault:
                raise AttributeError(self.attr)
            value = self.default
        return value

    def __set__(self, obj, value):
        if value is not None:
            obj.attrib[self.attr] = value
            return
        self.__delete__(obj)

    def __delete__(self, obj):
        attributes = obj.attrib
        if self.attr in attributes:
            del attributes[self.attr]

class _date_element_property(object):
    """
    Get/set the parsed date value of the text content of a tag.

    The work is delegated to the ``date`` attribute of the subelement
    named ``tag`` in namespace ``ns``.  Reading returns ``None`` when
    the subelement is missing, setting creates it on demand, and
    deleting removes it (a no-op when it does not exist).
    """

    def __init__(self, tag, ns=atom_ns):
        self.tag = tag
        self.ns = ns
        self.__doc__ = 'Access the date in %s' % self.tag

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        el = obj._atom_get(self.tag, ns=self.ns)
        if el is None:
            return None
        return el.date

    def __set__(self, obj, value):
        el = obj._get_or_create(self.tag, ns=self.ns)
        el.date = value

    def __delete__(self, obj):
        # Takes ``obj`` as the descriptor protocol requires, and looks
        # in self.ns like the getter and setter do -- otherwise a
        # non-Atom element such as app:edited could never be found.
        el = obj._atom_get(self.tag, ns=self.ns)
        if el is not None:
            obj.remove(el)

def _parse_atom_date(text):
    """
    Parse an RFC 3339 timestamp (the format of Atom date constructs)
    into a naive ``datetime`` expressed in UTC.

    Accepts ``Z`` or a numeric ``+HH:MM`` / ``-HH:MM`` offset (colon
    optional) and optional fractional seconds; a timestamp without any
    zone designator is taken as UTC.  Raises ``ValueError`` for text
    that does not match.

    NOTE(review): the ``dateutil`` import this module once used is
    commented out, so only the standard library is used here.  dateutil
    would have returned timezone-aware values; confirm that naive UTC
    suits the callers.
    """
    import re
    from datetime import timedelta
    match = re.match(
        r'^\s*(\d{4})-(\d{2})-(\d{2})[Tt ](\d{2}):(\d{2}):(\d{2})'
        r'(?:\.(\d+))?\s*([Zz]|[+-]\d{2}:?\d{2})?\s*$', text)
    if match is None:
        raise ValueError("Not an RFC 3339 date: %r" % (text,))
    year, month, day, hour, minute, second = [
        int(part) for part in match.groups()[:6]]
    # Pad/truncate the fraction to exactly six digits (microseconds).
    fraction = match.group(7) or ''
    microsecond = int((fraction + '000000')[:6])
    result = datetime(year, month, day, hour, minute, second, microsecond)
    zone = match.group(8)
    if zone and zone not in ('Z', 'z'):
        digits = zone[1:].replace(':', '')
        offset = timedelta(hours=int(digits[:2]), minutes=int(digits[2:]))
        # Local time = UTC + offset, so subtract a positive offset.
        if zone[0] == '+':
            result = result - offset
        else:
            result = result + offset
    return result


class _date_text_property(object):
    """
    Descriptor exposing an element's text content as a date.

    Reading parses ``obj.text`` (``None`` when the text is empty).
    Setting accepts a ``datetime`` (formatted with ``_strftime``) or an
    already formatted string; a false value clears the text.  Deleting
    clears the text as well.
    """

    def __get__(self, obj, type=None):
        if obj is None:
            # Accessed on the class: hand back the descriptor itself.
            return self
        text = obj.text
        if text is None or not text.strip():
            return None
        # The module-level ``parse_date`` name this used to call is
        # never defined, because its import is commented out.
        return _parse_atom_date(text)

    def __set__(self, obj, value):
        if not value:
            obj.text = None
            return
        if isinstance(value, datetime):
            value = _strftime(value)
        obj.text = value

    def __delete__(self, obj):
        # Must be __delete__ (descriptor hook), not __del__ (finalizer),
        # for ``del element.date`` to reach this code.
        obj.text = None

class AtomElement(etree.ElementBase):
    """
    Base class for Atom elements; supplies the tag-lookup helpers the
    property descriptors in this module rely on.
    """

    def _get_or_create(self, tag, ns=atom_ns):
        """Return the child ``{ns}tag``, appending a new one if absent."""
        qname = '{%s}%s' % (ns, tag)
        child = self.find(qname)
        if child is not None:
            return child
        child = self.makeelement(qname)
        self.append(child)
        return child

    def _atom_get(self, tag, ns=atom_ns):
        """Return the first element yielded by ``_atom_iter``, or None."""
        return next(iter(self._atom_iter(tag, ns=ns)), None)

    def _atom_iter(self, tag, ns=atom_ns):
        """
        Iterate over elements named ``{ns}tag`` using ``getiterator``.

        NOTE(review): lxml's ``getiterator`` walks the whole subtree
        (starting with this element), not just direct children --
        confirm that is intended.
        """
        qname = '{%s}%s' % (ns, tag)
        return self.getiterator(qname)

    def _atom_findtext(self, tag, ns=atom_ns):
        """Return ``findtext`` for ``{ns}tag``."""
        qname = '{%s}%s' % (ns, tag)
        return self.findtext(qname)

    def _get_parent(self, tag, ns=atom_ns):
        """
        Return this element or its nearest ancestor named ``{ns}tag``;
        None when the root is passed without a match.
        """
        wanted = '{%s}%s' % (ns, tag)
        node = self
        while node is not None:
            if node.tag == wanted:
                return node
            node = node.getparent()
        return None

    @property
    def feed(self):
        """The enclosing ``<feed>`` element, if any."""
        return self._get_parent('feed')

    def rel_links(self, rel='alternate'):
        """
        Return the links whose ``rel`` attribute equals ``rel``.

        ``'alternate'`` is the default, and -- as Atom specifies --
        links without a ``rel`` attribute count as alternate links.
        Passing ``None`` returns the unfiltered link iterator instead
        of a list.
        """
        links = self._atom_iter('link')
        if rel is None:
            return links
        selected = []
        for link in links:
            link_rel = link.get('rel')
            if link_rel == rel:
                selected.append(link)
            elif rel == 'alternate' and not link_rel:
                selected.append(link)
        return selected

    def __repr__(self):
        cls = self.__class__
        # Drop the '{namespace}' prefix, if any, from the tag.
        local_name = self.tag.split('}', 1)[-1]
        address = hex(abs(id(self)))[2:]
        return '<%s.%s atom:%s at %s>' % (
            cls.__module__, cls.__name__, local_name, address)

class Feed(AtomElement):
    """
    For ``<feed>`` elements.
    """
    
    @property
    def feed(self):
        """A feed is its own feed; no ancestor search is needed."""
        return self

    # Live list of the <atom:entry> elements (see _findall_property).
    entries = _findall_property('entry')
    # Stripped text of <atom:title>; '' when there is no title.
    title = _text_element_property('title')
    # The <atom:author> element, or None when there is none.
    author = _element_property('author')

class Entry(AtomElement):
    """
    For ``<entry>`` elements.
    """

    @property
    def entry(self):
        """An entry is its own entry."""
        return self

    # Text accessors; both read as '' when the element is missing.
    id = _text_element_property('id')
    title = _text_element_property('title')
    # Date accessors; app:edited lives in the APP namespace.
    published = _date_element_property('published')
    updated = _date_element_property('updated')
    edited = _date_element_property('edited', ns=app_ns)

    def update_edited(self):
        """
        Stamp app:edited with the current UTC time.
        """
        now = datetime.utcnow()
        self.edited = now

    def update_updated(self):
        """
        Stamp atom:updated with the current UTC time.
        """
        now = datetime.utcnow()
        self.updated = now

    def make_id(self):
        """
        Assign a freshly generated UUID-based id; the entry must not
        have an id yet.
        """
        assert not self.id, (
            "You cannot make an id if one already exists")
        # NOTE(review): RFC 4122 spells UUID URNs 'urn:uuid:...'; the
        # bare 'uuid:' prefix is kept so generated ids do not change.
        new_id = 'uuid:%s' % uuid.uuid4()
        self.id = new_id

    def author__get(self):
        """Return this entry's author, else the enclosing feed's."""
        own = self._atom_get('author')
        if own is not None:
            return own
        feed = self.feed
        if feed is None:
            return None
        return feed.author

    def author__set(self, value):
        """Drop the current author element (if any), append ``value``."""
        self.author__del()
        self.append(value)

    def author__del(self):
        """Remove the author element; no-op when there is none."""
        current = self._atom_get('author')
        if current is None:
            return
        self.remove(current)

    author = property(author__get, author__set, author__del)

    # Live list of the <atom:category> elements.
    categories = _findall_property('category')

class _EntryElement(AtomElement):
    """
    Base class for elements that add an ``entry`` shortcut on top of
    AtomElement.
    """
    @property
    def entry(self):
        """This element or its nearest ``<entry>`` ancestor, else None."""
        return self._get_parent('entry')

class Category(_EntryElement):
    """
    For ``<category>`` elements.
    """
    # ``term`` has no default, so reading it raises AttributeError when
    # the XML attribute is missing; the other two read as None.
    term = _attr_element_property('term')
    scheme = _attr_element_property('scheme', None)
    label = _attr_element_property('label', None)

    def as_string(self):
        """
        Render the category as a string following the GData
        ``{scheme}term`` convention; just the term when the category
        has no scheme.
        """
        scheme = self.scheme
        if scheme is None:
            return self.term
        return '{%s}%s' % (scheme, self.term)

class PersonElement(_EntryElement):
    """
    Represents authors and contributors
    """
    
    # Stripped text of the corresponding subelement; each reads as ''
    # when the subelement is missing.
    email = _text_element_property('email')
    uri = _text_element_property('uri')
    name = _text_element_property('name')

class DateElement(_EntryElement):
    """
    For elements that contain a date in their text content.
    """
    # Reads and writes the element's own text through
    # _date_text_property.
    date = _date_text_property()

class TextElement(_EntryElement):
    """
    For Atom text constructs and content (``<title>``, ``<summary>``,
    ``<content>``, ...): gives access to the payload as HTML
    (``.html``) or as base64-decoded bytes (``.binary``).
    """

    type = _attr_element_property('type', None)
    src = _attr_element_property('src', None)

    def _html__get(self):
        """
        Gives the parsed HTML of element's content.  May return an
        HtmlElement (from lxml.html) or an XHTML tree.  If the element
        is ``type="text"`` (or has no type, which Atom defines to mean
        text) then it is returned as quoted HTML.

        You can also set this attribute to either an lxml.html
        element, an XHTML element, or an HTML string.

        Raises AttributeError if this is not HTML content.
        """
        ## FIXME: should this handle text/html types?
        content_type = self.type
        if content_type == 'html':
            content = self.text
        elif content_type in (None, 'text'):
            # RFC 4287 section 3.1.1: a missing type attribute must be
            # treated as type="text".
            content = cgi.escape(self.text)
        elif content_type == 'xhtml':
            div = copy.deepcopy(self[0])
            # Now remove the namespaces.  getiterator() starts with
            # ``div`` itself, so no separate pass over the root is
            # needed (the old one read ``el.tag`` by mistake).
            for el in div.getiterator():
                tag = el.tag
                # Comments and processing instructions have a
                # non-string tag; leave them alone.
                if isinstance(tag, basestring) and tag.startswith('{'):
                    el.tag = tag.split('}', 1)[1]
            from lxml.html import tostring as html_tostring
            content = html_tostring(div)
        else:
            raise AttributeError(
                "Not an HTML or text content (type=%r)" % self.type)
        from lxml.html import fromstring
        return fromstring(content)

    def _html__set(self, value):
        """
        Store ``value`` as this element's content.  Accepts ``None``
        (clears the content), an HTML string, an element in the XHTML
        namespace (wrapped in an XHTML ``<div>`` when it is not one
        already) or an ``lxml.html`` element.  Raises TypeError for
        anything else.
        """
        if value is None:
            del self.html
            return
        if isinstance(value, basestring):
            # Some HTML text.  Drop children left over from earlier
            # XHTML content so text and markup are never mixed.
            self[:] = []
            self.type = 'html'
            self.text = value
            return
        if value.tag.startswith('{%s}' % xhtml_ns):
            if value.tag != '{%s}div' % xhtml_ns:
                # Need to wrap it in a <div>
                el = self.makeelement('{%s}div' % xhtml_ns)
                el.append(value)
                value = el
            self[:] = []
            # Stale text would otherwise precede the new <div>.
            self.text = None
            self.type = 'xhtml'
            self.append(value)
            return
        from lxml import html
        if isinstance(value, html.HtmlElement):
            # Serialise with the HTML serialiser: the XML one
            # (etree.tostring) emits self-closing tags such as
            # ``<div/>`` that HTML parsers misread.
            value = html.tostring(value)
            self[:] = []
            self.type = 'html'
            self.text = value
            return
        raise TypeError(
            "Unknown HTML type: %s" % type(value))

    def _html__del(self):
        """Clear the content: both the text and any XHTML children."""
        self[:] = []
        self.text = None

    html = property(_html__get, _html__set, _html__del, doc=_html__get.__doc__)

    def _binary__get(self):
        """
        Gets/sets the binary content, which is base64 encoded in the
        text.
        """
        text = self.text
        if text is None:
            raise AttributeError(
                "No text (maybe in src?)")
        # Python 2 codec-style base64 decoding.
        text = text.decode('base64')
        return text

    def _binary__set(self, value):
        """
        Base64-encode ``value`` into the element text; unicode input is
        encoded as UTF-8 first.  Raises TypeError for other types.
        """
        if isinstance(value, unicode):
            ## FIXME: is this kosher?
            value = value.encode('utf8')
        if not isinstance(value, str):
            raise TypeError(
                "Must set .binary to a str or unicode object (not %s)"
                % type(value))
        value = value.encode('base64')
        self.text = value

    def _binary__del(self):
        """Clear the element text."""
        self.text = None

    binary = property(_binary__get, _binary__set, _binary__del, doc=_binary__get.__doc__)
            

class LinkElement(_EntryElement):
    """
    For ``<link>`` elements.
    """
    # Each attribute reads as None when absent from the XML.
    href = _attr_element_property('href', None)
    rel = _attr_element_property('rel', None)
    type = _attr_element_property('type', None)
    title = _attr_element_property('title', None)

    def __repr__(self):
        cls = self.__class__
        address = hex(abs(id(self)))[2:]
        return '<%s.%s at %s rel=%r href=%r>' % (
            cls.__module__, cls.__name__, address, self.rel, self.href)

# Register the element class for each Atom tag that has one; any other
# Atom tag falls back to AtomElement inside AtomLookup.lookup().
AtomLookup._elements.update({
    'feed': Feed,
    'entry': Entry,
    'category': Category,
    'author': PersonElement,
    'contributor': PersonElement,
    'published': DateElement,
    'updated': DateElement,
    'content': TextElement,
    'summary': TextElement,
    'title': TextElement,
    'rights': TextElement,
    'subtitle': TextElement,
    'link': LinkElement,
})

class APPElement(etree.ElementBase):
    """
    Base class for elements in the Atom Publishing Protocol namespace.
    """

    def __repr__(self):
        cls = self.__class__
        # Drop the '{namespace}' prefix, if any, from the tag.
        local_name = self.tag.split('}', 1)[-1]
        address = hex(abs(id(self)))[2:]
        return '<%s.%s app:%s at %s>' % (
            cls.__module__, cls.__name__, local_name, address)

class Service(APPElement):
    """For ``<app:service>`` elements."""
    # Live list of the <app:workspace> elements.
    # NOTE(review): _findall_property calls obj._atom_iter(), which
    # APPElement does not define in the visible source -- confirm.
    workspaces = _findall_property('workspace', ns=app_ns)

class Workspace(APPElement):
    """For ``<app:workspace>`` elements."""
    # Live list of the <app:collection> elements.
    # NOTE(review): _findall_property calls obj._atom_iter(), which
    # APPElement does not define in the visible source -- confirm.
    collections = _findall_property('collection', ns=app_ns)

class Collection(APPElement):
    """For ``<app:collection>`` elements; adds nothing to APPElement."""
    pass

class Edited(APPElement):
    """For ``<app:edited>`` elements, whose text content is a date."""
    # Reads and writes the element's own text through
    # _date_text_property.
    date = _date_text_property()

# Register the element class for each APP tag that has one; any other
# APP tag falls back to APPElement inside AtomLookup.lookup().
AtomLookup._app_elements.update({
    'service': Service,
    'workspace': Workspace,
    'collection': Collection,
    'edited': Edited,
})