from neutron.agent.l3 import dvr_fip_ns
from neutron.agent.l3 import dvr_snat_ns
+from neutron.agent.l3 import namespaces
LOG = logging.getLogger(__name__)
return fip_ns
def _destroy_fip_namespace(self, ns):
- ex_net_id = ns[len(dvr_fip_ns.FIP_NS_PREFIX):]
+ ex_net_id = namespaces.get_id_from_ns_name(ns)
fip_ns = self.get_fip_ns(ex_net_id)
fip_ns.delete()
class FipNamespace(namespaces.Namespace):
def __init__(self, ext_net_id, agent_conf, driver, use_ipv6):
- name = FIP_NS_PREFIX + ext_net_id
+ name = self._get_ns_name(ext_net_id)
super(FipNamespace, self).__init__(
name, agent_conf, driver, use_ipv6)
self.local_subnets = lla.LinkLocalAllocator(path, FIP_LL_SUBNET)
self.destroyed = False
+ @classmethod
+ def _get_ns_name(cls, ext_net_id):
+ return namespaces.build_ns_name(FIP_NS_PREFIX, ext_net_id)
+
def get_name(self):
- return (FIP_NS_PREFIX + self._ext_net_id)
+ return self._get_ns_name(self._ext_net_id)
def get_ext_device_name(self, port_id):
return (FIP_EXT_DEV_PREFIX + port_id)[:self.driver.DEV_NAME_LEN]
@classmethod
def get_snat_ns_name(cls, router_id):
- return (SNAT_NS_PREFIX + router_id)
+ return namespaces.build_ns_name(SNAT_NS_PREFIX, router_id)
def delete(self):
ns_ip = ip_lib.IPWrapper(namespace=self.name)
:param ns_name: The name of the namespace
:returns: tuple with prefix and id or None if no prefix matches
"""
- for prefix in [namespaces.NS_PREFIX, dvr_snat_ns.SNAT_NS_PREFIX]:
- if ns_name.startswith(prefix):
- return (prefix, ns_name[len(prefix):])
+ prefix = namespaces.get_prefix_from_ns_name(ns_name)
+ if prefix in (namespaces.NS_PREFIX, dvr_snat_ns.SNAT_NS_PREFIX):
+ identifier = namespaces.get_id_from_ns_name(ns_name)
+ return (prefix, identifier)
def is_managed(self, ns_name):
"""Return True if the namespace name passed belongs to this manager."""
ROUTER_2_FIP_DEV_PREFIX = 'rfp-'
+def build_ns_name(prefix, identifier):
+ """Builds a namespace name from the given prefix and identifier
+
+ :param prefix: The prefix which must end with '-' for legacy reasons
+ :param identifier: The id associated with the namespace
+ """
+ return prefix + identifier
+
+
+def get_prefix_from_ns_name(ns_name):
+ """Parses prefix from prefix-identifier
+
+ :param ns_name: The name of a namespace
+ :returns: The prefix ending with a '-' or None if there is no '-'
+ """
+ dash_index = ns_name.find('-')
+ if 0 <= dash_index:
+ return ns_name[:dash_index + 1]
+
+
+def get_id_from_ns_name(ns_name):
+ """Parses identifier from prefix-identifier
+
+ :param ns_name: The name of a namespace
+ :returns: Identifier or None if there is no - to end the prefix
+ """
+ dash_index = ns_name.find('-')
+ if 0 <= dash_index:
+ return ns_name[dash_index + 1:]
+
+
class Namespace(object):
def __init__(self, name, agent_conf, driver, use_ipv6):
super(RouterNamespace, self).__init__(
name, agent_conf, driver, use_ipv6)
- @staticmethod
- def _get_ns_name(router_id):
- return (NS_PREFIX + router_id)
+ @classmethod
+ def _get_ns_name(cls, router_id):
+ return build_ns_name(NS_PREFIX, router_id)
def delete(self):
ns_ip = ip_lib.IPWrapper(namespace=self.name)
# under the License.
from neutron.agent.l3 import agent as l3_agent
+from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.openstack.common import uuidutils
SingleNodeEnvironment(), *args, **kwargs)
def _get_namespace(self, router_id):
- return "%s%s" % (l3_agent.NS_PREFIX, router_id)
+ return namespaces.build_ns_name(l3_agent.NS_PREFIX, router_id)
def _assert_namespace_exists(self, ns_name):
ip = ip_lib.IPWrapper(ns_name)