]> review.fuel-infra Code Review - packages/trusty/mysql-wsrep-5.6.git/blob
ea4ba7a5e2d0f578051ec1fbfa5497b2b04e4570
[packages/trusty/mysql-wsrep-5.6.git] /
1 /*
2    Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
3
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
16 */
17
18 package com.mysql.clusterj.core;
19
20 import com.mysql.clusterj.ClusterJException;
21 import com.mysql.clusterj.ClusterJFatalInternalException;
22 import com.mysql.clusterj.ClusterJUserException;
23 import com.mysql.clusterj.DynamicObject;
24 import com.mysql.clusterj.LockMode;
25 import com.mysql.clusterj.Query;
26 import com.mysql.clusterj.Transaction;
27
28 import com.mysql.clusterj.core.spi.DomainTypeHandler;
29 import com.mysql.clusterj.core.spi.ValueHandler;
30
31 import com.mysql.clusterj.core.query.QueryDomainTypeImpl;
32 import com.mysql.clusterj.core.query.QueryBuilderImpl;
33 import com.mysql.clusterj.core.query.QueryImpl;
34
35 import com.mysql.clusterj.core.spi.SessionSPI;
36
37 import com.mysql.clusterj.core.store.ClusterTransaction;
38 import com.mysql.clusterj.core.store.Db;
39 import com.mysql.clusterj.core.store.Dictionary;
40 import com.mysql.clusterj.core.store.Index;
41 import com.mysql.clusterj.core.store.IndexOperation;
42 import com.mysql.clusterj.core.store.IndexScanOperation;
43 import com.mysql.clusterj.core.store.Operation;
44 import com.mysql.clusterj.core.store.PartitionKey;
45 import com.mysql.clusterj.core.store.ResultData;
46 import com.mysql.clusterj.core.store.ScanOperation;
47 import com.mysql.clusterj.core.store.Table;
48
49 import com.mysql.clusterj.core.util.I18NHelper;
50 import com.mysql.clusterj.core.util.Logger;
51 import com.mysql.clusterj.core.util.LoggerFactoryService;
52
53 import com.mysql.clusterj.query.QueryBuilder;
54 import com.mysql.clusterj.query.QueryDefinition;
55 import com.mysql.clusterj.query.QueryDomainType;
56
57 import java.util.ArrayList;
58 import java.util.BitSet;
59 import java.util.Collections;
60 import java.util.Iterator;
61 import java.util.List;
62 import java.util.Map;
63
64 /**
65  * This class implements Session, the main user interface to ClusterJ.
66  * It also implements SessionSPI, the main component interface.
67  */
68 public class SessionImpl implements SessionSPI, CacheManager, StoreManager {
69
70     /** My message translator */
71     static final I18NHelper local = I18NHelper.getInstance(SessionImpl.class);
72
73     /** My logger */
74     static final Logger logger = LoggerFactoryService.getFactory().getInstance(SessionImpl.class);
75
76     /** My Factory. */
77     protected SessionFactoryImpl factory;
78
79     /** Db: one per session. */
80     protected Db db;
81
82     /** Dictionary: one per session. */
83     protected Dictionary dictionary;
84
85     /** One transaction at a time. */
86     protected TransactionImpl transactionImpl;
87
88     /** The partition key */
89     protected PartitionKey partitionKey = null;
90
91     /** Rollback only status */
92     protected boolean rollbackOnly = false;
93
94     /** The underlying ClusterTransaction */
95     protected ClusterTransaction clusterTransaction;
96
97     /** The transaction id to join */
98     protected String joinTransactionId = null;
99
100     /** The properties for this session */
101     protected Map properties;
102
103     /** Flags for iterating a scan */
104     protected final int RESULT_READY = 0;
105     protected final int SCAN_FINISHED = 1;
106     protected final int CACHE_EMPTY = 2;
107
108     /** The list of objects changed since the last flush */
109     protected List<StateManager> changeList = new ArrayList<StateManager>();
110
111     /** The list of pending operations, such as load operations, that need to be
112      * processed after the operation is sent to the database via @see #executeNoCommit().
113      */
114     protected List<Runnable> postExecuteOperations = new ArrayList<Runnable>();
115
116     /** The transaction state of this session. */
117     protected TransactionState transactionState;
118
119     /** The exception state of an internal transaction. */
120     private ClusterJException transactionException;
121
122     /** Nested auto transaction counter. */
123     protected int nestedAutoTransactionCounter = 0;
124
125     /** Number of retries for retriable exceptions */
126     // TODO get this from properties
127     protected int numberOfRetries = 5;
128
129     /** The lock mode for read operations */
130     private LockMode lockmode = LockMode.READ_COMMITTED;
131
132     /** Our post-execute callback handler */
133     private Runnable postExecuteCallbackHandler = new Runnable() {
134         public void run() {
135             for (Runnable postExecuteCallback: postExecuteOperations) {
136                 postExecuteCallback.run();
137             }
138             postExecuteOperations.clear();
139         }
140     };
141
142     /** Create a SessionImpl with factory, properties, Db, and dictionary
143      */
144     SessionImpl(SessionFactoryImpl factory, Map properties, 
145             Db db, Dictionary dictionary) {
146         this.factory = factory;
147         this.db = db;
148         this.dictionary = dictionary;
149         this.properties = properties;
150         transactionImpl = new TransactionImpl(this);
151         transactionState = transactionStateNotActive;
152     }
153
154     /** Create a query from a query definition.
155      * 
156      * @param qd the query definition
157      * @return the query
158      */
159     public <T> Query<T> createQuery(QueryDefinition<T> qd) {
160         if (!(qd instanceof QueryDomainTypeImpl)) {
161             throw new ClusterJUserException(
162                     local.message("ERR_Exception_On_Method", "createQuery"));
163         }
164         return new QueryImpl<T>(this, (QueryDomainTypeImpl<T>)qd);
165     }
166
167     /** Find an instance by its class and primary key.
168      * If there is a compound primary key, the key is an Object[] containing
169      * all of the primary key fields in order of declaration in annotations.
170      * 
171      * @param cls the class
172      * @param key the primary key
173      * @return the instance
174      */
175     public <T> T find(Class<T> cls, Object key) {
176         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
177         T instance = (T) factory.newInstance(cls, dictionary);
178         ValueHandler keyHandler = domainTypeHandler.createKeyValueHandler(key);
179         ValueHandler instanceHandler = domainTypeHandler.getValueHandler(instance);
180         // initialize from the database using the key
181         return (T) initializeFromDatabase(
182                 domainTypeHandler, instance, instanceHandler, keyHandler);
183     }
184
185     /** Initialize fields from the database. The keyHandler must
186      * contain the primary keys, none of which can be null.
187      * The instanceHandler, which may be null, manages the values
188      * of the instance. If it is null, both the instanceHandler and the
189      * instance are created if the instance exists in the database.
190      * The instance, which may be null, is the domain instance that is
191      * returned after loading the values from the database.
192      *
193      * @param domainTypeHandler domain type handler for the class
194      * @param keyHandler the primary key handler
195      * @param instanceHandler the handler for the instance
196      * (may be null if not yet initialized)
197      * @param instance the instance (may be null)
198      * @return the instance with fields initialized from the database
199      */
200     public <T> T initializeFromDatabase(DomainTypeHandler<T> domainTypeHandler,
201             T instance,
202             ValueHandler instanceHandler, ValueHandler keyHandler) {
203         startAutoTransaction();
204         try {
205             ResultData rs = selectUnique(domainTypeHandler, keyHandler, null);
206             if (rs.next()) {
207                 // we have a result; initialize the instance
208                 if (instanceHandler == null) {
209                     if (logger.isDetailEnabled()) logger.detail("Creating instanceHandler for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
210                     // we need both a new instance and its handler
211                     instance = domainTypeHandler.newInstance();
212                     instanceHandler = domainTypeHandler.getValueHandler(instance);
213                 } else if (instance == null) {
214                 if (logger.isDetailEnabled()) logger.detail("Creating instance for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
215                     // we have a handler but no instance
216                     instance = domainTypeHandler.getInstance(instanceHandler);
217                 }
218                 // found the instance in the datastore
219                 instanceHandler.found(Boolean.TRUE);
220                 // put the results into the instance
221                 domainTypeHandler.objectSetValues(rs, instanceHandler);
222                 // set the cache manager to track updates
223                 domainTypeHandler.objectSetCacheManager(this, instanceHandler);
224                 // reset modified bits in instance
225                 domainTypeHandler.objectResetModified(instanceHandler);
226             } else {
227                 if (logger.isDetailEnabled()) logger.detail("No instance found in database for class " + domainTypeHandler.getName() + " table: " + domainTypeHandler.getTableName() + keyHandler.pkToString(domainTypeHandler));
228                 // no instance found in database
229                 if (instanceHandler != null) {
230                     // mark the handler as not found
231                     instanceHandler.found(Boolean.FALSE);
232                 }
233                 endAutoTransaction();
234                 return null;
235             }
236         } catch (ClusterJException ex) {
237             failAutoTransaction();
238             throw ex;
239         }
240         endAutoTransaction();
241         return instance;
242     }
243
244     /** If a transaction is already enlisted, ignore. Otherwise, set
245      * the partition key based on the key handler.
246      * @param domainTypeHandler the domain type handler
247      * @param keyHandler the value handler that holds the key values
248      */
249     private void setPartitionKey(DomainTypeHandler<?> domainTypeHandler,
250             ValueHandler keyHandler) {
251         if (!isEnlisted()) {
252             // there is still time to set the partition key
253             PartitionKey partitionKey = 
254                 domainTypeHandler.createPartitionKey(keyHandler);
255             clusterTransaction.setPartitionKey(partitionKey);
256         }
257     }
258
259     /** Create an instance of a class to be persisted.
260      * 
261      * @param cls the class
262      * @return a new instance that can be used with makePersistent
263      */
264     public <T> T newInstance(Class<T> cls) {
265         return factory.newInstance(cls, dictionary);
266     }
267
268     /** Create an instance of a class to be persisted and set the primary key.
269      * 
270      * @param cls the class
271      * @return a new instance that can be used with makePersistent,
272      * savePersistent, writePersistent, updatePersistent, or deletePersistent
273      */
274     public <T> T newInstance(Class<T> cls, Object key) {
275         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
276         T instance = factory.newInstance(cls, dictionary);
277         domainTypeHandler.objectSetKeys(key, instance);
278         return instance;
279     }
280
281     /** Load the instance from the database into memory. Loading
282      * is asynchronous and will be executed when an operation requiring
283      * database access is executed: find, flush, or query. The instance must
284      * have been returned from find or query; or
285      * created via session.newInstance and its primary key initialized.
286      * @param object the instance to load
287      * @return the instance
288      * @see #found(Object)
289      */
290     public <T> T load(final T object) {
291         if (object == null) {
292             return null;
293         }
294         if (Iterable.class.isAssignableFrom(object.getClass())) {
295             Iterable<?> instances = (Iterable<?>)object;
296             for (Object instance:instances) {
297                 load(instance);
298             }
299             return object;
300         }
301         if (object.getClass().isArray()) {
302             Object[] instances = (Object[])object;
303             for (Object instance:instances) {
304                 load(instance);
305             }
306             return object;
307         }
308         // a transaction must already be active (autocommit is not supported)
309         assertActive();
310         final DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(object);
311         final ValueHandler instanceHandler = domainTypeHandler.getValueHandler(object);
312         setPartitionKey(domainTypeHandler, instanceHandler);
313         Table storeTable = domainTypeHandler.getStoreTable();
314         // perform a primary key operation
315         final Operation op = clusterTransaction.getSelectOperation(storeTable);
316         // set the keys into the operation
317         domainTypeHandler.operationSetKeys(instanceHandler, op);
318         // set the expected columns into the operation
319         domainTypeHandler.operationGetValues(op);
320         final ResultData rs = op.resultData(false);
321         final SessionImpl cacheManager = this;
322         // defer execution of the key operation until the next find, flush, or query
323         Runnable postExecuteOperation = new Runnable() {
324             public void run() {
325                 if (rs.next()) {
326                     // found row in database
327                     instanceHandler.found(Boolean.TRUE);
328                    // put the results into the instance
329                     domainTypeHandler.objectSetValues(rs, instanceHandler);
330                     // set the cache manager to track updates
331                     domainTypeHandler.objectSetCacheManager(cacheManager, instanceHandler);
332                     // reset modified bits in instance
333                     domainTypeHandler.objectResetModified(instanceHandler);
334                 } else {
335                     // mark instance as not found
336                     instanceHandler.found(Boolean.FALSE);
337                 }
338                 
339             }
340         };
341         postExecuteOperations.add(postExecuteOperation);
342         return object;
343     }
344
345     /** Was this instance found in the database?
346      * @param instance the instance
347      * @return <ul><li>null if the instance is null or was created via newInstance and never loaded;
348      * </li><li>true if the instance was returned from a find or query
349      * or created via newInstance and successfully loaded;
350      * </li><li>false if the instance was created via newInstance and not found.
351      * </li></ul>
352      */
353     public Boolean found(Object instance) {
354         if (instance == null) {
355             return null;
356         }
357         if (instance instanceof DynamicObject) {
358             return ((DynamicObject)instance).found();
359         }
360         // make sure the instance is a persistent type
361         getDomainTypeHandler(instance);
362         return true;
363     }
364     
365     /** Make an instance persistent. Also recursively make an iterable collection or array persistent.
366      * 
367      * @param object the instance or array or iterable collection of instances
368      * @return the instance
369      */
370     public <T> T makePersistent(T object) {
371         if (object == null) {
372             return null;
373         }
374         if (Iterable.class.isAssignableFrom(object.getClass())) {
375             startAutoTransaction();
376             Iterable<?> instances = (Iterable<?>)object;
377             for (Object instance:instances) {
378                 makePersistent(instance);
379             }
380             endAutoTransaction();
381             return object;
382         }
383         if (object.getClass().isArray()) {
384             startAutoTransaction();
385             Object[] instances = (Object[])object;
386             for (Object instance:instances) {
387                 makePersistent(instance);
388             }
389             endAutoTransaction();
390             return object;
391         }
392         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(object);
393         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
394         insert(domainTypeHandler, valueHandler);
395         return object;
396     }
397
398     public Operation insert(
399             DomainTypeHandler<?> domainTypeHandler, ValueHandler valueHandler) {
400         startAutoTransaction();
401         setPartitionKey(domainTypeHandler, valueHandler);
402         Operation op = null;
403         Table storeTable = null;
404         try {
405             storeTable = domainTypeHandler.getStoreTable();
406             op = clusterTransaction.getInsertOperation(storeTable);
407             // set all values in the operation, keys first
408             domainTypeHandler.operationSetKeys(valueHandler, op);
409             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
410             // reset modified bits in instance
411             domainTypeHandler.objectResetModified(valueHandler);
412         } catch (ClusterJUserException cjuex) {
413             failAutoTransaction();
414             throw cjuex;
415         } catch (ClusterJException cjex) {
416             failAutoTransaction();
417             logger.error(local.message("ERR_Insert", storeTable.getName()));
418             throw new ClusterJException(
419                     local.message("ERR_Insert", storeTable.getName()), cjex);
420         } catch (RuntimeException rtex) {
421             failAutoTransaction();
422             logger.error(local.message("ERR_Insert", storeTable.getName()));
423             throw new ClusterJException(
424                     local.message("ERR_Insert", storeTable.getName()), rtex);
425         }
426         endAutoTransaction();
427         return op;
428     }
429
430     /** Make a number of instances persistent.
431      * 
432      * @param instances a Collection or array of objects to persist
433      * @return a Collection or array with the same order of iteration
434      */
435     public Iterable makePersistentAll(Iterable instances) {
436         startAutoTransaction();
437         List<Object> result = new ArrayList<Object>();
438         for (Object instance:instances) {
439             result.add(makePersistent(instance));
440             }
441         endAutoTransaction();
442         return result;
443     }
444
445     /** Delete an instance of a class from the database given its primary key.
446      * For single-column keys, the key parameter is a wrapper (e.g. Integer).
447      * For multi-column keys, the key parameter is an Object[] in which
448      * elements correspond to the primary keys in order as defined in the schema.
449      * @param cls the class
450      * @param key the primary key
451      */
452     public <T> void deletePersistent(Class<T> cls, Object key) {
453         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
454         ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(key);
455         delete(domainTypeHandler, keyValueHandler);
456     }
457
458     /** Remove an instance from the database. Only the key field(s) 
459      * are used to identify the instance.
460      * 
461      * @param object the instance to remove from the database
462      */
463     public void deletePersistent(Object object) {
464         if (object == null) {
465             return;
466         }
467         DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
468         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
469         delete(domainTypeHandler, valueHandler);
470     }
471
472     public Operation delete(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
473         startAutoTransaction();
474         Table storeTable = domainTypeHandler.getStoreTable();
475         setPartitionKey(domainTypeHandler, valueHandler);
476         Operation op = null;
477         try {
478             op = clusterTransaction.getDeleteOperation(storeTable);
479             domainTypeHandler.operationSetKeys(valueHandler, op);
480         } catch (ClusterJException ex) {
481             failAutoTransaction();
482             throw new ClusterJException(
483                     local.message("ERR_Delete", storeTable.getName()), ex);
484         }
485         endAutoTransaction();
486         return op;
487     }
488
489     /** Delete the instances corresponding to the parameters.
490      * @param objects the objects to delete
491      */
492     public void deletePersistentAll(Iterable objects) {
493         startAutoTransaction();
494         for (Iterator it = objects.iterator(); it.hasNext();) {
495             deletePersistent(it.next());
496         }
497         endAutoTransaction();
498     }
499
500     /** Delete all instances of the parameter class.
501      * @param cls the class of instances to delete
502      */
503     public <T> int deletePersistentAll(Class<T> cls) {
504         DomainTypeHandler<T> domainTypeHandler = getDomainTypeHandler(cls);
505         return deletePersistentAll(domainTypeHandler);
506     }
507
508     /** Delete all instances of the parameter domainTypeHandler.
509      * @param domainTypeHandler the domainTypeHandler of instances to delete
510      * @return the number of instances deleted
511      */
512     public int deletePersistentAll(DomainTypeHandler<?> domainTypeHandler) {
513         startAutoTransaction();
514         Table storeTable = domainTypeHandler.getStoreTable();
515         String tableName = storeTable.getName();
516         ScanOperation op = null;
517         int count = 0;
518         try {
519             op = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
520             count = deletePersistentAll(op, true);
521         } catch (ClusterJException ex) {
522             failAutoTransaction();
523             // TODO add table name to the error message
524             throw new ClusterJException(
525                     local.message("ERR_Delete_All", tableName), ex);
526         }
527         endAutoTransaction();
528         return count;
529     }
530
531     /** Delete all instances retrieved by the operation. The operation must have exclusive
532      * access to the instances and have the ScanFlag.KEY_INFO flag set.
533      * @param op the scan operation
534      * @return the number of instances deleted
535      */
536     public int deletePersistentAll(ScanOperation op, boolean abort) {
537         int cacheCount = 0;
538         int count = 0;
539         boolean done = false;
540         boolean fetch = true;
541         // cannot use early autocommit optimization here
542         clusterTransaction.setAutocommit(false);
543         // execute the operation
544         clusterTransaction.executeNoCommit(true, true);
545         while (!done ) {
546             int result = op.nextResult(fetch);
547             switch (result) {
548                 case RESULT_READY:
549                     op.deleteCurrentTuple();
550                     ++count;
551                     ++cacheCount;
552                     fetch = false;
553                     break;
554                 case SCAN_FINISHED:
555                     done = true;
556                     if (cacheCount != 0) {
557                         clusterTransaction.executeNoCommit(abort, true);
558                     }
559                     op.close();
560                     break;
561                 case CACHE_EMPTY:
562                     clusterTransaction.executeNoCommit(abort, true);
563                     cacheCount = 0;
564                     fetch = true;
565                     break;
566                 default: 
567                     throw new ClusterJException(
568                             local.message("ERR_Next_Result_Illegal", result));
569             }
570         }
571         return count;
572     }
573
574     /** Select a single row from the database. Only the fields requested
575      * will be selected. A transaction must be active (either via begin
576      * or startAutoTransaction).
577      *
578      * @param domainTypeHandler the domainTypeHandler to be selected
579      * @param keyHandler the key supplier for the select
580      * @param fields the fields to select; null to select all fields
581      * @return the ResultData from the database
582      */
583     public ResultData selectUnique(DomainTypeHandler domainTypeHandler,
584             ValueHandler keyHandler, BitSet fields) {
585         assertActive();
586         setPartitionKey(domainTypeHandler, keyHandler);
587         Table storeTable = domainTypeHandler.getStoreTable();
588         // perform a single select by key operation
589         Operation op = clusterTransaction.getSelectOperation(storeTable);
590         // set the keys into the operation
591         domainTypeHandler.operationSetKeys(keyHandler, op);
592         // set the expected columns into the operation
593         domainTypeHandler.operationGetValues(op);
594         // execute the select and get results
595         ResultData rs = op.resultData();
596         return rs;
597     }
598
599     /** Update an instance in the database. The key field(s)
600      * are used to identify the instance; modified fields change the
601      * values in the database.
602      *
603      * @param object the instance to update in the database
604      */
605     public void updatePersistent(Object object) {
606         if (object == null) {
607             return;
608         }
609         DomainTypeHandler domainTypeHandler = getDomainTypeHandler(object);
610         if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + object);
611         ValueHandler valueHandler = domainTypeHandler.getValueHandler(object);
612         update(domainTypeHandler, valueHandler);
613     }
614
615     public Operation update(DomainTypeHandler domainTypeHandler, ValueHandler valueHandler) {
616         startAutoTransaction();
617         setPartitionKey(domainTypeHandler, valueHandler);
618         Table storeTable = null;
619         Operation op = null;
620         try {
621             storeTable = domainTypeHandler.getStoreTable();
622             op = clusterTransaction.getUpdateOperation(storeTable);
623             domainTypeHandler.operationSetKeys(valueHandler, op);
624             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
625             if (logger.isDetailEnabled()) logger.detail("Updated object " +
626                     valueHandler);
627         } catch (ClusterJException ex) {
628             failAutoTransaction();
629             throw new ClusterJException(
630                     local.message("ERR_Update", storeTable.getName()) ,ex);
631         }
632         endAutoTransaction();
633         return op;
634     }
635
636     /** Update the instances corresponding to the parameters.
637      * @param objects the objects to update
638      */
639     public void updatePersistentAll(Iterable objects) {
640         startAutoTransaction();
641         for (Iterator it = objects.iterator(); it.hasNext();) {
642             updatePersistent(it.next());
643         }
644         endAutoTransaction();
645     }
646
647     /** Save the instance even if it does not exist.
648      * @param instance the instance to save
649      */
650     public <T> T savePersistent(T instance) {
651         DomainTypeHandler domainTypeHandler = getDomainTypeHandler(instance);
652         if (logger.isDetailEnabled()) logger.detail("UpdatePersistent on object " + instance);
653         ValueHandler valueHandler = domainTypeHandler.getValueHandler(instance);
654         startAutoTransaction();
655         setPartitionKey(domainTypeHandler, valueHandler);
656         Table storeTable = null;
657         try {
658             storeTable = domainTypeHandler.getStoreTable();
659             Operation op = null;
660             op = clusterTransaction.getWriteOperation(storeTable);
661             domainTypeHandler.operationSetKeys(valueHandler, op);
662             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
663             if (logger.isDetailEnabled()) logger.detail("Wrote object " +
664                     valueHandler);
665         } catch (ClusterJException ex) {
666             failAutoTransaction();
667             throw new ClusterJException(
668                     local.message("ERR_Write", storeTable.getName()) ,ex);
669         }
670         endAutoTransaction();
671         return instance;
672     }
673
674     /** Save the instances even if they do not exist.
675      * @param instances
676      */
677     public Iterable savePersistentAll(Iterable instances) {
678         List<Object> result = new ArrayList<Object>();
679         startAutoTransaction();
680         for (Iterator it = instances.iterator(); it.hasNext();) {
681             result.add(savePersistent(it.next()));
682         }
683         endAutoTransaction();
684         return result;
685     }
686
687     /** Get the current transaction.
688      * 
689      * @return the transaction
690      */
691     public Transaction currentTransaction() {
692         return transactionImpl;
693     }
694
695     /** Close this session and deallocate all resources.
696      * 
697      */
698     public void close() {
699         if (clusterTransaction != null) {
700             clusterTransaction.close();
701             clusterTransaction = null;
702         }
703         if (db != null) {
704             db.close();
705             db = null;
706         }
707     }
708
709     public boolean isClosed() {
710         return db==null;
711     }
712
713     /** Assert this session is not yet closed. */
714     protected void assertNotClosed() {
715         if (isClosed()) {
716             throw new ClusterJUserException(
717                     local.message("ERR_Session_Closed"));
718         }
719     }
720
721     /** Begin the current transaction.
722      * 
723      */
724     public void begin() {
725         if (logger.isDebugEnabled()) logger.debug("begin transaction.");
726         transactionState = transactionState.begin();
727         handleTransactionException();
728     }
729
730     /** Internally begin the transaction.
731      * Called by transactionState.begin().
732      */
733     protected void internalBegin() {
734         try {
735             clusterTransaction = db.startTransaction(joinTransactionId);
736             clusterTransaction.setLockMode(lockmode);
737             // if a transaction has already begun, tell the cluster transaction about the key
738             if (partitionKey != null) {
739                 clusterTransaction.setPartitionKey(partitionKey);
740             }
741             // register our post-execute callback
742             clusterTransaction.postExecuteCallback(postExecuteCallbackHandler);
743         } catch (ClusterJException ex) {
744             throw new ClusterJException(
745                     local.message("ERR_Ndb_Start"), ex);
746         }
747     }
748
749     /** Commit the current transaction.
750      * 
751      */
752     public void commit() {
753         if (logger.isDebugEnabled()) logger.debug("commit transaction.");
754         transactionState = transactionState.commit();
755         handleTransactionException();
756     }
757
758     /** Internally commit the transaction.
759      * Called by transactionState.commit().
760      */
761     protected void internalCommit() {
762         if (rollbackOnly) {
763             try {
764                 internalRollback();
765                 throw new ClusterJException(
766                         local.message("ERR_Transaction_Rollback_Only"));
767             } catch (ClusterJException ex) {
768                 throw new ClusterJException(
769                         local.message("ERR_Transaction_Rollback_Only"), ex);
770             }
771         }
772         try {
773             clusterTransaction.executeCommit(true, true);
774         } finally {
775             // always close the transaction
776             clusterTransaction.close();
777             clusterTransaction = null;
778             partitionKey = null;
779         }
780     }
781
782     /** Roll back the current transaction.
783      *
784      */
785     public void rollback() {
786         if (logger.isDebugEnabled()) logger.debug("roll back transaction.");
787         transactionState = transactionState.rollback();
788         handleTransactionException();
789     }
790
791     /** Internally roll back the transaction.
792      * Called by transactionState.rollback() and
793      * transactionState.commit() if the transaction is marked for rollback.
794      *
795      */
796     protected void internalRollback() {
797         try {
798                 clusterTransaction.executeRollback();
799         } catch (ClusterJException ex) {
800             throw new ClusterJException(
801                     local.message("ERR_Transaction_Execute", "rollback"), ex);
802         } finally {
803             if (clusterTransaction != null) {
804                 clusterTransaction.close();
805             }
806             clusterTransaction = null;
807             partitionKey = null;
808         }
809     }
810
811     /** Start a transaction if there is not already an active transaction.
812      * Throw a ClusterJException if there is any problem.
813      */
814     public void startAutoTransaction() {
815         if (logger.isDebugEnabled()) logger.debug("start AutoTransaction");
816         transactionState = transactionState.start();
817         handleTransactionException();
818     }
819
820     /** End an auto transaction if it was started.
821      * Throw a ClusterJException if there is any problem.
822      */
823     public void endAutoTransaction() {
824         if (logger.isDebugEnabled()) logger.debug("end AutoTransaction");
825         transactionState = transactionState.end();
826         handleTransactionException();
827     }
828
829     /** Fail an auto transaction if it was started.
830      * Throw a ClusterJException if there is any problem.
831      */
832     public void failAutoTransaction() {
833         if (logger.isDebugEnabled()) logger.debug("fail AutoTransaction");
834         transactionState = transactionState.fail();
835     }
836
837     protected void handleTransactionException() {
838         if (transactionException == null) {
839             return;
840         } else {
841             ClusterJException ex = transactionException;
842             transactionException = null;
843             throw ex;
844         }
845     }
846
847     /** Mark the current transaction as rollback only.
848      *
849      */
850     public void setRollbackOnly() {
851         rollbackOnly = true;
852     }
853
854     /** Is the current transaction marked for rollback only?
855      * @return true if the current transaction is marked for rollback only
856      */
857     public boolean getRollbackOnly() {
858         return rollbackOnly;
859     }
860
861     /** Manage the state of the transaction associated with this
862      * StateManager.
863      */
864     protected interface TransactionState {
865         boolean isActive();
866
867         TransactionState begin();
868         TransactionState commit();
869         TransactionState rollback();
870
871         TransactionState start();
872         TransactionState end();
873         TransactionState fail();
874     }
875
876     /** This represents the state of Transaction Not Active. */
877     protected TransactionState transactionStateNotActive =
878             new TransactionState() {
879
880         public boolean isActive() {
881             return false;
882         }
883
884         public TransactionState begin() {
885             try {
886                 internalBegin();
887                 return transactionStateActive;
888             } catch (ClusterJException ex) {
889                 transactionException = ex;
890                 return transactionStateNotActive;
891             }
892         }
893
894         public TransactionState commit() {
895             transactionException = new ClusterJUserException(
896                     local.message("ERR_Transaction_Must_Be_Active_For_Method",
897                     "commit"));
898             return transactionStateNotActive;
899         }
900
901         public TransactionState rollback() {
902             transactionException = new ClusterJUserException(
903                     local.message("ERR_Transaction_Must_Be_Active_For_Method",
904                     "rollback"));
905             return transactionStateNotActive;
906         }
907
908         public TransactionState start() {
909             try {
910                 internalBegin();
911                 clusterTransaction.setAutocommit(true);
912                 nestedAutoTransactionCounter = 1;
913                 return transactionStateAutocommit;
914             } catch (ClusterJException ex) {
915                 transactionException = ex;
916                 return transactionStateNotActive;
917             }
918         }
919
920         public TransactionState end() {
921             throw new ClusterJFatalInternalException(
922                     local.message("ERR_Transaction_Auto_Start", "end"));
923         }
924
925         public TransactionState fail() {
926             throw new ClusterJFatalInternalException(
927                     local.message("ERR_Transaction_Auto_Start", "end"));
928         }
929
930     };
931
932     /** This represents the state of Transaction Active. */
933     protected TransactionState transactionStateActive =
934             new TransactionState() {
935
936         public boolean isActive() {
937             return true;
938         }
939
940         public TransactionState begin() {
941             transactionException = new ClusterJUserException(
942                     local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
943                     "begin"));
944             return transactionStateActive;
945         }
946
947         public TransactionState commit() {
948             try {
949                 // flush unwritten changes
950                 flush();
951                 internalCommit();
952             } catch (ClusterJException ex) {
953                 transactionException = ex;
954             }
955             return transactionStateNotActive;
956         }
957
958         public TransactionState rollback() {
959             try {
960                 internalRollback();
961                 return transactionStateNotActive;
962             } catch (ClusterJException ex) {
963                 transactionException = ex;
964                 return transactionStateNotActive;
965             }
966         }
967
968         public TransactionState start() {
969             // nothing to do
970             return transactionStateActive;
971         }
972
973         public TransactionState end() {
974             // nothing to do
975             return transactionStateActive;
976         }
977
978         public TransactionState fail() {
979             // nothing to do
980             return transactionStateActive;
981         }
982
983     };
984
985     protected TransactionState transactionStateAutocommit =
986             new TransactionState() {
987
988         public boolean isActive() {
989             return true;
990         }
991
992         public TransactionState begin() {
993             throw new ClusterJFatalInternalException(
994                     local.message("ERR_Transaction_Auto_End", "begin"));
995         }
996
997         public TransactionState commit() {
998             throw new ClusterJFatalInternalException(
999                     local.message("ERR_Transaction_Auto_End", "commit"));
1000         }
1001
1002         public TransactionState rollback() {
1003             throw new ClusterJFatalInternalException(
1004                     local.message("ERR_Transaction_Auto_End", "rollback"));
1005         }
1006
1007         public TransactionState start() {
1008             // nested start; increment counter
1009             nestedAutoTransactionCounter++;
1010             return transactionStateAutocommit;
1011         }
1012
1013         public TransactionState end() {
1014             if (--nestedAutoTransactionCounter > 0) {
1015                 return transactionStateAutocommit;
1016             } else if (nestedAutoTransactionCounter == 0) {
1017                 try {
1018                     internalCommit();
1019                 } catch (ClusterJException ex) {
1020                     transactionException = ex;
1021                 }
1022                 return transactionStateNotActive;
1023             } else {
1024             throw new ClusterJFatalInternalException(
1025                     local.message("ERR_Transaction_Auto_Start", "end"));
1026             }
1027         }
1028
1029         public TransactionState fail() {
1030             try {
1031                 nestedAutoTransactionCounter = 0;
1032                 internalRollback();
1033                 return transactionStateNotActive;
1034             } catch (ClusterJException ex) {
1035                 // ignore failures caused by internal rollback
1036                 return transactionStateNotActive;
1037             }
1038         }
1039
1040     };
1041
1042     /** Get the domain type handler for an instance.
1043      * 
1044      * @param object the instance for which to get the domain type handler
1045      * @return the domain type handler
1046      */
1047     protected synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(T object) {
1048         DomainTypeHandler<T> domainTypeHandler =
1049                 factory.getDomainTypeHandler(object, dictionary);
1050         return domainTypeHandler;
1051     }
1052
1053     /** Get the domain type handler for a class.
1054      * 
1055      * @param cls the class
1056      * @return the domain type handler
1057      */
1058     public synchronized <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
1059         DomainTypeHandler<T> domainTypeHandler =
1060                 factory.getDomainTypeHandler(cls, dictionary);
1061         return domainTypeHandler;
1062     }
1063
1064     public Dictionary getDictionary() {
1065         return dictionary;
1066     }
1067
1068     /** Is there an active transaction.
1069      * 
1070      * @return true if there is an active transaction
1071      */
1072     boolean isActive() {
1073         return transactionState.isActive();
1074     }
1075
1076     /** Is the transaction enlisted. A transaction is enlisted if and only if
1077      * an operation has been defined that requires an ndb transaction to be
1078      * started.
1079      * @return true if the transaction is enlisted
1080      */
1081     public boolean isEnlisted() {
1082         return clusterTransaction==null?false:clusterTransaction.isEnlisted();
1083     }
1084
1085     /** Assert that there is an active transaction (the user has called begin
1086      * or an autotransaction has begun).
1087      * Throw a user exception if not.
1088      */
1089     private void assertActive() {
1090         if (!transactionState.isActive()) {
1091             throw new ClusterJUserException(
1092                     local.message("ERR_Transaction_Must_Be_Active"));
1093         }
1094     }
1095
1096     /** Assert that there is not an active transaction.
1097      * Throw a user exception if there is an active transaction.
1098      * @param methodName the name of the method
1099      */
1100     private void assertNotActive(String methodName) {
1101         if (transactionState.isActive()) {
1102             throw new ClusterJUserException(
1103                     local.message("ERR_Transaction_Must_Not_Be_Active_For_Method",
1104                     methodName));
1105         }
1106     }
1107
1108     /** Create a query from a class.
1109      * 
1110      * @param cls the class
1111      * @return the query
1112      */
1113     public Query createQuery(Class cls) {
1114         throw new UnsupportedOperationException(
1115                 local.message("ERR_NotImplemented"));
1116     }
1117
1118     /** Get a query builder.
1119      * 
1120      * @return the query builder
1121      */
1122     public QueryBuilder getQueryBuilder() {
1123         return new QueryBuilderImpl(this);
1124     }
1125
1126     /** Create an index scan operation for an index and table.
1127      * 
1128      * @param storeIndex the index
1129      * @param storeTable the table
1130      * @return the index scan operation
1131      */
1132     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
1133         assertActive();
1134         try {
1135             IndexScanOperation result = clusterTransaction.getIndexScanOperation(storeIndex, storeTable);
1136             return result;
1137         } catch (ClusterJException ex) {
1138             throw new ClusterJException(
1139                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1140         }
1141     }
1142
1143     /** Create an index scan operation for an index and table to be used for a multi-range scan.
1144      * 
1145      * @param storeIndex the index
1146      * @param storeTable the table
1147      * @return the index scan operation
1148      */
1149     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
1150         assertActive();
1151         try {
1152             IndexScanOperation result = clusterTransaction.getIndexScanOperationMultiRange(storeIndex, storeTable);
1153             return result;
1154         } catch (ClusterJException ex) {
1155             throw new ClusterJException(
1156                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1157         }
1158     }
1159
1160     /** Create an index scan delete operation for an index and table.
1161      * 
1162      * @param storeIndex the index
1163      * @param storeTable the table
1164      * @return the index scan operation
1165      */
1166     public IndexScanOperation getIndexScanDeleteOperation(Index storeIndex, Table storeTable) {
1167         assertActive();
1168         try {
1169             IndexScanOperation result = clusterTransaction.getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(storeIndex, storeTable);
1170             return result;
1171         } catch (ClusterJException ex) {
1172             throw new ClusterJException(
1173                     local.message("ERR_Index_Scan", storeTable.getName(), storeIndex.getName()), ex);
1174         }
1175     }
1176
1177     /** Create a table scan operation for a table.
1178      *
1179      * @param storeTable the table
1180      * @return the table scan operation
1181      */
1182     public ScanOperation getTableScanOperation(Table storeTable) {
1183         assertActive();
1184         try {
1185             ScanOperation result = clusterTransaction.getTableScanOperation(storeTable);
1186             return result;
1187         } catch (ClusterJException ex) {
1188             throw new ClusterJException(
1189                     local.message("ERR_Table_Scan", storeTable.getName()), ex);
1190         }
1191     }
1192
1193     /** Create a table scan delete operation for a table.
1194     *
1195     * @param storeTable the table
1196     * @return the table scan operation
1197     */
1198    public ScanOperation getTableScanDeleteOperation(Table storeTable) {
1199        assertActive();
1200        try {
1201            ScanOperation result = clusterTransaction.getTableScanOperationLockModeExclusiveScanFlagKeyInfo(storeTable);
1202            return result;
1203        } catch (ClusterJException ex) {
1204            throw new ClusterJException(
1205                    local.message("ERR_Table_Scan", storeTable.getName()), ex);
1206        }
1207    }
1208
1209     /** Create an index operation for an index and table.
1210      *
1211      * @param storeIndex the index
1212      * @param storeTable the table
1213      * @return the index operation
1214      */
1215     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
1216         assertActive();
1217         try {
1218             IndexOperation result = clusterTransaction.getUniqueIndexOperation(storeIndex, storeTable);
1219             return result;
1220         } catch (ClusterJException ex) {
1221             throw new ClusterJException(
1222                     local.message("ERR_Unique_Index", storeTable.getName(), storeIndex.getName()), ex);
1223         }
1224     }
1225
1226     /** Create a select operation for a table.
1227      *
1228      * @param storeTable the table
1229      * @return the operation
1230      */
1231     public Operation getSelectOperation(Table storeTable) {
1232         assertActive();
1233         try {
1234             Operation result = clusterTransaction.getSelectOperation(storeTable);
1235             return result;
1236         } catch (ClusterJException ex) {
1237             throw new ClusterJException(
1238                     local.message("ERR_Select", storeTable), ex);
1239         }
1240     }
1241
1242     /** Create a delete operation for a table.
1243      * 
1244      * @param storeTable the table
1245      * @return the operation
1246      */
1247     public Operation getDeleteOperation(Table storeTable) {
1248         assertActive();
1249         try {
1250             Operation result = clusterTransaction.getDeleteOperation(storeTable);
1251             return result;
1252         } catch (ClusterJException ex) {
1253             throw new ClusterJException(
1254                     local.message("ERR_Delete", storeTable), ex);
1255         }
1256     }
1257
1258     /** Create an index delete operation for an index and table.
1259     *
1260     * @param storeIndex the index
1261     * @param storeTable the table
1262     * @return the index operation
1263     */
1264    public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
1265        assertActive();
1266        try {
1267            IndexOperation result = clusterTransaction.getUniqueIndexDeleteOperation(storeIndex, storeTable);
1268            return result;
1269        } catch (ClusterJException ex) {
1270            throw new ClusterJException(
1271                    local.message("ERR_Unique_Index_Delete", storeTable.getName(), storeIndex.getName()), ex);
1272        }
1273    }
1274
1275     public void flush() {
1276         if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
1277         if (!changeList.isEmpty()) {
1278             for (StateManager sm: changeList) {
1279                 sm.flush(this);
1280             }
1281             changeList.clear();
1282         }
1283         // now flush changes to the back end
1284         if (clusterTransaction != null) {
1285             executeNoCommit();
1286         }
1287     }
1288
1289     public List getChangeList() {
1290         return Collections.unmodifiableList(changeList);
1291     }
1292
1293     public void persist(Object instance) {
1294         makePersistent(instance);
1295     }
1296
1297     public void remove(Object instance) {
1298         deletePersistent(instance);
1299     }
1300
1301     public void markModified(StateManager instance) {
1302         changeList.add(instance);
1303     }
1304
1305     public void setPartitionKey(Class<?> domainClass, Object key) {
1306         DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(domainClass);
1307         String tableName = domainTypeHandler.getTableName();
1308         // if transaction is enlisted, throw a user exception
1309         if (isEnlisted()) {
1310             throw new ClusterJUserException(
1311                     local.message("ERR_Set_Partition_Key_After_Enlistment", tableName));
1312         }
1313         // if a partition key has already been set, throw a user exception
1314         if (this.partitionKey != null) {
1315             throw new ClusterJUserException(
1316                     local.message("ERR_Set_Partition_Key_Twice", tableName));
1317         }
1318         ValueHandler handler = domainTypeHandler.createKeyValueHandler(key);
1319         this.partitionKey= domainTypeHandler.createPartitionKey(handler);
1320         // if a transaction has already begun, tell the cluster transaction about the key
1321         if (clusterTransaction != null) {
1322             clusterTransaction.setPartitionKey(partitionKey);
1323         }
1324     }
1325
1326     /** Mark the field in the instance as modified so it is flushed.
1327      *
1328      * @param instance the persistent instance
1329      * @param fieldName the field to mark as modified
1330      */
1331     public void markModified(Object instance, String fieldName) {
1332         DomainTypeHandler<?> domainTypeHandler = getDomainTypeHandler(instance);
1333         ValueHandler handler = domainTypeHandler.getValueHandler(instance);
1334         domainTypeHandler.objectMarkModified(handler, fieldName);
1335     }
1336
1337     /** Execute any pending operations (insert, delete, update, load)
1338      * and then perform post-execute operations (for load) via
1339      * clusterTransaction.postExecuteCallback().
1340      * @param abort abort this transaction on error
1341      * @param force force the operation to be sent immediately
1342      */
1343     public void executeNoCommit(boolean abort, boolean force) {
1344         if (clusterTransaction != null) {
1345             clusterTransaction.executeNoCommit(abort, force);
1346         }
1347     }
1348
1349     /** Execute any pending operations (insert, delete, update, load)
1350      * and then perform post-execute operations (for load) via
1351      * clusterTransaction.postExecuteCallback().
1352      * Abort the transaction on error. Force the operation to be sent immediately.
1353      */
1354     public void executeNoCommit() {
1355         executeNoCommit(true, true);
1356     }
1357
1358     public <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> domainTypeHandler) {
1359         QueryBuilderImpl builder = (QueryBuilderImpl)getQueryBuilder();
1360         return builder.createQueryDefinition(domainTypeHandler);
1361     }
1362
1363     /** Return the coordinatedTransactionId of the current transaction.
1364      * The transaction might not have been enlisted.
1365      * @return the coordinatedTransactionId
1366      */
1367     public String getCoordinatedTransactionId() {
1368         return clusterTransaction.getCoordinatedTransactionId();
1369     }
1370
1371     /** Set the coordinatedTransactionId for the next transaction. This
1372      * will take effect as soon as the transaction is enlisted.
1373      * @param coordinatedTransactionId the coordinatedTransactionId
1374      */
1375     public void setCoordinatedTransactionId(String coordinatedTransactionId) {
1376         clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
1377     }
1378
1379     /** Set the lock mode for subsequent operations. The lock mode takes effect immediately
1380      * and continues until set again.
1381      * @param lockmode the lock mode
1382      */
1383     public void setLockMode(LockMode lockmode) {
1384         this.lockmode = lockmode;
1385         if (clusterTransaction != null) {
1386             clusterTransaction.setLockMode(lockmode);
1387         }
1388     }
1389
1390 }