class Controller(object):
- def __init__(self, plugin, collection, resource, params):
+ def __init__(self, plugin, collection, resource, attr_info):
self._plugin = plugin
self._collection = collection
self._resource = resource
- self._params = params
+ self._attr_info = attr_info
self._view = getattr(views, self._resource)
def _items(self, request):
def create(self, request, body=None):
"""Creates a new instance of the requested entity"""
- body = self._prepare_request_body(body, allow_bulk=True)
+
+ body = self._prepare_request_body(request.context, body, True,
+ allow_bulk=True)
obj_creator = getattr(self._plugin,
"create_%s" % self._resource)
kwargs = {self._resource: body}
def update(self, request, id, body=None):
"""Updates the specified entity's attributes"""
+ body = self._prepare_request_body(request.context, body, False)
obj_updater = getattr(self._plugin,
"update_%s" % self._resource)
kwargs = {self._resource: body}
obj = obj_updater(request.context, id, **kwargs)
return {self._resource: self._view(obj)}
- def _prepare_request_body(self, body, allow_bulk=False):
- """ verifies required parameters are in request body.
- Parameters with default values are considered to be
+ def _populate_tenant_id(self, context, res_dict, is_create):
+
+ if self._resource not in ['network', 'port']:
+ return
+
+ if ('tenant_id' in res_dict and
+ res_dict['tenant_id'] != context.tenant_id and
+ not context.is_admin):
+ msg = _("Specifying 'tenant_id' other than authenticated"
+ "tenant in request requires admin privileges")
+ raise webob.exc.HTTPBadRequest(msg)
+
+ if is_create and 'tenant_id' not in res_dict:
+ if context.tenant_id:
+ res_dict['tenant_id'] = context.tenant_id
+ else:
+ msg = _("Running without keystyone AuthN requires "
+ " that tenant_id is specified")
+ raise webob.exc.HTTPBadRequest(msg)
+
+ def _prepare_request_body(self, context, body, is_create,
+ allow_bulk=False):
+ """ verifies required attributes are in request body, and that
+ an attribute is only specified if it is allowed for the given
+ operation (create/update).
+ Attribute with default values are considered to be
optional.
body argument must be the deserialized body
body = body or {self._resource: {}}
if self._collection in body and allow_bulk:
- bulk_body = [self._prepare_request_body({self._resource: b})
+ bulk_body = [self._prepare_request_body(context,
+ {self._resource: b},
+ is_create)
if self._resource not in b
- else self._prepare_request_body(b)
+ else self._prepare_request_body(context, b, is_create)
for b in body[self._collection]]
if not bulk_body:
msg = _("Unable to find '%s' in request body") % self._resource
raise webob.exc.HTTPBadRequest(msg)
- for param in self._params:
- param_value = res_dict.get(param['attr'], param.get('default'))
- if param_value is None:
- msg = _("Failed to parse request. Parameter %s not "
- "specified") % param
- raise webob.exc.HTTPUnprocessableEntity(msg)
- res_dict[param['attr']] = param_value
+ self._populate_tenant_id(context, res_dict, is_create)
+
+ if is_create: # POST
+ for attr, attr_vals in self._attr_info.iteritems():
+ is_required = ('default' not in attr_vals and
+ attr_vals['allow_post'])
+ if is_required and attr not in res_dict:
+ msg = _("Failed to parse request. Required "
+ " attribute '%s' not specified") % attr
+ raise webob.exc.HTTPUnprocessableEntity(msg)
+
+ if not attr_vals['allow_post'] and attr in res_dict:
+ msg = _("Attribute '%s' not allowed in POST" % attr)
+ raise webob.exc.HTTPUnprocessableEntity(msg)
+
+ if attr_vals['allow_post']:
+ res_dict[attr] = res_dict.get(attr,
+ attr_vals.get('default'))
+
+ else: # PUT
+ for attr, attr_vals in self._attr_info.iteritems():
+ if attr in res_dict and not attr_vals['allow_put']:
+ msg = _("Cannot update read-only attribute %s") % attr
+ raise webob.exc.HTTPUnprocessableEntity(msg)
+
return body
REQUIREMENTS = {'id': UUID_PATTERN, 'format': 'xml|json'}
-RESOURCE_PARAM_MAP = {
- 'networks': [
- {'attr': 'name'},
- ],
- 'ports': [
- {'attr': 'state', 'default': 'DOWN'},
- ],
- 'subnets': [
- {'attr': 'prefix'},
- {'attr': 'network_id'},
- ]
+ATTR_NOT_SPECIFIED = object()
+
+# Note: a default of ATTR_NOT_SPECIFIED indicates that an
+# attribute is not required, but will be generated by the plugin
+# if it is not specified. Particularly, a value of ATTR_NOT_SPECIFIED
+# is different from an attribute that has been specified with a value of
+# None. For example, if 'gateway_ip' is ommitted in a request to
+# create a subnet, the plugin will receive ATTR_NOT_SPECIFIED
+# and the default gateway_ip will be generated.
+# However, if gateway_ip is specified as None, this means that
+# the subnet does not have a gateway IP.
+
+RESOURCE_ATTRIBUTE_MAP = {
+ 'networks': {
+ 'id': {'allow_post': False, 'allow_put': False},
+ 'name': {'allow_post': True, 'allow_put': True},
+ 'subnets': {'allow_post': True, 'allow_put': True, 'default': []},
+ 'admin_state_up': {'allow_post': True, 'allow_put': True,
+ 'default': True},
+ 'status': {'allow_post': False, 'allow_put': False},
+ 'tenant_id': {'allow_post': True, 'allow_put': True},
+ },
+ 'ports': {
+ 'id': {'allow_post': False, 'allow_put': False},
+ 'network_id': {'allow_post': True, 'allow_put': False},
+ 'admin_state_up': {'allow_post': True, 'allow_put': True,
+ 'default': True},
+ 'mac_address': {'allow_post': True, 'allow_put': False,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'fixed_ips_v4': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'fixed_ips_v6': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'host_routes': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'device_id': {'allow_post': True, 'allow_put': True, 'default': ''},
+ 'tenant_id': {'allow_post': True, 'allow_put': True},
+ },
+ 'subnets': {
+ 'id': {'allow_post': False, 'allow_put': False},
+ 'ip_version': {'allow_post': True, 'allow_put': False},
+ 'network_id': {'allow_post': True, 'allow_put': False},
+ 'cidr': {'allow_post': True, 'allow_put': False},
+ 'gateway_ip': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'dns_namesevers': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ 'additional_host_routes': {'allow_post': True, 'allow_put': True,
+ 'default': ATTR_NOT_SPECIFIED},
+ }
}
mapper.connect('index', '/', controller=Index(resources))
for resource in resources:
_map_resource(resources[resource], resource,
- RESOURCE_PARAM_MAP.get(resources[resource],
+ RESOURCE_ATTRIBUTE_MAP.get(resources[resource],
dict()))
super(APIRouter, self).__init__(mapper)
def port(port_data):
"""Represents a view for a port object"""
keys = ('id', 'network_id', 'mac_address', 'fixed_ips',
- 'device_id', 'admin_state_up', 'tenant_id', 'op_status')
+ 'device_id', 'admin_state_up', 'tenant_id', 'status')
return resource(port_data, keys)
def network(network_data):
"""Represents a view for a network object"""
- keys = ('id', 'name', 'subnets', 'admin_state_up', 'op_status',
+ keys = ('id', 'name', 'subnets', 'admin_state_up', 'status',
'tenant_id', 'mac_ranges')
return resource(network_data, keys)
def subnet(subnet_data):
"""Represents a view for a subnet object"""
keys = ('id', 'network_id', 'tenant_id', 'gateway_ip', 'ip_version',
- 'prefix')
+ 'cidr')
return resource(subnet_data, keys)
# limitations under the License.
import logging
+import random
+import netaddr
from sqlalchemy import orm
from sqlalchemy.orm import exc
from quantum import quantum_plugin_base_v2
+from quantum.api.v2 import router as api_router
from quantum.common import exceptions as q_exc
from quantum.db import api as db
from quantum.db import models_v2
'name': network['name'],
'tenant_id': network['tenant_id'],
'admin_state_up': network['admin_state_up'],
- 'op_status': network['op_status'],
+ 'status': network['status'],
'subnets': [subnet['id']
for subnet in network['subnets']]}
def _make_subnet_dict(self, subnet, fields=None):
res = {'id': subnet['id'],
'network_id': subnet['network_id'],
- 'tenant_id': subnet['tenant_id'],
'ip_version': subnet['ip_version'],
- 'prefix': subnet['prefix'],
+ 'cidr': subnet['cidr'],
'gateway_ip': subnet['gateway_ip']}
return self._fields(res, fields)
'tenant_id': port['tenant_id'],
"mac_address": port["mac_address"],
"admin_state_up": port["admin_state_up"],
- "op_status": port["op_status"],
+ "status": port["status"],
"fixed_ips": [ip["address"] for ip in port["fixed_ips"]],
"device_id": port["device_id"]}
return self._fields(res, fields)
network = models_v2.Network(tenant_id=tenant_id,
name=n['name'],
admin_state_up=n['admin_state_up'],
- op_status="ACTIVE")
+ status="ACTIVE")
context.session.add(network)
return self._make_network_dict(network)
def create_subnet(self, context, subnet):
s = subnet['subnet']
- # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
- # unneeded db action if the operation raises
- tenant_id = self._get_tenant_id_for_create(context, s)
+
+ if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
+ net = netaddr.IPNetwork(s['cidr'])
+ s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
+
with context.session.begin():
- subnet = models_v2.Subnet(tenant_id=tenant_id,
- network_id=s['network_id'],
+ subnet = models_v2.Subnet(network_id=s['network_id'],
ip_version=s['ip_version'],
- prefix=s['prefix'],
+ cidr=s['cidr'],
gateway_ip=s['gateway_ip'])
context.session.add(subnet)
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)
- #FIXME(danwent): allocate MAC
- mac_address = p.get('mac_address', 'ca:fe:de:ad:be:ef')
+ if p['mac_address'] == api_router.ATTR_NOT_SPECIFIED:
+ #FIXME(danwent): this is exact Nova mac generation logic
+ # we will want to provide more flexibility and to check
+ # for uniqueness.
+ mac = [0xfa, 0x16, 0x3e, random.randint(0x00, 0x7f),
+ random.randint(0x00, 0xff), random.randint(0x00, 0xff)]
+ p['mac_address'] = ':'.join(map(lambda x: "%02x" % x, mac))
+
with context.session.begin():
network = self._get_network(context, p["network_id"])
port = models_v2.Port(tenant_id=tenant_id,
network_id=p['network_id'],
- mac_address=mac_address,
+ mac_address=p['mac_address'],
admin_state_up=p['admin_state_up'],
- op_status="ACTIVE",
+ status="ACTIVE",
device_id=p['device_id'])
context.session.add(port)
fixed_ips = orm.relationship(IPAllocation, backref='ports')
mac_address = sa.Column(sa.String(32), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
- op_status = sa.Column(sa.String(16), nullable=False)
+ status = sa.Column(sa.String(16), nullable=False)
device_id = sa.Column(sa.String(255), nullable=False)
-class Subnet(model_base.BASEV2, HasTenant):
+class Subnet(model_base.BASEV2):
"""Represents a quantum subnet"""
network_id = sa.Column(sa.String(36), sa.ForeignKey('networks.id'))
allocations = orm.relationship(IPAllocation,
backref=orm.backref('subnet',
uselist=False))
ip_version = sa.Column(sa.Integer, nullable=False)
- prefix = sa.Column(sa.String(255), nullable=False)
+ cidr = sa.Column(sa.String(64), nullable=False)
gateway_ip = sa.Column(sa.String(255))
#TODO(danwent):
name = sa.Column(sa.String(255))
ports = orm.relationship(Port, backref='networks')
subnets = orm.relationship(Subnet, backref='networks')
- op_status = sa.Column(sa.String(16))
+ status = sa.Column(sa.String(16))
admin_state_up = sa.Column(sa.Boolean)
is bound.
"ip_version": integer indicating IP protocol version.
example: 4
- "prefix": string indicating IP prefix indicating addresses
- that can be allocated for devices on this subnet.
- example: "10.0.0.0/24"
+ "cidr": string indicating IP prefix indicating addresses
+ that can be allocated for devices on this subnet.
+ example: "10.0.0.0/24"
"gateway_ip": string indicating the default gateway
for devices on this subnet. example: "10.0.0.1"
"dns_nameservers": list of strings stricting indication the
DNS name servers for devices on this
subnet. example: [ "8.8.8.8", "8.8.4.4" ]
- "excluded_ranges" : list of dicts indicating pairs of IPs that
- should not be allocated from the prefix.
+ "reserved_ranges" : list of dicts indicating pairs of IPs that
+ should not be automatically allocated from
+ the prefix.
example: [ { "start" : "10.0.0.2",
"end" : "10.0.0.5" } ]
- "additional_routes": list of dicts indicating routes beyond
+ "additional_host_routes": list of dicts indicating routes beyond
the default gateway and local prefix route
that should be injected into the device.
example: [{"destination": "192.168.0.0/16",
from webob import exc
+from quantum import context
from quantum.common import exceptions as q_exc
from quantum.api.v2 import resource as wsgi_resource
from quantum.api.v2 import router
LOG = logging.getLogger(__name__)
+def _uuid():
+ return str(uuid.uuid4())
+
+
def _get_path(resource, id=None, fmt=None):
path = '/%s' % resource
verbose=True)
+# Note: since all resources use the same controller and validation
+# logic, we actually get really good coverage from testing just networks.
class JSONV2TestCase(APIv2TestCase):
def test_list(self):
- return_value = [{'network': {'name': 'net1',
- 'admin_state_up': True,
- 'subnets': []}}]
+ input_dict = {'id': str(uuid.uuid4()),
+ 'name': 'net1',
+ 'admin_state_up': True,
+ 'status': "ACTIVE",
+ 'tenant_id': str(uuid.uuid4()),
+ 'subnets': []}
+ return_value = [input_dict]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
res = self.api.get(_get_path('networks'))
self.assertTrue('networks' in res.json)
+ self.assertEqual(len(res.json['networks']), 1)
+ output_dict = res.json['networks'][0]
+ self.assertEqual(len(input_dict), len(output_dict))
+ for k, v in input_dict.iteritems():
+ self.assertEqual(v, output_dict[k])
def test_create(self):
- data = {'network': {'name': 'net1', 'admin_state_up': True}}
- return_value = {'subnets': []}
+ net_id = _uuid()
+ data = {'network': {'name': 'net1', 'admin_state_up': True,
+ 'tenant_id': _uuid()}}
+ return_value = {'subnets': [], 'status': "ACTIVE",
+ 'id': net_id}
return_value.update(data['network'].copy())
instance = self.plugin.return_value
instance.create_network.return_value = return_value
res = self.api.post_json(_get_path('networks'), data)
+
self.assertEqual(res.status_int, exc.HTTPCreated.code)
+ self.assertTrue('network' in res.json)
+ net = res.json['network']
+ self.assertEqual(net['id'], net_id)
+ self.assertEqual(net['status'], "ACTIVE")
+
+ def test_create_use_defaults(self):
+ net_id = _uuid()
+ initial_input = {'network': {'name': 'net1', 'tenant_id': _uuid()}}
+ full_input = {'network': {'admin_state_up': True, 'subnets': []}}
+ full_input['network'].update(initial_input['network'])
+
+ return_value = {'id': net_id, 'status': "ACTIVE"}
+ return_value.update(full_input['network'])
+
+ instance = self.plugin.return_value
+ instance.create_network.return_value = return_value
+
+ res = self.api.post_json(_get_path('networks'), initial_input)
+
+ instance.create_network.assert_called_with(mock.ANY,
+ network=full_input)
+ self.assertEqual(res.status_int, exc.HTTPCreated.code)
+ self.assertTrue('network' in res.json)
+ net = res.json['network']
+ self.assertEqual(net['id'], net_id)
+ self.assertEqual(net['admin_state_up'], True)
+ self.assertEqual(net['status'], "ACTIVE")
+
+ def test_create_no_keystone_env(self):
+ data = {'name': 'net1'}
+ res = self.api.post_json(_get_path('networks'), data,
+ expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_create_with_keystone_env(self):
+ tenant_id = _uuid()
+ net_id = _uuid()
+ env = {'quantum.context': context.Context('', tenant_id)}
+ # tenant_id should be fetched from env
+ initial_input = {'network': {'name': 'net1'}}
+ full_input = {'network': {'admin_state_up': True, 'subnets': [],
+ 'tenant_id': tenant_id}}
+ full_input['network'].update(initial_input['network'])
+
+ return_value = {'id': net_id, 'status': "ACTIVE"}
+ return_value.update(full_input['network'])
+
+ instance = self.plugin.return_value
+ instance.create_network.return_value = return_value
+
+ res = self.api.post_json(_get_path('networks'), initial_input,
+ extra_environ=env)
+
+ instance.create_network.assert_called_with(mock.ANY,
+ network=full_input)
+ self.assertEqual(res.status_int, exc.HTTPCreated.code)
+
+ def test_create_bad_keystone_tenant(self):
+ tenant_id = _uuid()
+ data = {'network': {'name': 'net1', 'tenant_id': tenant_id}}
+ env = {'quantum.context': context.Context('', tenant_id + "bad")}
+ res = self.api.post_json(_get_path('networks'), data,
+ expect_errors=True,
+ extra_environ=env)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_create_no_body(self):
data = {'whoa': None}
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_create_missing_attr(self):
- data = {'network': {'what': 'who'}}
+ data = {'network': {'what': 'who', 'tenant_id': _uuid()}}
+ res = self.api.post_json(_get_path('networks'), data,
+ expect_errors=True)
+ self.assertEqual(res.status_int, 422)
+
+ def test_create_readonly_attr(self):
+ data = {'network': {'name': 'net1', 'tenant_id': _uuid(),
+ 'status': "ACTIVE"}}
res = self.api.post_json(_get_path('networks'), data,
expect_errors=True)
self.assertEqual(res.status_int, 422)
def test_create_bulk(self):
- data = {'networks': [{'name': 'net1', 'admin_state_up': True},
- {'name': 'net2', 'admin_state_up': True}]}
+ data = {'networks': [{'name': 'net1', 'admin_state_up': True,
+ 'tenant_id': _uuid()},
+ {'name': 'net2', 'admin_state_up': True,
+ 'tenant_id': _uuid()}]}
def side_effect(context, network):
nets = network.copy()
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_create_bulk_missing_attr(self):
- data = {'networks': [{'what': 'who'}]}
+ data = {'networks': [{'what': 'who', 'tenant_id': _uuid()}]}
res = self.api.post_json(_get_path('networks'), data,
expect_errors=True)
self.assertEqual(res.status_int, 422)
def test_create_bulk_partial_body(self):
- data = {'networks': [{'name': 'net1', 'admin_state_up': True},
- {}]}
+ data = {'networks': [{'name': 'net1', 'admin_state_up': True,
+ 'tenant_id': _uuid()},
+ {'tenant_id': _uuid()}]}
res = self.api.post_json(_get_path('networks'), data,
expect_errors=True)
self.assertEqual(res.status_int, 422)
+ def test_create_attr_not_specified(self):
+ net_id = _uuid()
+ tenant_id = _uuid()
+ device_id = _uuid()
+ initial_input = {'port': {'network_id': net_id, 'tenant_id': tenant_id,
+ 'device_id': device_id,
+ 'admin_state_up': True}}
+ full_input = {'port': {'admin_state_up': True,
+ 'mac_address': router.ATTR_NOT_SPECIFIED,
+ 'fixed_ips_v4': router.ATTR_NOT_SPECIFIED,
+ 'fixed_ips_v6': router.ATTR_NOT_SPECIFIED,
+ 'host_routes': router.ATTR_NOT_SPECIFIED}}
+ full_input['port'].update(initial_input['port'])
+ return_value = {'id': _uuid(), 'status': 'ACTIVE',
+ 'admin_state_up': True,
+ 'mac_address': 'ca:fe:de:ad:be:ef',
+ 'fixed_ips_v4': ['10.0.0.0/24'],
+ 'fixed_ips_v6': [],
+ 'host_routes': [],
+ 'device_id': device_id}
+ return_value.update(initial_input['port'])
+
+ instance = self.plugin.return_value
+ instance.create_port.return_value = return_value
+ res = self.api.post_json(_get_path('ports'), initial_input)
+
+ instance.create_port.assert_called_with(mock.ANY, port=full_input)
+ self.assertEqual(res.status_int, exc.HTTPCreated.code)
+ self.assertTrue('port' in res.json)
+ port = res.json['port']
+ self.assertEqual(port['network_id'], net_id)
+ self.assertEqual(port['mac_address'], 'ca:fe:de:ad:be:ef')
+
def test_fields(self):
return_value = {'name': 'net1', 'admin_state_up': True,
'subnets': []}
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
def test_update(self):
- data = {'network': {'name': 'net1', 'admin_state_up': True}}
+ # leave out 'name' field intentionally
+ data = {'network': {'admin_state_up': True}}
return_value = {'subnets': []}
return_value.update(data['network'].copy())
self.api.put_json(_get_path('networks',
id=str(uuid.uuid4())), data)
+ def test_update_readonly_field(self):
+ data = {'network': {'status': "NANANA"}}
+ res = self.api.put_json(_get_path('networks', id=_uuid()), data,
+ expect_errors=True)
+ self.assertEqual(res.status_int, 422)
+
class V2Views(unittest.TestCase):
def _view(self, keys, func):
self.assertTrue('two' not in res)
def test_network(self):
- keys = ('id', 'name', 'subnets', 'admin_state_up', 'op_status',
+ keys = ('id', 'name', 'subnets', 'admin_state_up', 'status',
'tenant_id', 'mac_ranges')
self._view(keys, views.network)
def test_port(self):
keys = ('id', 'network_id', 'mac_address', 'fixed_ips',
- 'device_id', 'admin_state_up', 'tenant_id', 'op_status')
+ 'device_id', 'admin_state_up', 'tenant_id', 'status')
self._view(keys, views.port)
def test_subnet(self):
keys = ('id', 'network_id', 'tenant_id', 'gateway_ip',
- 'ip_version', 'prefix')
+ 'ip_version', 'cidr')
self._view(keys, views.subnet)
def _create_network(self, fmt, name, admin_status_up):
data = {'network': {'name': name,
- 'admin_state_up': admin_status_up}}
+ 'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
- def _create_subnet(self, fmt, net_id, gateway_ip, prefix):
+ def _create_subnet(self, fmt, net_id, gateway_ip, cidr):
data = {'subnet': {'network_id': net_id,
- 'allocations': [],
- 'prefix': prefix,
- 'ip_version': 4,
- 'gateway_ip': gateway_ip}}
+ 'cidr': cidr,
+ 'ip_version': 4}}
+ if gateway_ip:
+ data['subnet']['gateway_ip'] = gateway_ip
+
subnet_req = self.new_create_request('subnets', data, fmt)
return subnet_req.get_response(self.api)
- def _make_subnet(self, fmt, network, gateway, prefix):
+ def _create_port(self, fmt, net_id, custom_req_body=None,
+ expected_res_status=None, **kwargs):
+ content_type = 'application/' + fmt
+ data = {'port': {'network_id': net_id,
+ 'tenant_id': self._tenant_id}}
+ for arg in ('admin_state_up', 'device_id', 'mac_address',
+ 'fixed_ips_v4', 'fixed_ips_v6'):
+ if arg in kwargs:
+ data['port'][arg] = kwargs[arg]
+
+ port_req = self.new_create_request('ports', data, fmt)
+ return port_req.get_response(self.api)
+
+ def _make_subnet(self, fmt, network, gateway, cidr):
res = self._create_subnet(fmt, network['network']['id'],
- gateway, prefix)
+ gateway, cidr)
+ return self.deserialize(fmt, res)
+
+ def _make_port(self, fmt, net_id, **kwargs):
+ res = self._create_port(fmt, net_id, **kwargs)
return self.deserialize(fmt, res)
def _delete(self, collection, id):
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
- def subnet(self, network=None, gateway='10.0.0.1',
- prefix='10.0.0.0/24', fmt='json'):
+ def subnet(self, network=None, gateway=None,
+ cidr='10.0.0.0/24', fmt='json'):
# TODO(anyone) DRY this
if not network:
with self.network() as network:
- subnet = self._make_subnet(fmt, network, gateway, prefix)
+ subnet = self._make_subnet(fmt, network, gateway, cidr)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
else:
- subnet = self._make_subnet(fmt, network, gateway, prefix)
+ subnet = self._make_subnet(fmt, network, gateway, cidr)
yield subnet
self._delete('subnets', subnet['subnet']['id'])
+ @contextlib.contextmanager
+ def port(self, subnet=None, fmt='json'):
+ if not subnet:
+ with self.subnet() as subnet:
+ net_id = subnet['subnet']['network_id']
+ port = self._make_port(fmt, net_id)
+ yield port
+ self._delete('ports', port['port']['id'])
+
class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
def test_create_returns_201(self):
self.assertEquals(res.status_int, 404)
-#class TestPortsV2(APIv2TestCase):
-# def setUp(self):
-# super(TestPortsV2, self).setUp()
-# res = self._create_network('json', 'net1', True)
-# data = self._deserializers['application/json'].\
-# deserialize(res.body)['body']
-# self.net_id = data['network']['id']
-#
-# def _create_port(self, fmt, net_id, admin_state_up, device_id,
-# custom_req_body=None,
-# expected_res_status=None):
-# content_type = 'application/' + fmt
-# data = {'port': {'network_id': net_id,
-# 'admin_state_up': admin_state_up,
-# 'device_id': device_id}}
-# port_req = self.new_create_request('ports', data, fmt)
-# port_res = port_req.get_response(self.api)
-# return json.loads(port_res.body)
-#
-# def test_create_port_json(self):
-# port = self._create_port('json', self.net_id, True, 'dev_id_1')
-# self.assertEqual(port['id'], 'dev_id_1')
-# self.assertEqual(port['admin_state_up'], 'DOWN')
-# self.assertEqual(port['device_id'], 'dev_id_1')
-# self.assertTrue('mac_address' in port)
-# self.assertTrue('op_status' in port)
-#
-# def test_list_ports(self):
-# port1 = self._create_port('json', self.net_id, True, 'dev_id_1')
-# port2 = self._create_port('json', self.net_id, True, 'dev_id_2')
-#
-# res = self.new_list_request('ports', 'json')
-# port_list = json.loads(res.body)['body']
-# self.assertTrue(port1 in port_list['ports'])
-# self.assertTrue(port2 in port_list['ports'])
-#
-# def test_show_port(self):
-# port = self._create_port('json', self.net_id, True, 'dev_id_1')
-# res = self.new_show_request('port', 'json', port['id'])
-# port = json.loads(res.body)['body']
-# self.assertEquals(port['port']['name'], 'dev_id_1')
-#
-# def test_delete_port(self):
-# port = self._create_port('json', self.net_id, True, 'dev_id_1')
-# self.new_delete_request('port', 'json', port['id'])
-#
-# port = self.new_show_request('port', 'json', port['id'])
-#
-# self.assertEquals(res.status_int, 404)
-#
-# def test_update_port(self):
-# port = self._create_port('json', self.net_id, True, 'dev_id_1')
-# port_body = {'port': {'device_id': 'Bob'}}
-# res = self.new_update_request('port', port_body, port['id'])
-# port = json.loads(res.body)['body']
-# self.assertEquals(port['device_id'], 'Bob')
-#
-# def test_delete_non_existent_port_404(self):
-# res = self.new_delete_request('port', 'json', 1)
-# self.assertEquals(res.status_int, 404)
-#
-# def test_show_non_existent_port_404(self):
-# res = self.new_show_request('port', 'json', 1)
-# self.assertEquals(res.status_int, 404)
-#
-# def test_update_non_existent_port_404(self):
-# res = self.new_update_request('port', 'json', 1)
-# self.assertEquals(res.status_int, 404)
+class TestPortsV2(QuantumDbPluginV2TestCase):
+
+ def test_create_port_json(self):
+ keys = [('admin_state_up', True), ('status', 'ACTIVE')]
+ with self.port() as port:
+ for k, v in keys:
+ self.assertEquals(port['port'][k], v)
+ self.assertTrue('mac_address' in port['port'])
+
+ def test_list_ports(self):
+ with contextlib.nested(self.port(), self.port()) as (port1, port2):
+ req = self.new_list_request('ports', 'json')
+ port_list = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(len(port_list['ports']), 2)
+ ids = [p['id'] for p in port_list['ports']]
+ self.assertTrue(port1['port']['id'] in ids)
+ self.assertTrue(port2['port']['id'] in ids)
+
+ def test_show_port(self):
+ with self.port() as port:
+ req = self.new_show_request('ports', port['port']['id'], 'json')
+ sport = self.deserialize('json', req.get_response(self.api))
+ self.assertEquals(port['port']['id'], sport['port']['id'])
+
+ def test_delete_port(self):
+ port_id = None
+ with self.port() as port:
+ port_id = port['port']['id']
+ req = self.new_show_request('port', 'json', port['port']['id'])
+ res = req.get_response(self.api)
+ self.assertEquals(res.status_int, 404)
+
+ def test_update_port(self):
+ with self.port() as port:
+ data = {'port': {'admin_state_up': False}}
+ req = self.new_update_request('ports', data, port['port']['id'])
+ res = self.deserialize('json', req.get_response(self.api))
+ self.assertEqual(res['port']['admin_state_up'],
+ data['port']['admin_state_up'])
class TestNetworksV2(QuantumDbPluginV2TestCase):
def test_create_network(self):
name = 'net1'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
- ('op_status', 'ACTIVE')]
+ ('status', 'ACTIVE')]
with self.network(name=name) as net:
for k, v in keys:
self.assertEquals(net['network'][k], v)
class TestSubnetsV2(QuantumDbPluginV2TestCase):
+
def test_create_subnet(self):
gateway = '10.0.0.1'
- prefix = '10.0.0.0/24'
+ cidr = '10.0.0.0/24'
keys = [('ip_version', 4), ('gateway_ip', gateway),
- ('prefix', prefix)]
- with self.subnet(gateway=gateway, prefix=prefix) as subnet:
+ ('cidr', cidr)]
+ with self.subnet(gateway=gateway, cidr=cidr) as subnet:
+ for k, v in keys:
+ self.assertEquals(subnet['subnet'][k], v)
+
+ def test_create_subnet_defaults(self):
+ generated_gateway = '10.0.0.1'
+ cidr = '10.0.0.0/24'
+ keys = [('ip_version', 4), ('gateway_ip', generated_gateway),
+ ('cidr', cidr)]
+ # intentionally not passing gateway in
+ with self.subnet(cidr=cidr) as subnet:
for k, v in keys:
self.assertEquals(subnet['subnet'][k], v)
def test_update_subnet(self):
with self.subnet() as subnet:
- data = {'subnet': {'network_id': 'blarg',
- 'prefix': '192.168.0.0/24'}}
+ data = {'subnet': {'gateway_ip': '11.0.0.1'}}
req = self.new_update_request('subnets', data,
subnet['subnet']['id'])
res = self.deserialize('json', req.get_response(self.api))
- self.assertEqual(res['subnet']['prefix'],
- data['subnet']['prefix'])
+ self.assertEqual(res['subnet']['gateway_ip'],
+ data['subnet']['gateway_ip'])
def test_show_subnet(self):
with self.network() as network:
# or just drop 2.6 support ;)
with self.network() as network:
with self.subnet(network=network, gateway='10.0.0.1',
- prefix='10.0.1.0/24') as subnet:
+ cidr='10.0.1.0/24') as subnet:
with self.subnet(network=network, gateway='10.0.1.1',
- prefix='10.0.1.0/24') as subnet2:
+ cidr='10.0.1.0/24') as subnet2:
req = self.new_list_request('subnets')
res = self.deserialize('json',
req.get_response(self.api))
res1 = res['subnets'][0]
res2 = res['subnets'][1]
- self.assertEquals(res1['prefix'],
- subnet['subnet']['prefix'])
- self.assertEquals(res2['prefix'],
- subnet2['subnet']['prefix'])
+ self.assertEquals(res1['cidr'],
+ subnet['subnet']['cidr'])
+ self.assertEquals(res2['cidr'],
+ subnet2['subnet']['cidr'])
Routes>=1.12.3
eventlet>=0.9.12
lxml
+netaddr
python-gflags==1.3
sqlalchemy>0.6.4
webob==1.2.0