return msg
+def _validate_mac_address_or_none(data, valid_values=None):
+ if data is None:
+ return
+ return _validate_mac_address(data, valid_values)
+
+
def _validate_ip_address(data, valid_values=None):
try:
netaddr.IPAddress(_validate_no_whitespace(data))
return msg
+def _validate_subnet_or_none(data, valid_values=None):
+ if data is None:
+ return
+ return _validate_subnet(data, valid_values)
+
+
def _validate_regex(data, valid_values=None):
try:
if re.match(valid_values, data):
return msg
+def _validate_regex_or_none(data, valid_values=None):
+ if data is None:
+ return
+ return _validate_regex(data, valid_values)
+
+
def _validate_uuid(data, valid_values=None):
if not uuidutils.is_uuid_like(data):
msg = _("'%s' is not a valid UUID") % data
'type:ip_address_or_none': _validate_ip_address_or_none,
'type:ip_pools': _validate_ip_pools,
'type:mac_address': _validate_mac_address,
+ 'type:mac_address_or_none': _validate_mac_address_or_none,
'type:nameservers': _validate_nameservers,
'type:non_negative': _validate_non_negative,
'type:range': _validate_range,
'type:regex': _validate_regex,
+ 'type:regex_or_none': _validate_regex_or_none,
'type:string': _validate_string,
'type:string_or_none': _validate_string_or_none,
'type:not_empty_string': _validate_not_empty_string,
_validate_not_empty_string_or_none,
'type:subnet': _validate_subnet,
'type:subnet_list': _validate_subnet_list,
+ 'type:subnet_or_none': _validate_subnet_or_none,
'type:uuid': _validate_uuid,
'type:uuid_or_none': _validate_uuid_or_none,
'type:uuid_list': _validate_uuid_list,
# under the License.
# @author: Ryota MIBU
+# TODO(amotoki): bug 1287432: Rename quantum_id column in ID mapping tables.
+
import sqlalchemy as sa
from neutron.db import api as db
"port_id: %s"), id)
+def get_active_ports_on_ofc(context, network_id, port_id=None):
+ """Retrieve ports on OFC on a given network.
+
+ It returns a list of tuple (neutron port_id, OFC id).
+ """
+ query = context.session.query(nmodels.OFCPortMapping)
+ query = query.join(models_v2.Port,
+ nmodels.OFCPortMapping.quantum_id == models_v2.Port.id)
+ query = query.filter(models_v2.Port.network_id == network_id)
+ if port_id:
+ query = query.filter(nmodels.OFCPortMapping.quantum_id == port_id)
+
+ return [(p['quantum_id'], p['ofc_id']) for p in query]
+
+
def get_port_from_device(port_id):
"""Get port from database."""
LOG.debug(_("get_port_with_securitygroups() called:port_id=%s"), port_id)
from sqlalchemy.orm import exc as sa_exc
from neutron.api.v2 import attributes
-from neutron.common import exceptions
from neutron.db import model_base
from neutron.db import models_v2
from neutron.openstack.common import uuidutils
+from neutron.plugins.nec.db import models as nmodels
+from neutron.plugins.nec.extensions import packetfilter as ext_pf
PF_STATUS_ACTIVE = 'ACTIVE'
PF_STATUS_DOWN = 'DOWN'
PF_STATUS_ERROR = 'ERROR'
-
-class PacketFilterNotFound(exceptions.NotFound):
- message = _("PacketFilter %(id)s could not be found")
+INT_FIELDS = ('eth_type', 'src_port', 'dst_port')
class PacketFilter(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
'action': pf_entry['action'],
'priority': pf_entry['priority'],
'in_port': pf_entry['in_port'],
- 'src_mac': pf_entry['src_mac'],
- 'dst_mac': pf_entry['dst_mac'],
- 'eth_type': pf_entry['eth_type'],
- 'src_cidr': pf_entry['src_cidr'],
- 'dst_cidr': pf_entry['dst_cidr'],
- 'protocol': pf_entry['protocol'],
- 'src_port': pf_entry['src_port'],
- 'dst_port': pf_entry['dst_port'],
+ # "or None" ensure the filed is None if empty
+ 'src_mac': pf_entry['src_mac'] or None,
+ 'dst_mac': pf_entry['dst_mac'] or None,
+ 'eth_type': pf_entry['eth_type'] or None,
+ 'src_cidr': pf_entry['src_cidr'] or None,
+ 'dst_cidr': pf_entry['dst_cidr'] or None,
+ 'protocol': pf_entry['protocol'] or None,
+ 'src_port': pf_entry['src_port'] or None,
+ 'dst_port': pf_entry['dst_port'] or None,
'admin_state_up': pf_entry['admin_state_up'],
'status': pf_entry['status']}
return self._fields(res, fields)
try:
pf_entry = self._get_by_id(context, PacketFilter, id)
except sa_exc.NoResultFound:
- raise PacketFilterNotFound(id=id)
+ raise ext_pf.PacketFilterNotFound(id=id)
return pf_entry
def get_packet_filter(self, context, id, fields=None):
filters=filters,
fields=fields)
+ def _replace_unspecified_field(self, params, key):
+ if not attributes.is_attr_set(params[key]):
+ if key == 'in_port':
+ params[key] = None
+ elif key in INT_FIELDS:
+ # Integer field
+ params[key] = 0
+ else:
+ params[key] = ''
+
+ def _get_eth_type_for_protocol(self, protocol):
+ if protocol.upper() in ("ICMP", "TCP", "UDP"):
+ return 0x800
+ elif protocol.upper() == "ARP":
+ return 0x806
+
+ def _set_eth_type_from_protocol(self, filter_dict):
+ if filter_dict.get('protocol'):
+ eth_type = self._get_eth_type_for_protocol(filter_dict['protocol'])
+ if eth_type:
+ filter_dict['eth_type'] = eth_type
+
+ def _check_eth_type_and_protocol(self, new_filter, current_filter):
+ if 'protocol' in new_filter or 'eth_type' not in new_filter:
+ return
+ eth_type = self._get_eth_type_for_protocol(current_filter['protocol'])
+ if not eth_type:
+ return
+ if eth_type != new_filter['eth_type']:
+ raise ext_pf.PacketFilterEtherTypeProtocolMismatch(
+ eth_type=hex(new_filter['eth_type']),
+ protocol=current_filter['protocol'])
+
def create_packet_filter(self, context, packet_filter):
pf_dict = packet_filter['packet_filter']
tenant_id = self._get_tenant_id_for_create(context, pf_dict)
'src_port': pf_dict['src_port'],
'dst_port': pf_dict['dst_port'],
'protocol': pf_dict['protocol']}
- for key, default in params.items():
- if params[key] == attributes.ATTR_NOT_SPECIFIED:
- if key == 'in_port':
- params[key] = None
- else:
- params[key] = ''
+ for key in params:
+ self._replace_unspecified_field(params, key)
+ self._set_eth_type_from_protocol(params)
with context.session.begin(subtransactions=True):
pf_entry = PacketFilter(**params)
return self._make_packet_filter_dict(pf_entry)
def update_packet_filter(self, context, id, packet_filter):
- pf = packet_filter['packet_filter']
+ params = packet_filter['packet_filter']
+ for key in params:
+ self._replace_unspecified_field(params, key)
+ self._set_eth_type_from_protocol(params)
with context.session.begin(subtransactions=True):
pf_entry = self._get_packet_filter(context, id)
- pf_entry.update(pf)
+ self._check_eth_type_and_protocol(params, pf_entry)
+ pf_entry.update(params)
return self._make_packet_filter_dict(pf_entry)
def delete_packet_filter(self, context, id):
with context.session.begin(subtransactions=True):
pf_entry = self._get_packet_filter(context, id)
context.session.delete(pf_entry)
+
+ def get_packet_filters_for_port(self, context, port):
+ """Retrieve packet filters on OFC on a given port.
+
+ It returns a list of tuple (neutron filter_id, OFC id).
+ """
+ # TODO(amotoki): bug 1287432
+ # Rename quantum_id column in ID mapping tables.
+ query = (context.session.query(nmodels.OFCFilterMapping)
+ .join(PacketFilter,
+ nmodels.OFCFilterMapping.quantum_id == PacketFilter.id)
+ .filter(PacketFilter.admin_state_up == True))
+
+ network_id = port['network_id']
+ net_pf_query = (query.filter(PacketFilter.network_id == network_id)
+ .filter(PacketFilter.in_port == None))
+ net_filters = [(pf['quantum_id'], pf['ofc_id']) for pf in net_pf_query]
+
+ port_pf_query = query.filter(PacketFilter.in_port == port['id'])
+ port_filters = [(pf['quantum_id'], pf['ofc_id'])
+ for pf in port_pf_query]
+
+ return net_filters + port_filters
'trema_port': DRIVER_PATH % "trema.TremaPortBaseDriver",
'trema_portmac': DRIVER_PATH % "trema.TremaPortMACBaseDriver",
'trema_mac': DRIVER_PATH % "trema.TremaMACBaseDriver",
- 'pfc': DRIVER_PATH % "pfc.PFCV4Driver",
+ 'pfc': DRIVER_PATH % "pfc.PFCV51Driver",
'pfc_v3': DRIVER_PATH % "pfc.PFCV3Driver",
'pfc_v4': DRIVER_PATH % "pfc.PFCV4Driver",
'pfc_v5': DRIVER_PATH % "pfc.PFCV5Driver",
+ 'pfc_v51': DRIVER_PATH % "pfc.PFCV51Driver",
}
import re
import uuid
+import netaddr
+
+from neutron.api.v2 import attributes
+from neutron.common import constants
+from neutron.common import exceptions as qexc
+from neutron.common import log as call_log
+from neutron import manager
from neutron.plugins.nec.common import ofc_client
from neutron.plugins.nec.db import api as ndb
+from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.plugins.nec import ofc_driver_base
+class InvalidOFCIdFormat(qexc.NeutronException):
+ message = _("OFC %(resource)s ID has an invalid format: %(ofc_id)s")
+
+
class PFCDriverBase(ofc_driver_base.OFCDriverBase):
"""Base Class for PDC Drivers.
router_supported = False
+ match_ofc_network_id = re.compile(
+ "^/tenants/(?P<tenant_id>[^/]+)/networks/(?P<network_id>[^/]+)$")
+ match_ofc_port_id = re.compile(
+ "^/tenants/(?P<tenant_id>[^/]+)/networks/(?P<network_id>[^/]+)"
+ "/ports/(?P<port_id>[^/]+)$")
+
def __init__(self, conf_ofc):
self.client = ofc_client.OFCClient(host=conf_ofc.host,
port=conf_ofc.port,
return self._generate_pfc_str(desc)[:127]
def _extract_ofc_network_id(self, ofc_network_id):
- # ofc_network_id : /tenants/<tenant-id>/networks/<network-id>
- return ofc_network_id.split('/')[4]
+ match = self.match_ofc_network_id.match(ofc_network_id)
+ if match:
+ return match.group('network_id')
+ raise InvalidOFCIdFormat(resource='network', ofc_id=ofc_network_id)
+
+ def _extract_ofc_port_id(self, ofc_port_id):
+ match = self.match_ofc_port_id.match(ofc_port_id)
+ if match:
+ return {'tenant': match.group('tenant_id'),
+ 'network': match.group('network_id'),
+ 'port': match.group('port_id')}
+ raise InvalidOFCIdFormat(resource='port', ofc_id=ofc_port_id)
def create_tenant(self, description, tenant_id=None):
ofc_tenant_id = self._generate_pfc_id(tenant_id)
return self.client.delete(ofc_network_id)
def create_port(self, ofc_network_id, portinfo,
- port_id=None):
+ port_id=None, filters=None):
path = "%s/ports" % ofc_network_id
body = {'datapath_id': portinfo.datapath_id,
'port': str(portinfo.port_no),
'vid': str(portinfo.vlan_id)}
+ if self.filter_supported() and filters:
+ body['filters'] = [self._extract_ofc_filter_id(pf[1])
+ for pf in filters]
res = self.client.post(path, body=body)
ofc_port_id = res['id']
return path + '/' + ofc_port_id
return '%(network)s/ports/%(port)s' % params
+class PFCFilterDriverMixin(object):
+ """PFC PacketFilter Driver Mixin."""
+ filters_path = "/filters"
+ filter_path = "/filters/%s"
+
+ # PFC specific constants
+ MIN_PRIORITY = 1
+ MAX_PRIORITY = 32766
+ CREATE_ONLY_FIELDS = ['action', 'priority']
+ PFC_ALLOW_ACTION = "pass"
+ PFC_DROP_ACTION = "drop"
+
+ match_ofc_filter_id = re.compile("^/filters/(?P<filter_id>[^/]+)$")
+
+ @classmethod
+ def filter_supported(cls):
+ return True
+
+ def _set_param(self, filter_dict, body, key, create, convert_to=None):
+ if key in filter_dict:
+ if filter_dict[key]:
+ if convert_to:
+ body[key] = convert_to(filter_dict[key])
+ else:
+ body[key] = filter_dict[key]
+ elif not create:
+ body[key] = ""
+
+ def _generate_body(self, filter_dict, apply_ports=None, create=True):
+ body = {}
+
+ if create:
+ # action : pass, drop (mandatory)
+ if filter_dict['action'].lower() in ext_pf.ALLOW_ACTIONS:
+ body['action'] = self.PFC_ALLOW_ACTION
+ else:
+ body['action'] = self.PFC_DROP_ACTION
+ # priority : mandatory
+ body['priority'] = filter_dict['priority']
+
+ for key in ['src_mac', 'dst_mac', 'src_port', 'dst_port']:
+ self._set_param(filter_dict, body, key, create)
+
+ for key in ['src_cidr', 'dst_cidr']:
+ # CIDR must contain netmask even if it is an address.
+ convert_to = lambda x: str(netaddr.IPNetwork(x))
+ self._set_param(filter_dict, body, key, create, convert_to)
+
+ # protocol : decimal (0-255)
+ if 'protocol' in filter_dict:
+ if (not filter_dict['protocol'] or
+ # In the case of ARP, ip_proto should be set to wildcard.
+ # eth_type is set during adding an entry to DB layer.
+ filter_dict['protocol'].lower() == ext_pf.PROTO_NAME_ARP):
+ if not create:
+ body['protocol'] = ""
+ elif filter_dict['protocol'].lower() == constants.PROTO_NAME_ICMP:
+ body['protocol'] = constants.PROTO_NUM_ICMP
+ elif filter_dict['protocol'].lower() == constants.PROTO_NAME_TCP:
+ body['protocol'] = constants.PROTO_NUM_TCP
+ elif filter_dict['protocol'].lower() == constants.PROTO_NAME_UDP:
+ body['protocol'] = constants.PROTO_NUM_UDP
+ else:
+ body['protocol'] = int(filter_dict['protocol'], 0)
+
+ # eth_type : hex (0x0-0xFFFF)
+ self._set_param(filter_dict, body, 'eth_type', create, hex)
+
+ # apply_ports
+ if apply_ports:
+ # each element of apply_ports is a tuple of (neutron_id, ofc_id),
+ body['apply_ports'] = []
+ for p in apply_ports:
+ try:
+ body['apply_ports'].append(self._extract_ofc_port_id(p[1]))
+ except InvalidOFCIdFormat:
+ pass
+
+ return body
+
+ def _validate_filter_common(self, filter_dict):
+ # Currently PFC support only IPv4 CIDR.
+ for field in ['src_cidr', 'dst_cidr']:
+ if (not filter_dict.get(field) or
+ filter_dict[field] == attributes.ATTR_NOT_SPECIFIED):
+ continue
+ net = netaddr.IPNetwork(filter_dict[field])
+ if net.version != 4:
+ raise ext_pf.PacketFilterIpVersionNonSupported(
+ version=net.version, field=field, value=filter_dict[field])
+ if ('priority' in filter_dict and
+ not (self.MIN_PRIORITY <= filter_dict['priority']
+ <= self.MAX_PRIORITY)):
+ raise ext_pf.PacketFilterInvalidPriority(
+ min=self.MIN_PRIORITY, max=self.MAX_PRIORITY)
+
+ def _validate_duplicate_priority(self, context, filter_dict):
+ plugin = manager.NeutronManager.get_plugin()
+ filters = {'network_id': [filter_dict['network_id']],
+ 'priority': [filter_dict['priority']]}
+ ret = plugin.get_packet_filters(context, filters=filters,
+ fields=['id'])
+ if ret:
+ raise ext_pf.PacketFilterDuplicatedPriority(
+ priority=filter_dict['priority'])
+
+ def validate_filter_create(self, context, filter_dict):
+ self._validate_filter_common(filter_dict)
+ self._validate_duplicate_priority(context, filter_dict)
+
+ def validate_filter_update(self, context, filter_dict):
+ for field in self.CREATE_ONLY_FIELDS:
+ if field in filter_dict:
+ raise ext_pf.PacketFilterUpdateNotSupported(field=field)
+ self._validate_filter_common(filter_dict)
+
+ @call_log.log
+ def create_filter(self, ofc_network_id, filter_dict,
+ portinfo=None, filter_id=None, apply_ports=None):
+ body = self._generate_body(filter_dict, apply_ports, create=True)
+ res = self.client.post(self.filters_path, body=body)
+ # filter_id passed from a caller is not used.
+ # ofc_filter_id is generated by PFC because the prefix of
+ # filter_id has special meaning and it is internally used.
+ ofc_filter_id = res['id']
+ return self.filter_path % ofc_filter_id
+
+ @call_log.log
+ def update_filter(self, ofc_filter_id, filter_dict):
+ body = self._generate_body(filter_dict, create=False)
+ self.client.put(ofc_filter_id, body)
+
+ @call_log.log
+ def delete_filter(self, ofc_filter_id):
+ return self.client.delete(ofc_filter_id)
+
+ def _extract_ofc_filter_id(self, ofc_filter_id):
+ match = self.match_ofc_filter_id.match(ofc_filter_id)
+ if match:
+ return match.group('filter_id')
+ raise InvalidOFCIdFormat(resource='filter', ofc_id=ofc_filter_id)
+
+ def convert_ofc_filter_id(self, context, ofc_filter_id):
+ # PFC Packet Filter is supported after the format of mapping tables
+ # are changed, so it is enough just to return ofc_filter_id
+ return ofc_filter_id
+
+
class PFCRouterDriverMixin(object):
router_supported = True
class PFCV5Driver(PFCRouterDriverMixin, PFCDriverBase):
pass
+
+
+class PFCV51Driver(PFCFilterDriverMixin, PFCV5Driver):
+ pass
return True
def create_filter(self, ofc_network_id, filter_dict,
- portinfo=None, filter_id=None):
+ portinfo=None, filter_id=None, apply_ports=None):
if filter_dict['action'].upper() in ["ACCEPT", "ALLOW"]:
ofc_action = "ALLOW"
elif filter_dict['action'].upper() in ["DROP", "DENY"]:
ofp_wildcards.append("nw_dst:32")
if filter_dict['protocol']:
- if filter_dict['protocol'].upper() in "ICMP":
+ if filter_dict['protocol'].upper() == "ICMP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(1)
- elif filter_dict['protocol'].upper() in "TCP":
+ elif filter_dict['protocol'].upper() == "TCP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(6)
- elif filter_dict['protocol'].upper() in "UDP":
+ elif filter_dict['protocol'].upper() == "UDP":
body['dl_type'] = "0x800"
body['nw_proto'] = hex(17)
- elif filter_dict['protocol'].upper() in "ARP":
+ elif filter_dict['protocol'].upper() == "ARP":
body['dl_type'] = "0x806"
ofp_wildcards.append("nw_proto")
else:
body['nw_proto'] = filter_dict['protocol']
- if filter_dict['eth_type']:
- body['dl_type'] = filter_dict['eth_type']
- else:
- ofp_wildcards.append("dl_type")
else:
- ofp_wildcards.append("dl_type")
ofp_wildcards.append("nw_proto")
+ if 'dl_type' in body:
+ pass
+ elif filter_dict['eth_type']:
+ body['dl_type'] = filter_dict['eth_type']
+ else:
+ ofp_wildcards.append("dl_type")
+
if filter_dict['src_port']:
body['tp_src'] = hex(filter_dict['src_port'])
else:
port_path = "%(network)s/ports/%(port)s"
def create_port(self, ofc_network_id, portinfo,
- port_id=None):
+ port_id=None, filters=None):
ofc_port_id = port_id or uuidutils.generate_uuid()
path = self.ports_path % {'network': ofc_network_id}
body = {'id': ofc_port_id,
attachments_path = "%(network)s/ports/%(port)s/attachments"
attachment_path = "%(network)s/ports/%(port)s/attachments/%(attachment)s"
- def create_port(self, ofc_network_id, portinfo, port_id=None):
+ def create_port(self, ofc_network_id, portinfo, port_id=None,
+ filters=None):
#NOTE: This Driver create slices with Port-MAC Based bindings on Trema
# Sliceable. It's REST API requires Port Based binding before you
# define Port-MAC Based binding.
def filter_supported(cls):
return False
- def create_port(self, ofc_network_id, portinfo, port_id=None):
+ def create_port(self, ofc_network_id, portinfo, port_id=None,
+ filters=None):
ofc_port_id = port_id or uuidutils.generate_uuid()
path = self.attachments_path % {'network': ofc_network_id}
body = {'id': ofc_port_id, 'mac': portinfo.mac}
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.api.v2 import base
+from neutron.common import constants
from neutron.common import exceptions
from neutron.manager import NeutronManager
from neutron import quota
cfg.CONF.register_opts(quota_packet_filter_opts, 'QUOTAS')
-def convert_to_int(data):
+class PacketFilterNotFound(exceptions.NotFound):
+ message = _("PacketFilter %(id)s could not be found")
+
+
+class PacketFilterIpVersionNonSupported(exceptions.BadRequest):
+ message = _("IP version %(version)s is not supported for %(field)s "
+ "(%(value)s is specified)")
+
+
+class PacketFilterInvalidPriority(exceptions.BadRequest):
+ message = _("Packet Filter priority should be %(min)s-%(max)s (included)")
+
+
+class PacketFilterUpdateNotSupported(exceptions.BadRequest):
+ message = _("%(field)s field cannot be updated")
+
+
+class PacketFilterDuplicatedPriority(exceptions.BadRequest):
+ message = _("The backend does not support duplicated priority. "
+ "Priority %(priority)s is in use")
+
+
+class PacketFilterEtherTypeProtocolMismatch(exceptions.Conflict):
+ message = _("Ether Type '%(eth_type)s' conflicts with protocol "
+ "'%(protocol)s'. Update or clear protocol before "
+ "changing ether type.")
+
+
+def convert_to_int_dec_and_hex(data):
try:
return int(data, 0)
except (ValueError, TypeError):
raise exceptions.InvalidInput(error_message=msg)
+def convert_to_int_or_none(data):
+ if data is None:
+ return
+ return convert_to_int_dec_and_hex(data)
+
+
+PROTO_NAME_ARP = 'arp'
+SUPPORTED_PROTOCOLS = [constants.PROTO_NAME_ICMP,
+ constants.PROTO_NAME_TCP,
+ constants.PROTO_NAME_UDP,
+ PROTO_NAME_ARP]
+ALLOW_ACTIONS = ['allow', 'accept']
+DROP_ACTIONS = ['drop', 'deny']
+SUPPORTED_ACTIONS = ALLOW_ACTIONS + DROP_ACTIONS
+
ALIAS = 'packet-filter'
RESOURCE = 'packet_filter'
COLLECTION = 'packet_filters'
-PACKET_FILTER_ACTION_REGEX = '(?i)^(allow|accept|drop|deny)$'
-PACKET_FILTER_PROTOCOL_REGEX = (
- '(?i)^(icmp|tcp|udp|arp|0x[0-9a-fA-F]+|[0-9]+|)$')
+PACKET_FILTER_ACTION_REGEX = '(?i)^(%s)$' % '|'.join(SUPPORTED_ACTIONS)
+PACKET_FILTER_PROTOCOL_REGEX = ('(?i)^(%s|0x[0-9a-fA-F]+|[0-9]+|)$' %
+ '|'.join(SUPPORTED_PROTOCOLS))
PACKET_FILTER_ATTR_PARAMS = {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'validate': {'type:regex': PACKET_FILTER_ACTION_REGEX},
'is_visible': True},
'priority': {'allow_post': True, 'allow_put': True,
- 'convert_to': convert_to_int,
+ 'convert_to': convert_to_int_dec_and_hex,
'is_visible': True},
'in_port': {'allow_post': True, 'allow_put': False,
'default': attributes.ATTR_NOT_SPECIFIED,
'is_visible': True},
'src_mac': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'validate': {'type:mac_address': None},
+ 'validate': {'type:mac_address_or_none': None},
'is_visible': True},
'dst_mac': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'validate': {'type:mac_address': None},
+ 'validate': {'type:mac_address_or_none': None},
'is_visible': True},
'eth_type': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'convert_to': convert_to_int,
+ 'convert_to': convert_to_int_or_none,
'is_visible': True},
'src_cidr': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'validate': {'type:subnet': None},
+ 'validate': {'type:subnet_or_none': None},
'is_visible': True},
'dst_cidr': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'validate': {'type:subnet': None},
+ 'validate': {'type:subnet_or_none': None},
'is_visible': True},
'protocol': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'validate': {'type:regex': PACKET_FILTER_PROTOCOL_REGEX},
+ 'validate': {'type:regex_or_none':
+ PACKET_FILTER_PROTOCOL_REGEX},
'is_visible': True},
'src_port': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'convert_to': convert_to_int,
+ 'convert_to': convert_to_int_or_none,
'is_visible': True},
'dst_port': {'allow_post': True, 'allow_put': True,
'default': attributes.ATTR_NOT_SPECIFIED,
- 'convert_to': convert_to_int,
+ 'convert_to': convert_to_int_or_none,
'is_visible': True},
}
PACKET_FILTER_ATTR_MAP = {COLLECTION: PACKET_FILTER_ATTR_PARAMS}
def __init__(self):
super(NECPluginV2, self).__init__()
- self.ofc = ofc_manager.OFCManager()
+ self.ofc = ofc_manager.OFCManager(self)
self.base_binding_dict = self._get_base_binding_dict()
portbindings_base.register_port_dict_function()
@abstractmethod
def create_port(self, ofc_network_id, portinfo,
- port_id=None):
+ port_id=None, filters=None):
"""Create a new port on specified network at OFC.
:param ofc_network_id: a OFC tenant ID in which a new port belongs.
If a port is identified in combination with a network or
a tenant, such information should be included in the ID.
+ :param filters: A list of packet filter associated with the port.
+ Each element is a tuple (neutron ID, OFC ID)
:returns: ID of the port created at OpenFlow Controller.
:raises: neutron.plugin.nec.common.exceptions.OFCException
of the switch. An ID named as 'ofc_*' is used to identify resource on OFC.
"""
- def __init__(self):
+ def __init__(self, plugin):
self.driver = drivers.get_driver(config.OFC.driver)(config.OFC)
+ self.plugin = plugin
def _get_ofc_id(self, context, resource, neutron_id):
return ndb.get_ofc_id_lookup_both(context.session,
if not portinfo:
raise nexc.PortInfoNotFound(id=port_id)
- ofc_port_id = self.driver.create_port(ofc_net_id, portinfo, port_id)
+ # Associate packet filters
+ filters = self.plugin.get_packet_filters_for_port(context, port)
+ if filters is not None:
+ params = {'filters': filters}
+ else:
+ params = {}
+
+ ofc_port_id = self.driver.create_port(ofc_net_id, portinfo, port_id,
+ **params)
self._add_ofc_item(context, "ofc_port", port_id, ofc_port_id)
def exists_ofc_port(self, context, port_id):
if not portinfo:
raise nexc.PortInfoNotFound(id=in_port_id)
+ # Collect ports to be associated with the filter
+ apply_ports = ndb.get_active_ports_on_ofc(
+ context, filter_dict['network_id'], in_port_id)
ofc_pf_id = self.driver.create_filter(ofc_net_id,
- filter_dict, portinfo, filter_id)
+ filter_dict, portinfo, filter_id,
+ apply_ports)
self._add_ofc_item(context, "ofc_packet_filter", filter_id, ofc_pf_id)
+ def update_ofc_packet_filter(self, context, filter_id, filter_dict):
+ ofc_pf_id = self._get_ofc_id(context, "ofc_packet_filter", filter_id)
+ ofc_pf_id = self.driver.convert_ofc_filter_id(context, ofc_pf_id)
+ self.driver.update_filter(ofc_pf_id, filter_dict)
+
def exists_ofc_packet_filter(self, context, filter_id):
return self._exists_ofc_item(context, "ofc_packet_filter", filter_id)
# under the License.
# @author: Ryota MIBU
+from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
LOG.debug(_("create_packet_filter() called, packet_filter=%s ."),
packet_filter)
+ if hasattr(self.ofc.driver, 'validate_filter_create'):
+ pf = packet_filter['packet_filter']
+ self.ofc.driver.validate_filter_create(context, pf)
pf = super(PacketFilterMixin, self).create_packet_filter(
context, packet_filter)
"id=%(id)s packet_filter=%(packet_filter)s ."),
{'id': id, 'packet_filter': packet_filter})
+ pf_data = packet_filter['packet_filter']
+ if hasattr(self.ofc.driver, 'validate_filter_update'):
+ self.ofc.driver.validate_filter_update(context, pf_data)
+
# validate ownership
pf_old = self.get_packet_filter(context, id)
context, id, packet_filter)
def _packet_filter_changed(old_pf, new_pf):
+ LOG.debug('old_pf=%(old_pf)s, new_pf=%(new_pf)s',
+ {'old_pf': old_pf, 'new_pf': new_pf})
+ # When the status is ERROR, force sync to OFC.
+ if old_pf['status'] == pf_db.PF_STATUS_ERROR:
+ LOG.debug('update_packet_filter: Force filter update '
+ 'because the previous status is ERROR.')
+ return True
for key in new_pf:
- if key not in ('id', 'name', 'tenant_id', 'network_id',
- 'in_port', 'status'):
- if old_pf[key] != new_pf[key]:
- return True
+ if key in ('id', 'name', 'tenant_id', 'network_id',
+ 'in_port', 'status'):
+ continue
+ if old_pf[key] != new_pf[key]:
+ return True
return False
if _packet_filter_changed(pf_old, pf):
- pf = self.deactivate_packet_filter(context, pf)
- pf = self.activate_packet_filter_if_ready(context, pf)
+ if hasattr(self.ofc.driver, 'update_filter'):
+ # admin_state is changed
+ if pf_old['admin_state_up'] != pf['admin_state_up']:
+ LOG.debug('update_packet_filter: admin_state '
+ 'is changed to %s', pf['admin_state_up'])
+ if pf['admin_state_up']:
+ self.activate_packet_filter_if_ready(context, pf)
+ else:
+ self.deactivate_packet_filter(context, pf)
+ elif pf['admin_state_up']:
+ LOG.debug('update_packet_filter: admin_state is '
+ 'unchanged (True)')
+ if self.ofc.exists_ofc_packet_filter(context, id):
+ pf = self._update_packet_filter(context, pf, pf_data)
+ else:
+ pf = self.activate_packet_filter_if_ready(context, pf)
+ else:
+ LOG.debug('update_packet_filter: admin_state is unchanged '
+ '(False). No need to update OFC filter.')
+ else:
+ pf = self.deactivate_packet_filter(context, pf)
+ pf = self.activate_packet_filter_if_ready(context, pf)
return pf
+ def _update_packet_filter(self, context, new_pf, pf_data):
+ pf_id = new_pf['id']
+ prev_status = new_pf['status']
+ try:
+ # If previous status is ERROR, try to sync all attributes.
+ pf = new_pf if prev_status == pf_db.PF_STATUS_ERROR else pf_data
+ self.ofc.update_ofc_packet_filter(context, pf_id, pf)
+ new_status = pf_db.PF_STATUS_ACTIVE
+ if new_status != prev_status:
+ self._update_resource_status(context, "packet_filter",
+ pf_id, new_status)
+ new_pf['status'] = new_status
+ return new_pf
+ except Exception as exc:
+ with excutils.save_and_reraise_exception():
+ if (isinstance(exc, nexc.OFCException) or
+ isinstance(exc, nexc.OFCConsistencyBroken)):
+ LOG.error(_("Failed to create packet_filter id=%(id)s on "
+ "OFC: %(exc)s"),
+ {'id': pf_id, 'exc': exc})
+ new_status = pf_db.PF_STATUS_ERROR
+ if new_status != prev_status:
+ self._update_resource_status(context, "packet_filter",
+ pf_id, new_status)
+
def delete_packet_filter(self, context, id):
"""Deactivate and delete packet_filter."""
LOG.debug(_("delete_packet_filter() called, id=%s ."), id)
pf_status = pf_db.PF_STATUS_ACTIVE
except (nexc.OFCException, nexc.OFCMappingNotFound) as exc:
LOG.error(_("Failed to create packet_filter id=%(id)s on "
- "OFC: %(exc)s"), {'id': pf_id, 'exc': str(exc)})
+ "OFC: %(exc)s"), {'id': pf_id, 'exc': exc})
pf_status = pf_db.PF_STATUS_ERROR
if pf_status != current:
pf_status = pf_db.PF_STATUS_DOWN
except (nexc.OFCException, nexc.OFCMappingNotFound) as exc:
LOG.error(_("Failed to delete packet_filter id=%(id)s from "
- "OFC: %(exc)s"), {'id': pf_id, 'exc': str(exc)})
+ "OFC: %(exc)s"), {'id': pf_id, 'exc': exc})
pf_status = pf_db.PF_STATUS_ERROR
else:
LOG.debug(_("deactivate_packet_filter(): skip, "
pfs = self.get_packet_filters(context, filters=filters)
for pf in pfs:
self.deactivate_packet_filter(context, pf)
+
+ def get_packet_filters_for_port(self, context, port):
+ if self.packet_filter_enabled:
+ return super(PacketFilterMixin,
+ self).get_packet_filters_for_port(context, port)
LOG.debug(_('delete_network: SUCCEED'))
@call_log.log
- def create_port(self, ofc_network_id, info, port_id=None):
+ def create_port(self, ofc_network_id, info, port_id=None, filters=None):
ofc_id = "ofc-" + port_id[:-4]
if self.autocheck:
if ofc_network_id not in self.ofc_network_dict:
% ofc_id)
self.ofc_port_dict[ofc_id] = {'network_id': ofc_network_id,
'port_id': port_id}
+ if filters:
+ self.ofc_port_dict[ofc_id]['filters'] = filters
return ofc_id
@call_log.log
return True
def create_filter(self, ofc_network_id, filter_dict,
- portinfo=None, filter_id=None):
+ portinfo=None, filter_id=None, apply_ports=None):
return "ofc-" + filter_id[:-4]
def delete_filter(self, ofc_filter_id):
driver = "neutron.tests.unit.nec.stub_ofc_driver.StubOFCDriver"
config.CONF.set_override('driver', driver, 'OFC')
self.addCleanup(ndb.clear_db)
- self.ofc = ofc_manager.OFCManager()
+ self.plugin = mock.Mock()
+ self.plugin.get_packet_filters_for_port.return_value = None
+ self.ofc = ofc_manager.OFCManager(self.plugin)
# NOTE: enable_autocheck() is a feature of StubOFCDriver
self.ofc.driver.enable_autocheck()
self.ctx = context.get_admin_context()
get_portinfo.return_value = fake_portinfo
return get_portinfo
- def testg_create_ofc_port(self):
- """test create ofc_port."""
+ def _test_create_ofc_port(self, with_filter=False):
t, n, p, f, none = self.get_random_params()
self.ofc.create_ofc_tenant(self.ctx, t)
self.ofc.create_ofc_network(self.ctx, t, n)
self.assertFalse(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
get_portinfo = self._mock_get_portinfo(p)
port = {'tenant_id': t, 'network_id': n}
+ if with_filter:
+ _filters = ['filter1', 'filter2']
+ self.plugin.get_packet_filters_for_port.return_value = _filters
self.ofc.create_ofc_port(self.ctx, p, port)
self.assertTrue(ndb.get_ofc_item(self.ctx.session, 'ofc_port', p))
port = ndb.get_ofc_item(self.ctx.session, 'ofc_port', p)
self.assertEqual(port.ofc_id, "ofc-" + p[:-4])
get_portinfo.assert_called_once_with(mock.ANY, p)
+ portval = self.ofc.driver.ofc_port_dict[port.ofc_id]
+ if with_filter:
+ self.assertEqual(_filters, portval['filters'])
+ else:
+ self.assertFalse('filters' in portval)
+
+ def testg_create_ofc_port(self):
+ """test create ofc_port."""
+ self._test_create_ofc_port(with_filter=False)
+
+ def testg_create_ofc_port_with_filters(self):
+ """test create ofc_port."""
+ self._test_create_ofc_port(with_filter=True)
def testh_exists_ofc_port(self):
"""test exists_ofc_port."""
from neutron.api.v2 import attributes
from neutron import context
from neutron.plugins.nec.common import exceptions as nexc
-from neutron.plugins.nec.extensions import packetfilter
+from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.tests.unit.nec import test_nec_plugin
from neutron.tests.unit import test_db_plugin as test_plugin
"""
-class PacketfilterExtensionManager(packetfilter.Packetfilter):
+class PacketfilterExtensionManager(ext_pf.Packetfilter):
@classmethod
def get_resources(cls):
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
- {'packet_filters': packetfilter.PACKET_FILTER_ATTR_MAP})
+ {'packet_filters': ext_pf.PACKET_FILTER_ATTR_MAP})
return super(PacketfilterExtensionManager, cls).get_resources()
-class TestNecPluginPacketFilter(test_nec_plugin.NecPluginV2TestCase):
+class TestNecPluginPacketFilterBase(test_nec_plugin.NecPluginV2TestCase):
_nec_ini = NEC_PLUGIN_PF_INI
def setUp(self):
ext_mgr = PacketfilterExtensionManager()
- super(TestNecPluginPacketFilter, self).setUp(ext_mgr=ext_mgr)
+ super(TestNecPluginPacketFilterBase, self).setUp(ext_mgr=ext_mgr)
def _create_packet_filter(self, fmt, net_id, expected_res_status=None,
arg_list=None, **kwargs):
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
+
+class TestNecPluginPacketFilter(TestNecPluginPacketFilterBase):
+
+ def setUp(self):
+ super(TestNecPluginPacketFilter, self).setUp()
+ # Remove attributes explicitly from mock object to check
+ # a case where there are no update_filter and validate_*.
+ del self.ofc.driver.update_filter
+ del self.ofc.driver.validate_filter_create
+ del self.ofc.driver.validate_filter_update
+
def test_list_packet_filters(self):
self._list('packet_filters')
self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 1)
+ def _test_create_pf_with_protocol(self, protocol, expected_eth_type):
+ with self.packet_filter_on_network(protocol=protocol) as pf:
+ pf_data = pf['packet_filter']
+ self.assertEqual(protocol, pf_data['protocol'])
+ self.assertEqual(expected_eth_type, pf_data['eth_type'])
+
+ def test_create_pf_with_protocol_tcp(self):
+ self._test_create_pf_with_protocol('TCP', 0x800)
+
+ def test_create_pf_with_protocol_udp(self):
+ self._test_create_pf_with_protocol('UDP', 0x800)
+
+ def test_create_pf_with_protocol_icmp(self):
+ self._test_create_pf_with_protocol('ICMP', 0x800)
+
+ def test_create_pf_with_protocol_arp(self):
+ self._test_create_pf_with_protocol('ARP', 0x806)
+
+ def test_create_pf_with_inconsistent_protocol_and_eth_type(self):
+ with self.packet_filter_on_network(protocol='TCP') as pf:
+ pf_data = pf['packet_filter']
+ pf_id = pf_data['id']
+ self.assertEqual('TCP', pf_data['protocol'])
+ self.assertEqual(0x800, pf_data['eth_type'])
+ data = {'packet_filter': {'eth_type': 0x806}}
+ self._update('packet_filters', pf_id, data,
+ expected_code=409)
+
def test_create_pf_with_invalid_priority(self):
with self.network() as net:
net_id = net['network']['id']
self._create_packet_filter(self.fmt, net_id,
webob.exc.HTTPBadRequest.code,
**kwargs)
-
self.assertFalse(self.ofc.create_ofc_packet_filter.called)
def test_create_pf_with_ofc_creation_failure(self):
self.ofc.set_raise_exc('create_ofc_packet_filter', None)
- # Retry deactivate packet_filter.
- data = {'packet_filter': {'priority': 1000}}
+ # Retry activate packet_filter (even if there is no change).
+ data = {'packet_filter': {}}
self._update('packet_filters', pf_id, data)
pf_ref = self._show('packet_filters', pf_id)
# convert string to int.
kwargs.update({'priority': 102, 'eth_type': 2048,
- 'src_port': 35001, 'dst_port': 22})
+ 'src_port': 35001, 'dst_port': 22,
+ 'in_port': None})
+
+ self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
+ for key in kwargs:
+ self.assertEqual(kwargs[key], pf_ref['packet_filter'][key])
+
+ def test_show_pf_on_network_with_wildcards(self):
+ kwargs = {
+ 'name': 'test-pf-net',
+ 'admin_state_up': False,
+ 'action': 'DENY',
+ 'priority': '102',
+ }
+
+ with self.packet_filter_on_network(**kwargs) as pf:
+ pf_id = pf['packet_filter']['id']
+ pf_ref = self._show('packet_filters', pf_id)
+
+ # convert string to int.
+ kwargs.update({'priority': 102,
+ 'in_port': None,
+ 'src_mac': None,
+ 'dst_mac': None,
+ 'eth_type': None,
+ 'src_cidr': None,
+ 'dst_cidr': None,
+ 'protocol': None,
+ 'src_port': None,
+ 'dst_port': None})
self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
for key in kwargs:
# convert string to int.
kwargs.update({'priority': 103, 'eth_type': 2048,
- 'src_port': u'', 'dst_port': 80})
+ 'dst_port': 80,
+ # wildcard field is None in a response.
+ 'src_port': None})
self.assertEqual(pf_id, pf_ref['packet_filter']['id'])
+ self.assertTrue(pf_ref['packet_filter']['in_port'])
for key in kwargs:
self.assertEqual(kwargs[key], pf_ref['packet_filter'][key])
self._update('ports', in_port_id, data)
self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 0)
+
+
+class TestNecPluginPacketFilterWithValidate(TestNecPluginPacketFilterBase):
+
+ def setUp(self):
+ super(TestNecPluginPacketFilterWithValidate, self).setUp()
+ # Remove attributes explicitly from mock object to check
+ # a case where there are no update_filter.
+ del self.ofc.driver.update_filter
+ self.validate_create = self.ofc.driver.validate_filter_create
+ self.validate_update = self.ofc.driver.validate_filter_update
+
+ def test_create_pf_on_network(self):
+ with self.packet_filter_on_network() as pf:
+ pf_id = pf['packet_filter']['id']
+ self.assertEqual(pf['packet_filter']['status'], 'ACTIVE')
+
+ ctx = mock.ANY
+ pf_dict = mock.ANY
+ expected = [
+ mock.call.driver.validate_filter_create(ctx, pf_dict),
+ mock.call.exists_ofc_packet_filter(ctx, pf_id),
+ mock.call.create_ofc_packet_filter(ctx, pf_id, pf_dict),
+ mock.call.exists_ofc_packet_filter(ctx, pf_id),
+ mock.call.delete_ofc_packet_filter(ctx, pf_id),
+ ]
+ self.ofc.assert_has_calls(expected)
+ self.assertEqual(self.ofc.create_ofc_packet_filter.call_count, 1)
+ self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 1)
+
+ def test_update_pf_on_network(self):
+ ctx = mock.ANY
+ pf_dict = mock.ANY
+ with self.packet_filter_on_network(admin_state_up=False) as pf:
+ pf_id = pf['packet_filter']['id']
+
+ self.assertFalse(self.ofc.create_ofc_packet_filter.called)
+ data = {'packet_filter': {'admin_state_up': True}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.create_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id, pf_dict)
+ self.ofc.driver.validate_filter_update.assert_called_once_with(
+ ctx, data['packet_filter'])
+
+ self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
+ data = {'packet_filter': {'admin_state_up': False}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.delete_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id)
+ self.assertEqual(
+ 2, self.ofc.driver.validate_filter_update.call_count)
+
+ def test_create_pf_on_network_with_validation_error(self):
+ self.validate_create.side_effect = ext_pf.PacketFilterInvalidPriority(
+ min=1, max=65535)
+ with self.network() as net:
+ net_id = net['network']['id']
+ e = self.assertRaises(webob.exc.HTTPClientError,
+ self._make_packet_filter,
+ self.fmt, net_id, expected_res_status=400)
+ self.assertEqual(400, e.status_int)
+
+ def test_update_pf_on_network_with_validation_error(self):
+ self.validate_update.side_effect = (
+ ext_pf.PacketFilterUpdateNotSupported(field='priority'))
+ with self.packet_filter_on_network() as pf:
+ pf_id = pf['packet_filter']['id']
+ pf_ref = self._show('packet_filters', pf_id)
+ self.assertEqual(pf_ref['packet_filter']['status'], 'ACTIVE')
+ data = {'packet_filter': {'priority': 1000}}
+ self._update('packet_filters', pf_id, data,
+ expected_code=400)
+
+
+class TestNecPluginPacketFilterWithFilterUpdate(TestNecPluginPacketFilterBase):
+
+ def setUp(self):
+ super(TestNecPluginPacketFilterWithFilterUpdate, self).setUp()
+ # Remove attributes explicitly from mock object to check
+ # a case where there are no update_filter and validate_*.
+ del self.ofc.driver.validate_filter_create
+ del self.ofc.driver.validate_filter_update
+
+ def test_update_pf_toggle_admin_state(self):
+ ctx = mock.ANY
+ pf_dict = mock.ANY
+ with self.packet_filter_on_network(admin_state_up=False) as pf:
+ pf_id = pf['packet_filter']['id']
+
+ self.assertFalse(self.ofc.create_ofc_packet_filter.called)
+ data = {'packet_filter': {'admin_state_up': True}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.create_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id, pf_dict)
+
+ self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
+ data = {'packet_filter': {'admin_state_up': False}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.delete_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id)
+
+ def test_update_pf_change_field(self):
+ ctx = mock.ANY
+ with self.packet_filter_on_network(admin_state_up=True) as pf:
+ pf_id = pf['packet_filter']['id']
+ self.assertTrue(self.ofc.create_ofc_packet_filter.called)
+
+ data = {'packet_filter': {'src_mac': '12:34:56:78:9a:bc'}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.update_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id, data['packet_filter'])
+ self.assertEqual(1, self.ofc.update_ofc_packet_filter.call_count)
+
+ self.assertFalse(self.ofc.delete_ofc_packet_filter.called)
+ data = {'packet_filter': {'admin_state_up': False}}
+ self._update('packet_filters', pf_id, data)
+ self.ofc.delete_ofc_packet_filter.assert_called_once_with(
+ ctx, pf_id)
+
+ data = {'packet_filter': {'src_mac': '11:22:33:44:55:66'}}
+ self._update('packet_filters', pf_id, data)
+ self.assertEqual(1, self.ofc.update_ofc_packet_filter.call_count)
+
+ data = {'packet_filter': {'admin_state_up': True}}
+ self._update('packet_filters', pf_id, data)
+
+ data = {'packet_filter': {'src_mac': '66:55:44:33:22:11'}}
+ self._update('packet_filters', pf_id, data)
+ self.assertEqual(2, self.ofc.update_ofc_packet_filter.call_count)
import random
import string
+import uuid
import mock
import netaddr
+from neutron.common import constants
from neutron.openstack.common import uuidutils
from neutron.plugins.nec.common import ofc_client as ofc
from neutron.plugins.nec.db import api as ndb
from neutron.plugins.nec.db import models as nmodels
from neutron.plugins.nec import drivers
+from neutron.plugins.nec.drivers import pfc
+from neutron.plugins.nec.extensions import packetfilter as ext_pf
from neutron.tests import base
class PFCDriverTestBase(base.BaseTestCase):
driver = 'neutron.plugins.nec.drivers.pfc.PFCDriverBase'
+ filter_supported = False
def setUp(self):
super(PFCDriverTestBase, self).setUp()
self.driver.delete_network(net_path)
self.do_request.assert_called_once_with("DELETE", net_path)
- def testg_create_port(self):
+ def _test_create_port(self, call_filters_arg=None, send_filters_arg=None):
t, n, p = self.get_ofc_item_random_params()
net_path = "/tenants/%s/networks/%s" % (_ofc(t), _ofc(n))
body = {'datapath_id': p.datapath_id,
'port': str(p.port_no),
'vid': str(p.vlan_id)}
+ if send_filters_arg is not None:
+ body['filters'] = send_filters_arg
port = {'id': _ofc(p.id)}
self.do_request.return_value = port
- ret = self.driver.create_port(net_path, p, p.id)
+ if call_filters_arg is not None:
+ ret = self.driver.create_port(net_path, p, p.id, call_filters_arg)
+ else:
+ ret = self.driver.create_port(net_path, p, p.id)
self.do_request.assert_called_once_with("POST", post_path, body=body)
self.assertEqual(ret, port_path)
+ def testg_create_port(self):
+ self._test_create_port()
+
+ def test_create_port_with_filters_argument(self):
+ # If no filter support, 'filters' argument is passed to OFC.
+ # Note that it will be overridden in a test class with filter support.
+ self._test_create_port(call_filters_arg=['dummy'],
+ send_filters_arg=None)
+
def testh_delete_port(self):
t, n, p = self.get_ofc_item_random_params()
self.do_request.assert_called_once_with("DELETE", port_path)
def test_filter_supported(self):
- self.assertFalse(self.driver.filter_supported())
+ self.assertEqual(self.filter_supported, self.driver.filter_supported())
class PFCDriverBaseTest(PFCDriverTestBase):
- pass
+
+ def test_extract_ofc_network_id(self):
+ network_id = '/tenants/tenant-a/networks/network-a'
+ self.assertEqual('network-a',
+ self.driver._extract_ofc_network_id(network_id))
+
+ def test_extract_ofc_network_id_failure(self):
+ network_id = '/tenants/tenant-a/networks/network-a/dummy'
+ self.assertRaises(pfc.InvalidOFCIdFormat,
+ self.driver._extract_ofc_network_id, network_id)
+
+ def test_extract_ofc_port_id(self):
+ port_id = '/tenants/tenant-a/networks/network-a/ports/port-a'
+ self.assertEqual({'tenant': 'tenant-a',
+ 'network': 'network-a',
+ 'port': 'port-a'},
+ self.driver._extract_ofc_port_id(port_id))
+
+ def test_extract_ofc_port_id_failure(self):
+ port_id = '/tenants/tenant-a/dummy/network-a/ports/port-a'
+ self.assertRaises(pfc.InvalidOFCIdFormat,
+ self.driver._extract_ofc_port_id, port_id)
class PFCV3DriverTest(PFCDriverTestBase):
self.assertEqual(data['routes'], expected)
+class PFCFilterDriverTestMixin:
+ def _test_create_filter(self, filter_dict=None, filter_post=None,
+ apply_ports=None):
+ t, n, p = self.get_ofc_item_random_params()
+
+ filter_id = uuidutils.generate_uuid()
+ f = {'priority': 123, 'action': "ACCEPT"}
+ if filter_dict:
+ f.update(filter_dict)
+
+ net_path = "/networks/%s" % n
+ body = {'action': 'pass', 'priority': 123}
+ if filter_post:
+ body.update(filter_post)
+
+ self.do_request.return_value = {'id': filter_id}
+ if apply_ports is not None:
+ ret = self.driver.create_filter(net_path, f, p,
+ apply_ports=apply_ports)
+ else:
+ ret = self.driver.create_filter(net_path, f, p)
+ self.do_request.assert_called_once_with("POST", "/filters",
+ body=body)
+ self.assertEqual(ret, '/filters/%s' % filter_id)
+
+ def test_create_filter_accept(self):
+ self._test_create_filter(filter_dict={'action': 'ACCEPT'})
+
+ def test_create_filter_allow(self):
+ self._test_create_filter(filter_dict={'action': 'ALLOW'})
+
+ def test_create_filter_deny(self):
+ self._test_create_filter(filter_dict={'action': 'DENY'},
+ filter_post={'action': 'drop'})
+
+ def test_create_filter_drop(self):
+ self._test_create_filter(filter_dict={'action': 'DROP'},
+ filter_post={'action': 'drop'})
+
+ def test_create_filter_empty_field_not_post(self):
+ filter_dict = {'src_mac': '', 'src_cidr': '', 'src_port': 0,
+ 'dst_mac': '', 'dst_cidr': '', 'dst_port': 0,
+ 'protocol': '', 'eth_type': 0}
+ filter_post = {}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_none_field_not_post(self):
+ filter_dict = {'src_mac': None, 'src_cidr': None, 'src_port': None,
+ 'dst_mac': None, 'dst_cidr': None, 'dst_port': None,
+ 'protocol': None, 'eth_type': None}
+ filter_post = {}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_all_fields(self):
+ filter_dict = {'src_mac': '11:22:33:44:55:66',
+ 'dst_mac': '77:88:99:aa:bb:cc',
+ 'src_cidr': '192.168.3.0/24',
+ 'dst_cidr': '10.11.240.0/20',
+ 'src_port': 12345,
+ 'dst_port': 23456,
+ 'protocol': '0x10',
+ 'eth_type': 0x800}
+ filter_post = filter_dict.copy()
+ filter_post['protocol'] = 16
+ filter_post['eth_type'] = '0x800'
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_cidr_ip_addr_32(self):
+ filter_dict = {'src_cidr': '192.168.3.1',
+ 'dst_cidr': '10.11.240.2'}
+ filter_post = {'src_cidr': '192.168.3.1/32',
+ 'dst_cidr': '10.11.240.2/32'}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_proto_tcp(self):
+ filter_dict = {'protocol': 'TCP'}
+ filter_post = {'protocol': constants.PROTO_NUM_TCP}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_proto_udp(self):
+ filter_dict = {'protocol': 'UDP'}
+ filter_post = {'protocol': constants.PROTO_NUM_UDP}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_proto_icmp(self):
+ filter_dict = {'protocol': 'ICMP'}
+ filter_post = {'protocol': constants.PROTO_NUM_ICMP}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_proto_arp_not_proto_post(self):
+ filter_dict = {'protocol': 'ARP'}
+ filter_post = {}
+ self._test_create_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_create_filter_apply_ports(self):
+ apply_ports = [
+ ('p1', '/tenants/tenant-1/networks/network-1/ports/port-1'),
+ ('p2', '/tenants/tenant-2/networks/network-2/ports/port-2')]
+ filter_post = {'apply_ports': [
+ {'tenant': 'tenant-1', 'network': 'network-1', 'port': 'port-1'},
+ {'tenant': 'tenant-2', 'network': 'network-2', 'port': 'port-2'}
+ ]}
+ self._test_create_filter(filter_dict={}, apply_ports=apply_ports,
+ filter_post=filter_post)
+
+ def _test_update_filter(self, filter_dict=None, filter_post=None):
+ filter_id = uuidutils.generate_uuid()
+ ofc_filter_id = '/filters/%s' % filter_id
+ self.driver.update_filter(ofc_filter_id, filter_dict)
+ self.do_request.assert_called_once_with("PUT", ofc_filter_id,
+ body=filter_post)
+
+ def test_update_filter_empty_fields(self):
+ filter_dict = {'src_mac': '', 'src_cidr': '', 'src_port': 0,
+ 'dst_mac': '', 'dst_cidr': '', 'dst_port': 0,
+ 'protocol': '', 'eth_type': 0}
+ filter_post = {'src_mac': '', 'src_cidr': '', 'src_port': '',
+ 'dst_mac': '', 'dst_cidr': '', 'dst_port': '',
+ 'protocol': '', 'eth_type': ''}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_none_fields(self):
+ filter_dict = {'src_mac': None, 'src_cidr': None, 'src_port': None,
+ 'dst_mac': None, 'dst_cidr': None, 'dst_port': None,
+ 'protocol': None, 'eth_type': None}
+ filter_post = {'src_mac': '', 'src_cidr': '', 'src_port': '',
+ 'dst_mac': '', 'dst_cidr': '', 'dst_port': '',
+ 'protocol': '', 'eth_type': ''}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_all_fields(self):
+ filter_dict = {'src_mac': '11:22:33:44:55:66',
+ 'dst_mac': '77:88:99:aa:bb:cc',
+ 'src_cidr': '192.168.3.0/24',
+ 'dst_cidr': '10.11.240.0/20',
+ 'src_port': 12345,
+ 'dst_port': 23456,
+ 'protocol': '0x10',
+ 'eth_type': 0x800}
+ filter_post = filter_dict.copy()
+ filter_post['protocol'] = 16
+ filter_post['eth_type'] = '0x800'
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_cidr_ip_addr_32(self):
+ filter_dict = {'src_cidr': '192.168.3.1',
+ 'dst_cidr': '10.11.240.2'}
+ filter_post = {'src_cidr': '192.168.3.1/32',
+ 'dst_cidr': '10.11.240.2/32'}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_proto_tcp(self):
+ filter_dict = {'protocol': 'TCP'}
+ filter_post = {'protocol': constants.PROTO_NUM_TCP}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_proto_udp(self):
+ filter_dict = {'protocol': 'UDP'}
+ filter_post = {'protocol': constants.PROTO_NUM_UDP}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_proto_icmp(self):
+ filter_dict = {'protocol': 'ICMP'}
+ filter_post = {'protocol': constants.PROTO_NUM_ICMP}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_update_filter_proto_arp_post_empty(self):
+ filter_dict = {'protocol': 'ARP'}
+ filter_post = {'protocol': ''}
+ self._test_update_filter(filter_dict=filter_dict,
+ filter_post=filter_post)
+
+ def test_delete_filter(self):
+ t, n, p = self.get_ofc_item_random_params()
+ f_path = "/filters/%s" % uuidutils.generate_uuid()
+ self.driver.delete_filter(f_path)
+ self.do_request.assert_called_once_with("DELETE", f_path)
+
+ def _test_validate_filter_duplicate_priority(self, method, found_dup):
+ with mock.patch('neutron.manager.NeutronManager'
+ '.get_plugin') as get_plugin:
+ plugin = get_plugin.return_value
+ if found_dup:
+ plugin.get_packet_filters.return_value = ['found']
+ else:
+ plugin.get_packet_filters.return_value = []
+ network_id = str(uuid.uuid4())
+ filter_dict = {'network_id': network_id,
+ 'priority': 12}
+ if found_dup:
+ self.assertRaises(ext_pf.PacketFilterDuplicatedPriority,
+ method, 'context', filter_dict)
+ else:
+ self.assertIsNone(method('context', filter_dict))
+ plugin.get_packet_filters.assert_called_once_with(
+ 'context',
+ filters={'network_id': [network_id],
+ 'priority': [12]},
+ fields=['id'])
+
+ def test_validate_filter_create_no_duplicate_priority(self):
+ self._test_validate_filter_duplicate_priority(
+ self.driver.validate_filter_create,
+ found_dup=False)
+
+ def test_validate_filter_create_duplicate_priority(self):
+ self._test_validate_filter_duplicate_priority(
+ self.driver.validate_filter_create,
+ found_dup=True)
+
+ def test_validate_filter_update_action_raises_error(self):
+ filter_dict = {'action': 'ALLOW'}
+ self.assertRaises(ext_pf.PacketFilterUpdateNotSupported,
+ self.driver.validate_filter_update,
+ 'context', filter_dict)
+
+ def test_validate_filter_update_priority_raises_error(self):
+ filter_dict = {'priority': '13'}
+ self.assertRaises(ext_pf.PacketFilterUpdateNotSupported,
+ self.driver.validate_filter_update,
+ 'context', filter_dict)
+
+ def _test_validate_filter_ipv6_not_supported(self, field, create=True):
+ if create:
+ filter_dict = {'network_id': 'net1', 'priority': 12}
+ method = self.driver.validate_filter_create
+ else:
+ filter_dict = {}
+ method = self.driver.validate_filter_update
+ filter_dict[field] = 'fe80::1'
+ self.assertRaises(ext_pf.PacketFilterIpVersionNonSupported,
+ method, 'context', filter_dict)
+ filter_dict[field] = '10.56.3.3'
+ self.assertIsNone(method('context', filter_dict))
+
+ def test_validate_filter_create_ipv6_not_supported(self):
+ with mock.patch('neutron.manager.NeutronManager'
+ '.get_plugin') as get_plugin:
+ plugin = get_plugin.return_value
+ plugin.get_packet_filters.return_value = []
+ self._test_validate_filter_ipv6_not_supported(
+ 'src_cidr', create=True)
+ self._test_validate_filter_ipv6_not_supported(
+ 'dst_cidr', create=True)
+
+ def test_validate_filter_update_ipv6_not_supported(self):
+ self._test_validate_filter_ipv6_not_supported('src_cidr', create=False)
+ self._test_validate_filter_ipv6_not_supported('dst_cidr', create=False)
+
+ def _test_validate_filter_priority_range_one(self, method, priority, ok):
+ filter_dict = {'priority': priority, 'network_id': 'net1'}
+ if ok:
+ self.assertIsNone(method('context', filter_dict))
+ else:
+ self.assertRaises(ext_pf.PacketFilterInvalidPriority,
+ method, 'context', filter_dict)
+
+ def test_validate_filter_create_priority_range(self):
+ with mock.patch('neutron.manager.NeutronManager'
+ '.get_plugin') as get_plugin:
+ plugin = get_plugin.return_value
+ plugin.get_packet_filters.return_value = []
+
+ method = self.driver.validate_filter_create
+ self._test_validate_filter_priority_range_one(method, 0, False)
+ self._test_validate_filter_priority_range_one(method, 1, True)
+ self._test_validate_filter_priority_range_one(method, 32766, True)
+ self._test_validate_filter_priority_range_one(method, 32767, False)
+
+
+class PFCV51DriverTest(PFCFilterDriverTestMixin, PFCV5DriverTest):
+ driver = 'pfc_v51'
+ filter_supported = True
+
+ def test_create_port_with_filters_argument(self):
+ self._test_create_port(
+ call_filters_arg=[('neutron-id-1', '/filters/filter-1'),
+ ('neutron-id-2', '/filters/filter-2')],
+ send_filters_arg=['filter-1', 'filter-2'])
+
+
class PFCDriverStringTest(base.BaseTestCase):
driver = 'neutron.plugins.nec.drivers.pfc.PFCDriverBase'
ofp_wildcards = ["%s:32" % _f if _f in ['nw_src', 'nw_dst'] else _f
for _f in all_wildcards_ofp if _f not in body]
- body['ofp_wildcards'] = ','.join(ofp_wildcards)
+ body['ofp_wildcards'] = set(ofp_wildcards)
non_ofp_wildcards = [_f for _f in all_wildcards_non_ofp
if _f not in body]
if non_ofp_wildcards:
- body['wildcards'] = ','.join(non_ofp_wildcards)
+ body['wildcards'] = set(non_ofp_wildcards)
ret = self.driver.create_filter(net_path, f, p, f['id'])
- self.do_request.assert_called_once_with("POST", "/filters", body=body)
+ # The content of 'body' is checked below.
+ self.do_request.assert_called_once_with("POST", "/filters",
+ body=mock.ANY)
self.assertEqual(ret, '/filters/%s' % f['id'])
+ # ofp_wildcards and wildcards in body are comma-separated
+ # string but the order of elements are not considered,
+ # so we check these fields as set.
+ actual_body = self.do_request.call_args[1]['body']
+ if 'ofp_wildcards' in actual_body:
+ ofp_wildcards = actual_body['ofp_wildcards'].split(',')
+ actual_body['ofp_wildcards'] = set(ofp_wildcards)
+ if 'wildcards' in actual_body:
+ actual_body['wildcards'] = set(actual_body['wildcards'].split(','))
+ self.assertEqual(body, actual_body)
+
def test_create_filter_accept(self):
self._test_create_filter(filter_dict={'action': 'ACCEPT'})
self.assertEqual(msg,
"'10' is too large - must be no larger than '9'")
- def test_validate_mac_address(self):
+ def _test_validate_mac_address(self, validator, allow_none=False):
mac_addr = "ff:16:3e:4f:00:00"
- msg = attributes._validate_mac_address(mac_addr)
+ msg = validator(mac_addr)
self.assertIsNone(msg)
mac_addr = "ffa:16:3e:4f:00:00"
- msg = attributes._validate_mac_address(mac_addr)
+ msg = validator(mac_addr)
self.assertEqual(msg, "'%s' is not a valid MAC address" % mac_addr)
+ mac_addr = None
+ msg = validator(mac_addr)
+ if allow_none:
+ self.assertIsNone(msg)
+ else:
+ self.assertEqual(msg, "'None' is not a valid MAC address")
+
+ def test_validate_mac_address(self):
+ self._test_validate_mac_address(attributes._validate_mac_address)
+
+ def test_validate_mac_address_or_none(self):
+ self._test_validate_mac_address(
+ attributes._validate_mac_address_or_none, allow_none=True)
+
def test_validate_ip_address(self):
ip_addr = '1.1.1.1'
msg = attributes._validate_ip_address(ip_addr)
attributes.MAC_PATTERN)
self.assertIsNotNone(msg)
- def test_validate_subnet(self):
+ def _test_validate_subnet(self, validator, allow_none=False):
# Valid - IPv4
cidr = "10.0.2.0/24"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 without final octets
cidr = "fe80::/24"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - IPv6 with final octets
cidr = "fe80::/24"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - uncompressed ipv6 address
cidr = "fe80:0:0:0:0:0:0:0/128"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001:0db8:0:0:1::1/128"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001:0db8::1:0:0:1/128"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - ipv6 address with multiple consecutive zero
cidr = "2001::0:1:0:0:1100/120"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Valid - abbreviated ipv4 address
cidr = "10/24"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
self.assertIsNone(msg)
# Invalid - IPv4 missing mask
cidr = "10.0.2.0"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "10.0.2.0/32"}
# Invalid - IPv4 with final octets
cidr = "192.168.1.1/24"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "192.168.1.0/24"}
# Invalid - IPv6 without final octets, missing mask
cidr = "fe80::"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
# Invalid - IPv6 with final octets, missing mask
cidr = "fe80::0"
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
error = _("'%(data)s' isn't a recognized IP subnet cidr,"
" '%(cidr)s' is recommended") % {"data": cidr,
"cidr": "fe80::/128"}
# Invalid - Address format error
cidr = 'invalid'
- msg = attributes._validate_subnet(cidr,
- None)
+ msg = validator(cidr, None)
error = "'%s' is not a valid IP subnet" % cidr
self.assertEqual(msg, error)
- def test_validate_regex(self):
+ cidr = None
+ msg = validator(cidr, None)
+ if allow_none:
+ self.assertIsNone(msg)
+ else:
+ error = "'%s' is not a valid IP subnet" % cidr
+ self.assertEqual(msg, error)
+
+ def test_validate_subnet(self):
+ self._test_validate_subnet(attributes._validate_subnet)
+
+ def test_validate_subnet_or_none(self):
+ self._test_validate_subnet(attributes._validate_subnet_or_none,
+ allow_none=True)
+
+ def _test_validate_regex(self, validator, allow_none=False):
pattern = '[hc]at'
data = None
- msg = attributes._validate_regex(data, pattern)
- self.assertEqual(msg, "'%s' is not a valid input" % data)
+ msg = validator(data, pattern)
+ if allow_none:
+ self.assertIsNone(msg)
+ else:
+ self.assertEqual(msg, "'None' is not a valid input")
data = 'bat'
- msg = attributes._validate_regex(data, pattern)
+ msg = validator(data, pattern)
self.assertEqual(msg, "'%s' is not a valid input" % data)
data = 'hat'
- msg = attributes._validate_regex(data, pattern)
+ msg = validator(data, pattern)
self.assertIsNone(msg)
data = 'cat'
- msg = attributes._validate_regex(data, pattern)
+ msg = validator(data, pattern)
self.assertIsNone(msg)
+ def test_validate_regex(self):
+ self._test_validate_regex(attributes._validate_regex)
+
+ def test_validate_regex_or_none(self):
+ self._test_validate_regex(attributes._validate_regex_or_none,
+ allow_none=True)
+
def test_validate_uuid(self):
msg = attributes._validate_uuid('garbage')
self.assertEqual(msg, "'garbage' is not a valid UUID")