]> review.fuel-infra Code Review - packages/trusty/mysql-wsrep-5.6.git/blob
c340e205e5210b7a318865599a5b77ab643bf459
[packages/trusty/mysql-wsrep-5.6.git] /
1 /*
2  *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
3  *
4  *  This program is free software; you can redistribute it and/or modify
5  *  it under the terms of the GNU General Public License as published by
6  *  the Free Software Foundation; version 2 of the License.
7  *
8  *  This program is distributed in the hope that it will be useful,
9  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  *  GNU General Public License for more details.
12  *
13  *  You should have received a copy of the GNU General Public License
14  *  along with this program; if not, write to the Free Software
15  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
16  */
17
18 package com.mysql.clusterj.jdbc;
19
20 import com.mysql.clusterj.ClusterJHelper;
21 import com.mysql.clusterj.ClusterJUserException;
22 import com.mysql.clusterj.SessionFactory;
23 import com.mysql.clusterj.core.query.QueryDomainTypeImpl;
24 import com.mysql.clusterj.core.spi.SessionSPI;
25 import com.mysql.clusterj.core.store.Dictionary;
26 import com.mysql.clusterj.core.util.I18NHelper;
27 import com.mysql.clusterj.core.util.Logger;
28 import com.mysql.clusterj.core.util.LoggerFactoryService;
29 import com.mysql.jdbc.Connection;
30 import com.mysql.jdbc.ResultSetInternalMethods;
31 import com.mysql.jdbc.Statement;
32 import com.mysql.clusterj.jdbc.antlr.ANTLRNoCaseStringStream;
33 import com.mysql.clusterj.jdbc.antlr.MySQL51Parser;
34 import com.mysql.clusterj.jdbc.antlr.MySQL51Lexer;
35 import com.mysql.clusterj.jdbc.antlr.QueuingErrorListener;
36 import com.mysql.clusterj.jdbc.antlr.node.Node;
37 import com.mysql.clusterj.jdbc.antlr.node.PlaceholderNode;
38 import com.mysql.clusterj.jdbc.antlr.node.SelectNode;
39 import com.mysql.clusterj.jdbc.antlr.node.WhereNode;
40 import com.mysql.clusterj.query.Predicate;
41
42 import com.mysql.clusterj.jdbc.SQLExecutor.Executor;
43 import java.sql.SQLException;
44 import java.sql.Savepoint;
45 import java.util.ArrayList;
46 import java.util.IdentityHashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Properties;
50
51 import org.antlr.runtime.CommonTokenStream;
52 import org.antlr.runtime.RecognitionException;
53 import org.antlr.runtime.Token;
54 import org.antlr.runtime.TokenStream;
55 import org.antlr.runtime.tree.CommonErrorNode;
56 import org.antlr.runtime.tree.CommonTree;
57 import org.antlr.runtime.tree.CommonTreeAdaptor;
58 import org.antlr.runtime.tree.TreeAdaptor;
59
60 /** This class implements the behavior associated with connection callbacks for statement execution
61  * and connection lifecycle. There is a clusterj session associated with the interceptor that
62  * is used to interact with the cluster. There is exactly one statement interceptor and one
63  * connection lifecycle interceptor associated with the interceptor.
64  * All of the SQL post-parsing behavior is contained here, and uses classes in the org.antlr.runtime and
65  * com.mysql.clusterj.jdbc.antlr packages to perform the parsing of the SQL statement. Analysis
66  * of the parsed SQL statement occurs here, and clusterj artifacts are constructed for use in
67  * other classes, in particular SQLExecutor and its command-specific subclasses.
68  */
69 public class InterceptorImpl {
70
71     /** Register logger for JDBC stuff. */
72     static {
73         LoggerFactoryService.getFactory().registerLogger("com.mysql.clusterj.jdbc");
74     }
75
76     /** My message translator */
77     static final I18NHelper local = I18NHelper.getInstance(InterceptorImpl.class);
78
79     /** My logger */
80     static final Logger logger = LoggerFactoryService.getFactory().getInstance(InterceptorImpl.class);
81
82     static Map<String, Executor> parsedSqlMap = new IdentityHashMap<String, Executor>();
83
84     /** The map of connection to interceptor */
85     private static Map<Connection, InterceptorImpl> interceptorImplMap =
86             new IdentityHashMap<Connection, InterceptorImpl>();
87
88     /** The connection properties */
89     private Properties properties;
90
91     /** The connection being intercepted */
92     private Connection connection;
93
94     /** The session factory for this connection */
95     SessionFactory sessionFactory;
96
97     /** The current session (null if no session) */
98     private SessionSPI session;
99
100     /** The statement interceptor (only used during initialization) */
101     private StatementInterceptor statementInterceptor;
102
103     /** The connection lifecycle interceptor (only used during initialization) */
104     private ConnectionLifecycleInterceptor connectionLifecycleInterceptor;
105
106     /** The interceptor is ready (both interceptors are registered) */
107     private boolean ready = false;
108
109     private boolean autocommit;
110
111     private static String LOTSOBLANKS = "                                                                          "; 
112
113     /** Create the interceptor.
114      * 
115      * @param connection the connection being intercepted
116      * @param properties the connection properties
117      */
118     public InterceptorImpl(Connection connection, Properties properties) {
119         if (logger.isDebugEnabled()) logger.debug("constructed with properties: " + properties);
120         this.properties = properties;
121         this.connection = connection;
122         // if database name is not specified, translate DBNAME to the required clusterj property
123         String dbname = properties.getProperty("com.mysql.clusterj.database",
124                 properties.getProperty("DBNAME"));
125         properties.put("com.mysql.clusterj.database", dbname);
126     }
127
128     /** Return the interceptor for the connection lifecycle callbacks.
129      * 
130      * @param connectionLifecycleInterceptor the connection lifecycle interceptor
131      * @param connection the connection
132      * @param properties the connection properties
133      * @return the interceptor delegate
134      */
135     public static InterceptorImpl getInterceptorImpl(
136             ConnectionLifecycleInterceptor connectionLifecycleInterceptor,
137             Connection connection, Properties properties) {
138         InterceptorImpl result = getInterceptorImpl(connection, properties);
139         if (result.connectionLifecycleInterceptor != null) {
140             if (result.connectionLifecycleInterceptor != connectionLifecycleInterceptor) {
141                 throw new ClusterJUserException(
142                         local.message("ERR_Duplicate_Connection_Lifecycle_Interceptor"));
143             }
144         } else {
145             result.connectionLifecycleInterceptor = connectionLifecycleInterceptor;
146         }
147         if (result.statementInterceptor != null) {
148             result.ready = true;
149         }
150         return result;
151     }
152
153     /** Return the interceptor for the statement interceptor callbacks.
154      * 
155      * @param statementInterceptor the statement interceptor
156      * @param connection the connection
157      * @param properties the connection properties
158      * @return the interceptor delegate
159      */
160     public static InterceptorImpl getInterceptorImpl(
161             StatementInterceptor statementInterceptor, Connection connection,
162             Properties properties) {
163         InterceptorImpl result = getInterceptorImpl(connection, properties);
164         if (result.statementInterceptor != null) {
165             throw new ClusterJUserException(
166                     local.message("ERR_Duplicate_Statement_Interceptor"));
167         }
168         result.statementInterceptor = statementInterceptor;
169         if (result.connectionLifecycleInterceptor != null) {
170             result.ready = true;
171         }
172         return result;
173     }
174
175     /** Create the interceptor to handle both connection lifecycle and statement interceptors.
176      * 
177      * @param connection the connection
178      * @param properties the connection properties
179      * @return
180      */
181     public static InterceptorImpl getInterceptorImpl(Connection connection, Properties properties) {
182         InterceptorImpl result;
183         synchronized(interceptorImplMap) {
184             result = interceptorImplMap.get(connection);
185             if (result == null) {
186                 result = new InterceptorImpl(connection, properties);
187                 interceptorImplMap.put(connection, result);
188             }
189         }
190         return result;
191     }
192
193     /** Return the interceptor assigned to the connection. If there is no interceptor, return null.
194      * 
195      * @param connection the connection
196      * @return the interceptor for this connection or null if there is no interceptor
197      */
198     public static InterceptorImpl getInterceptorImpl(java.sql.Connection connection) {
199         synchronized (interceptorImplMap) {
200             return interceptorImplMap.get(connection);
201         }
202     }
203
204     @Override
205     public String toString() {
206         return "InterceptorImpl "
207 //        + " properties: "+ properties.toString()
208         ;
209     }
210
211     void destroy() {
212         if (sessionFactory != null) {
213             if (session != null) {
214                 session.close();
215             }
216             sessionFactory.close();
217             sessionFactory = null;
218             synchronized(interceptorImplMap) {
219                 interceptorImplMap.remove(connection);
220             }
221         }
222     }
223
224     public SessionSPI getSession() {
225         if (session == null) {
226             session = (SessionSPI)sessionFactory.getSession();
227         }
228         return session;
229     }
230
231     public boolean executeTopLevelOnly() {
232 //        assertReady();
233         boolean result = true;
234         return result;
235     }
236
237     public ResultSetInternalMethods postProcess(String sql, Statement statement,
238             ResultSetInternalMethods result, Connection connection, int arg4,
239             boolean arg5, boolean arg6, SQLException sqlException) throws SQLException {
240         assertReady();
241         return null;
242     }
243
244     public ResultSetInternalMethods preProcess(String sql, Statement statement,
245             Connection connection) throws SQLException {
246         assertReady();
247         if (statement instanceof com.mysql.jdbc.PreparedStatement) {
248             com.mysql.jdbc.PreparedStatement preparedStatement =
249                 (com.mysql.jdbc.PreparedStatement)statement;
250             // key must be interned because we are using IdentityHashMap
251             String preparedSql = preparedStatement.getPreparedSql().intern();
252             // see if we have a parsed version of this query
253             Executor sQLExecutor = null;
254             synchronized(parsedSqlMap) {
255                 sQLExecutor = parsedSqlMap.get(preparedSql);
256             }
257             // if no cached SQLExecutor, create it, which might take some time
258             if (sQLExecutor == null) {
259                 sQLExecutor = createSQLExecutor(preparedSql);
260                 if (sQLExecutor != null) {
261                     // multiple thread might have created a SQLExecutor but it's ok
262                     synchronized(parsedSqlMap) {
263                         parsedSqlMap.put(preparedSql, sQLExecutor);
264                     }
265                 }
266             }
267             return sQLExecutor.execute(this, preparedStatement.getParameterBindings());
268         }
269         return null;
270     }
271
272     /**
273      * @param preparedSql
274      */
275     private Executor createSQLExecutor(String preparedSql) {
276         if (logger.isDetailEnabled()) logger.detail(preparedSql);
277         Executor result = null;
278         // parse the sql
279         CommonTree root = parse(preparedSql);
280         // get the root of the tree
281         int tokenType = root.getType();
282         // perform command-specific actions
283         String tableName = "";
284         CommonTree tableNode;
285         WhereNode whereNode;
286         List<String> columnNames = new ArrayList<String>();
287         Dictionary dictionary;
288         DomainTypeHandlerImpl<?> domainTypeHandler;
289         QueryDomainTypeImpl<?> queryDomainType = null;
290         switch (tokenType) {
291             case MySQL51Parser.INSERT:
292                 tableNode = (CommonTree)root.getFirstChildWithType(MySQL51Parser.TABLE);
293                 tableName = getTableName(tableNode);
294                 getSession();
295                 dictionary = session.getDictionary();
296                 domainTypeHandler = getDomainTypeHandler(tableName, dictionary);
297                 CommonTree insertValuesNode = (CommonTree)root.getFirstChildWithType(MySQL51Parser.INSERT_VALUES);
298                 CommonTree columnsNode = (CommonTree)insertValuesNode.getFirstChildWithType(MySQL51Parser.COLUMNS);
299                 List<CommonTree> fields = columnsNode.getChildren();
300                 for (CommonTree field: fields) {
301                     columnNames.add(getColumnName(field));
302                 }
303                 if (logger.isDetailEnabled()) logger.detail(
304                         "StatementInterceptorImpl.preProcess parse result INSERT INTO " + tableName
305                         + " COLUMNS " + columnNames);
306                 result = new SQLExecutor.Insert(domainTypeHandler, columnNames);
307                 break;
308             case MySQL51Parser.SELECT:
309                 CommonTree fromNode = (CommonTree)root.getFirstChildWithType(MySQL51Parser.FROM);
310                 if (fromNode == null) {
311                     // no from clause; cannot handle this case so return a do-nothing ParsedSQL
312                     result = new SQLExecutor.Noop();
313                     break;
314                 }
315                 try {
316                     // this currently handles only FROM clauses with a single table
317                     tableNode = (CommonTree) fromNode.getFirstChildWithType(MySQL51Parser.TABLE);
318                     tableName = getTableName(tableNode);
319                 } catch (Exception e) {
320                     // trouble with the FROM clause; log the SQL statement and the parser output
321                     logger.info("Problem with FROM clause in SQL statement: " + preparedSql);
322                     logger.info(walk(root));
323                     result = new SQLExecutor.Noop();
324                     break;
325                 }
326                 getSession();
327                 dictionary = session.getDictionary();
328                 domainTypeHandler = getDomainTypeHandler(tableName, dictionary);
329                 columnsNode = (CommonTree)root.getFirstChildWithType(MySQL51Parser.COLUMNS);
330                 List<CommonTree> selectExprNodes = columnsNode.getChildren();
331                 for (CommonTree selectExprNode: selectExprNodes) {
332                     columnNames.add(getColumnName(getFieldNode(selectExprNode)));
333                 }
334                 String whereType = "empty";
335                 if (logger.isDetailEnabled()) logger.detail(
336                         "SELECT FROM " + tableName
337                         + " COLUMNS " + columnNames);
338                 // we need to distinguish three cases:
339                 // - no where clause (select all rows)
340                 // - where clause that cannot be executed by clusterj
341                 // - where clause that can be executed by clusterj
342                 whereNode = ((SelectNode)root).getWhereNode();
343                 queryDomainType = (QueryDomainTypeImpl<?>) session.createQueryDomainType(domainTypeHandler);
344                 if (whereNode == null) {
345                     // no where clause (select all rows)
346                     result = new SQLExecutor.Select(domainTypeHandler, columnNames, queryDomainType);
347                 } else {
348                     // create a predicate from the tree
349                     Predicate predicate = whereNode.getPredicate(queryDomainType);
350                     if (predicate != null) {
351                         // where clause that can be executed by clusterj
352                         queryDomainType.where(predicate);
353                         result = new SQLExecutor.Select(domainTypeHandler, columnNames, queryDomainType);
354                         whereType = "clusterj";
355                     } else {
356                         // where clause that cannot be executed by clusterj
357                         result = new SQLExecutor.Noop();
358                         whereType = "non-clusterj";
359                     }
360                     if (logger.isDetailEnabled()) logger.detail(walk(root));
361                 }
362                 if (logger.isDetailEnabled()) {
363                     logger.detail(
364                         "SELECT FROM " + tableName
365                         + " COLUMNS " + columnNames + " whereType " + whereType);
366                     logger.detail(walk(root));
367                 }
368                 break;
369             case MySQL51Parser.DELETE:
370                 tableNode = (CommonTree)root.getFirstChildWithType(MySQL51Parser.TABLE);
371                 tableName = getTableName(tableNode);
372                 getSession();
373                 dictionary = session.getDictionary();
374                 domainTypeHandler = getDomainTypeHandler(tableName, dictionary);
375                 whereNode = ((WhereNode)root.getFirstChildWithType(MySQL51Parser.WHERE));
376                 int numberOfParameters = 0;
377                 if (whereNode == null) {
378                     // no where clause (delete all rows)
379                     result = new SQLExecutor.Delete(domainTypeHandler);
380                     whereType = "empty";
381                 } else {
382                     // create a predicate from the tree
383                     queryDomainType = (QueryDomainTypeImpl<?>) session.createQueryDomainType(domainTypeHandler);
384                     Predicate predicate = whereNode.getPredicate(queryDomainType);
385                     if (predicate != null) {
386                         // where clause that can be executed by clusterj
387                         queryDomainType.where(predicate);
388                         numberOfParameters = whereNode.getNumberOfParameters();
389                         result = new SQLExecutor.Delete(domainTypeHandler, queryDomainType, numberOfParameters);
390                         whereType = "clusterj";
391                     } else {
392                         // where clause that cannot be executed by clusterj
393                         result = new SQLExecutor.Noop();
394                         whereType = "non-clusterj";
395                     }
396                     if (logger.isDetailEnabled()) logger.detail(walk(root));
397                 }
398                 if (logger.isDetailEnabled()) logger.detail(
399                         "DELETE FROM " + tableName
400                         + " whereType " + whereType
401                         + " number of parameters " + numberOfParameters);
402                 break;
403             default:
404                 // return a do-nothing ParsedSQL
405                 if (logger.isDetailEnabled()) logger.detail("ClusterJ cannot process this SQL statement: unsupported statement type.");
406                 result = new SQLExecutor.Noop();
407         }
408         return result;
409     }
410
411     private String getPrimaryKeyFieldName(CommonTree whereNode) {
412         String result = null;
413         CommonTree operation = (CommonTree) whereNode.getChild(0);
414         if (MySQL51Parser.EQUALS == operation.getType()) {
415             result = operation.getChild(0).getChild(0).getText();
416         } else {
417             throw new ClusterJUserException("Cannot find primary key in WHERE clause.");
418         }
419         return result;
420     }
421
422     private String walk(CommonTree tree) {
423         StringBuilder buffer = new StringBuilder();
424         walk(tree, buffer, 0);
425         return buffer.toString();
426     }
427
428     @SuppressWarnings("unchecked") // tree.getChildren()
429     private void walk(CommonTree tree, StringBuilder buffer, int level) {
430             String indent = LOTSOBLANKS.substring(0, level);
431             Token token = tree.token;
432             int tokenType = token.getType();
433             String tokenText = token.getText();
434             int childCount = tree.getChildCount();
435             int childIndex = tree.getChildIndex();
436             buffer.append('\n');
437             buffer.append(indent);
438             buffer.append(tokenText);
439             buffer.append(" class: ");
440             buffer.append(tree.getClass().getName());
441             buffer.append(" tokenType ");
442             buffer.append(tokenType);
443             buffer.append(" child count ");
444             buffer.append(childCount);
445             buffer.append(" child index ");
446             buffer.append(childIndex);
447             List<CommonTree> children = tree.getChildren();
448             if (children == null) {
449                 return;
450             }
451             for (CommonTree child: children) {
452                 walk(child, buffer, level + 2);
453             }
454     }
455
456     private CommonTree parse(String preparedSql) {
457         CommonTree result = null;
458         ANTLRNoCaseStringStream inputStream = new ANTLRNoCaseStringStream(preparedSql);
459         MySQL51Lexer lexer = new MySQL51Lexer(inputStream);
460         CommonTokenStream tokens = new CommonTokenStream(lexer);
461         lexer.setErrorListener(new QueuingErrorListener(lexer));
462         tokens.getTokens();
463         if (lexer.getErrorListener().hasErrors()) {
464             logger.warn(local.message("ERR_Lexing_SQ",preparedSql));
465             return result;
466         }
467         PlaceholderNode.resetId();
468         MySQL51Parser parser = new MySQL51Parser(tokens);
469         parser.setTreeAdaptor(mySQLTreeAdaptor);
470         parser.setErrorListener(new QueuingErrorListener(parser));
471         try {
472             CommonTree stmtTree = (CommonTree) parser.statement().getTree();
473             result = stmtTree;
474         } catch (RecognitionException e) {
475             logger.warn(local.message("ERR_Parsing_SQL", preparedSql));
476         }
477         if (parser.getErrorListener().hasErrors()) {
478             logger.warn(local.message("ERR_Parsing_SQL", preparedSql));
479         }
480         return result;
481     }
482
483     private TreeAdaptor mySQLTreeAdaptor = new CommonTreeAdaptor() {
484         public Object create(Token token) { return new Node(token); }
485         public Object dupNode(Object t) {
486             if ( t==null ) return null;
487             return create(((Node)t).token);
488         }
489     };
490
491     private String getTableName(CommonTree tableNode) {
492         return tableNode.getChild(0).getText();
493     }
494
495     private String getColumnName(CommonTree fieldNode) {
496         return fieldNode.getChild(0).getText();
497     }
498
499     private CommonTree getFieldNode(CommonTree selectExprNode) {
500         return (CommonTree)selectExprNode.getChild(0);
501     }
502
503     public void destroy(StatementInterceptor statementInterceptor) {
504     }
505
506     public void destroy(
507             ConnectionLifecycleInterceptor connectionLifecycleInterceptor) {
508     }
509
510     private void assertReady() {
511         if (!ready) {
512             if (statementInterceptor == null) {
513                 throw new ClusterJUserException(local.message("ERR_No_Statement_Interceptor"));
514             }
515             if (connectionLifecycleInterceptor == null) {
516                 throw new ClusterJUserException(local.message("ERR_No_Connection_Lifecycle_Interceptor"));
517             }
518         } else {
519             if (sessionFactory == null) {
520                 sessionFactory = ClusterJHelper.getSessionFactory(properties);
521             }
522         }
523     }
524
525     /** TODO This needs to be rewritten with a proper state machine. */
526     public boolean setAutoCommit(boolean autocommit) throws SQLException {
527         assertReady();
528         logStatus("setAutoCommit(" + autocommit + ")");
529         this.autocommit = autocommit;
530         getSession();
531         if (!autocommit) {
532             // start a transaction
533             if (!session.currentTransaction().isActive()) {
534                 session.begin();
535             }
536         } else {
537             // roll back the previous transaction if active
538             if (session.currentTransaction().isActive()) {
539                 session.rollback();
540             }
541         }
542         return true; // let the driver perform its own autocommit behavior
543     }
544
545     public void close() {
546     }
547
548     public boolean commit() throws SQLException {
549         logStatus("commit");
550         if (session.currentTransaction().isActive()) {
551             session.commit();
552         } else {
553             System.out.println("WARNING: commit called when session.transaction is not active");
554         }
555         session.begin();
556         return true;
557     }
558
559     public boolean rollback() throws SQLException {
560         logStatus("rollback");
561         session.rollback();
562         session.begin();
563         return true;
564     }
565
566     public boolean rollback(Savepoint savepoint) throws SQLException {
567         logStatus("rollback(Savepoint)");
568         return true;
569     }
570
571     public boolean setCatalog(String catalog) throws SQLException {
572         if (logger.isDebugEnabled()) logger.debug("catalog: " + catalog);
573         return true;
574     }
575
576     public boolean transactionCompleted() throws SQLException {
577         logStatus("transactionCompleted");
578         return true;
579     }
580
581     public boolean transactionBegun() throws SQLException {
582         logStatus("transactionBegun");
583         return true;
584     }
585
586     private DomainTypeHandlerImpl<?> getDomainTypeHandler(String tableName, Dictionary dictionary) {
587         DomainTypeHandlerImpl<?> domainTypeHandler = 
588             DomainTypeHandlerImpl.getDomainTypeHandler(tableName, dictionary);
589         return domainTypeHandler;
590     }
591
592     private void logStatus(String s) throws SQLException {
593         if (logger.isDetailEnabled()) {
594             StringBuilder builder = new StringBuilder("In ");
595             builder.append(s);
596             builder.append(" with");
597             if (connection != null) {
598                 builder.append(" connection.getAutocommit: " + connection.getAutoCommit());
599             }
600             if (session != null) {
601                 builder.append(" session.isActive: " + session.currentTransaction().isActive());
602             }
603             builder.append('\n');
604             String message = builder.toString();
605             logger.detail(message);
606         }
607     }
608
609 }