return IMPL.service_update(context, service_id, values)
-###################
-
-
-def iscsi_target_count_by_host(context, host):
- """Return count of export devices."""
- return IMPL.iscsi_target_count_by_host(context, host)
-
-
-def iscsi_target_create_safe(context, values):
- """Create an iscsi_target from the values dictionary.
-
- The device is not returned. If the create violates the unique
- constraints because the iscsi_target and host already exist,
- no exception is raised.
-
- """
- return IMPL.iscsi_target_create_safe(context, values)
-
-
###############
offset=offset)
-def volume_get_iscsi_target_num(context, volume_id):
- """Get the target num (tid) allocated to the volume."""
- return IMPL.volume_get_iscsi_target_num(context, volume_id)
-
-
def volume_update(context, volume_id, values):
"""Set the given properties on a volume and update it.
###################
-@require_admin_context
-def iscsi_target_count_by_host(context, host):
- return model_query(context, models.IscsiTarget).\
- filter_by(host=host).\
- count()
-
-
-@require_admin_context
-def iscsi_target_create_safe(context, values):
- iscsi_target_ref = models.IscsiTarget()
-
- for (key, value) in values.items():
- iscsi_target_ref[key] = value
- session = get_session()
-
- try:
- with session.begin():
- session.add(iscsi_target_ref)
- return iscsi_target_ref
- except db_exc.DBDuplicateEntry:
- LOG.debug("Can not add duplicate IscsiTarget.")
- return None
-
-
-###################
-
-
@require_context
def _quota_get(context, project_id, resource, session=None):
result = model_query(context, models.Quota, session=session,
'deleted_at': now,
'updated_at': literal_column('updated_at'),
'migration_status': None})
- model_query(context, models.IscsiTarget, session=session).\
- filter_by(volume_id=volume_id).\
- update({'volume_id': None})
model_query(context, models.VolumeMetadata, session=session).\
filter_by(volume_id=volume_id).\
update({'deleted': True,
return result_keys, result_dirs
-@require_admin_context
-def volume_get_iscsi_target_num(context, volume_id):
- result = model_query(context, models.IscsiTarget, read_deleted="yes").\
- filter_by(volume_id=volume_id).\
- first()
-
- if not result:
- raise exception.ISCSITargetNotFoundForVolume(volume_id=volume_id)
-
- return result.target_num
-
-
@require_context
def volume_update(context, volume_id, values):
session = get_session()
This is a database migration repository.
-More information at
-http://code.google.com/p/sqlalchemy-migrate/
+More information at:
+ https://github.com/openstack/sqlalchemy-migrate
+
+Original project is no longer maintained at:
+ http://code.google.com/p/sqlalchemy-migrate/
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import MetaData, Table
+
+
+def upgrade(migrate_engine):
+ meta = MetaData()
+ meta.bind = migrate_engine
+ table = Table('iscsi_targets', meta, autoload=True)
+ table.drop()
'SnapshotMetadata.deleted == False)')
-class IscsiTarget(BASE, CinderBase):
- """Represents an iscsi target for a given host."""
- __tablename__ = 'iscsi_targets'
- __table_args__ = (schema.UniqueConstraint("target_num", "host"),
- {'mysql_engine': 'InnoDB'})
- id = Column(Integer, primary_key=True)
- target_num = Column(Integer)
- host = Column(String(255))
- volume_id = Column(String(36), ForeignKey('volumes.id'), nullable=True)
- volume = relationship(Volume,
- backref=backref('iscsi_target', uselist=False),
- foreign_keys=volume_id,
- primaryjoin='and_(IscsiTarget.volume_id==Volume.id,'
- 'IscsiTarget.deleted==False)')
-
-
class Backup(BASE, CinderBase):
"""Represents a backup of a volume to Swift."""
__tablename__ = 'backups'
self.assertRaises(exception.InvalidInput, db.volume_get_all,
self.ctxt, None, None, sort_keys=keys)
- def test_volume_get_iscsi_target_num(self):
- db.iscsi_target_create_safe(self.ctxt, {'volume_id': 42,
- 'target_num': 43})
- self.assertEqual(43, db.volume_get_iscsi_target_num(self.ctxt, 42))
-
- def test_volume_get_iscsi_target_num_nonexistent(self):
- self.assertRaises(exception.ISCSITargetNotFoundForVolume,
- db.volume_get_iscsi_target_num, self.ctxt, 42)
-
def test_volume_update(self):
volume = db.volume_create(self.ctxt, {'host': 'h1'})
ref_a = db.volume_update(self.ctxt, volume['id'],
self.ctxt, 'p1'))
-class DBAPIIscsiTargetTestCase(BaseTest):
-
- """Unit tests for cinder.db.api.iscsi_target_*."""
-
- def _get_base_values(self):
- return {'target_num': 10, 'host': 'fake_host'}
-
- def test_iscsi_target_create_safe(self):
- target = db.iscsi_target_create_safe(self.ctxt,
- self._get_base_values())
- self.assertTrue(target['id'])
- self.assertEqual('fake_host', target['host'])
- self.assertEqual(10, target['target_num'])
-
- def test_iscsi_target_count_by_host(self):
- for i in range(3):
- values = self._get_base_values()
- values['target_num'] += i
- db.iscsi_target_create_safe(self.ctxt, values)
- self.assertEqual(3,
- db.iscsi_target_count_by_host(self.ctxt, 'fake_host'))
-
- def test_integrity_error(self):
- values = self._get_base_values()
- values['id'] = 1
- db.iscsi_target_create_safe(self.ctxt, values)
- self.assertFalse(db.iscsi_target_create_safe(self.ctxt, values))
-
-
class DBAPIBackupTestCase(BaseTest):
"""Tests for db.api.backup_* methods."""
db_volume = db.volume_get(self.context, volume.id)
self.assertEqual('available', db_volume.status)
- def test_concurrent_volumes_get_different_targets(self):
- """Ensure multiple concurrent volumes get different targets."""
- volume_ids = []
- targets = []
-
- def _check(volume_id):
- """Make sure targets aren't duplicated."""
- volume_ids.append(volume_id)
- admin_context = context.get_admin_context()
- iscsi_target = db.volume_get_iscsi_target_num(admin_context,
- volume_id)
- self.assertNotIn(iscsi_target, targets)
- targets.append(iscsi_target)
-
- # FIXME(jdg): What is this actually testing?
- # We never call the internal _check method?
- for _index in range(100):
- tests_utils.create_volume(self.context, **self.volume_params)
- for volume_id in volume_ids:
- self.volume.delete_volume(self.context, volume_id)
-
def test_multi_node(self):
# TODO(termie): Figure out how to test with two nodes,
# each of them having a different FLAG for storage_node