当前位置: 首页 > 科技观察

Rb(redisblaster),一个用于Redis非复制分片的Python库

时间:2023-03-14 19:35:38 科技观察

Rb,redisblaster,是一个用于redis非复制分片的库。它在pythonredis之上实现了一个自定义路由系统,允许您自动定位不同的服务器,而无需手动将请求路由到各个节点。它没有实现redis所做的一切,也没有尝试这样做。您始终可以将客户端连接到特定主机,但主要假设您的操作仅限于可以自动路由到不同节点的基本键/值操作。您可以做什么:针对主机自动执行单键操作。在所有或部分节点上执行命令。并行执行所有这些。在PyPI上可以安装rb,并且可以从那里安装:$pipinstallrbconfigurationrb的入门非常简单。如果您以前一直在使用py-redis,您会感到宾至如归。主要区别在于集群不是连接到单个主机,而是配置为连接到多个:romrbimportClustercluster=Cluster(hosts={0:{'port':6379},1:{'port':6380},2:{'port':6381},3:{'port':6382},4:{'port':6379},5:{'port':6380},6:{'port':6381},7:{'port':6382},},host_defaults={'host':'127.0.0.1',})在这种情况下,我们在同一主机节点上的四个不同服务器进程上有8个设置。hosts参数是要连接的主机的映射。字典的键是主机ID(整数),值是参数字典。host_defaults是一个可选的默认值字典,用于为所有主机填写。如果您想共享一些重复的公共默认值(在这种情况下,所有主机都连接到本地主机),这将很有用。在默认配置中,PartitionRouter用于路由。路由现在集群已经搭建好了,我们可以使用Cluster.get_routing_client()来获取一个redisclient,它会为每个命令自动路由到正确的redis节点:client=cluster.get_routing_client()results={}forkeyinkeys_to_look_up:results[key]=client.get(key)这个客户端的工作方式与标准的pyredisStrictClient非常相似,主要区别在于它只能执行只涉及一个键的命令。然而,这个基本操作是串行运行的。rb之所以有用,是因为它可以自动构建redis管道并向许多主机并行发送查询。但是,这会稍微改变用法,因为现在值不是立即可用的:results={}withcluster.map()asclient:forkeyinkeys_to_look_up:results[key]=client.get(key)whilelookingsofar听起来很相似,但不是在结果字典中存储实际值,而是存储Promise对象。当地图上下文管理器结束时,它们保证已经完成,您可以访问Promise.value属性以获取值:对于键,results.iteritems()中的承诺:打印'%s:%s'%(key,promise.value)如果你想向所有参与的主机发送一个命令(比如删除一个数据库),你可以使用Cluster.all()方法:withcluster.all()asclient:client.flushdb()如果你这样做,promise值是一个字典,主机ID作为键,结果作为值。例如:以cluster.all()作为客户端:results=client.info()forhost_id,infoinresults.iteritems():print'host%sisrunning%s'%(host_id,info['os'])要显式定位某些主机,您可以使用Cluster.fanout()接受要向其发送命令的主机ID列表。API这是公共API的完整参考。请注意,此库扩展了Pythonredis库,因此其中一些类具有更多功能,您需要查阅py-redis库。Clusterclassrb.Cluster(hosts,host_defaults=None,pool_cls=None,pool_options=None,router_cls=None,router_options=None)cluster是rb背后的核心对象。它被保存到每个节点的连接池中,并且可以在应用程序运行期间在一个中心位置共享。四个具有默认路由器的redis实例上的集群的基本示例:cluster=Cluster(hosts={0:{'port':6379},1:{'port':6380},2:{'port':6381},3:{'port':6382},},host_defaults={'host':'127.0.0.1',})hosts是一个主机字典,将主机ID号映射到配置参数。参数对应于add_host()函数的签名。这些参数的默认值是从host_defaults中拉取的。要覆盖池类,可以使用pool_cls和pool_options参数。这同样适用于路由器的router_cls和router_options。pool选项对于设置套接字超时和类似参数很有用。add_host(host_id=None,host='localhost',port=6379,unix_socket_path=None,db=0,password=None,ssl=False,ssl_options=None)向集群添加一个新主机。这仅对单元测试真正有用,因为通常主机是通过构造函数添加的,并且在首次使用集群后更改它们不太可能有意义。all(timeout=None,max_concurrency=64,auto_batch=True)扇出到所有主机。否则与fanout()完全相同。示例:withcluster.all()asclient:client.flushdb()disconnect_pools()断开与内部池的所有连接。execute_commands(mapping,*args,**kwargs)在Redis集群上同时执行与路由键关联的一系列命令,返回一个新映射,其中值是与同一位置的命令对应的结果列表。例如:>>>cluster.execute_commands({...'foo':[...('PING',),...('TIME',),...],...'bar':[...('CLIENT','GETNAME'),...],...}){'bar':[],'foo':[,]}作为redis.client.Script实例的命令将首先检查它们在目标节点上的存在,然后在执行前加载到目标上,并且可以与其他命令交错:>>>fromredis.clientimportScript>>>TestScript=Script(None,'return{KEYS,ARGV}')>>>cluster.execute_commands({...'foo':[...(TestScript,('key:1','key:2'),range(0,3)),...],...'bar':[...(TestScript,('key:3','key:4'),range(3,6)),...],...}){'bar':[],'foo':[]}在内部,FanoutClient用于发出Order。fanout(hosts=None,timeout=None,max_concurrency=64,auto_batch=True)用于获取路由客户端、启动扇出操作和加入结果的快捷上下文管理器。在上下文管理器中,可用的客户端是FanoutClient。用法示例:以cluster.fanout(hosts='all')作为客户端:client.flushdb()get_local_client(host_id)get_local_client(host_id)返回特定主机ID的本地化客户端。这个客户端就像一个普通的Pythonredis客户端一样工作,并立即返回结果。get_local_client_for_key(key)类似于get_local_client_for_key()但根据路由器所说的关键目的地返回客户端。get_pool_for_host(host_id)返回给定主机的连接池。Redis客户端使用这个连接池来确保它不必不断地重新连接。如果你想使用自定义的redis客户端,你可以手动将其作为连接池传入。get_router()返回集群的路由器。如果重新配置集群,将重新创建路由器。通常,您不需要自己与路由器交互,因为集群的路由客户端会自动执行此操作。这将返回BaseRouter的一个实例。get_routing_client(auto_batch=True)返回一个路由客户端。客户端能够自动将请求路由到各种主机。它是线程安全的,可以像主机本地客户端一样使用,但它会拒绝执行不能直接路由到单个节点的命令。路由客户端的默认行为是尝试将满足条件的命令批处理成批处理版本。例如,路由到同一节点的多个GET命令最终可以合并为一个MGET命令。可以通过将auto_batch设置为False来禁用此行为。这对于调试很有用,因为MONITOR将更准确地反映代码中发出的命令。有关详细信息,请参阅路由客户端。map(timeout=None,max_concurrency=64,auto_batch=True)用于获取路由客户端、启动地图操作和连接结果的快捷上下文管理器。max_concurrency定义在隐式连接发生之前可以存在多少未完成的并行查询。在上下文管理器中,可用的客户端是MappingClient。示例用法:results={}withcluster.map()asclient:forkeyinkeys_to_fetch:results[key]=client.get(key)forkey,promiseinresults.iteritems():print'%s=>%s'%(key,promise.value)remove_host(host_id)从客户端移除主机。这仅对单元测试真正有用。Clientsclassrb.RoutingClient(cluster,auto_batch=True)可以路由到单个目标的客户端。有关参数,请参阅Cluster.get_routing_client()。execute_command(*args,**options)执行命令并返回解析后的响应指定的主机而不是使用路由系统。例如,这可用于清空所有主机上的数据库。上下文管理器返回一个FanoutClient。用法示例:以cluster.fanout(hosts=[0,1,2,3])作为客户端:results=client.info()forhost_id,infoinresults.value.iteritems():print'%s->%s'%(host_id,info['is'])返回的承诺将所有结果累积到以host_id为键的字典中。hosts参数是一个host_id列表,或者是字符串“all”,用于将命令发送到所有主机。扇出API需要非常小心地使用,因为当将密钥写入不需要它们的主机时,它会造成很多损害。get_fanout_client(hosts,max_concurrency=64,auto_batch=None)返回线程不安全的扇出客户端。返回FanoutClient的实例。get_mapping_client(max_concurrency=64,auto_batch=None)返回线程不安全的映射客户端。这个客户端像redis管道一样工作并返回最终结果对象。它需要连接才能正常工作。您应该使用自动连接的map()上下文管理器而不是直接使用它。返回MappingClient的一个实例。map(timeout=None,max_concurrency=64,auto_batch=None)返回映射操作的上下文管理器。这会并行运行多个查询,然后在最后加入以收集所有结果。在上下文管理器中,可用的客户端是MappingClient。示例用法:results={}withcluster.map()asclient:forkeyinkeys_to_fetch:results[key]=client.get(key)forkey,promiseinresults.iteritems():print'%s=>%s'%(key,promise.value)classrb.MappingClient(connection_pool,max_concurrency=None,auto_batch=True)路由客户端使用集群的路由器根据执行的redis命令的key自动定位单个节点。有关参数,请参阅Cluster.map()。cancel()取消所有未完成的请求。execute_command(*args,**options)执行命令并返回解析后的响应join(timeout=None)等待所有未完成的响应返回或超时mget(keys,*args)返回相同顺序的值列表作为键mset(*args,**kwargs)根据映射设置键/值。映射是键/值对的字典。键和值都应该是字符串或可通过str()转换为字符串的类型。classrb.FanoutClient(hosts,connection_pool,max_concurrency=None,auto_batch=True)这与MappingClient的工作方式类似,但它不是使用路由器来定位主机,而是向所有手动指定的主机发送命令。结果累积在由host_id键入的字典中。有关参数,请参阅Cluster.fanout()。execute_command(*args,**options)执行命令并返回解析的响应target(hosts)以临时重新定位客户端以进行调用。当必须为单个调用处理主机子集时,这很有用。target_key(key)暂时重新定位客户端以进行一次调用,以专门路由到给定密钥路由到的一个主机。在这种情况下,承诺的结果只是一个主机值而不是字典。新版本1.3。Promiseclassrb.Promise一个Promise对象,它试图为Promise对象镜像ES6API。与ES6Promises不同的是,这个Promise还直接提供了对底层值的访问,并且它有一些稍微不同的静态方法名称,因为这个Promise可以在外部解析。staticall(iterable_or_dict)当所有传递的承诺都解决时,承诺就会解决。您可以传递承诺列表或承诺字典。done(on_success=None,on_failure=None)将一些回调附加到Promise并返回一个Promise。is_pending如果promise仍未决则为True,否则为False。is_rejected如果承诺被拒绝则为真,否则为假。is_resolved如果promise已解决,则为True,否则为False。reason这个承诺被拒绝的原因。reject(reason)以给定的理由拒绝承诺。staticrejected(reason)创建一个被拒绝的具有特定值的promise对象。resolve(value)用给定的值解析承诺。staticresolved(value)创建一个使用特定值解析的promise对象。then(success=None,failure=None)将成功和/或失败回调添加到Promise的实用方法,该方法还将在此过程中返回另一个Promise。value此承诺在解决后所持有的价值。Routersclassrb.BaseRouter(cluster)所有路由的基类。如果你想实现自定义路由,这是适合你的子类。cluster指向这个路由器所属的集群。get_host_for_command(command,args)返回执行该命令的主机。get_host_for_key(key)执行路由并返回目标的host_id。子类需要实现这一点。get_key(command,args)返回命令操作的键。classrb.ConsistentHashingRouter(cluster)基于一致性哈希算法返回host_id的路由器。一致性哈希算法仅在提供关键参数时才起作用。路由器要求主机是无间隙的,即N台主机的ID范围从0到N-1。get_host_for_key(key)执行路由并返回目标的host_id。子类需要实现这一点。classrb.PartitionRouter(cluster)一个简单的路由器,它仅根据简单的crc32%node_count设置将命令单独路由到单个节点。路由器要求主机是无间隙的,即N台主机的ID范围从0到N-1。get_host_for_key(key)执行路由并返回目标的host_id。子类需要实现这一点。exceptionrb.UnroutableCommand如果发出的命令不能通过路由器路由到单个主机则引发。Testingclassrb.testing.TestSetup(servers=4,databases_each=8,server_executable='redis-server')测试设置是生成多个redis服务器进行测试并自动关闭它们的便捷方式。这可以用作自动终止客户端的上下文管理器。rb.testing.make_test_cluster(*args,**kwargs)创建测试设置然后从中创建集群的便捷快捷方式。这必须用作上下文管理器:fromrb.testingimportmake_test_clusterwithmake_test_cluster()ascluster:...