from oslo.utils import timeutils
from neutron.agent.common import config
+from neutron.agent.l3 import event_observers
from neutron.agent.l3 import ha
from neutron.agent.l3 import link_local_allocator as lla
from neutron.agent.l3 import router_info
from neutron.openstack.common import processutils
from neutron.openstack.common import service
from neutron import service as neutron_service
+from neutron.services import advanced_service as adv_svc
try:
from neutron_fwaas.services.firewall.agents.l3reference \
import firewall_l3_agent
self.fip_priorities = set(range(FIP_PR_START, FIP_PR_END))
self._queue = queue.RouterProcessingQueue()
+ self.event_observers = event_observers.L3EventObservers()
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
router=router,
use_ipv6=self.use_ipv6,
ns_name=ns_name)
+ self.event_observers.notify(
+ adv_svc.AdvancedService.before_router_added, ri)
+
self.router_info[router_id] = ri
if self.conf.use_namespaces:
self._create_router_namespace(ri)
"Skipping router removal"), router_id)
return
+ self.event_observers.notify(
+ adv_svc.AdvancedService.before_router_removed, ri)
+
if ri.is_ha:
self.process_ha_router_removed(ri)
del self.router_info[router_id]
self._destroy_router_namespace(ri.ns_name)
+ self.event_observers.notify(
+ adv_svc.AdvancedService.after_router_removed, ri)
+
def _get_metadata_proxy_callback(self, router_id):
def callback(pid_file):
router_id=router['id'])
if router['id'] not in self.router_info:
- self._router_added(router['id'], router)
+ self._process_added_router(router)
+ else:
+ self._process_updated_router(router)
+
+ def _process_added_router(self, router):
+ # TODO(pcm): Next refactoring will rework this logic
+ self._router_added(router['id'], router)
+ ri = self.router_info[router['id']]
+ ri.router = router
+ self.process_router(ri)
+ self.event_observers.notify(
+ adv_svc.AdvancedService.after_router_added, ri)
+
+ def _process_updated_router(self, router):
+ # TODO(pcm): Next refactoring will rework this logic
ri = self.router_info[router['id']]
ri.router = router
+ self.event_observers.notify(
+ adv_svc.AdvancedService.before_router_updated, ri)
self.process_router(ri)
+ self.event_observers.notify(
+ adv_svc.AdvancedService.after_router_updated, ri)
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
--- /dev/null
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class L3EventObservers(object):
+
+ """Manages observers for L3 agent events."""
+
+ def __init__(self):
+ self.observers = set()
+
+ def add(self, observer):
+ """Add a listener for L3 agent notifications."""
+ self.observers.add(observer)
+
+ def notify(self, l3_event_action, *args, **kwargs):
+ """Give interested parties a chance to act on event.
+
+ NOTE: Preserves existing behavior for error propagation.
+ """
+ method_name = l3_event_action.__name__
+ for observer in self.observers:
+ getattr(observer, method_name)(*args, **kwargs)
--- /dev/null
+# Copyright 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.openstack.common import lockutils
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class AdvancedService(object):
+ """Observer base class for Advanced Services.
+
+ Base class for service types. This should not be instantiated normally.
+ Instead, a child class is defined for each service type and instantiated
+ by the corresponding service agent. The instances will have a back
+ reference to the L3 agent, and will register as an observer of events.
+ A singleton is used to create only one service object per service type.
+
+ This base class provides a definition for all of the L3 event handlers
+ that a service could "observe". A child class for a service type will
+ implement handlers, for events of interest.
+ """
+
+ _instance = None
+
+ def __init__(self, l3_agent):
+ """Base class for an advanced service.
+
+ Do not directly instantiate objects of this class. Should only be
+ called indirectly by a child class's instance() invocation.
+ """
+ self.l3_agent = l3_agent
+ # NOTE: Copying L3 agent attributes, so that they are accessible
+ # from device drivers, which are now provided a service instance.
+ # TODO(pcm): Address this in future refactorings.
+ self.conf = l3_agent.conf
+ self.root_helper = l3_agent.root_helper
+
+ @classmethod
+ def instance(cls, l3_agent):
+ """Creates instance (singleton) of service.
+
+ Do not directly call this for the base class. Instead, it should be
+ called by a child class, that represents a specific service type.
+
+ This ensures that only one instance is created for all agents of a
+ specific service type.
+ """
+ if not cls._instance:
+ with lockutils.lock('instance'):
+ if not cls._instance:
+ cls._instance = cls(l3_agent)
+
+ return cls._instance
+
+ # NOTE: Handler definitions for events generated by the L3 agent.
+ # Subclasses of AdvancedService can override these to perform service
+ # specific actions. Unique methods are defined for add/update, as
+ # some services may want to take different actions.
+ def before_router_added(self, ri):
+ """Actions taken before router_info created."""
+ pass
+
+ def after_router_added(self, ri):
+ """Actions taken after router_info created."""
+ pass
+
+ def before_router_updated(self, ri):
+ """Actions before processing for an updated router."""
+ pass
+
+ def after_router_updated(self, ri):
+ """Actions add processing for an updated router."""
+ pass
+
+ def before_router_removed(self, ri):
+ """Actions before removing router."""
+ pass
+
+ def after_router_removed(self, ri):
+ """Actions after processing and removing router."""
+ pass
from neutron.common import constants as l3_constants
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
+from neutron.services import advanced_service as adv_svc
from neutron.tests.common.agents import l3_agent as l3_test_agent
from neutron.tests.functional.agent.linux import base
from neutron.tests.unit import test_l3_agent
return ri
def _create_router(self, agent, router):
- agent._router_added(router['id'], router)
- ri = agent.router_info[router['id']]
- ri.router = router
- agent.process_router(ri)
- return ri
+ agent._process_added_router(router)
+ return agent.router_info[router['id']]
def _delete_router(self, agent, router_id):
agent._router_removed(router_id)
class L3AgentTestCase(L3AgentTestFramework):
+ def test_observer_notifications_legacy_router(self):
+ self._test_observer_notifications(enable_ha=False)
+
+ def test_observer_notifications_ha_router(self):
+ self._test_observer_notifications(enable_ha=True)
+
+ def _test_observer_notifications(self, enable_ha):
+ """Test create, update, delete of router and notifications."""
+ with mock.patch.object(
+ self.agent.event_observers, 'notify') as notify:
+ router_info = self.generate_router_info(enable_ha)
+ router = self.manage_router(self.agent, router_info)
+ self.agent._process_updated_router(router.router)
+ self._delete_router(self.agent, router.router_id)
+
+ calls = notify.call_args_list
+ self.assertEqual(
+ [((adv_svc.AdvancedService.before_router_added, router),),
+ ((adv_svc.AdvancedService.after_router_added, router),),
+ ((adv_svc.AdvancedService.before_router_updated, router),),
+ ((adv_svc.AdvancedService.after_router_updated, router),),
+ ((adv_svc.AdvancedService.before_router_removed, router),),
+ ((adv_svc.AdvancedService.after_router_removed, router),)],
+ calls)
+
def test_legacy_router_lifecycle(self):
self._router_lifecycle(enable_ha=False)
--- /dev/null
+# Copyright 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.l3 import event_observers
+from neutron.services import advanced_service as adv_svc
+from neutron.tests import base
+
+
+class DummyService1(adv_svc.AdvancedService):
+ def before_router_added(self, ri):
+ pass
+
+ def after_router_added(self, ri):
+ pass
+
+
+class DummyService2(adv_svc.AdvancedService):
+ def before_router_added(self, ri):
+ pass
+
+
+class TestL3EventObservers(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestL3EventObservers, self).setUp()
+ self.event_observers = event_observers.L3EventObservers()
+
+ def test_add_observer(self):
+ observer = object()
+ self.assertNotIn(observer, self.event_observers.observers)
+ self.event_observers.add(observer)
+ self.assertIn(observer, self.event_observers.observers)
+
+ def test_add_duplicate_observer_is_ignored(self):
+ observer = object()
+ self.event_observers.add(observer)
+ try:
+ self.event_observers.add(observer)
+ except Exception:
+ self.fail('Duplicate additions of observers should be ignored')
+ self.assertEqual(1, len(self.event_observers.observers))
+
+ def test_observers_in_service_notified(self):
+ """Test that correct handlers for multiple services are called."""
+ l3_agent = mock.Mock()
+ router_info = mock.Mock()
+ observer1 = DummyService1.instance(l3_agent)
+ observer2 = DummyService2.instance(l3_agent)
+ observer1_before_add = mock.patch.object(
+ DummyService1, 'before_router_added').start()
+ observer2_before_add = mock.patch.object(
+ DummyService2, 'before_router_added').start()
+
+ self.event_observers.add(observer1)
+ self.event_observers.add(observer2)
+ self.event_observers.notify(
+ adv_svc.AdvancedService.before_router_added, router_info)
+
+ observer1_before_add.assert_called_with(router_info)
+ observer2_before_add.assert_called_with(router_info)
--- /dev/null
+# Copyright 2014 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.agent.l3 import event_observers
+from neutron.services import advanced_service
+from neutron.tests import base
+
+
+class FakeServiceA(advanced_service.AdvancedService):
+ pass
+
+
+class FakeServiceB(advanced_service.AdvancedService):
+ pass
+
+
+class TestAdvancedService(base.BaseTestCase):
+
+ def setUp(self):
+ super(TestAdvancedService, self).setUp()
+ self.agent = mock.Mock()
+ self.test_observers = event_observers.L3EventObservers()
+ # Ensure no instances for each test
+ FakeServiceA._instance = None
+ FakeServiceB._instance = None
+
+ def test_create_service(self):
+ """Test agent saved and service added to observer list."""
+ my_service = FakeServiceA.instance(self.agent)
+ self.test_observers.add(my_service)
+ self.assertIn(my_service, self.test_observers.observers)
+ self.assertEqual(self.agent, my_service.l3_agent)
+
+ def test_service_is_singleton(self):
+ """Test that two services of same time use same instance."""
+ a1 = FakeServiceA.instance(self.agent)
+ a2 = FakeServiceA.instance(self.agent)
+ self.assertIs(a1, a2)
+
+ def test_shared_observers_for_different_services(self):
+ """Test different service type instances created.
+
+ The services are unique instances, with different agents, but
+ sharing the same observer list.
+ """
+ a = FakeServiceA.instance(self.agent)
+ self.test_observers.add(a)
+ self.assertEqual(self.agent, a.l3_agent)
+ self.assertIn(a, self.test_observers.observers)
+
+ another_agent = mock.Mock()
+ b = FakeServiceB.instance(another_agent)
+ self.test_observers.add(b)
+ self.assertNotEqual(a, b)
+ self.assertEqual(another_agent, b.l3_agent)
+ self.assertIn(b, self.test_observers.observers)
+ self.assertEqual(2, len(self.test_observers.observers))
+
+ def test_unique_observers_for_different_services(self):
+ """Test different service types with different observer lists.
+
+ The services are unique instances, shared the same agent, but
+ are using different observer lists.
+ """
+ a = FakeServiceA.instance(self.agent)
+ self.test_observers.add(a)
+ other_observers = event_observers.L3EventObservers()
+ b = FakeServiceB.instance(self.agent)
+ other_observers.add(b)
+
+ self.assertNotEqual(a, b)
+ self.assertEqual(self.agent, a.l3_agent)
+ self.assertIn(a, self.test_observers.observers)
+ self.assertEqual(1, len(self.test_observers.observers))
+
+ self.assertEqual(self.agent, b.l3_agent)
+ self.assertIn(b, other_observers.observers)
+ self.assertEqual(1, len(other_observers.observers))