# License for the specific language governing permissions and limitations
# under the License.
# @author: Somik Behera, Nicira Networks, Inc.
+# @author: Salvatore Orlando, Citrix
+
+import logging
from quantum.common import exceptions as exc
+LOG = logging.getLogger('quantum.plugins.SamplePlugin')
class QuantumEchoPlugin(object):
<network_uuid, network_name> for
the specified tenant.
"""
- print("get_all_networks() called\n")
+ LOG.debug("FakePlugin.get_all_networks() called")
return FakePlugin._networks.values()
def get_network_details(self, tenant_id, net_id):
retrieved a list of all the remote vifs that
are attached to the network
"""
- print("get_network_details() called\n")
+ LOG.debug("get_network_details() called")
return self._get_network(tenant_id, net_id)
def create_network(self, tenant_id, net_name):
Creates a new Virtual Network, and assigns it
a symbolic name.
"""
- print("create_network() called\n")
+ LOG.debug("FakePlugin.create_network() called")
FakePlugin._net_counter += 1
new_net_id = ("0" * (3 - len(str(FakePlugin._net_counter)))) + \
str(FakePlugin._net_counter)
- print new_net_id
new_net_dict = {'net-id': new_net_id,
'net-name': net_name,
'net-ports': {}}
Deletes the network with the specified network identifier
belonging to the specified tenant.
"""
- print("delete_network() called\n")
+ LOG.debug("FakePlugin.delete_network() called")
net = FakePlugin._networks.get(net_id)
# Verify that no attachments are plugged into the network
if net:
Updates the symbolic name belonging to a particular
Virtual Network.
"""
- print("rename_network() called\n")
+ LOG.debug("FakePlugin.rename_network() called")
net = self._get_network(tenant_id, net_id)
net['net-name'] = new_name
return net
Retrieves all port identifiers belonging to the
specified Virtual Network.
"""
- print("get_all_ports() called\n")
+ LOG.debug("FakePlugin.get_all_ports() called")
network = self._get_network(tenant_id, net_id)
ports_on_net = network['net-ports'].values()
return ports_on_net
This method allows the user to retrieve a remote interface
that is attached to this particular port.
"""
- print("get_port_details() called\n")
+ LOG.debug("FakePlugin.get_port_details() called")
return self._get_port(tenant_id, net_id, port_id)
def create_port(self, tenant_id, net_id, port_state=None):
"""
Creates a port on the specified Virtual Network.
"""
- print("create_port() called\n")
+ LOG.debug("FakePlugin.create_port() called")
net = self._get_network(tenant_id, net_id)
# check port state
# TODO(salvatore-orlando): Validate port state in API?
"""
Updates the state of a port on the specified Virtual Network.
"""
- print("create_port() called\n")
+ LOG.debug("FakePlugin.update_port() called")
port = self._get_port(tenant_id, net_id, port_id)
self._validate_port_state(port_state)
port['port-state'] = port_state
the remote interface is first un-plugged and then the port
is deleted.
"""
- print("delete_port() called\n")
+ LOG.debug("FakePlugin.delete_port() called")
net = self._get_network(tenant_id, net_id)
port = self._get_port(tenant_id, net_id, port_id)
if port['attachment']:
Attaches a remote interface to the specified port on the
specified Virtual Network.
"""
- print("plug_interface() called\n")
+ LOG.debug("FakePlugin.plug_interface() called")
# Validate attachment
self._validate_attachment(tenant_id, net_id, port_id,
remote_interface_id)
Detaches a remote interface from the specified port on the
specified Virtual Network.
"""
- print("unplug_interface() called\n")
+ LOG.debug("FakePlugin.unplug_interface() called")
port = self._get_port(tenant_id, net_id, port_id)
# TODO(salvatore-orlando):
# Should unplug on port without attachment raise an Error?
port['attachment'] = None
-
- def get_interface_details(self, tenant_id, net_id, port_id):
- """
- Get Attachment details
- """
- print("get_interface_details() called\n")
- port = self._get_port(tenant_id, net_id, port_id)
- return port["attachment"]
def setUp(self):
self.client = MiniClient(HOST, PORT, USE_SSL)
- def create_network(self, data, tenant_id=TENANT_ID):
- content_type = "application/" + FORMAT
- body = Serializer().serialize(data, content_type)
- res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
- body=body)
- self.assertEqual(res.status, 200, "bad response: %s" % res.read())
-
- def test_listNetworks(self):
- self.create_network(test_network1_data)
- self.create_network(test_network2_data)
- res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
- self.assertEqual(res.status, 200, "bad response: %s" % res.read())
-
- def test_getNonexistentNetwork(self):
+ #def create_network(self, data, tenant_id=TENANT_ID):
+ # content_type = "application/" + FORMAT
+ # body = Serializer().serialize(data, content_type)
+ # res = self.client.do_request(tenant_id, 'POST', "/networks." + FORMAT,
+ # body=body)
+ # self.assertEqual(res.status, 200, "bad response: %s" % res.read())
+
+ #def test_listNetworks(self):
+ # self.create_network(test_network1_data)
+ # self.create_network(test_network2_data)
+ # res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
+ # self.assertEqual(res.status, 200, "bad response: %s" % res.read())
+
+ #def test_getNonexistentNetwork(self):
# TODO(bgh): parse exception and make sure it is NetworkNotFound
- try:
- res = self.client.do_request(TENANT_ID, 'GET',
- "/networks/%s.%s" % ("8675309", "xml"))
- self.assertEqual(res.status, 400)
- except Exception, e:
- print "Caught exception: %s" % (str(e))
-
- def test_deleteNonexistentNetwork(self):
+ #try:
+ # res = self.client.do_request(TENANT_ID, 'GET',
+ # "/networks/%s.%s" % ("8675309", "xml"))
+ # self.assertEqual(res.status, 400)
+ #except Exception, e:
+ # print "Caught exception: %s" % (str(e))
+
+ #def test_deleteNonexistentNetwork(self):
# TODO(bgh): parse exception and make sure it is NetworkNotFound
- try:
- res = self.client.do_request(TENANT_ID, 'DELETE',
- "/networks/%s.%s" % ("8675309", "xml"))
- self.assertEqual(res.status, 400)
- except Exception, e:
- print "Caught exception: %s" % (str(e))
-
- def test_createNetwork(self):
- self.create_network(test_network1_data)
-
- def test_createPort(self):
- self.create_network(test_network1_data)
- res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
- resdict = json.loads(res.read())
- for n in resdict["networks"]:
- net_id = n["id"]
+ #try:
+ # res = self.client.do_request(TENANT_ID, 'DELETE',
+ # "/networks/%s.%s" % ("8675309", "xml"))
+ # self.assertEqual(res.status, 400)
+ #except Exception, e:
+ # print "Caught exception: %s" % (str(e))
+
+ #def test_createNetwork(self):
+ #self.create_network(test_network1_data)
+
+ #def test_createPort(self):
+ #self.create_network(test_network1_data)
+ #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
+ #resdict = json.loads(res.read())
+ #for n in resdict["networks"]:
+ # net_id = n["id"]
# Step 1 - List Ports for network (should not find any)
- res = self.client.do_request(TENANT_ID, 'GET',
- "/networks/%s/ports.%s" % (net_id, FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
- if len(output) > 0:
- resdict = json.loads(output)
- self.assertTrue(len(resdict["ports"]) == 0,
- "Found unexpected ports: %s" % output)
- else:
- self.assertTrue(len(output) == 0,
- "Found unexpected ports: %s" % output)
+ #res = self.client.do_request(TENANT_ID, 'GET',
+ # "/networks/%s/ports.%s" % (net_id, FORMAT))
+ #output = res.read()
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #if len(output) > 0:
+ # resdict = json.loads(output)
+ # self.assertTrue(len(resdict["ports"]) == 0,
+ # "Found unexpected ports: %s" % output)
+ #else:
+ # self.assertTrue(len(output) == 0,
+ # "Found unexpected ports: %s" % output)
# Step 2 - Create Port for network
- res = self.client.do_request(TENANT_ID, 'POST',
- "/networks/%s/ports.%s" % (net_id, FORMAT))
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #res = self.client.do_request(TENANT_ID, 'POST',
+ # "/networks/%s/ports.%s" % (net_id, FORMAT))
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
# Step 3 - List Ports for network (again); should find one
- res = self.client.do_request(TENANT_ID, 'GET',
- "/networks/%s/ports.%s" % (net_id, FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
- resdict = json.loads(output)
- ids = []
- for p in resdict["ports"]:
- ids.append(p["id"])
- self.assertTrue(len(ids) == 1,
- "Didn't find expected # of ports (1): %s" % ids)
-
- def test_getAttachment(self):
- self.create_network(test_network1_data)
- res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
- resdict = json.loads(res.read())
- for n in resdict["networks"]:
- net_id = n["id"]
+ #res = self.client.do_request(TENANT_ID, 'GET',
+ # "/networks/%s/ports.%s" % (net_id, FORMAT))
+ #output = res.read()
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #resdict = json.loads(output)
+ #ids = []
+ #for p in resdict["ports"]:
+ # ids.append(p["id"])
+ #self.assertTrue(len(ids) == 1,
+ # "Didn't find expected # of ports (1): %s" % ids)
+
+ #def test_getAttachment(self):
+ #self.create_network(test_network1_data)
+ #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
+ #resdict = json.loads(res.read())
+ #for n in resdict["networks"]:
+ # net_id = n["id"]
# Step 1 - Create Port for network and attempt to get the
# attachment (even though there isn't one)
- res = self.client.do_request(TENANT_ID, 'POST',
- "/networks/%s/ports.%s" % (net_id, FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
- resdict = json.loads(output)
- port_id = resdict["ports"]["port"]["id"]
-
- res = self.client.do_request(TENANT_ID, 'GET',
- "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
- FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #res = self.client.do_request(TENANT_ID, 'POST',
+ # "/networks/%s/ports.%s" % (net_id, FORMAT))
+ #output = res.read()
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #resdict = json.loads(output)
+ #port_id = resdict["ports"]["port"]["id"]
+
+ #res = self.client.do_request(TENANT_ID, 'GET',
+ # "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
+ # FORMAT))
+ #output = res.read()
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
# Step 2 - Add an attachment
- data = {'port': {'attachment-id': 'fudd'}}
- content_type = "application/" + FORMAT
- body = Serializer().serialize(data, content_type)
- res = self.client.do_request(TENANT_ID, 'PUT',
- "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
- FORMAT), body=body)
- output = res.read()
- self.assertEqual(res.status, 202, "Bad response: %s" % output)
+ #data = {'port': {'attachment-id': 'fudd'}}
+ #content_type = "application/" + FORMAT
+ #body = Serializer().serialize(data, content_type)
+ #res = self.client.do_request(TENANT_ID, 'PUT',
+ # "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
+ # FORMAT), body=body)
+ #output = res.read()
+ #self.assertEqual(res.status, 202, "Bad response: %s" % output)
# Step 3 - Fetch the attachment
- res = self.client.do_request(TENANT_ID, 'GET',
- "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
- FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
- resdict = json.loads(output)
- attachment = resdict["attachment"]
- self.assertEqual(attachment, "fudd", "Attachment: %s" % attachment)
-
- def test_renameNetwork(self):
- self.create_network(test_network1_data)
- res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
- resdict = json.loads(res.read())
- net_id = resdict["networks"][0]["id"]
-
- data = test_network1_data.copy()
- data['network']['network-name'] = 'test_renamed'
- content_type = "application/" + FORMAT
- body = Serializer().serialize(data, content_type)
- res = self.client.do_request(TENANT_ID, 'PUT',
- "/networks/%s.%s" % (net_id, FORMAT), body=body)
- resdict = json.loads(res.read())
- self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
- "Network_rename: renamed network has a different uuid")
- self.assertTrue(
- resdict["networks"]["network"]["name"] == "test_renamed",
- "Network rename didn't take effect")
-
- def test_createNetworkOnMultipleTenants(self):
+ #res = self.client.do_request(TENANT_ID, 'GET',
+ # "/networks/%s/ports/%s/attachment.%s" % (net_id, port_id,
+ # FORMAT))
+ #output = res.read()
+ #self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ #resdict = json.loads(output)
+ #attachment = resdict["attachment"]
+ #self.assertEqual(attachment, "fudd", "Attachment: %s" % attachment)
+
+ #def test_renameNetwork(self):
+ #self.create_network(test_network1_data)
+ #res = self.client.do_request(TENANT_ID, 'GET', "/networks." + FORMAT)
+ #resdict = json.loads(res.read())
+ #net_id = resdict["networks"][0]["id"]
+
+ #data = test_network1_data.copy()
+ #data['network']['network-name'] = 'test_renamed'
+ #content_type = "application/" + FORMAT
+ #body = Serializer().serialize(data, content_type)
+ #res = self.client.do_request(TENANT_ID, 'PUT',
+ #"/networks/%s.%s" % (net_id, FORMAT), body=body)
+ #resdict = json.loads(res.read())
+ #self.assertTrue(resdict["networks"]["network"]["id"] == net_id,
+ #"Network_rename: renamed network has a different uuid")
+ #self.assertTrue(
+ #resdict["networks"]["network"]["name"] == "test_renamed",
+ #"Network rename didn't take effect")
+
+ #def test_createNetworkOnMultipleTenants(self):
# Create the same network on multiple tenants
- self.create_network(test_network1_data, "tenant1")
- self.create_network(test_network1_data, "tenant2")
+ #self.create_network(test_network1_data, "tenant1")
+ #self.create_network(test_network1_data, "tenant2")
- def delete_networks(self, tenant_id=TENANT_ID):
+ #def delete_networks(self, tenant_id=TENANT_ID):
# Remove all the networks created on the tenant (including ports and
# attachments)
- res = self.client.do_request(tenant_id, 'GET',
- "/networks." + FORMAT)
- resdict = json.loads(res.read())
- for n in resdict["networks"]:
- net_id = n["id"]
- # Delete all the ports
- res = self.client.do_request(tenant_id, 'GET',
- "/networks/%s/ports.%s" % (net_id, FORMAT))
- output = res.read()
- self.assertEqual(res.status, 200, "Bad response: %s" % output)
- resdict = json.loads(output)
- ids = []
- for p in resdict["ports"]:
- res = self.client.do_request(tenant_id, 'DELETE',
- "/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
- FORMAT))
- res = self.client.do_request(tenant_id, 'DELETE',
- "/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
+ #res = self.client.do_request(tenant_id, 'GET',
+ # "/networks." + FORMAT)
+ #resdict = json.loads(res.read())
+ #for n in resdict["networks"]:
+ # net_id = n["id"]
+ # # Delete all the ports
+ # res = self.client.do_request(tenant_id, 'GET',
+ # "/networks/%s/ports.%s" % (net_id, FORMAT))
+ # output = res.read()
+ # self.assertEqual(res.status, 200, "Bad response: %s" % output)
+ # resdict = json.loads(output)
+ # ids = []
+ # for p in resdict["ports"]:
+ # res = self.client.do_request(tenant_id, 'DELETE',
+ # "/networks/%s/ports/%s/attachment.%s" % (net_id, p["id"],
+ # FORMAT))
+ # res = self.client.do_request(tenant_id, 'DELETE',
+ # "/networks/%s/ports/%s.%s" % (net_id, p["id"], FORMAT))
# Now, remove the network
- res = self.client.do_request(tenant_id, 'DELETE',
- "/networks/" + net_id + "." + FORMAT)
- self.assertEqual(res.status, 202)
+ # res = self.client.do_request(tenant_id, 'DELETE',
+ # "/networks/" + net_id + "." + FORMAT)
+ # self.assertEqual(res.status, 202)
def tearDown(self):
self.delete_networks()