from cinder.api import xmlutil
from cinder import flags
from cinder.openstack.common import log as logging
-from xml.dom import minidom
+from cinder import utils
LOG = logging.getLogger(__name__)
class MetadataDeserializer(wsgi.MetadataXMLDeserializer):
def deserialize(self, text):
- dom = minidom.parseString(text)
+ dom = utils.safe_minidom_parse_string(text)
metadata_node = self.find_first_child_named(dom, "metadata")
metadata = self.extract_metadata(metadata_node)
return {'body': {'metadata': metadata}}
class MetaItemDeserializer(wsgi.MetadataXMLDeserializer):
def deserialize(self, text):
- dom = minidom.parseString(text)
+ dom = utils.safe_minidom_parse_string(text)
metadata_item = self.extract_metadata(dom)
return {'body': {'meta': metadata_item}}
return metadata
def _extract_metadata_container(self, datastring):
- dom = minidom.parseString(datastring)
+ dom = utils.safe_minidom_parse_string(datastring)
metadata_node = self.find_first_child_named(dom, "metadata")
metadata = self.extract_metadata(metadata_node)
return {'body': {'metadata': metadata}}
return self._extract_metadata_container(datastring)
def update(self, datastring):
- dom = minidom.parseString(datastring)
+ dom = utils.safe_minidom_parse_string(datastring)
metadata_item = self.extract_metadata(dom)
return {'body': {'meta': metadata_item}}
"""The hosts admin extension."""
import webob.exc
-from xml.dom import minidom
from xml.parsers import expat
from cinder.api import extensions
class HostDeserializer(wsgi.XMLDeserializer):
def default(self, string):
try:
- node = minidom.parseString(string)
+ node = utils.safe_minidom_parse_string(string)
except expat.ExpatError:
msg = _("cannot understand XML")
raise exception.MalformedRequestBody(reason=msg)
# under the License.
import webob
-from xml.dom import minidom
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder import flags
from cinder.openstack.common import log as logging
from cinder.openstack.common.rpc import common as rpc_common
+from cinder import utils
from cinder import volume
class VolumeToImageDeserializer(wsgi.XMLDeserializer):
"""Deserializer to handle xml-formatted requests."""
def default(self, string):
- dom = minidom.parseString(string)
+ dom = utils.safe_minidom_parse_string(string)
action_node = dom.childNodes[0]
action_name = action_node.tagName
from cinder import exception
from cinder.openstack.common import jsonutils
from cinder.openstack.common import log as logging
+from cinder import utils
from cinder import wsgi
from lxml import etree
plurals = set(self.metadata.get('plurals', {}))
try:
- node = minidom.parseString(datastring).childNodes[0]
+ node = utils.safe_minidom_parse_string(datastring).childNodes[0]
return {node.nodeName: self._from_xml_node(node, plurals)}
except expat.ExpatError:
msg = _("cannot understand XML")
def action_peek_xml(body):
"""Determine action to invoke."""
- dom = minidom.parseString(body)
+ dom = utils.safe_minidom_parse_string(body)
action_node = dom.childNodes[0]
return action_node.tagName
import webob
from webob import exc
-from xml.dom import minidom
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder import flags
from cinder.openstack.common import log as logging
from cinder.openstack.common import uuidutils
+from cinder import utils
from cinder import volume
from cinder.volume import volume_types
def default(self, string):
"""Deserialize an xml-formatted volume create request."""
- dom = minidom.parseString(string)
+ dom = utils.safe_minidom_parse_string(string)
volume = self._extract_volume(dom)
return {'body': {'volume': volume}}
import webob
from webob import exc
-from xml.dom import minidom
from cinder.api import common
from cinder.api.openstack import wsgi
from cinder import flags
from cinder.openstack.common import log as logging
from cinder.openstack.common import uuidutils
+from cinder import utils
from cinder import volume
from cinder.volume import volume_types
def default(self, string):
"""Deserialize an xml-formatted volume create request."""
- dom = minidom.parseString(string)
+ dom = utils.safe_minidom_parse_string(string)
volume = self._extract_volume(dom)
return {'body': {'volume': volume}}
result = utils.service_is_up(service)
self.assertFalse(result)
+ def test_safe_parse_xml(self):
+
+ normal_body = ("""
+ <?xml version="1.0" ?><foo>
+ <bar>
+ <v1>hey</v1>
+ <v2>there</v2>
+ </bar>
+ </foo>""").strip()
+
+ def killer_body():
+ return (("""<!DOCTYPE x [
+ <!ENTITY a "%(a)s">
+ <!ENTITY b "%(b)s">
+ <!ENTITY c "%(c)s">]>
+ <foo>
+ <bar>
+ <v1>%(d)s</v1>
+ </bar>
+ </foo>""") % {
+ 'a': 'A' * 10,
+ 'b': '&a;' * 10,
+ 'c': '&b;' * 10,
+ 'd': '&c;' * 9999,
+ }).strip()
+
+ dom = utils.safe_minidom_parse_string(normal_body)
+ self.assertEqual(normal_body, str(dom.toxml()))
+
+ self.assertRaises(ValueError,
+ utils.safe_minidom_parse_string,
+ killer_body())
+
def test_xhtml_escape(self):
self.assertEqual('"foo"', utils.xhtml_escape('"foo"'))
self.assertEqual(''foo'', utils.xhtml_escape("'foo'"))
import time
import types
import warnings
+from xml.dom import minidom
+from xml.parsers import expat
+from xml import sax
+from xml.sax import expatreader
from xml.sax import saxutils
from eventlet import event
return self.done.wait()
+class ProtectedExpatParser(expatreader.ExpatParser):
+ """An expat parser which disables DTD's and entities by default."""
+
+ def __init__(self, forbid_dtd=True, forbid_entities=True,
+ *args, **kwargs):
+ # Python 2.x old style class
+ expatreader.ExpatParser.__init__(self, *args, **kwargs)
+ self.forbid_dtd = forbid_dtd
+ self.forbid_entities = forbid_entities
+
+ def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
+ raise ValueError("Inline DTD forbidden")
+
+ def entity_decl(self, entityName, is_parameter_entity, value, base,
+ systemId, publicId, notationName):
+ raise ValueError("<!ENTITY> forbidden")
+
+ def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
+ # expat 1.2
+ raise ValueError("<!ENTITY> forbidden")
+
+ def reset(self):
+ expatreader.ExpatParser.reset(self)
+ if self.forbid_dtd:
+ self._parser.StartDoctypeDeclHandler = self.start_doctype_decl
+ if self.forbid_entities:
+ self._parser.EntityDeclHandler = self.entity_decl
+ self._parser.UnparsedEntityDeclHandler = self.unparsed_entity_decl
+
+
+def safe_minidom_parse_string(xml_string):
+ """Parse an XML string using minidom safely.
+
+ """
+ try:
+ return minidom.parseString(xml_string, parser=ProtectedExpatParser())
+ except sax.SAXParseException as se:
+ raise expat.ExpatError()
+
+
def xhtml_escape(value):
"""Escapes a string so it is valid within XML or XHTML.