# ET is 80's!
#import elementtree as etree
# LXML is 00's!
from lxml import etree
from lxml.etree import tostring
#from dateutil.parser import parse as parse_date
from datetime import datetime
import uuid
import cgi
import copy
__all__ = [
'ATOM', 'atom_ns', 'Element', 'tostring']
ATOM_NAMESPACE = atom_ns = 'http://www.w3.org/2005/Atom'
app_ns = 'http://www.w3.org/2007/app'
xhtml_ns = 'http://www.w3.org/1999/xhtml'
nsmap = {'': atom_ns, 'app': app_ns}
_rel_alternate_xpath = etree.XPath(
"./atom:link[not(@rel) or @rel = 'alternate']",
namespaces=dict(atom=atom_ns))
_rel_other_xpath = etree.XPath(
"./atom:link[@rel = $rel]",
namespaces=dict(atom=atom_ns))
class AtomLookup(etree.CustomElementClassLookup):
_elements = {}
_app_elements = {}
def lookup(self, node_type, document, namespace, name):
if node_type == 'element':
if namespace == atom_ns:
return self._elements.get(name, AtomElement)
elif namespace == app_ns:
return self._app_elements.get(name, APPElement)
## FIXME: is this default good?
return AtomElement
# Otherwise normal lookup
return None
atom_parser = etree.XMLParser()
atom_parser.setElementClassLookup(AtomLookup())
def parse(input):
return etree.parse(input, atom_parser)
def ATOM(atom):
"""
Parse an Atom document
"""
return etree.XML(atom, atom_parser)
def Element(tag, *args, **kw):
"""
Create an Atom element. Adds the Atom namespace if no namespace
is given.
"""
if '{' not in tag:
# No namespace means the atom namespace
tag = '{%s}%s' % (atom_ns, tag)
return atom_parser.makeelement(tag, *args, **kw)
def _strftime(d):
"""
Format a date the way Atom likes it (RFC3339?)
"""
return d.strftime('%Y-%m-%dT%H:%M:%SZ%z')
## try:
## from lxml import builder
## except ImportError:
## pass
## else:
## E = builder.ElementMaker(parser=atom_parser,
## typemap={datetime: lambda e, v: _strftime(v)})
from lxml import builder
E = builder.ElementMaker(#parser=atom_parser,
typemap={datetime: lambda e, v: _strftime(v)})
__all__.append('E')
class NoDefault:
pass
class _LiveList(list):
"""
This list calls on_add or on_remove whenever the list is modified.
"""
on_add = on_remove = None
name = None
def __init__(self, *args, **kw):
on_add = on_remove = name = None
if 'on_add' in kw:
on_add = kw.pop('on_add')
if 'on_remove' in kw:
on_remove = kw.pop('on_remove')
if 'name' in kw:
name = kw.pop('name')
list.__init__(self, *args, **kw)
self.on_add = on_add
self.on_remove = on_remove
self.name = name
def _make_list(self, obj):
if not isinstance(obj, (list, tuple)):
obj = list(obj)
return obj
def _do_add(self, items):
if self.on_add is not None:
for item in items:
self.on_add(self, item)
def _do_remove(self, items):
if self.on_remove is not None:
for item in items:
self.on_remove(self, item)
def __setslice__(self, i, j, other):
other = self._make_list(other)
old = self[i:j]
list.__setslice__(self, i, j, other)
self._do_remove(old)
self._do_add(other)
def __delslice__(self, i, j):
old = self[i:j]
list.__delslice__(self, i, j)
self._do_remove(old)
def __iadd__(self, other):
other = self._make_list(other)
list.__iadd__(self, other)
self._do_add(other)
def __imul__(self, n):
while n > 0:
self += self
n -= 1
def append(self, item):
list.append(self, item)
self._do_add([item])
def insert(self, i, item):
list.insert(self, i, item)
self._do_add([item])
def pop(self, i=-1):
item = self[i]
result = list.pop(self, i)
self._do_remove([item])
return result
def remove(self, item):
list.remove(self, item)
self._do_remove([item])
def extend(self, other):
for item in other:
self.append(item)
def __repr__(self):
name = self.name
if name is None:
name = '_LiveList'
return '%s(%s)' % (name, list.__repr__(self))
class _findall_property(object):
"""
Returns a LiveList of all the objects with the given tag. You can
append or remove items to the list to add or remove them from the
containing tag.
"""
def __init__(self, tag, ns=atom_ns):
self.tag = tag
self.ns = ns
self.__doc__ = 'Return live list of all the <atom:%s> element' % self.tag
def __get__(self, obj, type=None):
if obj is None:
return self
def add(lst, item):
# FIXME: shouldn't just be an append
obj.append(item)
def remove(lst, item):
obj.remove(item)
return _LiveList(obj._atom_iter(self.tag, ns=self.ns),
on_add=add, on_remove=remove,
name='live_%s_list' % self.tag)
def __set__(self, obj, value):
cur = self.__get__(obj)
cur[:] = value
class _text_element_property(object):
"""
Creates an attribute that returns the text content of the given
subelement. E.g., ``title = _text_element_property('title')``
will make ``obj.title`` return the contents of the ``<title>``.
Similarly setting the attribute sets the text content of the
attribute.
"""
def __init__(self, tag, strip=True):
self.tag = tag
self.strip = strip
self.__doc__ = 'Access the <atom:%s> element as text' % self.tag
def __get__(self, obj, type=None):
if obj is None:
return self
v = obj._atom_findtext(self.tag)
if self.strip:
if v is not None:
v = v.strip()
else:
return ''
return v
def __set__(self, obj, value):
el = obj._get_or_create(self.tag)
el.text = value
def __delete__(self, obj):
el = obj._atom_get(self.tag)
if el:
# FIXME: should it be an error if it doesn't exist?
obj.remove(el)
class _element_property(object):
"""
Returns a single subelement based on tag. Setting the attribute
removes the element and adds a new one. Deleting it removes the
element.
"""
def __init__(self, tag):
self.tag = tag
self.__doc__ = 'Get the <atom:%s> element' % self.tag
def __get__(self, obj, type=None):
if obj is None:
return self
return obj._atom_get(self.tag)
def __set__(self, obj, value):
el = obj._atom_get(self.tag)
if el is not None:
parent = el.getparent()
index = parent.index(el)
parent[index] = value
else:
obj.append(value)
def __delete__(self):
el = obj._atom_get(self.tag)
if el is not None:
obj.remove(el)
class _attr_element_property(object):
"""
Get/set the value of the attribute on this element.
"""
def __init__(self, attr, default=NoDefault):
self.attr = attr
self.default = default
self.__doc__ = 'Access the %s attribute' % self.attr
def __get__(self, obj, type=None):
if obj is None:
return self
try:
return obj.attrib[self.attr]
except KeyError:
if self.default is not NoDefault:
return self.default
raise AttributeError(self.attr)
def __set__(self, obj, value):
if value is None:
self.__delete__(obj)
else:
obj.attrib[self.attr] = value
def __delete__(self, obj):
if self.attr in obj.attrib:
del obj.attrib[self.attr]
class _date_element_property(object):
"""
Get/set the parsed date value of the text content of a tag.
"""
def __init__(self, tag, ns=atom_ns):
self.tag = tag
self.ns = ns
self.__doc__ = 'Access the date in %s' % self.tag
def __get__(self, obj, type=None):
if obj is None:
return self
el = obj._atom_get(self.tag, ns=self.ns)
if el is None:
return None
return el.date
def __set__(self, obj, value):
el = obj._get_or_create(self.tag, ns=self.ns)
el.date = value
def __delete__(self):
el = obj._atom_get(self.tag)
if el is not None:
obj.remove(el)
class _date_text_property(object):
def __get__(self, obj, type=None):
if obj is None:
return self
return parse_date(obj.text)
def __set__(self, obj, value):
if not value:
obj.text = None
return
if isinstance(value, datetime):
value = _strftime(value)
obj.text = value
def __del__(self, obj):
obj.text = None
class AtomElement(etree.ElementBase):
def _get_or_create(self, tag, ns=atom_ns):
el = self.find('{%s}%s' % (ns, tag))
if el is None:
el = self.makeelement('{%s}%s' % (ns, tag))
self.append(el)
return el
def _atom_get(self, tag, ns=atom_ns):
for item in self._atom_iter(tag, ns=ns):
return item
return None
def _atom_iter(self, tag, ns=atom_ns):
return self.getiterator('{%s}%s' % (ns, tag))
def _atom_findtext(self, tag, ns=atom_ns):
return self.findtext('{%s}%s' % (ns, tag))
def _get_parent(self, tag, ns=atom_ns):
parent = self
while 1:
if parent.tag == '{%s}%s' % (ns, tag):
return parent
parent = parent.getparent()
if parent is None:
return None
@property
def feed(self):
return self._get_parent('feed')
def rel_links(self, rel='alternate'):
"""
Return all the links with the given ``rel`` attribute. The
default relation is ``'alternate'``, and as specified for Atom
links with no ``rel`` attribute are assumed to mean alternate.
"""
if rel is None:
return self._atom_iter('link')
return [
el for el in self._atom_iter('link')
if el.get('rel') == rel
or rel == 'alternate' and not el.get('rel')]
def __repr__(self):
tag = self.tag
if '}' in tag:
tag = tag.split('}', 1)[1]
return '<%s.%s atom:%s at %s>' % (
self.__class__.__module__,
self.__class__.__name__,
tag,
hex(abs(id(self)))[2:])
class Feed(AtomElement):
"""
For ``<feed>`` elements.
"""
@property
def feed(self):
return self
entries = _findall_property('entry')
title = _text_element_property('title')
author = _element_property('author')
class Entry(AtomElement):
"""
For ``<entry>`` elements.
"""
@property
def entry(self):
return self
id = _text_element_property('id')
title = _text_element_property('title')
published = _date_element_property('published')
updated = _date_element_property('updated')
edited = _date_element_property('edited', ns=app_ns)
def update_edited(self):
"""
Set app:edited to current time
"""
self.edited = datetime.utcnow()
def update_updated(self):
"""
Set atom:updated to the current time
"""
self.updated = datetime.utcnow()
def make_id(self):
"""
Create an artificial id for this entry
"""
assert not self.id, (
"You cannot make an id if one already exists")
self.id = 'uuid:%s' % uuid.uuid4()
def author__get(self):
el = self._atom_get('author')
if el is None:
if self.feed is not None:
return self.feed.author
return el
def author__set(self, value):
el = self._atom_get('author')
if el is not None:
self.remove(el)
self.append(value)
def author__del(self):
el = self._atom_get('author')
if el is not None:
self.remove(el)
author = property(author__get, author__set, author__del)
categories = _findall_property('category')
class _EntryElement(AtomElement):
@property
def entry(self):
return self._get_parent('entry')
class Category(_EntryElement):
"""
For ``<category>`` elements.
"""
term = _attr_element_property('term')
scheme = _attr_element_property('scheme', None)
label = _attr_element_property('label', None)
def as_string(self):
"""
Returns the string representation of the category, using the
GData convention of ``{scheme}term``
"""
if self.scheme is not None:
return '{%s}%s' % (self.scheme, self.term)
else:
return self.term
class PersonElement(_EntryElement):
"""
Represents authors and contributors
"""
email = _text_element_property('email')
uri = _text_element_property('uri')
name = _text_element_property('name')
class DateElement(_EntryElement):
"""
For elements that contain a date in their text content.
"""
date = _date_text_property()
class TextElement(_EntryElement):
type = _attr_element_property('type', None)
src = _attr_element_property('src', None)
def _html__get(self):
"""
Gives the parsed HTML of element's content. May return an
HtmlElement (from lxml.html) or an XHTML tree. If the element
is ``type="text"`` then it is returned as quoted HTML.
You can also set this attribute to either an lxml.html
element, an XHTML element, or an HTML string.
Raises AttributeError if this is not HTML content.
"""
## FIXME: should this handle text/html types?
if self.type == 'html':
content = self.text
elif self.type == 'text':
content = cgi.escape(self.text)
elif self.type == 'xhtml':
div = copy.deepcopy(self[0])
# Now remove the namespaces:
for el in div.getiterator():
if el.tag.startswith('{'):
el.tag = el.tag.split('}', 1)[1]
if div.tag.startswith('{'):
div.tag = el.tag.split('}', 1)[1]
from lxml.html import tostring
content = tostring(div)
else:
raise AttributeError(
"Not an HTML or text content (type=%r)" % self.type)
from lxml.html import fromstring
return fromstring(content)
def _html__set(self, value):
if value is None:
del self.html
return
if isinstance(value, basestring):
# Some HTML text
self.type = 'html'
self.text = value
return
if value.tag.startswith('{%s}' % xhtml_ns):
if value.tag != '{%s}div' % xhtml_ns:
# Need to wrap it in a <div>
el = self.makeelement('{%s}div' % xhtml_ns)
el.append(value)
value = el
self[:] = []
self.type = 'xhtml'
self.append(value)
return
from lxml import html
if isinstance(value, html.HtmlElement):
value = tostring(value)
self[:] = []
self.type = 'html'
self.text = value
return
raise TypeError(
"Unknown HTML type: %s" % type(value))
def _html__del(self):
self.text = None
html = property(_html__get, _html__set, _html__del, doc=_html__get.__doc__)
def _binary__get(self):
"""
Gets/sets the binary content, which is base64 encoded in the
text.
"""
text = self.text
if text is None:
raise AttributeError(
"No text (maybe in src?)")
text = text.decode('base64')
return text
def _binary__set(self, value):
if isinstance(value, unicode):
## FIXME: is this kosher?
value = value.encode('utf8')
if not isinstance(value, str):
raise TypeError(
"Must set .binary to a str or unicode object (not %s)"
% type(value))
value = value.encode('base64')
self.text = value
def _binary__del(self):
self.text = None
binary = property(_binary__get, _binary__set, _binary__del, doc=_binary__get.__doc__)
class LinkElement(_EntryElement):
"""
For ``<link>`` elements.
"""
href = _attr_element_property('href', None)
rel = _attr_element_property('rel', None)
type = _attr_element_property('type', None)
title = _attr_element_property('title', None)
def __repr__(self):
return '<%s.%s at %s rel=%r href=%r>' % (
self.__class__.__module__,
self.__class__.__name__,
hex(abs(id(self)))[2:],
self.rel, self.href)
AtomLookup._elements.update(dict(
feed=Feed,
entry=Entry,
category=Category,
author=PersonElement,
contributor=PersonElement,
published=DateElement,
updated=DateElement,
content=TextElement,
summary=TextElement,
title=TextElement,
rights=TextElement,
subtitle=TextElement,
link=LinkElement,
))
class APPElement(etree.ElementBase):
def __repr__(self):
tag = self.tag
if '}' in tag:
tag = tag.split('}', 1)[1]
return '<%s.%s app:%s at %s>' % (
self.__class__.__module__,
self.__class__.__name__,
tag,
hex(abs(id(self)))[2:])
class Service(APPElement):
workspaces = _findall_property('workspace', ns=app_ns)
class Workspace(APPElement):
collections = _findall_property('collection', ns=app_ns)
class Collection(APPElement):
pass
class Edited(APPElement):
date = _date_text_property()
AtomLookup._app_elements.update(dict(
service=Service,
workspace=Workspace,
collection=Collection,
edited=Edited,
))