当前位置: 首页 > 科技观察

使用Python和Redis超越缓存_0

时间:2023-03-15 23:53:12 科技观察

如果你是Python开发者,那么你一定用过Redis并且认为它是一个很棒的缓存。虽然你的印象是正确的,Redis确实是一个很棒的缓存,但是使用Redis可以解决的问题并不仅限于缓存。我们将探讨Redis和RedisEnterprise的一些其他用途。只是为了好玩,我将使用上一篇文章《使用 Redis 储存地理位置数据》中的大脚怪数据。另外,由于这篇文章的读者是Python开发人员,所以我将使用Python编写这篇文章的所有代码!我在接下来显示的代码中使用了aioredis客户端库,因为它对异步/等待的支持非常好。如果您不熟悉async/await,可以阅读这篇文章,其中提到了async/await如何帮助提高性能。使用Redis构建队列Redis提供了字符串、散列、集合和列表等多种数据结构供使用。这些数据结构是存储数据的好帮手,列表可以作为一个非常好的队列(queue)。为了将列表用作队列,我们??需要使用RPUSH将新项目推到列表的末尾,然后使用LPOP或BLPOP从列表的前面弹出它们。由于Redis对数据库的所有修改都是在单个线程中完成的,因此这些操作是原子的。例如,以下段落将一些Bigfoot跟踪添加到队列中。importasyncioimportaioredisasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')awaitasyncio.gather(add_to_queue(redis,'PossiblevocalizationseastofMakanda'),add_to_queue(redis,'在哥伦比亚河附近目击'),add_to_queue(redis,'被所有毛茸茸的生物追赶'))redis.close()waitredis.wait_closed()defadd_to_queue(redis,message):returnredis.rpush('bigfoot:sightings:received',message)asyncio.run(main())importasyncio这个过程非常简单。我们只需要在第18行调用redis.rpush将指定的元素推入队列。接下来是从队列另一端读取元素的代码,同样非常简单。importaioredisfrompprintimportppasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')whileTrue:sighting=awaitredis.blpop('bigfoot:sightings:received')pp(sighting)asyncio.run(main())Redis有一些同样很酷的命令,它们不仅可以将列表用作队列,甚至可以用作堆栈。我最喜欢的是BRPOPLPUSH,它阻止并弹出列表右侧的一些元素,然后将弹出的元素推入另一个列表。您可以使用此命令将元素从一个队列传输到另一个队列,这是一个非常好的命令。第11行和第12行的无限循环将等待并打印被推入队列的大脚怪的踪迹。这里使用redis.blpop而不是redis.lpop,因为前者可以阻塞客户端等待列表中的元素返回。让客户端阻塞并等待元素出现比让Redis和Python代码之间的网络无休止地轮询和做无用的工作要高效得多。使用Redis订阅和发送事件Redis提供的一些东西不是数据结构,例如订阅和发布(Pub/Sub)特性就是其中之一。这个特性就像它的名字一样,是Redis内置的一种发布和订阅机制。由于这个特性,我们只需要使用几个命令就可以为我们的Python应用程序添加一个强大的订阅和发布机制。通过执行订阅操作,我们可以发现事件。以下是代码:importasyncioimportaioredisfrompprintimportppasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')[channel]=awaitredis.psubscribe('bigfoot:broadcast:channel:*')whileTrue:message=awaitchannel.get()pp(message)asyncio.run(main())redis.psubscribe函数用于模式匹配和非模式匹配redis.subscribe函数都返回Python列表,以便包含可变数量的元素。该程序将解构此列表(Python的解包术语)以获取我想要的频道,然后使用.get进行阻塞调用以等待下一条消息。由于我想接收所有与Bigfoot相关的消息,因此我在该代码的第10行使用redis.psubscribe订阅了Glob样式的模式。通过使用bigfoot:broadcast:channel:*作为模式,客户端将接收所有以bigfoot:broadcast:channel:开头的事件。发布事件很简单,这里是代码:importasyncioimportaioredisasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')awaitasyncio.gather(publish(redis,1,'PossiblevocalizationseastofMakanda'),publish(redis,2,'SightingneartheColumbiaRiver'),publish(redis,2,'Chasedbyatallhairycreature'))redis.close()awaitredis.wait_closed()defpublish(redis,channel,message):返回redis。publish(f'bigfoot:broadcast:channel:{channel}',message)asyncio.run(main())值得注意的是,发布和订阅是一种即发即弃机制。如果代码发布了一个事件但没有人在监听,那么该事件就会消失。如果你想让你的事件持久化,可以考虑使用上面提到的队列,或者接下来要介绍的Redis流。这段代码的重点是第18行,它使用非常简单的名称redis.publish将消息发布到所需的频道。使用Redis存储数据流除了发布和订阅,Redis还可以使用流来发布和订阅事件。RedisStreams是一个很大的话题,但是使用它只需要掌握少量的命令。从Python的角度来看,这些命令的用法非常简单,我会一一为大家讲解。下面的代码将向流中添加三个大脚怪目击事件。importasyncioimportaioredisasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')awaitasyncio.gather(add_to_stream(redis,1,'PossiblevocalizationseastofMakanda','ClassB'),add_to_stream(redis,2,'SightingneartheColumbiaRiver','ClassA'),add_to_stream(redis,3,'Chasedbyatallhairycreature','ClassA'))redis.close()awaitredis.wait_closed()defadd_to_stream(redis,id,title,分类):为每个新添加的流事件返回redis.xadd('bigfoot:sightings:stream',{'id':id,'title':title,'classification':classification})asyncio.run(main())每个都有一个唯一的标识符,由1970年以来的时间戳(以毫秒为单位)和一个由破折号连接的序列号组成。例如,在我写这篇文章时,1,593,120,357,193毫秒(1.59吉秒)已经过去了1970年1月1日午夜(Unix纪元)。因此,当我运行上面的代码时,该命令将创建一个ID为1593120357193-0的事件。这段代码最重要的部分是第17行和第18行,它们使用redis.xadd函数将目击事件的字段添加到流中。我们可以在添加事件的时候用*来代替具体的ID,这样Redis会根据当前时间自动生成事件的ID,这也是redis.xadd函数的默认行为。如下代码所示,在读取流元素时,我们需要设置一个起始ID。可以看到在第10行,程序将变量last_id设置为0-0,该ID代表流的起始位置。importasyncioimportaioredisfrompprintimportppasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf8')last_id='0-0'whileTrue:events=awaitredis.xread(['bigfoot:sightings:stream'],timeout=0,count=5,latest_ids=[last_id])forkey,id,fieldsinevents:pp(fields)last_id=idasyncio.run(main())程序的第12行使用了redis.xreadfunction在0-0之后从流中请求最多5个事件。该调用返回一个列表,程序随后对其进行循环和解构以获取事件的字段和标识符。事件的标识会被存储起来,以便以后调用redis.xread时可以获取新的事件,必要时可以重新读取之前读取过的旧事件。使用Redis作为搜索引擎Redis可以通过模块进行扩展以添加新的命令和功能。有大量模块可用于AI模型服务、图形数据库、时间序列数据库,在这种情况下还包括搜索引擎。RedisSearch是一个强大的搜索引擎,可以以惊人的速度摄取数据。有些人喜欢用它来进行即时搜索,但它也可以用于其他搜索。这是使用此模块的示例:importasyncioimportaioredisfrompprintimportppasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')awaitredis.execute('FT.DROP','bigfoot:sightings:search')awaitredis.execute('FT.CREATE','bigfoot:sightings:search','SCHEMA','title','TEXT','classification','TEXT')等待同步。收集(add_document(redis,1,'PossiblevocalizationseastofMakanda','ClassB'),add_document(redis,2,'SightingneartheColumbiaRiver','ClassA'),add_document(redis,3,'Chasedbyatallhairycreature','ClassA'))结果=等待搜索(redis,'chase|east')pp(results)redis.close()awairedis.wait_closed()defadd_document(redis,id,title,classification):returnredis.execute('FT.ADD','bigfoot:sightings:search',id,'1.0','FIELDS','title',title,'classification',classification)defsearch(redis,query):returnredis.execute('FT.SEARCH','bigfoot:sightings:search',query)asyncio.run(main())有了索引后,程序就可以向其中添加文档了。这个操作发生在程序的第27、28行,由FT.ADD命令完成。每个读取的文档都需要一个唯一的ID,一个介于0.0和1.0之间的权重(排名),以及相应的字段。在第12和13行,程序使用FT.CREATE创建了一个索引。索引需要描述程序将添加的每个文档中字段的架构。在这个例子中,程序需要添加Bigfoot的踪迹,文档包含一个标题和一个类别,它们都是文本字段。如程序第31行所示,索引加载文档后,程序可以使用FT.SEARCH命令和具体的查询语句进行查询操作。第20行的特定查询指示RedisSearch在索引中查找包含这些术语之一的文档。在此示例中,查询将返回两个文档。使用Redis作为主数据库Redis可以用作速度惊人的内存存储数据库。下面的代码使用哈希来演示这种用法。散列是一种很棒的数据结构,它可以模拟您要存储的记录类型,并且可以使用数据的主键作为键名的一部分。importasyncioimportaioredisfrompprintimportppasyncdefmain():redis=awaitaioredis.create_redis('redis://:foobared@localhost:6379/0',encoding='utf-8')awaitasyncio.gather(add_sighting(redis,1,'PossiblevocalizationseastofMakanda','ClassB'),add_sighting(redis,2,'SightingneartheColumbiaRiver','ClassA'),add_sighting(redis,3,'Chasedbyatallhairycreature','ClassA'))sightings=awaitasyncio.gather(read_sighting(redis,1),read_sighting(redis,2),read_sighting(redis,3))pp(sightings)redis.close()awaitredis.wait_closed()defadd_sighting(redis,id,title,classification):returnredis.hmset(f'bigfoot:sighting:{id}','id',id,'title',title,'classification',classification)defread_sighting(redis,id):returnredis.hgetall(f'bigfoot:sighting:{id}')asyncio.run(main())你可能会想“如果我关闭服务器怎么办?如果它崩溃了怎么办?那我就没有数据了!”不,不!您可以修改redis.conf文件以通过多种不同方式将数据保存在内存中。另外,如果您使用的是RedisEnterprise,我们也为您提供了相应的解决方案,让您可以直接使用Redis,无需担心持久化问题。为了让您自己尝试这些示例,我将本文涉及的所有代码放在了GitHub上,您可以克隆并开始使用它们。如果您是Docker用户,该项目还有一个名为start-redis.sh的shell脚本,它可以拉取图像并启动运行示例的Redis版本。如果您想在尝试后认真构建一些软件,请注册并试用RedisCloudEssentials。它与您所了解和喜爱的Redis是一样的,唯一的区别是这个Redis是由云管理的,因此您只需要专注于构建您的软件。