# Maximum amount of retries to generate a unique MAC address
# mac_generation_retries = 16
+# DHCP Lease duration (in seconds)
+# dhcp_lease_duration = 120
+
# Enable or disable bulk create/update/delete operations
# allow_bulk = True
# RPC configuration options. Defined in rpc __init__
cfg.IntOpt('max_dns_nameservers', default=5),
cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.StrOpt('state_path', default='.'),
+ cfg.IntOpt('dhcp_lease_duration', default=120),
]
# Register the configuration options
# See the License for the specific language governing permissions and
# limitations under the License.
+import datetime
import logging
import random
from quantum.db import api as db
from quantum.db import models_v2
from quantum.openstack.common import cfg
+from quantum.openstack.common import timeutils
from quantum import quantum_plugin_base_v2
QuantumDbPluginV2._delete_ip_allocation(context, network_id, subnet_id,
port_id, ip_address)
+ @staticmethod
+ def _default_allocation_expiration():
+ return (timeutils.utcnow() +
+ datetime.timedelta(seconds=cfg.CONF.dhcp_lease_duration))
+
@staticmethod
def _delete_ip_allocation(context, network_id, subnet_id, port_id,
ip_address):
for ip in ips:
allocated = models_v2.IPAllocation(
network_id=port['network_id'], port_id=port.id,
- ip_address=ip['ip_address'], subnet_id=ip['subnet_id'])
+ ip_address=ip['ip_address'], subnet_id=ip['subnet_id'],
+ expiration=self._default_allocation_expiration())
context.session.add(allocated)
port.update(p)
network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id",
ondelete="CASCADE"),
nullable=False, primary_key=True)
+ expiration = sa.Column(sa.DateTime, nullable=True)
class Port(model_base.BASEV2, HasId, HasTenant):
import contextlib
import copy
+import datetime
import logging
import mock
import os
from quantum.db import db_base_plugin_v2
from quantum.manager import QuantumManager
from quantum.openstack.common import cfg
+from quantum.openstack.common import timeutils
from quantum.tests.unit import test_extensions
from quantum.tests.unit.testlib_api import create_request
from quantum.wsgi import Serializer, JSONDeserializer
req = self.new_delete_request('subnets', subnet['subnet']['id'])
res = req.get_response(self.api)
self.assertEquals(res.status_int, 204)
+
+ def test_default_allocation_expiration(self):
+ reference = datetime.datetime(2012, 8, 13, 23, 11, 0)
+ timeutils.utcnow.override_time = reference
+
+ cfg.CONF.set_override('dhcp_lease_duration', 120)
+ expires = QuantumManager.get_plugin()._default_allocation_expiration()
+ timeutils.utcnow
+ cfg.CONF.reset()
+ timeutils.utcnow.override_time = None
+ self.assertEqual(expires, reference + datetime.timedelta(seconds=120))