from cinder import db
from cinder.db import migration as db_migration
from cinder.db.sqlalchemy import api as db_api
+from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder import rpc
svc['availability_zone'], status, art,
svc['updated_at']))
+ @args('binary', type=str,
+ help='Service to delete from the host.')
+ @args('host_name', type=str,
+ help='Host from which to remove the service.')
+ def remove(self, binary, host_name):
+ """Completely removes a service."""
+ ctxt = context.get_admin_context()
+ try:
+ svc = db.service_get_by_args(ctxt, host_name, binary)
+ db.service_destroy(ctxt, svc['id'])
+ except exception.HostBinaryNotFound as e:
+ print(_("Host not found. Failed to remove %(service)s"
+ " on %(host)s.") %
+ {'service': binary, 'host': host_name})
+ print (u"%s" % e.args)
+ return 2
+ print(_("Service %(service)s on host %(host)s removed.") %
+ {'service': binary, 'host': host_name})
CATEGORIES = {
'backup': BackupCommands,
self.assertFalse(log_setup.called)
self.assertEqual(2, exit.code)
+ @mock.patch('cinder.db')
+ def test_remove_service_failure(self, mock_db):
+ mock_db.service_destroy.side_effect = SystemExit(1)
+ service_commands = cinder_manage.ServiceCommands()
+ exit = service_commands.remove('abinary', 'ahost')
+ self.assertEqual(2, exit)
+
+ @mock.patch('cinder.db.service_destroy')
+ @mock.patch('cinder.db.service_get_by_args',
+ return_value = {'id': 'volID'})
+ def test_remove_service_success(self, mock_get_by_args,
+ mock_service_destroy):
+ service_commands = cinder_manage.ServiceCommands()
+ self.assertIsNone(service_commands.remove('abinary', 'ahost'))
+
class TestCinderRtstoolCmd(test.TestCase):