1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
2 # not use this file except in compliance with the License. You may obtain
3 # a copy of the License at
5 # http://www.apache.org/licenses/LICENSE-2.0
7 # Unless required by applicable law or agreed to in writing, software
8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 # License for the specific language governing permissions and limitations
13 from oslo_serialization import jsonutils as json
14 from tempest_lib import exceptions as lib_exc
16 from neutron.tests.tempest.common import service_client
19 class IdentityClientJSON(service_client.ServiceClient):
21 def has_admin_extensions(self):
23 Returns True if the KSADM Admin Extensions are supported
26 if hasattr(self, '_has_admin_extensions'):
27 return self._has_admin_extensions
28 # Try something that requires admin
31 self._has_admin_extensions = True
33 self._has_admin_extensions = False
34 return self._has_admin_extensions
36 def create_role(self, name):
41 post_body = json.dumps({'role': post_body})
42 resp, body = self.post('OS-KSADM/roles', post_body)
43 self.expected_success(200, resp.status)
44 return service_client.ResponseBody(resp, self._parse_resp(body))
46 def get_role(self, role_id):
47 """Get a role by its id."""
48 resp, body = self.get('OS-KSADM/roles/%s' % role_id)
49 self.expected_success(200, resp.status)
50 body = json.loads(body)
51 return service_client.ResponseBody(resp, body['role'])
53 def create_tenant(self, name, **kwargs):
56 name (required): New tenant name
57 description: Description of new tenant (default is none)
58 enabled <true|false>: Initial tenant status (default is true)
62 'description': kwargs.get('description', ''),
63 'enabled': kwargs.get('enabled', True),
65 post_body = json.dumps({'tenant': post_body})
66 resp, body = self.post('tenants', post_body)
67 self.expected_success(200, resp.status)
68 return service_client.ResponseBody(resp, self._parse_resp(body))
70 def delete_role(self, role_id):
72 resp, body = self.delete('OS-KSADM/roles/%s' % str(role_id))
73 self.expected_success(204, resp.status)
76 def list_user_roles(self, tenant_id, user_id):
77 """Returns a list of roles assigned to a user for a tenant."""
78 url = '/tenants/%s/users/%s/roles' % (tenant_id, user_id)
79 resp, body = self.get(url)
80 self.expected_success(200, resp.status)
81 return service_client.ResponseBodyList(resp, self._parse_resp(body))
83 def assign_user_role(self, tenant_id, user_id, role_id):
84 """Add roles to a user on a tenant."""
85 resp, body = self.put('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
86 (tenant_id, user_id, role_id), "")
87 self.expected_success(200, resp.status)
88 return service_client.ResponseBody(resp, self._parse_resp(body))
90 def remove_user_role(self, tenant_id, user_id, role_id):
91 """Removes a role assignment for a user on a tenant."""
92 resp, body = self.delete('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
93 (tenant_id, user_id, role_id))
94 self.expected_success(204, resp.status)
95 return service_client.ResponseBody(resp, body)
97 def delete_tenant(self, tenant_id):
98 """Delete a tenant."""
99 resp, body = self.delete('tenants/%s' % str(tenant_id))
100 self.expected_success(204, resp.status)
101 return service_client.ResponseBody(resp, body)
103 def get_tenant(self, tenant_id):
104 """Get tenant details."""
105 resp, body = self.get('tenants/%s' % str(tenant_id))
106 self.expected_success(200, resp.status)
107 return service_client.ResponseBody(resp, self._parse_resp(body))
109 def list_roles(self):
111 resp, body = self.get('OS-KSADM/roles')
112 self.expected_success(200, resp.status)
113 return service_client.ResponseBodyList(resp, self._parse_resp(body))
115 def list_tenants(self):
116 """Returns tenants."""
117 resp, body = self.get('tenants')
118 self.expected_success(200, resp.status)
119 body = json.loads(body)
120 return service_client.ResponseBodyList(resp, body['tenants'])
122 def get_tenant_by_name(self, tenant_name):
123 tenants = self.list_tenants()
124 for tenant in tenants:
125 if tenant['name'] == tenant_name:
127 raise lib_exc.NotFound('No such tenant')
129 def update_tenant(self, tenant_id, **kwargs):
130 """Updates a tenant."""
131 body = self.get_tenant(tenant_id)
132 name = kwargs.get('name', body['name'])
133 desc = kwargs.get('description', body['description'])
134 en = kwargs.get('enabled', body['enabled'])
141 post_body = json.dumps({'tenant': post_body})
142 resp, body = self.post('tenants/%s' % tenant_id, post_body)
143 self.expected_success(200, resp.status)
144 return service_client.ResponseBody(resp, self._parse_resp(body))
146 def create_user(self, name, password, tenant_id, email, **kwargs):
150 'password': password,
153 if tenant_id is not None:
154 post_body['tenantId'] = tenant_id
155 if kwargs.get('enabled') is not None:
156 post_body['enabled'] = kwargs.get('enabled')
157 post_body = json.dumps({'user': post_body})
158 resp, body = self.post('users', post_body)
159 self.expected_success(200, resp.status)
160 return service_client.ResponseBody(resp, self._parse_resp(body))
162 def update_user(self, user_id, **kwargs):
163 """Updates a user."""
164 put_body = json.dumps({'user': kwargs})
165 resp, body = self.put('users/%s' % user_id, put_body)
166 self.expected_success(200, resp.status)
167 return service_client.ResponseBody(resp, self._parse_resp(body))
169 def get_user(self, user_id):
171 resp, body = self.get("users/%s" % user_id)
172 self.expected_success(200, resp.status)
173 return service_client.ResponseBody(resp, self._parse_resp(body))
175 def delete_user(self, user_id):
177 resp, body = self.delete("users/%s" % user_id)
178 self.expected_success(204, resp.status)
179 return service_client.ResponseBody(resp, body)
182 """Get the list of users."""
183 resp, body = self.get("users")
184 self.expected_success(200, resp.status)
185 return service_client.ResponseBodyList(resp, self._parse_resp(body))
187 def enable_disable_user(self, user_id, enabled):
188 """Enables or disables a user."""
192 put_body = json.dumps({'user': put_body})
193 resp, body = self.put('users/%s/enabled' % user_id, put_body)
194 self.expected_success(200, resp.status)
195 return service_client.ResponseBody(resp, self._parse_resp(body))
197 def get_token(self, token_id):
198 """Get token details."""
199 resp, body = self.get("tokens/%s" % token_id)
200 self.expected_success(200, resp.status)
201 return service_client.ResponseBody(resp, self._parse_resp(body))
203 def delete_token(self, token_id):
204 """Delete a token."""
205 resp, body = self.delete("tokens/%s" % token_id)
206 self.expected_success(204, resp.status)
207 return service_client.ResponseBody(resp, body)
209 def list_users_for_tenant(self, tenant_id):
210 """List users for a Tenant."""
211 resp, body = self.get('/tenants/%s/users' % tenant_id)
212 self.expected_success(200, resp.status)
213 return service_client.ResponseBodyList(resp, self._parse_resp(body))
215 def get_user_by_username(self, tenant_id, username):
216 users = self.list_users_for_tenant(tenant_id)
218 if user['name'] == username:
220 raise lib_exc.NotFound('No such user')
222 def create_service(self, name, type, **kwargs):
223 """Create a service."""
227 'description': kwargs.get('description')
229 post_body = json.dumps({'OS-KSADM:service': post_body})
230 resp, body = self.post('/OS-KSADM/services', post_body)
231 self.expected_success(200, resp.status)
232 return service_client.ResponseBody(resp, self._parse_resp(body))
234 def get_service(self, service_id):
236 url = '/OS-KSADM/services/%s' % service_id
237 resp, body = self.get(url)
238 self.expected_success(200, resp.status)
239 return service_client.ResponseBody(resp, self._parse_resp(body))
241 def list_services(self):
242 """List Service - Returns Services."""
243 resp, body = self.get('/OS-KSADM/services')
244 self.expected_success(200, resp.status)
245 return service_client.ResponseBodyList(resp, self._parse_resp(body))
247 def delete_service(self, service_id):
248 """Delete Service."""
249 url = '/OS-KSADM/services/%s' % service_id
250 resp, body = self.delete(url)
251 self.expected_success(204, resp.status)
252 return service_client.ResponseBody(resp, body)
254 def update_user_password(self, user_id, new_pass):
255 """Update User Password."""
257 'password': new_pass,
260 put_body = json.dumps({'user': put_body})
261 resp, body = self.put('users/%s/OS-KSADM/password' % user_id, put_body)
262 self.expected_success(200, resp.status)
263 return service_client.ResponseBody(resp, self._parse_resp(body))
265 def list_extensions(self):
266 """List all the extensions."""
267 resp, body = self.get('/extensions')
268 self.expected_success(200, resp.status)
269 body = json.loads(body)
270 return service_client.ResponseBodyList(resp,
271 body['extensions']['values'])
273 def create_user_ec2_credentials(self, user_id, tenant_id):
274 post_body = json.dumps({'tenant_id': tenant_id})
275 resp, body = self.post('/users/%s/credentials/OS-EC2' % user_id,
277 self.expected_success(200, resp.status)
278 return service_client.ResponseBody(resp, self._parse_resp(body))
280 def list_user_ec2_credentials(self, user_id):
281 resp, body = self.get('/users/%s/credentials/OS-EC2' % user_id)
282 self.expected_success(200, resp.status)
283 return service_client.ResponseBodyList(resp, self._parse_resp(body))