import abc
import collections
+from oslo_concurrency import lockutils
from oslo_config import cfg
import six
topic = resources_rpc.resource_type_versioned_topic(resource_type)
connection.create_consumer(topic, endpoints, fanout=True)
+ @lockutils.synchronized('qos-port')
def _handle_notification(self, qos_policy, event_type):
# server does not allow to remove a policy that is attached to any
# port, so we ignore DELETED events. Also, if we receive a CREATED
if event_type == events.UPDATED:
self._process_update_policy(qos_policy)
+ @lockutils.synchronized('qos-port')
def handle_port(self, context, port):
"""Handle agent QoS extension for port.
port_id in self.qos_policy_ports[qos_policy_id]):
return
- # TODO(QoS): handle race condition between push and pull APIs
self.qos_policy_ports[qos_policy_id][port_id] = port
self.known_ports.add(port_id)
qos_policy = self.resource_rpc.pull(