1 /* -*- mode: java; c-basic-offset: 4; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=4:tabstop=4:smarttab:
4 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; version 2 of the License.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 package com.mysql.cluster.benchmark.tws;
22 import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
23 import com.mysql.ndbjtie.ndbapi.Ndb;
24 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
25 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
26 import com.mysql.ndbjtie.ndbapi.NdbError;
27 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
28 import com.mysql.ndbjtie.ndbapi.NdbOperation;
30 import java.nio.ByteBuffer;
31 import java.nio.ByteOrder;
32 import java.nio.CharBuffer;
33 import java.nio.charset.Charset;
34 import java.nio.charset.CodingErrorAction;
35 import java.nio.charset.CharsetEncoder;
36 import java.nio.charset.CharsetDecoder;
37 import java.nio.charset.CoderResult;
38 import java.nio.charset.CharacterCodingException;
41 class NdbjtieLoad extends TwsLoad {
// NOTE(review): the embedded line numbers in this listing are not contiguous
// (e.g. 47-48 and 50 are absent), so some declarations -- such as the `ndb`
// handle used by the methods below -- are presumably on lines not shown here.
// NDB settings; assigned from driver properties in initProperties()
44 protected String mgmdConnect;
45 protected String catalog;
46 protected String schema;
// cluster connection handle; created in init() and deleted in close()
49 protected Ndb_cluster_connection mgmd;
// transaction currently in progress; set by ndbjtieBeginTransaction()
51 protected NdbTransaction tx;
// lock mode passed to readTuple() by ndbjtieLookup(); chosen in initConnection()
52 protected int ndbOpLockMode;
54 // NDB JTie metadata resources
// dictionary entry for the benchmark table; looked up lazily in initTable()
55 protected TableConst table_t0;
57 // NDB JTie data resources
// buffer that holds the encoded column values handed to NDB operations;
// allocated in initNdbjtieBuffers()
58 protected ByteBuffer bb;
60 // NDB JTie static resources
61 static protected final ByteOrder bo = ByteOrder.nativeOrder();
62 static protected final Charset cs;
63 static protected final CharsetEncoder csEncoder;
64 static protected final CharsetDecoder csDecoder;
// NOTE(review): the assignments below presumably sit inside a
// `static { ... }` initializer whose opening line is not visible here.
66 // default charset for mysql is "ISO-8859-1" ("US-ASCII", "UTF-8")
67 cs = Charset.forName("ISO-8859-1");
68 csDecoder = cs.newDecoder();
69 csEncoder = cs.newEncoder();
71 // report any unclean transcodings
// NOTE(review): the receivers of the two call chains below are missing from
// this listing -- presumably csDecoder and csEncoder respectively; confirm.
// CodingErrorAction.REPORT makes malformed/unmappable input surface as an
// error result instead of being silently replaced or dropped.
73 .onMalformedInput(CodingErrorAction.REPORT)
74 .onUnmappableCharacter(CodingErrorAction.REPORT);
76 .onMalformedInput(CodingErrorAction.REPORT)
77 .onUnmappableCharacter(CodingErrorAction.REPORT);
/**
 * Constructs an NdbjtieLoad for the given driver.
 *
 * NOTE(review): the constructor body is not visible in this listing --
 * presumably it forwards {@code driver} to the TwsLoad constructor; confirm.
 *
 * @param driver the TwsDriver whose properties and run options this load reads
 */
