2 * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; version 2 of the License.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 package com.mysql.clusterj.tie;
20 import java.util.ArrayList;
21 import java.util.List;
23 import com.mysql.clusterj.ClusterJDatastoreException;
24 import com.mysql.clusterj.ClusterJFatalInternalException;
25 import com.mysql.clusterj.LockMode;
27 import com.mysql.clusterj.core.store.ClusterTransaction;
28 import com.mysql.clusterj.core.store.Index;
29 import com.mysql.clusterj.core.store.IndexOperation;
30 import com.mysql.clusterj.core.store.IndexScanOperation;
31 import com.mysql.clusterj.core.store.Operation;
32 import com.mysql.clusterj.core.store.PartitionKey;
33 import com.mysql.clusterj.core.store.ScanOperation;
34 import com.mysql.clusterj.core.store.Table;
36 import com.mysql.clusterj.core.util.I18NHelper;
37 import com.mysql.clusterj.core.util.Logger;
38 import com.mysql.clusterj.core.util.LoggerFactoryService;
39 import com.mysql.clusterj.tie.DbImpl.BufferManager;
41 import com.mysql.ndbjtie.ndbapi.NdbErrorConst;
42 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
43 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
44 import com.mysql.ndbjtie.ndbapi.NdbOperation;
45 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
46 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
47 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
48 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
49 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
50 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
51 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
/**
 * ClusterTransaction implementation backed by an ndbjtie NdbTransaction.
 * The NdbTransaction is assigned lazily by enlist(), and the operation factory
 * methods of this class wrap the Ndb operations they obtain from it.
 * NOTE(review): this listing carries embedded line numbers with gaps, so some
 * declarations and statements of the class are not visible here.
 */
