46e8f8781ab758b8bc02f13629ba6c6db8ab21f3
[openstack-build/neutron-build.git] / neutron / tests / tempest / services / identity / v2 / json / identity_client.py
1 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
2 #    not use this file except in compliance with the License. You may obtain
3 #    a copy of the License at
4 #
5 #         http://www.apache.org/licenses/LICENSE-2.0
6 #
7 #    Unless required by applicable law or agreed to in writing, software
8 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 #    License for the specific language governing permissions and limitations
11 #    under the License.
12
13 from oslo_serialization import jsonutils as json
14 from tempest_lib import exceptions as lib_exc
15
16 from neutron.tests.tempest.common import service_client
17
18
19 class IdentityClientJSON(service_client.ServiceClient):
20
21     def has_admin_extensions(self):
22         """
23         Returns True if the KSADM Admin Extensions are supported
24         False otherwise
25         """
26         if hasattr(self, '_has_admin_extensions'):
27             return self._has_admin_extensions
28         # Try something that requires admin
29         try:
30             self.list_roles()
31             self._has_admin_extensions = True
32         except Exception:
33             self._has_admin_extensions = False
34         return self._has_admin_extensions
35
36     def create_role(self, name):
37         """Create a role."""
38         post_body = {
39             'name': name,
40         }
41         post_body = json.dumps({'role': post_body})
42         resp, body = self.post('OS-KSADM/roles', post_body)
43         self.expected_success(200, resp.status)
44         return service_client.ResponseBody(resp, self._parse_resp(body))
45
46     def get_role(self, role_id):
47         """Get a role by its id."""
48         resp, body = self.get('OS-KSADM/roles/%s' % role_id)
49         self.expected_success(200, resp.status)
50         body = json.loads(body)
51         return service_client.ResponseBody(resp, body['role'])
52
53     def create_tenant(self, name, **kwargs):
54         """
55         Create a tenant
56         name (required): New tenant name
57         description: Description of new tenant (default is none)
58         enabled <true|false>: Initial tenant status (default is true)
59         """
60         post_body = {
61             'name': name,
62             'description': kwargs.get('description', ''),
63             'enabled': kwargs.get('enabled', True),
64         }
65         post_body = json.dumps({'tenant': post_body})
66         resp, body = self.post('tenants', post_body)
67         self.expected_success(200, resp.status)
68         return service_client.ResponseBody(resp, self._parse_resp(body))
69
70     def delete_role(self, role_id):
71         """Delete a role."""
72         resp, body = self.delete('OS-KSADM/roles/%s' % str(role_id))
73         self.expected_success(204, resp.status)
74         return resp, body
75
76     def list_user_roles(self, tenant_id, user_id):
77         """Returns a list of roles assigned to a user for a tenant."""
78         url = '/tenants/%s/users/%s/roles' % (tenant_id, user_id)
79         resp, body = self.get(url)
80         self.expected_success(200, resp.status)
81         return service_client.ResponseBodyList(resp, self._parse_resp(body))
82
83     def assign_user_role(self, tenant_id, user_id, role_id):
84         """Add roles to a user on a tenant."""
85         resp, body = self.put('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
86                               (tenant_id, user_id, role_id), "")
87         self.expected_success(200, resp.status)
88         return service_client.ResponseBody(resp, self._parse_resp(body))
89
90     def remove_user_role(self, tenant_id, user_id, role_id):
91         """Removes a role assignment for a user on a tenant."""
92         resp, body = self.delete('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
93                                  (tenant_id, user_id, role_id))
94         self.expected_success(204, resp.status)
95         return service_client.ResponseBody(resp, body)
96
97     def delete_tenant(self, tenant_id):
98         """Delete a tenant."""
99         resp, body = self.delete('tenants/%s' % str(tenant_id))
100         self.expected_success(204, resp.status)
101         return service_client.ResponseBody(resp, body)
102
103     def get_tenant(self, tenant_id):
104         """Get tenant details."""
105         resp, body = self.get('tenants/%s' % str(tenant_id))
106         self.expected_success(200, resp.status)
107         return service_client.ResponseBody(resp, self._parse_resp(body))
108
109     def list_roles(self):
110         """Returns roles."""
111         resp, body = self.get('OS-KSADM/roles')
112         self.expected_success(200, resp.status)
113         return service_client.ResponseBodyList(resp, self._parse_resp(body))
114
115     def list_tenants(self):
116         """Returns tenants."""
117         resp, body = self.get('tenants')
118         self.expected_success(200, resp.status)
119         body = json.loads(body)
120         return service_client.ResponseBodyList(resp, body['tenants'])
121
122     def get_tenant_by_name(self, tenant_name):
123         tenants = self.list_tenants()
124         for tenant in tenants:
125             if tenant['name'] == tenant_name:
126                 return tenant
127         raise lib_exc.NotFound('No such tenant')
128
129     def update_tenant(self, tenant_id, **kwargs):
130         """Updates a tenant."""
131         body = self.get_tenant(tenant_id)
132         name = kwargs.get('name', body['name'])
133         desc = kwargs.get('description', body['description'])
134         en = kwargs.get('enabled', body['enabled'])
135         post_body = {
136             'id': tenant_id,
137             'name': name,
138             'description': desc,
139             'enabled': en,
140         }
141         post_body = json.dumps({'tenant': post_body})
142         resp, body = self.post('tenants/%s' % tenant_id, post_body)
143         self.expected_success(200, resp.status)
144         return service_client.ResponseBody(resp, self._parse_resp(body))
145
146     def create_user(self, name, password, tenant_id, email, **kwargs):
147         """Create a user."""
148         post_body = {
149             'name': name,
150             'password': password,
151             'email': email
152         }
153         if tenant_id is not None:
154             post_body['tenantId'] = tenant_id
155         if kwargs.get('enabled') is not None:
156             post_body['enabled'] = kwargs.get('enabled')
157         post_body = json.dumps({'user': post_body})
158         resp, body = self.post('users', post_body)
159         self.expected_success(200, resp.status)
160         return service_client.ResponseBody(resp, self._parse_resp(body))
161
162     def update_user(self, user_id, **kwargs):
163         """Updates a user."""
164         put_body = json.dumps({'user': kwargs})
165         resp, body = self.put('users/%s' % user_id, put_body)
166         self.expected_success(200, resp.status)
167         return service_client.ResponseBody(resp, self._parse_resp(body))
168
169     def get_user(self, user_id):
170         """GET a user."""
171         resp, body = self.get("users/%s" % user_id)
172         self.expected_success(200, resp.status)
173         return service_client.ResponseBody(resp, self._parse_resp(body))
174
175     def delete_user(self, user_id):
176         """Delete a user."""
177         resp, body = self.delete("users/%s" % user_id)
178         self.expected_success(204, resp.status)
179         return service_client.ResponseBody(resp, body)
180
181     def get_users(self):
182         """Get the list of users."""
183         resp, body = self.get("users")
184         self.expected_success(200, resp.status)
185         return service_client.ResponseBodyList(resp, self._parse_resp(body))
186
187     def enable_disable_user(self, user_id, enabled):
188         """Enables or disables a user."""
189         put_body = {
190             'enabled': enabled
191         }
192         put_body = json.dumps({'user': put_body})
193         resp, body = self.put('users/%s/enabled' % user_id, put_body)
194         self.expected_success(200, resp.status)
195         return service_client.ResponseBody(resp, self._parse_resp(body))
196
197     def get_token(self, token_id):
198         """Get token details."""
199         resp, body = self.get("tokens/%s" % token_id)
200         self.expected_success(200, resp.status)
201         return service_client.ResponseBody(resp, self._parse_resp(body))
202
203     def delete_token(self, token_id):
204         """Delete a token."""
205         resp, body = self.delete("tokens/%s" % token_id)
206         self.expected_success(204, resp.status)
207         return service_client.ResponseBody(resp, body)
208
209     def list_users_for_tenant(self, tenant_id):
210         """List users for a Tenant."""
211         resp, body = self.get('/tenants/%s/users' % tenant_id)
212         self.expected_success(200, resp.status)
213         return service_client.ResponseBodyList(resp, self._parse_resp(body))
214
215     def get_user_by_username(self, tenant_id, username):
216         users = self.list_users_for_tenant(tenant_id)
217         for user in users:
218             if user['name'] == username:
219                 return user
220         raise lib_exc.NotFound('No such user')
221
222     def create_service(self, name, type, **kwargs):
223         """Create a service."""
224         post_body = {
225             'name': name,
226             'type': type,
227             'description': kwargs.get('description')
228         }
229         post_body = json.dumps({'OS-KSADM:service': post_body})
230         resp, body = self.post('/OS-KSADM/services', post_body)
231         self.expected_success(200, resp.status)
232         return service_client.ResponseBody(resp, self._parse_resp(body))
233
234     def get_service(self, service_id):
235         """Get Service."""
236         url = '/OS-KSADM/services/%s' % service_id
237         resp, body = self.get(url)
238         self.expected_success(200, resp.status)
239         return service_client.ResponseBody(resp, self._parse_resp(body))
240
241     def list_services(self):
242         """List Service - Returns Services."""
243         resp, body = self.get('/OS-KSADM/services')
244         self.expected_success(200, resp.status)
245         return service_client.ResponseBodyList(resp, self._parse_resp(body))
246
247     def delete_service(self, service_id):
248         """Delete Service."""
249         url = '/OS-KSADM/services/%s' % service_id
250         resp, body = self.delete(url)
251         self.expected_success(204, resp.status)
252         return service_client.ResponseBody(resp, body)
253
254     def update_user_password(self, user_id, new_pass):
255         """Update User Password."""
256         put_body = {
257             'password': new_pass,
258             'id': user_id
259         }
260         put_body = json.dumps({'user': put_body})
261         resp, body = self.put('users/%s/OS-KSADM/password' % user_id, put_body)
262         self.expected_success(200, resp.status)
263         return service_client.ResponseBody(resp, self._parse_resp(body))
264
265     def list_extensions(self):
266         """List all the extensions."""
267         resp, body = self.get('/extensions')
268         self.expected_success(200, resp.status)
269         body = json.loads(body)
270         return service_client.ResponseBodyList(resp,
271                                                body['extensions']['values'])
272
273     def create_user_ec2_credentials(self, user_id, tenant_id):
274         post_body = json.dumps({'tenant_id': tenant_id})
275         resp, body = self.post('/users/%s/credentials/OS-EC2' % user_id,
276                                post_body)
277         self.expected_success(200, resp.status)
278         return service_client.ResponseBody(resp, self._parse_resp(body))
279
280     def list_user_ec2_credentials(self, user_id):
281         resp, body = self.get('/users/%s/credentials/OS-EC2' % user_id)
282         self.expected_success(200, resp.status)
283         return service_client.ResponseBodyList(resp, self._parse_resp(body))