2 Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 package com.mysql.clusterj.openjpa;
20 import java.sql.SQLException;
21 import java.util.ArrayList;
22 import java.util.BitSet;
23 import java.util.Collection;
24 import java.util.List;
27 import org.apache.openjpa.jdbc.kernel.ConnectionInfo;
28 import org.apache.openjpa.jdbc.kernel.JDBCFetchConfiguration;
29 import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
30 import org.apache.openjpa.jdbc.meta.ClassMapping;
31 import org.apache.openjpa.jdbc.meta.ValueMapping;
32 import org.apache.openjpa.jdbc.schema.Table;
33 import org.apache.openjpa.jdbc.sql.Result;
34 import org.apache.openjpa.kernel.FetchConfiguration;
35 import org.apache.openjpa.kernel.OpenJPAStateManager;
36 import org.apache.openjpa.kernel.PCState;
37 import org.apache.openjpa.kernel.QueryLanguages;
38 import org.apache.openjpa.kernel.StoreContext;
39 import org.apache.openjpa.kernel.StoreQuery;
40 import org.apache.openjpa.kernel.exps.ExpressionParser;
41 import org.apache.openjpa.meta.ClassMetaData;
42 import org.apache.openjpa.meta.FieldMetaData;
43 import org.apache.openjpa.util.OpenJPAId;
45 import com.mysql.clusterj.ClusterJDatastoreException;
46 import com.mysql.clusterj.ClusterJException;
47 import com.mysql.clusterj.ClusterJFatalInternalException;
48 import com.mysql.clusterj.ClusterJFatalUserException;
49 import com.mysql.clusterj.ClusterJUserException;
50 import com.mysql.clusterj.SessionFactory;
51 import com.mysql.clusterj.Transaction;
52 import com.mysql.clusterj.core.query.QueryExecutionContextImpl;
53 import com.mysql.clusterj.core.spi.DomainTypeHandler;
54 import com.mysql.clusterj.core.spi.SessionSPI;
55 import com.mysql.clusterj.core.spi.ValueHandler;
56 import com.mysql.clusterj.core.store.Dictionary;
57 import com.mysql.clusterj.core.store.Operation;
58 import com.mysql.clusterj.core.store.ResultData;
59 import com.mysql.clusterj.core.util.I18NHelper;
60 import com.mysql.clusterj.core.util.Logger;
61 import com.mysql.clusterj.core.util.LoggerFactoryService;
62 import com.mysql.clusterj.query.QueryDomainType;
67 public class NdbOpenJPAStoreManager extends JDBCStoreManager {
69 /** My message translator */
70 static final I18NHelper local = I18NHelper.getInstance(NdbOpenJPAStoreManager.class);
73 static final Logger logger = LoggerFactoryService.getFactory().getInstance(NdbOpenJPAStoreManager.class);
75 @SuppressWarnings("unused")
76 private StoreContext storeContext;
77 private NdbOpenJPAConfiguration ndbConfiguration;
78 private SessionFactory sessionFactory;
80 // TODO This really belongs in store context.
81 private SessionSPI session;
82 private Transaction tx;
83 private Dictionary dictionary;
85 public NdbOpenJPAStoreManager() {
90 public void setContext(StoreContext ctx) {
91 super.setContext(ctx);
92 setContext(ctx, (NdbOpenJPAConfiguration) ctx.getConfiguration());
95 public void setContext(StoreContext ctx, NdbOpenJPAConfiguration conf) {
97 ndbConfiguration = conf;
98 sessionFactory = conf.getSessionFactory();
102 protected NdbOpenJPADomainTypeHandlerImpl<?> getDomainTypeHandler(OpenJPAStateManager sm) {
103 // get DomainTypeHandler from StateManager
104 ClassMapping cmp = (ClassMapping) sm.getMetaData();
105 return getDomainTypeHandler(cmp);
108 protected NdbOpenJPADomainTypeHandlerImpl<?> getDomainTypeHandler(ClassMapping cmp) {
109 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler =
110 ndbConfiguration.getDomainTypeHandler(cmp, dictionary);
111 return domainTypeHandler;
114 protected int deleteAll(DomainTypeHandler<?> base) {
115 // used by NdbOpenJPAStoreQuery to delete all instances of a class
116 int result = session.deletePersistentAll(base);
120 protected SessionSPI getSession() {
121 if (session == null) {
122 session = (SessionSPI) sessionFactory.getSession();
123 dictionary = session.getDictionary();
128 * Find the object with the given oid.
131 public Object find(Object oid, ValueMapping vm,
132 JDBCFetchConfiguration fetch) {
133 if (logger.isDebugEnabled()) {
134 logger.debug("NdbStoreManager.find(Object oid, ValueMapping vm, "
135 + "JDBCFetchConfiguration fetch) delegated to super with oid " + oid + ".");
137 // return null if the oid is null (this will be the case if a foreign key element is null)
138 ClassMapping cls = vm.getDeclaredTypeMapping();
139 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(cls);
140 Object handler = domainTypeHandler.createKeyValueHandler(oid);
141 if (handler == null) {
144 return super.find(oid, vm, fetch);
147 /** Load the fields for the persistent instance owned by the sm.
148 * @param sm the StateManager
149 * @param fields the fields to load
150 * @param fetch the FetchConfiguration
151 * @param lockLevel the lock level to use when getting data
152 * @param context the StoreContext
153 * @return true if any field was loaded
156 public boolean load(OpenJPAStateManager sm, BitSet fields,
157 FetchConfiguration fetch, int lockLevel, Object context) {
158 if (logger.isDebugEnabled()) {
159 logger.debug("NdbStoreManager.load(OpenJPAStateManager sm, BitSet fields, "
160 + "FetchConfiguration fetch, int lockLevel, Object context) "
161 + "Id: " + sm.getId() + " requested fields: "
162 + NdbOpenJPAUtility.printBitSet(sm, fields));
164 if (context != null && ((ConnectionInfo) context).result != null) {
165 // there is already a result set to process
166 return super.load(sm, fields, fetch, lockLevel, context);
168 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
169 if (!isSupportedType(domainTypeHandler, "NdbOpenJPAStoreManager.load")) {
170 return super.load(sm, fields, fetch, lockLevel, context);
173 return domainTypeHandler.load(sm, this, fields, (JDBCFetchConfiguration) fetch, context);
174 } catch (SQLException sQLException) {
175 logger.error("Fatal error from NdbOpenJPAStoreManager.load " + sQLException);
183 public Object load(ClassMapping mapping, JDBCFetchConfiguration fetch,
184 BitSet exclude, Result result) throws SQLException {
185 if (logger.isDebugEnabled()) {
186 logger.debug("NdbStoreManager.load(ClassMapping mapping, JDBCFetchConfiguration fetch, "
187 + "BitSet exclude, Result result) for " + mapping.getDescribedType().getName()
188 + " delegated to super.");
190 return super.load(mapping, fetch, exclude, result);
193 @SuppressWarnings("unchecked")
195 public Collection loadAll(Collection sms, PCState state, int load,
196 FetchConfiguration fetch, Object context) {
197 if (logger.isDebugEnabled()) {
198 logger.debug("NdbStoreManager.loadAll(Collection sms, PCState state, int load, "
199 + "FetchConfiguration fetch, Object context) delegated to super.");
201 return super.loadAll(sms, state, load, fetch, context);
205 public boolean initialize(OpenJPAStateManager sm, PCState state,
206 FetchConfiguration fetch, Object context) {
207 if (logger.isDebugEnabled()) {
208 logger.debug("NdbStoreManager.initialize(OpenJPAStateManager sm, PCState state, "
209 + "FetchConfiguration fetch, Object context)");
211 // if context already contains a result, use the result to initialize
212 if (context != null) {
213 ConnectionInfo info = (ConnectionInfo)context;
214 ClassMapping mapping = info.mapping;
215 Result result = info.result;
216 logger.info("info mapping: " + mapping.getDescribedType().getName() + " result: " + result);
218 return initializeState(sm, state, (JDBCFetchConfiguration)fetch, info);
219 } catch (ClassNotFoundException e) {
220 throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"), e);
221 } catch (SQLException e) {
222 throw new ClusterJDatastoreException(local.message("ERR_Datastore_Exception"), e);
225 // otherwise, load from the datastore
226 // TODO: support user-defined oid types
227 OpenJPAId id = (OpenJPAId)sm.getId();
228 if (logger.isTraceEnabled()) {
229 logger.trace("Id: " + id.getClass() + " " + id);
231 // get domain type handler for StateManager
232 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
234 if (!isSupportedType(domainTypeHandler, "NdbOpenJPAStoreManager.initialize")) {
235 // if not supported, go the jdbc route
236 boolean result = super.initialize(sm, state, fetch, context);
237 if (logger.isDebugEnabled()) logger.debug(
238 "NdbOpenJPAStoreManager.initialize delegated to super: returned " + result);
242 // get session from session factory
244 session.startAutoTransaction();
245 // get domain type handler for StateManager
246 // NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler =
247 // getDomainTypeHandler(sm);
248 // Object instance = session.initializeFromDatabase(
249 // domainTypeHandler, null,
250 // domainTypeHandler.getValueHandler(sm),
251 // domainTypeHandler.createKeyValueHandler(id.getIdObject()));
252 // initialize via OpenJPA protocol
253 // select all columns from table
254 ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(id.getIdObject());
255 ResultData resultData = session.selectUnique(domainTypeHandler,
258 // create an OpenJPA Result from the ndb result data
259 NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, null);
261 // we have an instance; create the PC instance
262 domainTypeHandler.newInstance(sm);
263 // for each field, call its handler to initialize the field
264 // TODO: should compare using this technique against
265 // using clusterj directly (see above) since
266 // there is a lot more overhead using the openjpa technique
267 domainTypeHandler.load(sm, this, (JDBCFetchConfiguration)fetch, result);
268 // NdbOpenJPADomainFieldHandlerImpl[] fieldHandlers = domainTypeHandler.getDomainFieldHandlers();
269 // for (NdbOpenJPADomainFieldHandlerImpl fmd:fieldHandlers) {
271 // // if (fmd.isToOne()) {
272 // FieldMapping fm = fmd.getFieldMapping();
273 // fm.load(sm, this, (JDBCFetchConfiguration)fetch, result);
275 // // fmd.load(sm, fetch, result);
278 if (logger.isDetailEnabled()) {
279 logger.detail("After initializing PCState: " +
280 sm.getPCState().getClass().getSimpleName() + " " +
283 session.endAutoTransaction();
286 } catch (ClusterJException e) {
287 session.failAutoTransaction();
289 } catch (Exception e) {
290 session.failAutoTransaction();
291 throw new ClusterJFatalInternalException("Unexpected exception.", e);
292 // if any problem, fall back
293 // return super.initialize(sm, state, fetch, context);
298 protected boolean initializeState(OpenJPAStateManager sm, PCState state,
299 JDBCFetchConfiguration fetch, ConnectionInfo info)
300 throws ClassNotFoundException, SQLException {
301 if (logger.isDebugEnabled()) {
302 logger.debug("NdbStoreManager.initializeState(" +
303 "OpenJPAStateManager, PCState, JDBCFetchConfiguration, " +
304 "ConnectionInfo) delegated to super.");
306 return super.initializeState(sm, state, fetch, info);
310 * Flush the given state manager collection to the datastore, returning
311 * a collection of exceptions encountered during flushing.
312 * The given collection may include states that do not require data
313 * store action, such as persistent-clean instances or persistent-dirty
314 * instances that have not been modified since they were last flushed.
315 * For datastore updates and inserts, the dirty, non-flushed fields of
316 * each state should be flushed. New instances without an assigned object
317 * id should be given one via {@link OpenJPAStateManager#setObjectId}. New
318 * instances with value-strategy fields that have not been assigned yet
319 * should have their fields set. Datastore version information should be
320 * updated during flush, and the state manager's version indicator
321 * updated through the {@link OpenJPAStateManager#setNextVersion} method.
322 * The current version will roll over to this next version upon successful
325 @SuppressWarnings("unchecked")
327 public Collection<Exception> flush(Collection sms) {
328 Collection<OpenJPAStateManager> stateManagers =
329 (Collection<OpenJPAStateManager>)sms;
330 StringBuffer buffer = null;
331 if (logger.isTraceEnabled()) {
332 buffer = new StringBuffer();
334 // make sure all instances are OK to insert/update/delete
335 boolean allSupportedTypes = true;
336 for (OpenJPAStateManager sm: stateManagers) {
337 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(sm);
338 if (!domainTypeHandler.isSupportedType()) {
339 if (logger.isDetailEnabled()) logger.detail("Found unsupported class "
340 + domainTypeHandler.getName());
341 if (ndbConfiguration.getFailOnJDBCPath()) {
342 throw new ClusterJFatalUserException(
343 local.message("ERR_JDBC_Path", domainTypeHandler.getName()));
345 allSupportedTypes = false;
347 if (logger.isTraceEnabled()) {
348 buffer.append(printState(sm));
351 if (logger.isTraceEnabled()) {
352 logger.trace(buffer.toString());
354 if (!allSupportedTypes) {
355 // not all instances are of supported types; delegate to super
356 Collection<Exception> exceptions = super.flush(sms);
357 if (logger.isDetailEnabled()) logger.detail("Found unsupported class(es); "
358 + "super resulted in exceptions: " + exceptions);
361 // now flush changes to the cluster back end
363 Collection exceptions = new ArrayList<Exception>();
364 for (OpenJPAStateManager sm:stateManagers) {
365 // get DomainTypeHandler from StateManager
366 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(sm);
367 // get the value handler for the StateManager
368 ValueHandler valueHandler = domainTypeHandler.getValueHandler(sm, this);
369 // now flush based on current PCState
370 PCState pcState = sm.getPCState();
372 if (pcState == PCState.PNEW) {
373 // flush new instance
374 session.insert(domainTypeHandler, valueHandler);
375 } else if (pcState == PCState.PDELETED) {
376 // flush deleted instance
377 session.delete(domainTypeHandler, valueHandler);
378 } else if (pcState == PCState.PDIRTY) {
379 // flush dirty instance
380 session.update(domainTypeHandler, valueHandler);
381 } else if (pcState == PCState.PNEWFLUSHEDDELETED) {
382 // flush new flushed deleted instance
383 session.delete(domainTypeHandler, valueHandler);
384 } else if (pcState == PCState.PNEWFLUSHEDDELETEDFLUSHED) {
387 throw new ClusterJUserException(
388 local.message("ERR_Unsupported_Flush_Operation",
389 pcState.toString()));
391 } catch (Exception ex) {
392 if (logger.isDebugEnabled()) {
393 logger.debug("Exception caught: " + ex.toString());
398 // after all instances are flushed, send to the back end
404 /** Handle unsupported class in a standard way. If unsupported, log the request and
405 * throw an exception if the failOnJDBCPath flag is set.
406 * @param domainTypeHandler
407 * @return true if the type is supported by clusterjpa
409 private boolean isSupportedType(NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler,
411 boolean result = domainTypeHandler.isSupportedType();
413 if (logger.isDebugEnabled()) logger.debug(where
414 + " found unsupported class " + domainTypeHandler.getName());
415 if (ndbConfiguration.getFailOnJDBCPath()) {
416 throw new ClusterJFatalUserException(
417 local.message("ERR_JDBC_Path", domainTypeHandler.getName()));
424 public void beforeStateChange(OpenJPAStateManager sm, PCState fromState,
426 if (logger.isDetailEnabled()) {
428 printState("from ", fromState) +
429 printState(" to ", toState));
431 super.beforeStateChange(sm, fromState, toState);
435 public StoreQuery newQuery(String language) {
436 ExpressionParser ep = QueryLanguages.parserForLanguage(language);
437 return new NdbOpenJPAStoreQuery(this, ep);
441 public void beginOptimistic() {
442 if (logger.isTraceEnabled()) {
443 logger.trace(" Transaction " + hashCode() + printIsActive(tx));
445 super.beginOptimistic();
448 tx = session.currentTransaction();
453 } catch (Exception e) {
454 logger.detail("NdbOpenJPAStoreManager.beginOptimistic():" +
455 "caught exception in session.currentTransaction.begin().");
456 throw new ClusterJDatastoreException(
457 local.message("ERR_Datastore_Exception"), e);
462 public void begin() {
463 if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
466 // end ndb transaction if active
467 tx = session.currentTransaction();
472 } catch (Exception e) {
473 logger.detail("Caught exception in session.currentTransaction.commit()." +
476 // TODO: handle JDBC connection for queries
481 public void commit() {
482 if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
485 } catch (Exception ex) {
486 logger.detail(" failed" + ex.toString());
487 throw new ClusterJException(
488 local.message("ERR_Commit_Failed", ex.toString()));
490 // TODO: handle JDBC connection for queries
495 public void rollback() {
496 if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
498 // TODO: handle JDBC connection for queries
503 public void close() {
504 if (logger.isTraceEnabled()) {logger.trace(" Transaction " + hashCode() + printIsActive(tx));}
505 if (session != null && !session.isClosed()) {
506 if (session.currentTransaction().isActive()) {
513 protected String printState(OpenJPAStateManager sm) {
514 StringBuffer buffer = new StringBuffer();
515 buffer.append("class: ");
516 buffer.append(sm.getPersistenceCapable().getClass().getName());
517 buffer.append(" objectId: ");
518 buffer.append(sm.getObjectId());
519 buffer.append(" PCState: ");
520 buffer.append(sm.getPCState());
522 return buffer.toString();
525 protected String printState(String header, PCState state) {
526 StringBuffer buffer = new StringBuffer(header);
527 buffer.append(state.getClass().getSimpleName());
528 return buffer.toString();
531 protected String printLoaded(OpenJPAStateManager sm) {
532 BitSet loaded = sm.getLoaded();
533 return "Loaded: " + NdbOpenJPAUtility.printBitSet(sm, loaded);
536 protected String printIsActive(Transaction tx) {
537 return (tx==null?" is null.":tx.isActive()?" is active.":" is not active.");
539 // The following is not used in ClusterJ, since managed mode is not implemented
540 // LockManager lm = ctx.getLockManager();
541 // if (lm instanceof JDBCLockManager)
542 // _lm = (JDBCLockManager) lm;
544 // if (!ctx.isManaged() && _conf.isConnectionFactoryModeManaged())
545 // _ds = _conf.getDataSource2(ctx);
547 // _ds = _conf.getDataSource(ctx);
549 // if (_conf.getUpdateManagerInstance().orderDirty())
550 // ctx.setOrderDirtyObjects(true);
553 * @param type the root type of the query
554 * @return the query domain type
556 public <T> QueryDomainType<T> createQueryDomainType(Class<T> type) {
557 return session.getQueryBuilder().createQueryDefinition(type);
560 /** Execute the query and return the result list.
561 * @param domainTypeHandler the domain type handler
562 * @param queryDomainType the QueryDomainType
563 * @param parameterMap the bound parameters
564 * @return the result of the query
566 public NdbOpenJPAResult executeQuery(DomainTypeHandler<?> domainTypeHandler,
567 QueryDomainType<?> queryDomainType, Map<String, Object> parameterMap) {
568 QueryExecutionContextImpl context = new QueryExecutionContextImpl(session, parameterMap);
569 ResultData resultData = context.getResultData(queryDomainType);
570 NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, null);
574 /** Look up the row in the database in order to load them into the instance.
575 * @param sm the state manager whose fields are to be loaded
576 * @param domainTypeHandler the domain type handler for the instance's type
577 * @param fieldHandlers the field handlers for the fields to be loaded
578 * @return the result containing just the fields requested
580 public NdbOpenJPAResult lookup(OpenJPAStateManager sm,
581 NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler,
582 List<NdbOpenJPADomainFieldHandlerImpl> fieldHandlers) {
583 com.mysql.clusterj.core.store.Table storeTable = domainTypeHandler.getStoreTable();
584 session.startAutoTransaction();
586 Operation op = session.getSelectOperation(storeTable);
587 int[] keyFields = domainTypeHandler.getKeyFieldNumbers();
588 BitSet fieldsInResult = new BitSet();
589 for (int i : keyFields) {
590 fieldsInResult.set(i);
592 ValueHandler handler = domainTypeHandler.getValueHandler(sm, this);
593 domainTypeHandler.operationSetKeys(handler, op);
594 // include the key columns in the results
595 domainTypeHandler.operationGetKeys(op);
596 for (NdbOpenJPADomainFieldHandlerImpl fieldHandler : fieldHandlers) {
597 fieldHandler.operationGetValue(op);
598 fieldsInResult.set(fieldHandler.getFieldNumber());
600 ResultData resultData = op.resultData();
601 NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, fieldsInResult);
602 session.endAutoTransaction();
604 } catch (RuntimeException ex) {
605 session.failAutoTransaction();
610 public Dictionary getDictionary() {