from quantum.db import dhcp_rpc_base
from quantum.db import l3_db
from quantum.extensions import l3
+from quantum.extensions import portbindings
from quantum.openstack.common import lockutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.plugins.bigswitch.version import version_string_with_vcs
+from quantum import policy
LOG = logging.getLogger(__name__)
class QuantumRestProxyV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin):
- supported_extension_aliases = ["router"]
+ supported_extension_aliases = ["router", "binding"]
+
+ binding_view = "extension:port_binding:view"
+ binding_set = "extension:port_binding:set"
def __init__(self):
LOG.info(_('QuantumRestProxy: Starting plugin. Version=%s'),
# Set port state up and return that port
port_update = {"port": {"admin_state_up": True}}
- return super(QuantumRestProxyV2, self).update_port(context,
- new_port["id"],
- port_update)
+ new_port = super(QuantumRestProxyV2, self).update_port(context,
+ new_port["id"],
+ port_update)
+ return self._extend_port_dict_binding(context, new_port)
+
+ def get_port(self, context, id, fields=None):
+ with context.session.begin(subtransactions=True):
+ port = super(QuantumRestProxyV2, self).get_port(context, id,
+ fields)
+ self._extend_port_dict_binding(context, port)
+ return self._fields(port, fields)
+
+ def get_ports(self, context, filters=None, fields=None):
+ with context.session.begin(subtransactions=True):
+ ports = super(QuantumRestProxyV2, self).get_ports(context, filters,
+ fields)
+ for port in ports:
+ self._extend_port_dict_binding(context, port)
+ return [self._fields(port, fields) for port in ports]
def update_port(self, context, port_id, port):
"""Update values of a port.
raise
# return new_port
- return new_port
+ return self._extend_port_dict_binding(context, new_port)
def delete_port(self, context, port_id, l3_port_check=True):
"""Delete a port.
}
return data
+
+ def _check_view_auth(self, context, resource, action):
+ return policy.check(context, action, resource)
+
+ def _enforce_set_auth(self, context, resource, action):
+ policy.enforce(context, action, resource)
+
+ def _extend_port_dict_binding(self, context, port):
+ if self._check_view_auth(context, port, self.binding_view):
+ port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_OVS
+ port[portbindings.CAPABILITIES] = {
+ portbindings.CAP_PORT_FILTER:
+ 'security-group' in self.supported_extension_aliases}
+ return port
from mock import patch
import quantum.common.test_lib as test_lib
+from quantum.extensions import portbindings
from quantum.manager import QuantumManager
+from quantum.tests.unit import _test_extension_portbindings as test_bindings
import quantum.tests.unit.test_db_plugin as test_plugin
class TestBigSwitchProxyPortsV2(test_plugin.TestPortsV2,
- BigSwitchProxyPluginV2TestCase):
+ BigSwitchProxyPluginV2TestCase,
+ test_bindings.PortBindingsTestCase):
- pass
+ VIF_TYPE = portbindings.VIF_TYPE_OVS
+ HAS_PORT_FILTER = False
class TestBigSwitchProxyNetworksV2(test_plugin.TestNetworksV2,