"cloudformation:EstimateTemplateCost": "rule:deny_stack_user",
"cloudformation:DescribeStackResource": "",
"cloudformation:DescribeStackResources": "rule:deny_stack_user",
- "cloudformation:ListStackResources": "rule:deny_stack_user"
+ "cloudformation:ListStackResources": "rule:deny_stack_user",
+
+ "cloudwatch:DeleteAlarms": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarmHistory": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarms": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarmsForMetric": "rule:deny_stack_user",
+ "cloudwatch:DisableAlarmActions": "rule:deny_stack_user",
+ "cloudwatch:EnableAlarmActions": "rule:deny_stack_user",
+ "cloudwatch:GetMetricStatistics": "rule:deny_stack_user",
+ "cloudwatch:ListMetrics": "rule:deny_stack_user",
+ "cloudwatch:PutMetricAlarm": "rule:deny_stack_user",
+ "cloudwatch:PutMetricData": "",
+ "cloudwatch:SetAlarmState": "rule:deny_stack_user"
}
from heat.api.aws import exception
from heat.api.aws import utils as api_utils
from heat.common import wsgi
+from heat.common import policy
+from heat.common import exception as heat_exception
from heat.rpc import client as rpc_client
from heat.rpc import api as engine_api
def __init__(self, options):
self.options = options
self.engine_rpcapi = rpc_client.EngineClient()
+ self.policy = policy.Enforcer(scope='cloudwatch')
+
+ def _enforce(self, req, action):
+ """Authorize an action against the policy.json"""
+ try:
+ self.policy.enforce(req.context, action, {})
+ except heat_exception.Forbidden:
+ raise exception.HeatAccessDeniedError("Action %s not allowed " %
+ action + "for user")
+ except Exception as ex:
+ # We expect policy.enforce to either pass or raise Forbidden
+ # however, if anything else happens, we want to raise
+ # HeatInternalFailureError, failure to do this results in
+ # the user getting a big stacktrace spew as an API response
+ raise exception.HeatInternalFailureError("Error authorizing " +
+ "action %s" % action)
@staticmethod
def _reformat_dimensions(dims):
"""
Implements DeleteAlarms API action
"""
+ self._enforce(req, 'DeleteAlarms')
return exception.HeatAPINotImplementedError()
def describe_alarm_history(self, req):
"""
Implements DescribeAlarmHistory API action
"""
+ self._enforce(req, 'DescribeAlarmHistory')
return exception.HeatAPINotImplementedError()
def describe_alarms(self, req):
"""
Implements DescribeAlarms API action
"""
+ self._enforce(req, 'DescribeAlarms')
def format_metric_alarm(a):
"""
"""
Implements DescribeAlarmsForMetric API action
"""
+ self._enforce(req, 'DescribeAlarmsForMetric')
return exception.HeatAPINotImplementedError()
def disable_alarm_actions(self, req):
"""
Implements DisableAlarmActions API action
"""
+ self._enforce(req, 'DisableAlarmActions')
return exception.HeatAPINotImplementedError()
def enable_alarm_actions(self, req):
"""
Implements EnableAlarmActions API action
"""
+ self._enforce(req, 'EnableAlarmActions')
return exception.HeatAPINotImplementedError()
def get_metric_statistics(self, req):
"""
Implements GetMetricStatistics API action
"""
+ self._enforce(req, 'GetMetricStatistics')
return exception.HeatAPINotImplementedError()
def list_metrics(self, req):
Lists metric datapoints associated with a particular alarm,
or all alarms if none specified
"""
+ self._enforce(req, 'ListMetrics')
+
def format_metric_data(d, fil={}):
"""
Reformat engine output into the AWS "Metric" format
"""
Implements PutMetricAlarm API action
"""
+ self._enforce(req, 'PutMetricAlarm')
return exception.HeatAPINotImplementedError()
def put_metric_data(self, req):
"""
Implements PutMetricData API action
"""
+ self._enforce(req, 'PutMetricData')
con = req.context
parms = dict(req.params)
"""
Implements SetAlarmState API action
"""
+ self._enforce(req, 'SetAlarmState')
+
# Map from AWS state names to those used in the engine
state_map = {'OK': engine_api.WATCH_STATE_OK,
'ALARM': engine_api.WATCH_STATE_ALARM,
"cloudformation:EstimateTemplateCost": "rule:deny_stack_user",
"cloudformation:DescribeStackResource": "",
"cloudformation:DescribeStackResources": "rule:deny_stack_user",
- "cloudformation:ListStackResources": "rule:deny_stack_user"
+ "cloudformation:ListStackResources": "rule:deny_stack_user",
+
+ "cloudwatch:DeleteAlarms": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarmHistory": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarms": "rule:deny_stack_user",
+ "cloudwatch:DescribeAlarmsForMetric": "rule:deny_stack_user",
+ "cloudwatch:DisableAlarmActions": "rule:deny_stack_user",
+ "cloudwatch:EnableAlarmActions": "rule:deny_stack_user",
+ "cloudwatch:GetMetricStatistics": "rule:deny_stack_user",
+ "cloudwatch:ListMetrics": "rule:deny_stack_user",
+ "cloudwatch:PutMetricAlarm": "rule:deny_stack_user",
+ "cloudwatch:PutMetricData": "",
+ "cloudwatch:SetAlarmState": "rule:deny_stack_user"
}
# License for the specific language governing permissions and limitations
# under the License.
-
+import os
import mox
import unittest
from nose.plugins.attrib import attr
from heat.common import context
+from heat.common import policy
from heat.openstack.common import cfg
from heat.openstack.common import rpc
from heat.common.wsgi import Request
{'Name': 'Foo', 'Value': 'bar'}]
self.assert_(response == expected)
+ def test_enforce_default(self):
+ self.m.ReplayAll()
+ params = {'Action': 'ListMetrics'}
+ dummy_req = self._dummy_GET_request(params)
+ self.controller.policy.policy_path = None
+ response = self.controller._enforce(dummy_req, 'ListMetrics')
+ self.assertEqual(response, None)
+ self.m.VerifyAll()
+
+ def test_enforce_denied(self):
+ self.m.ReplayAll()
+ params = {'Action': 'ListMetrics'}
+ dummy_req = self._dummy_GET_request(params)
+ dummy_req.context.roles = ['heat_stack_user']
+ self.controller.policy.policy_path = (self.policy_path +
+ 'deny_stack_user.json')
+ self.assertRaises(exception.HeatAccessDeniedError,
+ self.controller._enforce, dummy_req, 'ListMetrics')
+ self.m.VerifyAll()
+
+ def test_enforce_ise(self):
+ params = {'Action': 'ListMetrics'}
+ dummy_req = self._dummy_GET_request(params)
+ dummy_req.context.roles = ['heat_stack_user']
+
+ self.m.StubOutWithMock(policy.Enforcer, 'enforce')
+ policy.Enforcer.enforce(dummy_req.context, 'ListMetrics', {}
+ ).AndRaise(AttributeError)
+ self.m.ReplayAll()
+
+ self.controller.policy.policy_path = (self.policy_path +
+ 'deny_stack_user.json')
+ self.assertRaises(exception.HeatInternalFailureError,
+ self.controller._enforce, dummy_req, 'ListMetrics')
+ self.m.VerifyAll()
+
def test_delete(self):
# Not yet implemented, should raise HeatAPINotImplementedError
params = {'Action': 'DeleteAlarms'}
self.maxDiff = None
self.m = mox.Mox()
+ self.path = os.path.dirname(os.path.realpath(__file__))
+ self.policy_path = self.path + "/policy/"
+ opts = [
+ cfg.StrOpt('config_dir', default=self.policy_path),
+ cfg.StrOpt('config_file', default='foo'),
+ cfg.StrOpt('project', default='heat'),
+ ]
+ cfg.CONF.register_opts(opts)
cfg.CONF.set_default('engine_topic', 'engine')
cfg.CONF.set_default('host', 'host')
self.topic = '%s.%s' % (cfg.CONF.engine_topic, cfg.CONF.host)
bind_port = 8003
cfgopts = DummyConfig()
self.controller = watches.WatchController(options=cfgopts)
+ self.controller.policy.policy_path = None
print "setup complete"
def tearDown(self):
"EstimateTemplateCost", "DescribeStackResource",
"DescribeStackResources")
+ cw_actions = ("DeleteAlarms", "DescribeAlarmHistory", "DescribeAlarms",
+ "DescribeAlarmsForMetric", "DisableAlarmActions",
+ "EnableAlarmActions", "GetMetricStatistics", "ListMetrics",
+ "PutMetricAlarm", "PutMetricData", "SetAlarmState")
+
def setUp(self):
self.path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
self.m = mox.Mox()
# Everything should be allowed
enforcer.enforce(ctx, action, {})
self.m.VerifyAll()
+
+ def test_policy_cw_deny_stack_user(self):
+ pf = self.path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
+ policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer(scope='cloudwatch')
+
+ ctx = context.RequestContext(roles=['heat_stack_user'])
+ for action in self.cw_actions:
+ # Everything apart from PutMetricData should be Forbidden
+ if action == "PutMetricData":
+ enforcer.enforce(ctx, action, {})
+ else:
+ self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
+ action, {})
+ self.m.VerifyAll()
+
+ def test_policy_cw_allow_non_stack_user(self):
+ pf = self.path + 'deny_stack_user.json'
+ self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
+ policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
+ self.m.ReplayAll()
+
+ enforcer = policy.Enforcer(scope='cloudwatch')
+
+ ctx = context.RequestContext(roles=['not_a_stack_user'])
+ for action in self.cw_actions:
+ # Everything should be allowed
+ enforcer.enforce(ctx, action, {})
+ self.m.VerifyAll()