pass
@abstractmethod
- def set_binding(self, segment_id, vif_type, vif_details):
+ def set_binding(self, segment_id, vif_type, vif_details,
+ status=None):
"""Set the binding for the port.
:param segment_id: Network segment bound for the port.
:param vif_type: The VIF type for the bound port.
:param vif_details: Dictionary with details for VIF driver.
+ :param status: Port status to set if not None.
Called by MechanismDriver.bind_port to indicate success and
specify binding details to use for port. The segment_id must
else:
self._original_bound_segment_id = None
self._original_bound_driver = None
+ self._new_port_status = None
@property
def current(self):
filters={'agent_type': [agent_type],
'host': [self._binding.host]})
- def set_binding(self, segment_id, vif_type, vif_details):
+ def set_binding(self, segment_id, vif_type, vif_details,
+ status=None):
# TODO(rkukura) Verify binding allowed, segment in network
self._binding.segment = segment_id
self._binding.vif_type = vif_type
self._binding.vif_details = jsonutils.dumps(vif_details)
+ self._new_port_status = status
from oslo.config import cfg
import requests
+from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.extensions import portbindings
if self.check_segment(segment):
context.set_binding(segment[api.ID],
self.vif_type,
- self.vif_details)
+ self.vif_details,
+ status=n_const.PORT_STATUS_ACTIVE)
LOG.debug(_("Bound using segment: %s"), segment)
return
else:
self.mechanism_manager.bind_port(mech_context)
self._update_port_dict_binding(port, binding)
+ # Update the port status if requested by the bound driver.
+ if binding.segment and mech_context._new_port_status:
+ # REVISIT(rkukura): This function is currently called
+ # inside a transaction with the port either newly
+ # created or locked for update. After the fix for bug
+ # 1276391 is merged, this will no longer be true, and
+ # the port status update will need to be handled in
+ # the transaction that commits the new binding.
+ port_db = db.get_port(mech_context._plugin_context.session,
+ port['id'])
+ port_db.status = mech_context._new_port_status
+ port['status'] = mech_context._new_port_status
+
return ret_value
def _update_port_dict_binding(self, port, binding):
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.common import constants as const
from neutron.extensions import portbindings
from neutron.plugins.ml2 import driver_api as api
context.set_binding(segment, portbindings.VIF_TYPE_BRIDGE,
{portbindings.CAP_PORT_FILTER: True})
self.bound_ports.add(context.current['id'])
+ elif host == "host-ovs-filter-active":
+ context.set_binding(segment, portbindings.VIF_TYPE_OVS,
+ {portbindings.CAP_PORT_FILTER: True},
+ status=const.PORT_STATUS_ACTIVE)
+ self.bound_ports.add(context.current['id'])
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.start_rpc_listener()
- def _check_response(self, port, vif_type, has_port_filter, bound):
+ def _check_response(self, port, vif_type, has_port_filter, bound, status):
self.assertEqual(port[portbindings.VIF_TYPE], vif_type)
vif_details = port[portbindings.VIF_DETAILS]
+ port_status = port['status']
if bound:
# TODO(rkukura): Replace with new VIF security details
self.assertEqual(vif_details[portbindings.CAP_PORT_FILTER],
has_port_filter)
+ self.assertEqual(port_status, status or 'DOWN')
+ else:
+ self.assertEqual(port_status, 'DOWN')
- def _test_port_binding(self, host, vif_type, has_port_filter, bound):
+ def _test_port_binding(self, host, vif_type, has_port_filter, bound,
+ status=None):
host_arg = {portbindings.HOST_ID: host}
with self.port(name='name', arg_list=(portbindings.HOST_ID,),
**host_arg) as port:
self._check_response(port['port'], vif_type, has_port_filter,
- bound)
+ bound, status)
port_id = port['port']['id']
neutron_context = context.get_admin_context()
details = self.plugin.callbacks.get_device_details(
portbindings.VIF_TYPE_BRIDGE,
True, True)
+ def test_binding_status_active(self):
+ self._test_port_binding("host-ovs-filter-active",
+ portbindings.VIF_TYPE_OVS,
+ True, True, 'ACTIVE')
+
def _test_update_port_binding(self, host, new_host=None):
with mock.patch.object(self.plugin,
'_notify_port_updated') as notify_mock: