* packages is a list of:
- pkg name (httpd), or
- pkg name with version spec (httpd-2.2.22), or
- - pkg name with version-release spec (httpd-2.2.22-1.fc16)
+ - pkg name with version-release spec
+ (httpd-2.2.22-1.fc16)
"""
if rpms:
cmd = "rpm -U --force --nosignature "
* use Yum to downgrade packages
* packages is a list of:
- pkg name with version spec (httpd-2.2.22), or
- - pkg name with version-release spec (httpd-2.2.22-1.fc16)
+ - pkg name with version-release spec
+ (httpd-2.2.22-1.fc16)
"""
if rpms:
cls.install(packages)
def _handle_yum_packages(self, packages):
"""
- Handle installation, upgrade, or downgrade of a set of packages via yum.
+ Handle installation, upgrade, or downgrade of a set of
+ packages via yum.
Arguments:
packages -- a package entries map of the form:
# FIXME:print non-error, but skipping pkg
pass
elif not RpmHelper.yum_package_available(pkg):
- logging.warn("Skipping package '%s'. Not available via yum" % pkg)
+ logging.warn("Skipping package '%s'. Not available via yum" % \
+ pkg)
elif not ver:
installs.append(pkg)
else:
def _handle_rpm_packages(sef, packages):
"""
- Handle installation, upgrade, or downgrade of a set of packages via rpm.
+ Handle installation, upgrade, or downgrade of a set of
+ packages via rpm.
Arguments:
packages -- a package entries map of the form:
else:
handler(self, package_entries)
+
class FilesHandler(object):
def __init__(self, files):
self._files = files
"""
Process the resource metadata
"""
- # FIXME: when config sets are implemented, this should select the correct
- # config set from the metadata, and send each config in the config set to
- # process_config
+ # FIXME: when config sets are implemented, this should select the
+ # correct config set from the metadata, and send each config in the
+ # config set to process_config
if not self._is_valid_metadata():
raise Exception("invalid metadata")
else:
#conf.db_backend = 'heat.db.sqlalchemy.api'
IMPL = heat.utils.LazyPluggable('db_backend',
sqlalchemy='heat.db.sqlalchemy.api')
+
+
def configure(conf):
global SQL_CONNECTION
global SQL_IDLE_TIMEOUT
SQL_IDLE_TIMEOUT = conf.sql_idle_timeout
-
def raw_template_get(context, template_id):
return IMPL.raw_template_get(context, template_id)
_REPOSITORY = None
+
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
from migrate import exceptions as versioning_exceptions
except ImportError:
sys.exit(_("python-migrate is not installed. Exiting."))
-
+
#_REPOSITORY = None
engine = get_engine()
meta.reflect(bind=engine)
try:
- for table in ('stack', 'resource', 'event',
+ for table in ('stack', 'resource', 'event',
'parsed_template', 'raw_template'):
assert table in meta.tables
return db_version_control(1)
class Error(Exception):
pass
+
class DBError(Error):
"""Wraps an implementation specific exception."""
def __init__(self, inner_exception=None):
_wrap.func_name = f.func_name
return _wrap
+
def get_session(autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy session."""
global _ENGINE, _MAKER
self.t['stack_status'] = new_status
self.update_parsed_template()
-
def create_blocking(self):
'''
create all the resources in the order specified by get_create_order
LOG = logging.getLogger(__name__)
+
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
_DB = None
+
def reset_db():
engine = get_engine()
engine.dispose()
conn = engine.connect()
conn.connection.executescript(_DB)
+
def setup():
import mox # Fail fast if you don't have mox. Workaround for bug 810424
f.write('junk, just junk')
f.close()
+
def tearDown_credential_file():
shutil.rmtree('/tmp/incredible', ignore_errors=True)
+
@with_setup(setUp_credential_file, tearDown_credential_file)
@attr(tag=['unit', 'cfn-hup'])
@attr(speed='fast')
def test_hup_conf1():
- good= """
+ good = """
[main]
stack=stack-test
credential-file=/tmp/incredible
@attr(tag=['unit', 'cfn-hup'])
@attr(speed='fast')
def test_hup_default():
- good= """
+ good = """
[main]
stack=stack-testr
credential-file=/tmp/incredible
@attr(tag=['unit', 'cfn-hup'])
@attr(speed='fast')
def test_hup_hook():
- good= """
+ good = """
[main]
stack=stackname_is_fred
credential-file=/tmp/incredible
self.m.UnsetStubs()
shutil.rmtree('/tmp/_files_test_', ignore_errors=True)
-
@attr(tag=['unit', 'cfn-metadata'])
@attr(speed='fast')
def test_metadata_files(self):
"config" : {
"files" : {
"/tmp/_files_test_/epel.repo" : {
- "source" : "https://raw.github.com/heat-api/heat/master/README.rst",
+ "source" : "https://raw.github.com/heat-api/heat/master/README.rst",
"mode" : "000644"
},
"/tmp/_files_test_/_with/some/dirs/to/make/small.conf" : {
os.chown('/tmp/_files_test_/epel.repo', -1, -1)
os.chmod('/tmp/_files_test_/node.json', 384)
os.chmod('/tmp/_files_test_/epel.repo', 420)
- os.chown('/tmp/_files_test_/_with/some/dirs/to/make/small.conf', -1, -1)
+ os.chown('/tmp/_files_test_/_with/some/dirs/to/make/small.conf',
+ -1, -1)
os.chmod('/tmp/_files_test_/_with/some/dirs/to/make/small.conf', 511)
self.m.ReplayAll()
import heat.db as db_api
from heat.engine import parser
+
@attr(tag=['unit', 'resource'])
@attr(speed='fast')
class ResourcesTest(unittest.TestCase):
params = {}
parameters = {}
- params['KeyStoneCreds'] = None
+ params['KeyStoneCreds'] = None
t['Parameters']['KeyName']['Value'] = 'test'
stack = parser.Stack('test_stack', t, 0, params)
-
+
self.m.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
db_api.resource_get_by_name_and_stack(None, 'test_resource_name',\
stack).AndReturn(None)
userdata = t['Resources']['WebServer']['Properties']['UserData']
self.m.ReplayAll()
-
- t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
- t['Resources']['WebServer']['Properties']['InstanceType'] = '256 MB Server'
+
+ t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
+ t['Resources']['WebServer']['Properties']['InstanceType'] = \
+ '256 MB Server'
instance = resources.Instance('test_resource_name',\
t['Resources']['WebServer'], stack)
-
+
server_userdata = instance._build_userdata(json.dumps(userdata))
self.m.StubOutWithMock(self.fc.servers, 'create')
self.fc.servers.create(image=1, flavor=1, key_name='test',\
userdata=server_userdata).\
AndReturn(self.fc.servers.list()[1])
self.m.ReplayAll()
-
instance.itype_oflavor['256 MB Server'] = '256 MB Server'
instance.create()
-
+
self.m.ReplayAll()
instance.itype_oflavor['256 MB Server'] = '256 MB Server'
params = {}
parameters = {}
- params['KeyStoneCreds'] = None
+ params['KeyStoneCreds'] = None
t['Parameters']['KeyName']['Value'] = 'test'
stack = parser.Stack('test_stack', t, 0, params)
-
+
self.m.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
db_api.resource_get_by_name_and_stack(None, 'test_resource_name',\
stack).AndReturn(None)
userdata = t['Resources']['WebServer']['Properties']['UserData']
self.m.ReplayAll()
-
- t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
- t['Resources']['WebServer']['Properties']['InstanceType'] = '256 MB Server'
+
+ t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
+ t['Resources']['WebServer']['Properties']['InstanceType'] = \
+ '256 MB Server'
instance = resources.Instance('test_resource_name',\
t['Resources']['WebServer'], stack)
-
+
server_userdata = instance._build_userdata(json.dumps(userdata))
self.m.StubOutWithMock(self.fc.servers, 'create')
self.fc.servers.create(image=1, flavor=1, key_name='test',\
userdata=server_userdata).\
AndReturn(self.fc.servers.list()[1])
self.m.ReplayAll()
-
instance.itype_oflavor['256 MB Server'] = '256 MB Server'
instance.create()
-
+
self.m.ReplayAll()
-
+
instance.instance_id = 1234
instance.itype_oflavor['256 MB Server'] = '256 MB Server'
instance.create()
assert(instance.instance_id == None)
assert(instance.state == instance.DELETE_COMPLETE)
-
# allows testing of the test directly, shown below
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()
-