import array
import base64
import binascii
+import re
from barbicanclient import client as barbican_client
from keystoneclient.auth import identity
CONF.import_opt('encryption_auth_url', 'cinder.keymgr.key_mgr', group='keymgr')
CONF.import_opt('encryption_api_url', 'cinder.keymgr.key_mgr', group='keymgr')
LOG = logging.getLogger(__name__)
+URL_PATTERN = re.compile(
+ "(?P<url_base>http[s]?://[^/]*)[/]?(?P<url_version>(v[0-9.]+)?).*")
class BarbicanKeyManager(key_mgr.KeyManager):
def __init__(self):
self._base_url = CONF.keymgr.encryption_api_url
- # the barbican endpoint can't have the '/v1' on the end
- self._barbican_endpoint = self._base_url.rpartition('/')[0]
+ self._parse_barbican_api_url()
self._barbican_client = None
+ def _parse_barbican_api_url(self):
+ """Setup member variables to reference the Barbican URL.
+
+ The key manipulation functions in this module need to use the
+ barbican URL with the version appended. But the barbicanclient
+ Client() class needs the URL without the version appended.
+ So set up a member variables here for each case.
+ """
+ m = URL_PATTERN.search(self._base_url)
+ if m is None:
+ raise exception.KeyManagerError(_(
+ "Invalid url: must be in the form "
+ "'http[s]://<ipaddr>|<fqdn>[:port]/<version>', "
+ "url specified is: %s"), self._base_url)
+ url_info = dict(m.groupdict())
+ if 'url_version' not in url_info or url_info['url_version'] == "":
+ raise exception.KeyManagerError(_(
+ "Invalid barbican api url: version is required, "
+ "e.g. 'http[s]://<ipaddr>|<fqdn>[:port]/<version>' "
+ "url specified is: %s") % self._base_url)
+ # We will also need the barbican API URL without the '/v1'.
+ # So save that now.
+ self._barbican_endpoint = url_info['url_base']
+
def _get_barbican_client(self, ctxt):
"""Creates a client to connect to the Barbican service.
self.pre_hex = "AIDxQp2++uAbKaTVDMXFYIu8PIugJGqkK0JLqkU0rhY="
self.hex = ("0080f1429dbefae01b29a4d50cc5c5608bbc3c8ba0246aa42b424baa4"
"534ae16")
+ self.original_api_url = CONF.keymgr.encryption_api_url
self.addCleanup(self._restore)
def _restore(self):
keymgr_key.SymmetricKey = self.original_key
if hasattr(self, 'original_base64'):
base64.b64encode = self.original_base64
+ if hasattr(self, 'original_api_url'):
+ CONF.keymgr.encryption_api_url = self.original_api_url
def _build_mock_barbican(self):
self.mock_barbican = mock.MagicMock(name='mock_barbican')
mock_session.assert_called_once_with(auth=mock_auth)
mock_client.assert_called_once_with(session=mock_sess,
endpoint=mock_endpoint)
+
+ def test_parse_barbican_api_url(self):
+ # assert that the correct format is handled correctly
+ CONF.keymgr.encryption_api_url = "http://host:port/v1/"
+ dummy = barbican.BarbicanKeyManager()
+ self.assertEqual(dummy._barbican_endpoint, "http://host:port")
+
+ # assert that invalid api url formats will raise an exception
+ CONF.keymgr.encryption_api_url = "http://host:port/"
+ self.assertRaises(exception.KeyManagerError,
+ barbican.BarbicanKeyManager)
+ CONF.keymgr.encryption_api_url = "http://host:port/secrets"
+ self.assertRaises(exception.KeyManagerError,
+ barbican.BarbicanKeyManager)