1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识

  2. 查询操作

  3. 插入操作

  4. 彩蛋,彩蛋,彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》

  2. 《MyCAT 源码分析 —— 【单库单表】查询》

2. 主流程

  1. MyCATServer 接收 MySQLClient 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDBServer

  2. MyCATServer 接收 MongoDBServer 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQLClient

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。


Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

1、查询 MongoDB

// MongoSQLParser.javapublic MongoData query() throws MongoSQLException {   if (!(statement instanceof SQLSelectStatement)) {       //return null;       throw new IllegalArgumentException("not a query sql statement");   }   MongoData mongo = new MongoData();   DBCursor c = null;   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();   int icount = 0;   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();       BasicDBObject fields = new BasicDBObject();       // 显示(返回)的字段       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {           //System.out.println(item.toString());           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {               if (item.getExpr() instanceof SQLAggregateExpr) {                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)                       icount = 1;                       mongo.setField(getExprFieldName(expr), Types.BIGINT);                   }                   fields.put(getExprFieldName(expr), 1);               } else {                   fields.put(getFieldName(item), 1);               }           }       }       // 表名       SQLTableSource table = mysqlSelectQuery.getFrom();       DBCollection coll = this._db.getCollection(table.toString());       mongo.setTable(table.toString());       // WHERE       SQLExpr expr = mysqlSelectQuery.getWhere();       DBObject query = parserWhere(expr);       // GROUP BY       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();       BasicDBObject gbkey = new BasicDBObject();       if (groupby != null) {           for (SQLExpr gbexpr : groupby.getItems()) {               if (gbexpr instanceof SQLIdentifierExpr) {                   String name = ((SQLIdentifierExpr) gbexpr).getName();                   gbkey.put(name, Integer.valueOf(1));               }           }           icount = 2;       }       // SKIP / LIMIT       int limitoff = 0;       int limitnum = 0;       if (mysqlSelectQuery.getLimit() != null) {           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());       }       if (icount == 1) { // COUNT(*)           mongo.setCount(coll.count(query));       } else if (icount == 2) { // MapReduce           BasicDBObject initial = new BasicDBObject();           initial.put("num", 0);           String reduce = "function (obj, prev) { " + "  prev.num++}";           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));       } else {           if ((limitoff > 0) || (limitnum > 0)) {               c = coll.find(query, fields).skip(limitoff).limit(limitnum);           } else {               c = coll.find(query, fields);           }           // order by           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();           if (orderby != null) {               BasicDBObject order = new BasicDBObject();               for (int i = 0; i < orderby.getItems().size(); i++) {                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));               }               c.sort(order);               // System.out.println(order);           }       }       mongo.setCursor(c);   }   return mongo;}

2、查询条件

// MongoSQLParser.javaprivate void parserWhere(SQLExpr aexpr, BasicDBObject o) {   if (aexpr instanceof SQLBinaryOpExpr) {       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;       SQLExpr exprL = expr.getLeft();       if (!(exprL instanceof SQLBinaryOpExpr)) {           if (expr.getOperator().getName().equals("=")) {               o.put(exprL.toString(), getExpValue(expr.getRight()));           } else {               String op = "";               if (expr.getOperator().getName().equals("<")) {                   op = "$lt";               } else if (expr.getOperator().getName().equals("<=")) {                   op = "$lte";               } else if (expr.getOperator().getName().equals(">")) {                   op = "$gt";               } else if (expr.getOperator().getName().equals(">=")) {                   op = "$gte";               } else if (expr.getOperator().getName().equals("!=")) {                   op = "$ne";               } else if (expr.getOperator().getName().equals("<>")) {                   op = "$ne";               }               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));           }       } else {           if (expr.getOperator().getName().equals("AND")) {               parserWhere(exprL, o);               parserWhere(expr.getRight(), o);           } else if (expr.getOperator().getName().equals("OR")) {               orWhere(exprL, expr.getRight(), o);           } else {               throw new RuntimeException("Can't identify the operation of  of where");           }       }   }}private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {   BasicDBObject xo = new BasicDBObject();   BasicDBObject yo = new BasicDBObject();   parserWhere(exprL, xo);   parserWhere(exprR, yo);   ob.put("$or", new Object[]{xo, yo});}

3、解析 MongoDB 数据

// MongoResultSet.javapublic MongoResultSet(MongoData mongo, String schema) throws SQLException {   this._cursor = mongo.getCursor();   this._schema = schema;   this._table = mongo.getTable();   this.isSum = mongo.getCount() > 0;   this._sum = mongo.getCount();   this.isGroupBy = mongo.getType();   if (this.isGroupBy) {       dblist = mongo.getGrouyBys();       this.isSum = true;   }   if (this._cursor != null) {       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);       // 解析 fields       if (this._cursor.hasNext()) {           _cur = _cursor.next();           if (_cur != null) {               if (select.length == 0) {                   SetFields(_cur.keySet());               }               _row = 1;           }       }       // 设置 fields 类型       if (select.length == 0) {           select = new String[]{"_id"};           SetFieldType(true);       } else {           SetFieldType(false);       }   } else {       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};       SetFieldType(mongo.getFields());   }}
  • 当使用 SELECT* 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

4、返回数据给 MySQL Client