56 class ClusterTransactionImpl implements ClusterTransaction {
58 /** My message translator */
59 static final I18NHelper local = I18NHelper
60 .getInstance(ClusterTransactionImpl.class);
// Logger shared by all instances; used for the trace and detail messages in this class.
63 static final Logger logger = LoggerFactoryService.getFactory()
64 .getInstance(ClusterTransactionImpl.class);
// The wrapped NdbTransaction: null until enlist() assigns it, and null again after close().
66 protected NdbTransaction ndbTransaction;
// Callbacks queued by postExecuteCallback(Runnable) and emptied by clearPostExecuteCallbacks().
67 private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
69 /** The DbImpl associated with this NdbTransaction */
// NOTE(review): the declaration this comment documents is not visible in the listing;
// methods of this class read a field named db - confirm against the full file.
72 /** The partition key; by default it doesn't do anything */
73 protected PartitionKeyImpl partitionKey = PartitionKeyImpl.getInstance();
75 /** The NdbDictionary */
76 private Dictionary ndbDictionary;
78 /** The coordinated transaction identifier */
79 private String coordinatedTransactionId = null;
81 /** Is getCoordinatedTransactionId supported? True until proven false. */
// Static, so a false value written by one instance is seen by every instance.
82 private static boolean supportsGetCoordinatedTransactionId = true;
// The four lock modes below all start as LM_CommittedRead; setLockMode replaces all of them at once.
84 /** Lock mode for find operations */
85 private int findLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
87 /** Lock mode for index lookup operations */
88 private int lookupLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
90 /** Lock mode for index scan operations */
91 private int indexScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
93 /** Lock mode for table scan operations */
94 private int tableScanLockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
96 /** Autocommit flag if we are in an autocommit transaction */
97 private boolean autocommit = false;
99 /** Autocommitted flag if we autocommitted early */
100 private boolean autocommitted = false;
102 /** The transaction id to join this transaction to */
// NOTE(review): assigned in the constructor; no line visible in this listing reads it.
103 private String joinTransactionId;
// Buffer manager taken from the DbImpl in the constructor and returned by getBufferManager().
105 private BufferManager bufferManager;
/**
 * Creates the transaction wrapper. The visible statements only store the
 * arguments; ndbTransaction is not assigned here.
 * @param db the DbImpl; its buffer manager is cached in this instance
 * @param ndbDictionary the dictionary used for table and index lookups
 * @param joinTransactionId the id of a transaction to join, stored as given
 */
107 public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
// NOTE(review): embedded line 108 is absent from the listing and no visible line
// stores db in a field, although other methods read one - confirm against the full file.
109 this.ndbDictionary = ndbDictionary;
110 this.joinTransactionId = joinTransactionId;
111 this.bufferManager = db.getBufferManager();
/**
 * Closes the wrapped NdbTransaction, if there is one, and drops the reference
 * so that isEnlisted() reports false afterwards. Does nothing when no
 * NdbTransaction is held.
 */
114 public void close() {
115 if (ndbTransaction != null) {
116 ndbTransaction.close();
117 ndbTransaction = null;
/**
 * Commits by delegating to executeCommit(boolean, boolean) with both the
 * abort and the force argument set to true.
 */
121 public void executeCommit() {
122 executeCommit(true, true);
/**
 * Reports whether this instance currently holds an NdbTransaction.
 * @return true if ndbTransaction is non-null
 */
125 public boolean isEnlisted() {
126 return ndbTransaction != null;
130 * Enlist the ndb transaction if not already enlisted.
131 * If the coordinated transaction id is set, join an existing transaction.
132 * Otherwise, use the partition key to enlist the transaction.
// Assigns ndbTransaction on first use; a no-op when it is already set.
134 private void enlist() {
135 if (ndbTransaction == null) {
136 if (coordinatedTransactionId != null) {
// Join path: reuse the transaction identified by the coordinated transaction id.
137 ndbTransaction = db.joinTransaction(coordinatedTransactionId);
// NOTE(review): embedded line 138 is absent from the listing; presumably the else that
// separates the join path from the partition-key path below - confirm.
139 ndbTransaction = partitionKey.enlist(db);
// Attempts to record the coordinated transaction id for the transaction just obtained.
140 getCoordinatedTransactionId(db);
/**
 * Executes the NdbTransaction with ExecType.Commit when one is enlisted and an
 * early autocommit has not already happened; pending post-execute callbacks
 * are handled first. The execute return code is checked by handleError.
 * @param abort true selects AbortOption.AbortOnError, false AbortOption.AO_IgnoreError
 * @param force true passes 1, false passes 0 as the force argument of execute
 */
145 public void executeCommit(boolean abort, boolean force) {
146 if (logger.isTraceEnabled()) logger.trace("");
147 // nothing to do if no ndbTransaction was ever enlisted or already autocommitted
148 if (isEnlisted() && !autocommitted) {
149 handlePendingPostExecuteCallbacks();
150 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
151 int forceOption = force?1:0;
152 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Commit,
153 abortOption, forceOption);
154 handleError(returnCode, ndbTransaction);
// Resets the early-autocommit marker.
// NOTE(review): embedded lines 155 and 157 are absent from the listing; presumably the
// conditional closes before this statement so the reset always runs - confirm.
156 autocommitted = false;
/**
 * Executes pending operations without committing by delegating to
 * executeNoCommit(boolean, boolean) with both arguments set to true.
 */
160 public void executeNoCommit() {
161 executeNoCommit(true, true);
/**
 * Sends pending operations to the NdbTransaction with ExecType.NoCommit and
 * then performs the queued post-execute callbacks. When autocommit is set and
 * no callbacks are queued, it commits through executeCommit instead and
 * records that in autocommitted.
 * @param abort true selects AbortOption.AbortOnError, false AbortOption.AO_IgnoreError
 * @param force true passes 1, false passes 0 as the force argument of execute
 */
164 public void executeNoCommit(boolean abort, boolean force) {
165 if (logger.isTraceEnabled()) logger.trace("");
// NOTE(review): the guard that the next comment describes (embedded lines 166, 168-169)
// is absent from the listing; presumably an early return when not enlisted - confirm.
167 // nothing to do if no ndbTransaction was ever enlisted
170 if (autocommit && postExecuteCallbacks.size() == 0) {
171 // optimization to commit now because no blob columns
172 executeCommit(abort, force);
// executeCommit resets autocommitted, so the marker is set only after it has run.
173 autocommitted = true;
// NOTE(review): embedded lines 174-175 are absent; presumably the autocommit branch
// ends here without falling through to the NoCommit execute below - confirm.
176 int abortOption = abort?AbortOption.AbortOnError:AbortOption.AO_IgnoreError;
177 int forceOption = force?1:0;
178 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.NoCommit,
179 abortOption, forceOption);
180 handleError(returnCode, ndbTransaction);
181 performPostExecuteCallbacks();
/**
 * Executes the NdbTransaction with ExecType.Rollback using
 * AbortOption.AO_IgnoreError and checks the return code with handleError.
 */
184 public void executeRollback() {
// NOTE(review): the guard the next comment describes (embedded lines 185, 187-188) is
// absent from the listing; presumably an early return when not enlisted - confirm.
186 // nothing to do if no ndbTransaction was ever enlisted
189 int abortOption = AbortOption.AO_IgnoreError;
// NOTE(review): forceOption is used below but its declaration (embedded line 190) is
// not visible here; its value cannot be confirmed from this listing.
191 int returnCode = ndbTransaction.execute(NdbTransaction.ExecType.Rollback,
192 abortOption, forceOption);
193 handleError(returnCode, ndbTransaction);
/**
 * Creates a delete operation for the given table: looks the table up in the
 * dictionary, obtains an NdbOperation from the transaction and calls
 * deleteTuple on it. Each step is checked with handleError.
 * @param storeTable the table; only its name is used
 * @return an OperationImpl wrapping the NdbOperation (built without the store table)
 */
196 public Operation getDeleteOperation(Table storeTable) {
// NOTE(review): embedded line 197 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
198 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
199 handleError(ndbTable, ndbDictionary);
200 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
201 handleError(ndbOperation, ndbTransaction);
202 int returnCode = ndbOperation.deleteTuple();
203 handleError(returnCode, ndbTransaction);
// The second semicolon on the next line is an empty statement.
204 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());;
205 return new OperationImpl(ndbOperation, this);
/**
 * Creates an insert operation for the given table: looks the table up in the
 * dictionary, obtains an NdbOperation from the transaction and calls
 * insertTuple on it. Each step is checked with handleError.
 * @param storeTable the table; only its name is used
 * @return an OperationImpl wrapping the NdbOperation (built without the store table)
 */
208 public Operation getInsertOperation(Table storeTable) {
// NOTE(review): embedded line 209 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
210 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
211 handleError(ndbTable, ndbDictionary);
212 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
213 handleError(ndbOperation, ndbTransaction);
214 int returnCode = ndbOperation.insertTuple();
215 handleError(returnCode, ndbTransaction);
216 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
217 return new OperationImpl(ndbOperation, this);
/**
 * Creates an index scan over the given index using the current index scan
 * lock mode. The index is resolved by its internal name together with the
 * table name; each step is checked with handleError.
 * @param storeIndex the index to scan
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the NdbIndexScanOperation
 */
220 public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
// NOTE(review): embedded line 221 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
222 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
223 handleError(ndbIndex, ndbDictionary);
224 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
225 handleError(ndbOperation, ndbTransaction);
226 int lockMode = indexScanLockMode;
// NOTE(review): scanFlags, parallel and batch are passed below but their declarations
// (embedded lines 227-229) are not visible; their values cannot be confirmed here.
230 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
231 handleError(returnCode, ndbTransaction);
232 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
233 return new IndexScanOperationImpl(storeTable, ndbOperation, this);
/**
 * Creates an index scan with the ScanFlag.SF_MultiRange flag, using the
 * current index scan lock mode. Each step is checked with handleError.
 * @param storeIndex the index to scan
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the NdbIndexScanOperation
 */
236 public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
// NOTE(review): embedded line 237 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
238 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
239 handleError(ndbIndex, ndbDictionary);
240 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
241 handleError(ndbOperation, ndbTransaction);
242 int lockMode = indexScanLockMode;
243 int scanFlags = ScanFlag.SF_MultiRange;
// NOTE(review): parallel and batch are passed below but their declarations
// (embedded lines 244-245) are not visible; their values cannot be confirmed here.
246 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
247 handleError(returnCode, ndbTransaction);
248 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
249 return new IndexScanOperationImpl(storeTable, ndbOperation, this);
/**
 * Creates an index scan that always uses LockMode.LM_Exclusive and
 * ScanFlag.SF_KeyInfo, regardless of the lock mode configured on this
 * transaction. Each step is checked with handleError.
 * @param storeIndex the index to scan
 * @param storeTable the table that owns the index
 * @return an IndexScanOperationImpl wrapping the NdbIndexScanOperation
 */
252 public IndexScanOperation getIndexScanOperationLockModeExclusiveScanFlagKeyInfo(Index storeIndex, Table storeTable) {
// NOTE(review): embedded line 253 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
254 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
255 handleError(ndbIndex, ndbDictionary);
256 NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
257 handleError(ndbOperation, ndbTransaction);
258 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
259 int scanFlags = ScanFlag.SF_KeyInfo;
// NOTE(review): parallel and batch are passed below but their declarations
// (embedded lines 260-261) are not visible; their values cannot be confirmed here.
262 int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
263 handleError(returnCode, ndbTransaction);
264 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
265 return new IndexScanOperationImpl(storeTable, ndbOperation, this);
/**
 * Creates a read operation for the given table using the current find lock
 * mode: looks the table up in the dictionary, obtains an NdbOperation and
 * calls readTuple on it. Each step is checked with handleError.
 * @param storeTable the table to read from
 * @return an OperationImpl wrapping the store table and the NdbOperation
 */
268 public Operation getSelectOperation(Table storeTable) {
// NOTE(review): embedded line 269 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
270 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
271 handleError(ndbTable, ndbDictionary);
272 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
273 handleError(ndbOperation, ndbTransaction);
274 int lockMode = findLockMode;
275 int returnCode = ndbOperation.readTuple(lockMode);
276 handleError(returnCode, ndbTransaction);
277 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
278 return new OperationImpl(storeTable, ndbOperation, this);
/**
 * Creates a table scan for the given table using the current table scan lock
 * mode. Each step is checked with handleError.
 * @param storeTable the table to scan
 * @return a ScanOperationImpl wrapping the store table and the NdbScanOperation
 */
281 public ScanOperation getTableScanOperation(Table storeTable) {
// NOTE(review): embedded line 282 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
283 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
284 handleError(ndbTable, ndbDictionary);
285 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
286 handleError(ndbScanOperation, ndbTransaction);
287 int lockMode = tableScanLockMode;
// NOTE(review): scanFlags, parallel and batch are passed below but their declarations
// (embedded lines 288-290) are not visible; their values cannot be confirmed here.
291 int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
292 handleError(returnCode, ndbTransaction);
293 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
294 return new ScanOperationImpl(storeTable, ndbScanOperation, this);
/**
 * Creates a table scan that always uses LockMode.LM_Exclusive and
 * ScanFlag.SF_KeyInfo, regardless of the lock mode configured on this
 * transaction. Each step is checked with handleError.
 * @param storeTable the table to scan
 * @return a ScanOperationImpl wrapping the store table and the NdbScanOperation
 */
297 public ScanOperation getTableScanOperationLockModeExclusiveScanFlagKeyInfo(Table storeTable) {
// NOTE(review): embedded line 298 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
299 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
300 handleError(ndbTable, ndbDictionary);
301 NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
302 handleError(ndbScanOperation, ndbTransaction);
303 int lockMode = com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
304 int scanFlags = ScanFlag.SF_KeyInfo;
// NOTE(review): parallel and batch are passed below but their declarations
// (embedded lines 305-306) are not visible; their values cannot be confirmed here.
307 int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
308 handleError(returnCode, ndbTransaction);
309 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
310 return new ScanOperationImpl(storeTable, ndbScanOperation, this);
/**
 * Creates a read operation through a unique index using the current lookup
 * lock mode. The index is resolved by its internal name together with the
 * table name; each step is checked with handleError.
 * @param storeIndex the unique index to read through
 * @param storeTable the table that owns the index
 * @return an IndexOperationImpl wrapping the store table and the NdbIndexOperation
 */
313 public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
// NOTE(review): embedded line 314 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
315 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
316 handleError(ndbIndex, ndbDictionary);
317 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
318 handleError(ndbIndexOperation, ndbTransaction);
319 int lockMode = lookupLockMode;
320 int returnCode = ndbIndexOperation.readTuple(lockMode);
321 handleError(returnCode, ndbTransaction);
322 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
323 return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
/**
 * Creates a delete operation through a unique index: resolves the index,
 * obtains an NdbIndexOperation from the transaction and calls deleteTuple on
 * it. Each step is checked with handleError.
 * @param storeIndex the unique index identifying the row to delete
 * @param storeTable the table that owns the index
 * @return an IndexOperationImpl wrapping the store table and the NdbIndexOperation
 */
326 public IndexOperation getUniqueIndexDeleteOperation(Index storeIndex, Table storeTable) {
// NOTE(review): embedded line 327 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
328 IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
329 handleError(ndbIndex, ndbDictionary);
330 NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
331 handleError(ndbIndexOperation, ndbTransaction);
332 int returnCode = ndbIndexOperation.deleteTuple();
333 handleError(returnCode, ndbTransaction);
334 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName() + " index: " + storeIndex.getName());
335 return new IndexOperationImpl(storeTable, ndbIndexOperation, this);
/**
 * Creates an update operation for the given table: looks the table up in the
 * dictionary, obtains an NdbOperation from the transaction and calls
 * updateTuple on it. Each step is checked with handleError.
 * @param storeTable the table to update
 * @return an OperationImpl wrapping the store table and the NdbOperation
 */
338 public Operation getUpdateOperation(Table storeTable) {
// NOTE(review): embedded line 339 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
340 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
341 handleError(ndbTable, ndbDictionary);
342 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
343 handleError(ndbOperation, ndbTransaction);
344 int returnCode = ndbOperation.updateTuple();
345 handleError(returnCode, ndbTransaction);
346 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
347 return new OperationImpl(storeTable, ndbOperation, this);
/**
 * Creates a write operation for the given table: looks the table up in the
 * dictionary, obtains an NdbOperation from the transaction and calls
 * writeTuple on it. Each step is checked with handleError.
 * @param storeTable the table to write to
 * @return an OperationImpl wrapping the store table and the NdbOperation
 */
350 public Operation getWriteOperation(Table storeTable) {
// NOTE(review): embedded line 351 is absent from the listing; presumably the call that
// makes sure ndbTransaction is assigned before it is used below - confirm.
352 TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
353 handleError(ndbTable, ndbDictionary);
354 NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
355 handleError(ndbOperation, ndbTransaction);
356 int returnCode = ndbOperation.writeTuple();
357 handleError(returnCode, ndbTransaction);
358 if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
359 return new OperationImpl(storeTable, ndbOperation, this);
/**
 * Queues a callback. Queued callbacks are run by performPostExecuteCallbacks,
 * which executeNoCommit(boolean, boolean) calls after its NoCommit execute.
 * @param callback the Runnable to append to the pending list
 */
362 public void postExecuteCallback(Runnable callback) {
363 postExecuteCallbacks.add(callback);
/** Removes every queued post-execute callback. */
366 private void clearPostExecuteCallbacks() {
367 postExecuteCallbacks.clear();
/**
 * Deals with callbacks that are still queued; called by
 * executeCommit(boolean, boolean) before it commits.
 */
370 private void handlePendingPostExecuteCallbacks() {
371 // if any pending postExecuteCallbacks, flush via executeNoCommit
372 if (!postExecuteCallbacks.isEmpty()) {
// NOTE(review): the body of this conditional (embedded line 373) is absent from the
// listing; per the comment above it presumably calls executeNoCommit - confirm.
/**
 * Walks the queued post-execute callbacks and empties the queue afterwards.
 * A Throwable caught in the loop is rethrown as a ClusterJDatastoreException
 * that keeps the original as its cause.
 */
377 private void performPostExecuteCallbacks() {
378 // TODO this will abort on the first postExecute failure
379 // TODO should this set rollback only?
// NOTE(review): embedded lines 380, 382-383 and 387-389 are absent from the listing, so the
// try openers, the per-callback invocation and whether the clear below sits in a
// finally block cannot be confirmed here.
381 for (Runnable runnable: postExecuteCallbacks) {
384 } catch (Throwable t) {
385 throw new ClusterJDatastoreException(
386 local.message("ERR_Datastore"), t);
390 clearPostExecuteCallbacks();
394 /** Handle errors from ScanOperation where the error returnCode is -1.
396 * @param returnCode the return code from the nextResult operation
// When returnCode is -1, reads the error from the ndbTransaction field, asks db for
// its detail text and passes all three to Utility.throwError. Any other value is ignored.
398 protected void handleError(int returnCode) {
399 if (returnCode == -1) {
400 NdbErrorConst ndbError = ndbTransaction.getNdbError();
401 String detail = db.getNdbErrorDetail(ndbError);
402 Utility.throwError(returnCode, ndbError, detail);
/**
 * Checks the return code of a call made on the given transaction and reports
 * the transaction's error through Utility.throwError.
 * @param returnCode the value returned by the Ndb call
 * @param ndbTransaction the transaction to read the error from; this parameter
 *        hides the field of the same name inside the method
 */
406 protected void handleError(int returnCode, NdbTransaction ndbTransaction) {
407 if (returnCode == 0) {
// NOTE(review): the body of this check (embedded lines 408-409) is absent from the
// listing; presumably an early return for the success case - confirm.
410 NdbErrorConst ndbError = ndbTransaction.getNdbError();
411 if (ndbError.code() == 0) {
// NOTE(review): the body of this check (embedded lines 412-413) is also absent;
// presumably an early return when the error code is zero - confirm.
414 String detail = db.getNdbErrorDetail(ndbError);
415 Utility.throwError(returnCode, ndbError, detail);
/**
 * Checks an object obtained from the given transaction and reports the
 * transaction's error through Utility.throwError, with null as the return code.
 * @param object the result to check
 * @param ndbTransaction the transaction to read the error from; this parameter
 *        hides the field of the same name inside the method
 */
419 protected void handleError(Object object, NdbTransaction ndbTransaction) {
420 if (object != null) {
// NOTE(review): the body of this check (embedded lines 421-422) is absent from the
// listing; presumably an early return when the object was obtained - confirm.
423 NdbErrorConst ndbError = ndbTransaction.getNdbError();
424 String detail = db.getNdbErrorDetail(ndbError);
425 Utility.throwError(null, ndbError, detail);
/**
 * Checks an object obtained from the given dictionary and reports the
 * dictionary's error through Utility.throwError, with null as the return code.
 * @param object the result to check
 * @param ndbDictionary the dictionary to read the error from; this parameter
 *        hides the field of the same name inside the method
 */
429 protected void handleError(Object object, Dictionary ndbDictionary) {
430 if (object != null) {
// NOTE(review): the body of this check (embedded lines 431-432) is absent from the
// listing; presumably an early return when the object was obtained - confirm.
433 NdbErrorConst ndbError = ndbDictionary.getNdbError();
434 String detail = db.getNdbErrorDetail(ndbError);
435 Utility.throwError(null, ndbError, detail);
/**
 * Replaces the partition key that enlist() uses to obtain the NdbTransaction.
 * @param partitionKey the new key; it is cast to PartitionKeyImpl, so another
 *        implementation fails the cast
 * @throws ClusterJFatalInternalException if partitionKey is null
 */
439 public void setPartitionKey(PartitionKey partitionKey) {
440 if (partitionKey == null) {
441 throw new ClusterJFatalInternalException(
442 local.message("ERR_Partition_Key_Null"));
444 this.partitionKey = (PartitionKeyImpl)partitionKey;
/**
 * Returns the stored coordinated transaction id.
 * @return the id, or null if none has been set
 */
447 public String getCoordinatedTransactionId() {
448 return coordinatedTransactionId;
451 /** Get the coordinated transaction id if possible and update the field with
452 * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie
453 * and native library) the ndbTransaction.getCoordinatedTransactionId() method
454 * will throw an Error of some kind (java.lang.NoSuchMethodError or
455 * java.lang.UnsatisfiedLinkError) and this will cause this instance
456 * (and any other instance with access to the new value of the static variable
457 * supportsGetCoordinatedTransactionId) to never try again.
458 * @param db the DbImpl instance
// As visible here the retrieval itself is commented out: while the static flag is true
// this logs the current field value and then throws "Not Implemented"; the catch below
// handles any Throwable by clearing the flag. The db parameter is referenced only by
// the commented-out code.
460 private void getCoordinatedTransactionId(DbImpl db) {
// NOTE(review): the try opener (embedded line 461) is absent from the listing; presumably
// it encloses the conditional below so that the throw reaches the catch - confirm.
462 if (supportsGetCoordinatedTransactionId) {
463 // not implemented quite yet...
464 // ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
465 // coordinatedTransactionId = ndbTransaction.
466 // getCoordinatedTransactionId(buffer, buffer.capacity());
467 if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
468 + coordinatedTransactionId);
469 throw new ClusterJFatalInternalException("Not Implemented");
471 } catch (Throwable t) {
472 // oops, don't do this again
// The flag is static, so clearing it affects every instance of this class.
473 supportsGetCoordinatedTransactionId = false;
/**
 * Stores the coordinated transaction id. When it is non-null at the time
 * enlist() runs, enlist() joins that transaction through db.joinTransaction.
 * @param coordinatedTransactionId the id to store; may be null
 */
477 public void setCoordinatedTransactionId(String coordinatedTransactionId) {
478 this.coordinatedTransactionId = coordinatedTransactionId;
/**
 * Applies one lock mode to find, index lookup, index scan and table scan
 * operations; the ClusterJ value is translated once and copied to all four fields.
 * @param lockmode the ClusterJ lock mode to apply
 */
481 public void setLockMode(LockMode lockmode) {
482 findLockMode = translateLockMode(lockmode);
483 lookupLockMode = findLockMode;
484 indexScanLockMode = findLockMode;
485 tableScanLockMode = findLockMode;
/**
 * Translates a ClusterJ LockMode into an ndbjtie lock mode constant. The
 * visible results are LM_CommittedRead, LM_Read and LM_Exclusive.
 * @param lockmode the ClusterJ lock mode
 * @return the corresponding ndbjtie LockMode constant
 * @throws ClusterJFatalInternalException with message key ERR_Unknown_Lock_Mode
 */
488 private int translateLockMode(LockMode lockmode) {
// NOTE(review): the switch and its case labels (embedded lines 489-490, 492, 494, 496) are
// absent from the listing, so which LockMode selects which constant, and when the
// exception below is reached, cannot be confirmed here.
491 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead;
493 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Read;
495 return com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_Exclusive;
497 throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Lock_Mode", lockmode));
/**
 * Sets the autocommit flag. While it is true and no post-execute callbacks are
 * queued, executeNoCommit(boolean, boolean) commits through executeCommit.
 * @param autocommit the new flag value
 */
501 public void setAutocommit(boolean autocommit) {
502 this.autocommit = autocommit;
/**
 * Returns the buffer manager taken from the DbImpl in the constructor.
 * @return the cached BufferManager
 */
505 public BufferManager getBufferManager() {
506 return bufferManager;