with context.session.begin(subtransactions=True):
self._delete_port(context, id)
- def delete_ports(self, context, filters):
- with context.session.begin(subtransactions=True):
- # Disable eagerloads to avoid postgresql issues with outer joins
- # and SELECT FOR UPDATE. This means that only filters for columns
- # on the Port model will be effective, which is fine in nearly all
- # the cases where filters are used
- query = context.session.query(
- models_v2.Port).enable_eagerloads(False)
- ports = self._apply_filters_to_query(
- query, models_v2.Port, filters).with_lockmode('update').all()
- for port in ports:
- self.delete_port(context, port['id'])
+ def delete_ports_by_device_id(self, context, device_id, network_id=None):
+ query = (context.session.query(models_v2.Port.id)
+ .enable_eagerloads(False)
+ .filter(models_v2.Port.device_id == device_id))
+ if network_id:
+ query = query.filter(models_v2.Port.network_id == network_id)
+ port_ids = [p[0] for p in query]
+ for port_id in port_ids:
+ self.delete_port(context, port_id)
def _delete_port(self, context, id):
query = (context.session.query(models_v2.Port).
from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import l3_rpc_base
-from neutron.db import models_v2
from neutron.db import portbindings_base
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
super(NECPluginV2, self).delete_port(context, id)
self.notify_security_groups_member_updated(context, port)
- def delete_ports(self, context, filters):
- # Note(amotoki): Override the superclass method to avoid
- # a long transaction over external API calls.
- # TODO(amotoki): Need to revisit after bug 1282925 is addressed.
- query = context.session.query(
- models_v2.Port).enable_eagerloads(False)
- query = self._apply_filters_to_query(
- query, models_v2.Port, filters)
- port_ids = [p['id'] for p in query]
- for port_id in port_ids:
- self.delete_port(context, port_id)
-
class NECPluginV2AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
# See the License for the specific language governing permissions and
# limitations under the License.
-import contextlib
import os
import fixtures
pass
-class TestNecPortsV2(test_plugin.TestPortsV2, NecPluginV2TestCase):
-
- def test_delete_ports(self):
- with self.subnet() as subnet:
- with contextlib.nested(
- self.port(subnet=subnet, device_owner='test-owner',
- no_delete=True),
- self.port(subnet=subnet, device_owner='test-owner',
- no_delete=True),
- self.port(subnet=subnet, device_owner='other-owner'),
- ) as (p1, p2, p3):
- network_id = subnet['subnet']['network_id']
- filters = {'network_id': [network_id],
- 'device_owner': ['test-owner']}
- self.plugin.delete_ports(self.context, filters)
-
- self._show('ports', p1['port']['id'],
- expected_code=webob.exc.HTTPNotFound.code)
- self._show('ports', p2['port']['id'],
- expected_code=webob.exc.HTTPNotFound.code)
- self._show('ports', p3['port']['id'],
- expected_code=webob.exc.HTTPOk.code)
-
-
class TestNecNetworksV2(test_plugin.TestNetworksV2, NecPluginV2TestCase):
pass
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(fmt, res)
- def _do_side_effect(self, patched_plugin, orig, *args, **kwargs):
+ def _fail_second_call(self, patched_plugin, orig, *args, **kwargs):
"""Invoked by test cases for injecting failures in plugin."""
def second_call(*args, **kwargs):
raise n_exc.NeutronException()
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
- return self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ return self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
'create_port') as patched_plugin:
def side_effect(*args, **kwargs):
- return self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ return self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
self.assertEqual(res.status_int,
webob.exc.HTTPClientError.code)
+ def test_delete_ports_by_device_id(self):
+ plugin = NeutronManager.get_plugin()
+ ctx = context.get_admin_context()
+ with self.subnet() as subnet:
+ with contextlib.nested(
+ self.port(subnet=subnet, device_id='owner1', no_delete=True),
+ self.port(subnet=subnet, device_id='owner1', no_delete=True),
+ self.port(subnet=subnet, device_id='owner2'),
+ ) as (p1, p2, p3):
+ network_id = subnet['subnet']['network_id']
+ plugin.delete_ports_by_device_id(ctx, 'owner1',
+ network_id)
+ self._show('ports', p1['port']['id'],
+ expected_code=webob.exc.HTTPNotFound.code)
+ self._show('ports', p2['port']['id'],
+ expected_code=webob.exc.HTTPNotFound.code)
+ self._show('ports', p3['port']['id'],
+ expected_code=webob.exc.HTTPOk.code)
+
+ def _test_delete_ports_by_device_id_second_call_failure(self, plugin):
+ ctx = context.get_admin_context()
+ with self.subnet() as subnet:
+ with contextlib.nested(
+ self.port(subnet=subnet, device_id='owner1', no_delete=True),
+ self.port(subnet=subnet, device_id='owner1'),
+ self.port(subnet=subnet, device_id='owner2'),
+ ) as (p1, p2, p3):
+ orig = plugin.delete_port
+ with mock.patch.object(plugin, 'delete_port') as del_port:
+
+ def side_effect(*args, **kwargs):
+ return self._fail_second_call(del_port, orig,
+ *args, **kwargs)
+
+ del_port.side_effect = side_effect
+ network_id = subnet['subnet']['network_id']
+ self.assertRaises(n_exc.NeutronException,
+ plugin.delete_ports_by_device_id,
+ ctx, 'owner1', network_id)
+ self._show('ports', p1['port']['id'],
+ expected_code=webob.exc.HTTPNotFound.code)
+ self._show('ports', p2['port']['id'],
+ expected_code=webob.exc.HTTPOk.code)
+ self._show('ports', p3['port']['id'],
+ expected_code=webob.exc.HTTPOk.code)
+
+ def test_delete_ports_by_device_id_second_call_failure(self):
+ plugin = NeutronManager.get_plugin()
+ self._test_delete_ports_by_device_id_second_call_failure(plugin)
+
class TestNetworksV2(NeutronDbPluginV2TestCase):
# NOTE(cerberus): successful network update and delete are
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
- return self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ return self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
'create_network') as patched_plugin:
def side_effect(*args, **kwargs):
- return self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ return self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
res = self._create_network_bulk(self.fmt, 2, 'test', True)
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
- self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net:
with mock.patch.object(NeutronManager._instance.plugin,
'create_subnet') as patched_plugin:
def side_effect(*args, **kwargs):
- return self._do_side_effect(patched_plugin, orig,
- *args, **kwargs)
+ return self._fail_second_call(patched_plugin, orig,
+ *args, **kwargs)
patched_plugin.side_effect = side_effect
with self.network() as net: