1。概述MyCAT支持跨库表连接,当前版本只支持跨库双表连接。尽管如此,它已经能够满足我们大部分的业务场景。而且join太多表可能带来的性能问题也很麻烦。本文主要分享:总体流程、调用时序图核心代码分析预读:《MyCAT 源码分析 —— 【单库单表】查询》。好了,走吧。2、主要流程在执行跨库双表JoinSQL时,经历的大致流程如下:在SQL上,需要添加注解/*!mycat:catlet=io.mycat.catlets.ShareJoin*/${SQL}。RouteService#route(...)解析注解mycat:catlet后,将路由发送给HintCatletHandler做进一步处理。HintCatletHandler获取注解对应的Catlet实现类,io.mycat.catlets.ShareJoin是其中一个实现(目前只有一个实现),提供了跨库join两个表的功能。从类命名的角度来看,未来ShareJoin很有可能提供完整的跨库跨表的join功能。内核代码如下://HintCatletHandler.javapublicRouteResultsetroute(SystemConfigsysConfig,SchemaConfigschema,intsqlType,StringrealSQL,Stringcharset,ServerConnectionsc,LayerCachePoolcachePool,StringhintSQLValue,inthintSqlType,MaphintMap)throwsSQLNonTransientException{StringcateletClass=hintSQLValue){DebugEnable().loadcateletclass:"+hintSQLValue+"torunsql"+realSQL);}try{Catletcatlet=(Catlet)MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(catetClass);catlet.route(sysConfig,schema,sqlType,realSQL,charset,sc,cachePool);catlet.processSQL(realSQL,newEngineCtx(sc.getSession2()));}catch(Exceptione){LOGGER.warn("catleterror"+e);thrownewSQLNonTransientException(e);}returnnull;}3.ShareJoin当前支持跨库双表Join。ShareJoin将SQL拆分为左表SQL和右表SQL,分别发送给各个数据节点执行,汇总数据结果,合并后返回。伪代码如下://SELECTu.id,o.idFROMt_ordero//INNERJOINt_useruONo.uid=u.id//[Order]查询左表StringleftSQL="SELECto.id,u.idFROMt_ordero";ListleftList=dn[0.select(leftSQL)+dn[1].select(leftSQL)+...+dn[n].select(leftsql);//【并行】查询右表StringrightSQL="SELECTu.idFROMt_useruWHEREu.idIN(${leftList.uid})";for(dn:dns){//这里是并行执行,使用回调逻辑for(rightRecord:dn.select(rightSQL)){//查询右表//合并结果for(leftRecord:leftList){if(leftRecord.uid==rightRecord.id){write(leftRecord+leftRecord.uid拼接结果);}}}}实际情况会更复杂,我们稍微往下看。3.1JoinParserJoinParser负责解析SQL。整体流程如下:例如/*!mycat:catlet=io.mycat.catlets.ShareJoin*/SELECTo.id,u.usernamefromt_orderojoint_useruono.uid=u.id;TableFilter解析后结果如下:tName:表名tAlia:表自定义命名where:过滤条件order:排序条件parenTable:leftjoin连接表名。t_user表的join属性的parenTable为“o”,即t_order。joinParentkey:左连接的连接字段joinKey:连接字段。t_user表是join属性中的id。加入:子表过滤器。即,连接右侧的表。parent:相对于join属性。看到这里,你可能会有疑问,为什么要把SQL解析成TableFilter。JoinParser根据TableFilter生成数据节点执行SQL。代码如下://TableFilter.javapublicStringgetSQL(){Stringsql="";//fieldsfor(Entryentry:fieldAliasMap.entrySet()){Stringkey=entry.getKey();Stringval=entry.getValue();if(val==null){sql=unionsql(sql,getFieldfrom(key),",");}else{sql=unionsql(sql,getFieldfrom(key)+"as"+val,",");}}//whereif(parent==null){//on/where等于左表StringparentJoinKey=getJoinKey(true);//fixsharejoinbug://(AbstractConnection.java:458)-closeconnection,reason:programerr:java.lang.IndexOutOfBoundsException://原因是左表的select列不包含join列,得到结果时报以上错误if(sql!=null&&parentJoinKey!=null&&!sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())){sql+=","+parentJoinKey;}sql="select"+sql+"from"+tName;if(!(where.trim().equals(""))){sql+="where"+where.trim();}}else{//on/where等于右边的表if(allField){sql="select"+sql+"from"+tName;}else{sql=unionField("select"+joinKey,sql,",");sql=sql+"from"+tName;//sql="select"+joinKey+","+sql+"from"+tName;}if(!(where.trim().equals(""))){sql+="where"+where.trim()+"and("+joinKey+"in%s)";}else{sql+="where"+joinKey+"in%s";}}//orderif(!(order.trim().equals(""))){sql+="orderby"+order.trim();}//limitif(parent==null){if((rowCount>0)&&(offset>0)){sql+="limit"+offset+","+rowCount;}else{if(rowCount>0){sql+="limit"+rowCount;}}}returnsql;}当父为空时为on/where等号左边的表例如:selectid,uidfromt_order当parent不为空时为on/where等号右边的表sign.例如:selectid,usernamefromt_userwhereidin(1,2,3)3.2ShareJoin.processSQL(...)SQL解析后生成左边表执行的SQL并发送到相应的数据节点查询数据。大体流程如下:当SQL为/*!mycat:catlet=io.mycat.catlets.ShareJoin*/SELECTo.id,u.usernamefromt_orderojoint_useruono.uid=u.id;,sql=getSql()的返回结果是selectid,uidfromt_order。左边表执行的SQL生成后,依次发送到对应的数据节点查询数据。具体的顺序查询如何实现,我们看下一章BatchSQLJob。3.3BatchSQLJobEngineCtx封装了BatchSQLJob,在上层提供了两个方法:executeNativeSQLSequnceJob:在每个数据节点上顺序(非并发)执行SQL任务executeNativeSQLParallJob:在每个数据节点上并发执行SQL任务核心代码如下://EngineCtx.javapublicvoidexecuteNativeSQLSequnceJob(String[]dataNodes,Stringsql,SQLJobHandlerjobHandler){for(StringdataNode:dataNodes){SQLJobjob=newSQLJob(jobId.incrementAndGet(),sql,dataNode,jobHandler,this);bachJob.addJob(job,false);}}publicvoidexecuteNativeSQLParallJob(String[]dataNodes,Stringsql,SQLJobHandlerjobHandler){for(StringdataNode:dataNodes){SQLJobjob=newSQLJob(jobId.incrementAndGet(),sql,dataNode,jobHandler,this);bachJob.addJob(job,true);}}BatchSQLJob通过listoftasksbeingexecuted,要执行的任务列表,实现顺序/并发执行任务。核心代码如下://BatchSQLJob.java/***执行任务列表*/privateConcurrentHashMaprunningJobs=newConcurrentHashMap();/***待执行任务列表*/privateConcurrentLinkedQueuewaitingJobs=newConcurrentLinkedQueue();publicvoidaddJob(SQLJobnewJob,booleanparallExecute){if(parallExecute){runJob(newJob);}else{waitingJobs.offer(newJob);if(runningJobs.isEmpty()){//如果不是正在执行的Tasks是从等待执行的队列。SQLJobjob=waitingJobs.poll();if(job!=null){runJob(job);}}}}publicbooleanjobFinished(SQLJobsqlJob){runningJobs.remove(sqlJob.getId());SQLJobjob=waitingJobs.poll();if(job!=null){runJob(job);returnfalse;}else{if(noMoreJobInput){returnrunningJobs.isEmpty()&&waitingJobs.isEmpty();}else{returnfalse;}}}当runningJobs存在执行时任务,#addJob(...)不会立即执行,它被添加到waitingJobs中。当SQLJob完成时,将调用序列中的下一个任务。并发执行时,当执行#addJob(...)时,会立即执行。SQLJobSQL异步执行作业。它的jobHandler(SQLJobHandler)属性,当SQL执行返回结果时,会回调实现异步执行。在ShareJoin中,SQLJobHandler有两个实现:ShareDBJoinHandler和ShareRowOutPutDataHandler。前者,左表执行的SQL回调;后者,由右表执行的SQL回调。3.4ShareDBJoinHandlerShareDBJoinHandler,左表执行的SQL回调。流程如下:#fieldEofResponse(...):接收数据节点返回的字段,放入内存。#rowResponse(...):接收数据节点返回的行,存入内存。#rowEofResponse(...):接收到数据节点后返回所有行。当所有数据节点执行完SQL后,提交右边表执行的SQL任务,并行执行,即图中的#createQryJob(...)。sql=getChildSQL()当SQL是/*!mycat:catlet=io.mycat.catlets.ShareJoin*/SELECTo.id,u.usernamefromt_orderojoint_useruono.uid=u.id;返回结果是selectid,usernamefromt_userwhereidin(1,2,3)。核心代码如下://ShareJoin.javaprivatevoidcreateQryJob(intbatchSize){intcount=0;MapbatchRows=newConcurrentHashMap();StringtheId=null;StringBuildersb=newStringBuilder().append('(');Stringsvalue="";for(Map.Entrye:ids.entrySet()){theId=e.getKey();byte[]rowbyte=rows.remove(theId);if(rowbyte!=null){batchRows.put(theId,rowbyte);}if(!svalue.equals(e.getValue())){if(joinKeyType==Fields.FIELD_TYPE_VAR_STRING||joinKeyType==Fields.FIELD_TYPE_STRING){//joinkey为varcharsb.append("'").append(e.getValue()).append("'").append(',');//('digdeep','yuanfang')}else{//默认joinkey为int/longsb.append(e.getValue()).append(',');//(1,2,3)}}svalue=e.getValue();if(count++>batchSize){break;}}if(count==0){return;}jointTableIsData=true;sb.deleteCharAt(sb.length()-1).append(')');Stringsql=String.format(joinParser.getChildSQL(),sb);getRoute(sql);ctx.executeNativeSQLParallJob(getDataNodes(),sql,newShareRowOutPutDataHandler(this,fields,joinindex,joinParser.getJoinRkey(),batchRows,ctx.getSession()));}3.5ShareRowOutPutDataHandlerShareRowOutPutDataHandler,右表执行的SQL回调过程如下:#fieldEofResponse(...):接收数据节点返回字段,返回头给MySQLClient。#rowResponse(...):接收数据节点返回的行,匹配左表中的记录,将合并后的行返回给MySQLClient。#rowEofResponse(...):当所有行都返回后,将eof返回给MySQLClient。核心代码如下://ShareRowOutPutDataHandler.javapublicbooleanonRowData(StringdataNode,byte[]rowData){RowDataPacketrowDataPkgold=ResultSetUtil.parseRowData(rowData,bfields);//复制一个batchRowsMapbatchRowsCopy=newConcurrentHashMap();batchRowsCopy.putAll(arows);//获取Id字段,Stringid=ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));//查找ID对应的A表的记录byte[]arow=getRow(batchRowsCopy,id,joinL);while(arow!=null){RowDataPacketrowDataPkg=ResultSetUtil.parseRowData(arow,afields);//ctx.getAllFields());for(inti=1;i0){StringrowValue=newString(columnData);middlerResultHandler.add(rowValue);}//}}}arow=getRow(batchRowsCopy,id,joinL);}returnfalse;}4.对于涉及到的核心类,有兴趣的同学可以看看ShareJoin其他不支持的功能:只支持innerjoin,不支持leftjoin、rightjoin等连接。不支持排序方式。不支持groupby和相关的聚合函数。即使未声明返回连接左表的字段,也会返回字段。好了,MyCAT弱XA源码继续走!