def __init__(self, context, plugin_rpc):
self.context = context
self.plugin_rpc = plugin_rpc
- self.init_firewall()
if sg_rpc.is_firewall_enabled():
+ self.init_firewall()
self._setup_rpc()
def _setup_rpc(self):
def port_update(self, context, port=None, network_type=None,
segmentation_id=None, physical_network=None):
LOG.debug(_("port_update received"))
- if 'security_groups' in port:
- self.sec_groups_agent.refresh_firewall()
+ if CONF.SECURITYGROUP.enable_security_group:
+ if 'security_groups' in port:
+ self.sec_groups_agent.refresh_firewall()
self._treat_vif_port(
port['id'], port['network_id'],
device_details['segmentation_id'],
device_details['admin_state_up'])
- self.sec_groups_agent.prepare_devices_filter(devices)
+ # check if security groups is enabled.
+ # if not, teardown the security group rules
+ if CONF.SECURITYGROUP.enable_security_group:
+ self.sec_groups_agent.prepare_devices_filter([device])
+ else:
+ self._utils.remove_all_security_rules(
+ device_details['port_id'])
self.plugin_rpc.update_device_up(self.context,
device,
self.agent_id,
_ICMP_PROTOCOL = '1'
_MAX_WEIGHT = 65500
+ # 2 directions x 2 address types = 4 ACLs
+ _REJECT_ACLS_COUNT = 4
+
_wmi_namespace = '//./root/virtualization/v2'
def __init__(self):
self._check_job_status(ret_val, job_path)
def _remove_virt_feature(self, feature_resource):
+ self._remove_multiple_virt_features([feature_resource])
+
+ def _remove_multiple_virt_features(self, feature_resources):
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job_path, ret_val) = vs_man_svc.RemoveFeatureSettings(
- FeatureSettings=[feature_resource.path_()])
+ FeatureSettings=[f.path_() for f in feature_resources])
self._check_job_status(ret_val, job_path)
def disconnect_switch_port(
for acl in filtered_acls:
self._remove_virt_feature(acl)
+ def remove_all_security_rules(self, switch_port_name):
+ port, found = self._get_switch_port_allocation(switch_port_name, False)
+ if not found:
+ # Port not found. It happens when the VM was already deleted.
+ return
+
+ acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
+ filtered_acls = [a for a in acls if
+ a.Action is not self._ACL_ACTION_METER]
+
+ if filtered_acls:
+ self._remove_multiple_virt_features(filtered_acls)
+
def create_default_reject_all_rules(self, switch_port_name):
port, found = self._get_switch_port_allocation(switch_port_name, False)
if not found:
acls = port.associators(wmi_result_class=self._PORT_EXT_ACL_SET_DATA)
filtered_acls = [v for v in acls if v.Action == self._ACL_ACTION_DENY]
- # 2 directions x 2 address types x 2 protocols = 8 ACLs
- if len(filtered_acls) >= 8:
+ if len(filtered_acls) >= self._REJECT_ACLS_COUNT:
return
for acl in filtered_acls:
_PORT_EXT_ACL_SET_DATA = 'Msvm_EthernetSwitchPortExtendedAclSettingData'
_MAX_WEIGHT = 65500
+ # 2 directions x 2 address types x 3 protocols = 12 ACLs
+ _REJECT_ACLS_COUNT = 12
+
def _create_security_acl(self, direction, acl_type, action, local_port,
protocol, remote_addr, weight):
acl = self._get_default_setting_data(self._PORT_EXT_ACL_SET_DATA)
self._utils._remove_virt_feature.assert_called_once_with(m_acl)
self._utils._bind_security_rule.assert_has_calls(calls)
+ @mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
+ '._remove_virt_feature')
+ @mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
+ '._bind_security_rule')
+ def test_create_default_reject_all_rules_already_added(self, mock_bind,
+ mock_remove):
+ (m_port, m_acl) = self._setup_security_rule_test()
+ m_acl.Action = self._utils._ACL_ACTION_DENY
+ m_port.associators.return_value = [
+ m_acl] * self._utils._REJECT_ACLS_COUNT
+ self._utils.create_default_reject_all_rules(self._FAKE_PORT_NAME)
+
+ self.assertFalse(self._utils._remove_virt_feature.called)
+ self.assertFalse(self._utils._bind_security_rule.called)
+
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
'._remove_virt_feature')
@mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
self._FAKE_LOCAL_PORT, self._FAKE_PROTOCOL, self._FAKE_REMOTE_ADDR)
self._utils._remove_virt_feature.assert_called_once_with(mock_acl)
+ @mock.patch('neutron.plugins.hyperv.agent.utilsv2.HyperVUtilsV2'
+ '._remove_multiple_virt_features')
+ def test_remove_all_security_rules(self, mock_remove_feature):
+ mock_acl = self._setup_security_rule_test()[1]
+ self._utils.remove_all_security_rules(self._FAKE_PORT_NAME)
+ self._utils._remove_multiple_virt_features.assert_called_once_with(
+ [mock_acl])
+
def _setup_security_rule_test(self):
mock_port = mock.MagicMock()
mock_acl = mock.MagicMock()