Previously, the default value would be populated into attr by API
controller, but some codes in plugin or service plugins call plugin
to create network directly, such as l3, which will have no default
value populated.
This patch fixes it by populating default port_security value into
network data.
In addition, for network without port-security set, we also give the
default value to populate the return network dict object, which will
let the extension construct the response dictionary gracefully for
those existing network.
Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net>
Change-Id: I73abc98d83372f6259f17680806e6541458e2077
Closes-bug: #
1461519
Closes-Bug: #
1461647
Closes-Bug: #
1468588
def process_create_network(self, context, data, result):
# Create the network extension attributes.
- if psec.PORTSECURITY in data:
- self._process_network_port_security_create(context, data, result)
+ if psec.PORTSECURITY not in data:
+ data[psec.PORTSECURITY] = (psec.EXTENDED_ATTRIBUTES_2_0['networks']
+ [psec.PORTSECURITY]['default'])
+ self._process_network_port_security_create(context, data, result)
def process_update_network(self, context, data, result):
# Update the network extension attributes.
self._extend_port_security_dict(result, db_data)
def _extend_port_security_dict(self, response_data, db_data):
- response_data[psec.PORTSECURITY] = (
+ if db_data.get('port_security') is None:
+ response_data[psec.PORTSECURITY] = (
+ psec.EXTENDED_ATTRIBUTES_2_0['networks']
+ [psec.PORTSECURITY]['default'])
+ else:
+ response_data[psec.PORTSECURITY] = (
db_data['port_security'][psec.PORTSECURITY])
def _determine_port_security(self, context, port):
from neutron.extensions import portsecurity as psec
from neutron.extensions import securitygroup as ext_sg
from neutron import manager
+from neutron.plugins.ml2.extensions import port_security
from neutron.tests.unit.db import test_db_base_plugin_v2
from neutron.tests.unit.extensions import test_securitygroup
'', 'not_network_owner')
res = req.get_response(self.api)
self.assertEqual(res.status_int, exc.HTTPForbidden.code)
+
+ def test_extend_port_dict_no_port_security(self):
+ """Test _extend_port_security_dict won't crash
+ if port_security item is None
+ """
+ for db_data in ({'port_security': None, 'name': 'net1'}, {}):
+ response_data = {}
+
+ driver = port_security.PortSecurityExtensionDriver()
+ driver._extend_port_security_dict(response_data, db_data)
+
+ self.assertTrue(response_data[psec.PORTSECURITY])
# License for the specific language governing permissions and limitations
# under the License.
+from neutron import context
from neutron.extensions import portsecurity as psec
+from neutron import manager
from neutron.plugins.ml2 import config
from neutron.tests.unit.extensions import test_portsecurity as test_psec
from neutron.tests.unit.plugins.ml2 import test_plugin
group='ml2')
super(PSExtDriverTestCase, self).setUp()
+ def test_create_net_port_security_default(self):
+ _core_plugin = manager.NeutronManager.get_plugin()
+ admin_ctx = context.get_admin_context()
+ _default_value = (psec.EXTENDED_ATTRIBUTES_2_0['networks']
+ [psec.PORTSECURITY]['default'])
+ args = {'network':
+ {'name': 'test',
+ 'tenant_id': '',
+ 'shared': False,
+ 'admin_state_up': True,
+ 'status': 'ACTIVE'}}
+ try:
+ network = _core_plugin.create_network(admin_ctx, args)
+ _value = network[psec.PORTSECURITY]
+ finally:
+ if network:
+ _core_plugin.delete_network(admin_ctx, network['id'])
+ self.assertEqual(_default_value, _value)
+
def test_create_port_with_secgroup_none_and_port_security_false(self):
if self._skip_security_group:
self.skipTest("Plugin does not support security groups")