Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / agent / ovsdb / native / connection.py
1 # Copyright (c) 2015 Red Hat, Inc.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 from six.moves import queue as Queue
17 import threading
18 import traceback
19
20 from ovs.db import idl
21 from ovs import poller
22 import retrying
23
24 from neutron.agent.ovsdb.native import helpers
25 from neutron.agent.ovsdb.native import idlutils
26
27
28 class TransactionQueue(Queue.Queue, object):
29     def __init__(self, *args, **kwargs):
30         super(TransactionQueue, self).__init__(*args, **kwargs)
31         alertpipe = os.pipe()
32         self.alertin = os.fdopen(alertpipe[0], 'r', 0)
33         self.alertout = os.fdopen(alertpipe[1], 'w', 0)
34
35     def get_nowait(self, *args, **kwargs):
36         try:
37             result = super(TransactionQueue, self).get_nowait(*args, **kwargs)
38         except Queue.Empty:
39             return None
40         self.alertin.read(1)
41         return result
42
43     def put(self, *args, **kwargs):
44         super(TransactionQueue, self).put(*args, **kwargs)
45         self.alertout.write('X')
46         self.alertout.flush()
47
48     @property
49     def alert_fileno(self):
50         return self.alertin.fileno()
51
52
53 class Connection(object):
54     def __init__(self, connection, timeout, schema_name):
55         self.idl = None
56         self.connection = connection
57         self.timeout = timeout
58         self.txns = TransactionQueue(1)
59         self.lock = threading.Lock()
60         self.schema_name = schema_name
61
62     def start(self):
63         with self.lock:
64             if self.idl is not None:
65                 return
66
67             try:
68                 helper = idlutils.get_schema_helper(self.connection,
69                                                     self.schema_name)
70             except Exception:
71                 # We may have failed do to set-manager not being called
72                 helpers.enable_connection_uri(self.connection)
73
74                 # There is a small window for a race, so retry up to a second
75                 @retrying.retry(wait_exponential_multiplier=10,
76                                 stop_max_delay=1000)
77                 def do_get_schema_helper():
78                     return idlutils.get_schema_helper(self.connection,
79                                                       self.schema_name)
80                 helper = do_get_schema_helper()
81
82             helper.register_all()
83             self.idl = idl.Idl(self.connection, helper)
84             idlutils.wait_for_change(self.idl, self.timeout)
85             self.poller = poller.Poller()
86             self.thread = threading.Thread(target=self.run)
87             self.thread.setDaemon(True)
88             self.thread.start()
89
90     def run(self):
91         while True:
92             self.idl.wait(self.poller)
93             self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
94             self.poller.block()
95             self.idl.run()
96             txn = self.txns.get_nowait()
97             if txn is not None:
98                 try:
99                     txn.results.put(txn.do_commit())
100                 except Exception as ex:
101                     er = idlutils.ExceptionResult(ex=ex,
102                                                   tb=traceback.format_exc())
103                     txn.results.put(er)
104                 self.txns.task_done()
105
106     def queue_txn(self, txn):
107         self.txns.put(txn)