2 Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 package com.mysql.clusterj.core;
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalException;
22 import com.mysql.clusterj.ClusterJFatalInternalException;
23 import com.mysql.clusterj.ClusterJFatalUserException;
24 import com.mysql.clusterj.ClusterJHelper;
25 import com.mysql.clusterj.ClusterJUserException;
26 import com.mysql.clusterj.Constants;
27 import com.mysql.clusterj.Session;
28 import com.mysql.clusterj.SessionFactory;
30 import com.mysql.clusterj.core.spi.DomainTypeHandler;
31 import com.mysql.clusterj.core.spi.DomainTypeHandlerFactory;
32 import com.mysql.clusterj.core.metadata.DomainTypeHandlerFactoryImpl;
34 import com.mysql.clusterj.core.store.Db;
35 import com.mysql.clusterj.core.store.ClusterConnection;
36 import com.mysql.clusterj.core.store.ClusterConnectionService;
37 import com.mysql.clusterj.core.store.Dictionary;
38 import com.mysql.clusterj.core.store.Table;
40 import com.mysql.clusterj.core.util.I18NHelper;
41 import com.mysql.clusterj.core.util.Logger;
42 import com.mysql.clusterj.core.util.LoggerFactoryService;
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.List;
49 public class SessionFactoryImpl implements SessionFactory, Constants {
51 /** My message translator */
52 static final I18NHelper local = I18NHelper.getInstance(SessionFactoryImpl.class);
55 static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionFactoryImpl.class);
58 protected Map<?, ?> props;
60 /** NdbCluster connect properties */
61 String CLUSTER_CONNECTION_SERVICE;
62 String CLUSTER_CONNECT_STRING;
63 int CLUSTER_CONNECT_RETRIES;
64 int CLUSTER_CONNECT_DELAY;
65 int CLUSTER_CONNECT_VERBOSE;
66 int CLUSTER_CONNECT_TIMEOUT_BEFORE;
67 int CLUSTER_CONNECT_TIMEOUT_AFTER;
68 String CLUSTER_DATABASE;
69 int CLUSTER_MAX_TRANSACTIONS;
71 /** Node ids obtained from the property PROPERTY_CONNECTION_POOL_NODEIDS */
72 List<Integer> nodeIds = new ArrayList<Integer>();
74 /** Connection pool size obtained from the property PROPERTY_CONNECTION_POOL_SIZE */
75 int connectionPoolSize;
77 /** Map of Proxy to Class */
78 // TODO make this non-static
79 static private Map<Class<?>, Class<?>> proxyClassToDomainClass = new HashMap<Class<?>, Class<?>>();
81 /** Map of Domain Class to DomainTypeHandler. */
82 // TODO make this non-static
83 static final protected Map<Class<?>, DomainTypeHandler<?>> typeToHandlerMap =
84 new HashMap<Class<?>, DomainTypeHandler<?>>();
86 /** DomainTypeHandlerFactory for this session factory. */
87 DomainTypeHandlerFactory domainTypeHandlerFactory = new DomainTypeHandlerFactoryImpl();
90 // TODO make this non-static
91 // static final protected Map<String,Table> Tables = new HashMap<String,Table>();
93 /** The session factories. */
94 static final protected Map<String, SessionFactoryImpl> sessionFactoryMap =
95 new HashMap<String, SessionFactoryImpl>();
97 /** The key for this factory */
100 /** Cluster connections that together can be used to manage sessions */
101 private List<ClusterConnection> pooledConnections = new ArrayList<ClusterConnection>();
103 /** Get a cluster connection service.
104 * @return the cluster connection service
106 protected ClusterConnectionService getClusterConnectionService() {
107 return ClusterJHelper.getServiceInstance(ClusterConnectionService.class,
108 CLUSTER_CONNECTION_SERVICE);
111 /** Get a session factory. If using connection pooling and there is already a session factory
112 * with the same connect string and database, return it, regardless of whether other
113 * properties of the factory are the same as specified in the Map.
114 * If not using connection pooling (maximum sessions per connection == 0), create a new session factory.
115 * @param props properties of the session factory
116 * @return the session factory
118 static public SessionFactoryImpl getSessionFactory(Map<?, ?> props) {
119 int connectionPoolSize = getIntProperty(props,
120 PROPERTY_CONNECTION_POOL_SIZE, DEFAULT_PROPERTY_CONNECTION_POOL_SIZE);
121 String sessionFactoryKey = getSessionFactoryKey(props);
122 SessionFactoryImpl result = null;
123 if (connectionPoolSize != 0) {
124 // if using connection pooling, see if already a session factory created
125 synchronized(sessionFactoryMap) {
126 result = sessionFactoryMap.get(sessionFactoryKey);
127 if (result == null) {
128 result = new SessionFactoryImpl(props);
129 sessionFactoryMap.put(sessionFactoryKey, result);
133 // if not using connection pooling, create a new session factory
134 result = new SessionFactoryImpl(props);
139 private static String getSessionFactoryKey(Map<?, ?> props) {
140 String clusterConnectString =
141 getRequiredStringProperty(props, PROPERTY_CLUSTER_CONNECTSTRING);
142 String clusterDatabase = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
143 Constants.DEFAULT_PROPERTY_CLUSTER_DATABASE);
144 return clusterConnectString + "+" + clusterDatabase;
147 /** Create a new SessionFactoryImpl from the properties in the Map, and
148 * connect to the ndb cluster.
150 * @param props the properties for the factory
152 protected SessionFactoryImpl(Map<?, ?> props) {
154 this.key = getSessionFactoryKey(props);
155 this.connectionPoolSize = getIntProperty(props,
156 PROPERTY_CONNECTION_POOL_SIZE, DEFAULT_PROPERTY_CONNECTION_POOL_SIZE);
157 CLUSTER_CONNECT_STRING = getRequiredStringProperty(props, PROPERTY_CLUSTER_CONNECTSTRING);
158 CLUSTER_CONNECT_RETRIES = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_RETRIES,
159 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_RETRIES);
160 CLUSTER_CONNECT_DELAY = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_DELAY,
161 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_DELAY);
162 CLUSTER_CONNECT_VERBOSE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_VERBOSE,
163 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_VERBOSE);
164 CLUSTER_CONNECT_TIMEOUT_BEFORE = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_BEFORE,
165 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_TIMEOUT_BEFORE);
166 CLUSTER_CONNECT_TIMEOUT_AFTER = getIntProperty(props, PROPERTY_CLUSTER_CONNECT_TIMEOUT_AFTER,
167 Constants.DEFAULT_PROPERTY_CLUSTER_CONNECT_TIMEOUT_AFTER);
168 CLUSTER_DATABASE = getStringProperty(props, PROPERTY_CLUSTER_DATABASE,
169 Constants.DEFAULT_PROPERTY_CLUSTER_DATABASE);
170 CLUSTER_MAX_TRANSACTIONS = getIntProperty(props, PROPERTY_CLUSTER_MAX_TRANSACTIONS,
171 Constants.DEFAULT_PROPERTY_CLUSTER_MAX_TRANSACTIONS);
172 CLUSTER_CONNECTION_SERVICE = getStringProperty(props, PROPERTY_CLUSTER_CONNECTION_SERVICE);
173 createClusterConnectionPool();
174 // now get a Session and complete a transaction to make sure that the cluster is ready
176 Session session = getSession(null);
177 session.currentTransaction().begin();
178 session.currentTransaction().commit();
180 } catch (Exception e) {
181 if (e instanceof ClusterJException) {
182 logger.warn(local.message("ERR_Session_Factory_Impl_Failed_To_Complete_Transaction"));
183 throw (ClusterJException)e;
188 protected void createClusterConnectionPool() {
189 String nodeIdsProperty = getStringProperty(props, PROPERTY_CONNECTION_POOL_NODEIDS);
190 if (nodeIdsProperty != null) {
191 // separators are any combination of white space, commas, and semicolons
192 String[] nodeIdsStringArray = nodeIdsProperty.split("[,; \t\n\r]+", 48);
193 for (String nodeIdString : nodeIdsStringArray) {
195 int nodeId = Integer.parseInt(nodeIdString);
197 } catch (NumberFormatException ex) {
198 throw new ClusterJFatalUserException(local.message("ERR_Node_Ids_Format", nodeIdsProperty), ex);
201 // validate the size of the node ids with the connection pool size
202 if (connectionPoolSize != DEFAULT_PROPERTY_CONNECTION_POOL_SIZE) {
203 // both are specified; they must match or nodeIds size must be 1
204 if (nodeIds.size() ==1) {
205 // add new nodeIds to fill out array
206 for (int i = 1; i < connectionPoolSize; ++i) {
207 nodeIds.add(nodeIds.get(i - 1) + 1);
210 if (connectionPoolSize != nodeIds.size()) {
211 throw new ClusterJFatalUserException(
212 local.message("ERR_Node_Ids_Must_Match_Connection_Pool_Size",
213 nodeIdsProperty, connectionPoolSize));
217 // only node ids are specified; make pool size match number of node ids
218 connectionPoolSize = nodeIds.size();
221 ClusterConnectionService service = getClusterConnectionService();
222 if (nodeIds.size() == 0) {
223 // node ids were not specified
224 for (int i = 0; i < connectionPoolSize; ++i) {
225 createClusterConnection(service, props, 0);
228 for (int i = 0; i < connectionPoolSize; ++i) {
229 createClusterConnection(service, props, nodeIds.get(i));
234 protected ClusterConnection createClusterConnection(
235 ClusterConnectionService service, Map<?, ?> props, int nodeId) {
236 ClusterConnection result = null;
238 result = service.create(CLUSTER_CONNECT_STRING, nodeId);
239 result.connect(CLUSTER_CONNECT_RETRIES, CLUSTER_CONNECT_DELAY,true);
240 result.waitUntilReady(CLUSTER_CONNECT_TIMEOUT_BEFORE,CLUSTER_CONNECT_TIMEOUT_AFTER);
241 } catch (Exception ex) {
242 // need to clean up if some connections succeeded
243 for (ClusterConnection connection: pooledConnections) {
246 pooledConnections.clear();
247 throw new ClusterJFatalUserException(
248 local.message("ERR_Connecting", props), ex);
250 this.pooledConnections.add(result);
254 /** Get a session to use with the cluster.
256 * @return the session
258 public Session getSession() {
259 return getSession(null);
262 /** Get a session to use with the cluster, overriding some properties.
263 * Properties PROPERTY_CLUSTER_CONNECTSTRING, PROPERTY_CLUSTER_DATABASE,
264 * and PROPERTY_CLUSTER_MAX_TRANSACTIONS may not be overridden.
265 * @param properties overriding some properties for this session
266 * @return the session
268 public Session getSession(Map properties) {
269 ClusterConnection clusterConnection = getClusterConnectionFromPool();
273 checkConnection(clusterConnection);
274 db = clusterConnection.createDb(CLUSTER_DATABASE, CLUSTER_MAX_TRANSACTIONS);
276 Dictionary dictionary = db.getDictionary();
277 return new SessionImpl(this, properties, db, dictionary);
278 } catch (ClusterJException ex) {
280 } catch (Exception ex) {
281 throw new ClusterJFatalException(
282 local.message("ERR_Create_Ndb"), ex);
286 private ClusterConnection getClusterConnectionFromPool() {
287 if (connectionPoolSize <= 1) {
288 return pooledConnections.get(0);
290 // find the best pooled connection (the connection with the least active sessions)
291 // this is not perfect without synchronization since a connection might close sessions
292 // after getting the dbCount but we don't care about perfection here.
293 ClusterConnection result = null;
294 int bestCount = Integer.MAX_VALUE;
295 for (ClusterConnection pooledConnection: pooledConnections ) {
296 int count = pooledConnection.dbCount();
297 if (count < bestCount) {
299 result = pooledConnection;
305 private void checkConnection(ClusterConnection clusterConnection) {
306 if (clusterConnection == null) {
307 throw new ClusterJUserException(local.message("ERR_Session_Factory_Closed"));
311 /** Get the DomainTypeHandler for a class. If the handler is not already
312 * available, null is returned.
313 * @param cls the Class for which to get domain type handler
314 * @return the DomainTypeHandler or null if not available
316 public static <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
317 // synchronize here because the map is not synchronized
318 synchronized(typeToHandlerMap) {
319 @SuppressWarnings( "unchecked" )
320 DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
321 return domainTypeHandler;
325 /** Create or get the DomainTypeHandler for a class.
326 * Use the dictionary to validate against schema.
327 * @param cls the Class for which to get domain type handler
328 * @param dictionary the dictionary to validate against
329 * @return the type handler
332 public <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls,
333 Dictionary dictionary) {
334 // synchronize here because the map is not synchronized
335 synchronized(typeToHandlerMap) {
336 @SuppressWarnings("unchecked")
337 DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
338 if (logger.isDetailEnabled()) logger.detail("DomainTypeToHandler for "
339 + cls.getName() + "(" + cls
340 + ") returned " + domainTypeHandler);
341 if (domainTypeHandler == null) {
342 domainTypeHandler = domainTypeHandlerFactory.createDomainTypeHandler(cls,
344 if (logger.isDetailEnabled()) logger.detail("createDomainTypeHandler for "
345 + cls.getName() + "(" + cls
346 + ") returned " + domainTypeHandler);
347 typeToHandlerMap.put(cls, domainTypeHandler);
348 Class<?> proxyClass = domainTypeHandler.getProxyClass();
349 if (proxyClass != null) {
350 proxyClassToDomainClass.put(proxyClass, cls);
353 return domainTypeHandler;
357 /** Create or get the DomainTypeHandler for an instance.
358 * Use the dictionary to validate against schema.
359 * @param object the object
360 * @param dictionary the dictionary for metadata access
361 * @return the DomainTypeHandler for the object
363 public <T> DomainTypeHandler<T> getDomainTypeHandler(T object, Dictionary dictionary) {
364 Class<T> cls = getClassForProxy(object);
365 DomainTypeHandler<T> result = getDomainTypeHandler(cls);
366 if (result != null) {
369 return getDomainTypeHandler(cls, dictionary);
373 @SuppressWarnings("unchecked")
374 protected static <T> Class<T> getClassForProxy(T object) {
375 Class cls = object.getClass();
376 if (cls.getName().startsWith("$Proxy")) {
377 cls = proxyClassToDomainClass.get(cls);
382 public <T> T newInstance(Class<T> cls, Dictionary dictionary) {
383 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls, dictionary);
384 return domainTypeHandler.newInstance();
387 public Table getTable(String tableName, Dictionary dictionary) {
390 result = dictionary.getTable(tableName);
391 } catch(Exception ex) {
392 throw new ClusterJFatalInternalException(
393 local.message("ERR_Get_Table"), ex);
398 /** Get the property from the properties map as a String.
399 * @param props the properties
400 * @param propertyName the name of the property
401 * @return the value from the properties (may be null)
403 protected static String getStringProperty(Map<?, ?> props, String propertyName) {
404 return (String)props.get(propertyName);
407 /** Get the property from the properties map as a String. If the user has not
408 * provided a value in the props, use the supplied default value.
409 * @param props the properties
410 * @param propertyName the name of the property
411 * @param defaultValue the value to return if there is no property by that name
412 * @return the value from the properties or the default value
414 protected static String getStringProperty(Map<?, ?> props, String propertyName, String defaultValue) {
415 String result = (String)props.get(propertyName);
416 if (result == null) {
417 result = defaultValue;
422 /** Get the property from the properties map as a String. If the user has not
423 * provided a value in the props, throw an exception.
424 * @param props the properties
425 * @param propertyName the name of the property
426 * @return the value from the properties (may not be null)
428 protected static String getRequiredStringProperty(Map<?, ?> props, String propertyName) {
429 String result = (String)props.get(propertyName);
430 if (result == null) {
431 throw new ClusterJFatalUserException(
432 local.message("ERR_NullProperty", propertyName));
437 /** Get the property from the properties map as an int. If the user has not
438 * provided a value in the props, use the supplied default value.
439 * @param props the properties
440 * @param propertyName the name of the property
441 * @param defaultValue the value to return if there is no property by that name
442 * @return the value from the properties or the default value
444 protected static int getIntProperty(Map<?, ?> props, String propertyName, int defaultValue) {
445 Object property = props.get(propertyName);
446 if (property == null) {
449 if (Number.class.isAssignableFrom(property.getClass())) {
450 return ((Number)property).intValue();
452 if (property instanceof String) {
454 int result = Integer.parseInt((String)property);
456 } catch (NumberFormatException ex) {
457 throw new ClusterJFatalUserException(
458 local.message("ERR_NumericFormat", propertyName, property));
461 throw new ClusterJUserException(local.message("ERR_NumericFormat", propertyName, property));
464 public synchronized void close() {
465 // we have to close all of the cluster connections
466 for (ClusterConnection clusterConnection: pooledConnections) {
467 clusterConnection.close();
469 pooledConnections.clear();
470 synchronized(sessionFactoryMap) {
471 // now remove this from the map
472 sessionFactoryMap.remove(key);
476 public void setDomainTypeHandlerFactory(DomainTypeHandlerFactory domainTypeHandlerFactory) {
477 this.domainTypeHandlerFactory = domainTypeHandlerFactory;
480 public DomainTypeHandlerFactory getDomainTypeHandlerFactory() {
481 return domainTypeHandlerFactory;
484 public List<Integer> getConnectionPoolSessionCounts() {
485 List<Integer> result = new ArrayList<Integer>();
486 for (ClusterConnection connection: pooledConnections) {
487 result.add(connection.dbCount());