当前位置: 首页 > 科技观察

distributeTemplate分布式配置,上下文同步维护

时间:2023-03-12 04:38:41 科技观察

我们也是按照这种循序渐进的方式来实现分发的。第一个是公共资源对象。第一个必要的公共资源对象是配置。这个配置交由用户对外填写手动配置,并且可以保持同步更新,所以需要一个配置对象。维护在内存中,需要事件驱动来监听配置文件的变化。好吧,让我们看看代码是如何完成的。首先,iniconfpropxml有多种配置方式。本质上,我们的分布式方法需要定位每个主机。所有节点都可以知道我在网络上的情况,也可以找zookeeper之类的。只有当他们知道或找到它时,他们才能进行未来的通信同步。为了以后支持多种配置,我们先将配置定义为接口publicinterfaceIConfig{publicstaticfinalStringDEFAULTFILE="distribute.conf";/****从文件读取配置初始化*@paramfile*添加(修改)人:zhuyuping*/publicvoidreadConfigFormFile(Stringfile);/***从用户提供的json字符串初始化*@paramjson*添加(修改)人:zhuyuping*/publicvoidreadConfigFormString(Stringjson);/***获取系统维护的内存表memory**添加(修改)人:zhuyuping*/publicTablegetConfig();/***Ingestcontextcontext*@paramcontext*添加(修改)人:zhuyuping*/publicvoidsetContext(Contextcontext);/***获取上下文context*@return*Add(modify)person:zhuyuping*/publicContextgetContext();publicBiMapgetAlias();}主要有三种方法,一种是将配置文件读入内存对象,另一种是写入内存对象到文件。对于分布式,我们需要同步。有时我们可能需要历史版本。能够恢复容错,因此可能需要版本号来记录当前版本,然后将配置保留在可能的多个节点上更改文件后,可以在发送请求时更新订单。后面我会加一个分布式的先进先出队列。如果这里没有添加,稍后添加。还有一个上下文对象。有人会说为什么需要上下文对象来保存变量?很多框架都有上下文对象,可能是拦截器模式,可能是门面模式等其他模式,我这里就不说了。实际上,上下文只是针对本地节点的各个功能代码段。一座桥,有句话叫两情若长久,何来朝夕?我们有一个鹊桥,其中上下文最重要的本质就是维护本地节点的上下文。如果上下文中有一个门面,那么它就有一个门面。该函数方便用户随时获取门面对象进行操作,ok我们来看看context是如何定义的/******@authorzhuyuping*@version1.0*@created2014-7-9*@function:thecontextinterface只是存储上面所有用户类的过程中的变量,不是config配置,在分布式中不会同步的,只会在单点上有效。记住,后面如果要支持xml配置或者其他配置,可以加一个策略模式*/publicinterfaceContext{publicfinalstaticStringdefaultConfig="distribute.conf";//默认配置名publicvoidputValue(Stringkey,Objectvalue);publicObjectgetValue(Stringkey);publicvoidsetCurrHost(Stringhost,intport);publicInetSocketAddressgetCurrHost();/***获取默认配置文件*@return*添加(修改)人:zhuyuping*/publicStringgetDefaultFc();///**//*设置名字defaultpropertyfile//*@parampfile//*添加(修改)person:zhuyuping//*///publicvoidsetDefaultFc(Stringpfile);///**//*直接通过context注入templatefacade供后面使用如果你想集成springApplicationContextAware//*@paramdistributedTemplate//*添加(修改)人:zhuyuping//*///publicvoidsetTemplate(DistributedOperationsdistributedTemplate);////publicDistributedOperationsgetTemplate();}这里其实是一个map来保存属性键值,常用的拿出来作为一个方法。这个context是因为后面我们给用户继承,让用户自己实现context或者交给其他frameworkcontext集成,所以我们实现了一个抽象的默认实现/******@authorzhuyuping*@version1。0*@created2014-7-9下午5:58:37*@function:abstractcontext主要是管理context的资源,提供自定义集成spring来使用这个类//这里,后期需要实现集成策略*/publicabstractclassAbstractContextimplementsContext{//?也可以使用LocalThread也可以privateMapcontext=Maps.newConcurrentMap();privateInetSocketAddresscurrHost;//当前主机如192.168.0.18888privateStringdfConfig;//默认读取的配置文件,用户提供时修改它。如果不提供,默认为publicAbstractContext(StringdfConfig){super();this.dfConfig=dfConfig;//当前类下的文件//currentPortthis.currHost=newInetSocketAddress(ConfigFactory.load(dfConfig).getString("client.currentHost"),ConfigFactory.load(dfConfig).getInt("client.currentPort"));}@OverridepublicInetSocketAddressgetCurrHost(){returncurrHost;}@OverridepublicvoidsetCurrHost(Stringhost,intport){this.currHost=newInetSocketAddress(host,port);}@OverridepublicStringgetDefaultFc(){returndfConfig!=null?dfConfig:defaultConfig;}publicAbstractContext(){super();this.dfConfig=defaultConfig;this.currHost=newInetSocketAddress(ConfigFactory.load(defaultConfig).getString("client.currentHost"),ConfigFactory.load(defaultConfig).getInt("client.currentPort"));}@OverridepublicvoidputValue(Stringkey,Objectvalue){context.put(key,value);}@OverridepublicObjectgetValue(Stringkey){returncontext.get(key);}}ok很简单maintainer然后回到刚才的配置,首先我们要从文件中读取配置文件到配置中object,this是供用户修改或初始化,将配置文件初始化为配置内存对象,然后同步配置文件改变网络通信时会用到这个内存对象。当世界上所有节点都倒下时,简单上下文只维护这个节点的桥接信息。这还不够,因为它不会同步,这就是需要它的原因。我这里使用的配置文件是conf,也就是configLoad方法。后面我会逐步增加更多的支持方式。无非就是读xml之类的问题,这个不重要,思路重要/******@authorzhuyuping*@version1.0*@created2014-7-9pm4:07:00如果你后面需要支持xml,只需要使用策略模式*@功能:基类config主要维护配置基本信息和配置信息的同步备份,同时在内存中维护一个内存表table*/publicabstractclassAbstractConfigimplementsIConfig{/***当前配置表行为主机别名,必须有一列是版本号AotmicLong,配置相关字段值与对象相关*/protectedTableconfig=HashBasedTable.create();//表信息表信息不需要用whichConcurrentHashMap因为这个只会加载读取//不会被修改因为这个表会在用户使用的时候同步更新到文件实际上更新的是config中的,并且每次用户提供查询配置//不会更新到文件protectedAtomicLongversion=newAtomicLong(0);//初始化版本为0;//判断当前版本是否有config中的protectedBiMapalias=HashBiMap.create();protectedContextcontext;publicBiMapgetAlias(){returnalias;}/***context需要提供当前host一个nd*@paramcontext*/publicAbstractConfig(Contextcontext){super();this.context=context;wrapConfig(ConfigFactory.load(context.getDefaultFc()));}@OverridepublicvoidsetContext(Contextcontext){this.context=context;}@OverridepublicContextgetContext(){returncontext;}@OverridepublicvoidreadConfigFormFile(Stringfile){Configconfig=TypeSafeConfigLoadUtils.loadFromFile(newFile(file));wrapConfig(config);}@OverridepublicvoidreadConfigFormString(Stringjson){Configconfig=TypeSafeConfigLoadUtils.loadFromString(json);wrapConfig(config);}/***初始化配置表*@paramconfig*添加(修改)人:zhuyuping*/protectedabstractvoidwrapConfig(Configconfig);/***从内存中读取表并改写到配置文件中*@paramconfig*添加(修改)人:zhuyuping*/protectedabstractStringwrapTable(Tableconfig);/***只保留最新的5个版本,可以回滚更新****添加(修改)人:zhuyuping*/publicvoidupdateVersion(Longversion){}/***版本号更新*需要添加(更新后修改))人:zhuyuping*/publicvoidaddVersion(){Longv=version.getAndIncrement();//TODO需要通知所有节点我要修改版本。如果几个人同时做同样的事情,那么接受节点下次更新的版本号,//在回调函数中更新配置,然后只同步5个版本}@OverridepublicTablegetConfig(){returnconfig;}/****提交对文件配置的修改*添加(修改)人:zhuyuping*/protectedabstractvoidcommit();/****提交对配置的修改。如果有人更改了节点上的配置,他需要检查版本并重新更新本地配置文件。*添加(修改)人:zhuyuping*/protectedabstractvoidsync();}这里我直接使用guava的表,方便维护。其实可以用List来实现。这里重要的是获取所有主机的列表,然后将配置文件写入con文本对象当然是上面提到的对内存对象的读取配置,而将内存对象写入配置文件是基础,接下来看看是怎么写的。为了以后支持xml,读方法和写方法交给后续的子类实现类来实现,只需要实现这两个方法即可/******@authorzhuyuping*@version1。0*@created2014-7-9pm10:38:05*@function:默认配置允许用户实现自定义配置规则只需要继承AbstractConfig*/publicclassSimpleDistributeConfigextendsAbstractConfig{publicSimpleDistributeConfig(Contextcontext){super(context);}@OverrideprotectedvoidwrapConfig(ConfigconfigObj){//获取所有节点Listnodes=configObj.getObjectList("server.hosts");inti=0;for(ConfigObjectnode:nodes){i++;//如果后面添加其他mysql支持,这里需要加上判断//Integerlocalport=node.containsKey("localPort")?Integer.parseInt(node.get("localPort").render()):LOCALPORT;//Integer.parseInt(node.get("localPort").render());IntegerremotePort=Integer.parseInt(node.get("remotePort").render());StringremoteIp=node.get("remoteHost").unwrapped().toString();//远程主机的ip//开始初始化配置tableStringname=node.containsKey("name")?node.get("name").unwrapped().toString():remoteIp;//主机别名InetSocketAddressremoteHost=newInetSocketAddress(remoteIp,remotePort);super.alias.put(name,remoteHost);super.config.put(remoteHost,"版本",super.version.incrementAndGet());super.config.put(remoteHost,"remoteHost",remoteHost);//super.config.put(remoteHost,"localPort",localport);super.config.put(remoteHost,"remotePort",remotePort);super.config.put(remoteHost,"name",name);if(node.containsKey("file")){HashMapfcs=(HashMap)node.get("file")。unwrapped();StringsyncPath=fcs.get("syncPath").toString();//文件同步路径//System.out.println("SimpleDistributeConfig.wrapConfig()"+syncPath);super.config.put(remoteHost,"file",syncPath);//以后配置比较多的时候,会打包成一个bean存放在}}Stringchost=configObj.getString("client.currentHost");intport=configObj.getInt("client.currentPort");super.context.setCurrHost(chost,port);//config.root().containsKey(key)//config.getString(path);}@OverrideprotectedStringwrapTable(Tabletable){StringBuildersb=newStringBuilder("server{");sb.append("\n\t").append("hosts=[");Setrows=table.rowKeySet();intsize=rows.size();inti=0;for(Stringrow:rows){i++;Mapmap=table.row(row);if(!map.containsKey("remoteHost"))continue;sb.append("\t{");ObjectremoteHost=map.get("remoteHost");ObjectremotePort=map.get("remotePort");//ObjectlocalPort=map.get("localPort");Objectname=map.get("name");sb.append("name=").append(name).append("\n\t");//sb.append("localPort=").append(localPort).append("\n\t");sb.append("remoteHost=").append(remoteHost).append("\n\t");sb.append("remotePort=").append(remotePort).append("\n\t");if(map.containsKey("file")){sb.append("file{").append("\n\t").append("syncPath=").append(map.get("syncPath")).append("}");}sb.append("\t}");if(i!=size){sb.append(",");}}sb.append("]").append("\n\t").append("}");//继续保存clientsb.append("\n\t").append("client{").append("\n\t").append("currentHost=").append(context.getCurrHost().getHostString()).append("\n\t");sb.append("\n\t").append("currentPort=").append(context.getCurrHost().getPort()).append("\n\t");sb.append("}");returnsb.toString();}@Overrideprotectedvoidcommit(){}@Overrideprotectedvoidsync(){}publicstaticvoidmain(String[]args){}}ok基本的简单配置和context,用于同步和充当桥梁的都已经搞定了。下面介绍如何监听配置文件的变化。有人说如何监控一个配置文件的变化。其实这个文件有一个属性***changetime,只要听这个,就可以对初级版本不加任何改动,随时可以加,以后的改动,只写相关的方法不用添加importjava.io.File;publicinterfaceFileListener{publicvoidfileChanged(Filefile);}然后用timer定时器或者线程池定时线程轮流训练他。注意这里的弱应用。其实对于这些时候,我建议使用LinkTransferQueue,它是一个FIFO先进先出的非阻塞队列,然后加上如果引用,它保证了一些时间顺序,然后通知接口我们正在对界面做相应的改动来配置和同步配置相关的操作。本章的基本思路是同步资源配置上下文由桥接事件驱动,引擎有支出。timerexcutors.newSchulerXXXX还有很多其他方法可以实现轮训。这个方法也可以实现心跳线。同时给大家介绍一下Apache和camel。已经托管在http://git.oschina.net/zhuyuping/distributeTemplate