'egress': utilsv2.HyperVUtilsV2._ACL_DIR_OUT},
'ethertype': {'IPv4': utilsv2.HyperVUtilsV2._ACL_TYPE_IPV4,
'IPv6': utilsv2.HyperVUtilsV2._ACL_TYPE_IPV6},
+ 'protocol': {'icmp': utilsv2.HyperVUtilsV2._ICMP_PROTOCOL},
'default': "ANY",
'address_default': {'IPv4': '0.0.0.0/0', 'IPv6': '::/0'}
}
'direction': self._ACL_PROP_MAP['direction'][rule['direction']],
'acl_type': self._ACL_PROP_MAP['ethertype'][rule['ethertype']],
'local_port': local_port,
- 'protocol': self._get_rule_prop_or_default(rule, 'protocol'),
+ 'protocol': self._get_rule_protocol(rule),
'remote_address': self._get_rule_remote_address(rule)
}
return rule[ip_prefix]
return self._ACL_PROP_MAP['address_default'][rule['ethertype']]
+ def _get_rule_protocol(self, rule):
+ protocol = self._get_rule_prop_or_default(rule, 'protocol')
+ if protocol in self._ACL_PROP_MAP['protocol'].keys():
+ return self._ACL_PROP_MAP['protocol'][protocol]
+
+ return protocol
+
def _get_rule_prop_or_default(self, rule, prop):
if prop in rule:
return rule[prop]
_IPV6_ANY = '::/0'
_TCP_PROTOCOL = 'tcp'
_UDP_PROTOCOL = 'udp'
+ _ICMP_PROTOCOL = '1'
_MAX_WEIGHT = 65500
_wmi_namespace = '//./root/virtualization/v2'
ipv6_pair = (self._ACL_TYPE_IPV6, self._IPV6_ANY)
for direction in [self._ACL_DIR_IN, self._ACL_DIR_OUT]:
for acl_type, address in [ipv4_pair, ipv6_pair]:
- for protocol in [self._TCP_PROTOCOL, self._UDP_PROTOCOL]:
+ for protocol in [self._TCP_PROTOCOL,
+ self._UDP_PROTOCOL,
+ self._ICMP_PROTOCOL]:
self._bind_security_rule(
port, direction, acl_type, self._ACL_ACTION_DENY,
self._ACL_DEFAULT, protocol, address, weight)
_PORT_EXT_ACL_SET_DATA = 'Msvm_EthernetSwitchPortExtendedAclSettingData'
_MAX_WEIGHT = 65500
- def create_security_rule(self, switch_port_name, direction, acl_type,
- local_port, protocol, remote_address):
- protocols = [protocol]
- if protocol is self._ACL_DEFAULT:
- protocols = [self._TCP_PROTOCOL, self._UDP_PROTOCOL]
-
- for proto in protocols:
- super(HyperVUtilsV2R2, self).create_security_rule(
- switch_port_name, direction, acl_type, local_port,
- proto, remote_address)
-
- def remove_security_rule(self, switch_port_name, direction, acl_type,
- local_port, protocol, remote_address):
- protocols = [protocol]
- if protocol is self._ACL_DEFAULT:
- protocols = ['tcp', 'udp']
-
- for proto in protocols:
- super(HyperVUtilsV2R2, self).remove_security_rule(
- switch_port_name, direction, acl_type,
- local_port, proto, remote_address)
-
def _create_security_acl(self, direction, acl_type, action, local_port,
protocol, remote_addr, weight):
acl = self._get_default_setting_data(self._PORT_EXT_ACL_SET_DATA)
self.assertEqual(self._driver._ACL_PROP_MAP['address_default']['IPv6'],
actual)
+ def test_get_rule_protocol_icmp(self):
+ self._test_get_rule_protocol(
+ 'icmp', self._driver._ACL_PROP_MAP['protocol']['icmp'])
+
+ def test_get_rule_protocol_no_icmp(self):
+ self._test_get_rule_protocol('tcp', 'tcp')
+
+ def _test_get_rule_protocol(self, protocol, expected):
+ rule = self._create_security_rule()
+ rule['protocol'] = protocol
+ actual = self._driver._get_rule_protocol(rule)
+
+ self.assertEqual(expected, actual)
+
def _get_port(self):
return {
'device': self._FAKE_DEVICE,
for direction in [self._utils._ACL_DIR_IN, self._utils._ACL_DIR_OUT]:
for acl_type, address in [ipv4_pair, ipv6_pair]:
for protocol in [self._utils._TCP_PROTOCOL,
- self._utils._UDP_PROTOCOL]:
+ self._utils._UDP_PROTOCOL,
+ self._utils._ICMP_PROTOCOL]:
calls.append(mock.call(m_port, direction, acl_type,
self._utils._ACL_ACTION_DENY,
self._utils._ACL_DEFAULT,