]> review.fuel-infra Code Review - packages/trusty/mysql-wsrep-5.6.git/blob
ebc4e721b74d19234d71bc08a063ed79f7e3cfc5
[packages/trusty/mysql-wsrep-5.6.git] /
1 /*
2  *  Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; version 2 of the License.
7  *
8  *  This program is distributed in the hope that it will be useful,
9  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  *  GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License
14  *  along with this program; if not, write to the Free Software
15  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
16  */
17
18 package com.mysql.clusterj.tie;
19
20 import java.util.ArrayList;
21 import java.util.List;
22
23 import com.mysql.clusterj.ClusterJDatastoreException;
24 import com.mysql.clusterj.ClusterJFatalInternalException;
25 import com.mysql.clusterj.LockMode;
26
27 import com.mysql.clusterj.core.store.ClusterTransaction;
28 import com.mysql.clusterj.core.store.Index;
29 import com.mysql.clusterj.core.store.IndexOperation;
30 import com.mysql.clusterj.core.store.IndexScanOperation;
31 import com.mysql.clusterj.core.store.Operation;
32 import com.mysql.clusterj.core.store.PartitionKey;
33 import com.mysql.clusterj.core.store.ScanOperation;
34 import com.mysql.clusterj.core.store.Table;
35
36 import com.mysql.clusterj.core.util.I18NHelper;
37 import com.mysql.clusterj.core.util.Logger;
38 import com.mysql.clusterj.core.util.LoggerFactoryService;
39 import com.mysql.clusterj.tie.DbImpl.BufferManager;
40
41 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
42 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
43 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
44 import com.mysql.ndbjtie.ndbapi.NdbOperation;
45 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
46 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
47 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
48 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
49 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
50 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
51 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
52
53 /**
54  *
55  */
56 class ClusterTransactionImpl implements ClusterTransaction {
57
58     /** My message translator */
59     static final I18NHelper local = I18NHelper
60             .getInstance(ClusterTransactionImpl.class);
61
62     /** My logger */
63     static final Logger logger = LoggerFactoryService.getFactory()
64             .getInstance(ClusterTransactionImpl.class);
65
66     protected NdbTransaction ndbTransaction;
67     private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
68
69     /** The DbImpl associated with this NdbTransaction */
70     protected DbImpl db;
71
72     /** The partition key; by default it doesn't do anything */
73     protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance();
74
75     /** The NdbDictionary */
76     private Dictionary ndbDictionary;
77
78     /** The coordinated transaction identifier */
79     private String coordinatedTransactionId = null;
80
81     /** Is getCoordinatedTransactionId supported? True until proven false. */
82     private static boolean supportsGetCoordinatedTransactionId = true;
83
84     /** Lock mode for find operations */
85     private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
86
87     /** Lock mode for index lookup operations */
88     private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
89
90     /** Lock mode for index scan operations */
91     private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
92
93     /** Lock mode for table scan operations */
94     private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
95
96     /** Autocommit flag if we are in an autocommit transaction */
97     private boolean autocommit = false;
98
99     /** Autocommitted flag if we autocommitted early */
100     private boolean autocommitted = false;
101
102     /** The transaction id to join this transaction to */
103     private String joinTransactionId;
104
105     private BufferManager bufferManager;
106
107     public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
108         this.db = db;
109         this.ndbDictionary = ndbDictionary;
110         this.joinTransactionId = joinTransactionId;
111         this.bufferManager = db.getBufferManager();
112     }
113
114     public void close() {
115         if (ndbTransaction != null) {
116             ndbTransaction.close();
117             ndbTransaction = null;
118         }
119     }
120
121     public void executeCommit() {
122         executeCommit(true, true);
123     }
124
125     public boolean isEnlisted() {
126         return ndbTransaction != null;
127     }
128
129     /**
130      * Enlist the ndb transaction if not already enlisted.
131      * If the coordinated transaction id is set, join an existing transaction.
132      * Otherwise, use the partition key to enlist the transaction.
133      */
134     private void enlist() {
135         if (ndbTransaction == null) {
136             if (coordinatedTransactionId != null) {
137                 ndbTransaction = db.joinTransaction(coordinatedTransactionId);
138             } else {
139                 ndbTransaction = partitionKey.enlist(db);
140                 getCoordinatedTransactionId(db);
141             }
142         }
143     }
144
145     public void executeCommit(boolean abort, boolean force) {
146         if (logger.isTraceEnabled()) logger.trace("");
147         // nothing to do if no ndbTransaction was ever enlisted or already autocommitted
148         if (isEnlisted() && !autocommitted) {
149             handlePendingPostExecuteCallbacks();
150             int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
151             int forceOption = force?1:0;
152             int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Commit,
153                     abortOption, forceOption);
154             handleError(returnCode, ndbTransaction);
155         }
156         autocommitted = false;
157         autocommit = false;
158     }
159
160     public void executeNoCommit() {
161         executeNoCommit(true, true);
162     }
163
164     public void executeNoCommit(boolean abort, boolean force) {
165         if (logger.isTraceEnabled()) logger.trace("");
166         if (!isEnlisted()) {
167             // nothing to do if no ndbTransaction was ever enlisted
168             return;
169         }
170         if (autocommit && postExecuteCallbacks.size() == 0) {
171             // optimization to commit now because no blob columns
172             executeCommit(abort, force);
173             autocommitted = true;
174             return;
175         }
176         int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
177         int forceOption = force?1:0;
178         int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.NoCommit,
179                 abortOption, forceOption);
180         handleError(returnCode, ndbTransaction);
181         performPostExecuteCallbacks();
182     }
183
184     public void executeRollback() {
185         if (!isEnlisted()) {
186             // nothing to do if no ndbTransaction was ever enlisted
187             return;
188         }
189         int abortOption = AbortOption.AO_IgnoreError;
190         int forceOption = 1;
191         int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Rollback,
192                 abortOption, forceOption);
193         handleError(returnCode, ndbTransaction);
194     }
195
196     public Operation getDeleteOperation(Table storeTable) {
197         enlist();
198         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
199         handleError(ndbTable, ndbDictionary);
200         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
201         handleError(ndbOperation, ndbTransaction);
202         int returnCode = ndbOperation.deleteTuple();
203         handleError(returnCode, ndbTransaction);
204         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());;
205         return new OperationImpl(ndbOperation, this);
206     }
207
208     public Operation getInsertOperation(Table storeTable) {
209         enlist();
210         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
211         handleError(ndbTable, ndbDictionary);
212         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
213         handleError(ndbOperation, ndbTransaction);
214         int returnCode = ndbOperation.insertTuple();
215         handleError(returnCode, ndbTransaction);
216         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
217         return new OperationImpl(ndbOperation, this);
218     }
219
220     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
221         enlist();
222         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
223         handleError(ndbIndex, ndbDictionary);
224         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
225         handleError(ndbOperation, ndbTransaction);
226         int lockMode = indexScanLockMode;
227         int scanFlags = 0;
228         int parallel = 0;
229         int batch = 0;
230         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
231         handleError(returnCode, ndbTransaction);
232         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
233         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
234     }
235
236     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
237         enlist();
238         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
239         handleError(ndbIndex, ndbDictionary);
240         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
241         handleError(ndbOperation, ndbTransaction);
242         int lockMode = indexScanLockMode;
243         int scanFlags = ScanFlag.SF_MultiRange;
244         int parallel = 0;
245         int batch = 0;
246         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
247         handleError(returnCode, ndbTransaction);
248         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
249         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
250     }
251
252     public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) {
253         enlist();
254         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
255         handleError(ndbIndex, ndbDictionary);
256         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
257         handleError(ndbOperation, ndbTransaction);
258         int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
259         int scanFlags = ScanFlag.SF_KeyInfo;
260         int parallel = 0;
261         int batch = 0;
262         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
263         handleError(returnCode, ndbTransaction);
264         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
265         return new IndexScanOperationImpl(storeTable, ndbOperation, this);
266     }
267
268     public Operation getSelectOperation(Table storeTable) {
269         enlist();
270         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
271         handleError(ndbTable, ndbDictionary);
272         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
273         handleError(ndbOperation, ndbTransaction);
274         int lockMode = findLockMode;
275         int returnCode = ndbOperation.readTuple(lockMode);
276         handleError(returnCode, ndbTransaction);
277         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
278         return new OperationImpl(storeTable, ndbOperation, this);
279     }
280
281     public ScanOperation getTableScanOperation(Table storeTable) {
282         enlist();
283         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
284         handleError(ndbTable, ndbDictionary);
285         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
286         handleError(ndbScanOperation, ndbTransaction);
287         int lockMode = tableScanLockMode;
288         int scanFlags = 0;
289         int parallel = 0;
290         int batch = 0;
291         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
292         handleError(returnCode, ndbTransaction);
293         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
294         return new ScanOperationImpl(storeTable, ndbScanOperation, this);
295     }
296
297     public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) {
298         enlist();
299         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
300         handleError(ndbTable, ndbDictionary);
301         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
302         handleError(ndbScanOperation, ndbTransaction);
303         int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
304         int scanFlags = ScanFlag.SF_KeyInfo;
305         int parallel = 0;
306         int batch = 0;
307         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
308         handleError(returnCode, ndbTransaction);
309         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
310         return new ScanOperationImpl(storeTable, ndbScanOperation, this);
311     }
312
313     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
314         enlist();
315         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
316         handleError(ndbIndex, ndbDictionary);
317         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
318         handleError(ndbIndexOperation, ndbTransaction);
319         int lockMode = lookupLockMode;
320         int returnCode = ndbIndexOperation.readTuple(lockMode);
321         handleError(returnCode, ndbTransaction);
322         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
323         return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
324     }
325
326     public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
327         enlist();
328         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
329         handleError(ndbIndex, ndbDictionary);
330         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
331         handleError(ndbIndexOperation, ndbTransaction);
332         int returnCode = ndbIndexOperation.deleteTuple();
333         handleError(returnCode, ndbTransaction);
334         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
335         return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
336     }
337
338     public Operation getUpdateOperation(Table storeTable) {
339         enlist();
340         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
341         handleError(ndbTable, ndbDictionary);
342         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
343         handleError(ndbOperation, ndbTransaction);
344         int returnCode = ndbOperation.updateTuple();
345         handleError(returnCode, ndbTransaction);
346         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
347         return new OperationImpl(storeTable, ndbOperation, this);
348     }
349
350     public Operation getWriteOperation(Table storeTable) {
351         enlist();
352         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
353         handleError(ndbTable, ndbDictionary);
354         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
355         handleError(ndbOperation, ndbTransaction);
356         int returnCode = ndbOperation.writeTuple();
357         handleError(returnCode, ndbTransaction);
358         if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
359         return new OperationImpl(storeTable, ndbOperation, this);
360     }
361
362     public void postExecuteCallback(Runnable callback) {
363         postExecuteCallbacks.add(callback);
364     }
365
366     private void clearPostExecuteCallbacks() {
367         postExecuteCallbacks.clear();
368     }
369
370     private void handlePendingPostExecuteCallbacks() {
371         // if any pending postExecuteCallbacks, flush via executeNoCommit
372         if (!postExecuteCallbacks.isEmpty()) {
373             executeNoCommit();
374         }
375     }
376
377     private void performPostExecuteCallbacks() {
378         // TODO this will abort on the first postExecute failure
379         // TODO should this set rollback only?
380         try {
381             for (Runnable runnable: postExecuteCallbacks) {
382                 try {
383                     runnable.run();
384                 } catch (Throwable t) {
385                     throw new ClusterJDatastoreException(
386                             local.message("ERR_Datastore"), t);
387                 }
388             }
389         } finally {
390             clearPostExecuteCallbacks();
391         }
392     }
393
394     /** Handle errors from ScanOperation where the error returnCode is -1.
395      * 
396      * @param returnCode the return code from the nextResult operation
397      */
398     protected void handleError(int returnCode) {
399         if (returnCode == -1) {
400             NdbErrorConst ndbError = ndbTransaction.getNdbError();
401             String detail = db.getNdbErrorDetail(ndbError);
402             Utility.throwError(returnCode, ndbError, detail);
403         }
404     }
405
406     protected void handleError(int returnCode, NdbTransaction ndbTransaction) {
407         if (returnCode == 0) {
408             return;
409         } else {
410             NdbErrorConst ndbError = ndbTransaction.getNdbError();
411             if (ndbError.code() == 0) {
412                 return;
413             }
414             String detail = db.getNdbErrorDetail(ndbError);
415             Utility.throwError(returnCode, ndbError, detail);
416         }
417     }
418
419     protected void handleError(Object object, NdbTransaction ndbTransaction) {
420         if (object != null) {
421             return;
422         } else {
423             NdbErrorConst ndbError = ndbTransaction.getNdbError();
424             String detail = db.getNdbErrorDetail(ndbError);
425             Utility.throwError(null, ndbError, detail);
426         }
427     }
428
429     protected void handleError(Object object, Dictionary ndbDictionary) {
430         if (object != null) {
431             return;
432         } else {
433             NdbErrorConst ndbError = ndbDictionary.getNdbError();
434             String detail = db.getNdbErrorDetail(ndbError);
435             Utility.throwError(null, ndbError, detail);
436         }
437     }
438
439     public void setPartitionKey(PartitionKey partitionKey) {
440         if (partitionKey == null) {
441             throw new ClusterJFatalInternalException(
442                     local.message("ERR_Partition_Key_Null"));
443         }
444         this.partitionKey = (PartitionKeyImpl)partitionKey;
445     }
446
447     public String getCoordinatedTransactionId() {
448         return coordinatedTransactionId;
449     }
450
451     /** Get the coordinated transaction id if possible and update the field with
452      * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie
453      * and native library) the ndbTransaction.getCoordinatedTransactionId() method
454      * will throw an Error of some kind (java.lang.NoSuchMethodError or
455      * java.lang.UnsatisfiedLinkError) and this will cause this instance
456      * (and any other instance with access to the new value of the static variable 
457      * supportsGetCoordinatedTransactionId) to never try again.
458      * @param db the DbImpl instance
459      */
460     private void getCoordinatedTransactionId(DbImpl db) {
461         try {
462             if (supportsGetCoordinatedTransactionId) {
463 // not implemented quite yet...
464 //                ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
465 //                coordinatedTransactionId = ndbTransaction.
466 //                        getCoordinatedTransactionId(buffer, buffer.capacity());
467                 if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
468                         + coordinatedTransactionId);
469                 throw new ClusterJFatalInternalException("Not Implemented");
470             }
471         } catch (Throwable t) {
472             // oops, don't do this again
473             supportsGetCoordinatedTransactionId = false;
474         }
475     }
476
477     public void setCoordinatedTransactionId(String coordinatedTransactionId) {
478         this.coordinatedTransactionId = coordinatedTransactionId;
479     }
480
481     public void setLockMode(LockMode lockmode) {
482         findLockMode = translateLockMode(lockmode);
483         lookupLockMode = findLockMode;
484         indexScanLockMode = findLockMode;
485         tableScanLockMode = findLockMode;
486     }
487
488     private int translateLockMode(LockMode lockmode) {
489         switch(lockmode) {
490             case READ_COMMITTED:
491                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
492             case SHARED:
493                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read;
494             case EXCLUSIVE:
495                 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
496             default:
497                 throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Lock_Mode", lockmode));
498         }
499     }
500
501     public void setAutocommit(boolean autocommit) {
502         this.autocommit = autocommit;
503     }
504
505     public BufferManager getBufferManager() {
506         return bufferManager;
507     }
508
509 }