# error code
VCNS_ERROR_CODE_EDGE_NOT_RUNNING = 10013
+SUFFIX_LENGTH = 8
+
# router status by number
class RouterStatus(object):
context.session, pool_id, edge_id)
pool_vseid = poolid_map['pool_vseid']
return {
- 'name': vip.get('name'),
+ 'name': vip.get(
+ 'name', '') + vip['id'][-vcns_const.SUFFIX_LENGTH:],
'description': vip.get('description'),
'ipAddress': vip.get('address'),
'protocol': vip.get('protocol'),
vip_vse['defaultPoolId'])
return {
- 'name': vip_vse['name'],
+ 'name': vip_vse['name'][:-vcns_const.SUFFIX_LENGTH],
'address': vip_vse['ipAddress'],
'protocol': vip_vse['protocol'],
'protocol_port': vip_vse['port'],
def _convert_lb_pool(self, context, edge_id, pool, members):
vsepool = {
- 'name': pool.get('name'),
+ 'name': pool.get(
+ 'name', '') + pool['id'][-vcns_const.SUFFIX_LENGTH:],
'description': pool.get('description'),
'algorithm': BALANCE_MAP.get(
pool.get('lb_method'),
def _restore_lb_pool(self, context, edge_id, pool_vse):
#TODO(linb): Get more usefule info
return {
- 'name': pool_vse['name'],
+ 'name': pool_vse['name'][:-vcns_const.SUFFIX_LENGTH],
}
def _convert_lb_monitor(self, context, monitor):
break
return self.return_helper(header, response)
+ def is_name_unique(self, objs_dict, name):
+ return name not in [obj_dict['name']
+ for obj_dict in objs_dict.values()]
+
def create_vip(self, edge_id, vip_new):
+ header = {'status': 403}
+ response = ""
if not self._fake_virtualservers_dict.get(edge_id):
self._fake_virtualservers_dict[edge_id] = {}
+ if not self.is_name_unique(self._fake_virtualservers_dict[edge_id],
+ vip_new['name']):
+ return self.return_helper(header, response)
vip_vseid = uuidutils.generate_uuid()
self._fake_virtualservers_dict[edge_id][vip_vseid] = vip_new
header = {
'status': 204,
'location': "https://host/api/4.0/edges/edge_id"
"/loadbalancer/config/%s" % vip_vseid}
- response = ""
return self.return_helper(header, response)
def get_vip(self, edge_id, vip_vseid):
return self.return_helper(header, response)
def create_pool(self, edge_id, pool_new):
+ header = {'status': 403}
+ response = ""
if not self._fake_pools_dict.get(edge_id):
self._fake_pools_dict[edge_id] = {}
+ if not self.is_name_unique(self._fake_pools_dict[edge_id],
+ pool_new['name']):
+ return self.return_helper(header, response)
pool_vseid = uuidutils.generate_uuid()
self._fake_pools_dict[edge_id][pool_vseid] = pool_new
header = {
'status': 204,
'location': "https://host/api/4.0/edges/edge_id"
"/loadbalancer/config/%s" % pool_vseid}
- response = ""
return self.return_helper(header, response)
def get_pool(self, edge_id, pool_vseid):
protocol='TCP',
session_persistence={'type': 'HTTP_COOKIE'})
+ def test_create_vips_with_same_names(self):
+ new_router_id = self._create_and_get_router()
+ with self.subnet() as subnet:
+ net_id = subnet['subnet']['network_id']
+ self._set_net_external(net_id)
+ with contextlib.nested(
+ self.vip(
+ name='vip',
+ router_id=new_router_id,
+ subnet=subnet, protocol_port=80),
+ self.vip(
+ name='vip',
+ router_id=new_router_id,
+ subnet=subnet, protocol_port=81),
+ self.vip(
+ name='vip',
+ router_id=new_router_id,
+ subnet=subnet, protocol_port=82),
+ ) as (vip1, vip2, vip3):
+ req = self.new_list_request('vips')
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api))
+ for index in range(len(res['vips'])):
+ self.assertEqual(res['vips'][index]['name'], 'vip')
+
def test_update_vip(self):
name = 'new_vip'
router_id = self._create_and_get_router()
for k, v in vip_get.iteritems():
self.assertEqual(vip_create[k], v)
+ def test_create_two_vips_with_same_name(self):
+ ctx = context.get_admin_context()
+ with self.pool(no_delete=True) as pool:
+ self.pool_id = pool['pool']['id']
+ POOL_MAP_INFO['pool_id'] = pool['pool']['id']
+ vcns_db.add_vcns_edge_pool_binding(ctx.session, POOL_MAP_INFO)
+ with self.vip(pool=pool) as res:
+ vip_create = res['vip']
+ self.driver.create_vip(ctx, VSE_ID, vip_create)
+ self.assertRaises(vcns_exc.Forbidden,
+ self.driver.create_vip,
+ ctx, VSE_ID, vip_create)
+
def test_convert_app_profile(self):
app_profile_name = 'app_profile_name'
sess_persist1 = {'type': "SOURCE_IP"}
for k, v in pool_get.iteritems():
self.assertEqual(pool_create[k], v)
+ def test_create_two_pools_with_same_name(self):
+ ctx = context.get_admin_context()
+ with self.pool(no_delete=True) as p:
+ self.pool_id = p['pool']['id']
+ pool_create = p['pool']
+ self.driver.create_pool(ctx, VSE_ID, pool_create, [])
+ self.assertRaises(vcns_exc.Forbidden,
+ self.driver.create_pool,
+ ctx, VSE_ID, pool_create, [])
+
def test_update_pool(self):
ctx = context.get_admin_context()
with self.pool(no_delete=True) as p: