1 # Copyright 2013 OpenStack Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 from oslo_serialization import jsonutils as json
18 from neutron.tests.tempest.common import service_client
21 class CredentialsClientJSON(service_client.ServiceClient):
24 def create_credential(self, access_key, secret_key, user_id, project_id):
25 """Creates a credential."""
26 blob = "{\"access\": \"%s\", \"secret\": \"%s\"}" % (
27 access_key, secret_key)
30 "project_id": project_id,
34 post_body = json.dumps({'credential': post_body})
35 resp, body = self.post('credentials', post_body)
36 self.expected_success(201, resp.status)
37 body = json.loads(body)
38 body['credential']['blob'] = json.loads(body['credential']['blob'])
39 return service_client.ResponseBody(resp, body['credential'])
41 def update_credential(self, credential_id, **kwargs):
42 """Updates a credential."""
43 body = self.get_credential(credential_id)
44 cred_type = kwargs.get('type', body['type'])
45 access_key = kwargs.get('access_key', body['blob']['access'])
46 secret_key = kwargs.get('secret_key', body['blob']['secret'])
47 project_id = kwargs.get('project_id', body['project_id'])
48 user_id = kwargs.get('user_id', body['user_id'])
49 blob = "{\"access\": \"%s\", \"secret\": \"%s\"}" % (
50 access_key, secret_key)
53 "project_id": project_id,
57 post_body = json.dumps({'credential': post_body})
58 resp, body = self.patch('credentials/%s' % credential_id, post_body)
59 self.expected_success(200, resp.status)
60 body = json.loads(body)
61 body['credential']['blob'] = json.loads(body['credential']['blob'])
62 return service_client.ResponseBody(resp, body['credential'])
64 def get_credential(self, credential_id):
65 """To GET Details of a credential."""
66 resp, body = self.get('credentials/%s' % credential_id)
67 self.expected_success(200, resp.status)
68 body = json.loads(body)
69 body['credential']['blob'] = json.loads(body['credential']['blob'])
70 return service_client.ResponseBody(resp, body['credential'])
72 def list_credentials(self):
73 """Lists out all the available credentials."""
74 resp, body = self.get('credentials')
75 self.expected_success(200, resp.status)
76 body = json.loads(body)
77 return service_client.ResponseBodyList(resp, body['credentials'])
79 def delete_credential(self, credential_id):
80 """Deletes a credential."""
81 resp, body = self.delete('credentials/%s' % credential_id)
82 self.expected_success(204, resp.status)
83 return service_client.ResponseBody(resp, body)