]> review.fuel-infra Code Review - packages/trusty/mysql-wsrep-5.6.git/blob
1e49303caca02abb71bc6927ccd1b4bda68ad87b
[packages/trusty/mysql-wsrep-5.6.git] /
1 /*
2    Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
3
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
16 */
17
18 package com.mysql.clusterj.core;
19
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalException;
22 import com.mysql.clusterj.ClusterJFatalInternalException;
23 import com.mysql.clusterj.ClusterJFatalUserException;
24 import com.mysql.clusterj.ClusterJHelper;
25 import com.mysql.clusterj.ClusterJUserException;
26 import com.mysql.clusterj.Constants;
27 import com.mysql.clusterj.Session;
28 import com.mysql.clusterj.SessionFactory;
29
30 import com.mysql.clusterj.core.spi.DomainTypeHandler;
31 import com.mysql.clusterj.core.spi.DomainTypeHandlerFactory;
32 import com.mysql.clusterj.core.metadata.DomainTypeHandlerFactoryImpl;
33
34 import com.mysql.clusterj.core.store.Db;
35 import com.mysql.clusterj.core.store.ClusterConnection;
36 import com.mysql.clusterj.core.store.ClusterConnectionService;
37 import com.mysql.clusterj.core.store.Dictionary;
38 import com.mysql.clusterj.core.store.Table;
39
40 import com.mysql.clusterj.core.util.I18NHelper;
41 import com.mysql.clusterj.core.util.Logger;
42 import com.mysql.clusterj.core.util.LoggerFactoryService;
43
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.List;
47 import java.util.Map;
48
49 public class SessionFactoryImpl implements SessionFactory, Constants {
50
51     /** My message translator */
52     static final I18NHelper local = I18NHelper.getInstance(SessionFactoryImpl.class);
53
54     /** My logger */
55     static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionFactoryImpl.class);
56
57     /** The properties */
58     protected Map<?, ?> props;
59
60     /** NdbCluster connect properties */
61     String CLUSTER_CONNECTION_SERVICE;
62     String CLUSTER_CONNECT_STRING;
63     int CLUSTER_CONNECT_RETRIES;
64     int CLUSTER_CONNECT_DELAY;
65     int CLUSTER_CONNECT_VERBOSE;
66     int CLUSTER_CONNECT_TIMEOUT_BEFORE;
67     int CLUSTER_CONNECT_TIMEOUT_AFTER;
68     String CLUSTER_DATABASE;
69     int CLUSTER_MAX_TRANSACTIONS;
70
71     /** Node ids obtained from the property PROPERTY_CONNECTION_POOL_NODEIDS */
72     List<Integer> nodeIds = new ArrayList<Integer>();
73
74     /** Connection pool size obtained from the property PROPERTY_CONNECTION_POOL_SIZE */
75     int connectionPoolSize;
76
77     /** Map of Proxy to Class */
78     // TODO make this non-static
79     static private Map<Class<?>, Class<?>> proxyClassToDomainClass = new HashMap<Class<?>, Class<?>>();
80
81     /** Map of Domain Class to DomainTypeHandler. */
82     // TODO make this non-static
83     static final protected Map<Class<?>, DomainTypeHandler<?>> typeToHandlerMap =
84             new HashMap<Class<?>, DomainTypeHandler<?>>();
85
86     /** DomainTypeHandlerFactory for this session factory. */
87     DomainTypeHandlerFactory domainTypeHandlerFactory = new DomainTypeHandlerFactoryImpl();
88
89     /** The tables. */
90     // TODO make this non-static
91 //    static final protected Map<String,Table> Tables = new HashMap<String,Table>();
92
93     /** The session factories. */
94     static final protected Map<String, SessionFactoryImpl> sessionFactoryMap =
95             new HashMap<String, SessionFactoryImpl>();
96
97     /** The key for this factory */
98     final String key;
99
100     /** Cluster connections that together can be used to manage sessions */
101     private List<ClusterConnection> pooledConnections = new ArrayList<ClusterConnection>();
102
103     /** Get a cluster connection service.
104      * @return the cluster connection service
105      */
106     protected ClusterConnectionService getClusterConnectionService() {
107         return ClusterJHelper.getServiceInstance(ClusterConnectionService.class,
108                     CLUSTER_CONNECTION_SERVICE);
109     }
110
111     /** Get a session factory. If using connection pooling and there is already a session factory
112      * with the same connect string and database, return it, regardless of whether other
113      * properties of the factory are the same as specified in the Map.
114      * If not using connection pooling (maximum sessions per connection == 0), create a new session factory.
115      * @param props properties of the session factory
116      * @return the session factory
117      */
118     static public SessionFactoryImpl getSessionFactory(Map<?, ?> props) {
119         int connectionPoolSize = getIntProperty(props, 
120                 PROPERTY_CONNECTION_POOL_SIZE, DEFAULT_PROPERTY_CONNECTION_POOL_SIZE);
121         String sessionFactoryKey = getSessionFactoryKey(props);
122         SessionFactoryImpl result = null;
123         if (connectionPoolSize != 0) {
124             // if using connection pooling, see if already a session factory created
125             synchronized(sessionFactoryMap) {
126                 result = sessionFactoryMap.get(sessionFactoryKey);
127                 if (result == null) {
128                     result = new SessionFactoryImpl(props);
129                     sessionFactoryMap.put(sessionFactoryKey, result);
130                 }
131             }
132         } else {
133             // if not using connection pooling, create a new session factory
134             result = new SessionFactoryImpl(props);
135         }
136         return result;
137     }
138
139     private static String getSessionFactoryKey(Map<?, ?> props) {
140         String clusterConnectString = 
141             getRequiredStringProperty(props, PROPERTY_CLUSTER_CONNECTSTRING);
142         String clusterDatabase = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
143                 Constants.DEFAULT_PROPERTY_CLUSTER_DATABASE);
144         return clusterConnectString + "+" + clusterDatabase;
145     }
146
147     /** Create a new SessionFactoryImpl from the properties in the Map, and
148      * connect to the ndb cluster.
149      *
150      * @param props the properties for the factory
151      */
152     protected SessionFactoryImpl(Map<?, ?> props) {
153         this.props = props;
154         this.key = getSessionFactoryKey(props);
155         this.connectionPoolSize = getIntProperty(props, 
156                 PROPERTY_CONNECTION_POOL_SIZE, DEFAULT_PROPERTY_CONNECTION_POOL_SIZE);
157         CLUSTER_CONNECT_STRING = getRequiredStringProperty(props, PROPERTY_CLUSTER_CONNECTSTRING);
158         CLUSTER_CONNECT_RETRIES = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_RETRIES,
159                 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_RETRIES);
160         CLUSTER_CONNECT_DELAY = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_DELAY,
161                 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_DELAY);
162         CLUSTER_CONNECT_VERBOSE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_VERBOSE,
163                 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_VERBOSE);
164         CLUSTER_CONNECT_TIMEOUT_BEFORE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_BEFORE,
165                 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_TIMEOUT_BEFORE);
166         CLUSTER_CONNECT_TIMEOUT_AFTER = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_AFTER,
167                 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_TIMEOUT_AFTER);
168         CLUSTER_DATABASE = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
169                 Constants.DEFAULT_PROPERTY_CLUSTER_DATABASE);
170         CLUSTER_MAX_TRANSACTIONS = getIntProperty(props, PROPERTY_CLUSTER_MAX_TRANSACTIONS,
171                 Constants.DEFAULT_PROPERTY_CLUSTER_MAX_TRANSACTIONS);
172         CLUSTER_CONNECTION_SERVICE = getStringProperty(props, PROPERTY_CLUSTER_CONNECTION_SERVICE);
173         createClusterConnectionPool();
174         // now get a Session and complete a transaction to make sure that the cluster is ready
175         try {
176             Session session = getSession(null);
177             session.currentTransaction().begin();
178             session.currentTransaction().commit();
179             session.close();
180         } catch (Exception e) {
181             if (e instanceof ClusterJException) {
182                 logger.warn(local.message("ERR_Session_Factory_Impl_Failed_To_Complete_Transaction"));
183                 throw (ClusterJException)e;
184             }
185         }
186     }
187
188     protected void createClusterConnectionPool() {
189         String nodeIdsProperty = getStringProperty(props, PROPERTY_CONNECTION_POOL_NODEIDS);
190         if (nodeIdsProperty != null) {
191             // separators are any combination of white space, commas, and semicolons
192             String[] nodeIdsStringArray = nodeIdsProperty.split("[,; \t\n\r]+", 48);
193             for (String nodeIdString : nodeIdsStringArray) {
194                 try {
195                     int nodeId = Integer.parseInt(nodeIdString);
196                     nodeIds.add(nodeId);
197                 } catch (NumberFormatException ex) {
198                     throw new ClusterJFatalUserException(local.message("ERR_Node_Ids_Format", nodeIdsProperty), ex);
199                 }
200             }
201             // validate the size of the node ids with the connection pool size
202             if (connectionPoolSize != DEFAULT_PROPERTY_CONNECTION_POOL_SIZE) {
203                 // both are specified; they must match or nodeIds size must be 1
204                 if (nodeIds.size() ==1) {
205                     // add new nodeIds to fill out array
206                     for (int i = 1; i < connectionPoolSize; ++i) {
207                         nodeIds.add(nodeIds.get(i - 1) + 1);
208                     }
209                 }
210                 if (connectionPoolSize != nodeIds.size()) {
211                     throw new ClusterJFatalUserException(
212                             local.message("ERR_Node_Ids_Must_Match_Connection_Pool_Size",
213                                     nodeIdsProperty, connectionPoolSize));
214                     
215                 }
216             } else {
217                 // only node ids are specified; make pool size match number of node ids
218                 connectionPoolSize = nodeIds.size();
219             }
220         }
221         ClusterConnectionService service = getClusterConnectionService();
222         if (nodeIds.size() == 0) {
223             // node ids were not specified
224             for (int i = 0; i < connectionPoolSize; ++i) {
225                 createClusterConnection(service, props, 0);
226             }
227         } else {
228             for (int i = 0; i < connectionPoolSize; ++i) {
229                 createClusterConnection(service, props, nodeIds.get(i));
230             }
231         }
232     }
233
234     protected ClusterConnection createClusterConnection(
235             ClusterConnectionService service, Map<?, ?> props, int nodeId) {
236         ClusterConnection result = null;
237         try {
238             result = service.create(CLUSTER_CONNECT_STRING, nodeId);
239             result.connect(CLUSTER_CONNECT_RETRIES, CLUSTER_CONNECT_DELAY,true);
240             result.waitUntilReady(CLUSTER_CONNECT_TIMEOUT_BEFORE,CLUSTER_CONNECT_TIMEOUT_AFTER);
241         } catch (Exception ex) {
242             // need to clean up if some connections succeeded
243             for (ClusterConnection connection: pooledConnections) {
244                 connection.close();
245             }
246             pooledConnections.clear();
247             throw new ClusterJFatalUserException(
248                     local.message("ERR_Connecting", props), ex);
249         }
250         this.pooledConnections.add(result);
251         return result;
252     }
253
254     /** Get a session to use with the cluster.
255      *
256      * @return the session
257      */
258     public Session getSession() {
259         return getSession(null);
260     }
261
262     /** Get a session to use with the cluster, overriding some properties.
263      * Properties PROPERTY_CLUSTER_CONNECTSTRING, PROPERTY_CLUSTER_DATABASE,
264      * and PROPERTY_CLUSTER_MAX_TRANSACTIONS may not be overridden.
265      * @param properties overriding some properties for this session
266      * @return the session
267      */
268     public Session getSession(Map properties) {
269         ClusterConnection clusterConnection = getClusterConnectionFromPool();
270         try {
271             Db db = null;
272             synchronized(this) {
273                 checkConnection(clusterConnection);
274                 db = clusterConnection.createDb(CLUSTER_DATABASE, CLUSTER_MAX_TRANSACTIONS);
275             }
276             Dictionary dictionary = db.getDictionary();
277             return new SessionImpl(this, properties, db, dictionary);
278         } catch (ClusterJException ex) {
279             throw ex;
280         } catch (Exception ex) {
281             throw new ClusterJFatalException(
282                     local.message("ERR_Create_Ndb"), ex);
283         }
284     }
285
286     private ClusterConnection getClusterConnectionFromPool() {
287         if (connectionPoolSize <= 1) {
288             return pooledConnections.get(0);
289         }
290         // find the best pooled connection (the connection with the least active sessions)
291         // this is not perfect without synchronization since a connection might close sessions
292         // after getting the dbCount but we don't care about perfection here. 
293         ClusterConnection result = null;
294         int bestCount = Integer.MAX_VALUE;
295         for (ClusterConnection pooledConnection: pooledConnections ) {
296             int count = pooledConnection.dbCount();
297             if (count < bestCount) {
298                 bestCount = count;
299                 result = pooledConnection;
300             }
301         }
302         return result;
303     }
304
305     private void checkConnection(ClusterConnection clusterConnection) {
306         if (clusterConnection == null) {
307             throw new ClusterJUserException(local.message("ERR_Session_Factory_Closed"));
308         }
309     }
310
311     /** Get the DomainTypeHandler for a class. If the handler is not already
312      * available, null is returned. 
313      * @param cls the Class for which to get domain type handler
314      * @return the DomainTypeHandler or null if not available
315      */
316     public static <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
317         // synchronize here because the map is not synchronized
318         synchronized(typeToHandlerMap) {
319             @SuppressWarnings( "unchecked" )
320             DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
321             return domainTypeHandler;
322         }
323     }
324
325     /** Create or get the DomainTypeHandler for a class.
326      * Use the dictionary to validate against schema.
327      * @param cls the Class for which to get domain type handler
328      * @param dictionary the dictionary to validate against
329      * @return the type handler
330      */
331     
332     public <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls,
333             Dictionary dictionary) {
334         // synchronize here because the map is not synchronized
335         synchronized(typeToHandlerMap) {
336             @SuppressWarnings("unchecked")
337             DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
338             if (logger.isDetailEnabled()) logger.detail("DomainTypeToHandler for "
339                     + cls.getName() + "(" + cls
340                     + ") returned " + domainTypeHandler);
341             if (domainTypeHandler == null) {
342                 domainTypeHandler = domainTypeHandlerFactory.createDomainTypeHandler(cls,
343                         dictionary);
344                 if (logger.isDetailEnabled()) logger.detail("createDomainTypeHandler for "
345                         + cls.getName() + "(" + cls
346                         + ") returned " + domainTypeHandler);
347                 typeToHandlerMap.put(cls, domainTypeHandler);
348                 Class<?> proxyClass = domainTypeHandler.getProxyClass();
349                 if (proxyClass != null) {
350                     proxyClassToDomainClass.put(proxyClass, cls);
351                 }
352             }
353             return domainTypeHandler;
354         }
355     }
356
357     /** Create or get the DomainTypeHandler for an instance.
358      * Use the dictionary to validate against schema.
359      * @param object the object
360      * @param dictionary the dictionary for metadata access
361      * @return the DomainTypeHandler for the object
362      */
363     public <T> DomainTypeHandler<T> getDomainTypeHandler(T object, Dictionary dictionary) {
364         Class<T> cls = getClassForProxy(object);
365         DomainTypeHandler<T> result = getDomainTypeHandler(cls);
366         if (result != null) {
367             return result;
368         } else {
369             return getDomainTypeHandler(cls, dictionary);
370         }
371     }
372
373     @SuppressWarnings("unchecked")
374     protected static <T> Class<T> getClassForProxy(T object) {
375         Class cls = object.getClass();
376         if (cls.getName().startsWith("$Proxy")) {
377             cls = proxyClassToDomainClass.get(cls);
378         }
379         return cls;        
380     }
381
382     public <T> T newInstance(Class<T> cls, Dictionary dictionary) {
383         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls, dictionary);
384         return domainTypeHandler.newInstance();
385     }
386
387     public Table getTable(String tableName, Dictionary dictionary) {
388         Table result;
389         try {
390             result = dictionary.getTable(tableName);
391         } catch(Exception ex) {
392             throw new ClusterJFatalInternalException(
393                         local.message("ERR_Get_Table"), ex);
394         }
395         return result;
396     }
397
398     /** Get the property from the properties map as a String.
399      * @param props the properties
400      * @param propertyName the name of the property
401      * @return the value from the properties (may be null)
402      */
403     protected static String getStringProperty(Map<?, ?> props, String propertyName) {
404         return (String)props.get(propertyName);
405     }
406
407     /** Get the property from the properties map as a String. If the user has not
408      * provided a value in the props, use the supplied default value.
409      * @param props the properties
410      * @param propertyName the name of the property
411      * @param defaultValue the value to return if there is no property by that name
412      * @return the value from the properties or the default value
413      */
414     protected static String getStringProperty(Map<?, ?> props, String propertyName, String defaultValue) {
415         String result = (String)props.get(propertyName);
416         if (result == null) {
417             result = defaultValue;
418         }
419         return result;
420     }
421
422     /** Get the property from the properties map as a String. If the user has not
423      * provided a value in the props, throw an exception.
424      * @param props the properties
425      * @param propertyName the name of the property
426      * @return the value from the properties (may not be null)
427      */
428     protected static String getRequiredStringProperty(Map<?, ?> props, String propertyName) {
429         String result = (String)props.get(propertyName);
430         if (result == null) {
431                 throw new ClusterJFatalUserException(
432                         local.message("ERR_NullProperty", propertyName));                            
433         }
434         return result;
435     }
436
437     /** Get the property from the properties map as an int. If the user has not
438      * provided a value in the props, use the supplied default value.
439      * @param props the properties
440      * @param propertyName the name of the property
441      * @param defaultValue the value to return if there is no property by that name
442      * @return the value from the properties or the default value
443      */
444     protected static int getIntProperty(Map<?, ?> props, String propertyName, int defaultValue) {
445         Object property = props.get(propertyName);
446         if (property == null) {
447             return defaultValue;
448         }
449         if (Number.class.isAssignableFrom(property.getClass())) {
450             return ((Number)property).intValue();
451         }
452         if (property instanceof String) {
453             try {
454                 int result = Integer.parseInt((String)property);
455                 return result;
456             } catch (NumberFormatException ex) {
457                 throw new ClusterJFatalUserException(
458                         local.message("ERR_NumericFormat", propertyName, property));
459             }
460         }
461         throw new ClusterJUserException(local.message("ERR_NumericFormat", propertyName, property));
462     }
463
464     public synchronized void close() {
465         // we have to close all of the cluster connections
466         for (ClusterConnection clusterConnection: pooledConnections) {
467             clusterConnection.close();
468         }
469         pooledConnections.clear();
470         synchronized(sessionFactoryMap) {
471             // now remove this from the map
472             sessionFactoryMap.remove(key);
473         }
474     }
475
476     public void setDomainTypeHandlerFactory(DomainTypeHandlerFactory domainTypeHandlerFactory) {
477         this.domainTypeHandlerFactory = domainTypeHandlerFactory;
478     }
479
480     public DomainTypeHandlerFactory getDomainTypeHandlerFactory() {
481         return domainTypeHandlerFactory;
482     }
483
484     public List<Integer> getConnectionPoolSessionCounts() {
485         List<Integer> result = new ArrayList<Integer>();
486         for (ClusterConnection connection: pooledConnections) {
487             result.add(connection.dbCount());
488         }
489         return result;
490     }
491
492 }