80 public NdbjtieLoad(TwsDriver driver) {
84 // ----------------------------------------------------------------------
85 // NDB Base initializers/finalizers
86 // ----------------------------------------------------------------------
/**
 * Reads the NDB connection settings from {@code driver.props} and derives
 * the load's description string.
 *
 * Properties read (with defaults): "ndb.mgmdConnect" ("localhost"),
 * "ndb.catalog" ("crunddb"), "ndb.schema" ("def").
 */
88 protected void initProperties() {
90 out.print("setting ndb properties ...");
// NOTE(review): nothing visible in this listing ever appends to msg or uses
// eol, so only the "[ok]" path can be confirmed from here.
92 final StringBuilder msg = new StringBuilder();
93 final String eol = System.getProperty("line.separator");
95 // the hostname and port number of NDB mgmd
96 mgmdConnect = driver.props.getProperty("ndb.mgmdConnect", "localhost");
97 assert mgmdConnect != null;
// name of the catalog passed to Ndb.create() in initConnection()
100 catalog = driver.props.getProperty("ndb.catalog", "crunddb");
101 assert catalog != null;
// name of the schema passed to Ndb.create() in initConnection()
104 schema = driver.props.getProperty("ndb.schema", "def");
105 assert schema != null;
// an empty msg means no problems were recorded
107 if (msg.length() == 0) {
108 out.println(" [ok]");
111 out.print(msg.toString());
114 // have mgmdConnect initialized first
115 descr = "ndbjtie(" + mgmdConnect + ")";
/**
 * Prints the three NDB settings (mgmdConnect, catalog, schema) to
 * {@code out}, one per line, each value enclosed in double quotes.
 */
118 protected void printProperties() {
119 out.println("ndb.mgmdConnect: \"" + mgmdConnect + "\"");
120 out.println("ndb.catalog: \"" + catalog + "\"");
121 out.println("ndb.schema: \"" + schema + "\"");
/**
 * Loads the native "ndbclient" library and creates the cluster connection
 * object for the configured management server.
 *
 * NOTE(review): lines between the signature and the library load are not
 * visible in this listing.
 *
 * @throws Exception declared by the signature; the visible statements do not
 *         show which call raises it
 */
124 public void init() throws Exception {
128 // load native library (better diagnostics doing it explicitly)
129 Driver.loadSystemLibrary("ndbclient");
131 // instantiate NDB cluster singleton
132 out.print("creating cluster connection ...");
// only creates the connection object; mgmd.connect() is called later in
// initConnection()
134 mgmd = Ndb_cluster_connection.create(mgmdConnect);
136 out.println(" [ok: mgmd@" + mgmdConnect + "]");
/**
 * Deletes the cluster connection object created by init().
 *
 * NOTE(review): any statements around the delete call (e.g. resetting
 * {@code mgmd}) are not visible in this listing.
 *
 * @throws Exception declared by the signature; the visible statements do not
 *         show which call raises it
 */
139 public void close() throws Exception {
143 out.print("closing cluster connection ...");
145 Ndb_cluster_connection.delete(mgmd);
147 out.println(" [ok]");
152 // ----------------------------------------------------------------------
153 // NDB JTie datastore operations
154 // ----------------------------------------------------------------------
/**
 * Connects to the cluster and prepares everything needed to run operations.
 *
 * Visible steps, in order: connect to the management server, wait for data
 * nodes, create and initialize the Ndb object for catalog/schema, build the
 * MetaData model, allocate the data buffer, and pick the lock mode for reads
 * from {@code driver.lockMode}.
 *
 * Expects init() to have created {@code mgmd} and no Ndb object to exist yet
 * (both checked by assertions).
 *
 * @throws RuntimeException if the management server or the data nodes are
 *         not ready, or if Ndb.init() reports an error
 */
156 public void initConnection() {
157 assert (mgmd != null);
158 assert (ndb == null);
161 out.println("initializing ndbjtie resources ...");
163 // connect to cluster management node (ndb_mgmd)
164 out.print("connecting to cluster ...");
166 final int retries = 0; // retries (< 0 = indefinitely)
167 final int delay = 0; // seconds to wait after retry
168 final int verbose = 1; // print report of progress
169 // 0 = success, 1 = recoverable error, -1 = non-recoverable error
170 if (mgmd.connect(retries, delay, verbose) != 0) {
// NOTE(review): with retries == 0 and delay == 0 the reported wait time
// (retries * delay) is always "0s."
171 final String msg = ("mgmd@" + mgmdConnect
172 + " was not ready within "
173 + (retries * delay) + "s.");
175 throw new RuntimeException("!!! " + msg);
177 out.println(" [ok: " + mgmdConnect + "]");
179 // connect to data nodes (ndbds)
180 out.print("waiting for data nodes ...");
182 final int initial_wait = 10; // secs to wait until first node detected
183 final int final_wait = 0; // secs to wait after first node detected
184 // returns: 0 all nodes live, > 0 at least one node live, < 0 error
// only a negative result is treated as failure, so a partially live
// cluster (> 0) is accepted
185 if (mgmd.wait_until_ready(initial_wait, final_wait) < 0) {
186 final String msg = ("data nodes were not ready within "
187 + (initial_wait + final_wait) + "s.");
189 throw new RuntimeException(msg);
191 out.println(" [ok]");
193 // connect to database
194 out.print("connecting to database ...");
195 ndb = Ndb.create(mgmd, catalog, schema);
196 final int max_no_tx = 10; // maximum number of parallel tx (<=1024)
197 // note each scan or index scan operation uses one extra transaction
198 if (ndb.init(max_no_tx) != 0) {
199 String msg = "Error caught: " + ndb.getNdbError().message();
200 throw new RuntimeException(msg);
202 out.println(" [ok: " + catalog + "." + schema + "]");
// build the column metadata used by the operation methods
204 metaData = new MetaData(ndb);
206 metaData.initNdbjtieModel();
// allocate the shared data buffer (needs metaData for the row width)
208 initNdbjtieBuffers();
210 out.print("using lock mode for reads ...");
// NOTE(review): the declaration of `lm`, the `case` labels and some of the
// `lm = ...` assignments of this switch are not visible in this listing;
// the mapping from driver.lockMode values to LockMode constants therefore
// cannot be confirmed from here.
213 switch (driver.lockMode) {
215 ndbOpLockMode = NdbOperation.LockMode.LM_CommittedRead;
216 lm = "LM_CommittedRead";
219 ndbOpLockMode = NdbOperation.LockMode.LM_Read;
223 ndbOpLockMode = NdbOperation.LockMode.LM_Exclusive;
227 ndbOpLockMode = NdbOperation.LockMode.LM_CommittedRead;
228 lm = "LM_CommittedRead";
231 out.println(" [ok: " + lm + "]");
/**
 * Releases the resources set up by initConnection().
 *
 * Visible steps: release the data buffer via closeNdbjtieBuffers(), then
 * report closing of the database connection.
 *
 * NOTE(review): the statements that actually close the model and delete the
 * Ndb object are not visible in this listing.
 */
234 public void closeConnection() {
235 assert (ndb != null);
238 out.println("releasing ndbjtie resources ...");
240 closeNdbjtieBuffers();
244 out.print("closing database connection ...");
248 out.println(" [ok]");
/**
 * Clears the cached table/column metadata.
 *
 * Requires an open Ndb object, a resolved {@code table_t0} and initialized
 * column metadata (all checked by assertions).
 *
 * NOTE(review): the statements that perform the clearing are not visible in
 * this listing; only the progress output is.
 */
252 protected void closeNdbjtieModel() {
253 assert (ndb != null);
254 assert (table_t0 != null);
255 assert (metaData.getColumn(0) != null);
257 out.print("clearing metadata cache...");
263 out.println(" [ok]");
/**
 * Allocates the data buffer {@code bb} as a direct ByteBuffer sized to hold
 * {@code driver.nRows} rows of {@code metaData.getRowWidth()} bytes each.
 *
 * Requires the column metadata to be initialized (checked by assertion).
 */
266 public void initNdbjtieBuffers() {
267 assert (metaData.getColumn(0) != null);
270 out.print("allocating buffers...");
// NOTE(review): the size is computed with int arithmetic; a very large
// rowWidth * nRows product would overflow -- confirm expected ranges.
273 bb = ByteBuffer.allocateDirect(metaData.getRowWidth() * driver.nRows);
275 // initial order of a byte buffer is always BIG_ENDIAN
// NOTE(review): the statement following this comment is not visible in this
// listing -- presumably it switches bb to the native order `bo`; confirm.
278 out.println(" [ok]");
/**
 * Releases the data buffer allocated by initNdbjtieBuffers().
 *
 * NOTE(review): the statement that drops the buffer reference is not visible
 * in this listing; only the assertion and progress output are.
 */
281 protected void closeNdbjtieBuffers() {
282 assert (metaData.getColumn(0) != null);
285 out.print("releasing buffers...");
290 out.println(" [ok]");
294 // ----------------------------------------------------------------------
/**
 * Runs the enabled benchmark operations once per enabled execution mode.
 *
 * Within each mode the order is insert, lookup, update, delete, and each
 * operation is gated by its {@code driver.doXxx} flag.
 */
296 public void runOperations() {
298 out.println("running NDB JTie operations ..."
299 + " [nRows=" + driver.nRows + "]");
// one transaction per row
301 if (driver.doSingle) {
302 if (driver.doInsert) runNdbjtieInsert(TwsDriver.XMode.SINGLE);
303 if (driver.doLookup) runNdbjtieLookup(TwsDriver.XMode.SINGLE);
304 if (driver.doUpdate) runNdbjtieUpdate(TwsDriver.XMode.SINGLE);
305 if (driver.doDelete) runNdbjtieDelete(TwsDriver.XMode.SINGLE);
// NOTE(review): the guard for the BULK group is not visible in this listing
// -- presumably a driver flag analogous to doSingle/doBatch; confirm.
308 if (driver.doInsert) runNdbjtieInsert(TwsDriver.XMode.BULK);
309 if (driver.doLookup) runNdbjtieLookup(TwsDriver.XMode.BULK);
310 if (driver.doUpdate) runNdbjtieUpdate(TwsDriver.XMode.BULK);
311 if (driver.doDelete) runNdbjtieDelete(TwsDriver.XMode.BULK);
313 if (driver.doBatch) {
314 if (driver.doInsert) runNdbjtieInsert(TwsDriver.XMode.BATCH);
315 if (driver.doLookup) runNdbjtieLookup(TwsDriver.XMode.BATCH);
316 if (driver.doUpdate) runNdbjtieUpdate(TwsDriver.XMode.BATCH);
317 if (driver.doDelete) runNdbjtieDelete(TwsDriver.XMode.BATCH);
321 // ----------------------------------------------------------------------
/**
 * Runs the insert workload over {@code driver.nRows} rows in the given mode.
 *
 * SINGLE uses one begin/commit/close transaction cycle per row; the other
 * modes use a single transaction for all rows, and BULK additionally
 * executes (without committing) while iterating.
 *
 * NOTE(review): the per-row insert calls, the timing/reporting calls that
 * presumably use {@code name}, and the block braces are not visible in this
 * listing.
 *
 * @param mode execution mode (SINGLE, BULK or BATCH)
 */
323 protected void runNdbjtieInsert(TwsDriver.XMode mode) {
// label for this run, e.g. "insert_single"
324 final String name = "insert_" + mode.toString().toLowerCase();
329 if (mode == TwsDriver.XMode.SINGLE) {
330 for(int i = 0; i < driver.nRows; i++) {
331 ndbjtieBeginTransaction();
333 ndbjtieCommitTransaction();
334 ndbjtieCloseTransaction();
// BULK/BATCH: one transaction spanning all rows
337 ndbjtieBeginTransaction();
338 for(int i = 0; i < driver.nRows; i++) {
// BULK sends the pending operation to the data nodes without committing
341 if (mode == TwsDriver.XMode.BULK)
342 ndbjtieExecuteTransaction();
344 ndbjtieCommitTransaction();
345 ndbjtieCloseTransaction();
/**
 * Resolves the dictionary entry for table "mytable" into {@code table_t0}
 * on first use; does nothing if it is already set.
 *
 * @throws RuntimeException carrying the dictionary's NdbError if the table
 *         cannot be found
 */
351 protected void initTable() {
352 if(table_t0 == null) {
354 final Dictionary dict = ndb.getDictionary();
356 if ((table_t0 = dict.getTable("mytable")) == null)
357 throw new RuntimeException(TwsUtils.toStr(dict.getNdbError()));
/**
 * Defines an insert operation on {@code table_t0} within the current
 * transaction {@code tx}.
 *
 * String-typed columns are encoded into {@code bb} via
 * ndbjtieTranscode(ByteBuffer, CharBuffer); after each one the buffer
 * position is advanced by that column's width so the next value lands in
 * its own slot. Columns 2 and 3 are set as ints; columns 4 and 9-14 are set
 * to null.
 *
 * NOTE(review): this listing uses a variable {@code i} without a visible
 * declaration (presumably {@code final int i = c0;} on a missing line), and
 * the opening {@code try} plus the {@code op == null} check guarding the
 * first throw are likewise not visible; confirm against the full file.
 *
 * @param c0 key value of the row to insert
 * @throws RuntimeException on any NDB error or if transcoding fails
 */
361 protected void ndbjtieInsert(int c0) {
363 // get an insert operation for the table
364 NdbOperation op = tx.getNdbOperation(table_t0);
366 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
367 if (op.insertTuple() != 0)
368 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
370 // include exception handling as part of transcoding pattern
// decimal text of the row number; reused as the value of every string column
372 final CharBuffer str = CharBuffer.wrap(Integer.toString(i));
374 // set values; key attribute needs to be set first
376 ndbjtieTranscode(bb, str);
377 if (op.equal(metaData.getAttr(0), bb) != 0) // key
378 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// step over the slot just handed to the operation
379 bb.position(bb.position() + metaData.getColumnWidth(0));
382 ndbjtieTranscode(bb, str);
383 if (op.setValue(metaData.getAttr(1), bb) != 0)
384 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
385 bb.position(bb.position() + metaData.getColumnWidth(1));
// int-valued columns are passed directly, without going through bb
387 if (op.setValue(metaData.getAttr(2), i) != 0)
388 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
390 if (op.setValue(metaData.getAttr(3), i) != 0)
391 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// column 4 is set to NULL
394 if (op.setValue(metaData.getAttr(4), null) != 0)
395 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
398 ndbjtieTranscode(bb, str);
399 if (op.setValue(metaData.getAttr(5), bb) != 0)
400 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
401 bb.position(bb.position() + metaData.getColumnWidth(5));
404 ndbjtieTranscode(bb, str);
405 if (op.setValue(metaData.getAttr(6), bb) != 0)
406 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
407 bb.position(bb.position() + metaData.getColumnWidth(6));
410 ndbjtieTranscode(bb, str);
411 if (op.setValue(metaData.getAttr(7), bb) != 0)
412 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
413 bb.position(bb.position() + metaData.getColumnWidth(7));
416 ndbjtieTranscode(bb, str);
417 if (op.setValue(metaData.getAttr(8), bb) != 0)
418 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
419 bb.position(bb.position() + metaData.getColumnWidth(8));
// columns 9 through 14 are set to NULL
422 if (op.setValue(metaData.getAttr(9), null) != 0)
423 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
425 if (op.setValue(metaData.getAttr(10), null) != 0)
426 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
428 if (op.setValue(metaData.getAttr(11), null) != 0)
429 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
431 if (op.setValue(metaData.getAttr(12), null) != 0)
432 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
434 if (op.setValue(metaData.getAttr(13), null) != 0)
435 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
437 if (op.setValue(metaData.getAttr(14), null) != 0)
438 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// an encoding failure is rethrown unchecked, with the cause preserved
439 } catch (CharacterCodingException e) {
440 throw new RuntimeException(e);
444 // ----------------------------------------------------------------------
/**
 * Runs the lookup workload over {@code driver.nRows} rows in the given mode.
 *
 * SINGLE uses one transaction per row; the other modes define all lookups
 * in a single transaction, with BULK executing (without committing) while
 * iterating. After the commit a second loop over all rows follows,
 * presumably to read the fetched values back from the buffer.
 *
 * NOTE(review): the per-row lookup/read calls, the timing/reporting calls
 * that presumably use {@code name}, and the block braces are not visible in
 * this listing.
 *
 * @param mode execution mode (SINGLE, BULK or BATCH)
 */
446 protected void runNdbjtieLookup(TwsDriver.XMode mode) {
// label for this run, e.g. "lookup_single"
447 final String name = "lookup_" + mode.toString().toLowerCase();
450 if (mode == TwsDriver.XMode.SINGLE) {
451 for(int i = 0; i < driver.nRows; i++) {
452 ndbjtieBeginTransaction();
454 ndbjtieCommitTransaction();
456 ndbjtieCloseTransaction();
// BULK/BATCH: one transaction spanning all rows
459 ndbjtieBeginTransaction();
460 for(int i = 0; i < driver.nRows; i++) {
463 if (mode == TwsDriver.XMode.BULK)
464 ndbjtieExecuteTransaction();
466 ndbjtieCommitTransaction();
// second pass over the rows after the commit (loop body not visible)
467 for(int i = 0; i < driver.nRows; i++) {
470 ndbjtieCloseTransaction();
/**
 * Defines a primary-key read on {@code table_t0} within the current
 * transaction, using the lock mode in {@code ndbOpLockMode}.
 *
 * The key is encoded into {@code bb}; then for every remaining column a
 * slot of {@code bb} is registered with getValue() as the destination for
 * the fetched value. The buffer position is advanced column by column via
 * the running offset {@code p}.
 *
 * NOTE(review): the {@code op == null} check guarding the first throw and
 * the opening {@code try} are not visible in this listing.
 *
 * @param c0 key value of the row to read
 * @throws RuntimeException on any NDB error or if transcoding fails
 */
476 protected void ndbjtieLookup(int c0) {
477 // get a lookup operation for the table
478 NdbOperation op = tx.getNdbOperation(table_t0);
480 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
481 if (op.readTuple(ndbOpLockMode) != 0)
482 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// running absolute offset into bb for this row's column slots
484 int p = bb.position();
486 // include exception handling as part of transcoding pattern
487 final CharBuffer str = CharBuffer.wrap(Integer.toString(c0));
489 // set values; key attribute needs to be set first
491 ndbjtieTranscode(bb, str);
492 if (op.equal(metaData.getAttr(0), bb) != 0) // key
493 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
494 bb.position(p += metaData.getColumnWidth(0));
495 } catch (CharacterCodingException e) {
496 throw new RuntimeException(e);
499 // get attributes (not readable until after commit)
// starts at 1 because column 0 is the key handled above
500 for(int i = 1; i < metaData.getColumnCount(); i++) {
501 if (op.getValue(metaData.getAttr(i), bb) == null)
502 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
503 bb.position(p += metaData.getColumnWidth(i));
/**
 * Walks one row's column slots in {@code bb}, decoding each value and
 * discarding the result (values are not verified).
 *
 * Column 0 is skipped, column 1 is decoded as text, columns 2-4 are read as
 * ints, and columns 5 onwards are decoded as text. The buffer position is
 * advanced by each column's width via the running offset {@code p}.
 *
 * NOTE(review): {@code c0} is not used by any statement visible in this
 * listing, and the opening {@code try} is not visible either.
 *
 * @param c0 key value of the row being read
 * @throws RuntimeException if decoding a text column fails
 */
507 protected void ndbjtieRead(int c0) {
508 // include exception handling as part of transcoding pattern
510 //final CharBuffer str = CharBuffer.wrap(Integer.toString(i));
511 //assert (str.position() == 0);
// skip the key column's slot
514 int p = bb.position();
515 bb.position(p += metaData.getColumnWidth(0));
517 // not verifying at this time
518 // (str.equals(ndbjtieTranscode(bb_c1)));
519 // (i == bb_c2.asIntBuffer().get());
520 //CharBuffer y = ndbjtieTranscode(bb);
522 ndbjtieTranscode(bb);
523 bb.position(p += metaData.getColumnWidth(1));
// asIntBuffer() creates a view, so get() does not move bb's own position;
// the explicit position() calls do the advancing
525 bb.asIntBuffer().get();
526 bb.position(p += metaData.getColumnWidth(2));
527 bb.asIntBuffer().get();
528 bb.position(p += metaData.getColumnWidth(3));
529 bb.asIntBuffer().get();
530 bb.position(p += metaData.getColumnWidth(4));
// remaining columns are decoded as length-prefixed text
533 for(int i = 5; i < metaData.getColumnCount(); i++) {
534 ndbjtieTranscode(bb);
535 bb.position(p += metaData.getColumnWidth(i));
537 } catch (CharacterCodingException e) {
538 throw new RuntimeException(e);
542 // ----------------------------------------------------------------------
/**
 * Runs the update workload over {@code driver.nRows} rows in the given mode.
 *
 * SINGLE uses one begin/commit/close transaction cycle per row; the other
 * modes use a single transaction for all rows, and BULK additionally
 * executes (without committing) while iterating.
 *
 * NOTE(review): the per-row update calls, the timing/reporting calls that
 * presumably use {@code name}, and the block braces are not visible in this
 * listing.
 *
 * @param mode execution mode (SINGLE, BULK or BATCH)
 */
544 protected void runNdbjtieUpdate(TwsDriver.XMode mode) {
// label for this run, e.g. "update_single"
545 final String name = "update_" + mode.toString().toLowerCase();
548 if (mode == TwsDriver.XMode.SINGLE) {
549 for(int i = 0; i < driver.nRows; i++) {
550 ndbjtieBeginTransaction();
552 ndbjtieCommitTransaction();
553 ndbjtieCloseTransaction();
// BULK/BATCH: one transaction spanning all rows
556 ndbjtieBeginTransaction();
557 for(int i = 0; i < driver.nRows; i++) {
560 if (mode == TwsDriver.XMode.BULK)
561 ndbjtieExecuteTransaction();
563 ndbjtieCommitTransaction();
564 ndbjtieCloseTransaction();
/**
 * Defines an update operation on {@code table_t0} within the current
 * transaction.
 *
 * The row is identified by the text form of {@code c0}; column 1 and
 * columns 5 onwards are set to the text form of {@code r}, columns 2 and 3
 * to {@code r} as an int. Column 4 is not touched by the visible code.
 *
 * NOTE(review): {@code r} is used without a visible declaration (it is
 * presumably derived from {@code c0} on a missing line); the
 * {@code op == null} check guarding the first throw and the opening
 * {@code try} are likewise not visible in this listing.
 *
 * @param c0 key value of the row to update
 * @throws RuntimeException on any NDB error or if transcoding fails
 */
570 protected void ndbjtieUpdate(int c0) {
// key as text
571 final CharBuffer str0 = CharBuffer.wrap(Integer.toString(c0));
// new value as text
573 final CharBuffer str1 = CharBuffer.wrap(Integer.toString(r));
575 // get an update operation for the table
576 NdbOperation op = tx.getNdbOperation(table_t0);
578 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
579 if (op.updateTuple() != 0)
580 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
582 // include exception handling as part of transcoding pattern
584 // set values; key attribute needs to be set first
586 ndbjtieTranscode(bb, str0);
587 if (op.equal(metaData.getAttr(0), bb) != 0) // key
588 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
589 bb.position(bb.position() + metaData.getColumnWidth(0));
592 ndbjtieTranscode(bb, str1);
593 if (op.setValue(metaData.getAttr(1), bb) != 0)
594 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
595 bb.position(bb.position() + metaData.getColumnWidth(1));
597 if (op.setValue(metaData.getAttr(2), r) != 0)
598 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
600 if (op.setValue(metaData.getAttr(3), r) != 0)
601 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// NOTE(review): str1 is encoded once per column here; unless a missing line
// rewinds it, encoders consume the CharBuffer, so later columns would
// receive an empty string -- confirm against the full file.
603 for(int i = 5; i < metaData.getColumnCount(); i++) {
605 ndbjtieTranscode(bb, str1);
606 if (op.setValue(metaData.getAttr(i), bb) != 0)
607 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
608 bb.position(bb.position() + metaData.getColumnWidth(i));
611 } catch (CharacterCodingException e) {
612 throw new RuntimeException(e);
616 // ----------------------------------------------------------------------
/**
 * Runs the delete workload over {@code driver.nRows} rows in the given mode.
 *
 * SINGLE uses one begin/commit/close transaction cycle per row; the other
 * modes use a single transaction for all rows, and BULK additionally
 * executes (without committing) while iterating.
 *
 * NOTE(review): the per-row delete calls, the timing/reporting calls that
 * presumably use {@code name}, and the block braces are not visible in this
 * listing.
 *
 * @param mode execution mode (SINGLE, BULK or BATCH)
 */
618 protected void runNdbjtieDelete(TwsDriver.XMode mode) {
// label for this run, e.g. "delete_single"
619 final String name = "delete_" + mode.toString().toLowerCase();
622 if (mode == TwsDriver.XMode.SINGLE) {
623 for(int i = 0; i < driver.nRows; i++) {
624 ndbjtieBeginTransaction();
626 ndbjtieCommitTransaction();
627 ndbjtieCloseTransaction();
// BULK/BATCH: one transaction spanning all rows
630 ndbjtieBeginTransaction();
631 for(int i = 0; i < driver.nRows; i++) {
634 if (mode == TwsDriver.XMode.BULK)
635 ndbjtieExecuteTransaction();
637 ndbjtieCommitTransaction();
638 ndbjtieCloseTransaction();
/**
 * Defines a delete operation on {@code table_t0} within the current
 * transaction, identifying the row by the text form of {@code c0}.
 *
 * NOTE(review): the {@code op == null} check guarding the first throw and
 * the opening {@code try} are not visible in this listing.
 *
 * @param c0 key value of the row to delete
 * @throws RuntimeException on any NDB error or if transcoding fails
 */
644 protected void ndbjtieDelete(int c0) {
645 // get a delete operation for the table
646 NdbOperation op = tx.getNdbOperation(table_t0);
648 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
649 if (op.deleteTuple() != 0)
650 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// running absolute offset into bb
652 int p = bb.position();
654 // include exception handling as part of transcoding pattern
656 final CharBuffer str = CharBuffer.wrap(Integer.toString(c0));
658 // set values; key attribute needs to be set first
660 ndbjtieTranscode(bb, str);
661 if (op.equal(metaData.getAttr(0), bb) != 0) // key
662 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
// step over the key slot so the next operation uses fresh buffer space
663 bb.position(p += metaData.getColumnWidth(0));
664 } catch (CharacterCodingException e) {
665 throw new RuntimeException(e);
669 // ----------------------------------------------------------------------
/**
 * Starts a new NDB transaction and stores it in {@code tx}.
 *
 * The transaction is started with a null table, null key data and key
 * length 0.
 *
 * NOTE(review): the statement that prepares {@code bb} for writing (after
 * the first comment) is not visible in this listing.
 *
 * @throws RuntimeException carrying the Ndb object's error if no
 *         transaction could be started
 */
671 protected void ndbjtieBeginTransaction() {
674 // prepare buffer for writing
677 // start a transaction
678 // must be closed with NdbTransaction.close
679 final TableConst table = null;
680 final ByteBuffer keyData = null;
681 final int keyLen = 0;
682 if ((tx = ndb.startTransaction(table, keyData, keyLen)) == null)
683 throw new RuntimeException(TwsUtils.toStr(ndb.getNdbError()));
/**
 * Executes the operations defined so far in {@code tx} without committing
 * (ExecType.NoCommit, AbortOption.AbortOnError).
 *
 * NOTE(review): the declaration of {@code force} is not visible in this
 * listing.
 *
 * @throws RuntimeException if execute() returns non-zero or the
 *         transaction's error status is not Success
 */
686 protected void ndbjtieExecuteTransaction() {
689 // execute but don't commit the current transaction
690 final int execType = NdbTransaction.ExecType.NoCommit;
691 final int abortOption = NdbOperation.AbortOption.AbortOnError;
// the error status is checked in addition to the return code
693 if (tx.execute(execType, abortOption, force) != 0
694 || tx.getNdbError().status() != NdbError.Status.Success)
695 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
/**
 * Executes and commits the current transaction {@code tx}
 * (ExecType.Commit, AbortOption.AbortOnError).
 *
 * NOTE(review): the declaration of {@code force} and the statement that
 * prepares {@code bb} for reading (after the last comment) are not visible
 * in this listing.
 *
 * @throws RuntimeException if execute() returns non-zero or the
 *         transaction's error status is not Success
 */
698 protected void ndbjtieCommitTransaction() {
701 // commit the current transaction
702 final int execType = NdbTransaction.ExecType.Commit;
703 final int abortOption = NdbOperation.AbortOption.AbortOnError;
// the error status is checked in addition to the return code
705 if (tx.execute(execType, abortOption, force) != 0
706 || tx.getNdbError().status() != NdbError.Status.Success)
707 throw new RuntimeException(TwsUtils.toStr(tx.getNdbError()));
709 // prepare buffer for reading
/**
 * Closes the current transaction {@code tx} on the Ndb object.
 *
 * NOTE(review): any follow-up such as resetting {@code tx} is not visible
 * in this listing.
 */
713 protected void ndbjtieCloseTransaction() {
716 // close the current transaction (required after commit, rollback)
717 ndb.closeTransaction(tx);
721 // ----------------------------------------------------------------------
/**
 * Decodes a length-prefixed string from {@code from} using the shared
 * {@code csDecoder}.
 *
 * Reads a 1-byte length at the buffer's current position, narrows the limit
 * to exactly that many bytes, decodes them, then restores the limit to the
 * buffer's capacity so the caller can reposition freely.
 *
 * NOTE(review): the final {@code return} statement is not visible in this
 * listing -- presumably the decoded buffer {@code to} is returned; confirm.
 *
 * @param from buffer positioned at the length prefix of the value
 * @throws CharacterCodingException if the bytes cannot be decoded cleanly
 */
723 protected CharBuffer ndbjtieTranscode(ByteBuffer from)
724 throws CharacterCodingException {
// NOTE(review): p is not used by any statement visible in this listing
726 final int p = from.position();
728 // read 1-byte length prefix
// NOTE(review): ByteBuffer.get() returns a signed byte, so a stored length
// of 128..255 arrives here as a negative int; the assertion below then
// fails (with -ea) or limit() is moved backwards. Masking with 0xFF would
// be needed if such lengths can occur -- confirm the maximum column length.
729 final int l = from.get();
730 assert ((0 <= l) && (l < 256)); // or (l <= 256)?
732 // prepare buffer for reading
733 from.limit(from.position() + l);
// decode() consumes everything up to the limit set above
736 final CharBuffer to = csDecoder.decode(from);
737 assert (!from.hasRemaining());
739 // allow for repositioning
740 from.limit(from.capacity());
742 assert (to.position() == 0);
743 assert (to.limit() == to.capacity());
747 protected void ndbjtieTranscode(ByteBuffer to, CharBuffer from)
748 throws CharacterCodingException {
750 final int p = to.position();
752 // advance 1-byte length prefix
757 final boolean endOfInput = true;
758 final CoderResult cr = csEncoder.encode(from, to, endOfInput);
759 if (!cr.isUnderflow())
761 assert (!from.hasRemaining());
763 // write 1-byte length prefix
764 final int l = (to.position() - p) - 1;
765 assert (0 <= l && l < 256); // or (l <= 256)?