// JDBCConnection.javaprivate void ouputResultSet(ServerConnection sc, String sql)       throws SQLException {   ResultSet rs = null;   Statement stmt = null;   try {       stmt = con.createStatement();       rs = stmt.executeQuery(sql);       // header       List<FieldPacket> fieldPks = new LinkedList<>();       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);       int colunmCount = fieldPks.size();       ByteBuffer byteBuf = sc.allocate();       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();       headerPkg.fieldCount = fieldPks.size();       headerPkg.packetId = ++packetId;       byteBuf = headerPkg.write(byteBuf, sc, true);       byteBuf.flip();       byte[] header = new byte[byteBuf.limit()];       byteBuf.get(header);       byteBuf.clear();       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());       for (FieldPacket curField : fieldPks) {           curField.packetId = ++packetId;           byteBuf = curField.write(byteBuf, sc, false);           byteBuf.flip();           byte[] field = new byte[byteBuf.limit()];           byteBuf.get(field);           byteBuf.clear();           fields.add(field);       }       // header eof       EOFPacket eofPckg = new EOFPacket();       eofPckg.packetId = ++packetId;       byteBuf = eofPckg.write(byteBuf, sc, false);       byteBuf.flip();       byte[] eof = new byte[byteBuf.limit()];       byteBuf.get(eof);       byteBuf.clear();       this.respHandler.fieldEofResponse(header, fields, eof, this);       // row       while (rs.next()) {           RowDataPacket curRow = new RowDataPacket(colunmCount);           for (int i = 0; i < colunmCount; i++) {               int j = i + 1;               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {                   curRow.add(rs.getBytes(j));               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte                   // ensure that do not use scientific notation format                   BigDecimal val = rs.getBigDecimal(j);                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));               } else {                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));               }           }           curRow.packetId = ++packetId;           byteBuf = curRow.write(byteBuf, sc, false);           byteBuf.flip();           byte[] row = new byte[byteBuf.limit()];           byteBuf.get(row);           byteBuf.clear();           this.respHandler.rowResponse(row, this);       }       fieldPks.clear();       // row eof       eofPckg = new EOFPacket();       eofPckg.packetId = ++packetId;       byteBuf = eofPckg.write(byteBuf, sc, false);       byteBuf.flip();       eof = new byte[byteBuf.limit()];       byteBuf.get(eof);       sc.recycle(byteBuf);       this.respHandler.rowEofResponse(eof, this);   } finally {       if (rs != null) {           try {               rs.close();           } catch (SQLException e) {           }       }       if (stmt != null) {           try {               stmt.close();           } catch (SQLException e) {           }       }   }}// MongoResultSet.java@Overridepublic String getString(String columnLabel) throws SQLException {   Object x = getObject(columnLabel);   if (x == null) {       return null;   }   return x.toString();}
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:

mysql> select * from user order by _id asc;+--------------------------+------+-------------------------------+| _id                      | name | profile                       |+--------------------------+------+-------------------------------+| 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

// MongoSQLParser.javapublic int executeUpdate() throws MongoSQLException {   if (statement instanceof SQLInsertStatement) {       return InsertData((SQLInsertStatement) statement);   }   if (statement instanceof SQLUpdateStatement) {       return UpData((SQLUpdateStatement) statement);   }   if (statement instanceof SQLDropTableStatement) {       return dropTable((SQLDropTableStatement) statement);   }   if (statement instanceof SQLDeleteStatement) {       return DeleteDate((SQLDeleteStatement) statement);   }   if (statement instanceof SQLCreateTableStatement) {       return 1;   }   return 1;}private int InsertData(SQLInsertStatement state) {   if (state.getValues().getValues().size() == 0) {       throw new RuntimeException("number of  columns error");   }   if (state.getValues().getValues().size() != state.getColumns().size()) {       throw new RuntimeException("number of values and columns have to match");   }   SQLTableSource table = state.getTableSource();   BasicDBObject o = new BasicDBObject();   int i = 0;   for (SQLExpr col : state.getColumns()) {       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));       i++;   }   DBCollection coll = this._db.getCollection(table.toString());   coll.insert(o);   return 1;}

5. 彩蛋

老铁,看到这里,来一波微信公众号关注吧?!


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 路由(一)
  2. 数据库读写分离这个坑,让刚入职的我一脸懵逼!
  3. 老大甩给我 30G 文件,让小黑哥几天内全部导入到数据库
  4. 一条失去条件的动态 SQL,到手的年终奖飞了|文末彩蛋
  5. 这个可视化分析库,让你轻松玩转数据科学!
  6. 什么,3行Python代码就能获取海量数据?
  7. 超赞,20个炫酷的数据可视化大屏(含源码)
  8. NBA投篮数据可视化,4行代码就能实现!
  9. 做动态图表,没有数据?用Python就能获取!

随机推荐

  1. HSQLDB / Oracle - IN子句中的1000多个
  2. PHP mysql_fetch_array得不到数据
  3. linux下mysql下载安装
  4. SQL Server中存储过程比直接运行SQL语句
  5. oracle基础知识总结 part 3 : 三范式,PLSQL
  6. Unable to use slave's temporary direct
  7. CentOS下MySQL的安装和修改用户密码
  8. SQL View查找缺失值,应该很简单
  9. 更大的舞台在等你
  10. sql server查看表空间