2 Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 package com.mysql.clusterj.core;
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalInternalException;
22 import com.mysql.clusterj.ClusterJUserException;
23 import com.mysql.clusterj.DynamicObject;
24 import com.mysql.clusterj.LockMode;
25 import com.mysql.clusterj.Query;
26 import com.mysql.clusterj.Transaction;
28 import com.mysql.clusterj.core.spi.DomainTypeHandler;
29 import com.mysql.clusterj.core.spi.ValueHandler;
31 import com.mysql.clusterj.core.query.QueryDomainTypeImpl;
32 import com.mysql.clusterj.core.query.QueryBuilderImpl;
33 import com.mysql.clusterj.core.query.QueryImpl;
35 import com.mysql.clusterj.core.spi.SessionSPI;
37 import com.mysql.clusterj.core.store.ClusterTransaction;
38 import com.mysql.clusterj.core.store.Db;
39 import com.mysql.clusterj.core.store.Dictionary;
40 import com.mysql.clusterj.core.store.Index;
41 import com.mysql.clusterj.core.store.IndexOperation;
42 import com.mysql.clusterj.core.store.IndexScanOperation;
43 import com.mysql.clusterj.core.store.Operation;
44 import com.mysql.clusterj.core.store.PartitionKey;
45 import com.mysql.clusterj.core.store.ResultData;
46 import com.mysql.clusterj.core.store.ScanOperation;
47 import com.mysql.clusterj.core.store.Table;
49 import com.mysql.clusterj.core.util.I18NHelper;
50 import com.mysql.clusterj.core.util.Logger;
51 import com.mysql.clusterj.core.util.LoggerFactoryService;
53 import com.mysql.clusterj.query.QueryBuilder;
54 import com.mysql.clusterj.query.QueryDefinition;
55 import com.mysql.clusterj.query.QueryDomainType;
57 import java.util.ArrayList;
58 import java.util.BitSet;
59 import java.util.Collections;
60 import java.util.Iterator;
61 import java.util.List;
65 * This class implements Session, the main user interface to ClusterJ.
66 * It also implements SessionSPI, the main component interface.
68 public class SessionImpl implements SessionSPI, CacheManager, StoreManager {
70 /** My message translator */
71 static final I18NHelper local = I18NHelper.getInstance(SessionImpl.class);
74 static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionImpl.class);
77 protected SessionFactoryImpl factory;
79 /** Db: one per session. */
82 /** Dictionary: one per session. */
83 protected Dictionary dictionary;
85 /** One transaction at a time. */
86 protected TransactionImpl transactionImpl;
88 /** The partition key */
89 protected PartitionKey partitionKey = null;
91 /** Rollback only status */
92 protected boolean rollbackOnly = false;
94 /** The underlying ClusterTransaction */
95 protected ClusterTransaction clusterTransaction;
97 /** The transaction id to join */
98 protected String joinTransactionId = null;
100 /** The properties for this session */
101 protected Map properties;
103 /** Flags for iterating a scan */
104 protected final int RESULT_READY = 0;
105 protected final int SCAN_FINISHED = 1;
106 protected final int CACHE_EMPTY = 2;
108 /** The list of objects changed since the last flush */
109 protected List<StateManager> changeList = new ArrayList<StateManager>();
111 /** The list of pending operations, such as load operations, that need to be
112 * processed after the operation is sent to the database via @see #executeNoCommit().
114 protected List<Runnable> postExecuteOperations = new ArrayList<Runnable>();
116 /** The transaction state of this session. */
117 protected TransactionState transactionState;
119 /** The exception state of an internal transaction. */
120 private ClusterJException transactionException;
122 /** Nested auto transaction counter. */
123 protected int nestedAutoTransactionCounter = 0;
125 /** Number of retries for retriable exceptions */
126 // TODO get this from properties
127 protected int numberOfRetries = 5;
129 /** The lock mode for read operations */
130 private LockMode lockmode = LockMode.READ_COMMITTED;
132 /** Our post-execute callback handler */
133 private Runnable postExecuteCallbackHandler = new Runnable() {
135 for (Runnable postExecuteCallback: postExecuteOperations) {
136 postExecuteCallback.run();
138 postExecuteOperations.clear();
142 /** Create a SessionImpl with factory, properties, Db, and dictionary
144 SessionImpl(SessionFactoryImpl factory, Map properties,
145 Db db, Dictionary dictionary) {
146 this.factory = factory;
148 this.dictionary = dictionary;
149 this.properties = properties;
150 transactionImpl = new TransactionImpl(this);
151 transactionState = transactionStateNotActive;
154 /** Create a query from a query definition.
156 * @param qd the query definition
159 public <T> Query<T> createQuery(QueryDefinition<T> qd) {
160 if (!(qd instanceof QueryDomainTypeImpl)) {
161 throw new ClusterJUserException(
162 local.message("ERR_Exception_On_Method", "createQuery"));
164 return new QueryImpl<T>(this, (QueryDomainTypeImpl<T>)qd);
167 /** Find an instance by its class and primary key.
168 * If there is a compound primary key, the key is an Object[] containing
169 * all of the primary key fields in order of declaration in annotations.
171 * @param cls the class
172 * @param key the primary key
173 * @return the instance
175 public <T> T find(Class<T> cls, Object key) {
176 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
177 T instance = (T) factory.newInstance(cls, dictionary);
178 ValueHandler keyHandler = domainTypeHandler.createKeyValueHandler(key);
179 ValueHandler instanceHandler = domainTypeHandler.getValueHandler(instance);
180 // initialize from the database using the key
181 return (T) initializeFromDatabase(
182 domainTypeHandler, instance, instanceHandler, keyHandler);
185 /** Initialize fields from the database. The keyHandler must
186 * contain the primary keys, none of which can be null.
187 * The instanceHandler, which may be null, manages the values
188 * of the instance. If it is null, both the instanceHandler and the
189 * instance are created if the instance exists in the database.
190 * The instance, which may be null, is the domain instance that is
191 * returned after loading the values from the database.
193 * @param domainTypeHandler domain type handler for the class
194 * @param keyHandler the primary key handler
195 * @param instanceHandler the handler for the instance
196 * (may be null if not yet initialized)
197 * @param instance the instance (may be null)
198 * @return the instance with fields initialized from the database
200 public <T> T initializeFromDatabase(DomainTypeHandler<T> domainTypeHandler,
202 ValueHandler instanceHandler, ValueHandler keyHandler) {
203 startAutoTransaction();
205 ResultData rs = selectUnique(domainTypeHandler, keyHandler, null);
207 // we have a result; initialize the instance
208 if (instanceHandler == null) {
209 if (logger.isDetailEnabled()) logger.detail("Creating instanceHandler for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
210 // we need both a new instance and its handler
211 instance = domainTypeHandler.newInstance();
212 instanceHandler = domainTypeHandler.getValueHandler(instance);
213 } else if (instance == null) {
214 if (logger.isDetailEnabled()) logger.detail("Creating instance for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
215 // we have a handler but no instance
216 instance = domainTypeHandler.getInstance(instanceHandler);
218 // found the instance in the datastore
219 instanceHandler.found(Boolean.TRUE);
220 // put the results into the instance
221 domainTypeHandler.objectSetValues(rs, instanceHandler);
222 // set the cache manager to track updates
223 domainTypeHandler.objectSetCacheManager(this, instanceHandler);
224 // reset modified bits in instance
225 domainTypeHandler.objectResetModified(instanceHandler);
227 if (logger.isDetailEnabled()) logger.detail("No instance found in database for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
228 // no instance found in database
229 if (instanceHandler != null) {
230 // mark the handler as not found
231 instanceHandler.found(Boolean.FALSE);
233 endAutoTransaction();
236 } catch (ClusterJException ex) {
237 failAutoTransaction();
240 endAutoTransaction();
244 /** If a transaction is already enlisted, ignore. Otherwise, set
245 * the partition key based on the key handler.
246 * @param domainTypeHandler the domain type handler
247 * @param keyHandler the value handler that holds the key values
249 private void setPartitionKey(DomainTypeHandler<?> domainTypeHandler,
250 ValueHandler keyHandler) {
252 // there is still time to set the partition key
253 PartitionKey partitionKey =
254 domainTypeHandler.createPartitionKey(keyHandler);
255 clusterTransaction.setPartitionKey(partitionKey);
259 /** Create an instance of a class to be persisted.
261 * @param cls the class
262 * @return a new instance that can be used with makePersistent
264 public <T> T newInstance(Class<T> cls) {
265 return factory.newInstance(cls, dictionary);
268 /** Create an instance of a class to be persisted and set the primary key.
270 * @param cls the class
271 * @return a new instance that can be used with makePersistent,
272 * savePersistent, writePersistent, updatePersistent, or deletePersistent
274 public <T> T newInstance(Class<T> cls, Object key) {
275 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
276 T instance = factory.newInstance(cls, dictionary);
277 domainTypeHandler.objectSetKeys(key, instance);
281 /** Load the instance from the database into memory. Loading
282 * is asynchronous and will be executed when an operation requiring
283 * database access is executed: find, flush, or query. The instance must
284 * have been returned from find or query; or
285 * created via session.newInstance and its primary key initialized.
286 * @param object the instance to load
287 * @return the instance
288 * @see #found(Object)
290 public <T> T load(final T object) {
291 if (object == null) {
294 if (Iterable.class.isAssignableFrom(object.getClass())) {
295 Iterable<?> instances = (Iterable<?>)object;
296 for (Object instance:instances) {
301 if (object.getClass().isArray()) {
302 Object[] instances = (Object[])object;
303 for (Object instance:instances) {
308 // a transaction must already be active (autocommit is not supported)
310 final DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object);
311 final ValueHandler instanceHandler = domainTypeHandler.getValueHandler(object);
312 setPartitionKey(domainTypeHandler, instanceHandler);
313 Table storeTable = domainTypeHandler.getStoreTable();
314 // perform a primary key operation
315 final Operation op = clusterTransaction.getSelectOperation(storeTable);
316 // set the keys into the operation
317 domainTypeHandler.operationSetKeys(instanceHandler, op);
318 // set the expected columns into the operation
319 domainTypeHandler.operationGetValues(op);
320 final ResultData rs = op.resultData(false);
321 final SessionImpl cacheManager = this;
322 // defer execution of the key operation until the next find, flush, or query
323 Runnable postExecuteOperation = new Runnable() {
326 // found row in database
327 instanceHandler.found(Boolean.TRUE);
328 // put the results into the instance
329 domainTypeHandler.objectSetValues(rs, instanceHandler);
330 // set the cache manager to track updates
331 domainTypeHandler.objectSetCacheManager(cacheManager, instanceHandler);
332 // reset modified bits in instance
333 domainTypeHandler.objectResetModified(instanceHandler);
335 // mark instance as not found
336 instanceHandler.found(Boolean.FALSE);
341 postExecuteOperations.add(postExecuteOperation);
345 /** Was this instance found in the database?
346 * @param instance the instance
347 * @return <ul><li>null if the instance is null or was created via newInstance and never loaded;
348 * </li><li>true if the instance was returned from a find or query
349 * or created via newInstance and successfully loaded;
350 * </li><li>false if the instance was created via newInstance and not found.
353 public Boolean found(Object instance) {
354 if (instance == null) {
357 if (instance instanceof DynamicObject) {
358 return ((DynamicObject)instance).found();
360 // make sure the instance is a persistent type
361 getDomainTypeHandler(instance);
365 /** Make an instance persistent. Also recursively make an iterable collection or array persistent.
367 * @param object the instance or array or iterable collection of instances
368 * @return the instance
370 public <T> T makePersistent(T object) {
371 if (object == null) {
374 if (Iterable.class.isAssignableFrom(object.getClass())) {
375 startAutoTransaction();
376 Iterable<?> instances = (Iterable<?>)object;
377 for (Object instance:instances) {
378 makePersistent(instance);
380 endAutoTransaction();
383 if (object.getClass().isArray()) {
384 startAutoTransaction();
385 Object[] instances = (Object[])object;
386 for (Object instance:instances) {
387 makePersistent(instance);
389 endAutoTransaction();
392 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(object);
393 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
394 insert(domainTypeHandler, valueHandler);
398 public Operation insert(
399 DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) {
400 startAutoTransaction();
401 setPartitionKey(domainTypeHandler, valueHandler);
403 Table storeTable = null;
405 storeTable = domainTypeHandler.getStoreTable();
406 op = clusterTransaction.getInsertOperation(storeTable);
407 // set all values in the operation, keys first
408 domainTypeHandler.operationSetKeys(valueHandler, op);
409 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
410 // reset modified bits in instance
411 domainTypeHandler.objectResetModified(valueHandler);
412 } catch (ClusterJUserException cjuex) {
413 failAutoTransaction();
415 } catch (ClusterJException cjex) {
416 failAutoTransaction();
417 logger.error(local.message("ERR_Insert", storeTable.getName()));
418 throw new ClusterJException(
419 local.message("ERR_Insert", storeTable.getName()), cjex);
420 } catch (RuntimeException rtex) {
421 failAutoTransaction();
422 logger.error(local.message("ERR_Insert", storeTable.getName()));
423 throw new ClusterJException(
424 local.message("ERR_Insert", storeTable.getName()), rtex);
426 endAutoTransaction();
430 /** Make a number of instances persistent.
432 * @param instances a Collection or array of objects to persist
433 * @return a Collection or array with the same order of iteration
435 public Iterable makePersistentAll(Iterable instances) {
436 startAutoTransaction();
437 List<Object> result = new ArrayList<Object>();
438 for (Object instance:instances) {
439 result.add(makePersistent(instance));
441 endAutoTransaction();
445 /** Delete an instance of a class from the database given its primary key.
446 * For single-column keys, the key parameter is a wrapper (e.g. Integer).
447 * For multi-column keys, the key parameter is an Object[] in which
448 * elements correspond to the primary keys in order as defined in the schema.
449 * @param cls the class
450 * @param key the primary key
452 public <T> void deletePersistent(Class<T> cls, Object key) {
453 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
454 ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(key);
455 delete(domainTypeHandler, keyValueHandler);
458 /** Remove an instance from the database. Only the key field(s)
459 * are used to identify the instance.
461 * @param object the instance to remove from the database
463 public void deletePersistent(Object object) {
464 if (object == null) {
467 DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
468 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
469 delete(domainTypeHandler, valueHandler);
472 public Operation delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
473 startAutoTransaction();
474 Table storeTable = domainTypeHandler.getStoreTable();
475 setPartitionKey(domainTypeHandler, valueHandler);
478 op = clusterTransaction.getDeleteOperation(storeTable);
479 domainTypeHandler.operationSetKeys(valueHandler, op);
480 } catch (ClusterJException ex) {
481 failAutoTransaction();
482 throw new ClusterJException(
483 local.message("ERR_Delete", storeTable.getName()), ex);
485 endAutoTransaction();
489 /** Delete the instances corresponding to the parameters.
490 * @param objects the objects to delete
492 public void deletePersistentAll(Iterable objects) {
493 startAutoTransaction();
494 for (Iterator it = objects.iterator(); it.hasNext();) {
495 deletePersistent(it.next());
497 endAutoTransaction();
500 /** Delete all instances of the parameter class.
501 * @param cls the class of instances to delete
503 public <T> int deletePersistentAll(Class<T> cls) {
504 DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
505 return deletePersistentAll(domainTypeHandler);
508 /** Delete all instances of the parameter domainTypeHandler.
509 * @param domainTypeHandler the domainTypeHandler of instances to delete
510 * @return the number of instances deleted
512 public int deletePersistentAll(DomainTypeHandler<?> domainTypeHandler) {
513 startAutoTransaction();
514 Table storeTable = domainTypeHandler.getStoreTable();
515 String tableName = storeTable.getName();
516 ScanOperation op = null;
519 op = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
520 count = deletePersistentAll(op, true);
521 } catch (ClusterJException ex) {
522 failAutoTransaction();
523 // TODO add table name to the error message
524 throw new ClusterJException(
525 local.message("ERR_Delete_All", tableName), ex);
527 endAutoTransaction();
531 /** Delete all instances retrieved by the operation. The operation must have exclusive
532 * access to the instances and have the ScanFlag.KEY_INFO flag set.
533 * @param op the scan operation
534 * @return the number of instances deleted
536 public int deletePersistentAll(ScanOperation op, boolean abort) {
539 boolean done = false;
540 boolean fetch = true;
541 // cannot use early autocommit optimization here
542 clusterTransaction.setAutocommit(false);
543 // execute the operation
544 clusterTransaction.executeNoCommit(true, true);
546 int result = op.nextResult(fetch);
549 op.deleteCurrentTuple();
556 if (cacheCount != 0) {
557 clusterTransaction.executeNoCommit(abort, true);
562 clusterTransaction.executeNoCommit(abort, true);
567 throw new ClusterJException(
568 local.message("ERR_Next_Result_Illegal", result));
574 /** Select a single row from the database. Only the fields requested
575 * will be selected. A transaction must be active (either via begin
576 * or startAutoTransaction).
578 * @param domainTypeHandler the domainTypeHandler to be selected
579 * @param keyHandler the key supplier for the select
580 * @param fields the fields to select; null to select all fields
581 * @return the ResultData from the database
583 public ResultData selectUnique(DomainTypeHandler domainTypeHandler,
584 ValueHandler keyHandler, BitSet fields) {
586 setPartitionKey(domainTypeHandler, keyHandler);
587 Table storeTable = domainTypeHandler.getStoreTable();
588 // perform a single select by key operation
589 Operation op = clusterTransaction.getSelectOperation(storeTable);
590 // set the keys into the operation
591 domainTypeHandler.operationSetKeys(keyHandler, op);
592 // set the expected columns into the operation
593 domainTypeHandler.operationGetValues(op);
594 // execute the select and get results
595 ResultData rs = op.resultData();
599 /** Update an instance in the database. The key field(s)
600 * are used to identify the instance; modified fields change the
601 * values in the database.
603 * @param object the instance to update in the database
605 public void updatePersistent(Object object) {
606 if (object == null) {
609 DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
610 if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + object);
611 ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
612 update(domainTypeHandler, valueHandler);
615 public Operation update(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
616 startAutoTransaction();
617 setPartitionKey(domainTypeHandler, valueHandler);
618 Table storeTable = null;
621 storeTable = domainTypeHandler.getStoreTable();
622 op = clusterTransaction.getUpdateOperation(storeTable);
623 domainTypeHandler.operationSetKeys(valueHandler, op);
624 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
625 if (logger.isDetailEnabled()) logger.detail("Updated object " +
627 } catch (ClusterJException ex) {
628 failAutoTransaction();
629 throw new ClusterJException(
630 local.message("ERR_Update", storeTable.getName()) ,ex);
632 endAutoTransaction();
636 /** Update the instances corresponding to the parameters.
637 * @param objects the objects to update
639 public void updatePersistentAll(Iterable objects) {
640 startAutoTransaction();
641 for (Iterator it = objects.iterator(); it.hasNext();) {
642 updatePersistent(it.next());
644 endAutoTransaction();
647 /** Save the instance even if it does not exist.
648 * @param instance the instance to save
650 public <T> T savePersistent(T instance) {
651 DomainTypeHandler domainTypeHandler = getDomainTypeHandler(instance);
652 if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + instance);
653 ValueHandler valueHandler = domainTypeHandler.getValueHandler(instance);
654 startAutoTransaction();
655 setPartitionKey(domainTypeHandler, valueHandler);
656 Table storeTable = null;
658 storeTable = domainTypeHandler.getStoreTable();
660 op = clusterTransaction.getWriteOperation(storeTable);
661 domainTypeHandler.operationSetKeys(valueHandler, op);
662 domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
663 if (logger.isDetailEnabled()) logger.detail("Wrote object " +
665 } catch (ClusterJException ex) {
666 failAutoTransaction();
667 throw new ClusterJException(
668 local.message("ERR_Write", storeTable.getName()) ,ex);
670 endAutoTransaction();
674 /** Save the instances even if they do not exist.
677 public Iterable savePersistentAll(Iterable instances) {
678 List<Object> result = new ArrayList<Object>();
679 startAutoTransaction();
680 for (Iterator it = instances.iterator(); it.hasNext();) {
681 result.add(savePersistent(it.next()));
683 endAutoTransaction();
687 /** Get the current transaction.
689 * @return the transaction
691 public Transaction currentTransaction() {
692 return transactionImpl;
695 /** Close this session and deallocate all resources.
698 public void close() {
699 if (clusterTransaction != null) {
700 clusterTransaction.close();
701 clusterTransaction = null;
709 public boolean isClosed() {
713 /** Assert this session is not yet closed. */
714 protected void assertNotClosed() {
716 throw new ClusterJUserException(
717 local.message("ERR_Session_Closed"));
721 /** Begin the current transaction.
724 public void begin() {
725 if (logger.isDebugEnabled()) logger.debug("begin transaction.");
726 transactionState = transactionState.begin();
727 handleTransactionException();
730 /** Internally begin the transaction.
731 * Called by transactionState.begin().
733 protected void internalBegin() {
735 clusterTransaction = db.startTransaction(joinTransactionId);
736 clusterTransaction.setLockMode(lockmode);
737 // if a transaction has already begun, tell the cluster transaction about the key
738 if (partitionKey != null) {
739 clusterTransaction.setPartitionKey(partitionKey);
741 // register our post-execute callback
742 clusterTransaction.postExecuteCallback(postExecuteCallbackHandler);
743 } catch (ClusterJException ex) {
744 throw new ClusterJException(
745 local.message("ERR_Ndb_Start"), ex);
749 /** Commit the current transaction.
752 public void commit() {
753 if (logger.isDebugEnabled()) logger.debug("commit transaction.");
754 transactionState = transactionState.commit();
755 handleTransactionException();
758 /** Internally commit the transaction.
759 * Called by transactionState.commit().
761 protected void internalCommit() {
765 throw new ClusterJException(
766 local.message("ERR_Transaction_Rollback_Only"));
767 } catch (ClusterJException ex) {
768 throw new ClusterJException(
769 local.message("ERR_Transaction_Rollback_Only"), ex);
773 clusterTransaction.executeCommit(true, true);
775 // always close the transaction
776 clusterTransaction.close();
777 clusterTransaction = null;
782 /** Roll back the current transaction.
785 public void rollback() {
786 if (logger.isDebugEnabled()) logger.debug("roll back transaction.");
787 transactionState = transactionState.rollback();
788 handleTransactionException();
791 /** Internally roll back the transaction.
792 * Called by transactionState.rollback() and
793 * transactionState.commit() if the transaction is marked for rollback.
796 protected void internalRollback() {
798 clusterTransaction.executeRollback();
799 } catch (ClusterJException ex) {
800 throw new ClusterJException(
801 local.message("ERR_Transaction_Execute", "rollback"), ex);
803 if (clusterTransaction != null) {
804 clusterTransaction.close();
806 clusterTransaction = null;
811 /** Start a transaction if there is not already an active transaction.
812 * Throw a ClusterJException if there is any problem.
814 public void startAutoTransaction() {
815 if (logger.isDebugEnabled()) logger.debug("start AutoTransaction");
816 transactionState = transactionState.start();
817 handleTransactionException();
820 /** End an auto transaction if it was started.
821 * Throw a ClusterJException if there is any problem.
823 public void endAutoTransaction() {
824 if (logger.isDebugEnabled()) logger.debug("end AutoTransaction");
825 transactionState = transactionState.end();
826 handleTransactionException();
829 /** Fail an auto transaction if it was started.
830 * Throw a ClusterJException if there is any problem.
832 public void failAutoTransaction() {
833 if (logger.isDebugEnabled()) logger.debug("fail AutoTransaction");
834 transactionState = transactionState.fail();
837 protected void handleTransactionException() {
838 if (transactionException == null) {
841 ClusterJException ex = transactionException;
842 transactionException = null;
847 /** Mark the current transaction as rollback only.
850 public void setRollbackOnly() {
854 /** Is the current transaction marked for rollback only?
855 * @return true if the current transaction is marked for rollback only
857 public boolean getRollbackOnly() {
861 /** Manage the state of the transaction associated with this
864 protected interface TransactionState {
867 TransactionState begin();
868 TransactionState commit();
869 TransactionState rollback();
871 TransactionState start();
872 TransactionState end();
873 TransactionState fail();
876 /** This represents the state of Transaction Not Active. */
877 protected TransactionState transactionStateNotActive =
878 new TransactionState() {
880 public boolean isActive() {
884 public TransactionState begin() {
887 return transactionStateActive;
888 } catch (ClusterJException ex) {
889 transactionException = ex;
890 return transactionStateNotActive;
894 public TransactionState commit() {
895 transactionException = new ClusterJUserException(
896 local.message("ERR_Transaction_Must_Be_Active_For_Method",
898 return transactionStateNotActive;
901 public TransactionState rollback() {
902 transactionException = new ClusterJUserException(
903 local.message("ERR_Transaction_Must_Be_Active_For_Method",
905 return transactionStateNotActive;
908 public TransactionState start() {
911 clusterTransaction.setAutocommit(true);
912 nestedAutoTransactionCounter = 1;
913 return transactionStateAutocommit;
914 } catch (ClusterJException ex) {
915 transactionException = ex;
916 return transactionStateNotActive;
920 public TransactionState end() {
921 throw new ClusterJFatalInternalException(
922 local.message("ERR_Transaction_Auto_Start", "end"));
925 public TransactionState fail() {
926 throw new ClusterJFatalInternalException(
927 local.message("ERR_Transaction_Auto_Start", "end"));
932 /** This represents the state of Transaction Active. */
933 protected TransactionState transactionStateActive =
934 new TransactionState() {
936 public boolean isActive() {
940 public TransactionState begin() {
941 transactionException = new ClusterJUserException(
942 local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
944 return transactionStateActive;
947 public TransactionState commit() {
949 // flush unwritten changes
952 } catch (ClusterJException ex) {
953 transactionException = ex;
955 return transactionStateNotActive;
958 public TransactionState rollback() {
961 return transactionStateNotActive;
962 } catch (ClusterJException ex) {
963 transactionException = ex;
964 return transactionStateNotActive;
968 public TransactionState start() {
970 return transactionStateActive;
973 public TransactionState end() {
975 return transactionStateActive;
978 public TransactionState fail() {
980 return transactionStateActive;
985 protected TransactionState transactionStateAutocommit =
986 new TransactionState() {
988 public boolean isActive() {
992 public TransactionState begin() {
993 throw new ClusterJFatalInternalException(
994 local.message("ERR_Transaction_Auto_End", "begin"));
997 public TransactionState commit() {
998 throw new ClusterJFatalInternalException(
999 local.message("ERR_Transaction_Auto_End", "commit"));
1002 public TransactionState rollback() {
1003 throw new ClusterJFatalInternalException(
1004 local.message("ERR_Transaction_Auto_End", "rollback"));
1007 public TransactionState start() {
1008 // nested start; increment counter
1009 nestedAutoTransactionCounter++;
1010 return transactionStateAutocommit;
1013 public TransactionState end() {
1014 if (--nestedAutoTransactionCounter > 0) {
1015 return transactionStateAutocommit;
1016 } else if (nestedAutoTransactionCounter == 0) {
1019 } catch (ClusterJException ex) {
1020 transactionException = ex;
1022 return transactionStateNotActive;
1024 throw new ClusterJFatalInternalException(
1025 local.message("ERR_Transaction_Auto_Start", "end"));
1029 public TransactionState fail() {
1031 nestedAutoTransactionCounter = 0;
1033 return transactionStateNotActive;
1034 } catch (ClusterJException ex) {
1035 // ignore failures caused by internal rollback
1036 return transactionStateNotActive;
1042 /** Get the domain type handler for an instance.
1044 * @param object the instance for which to get the domain type handler
1045 * @return the domain type handler
1047 protected synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(T object) {
1048 DomainTypeHandler<T> domainTypeHandler =
1049 factory.getDomainTypeHandler(object, dictionary);
1050 return domainTypeHandler;
1053 /** Get the domain type handler for a class.
1055 * @param cls the class
1056 * @return the domain type handler
1058 public synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
1059 DomainTypeHandler<T> domainTypeHandler =
1060 factory.getDomainTypeHandler(cls, dictionary);
1061 return domainTypeHandler;
1064 public Dictionary getDictionary() {
1068 /** Is there an active transaction.
1070 * @return true if there is an active transaction
1072 boolean isActive() {
1073 return transactionState.isActive();
1076 /** Is the transaction enlisted. A transaction is enlisted if and only if
1077 * an operation has been defined that requires an ndb transaction to be
1079 * @return true if the transaction is enlisted
1081 public boolean isEnlisted() {
1082 return clusterTransaction==null?false:clusterTransaction.isEnlisted();
1085 /** Assert that there is an active transaction (the user has called begin
1086 * or an autotransaction has begun).
1087 * Throw a user exception if not.
1089 private void assertActive() {
1090 if (!transactionState.isActive()) {
1091 throw new ClusterJUserException(
1092 local.message("ERR_Transaction_Must_Be_Active"));
1096 /** Assert that there is not an active transaction.
1097 * Throw a user exception if there is an active transaction.
1098 * @param methodName the name of the method
1100 private void assertNotActive(String methodName) {
1101 if (transactionState.isActive()) {
1102 throw new ClusterJUserException(
1103 local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
1108 /** Create a query from a class.
1110 * @param cls the class
1113 public Query createQuery(Class cls) {
1114 throw new UnsupportedOperationException(
1115 local.message("ERR_NotImplemented"));
1118 /** Get a query builder.
1120 * @return the query builder
1122 public QueryBuilder getQueryBuilder() {
1123 return new QueryBuilderImpl(this);
1126 /** Create an index scan operation for an index and table.
1128 * @param storeIndex the index
1129 * @param storeTable the table
1130 * @return the index scan operation
1132 public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
1135 IndexScanOperation result = clusterTransaction.getIndexScanOperation(storeIndex, storeTable);
1137 } catch (ClusterJException ex) {
1138 throw new ClusterJException(
1139 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1143 /** Create an index scan operation for an index and table to be used for a multi-range scan.
1145 * @param storeIndex the index
1146 * @param storeTable the table
1147 * @return the index scan operation
1149 public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
1152 IndexScanOperation result = clusterTransaction.getIndexScanOperationMultiRange(storeIndex, storeTable);
1154 } catch (ClusterJException ex) {
1155 throw new ClusterJException(
1156 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1160 /** Create an index scan delete operation for an index and table.
1162 * @param storeIndex the index
1163 * @param storeTable the table
1164 * @return the index scan operation
1166 public IndexScanOperation getIndexScanDeleteOperation(Index storeIndex, Table storeTable) {
1169 IndexScanOperation result = clusterTransaction.getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(storeIndex, storeTable);
1171 } catch (ClusterJException ex) {
1172 throw new ClusterJException(
1173 local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1177 /** Create a table scan operation for a table.
1179 * @param storeTable the table
1180 * @return the table scan operation
1182 public ScanOperation getTableScanOperation(Table storeTable) {
1185 ScanOperation result = clusterTransaction.getTableScanOperation(storeTable);
1187 } catch (ClusterJException ex) {
1188 throw new ClusterJException(
1189 local.message("ERR_Table_Scan", storeTable.getName()), ex);
1193 /** Create a table scan delete operation for a table.
1195 * @param storeTable the table
1196 * @return the table scan operation
1198 public ScanOperation getTableScanDeleteOperation(Table storeTable) {
1201 ScanOperation result = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
1203 } catch (ClusterJException ex) {
1204 throw new ClusterJException(
1205 local.message("ERR_Table_Scan", storeTable.getName()), ex);
1209 /** Create an index operation for an index and table.
1211 * @param storeIndex the index
1212 * @param storeTable the table
1213 * @return the index operation
1215 public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
1218 IndexOperation result = clusterTransaction.getUniqueIndexOperation(storeIndex, storeTable);
1220 } catch (ClusterJException ex) {
1221 throw new ClusterJException(
1222 local.message("ERR_Unique_Index", storeTable.getName(), storeIndex.getName()), ex);
1226 /** Create a select operation for a table.
1228 * @param storeTable the table
1229 * @return the operation
1231 public Operation getSelectOperation(Table storeTable) {
1234 Operation result = clusterTransaction.getSelectOperation(storeTable);
1236 } catch (ClusterJException ex) {
1237 throw new ClusterJException(
1238 local.message("ERR_Select", storeTable), ex);
1242 /** Create a delete operation for a table.
1244 * @param storeTable the table
1245 * @return the operation
1247 public Operation getDeleteOperation(Table storeTable) {
1250 Operation result = clusterTransaction.getDeleteOperation(storeTable);
1252 } catch (ClusterJException ex) {
1253 throw new ClusterJException(
1254 local.message("ERR_Delete", storeTable), ex);
1258 /** Create an index delete operation for an index and table.
1260 * @param storeIndex the index
1261 * @param storeTable the table
1262 * @return the index operation
1264 public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
1267 IndexOperation result = clusterTransaction.getUniqueIndexDeleteOperation(storeIndex, storeTable);
1269 } catch (ClusterJException ex) {
1270 throw new ClusterJException(
1271 local.message("ERR_Unique_Index_Delete", storeTable.getName(), storeIndex.getName()), ex);
1275 public void flush() {
1276 if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
1277 if (!changeList.isEmpty()) {
1278 for (StateManager sm: changeList) {
1283 // now flush changes to the back end
1284 if (clusterTransaction != null) {
1289 public List getChangeList() {
1290 return Collections.unmodifiableList(changeList);
1293 public void persist(Object instance) {
1294 makePersistent(instance);
1297 public void remove(Object instance) {
1298 deletePersistent(instance);
1301 public void markModified(StateManager instance) {
1302 changeList.add(instance);
1305 public void setPartitionKey(Class<?> domainClass, Object key) {
1306 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(domainClass);
1307 String tableName = domainTypeHandler.getTableName();
1308 // if transaction is enlisted, throw a user exception
1310 throw new ClusterJUserException(
1311 local.message("ERR_Set_Partition_Key_After_Enlistment", tableName));
1313 // if a partition key has already been set, throw a user exception
1314 if (this.partitionKey != null) {
1315 throw new ClusterJUserException(
1316 local.message("ERR_Set_Partition_Key_Twice", tableName));
1318 ValueHandler handler = domainTypeHandler.createKeyValueHandler(key);
1319 this.partitionKey= domainTypeHandler.createPartitionKey(handler);
1320 // if a transaction has already begun, tell the cluster transaction about the key
1321 if (clusterTransaction != null) {
1322 clusterTransaction.setPartitionKey(partitionKey);
1326 /** Mark the field in the instance as modified so it is flushed.
1328 * @param instance the persistent instance
1329 * @param fieldName the field to mark as modified
1331 public void markModified(Object instance, String fieldName) {
1332 DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(instance);
1333 ValueHandler handler = domainTypeHandler.getValueHandler(instance);
1334 domainTypeHandler.objectMarkModified(handler, fieldName);
1337 /** Execute any pending operations (insert, delete, update, load)
1338 * and then perform post-execute operations (for load) via
1339 * clusterTransaction.postExecuteCallback().
1340 * @param abort abort this transaction on error
1341 * @param force force the operation to be sent immediately
1343 public void executeNoCommit(boolean abort, boolean force) {
1344 if (clusterTransaction != null) {
1345 clusterTransaction.executeNoCommit(abort, force);
1349 /** Execute any pending operations (insert, delete, update, load)
1350 * and then perform post-execute operations (for load) via
1351 * clusterTransaction.postExecuteCallback().
1352 * Abort the transaction on error. Force the operation to be sent immediately.
1354 public void executeNoCommit() {
1355 executeNoCommit(true, true);
1358 public <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> domainTypeHandler) {
1359 QueryBuilderImpl builder = (QueryBuilderImpl)getQueryBuilder();
1360 return builder.createQueryDefinition(domainTypeHandler);
1363 /** Return the coordinatedTransactionId of the current transaction.
1364 * The transaction might not have been enlisted.
1365 * @return the coordinatedTransactionId
1367 public String getCoordinatedTransactionId() {
1368 return clusterTransaction.getCoordinatedTransactionId();
1371 /** Set the coordinatedTransactionId for the next transaction. This
1372 * will take effect as soon as the transaction is enlisted.
1373 * @param coordinatedTransactionId the coordinatedTransactionId
1375 public void setCoordinatedTransactionId(String coordinatedTransactionId) {
1376 clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
1379 /** Set the lock mode for subsequent operations. The lock mode takes effect immediately
1380 * and continues until set again.
1381 * @param lockmode the lock mode
1383 public void setLockMode(LockMode lockmode) {
1384 this.lockmode = lockmode;
1385 if (clusterTransaction != null) {
1386 clusterTransaction.setLockMode(lockmode);