Redis 是一款内存高速缓存数据库。其底层使用 C 语言编写,是一个 key-value 存储系统。支持丰富的数据类型,如:String,List,Set,zSet,Hash 以及 5.0 版本新增的 Stream。
读写性能优异:Redis 能读的速度是 110000 次/s,写的速度是 81000 次/s
数据类型丰富:支持 String,List,Set,zSet,Hash 以及 Stream 数据类型操作
原子性:Redis 所有的操作都是原子性的,并且支持多个操作合并之后原子性执行
丰富的特性:支持发布、订阅,支持通知,支持 key 过期设置等特性
持久化:有 RDB 和 AOF 等多种持久化方式
发布订阅:支持发布订阅模式
分布式:Redis Cluster
基于如此多的优点,Redis 可用于缓存,事件发布或订阅,高速队列等场景。支持网络,提供字符串,哈希,列表,队列,集合结构直接存储,基于内存,可以持久化。
缓存是 Redis 最常见的应用场景,之所以这么使用,主要是因为 Redis 的读写性能优越。而且主键有取代 memcached,成为首选服务器缓存的组件。除此之外,Redis 内部还是支持事务,在使用的时候有效的保证了数据的一致性。
作为缓存使用时候,一般通过两种方式保存数据:
读取前,先读取 Redis,如果没有数据,再读取数据库,将数据拉入到 Redis 中
插入数据库时候,同时插入 Redis
方案一:实施起来简单,但是有两个注意的地方:
避免缓存击穿(数据库没有需要命中的数据,导致 Redis 一直没有数据,从而一直命中数据库)
数据的实时性相对较差一点
方案二:数据实时性抢,但是开发时候不便于统一处理
当然,不同的实际情况使用不同的场景。方案一适用于对于数据实时性要求不是特别高的场景。方案二适用于字典表、数据量不大的数据存储。
Redis 中可以使用 expire 命令来设置一个键的生时间,到期后 Redis 会删除它,利用这一特性可以运用在现实的优惠活动信息,手机验证码等场景业务。
Redis 由于 incrby 命令可以实现原子性的递增,所以可以运用于高并发的秒杀活动、分布式序列号的生成,具体业务还体现在比如限制一个手机号发多少条信息、一个接口一分钟限制请求多少次,一个接口一天限制调用多少次等等。
这个主要利用 Redis 的 setnx 命令,setnx:”set if not exists” 就是如果不存在则成功设置缓存并且返回 1,否则返回 0,这个特性在很多后台中有所使用。
因为我们服务器是集群的,定时任务可能在两台机器上执行,所以在定时任务中首先通过 setnx 设置一个 lock,如果成功设置就执行,如果失败,就表明该任务正在执行。根据业务场景,我们还可以给这个 lock 加一个过期时间,比如 30 分钟执行一次的定时任务,那么设置这个过期时间小于 30 分钟的某个时间就可以了,具体可根据定时任务的周期以及定时任务执行消耗时间来定。
在分布式锁的场景中,主要用于秒杀系统等。
常见的延时操作就是订单生成之后延时付款。在订单生成后就占用库存,10 分钟后检验用户是否购买,如果没有购买将该订单设置无效,同时还原库存。
由于 Redis 在 2.8.0 之后提供了 Keyspace Notifications 功能,允许客户订阅 Pub/Sub 频道,以便以某种方式接收影响 Redis 的时间。所以以上的需求就可以用以下解决方案,我们在订单产生时,设置一个 key,同时设置 10 分钟后过期,再在后台实现一个监听器,检测到 key 的时效,将 key 失效后的后续逻辑加上。
当然我们也可以用 rabbitmq,activemq 等消息中间件的延时队列服务实现该需求。
关系型数据库在排行榜方面查询速度普遍较慢,所以可以借助 Redis 中的 zSet(sorted set)进行热点数据的排序。
比如点赞排行榜,做一个 SortedSet,然后以用户的 openId 作为上面的 username,以用户的点赞数作为上面的 score,然后针对每一个用户,通过 zrangebyscore 就可以按照点赞数获取排行榜,然后再根据 username 获取用户的 hash 信息。
Redis 利用集合的一些命令,比如求交集、并集、差集等。
在微博应用中,每个用户关注的人都在一个集合中,很容易实现两个人共同好友功能。
由于 Redis 有 List push 和 List pop 这样的命令,所以很方板执行队列操作。

| 结构类型 | 结构存储的值 | 结构的读写能力 |
|---|---|---|
| String 字符串 | 可以是字符串、整数或浮点数 | 对整个字符串或字符串的一部分进行操作;对整数或浮点数进行自增或自减操作; |
| Lit 列表 | 一个链表,链表上的每个节点都包含一个字符串 | 对链表的两端进行 push 和 pop 操作,读取单个或多个元素;根据值查找或删除元素; |
| Set 集合 | 包含字符串的无序集合 | 字符串的集合,包含基础的方法有看是否存在添加、获取、删除;还包含计算交集、并集、差集等 |
| Hash 散列 | 包含键值对的无序散列表 | 包含方法有添加、获取、删除单个元素 |
| Zset 有序集合 | 和散列一样,用于存储键值对 | 字符串成员与浮点数分数之间的有序映射;元素的排列顺序由分数的大小决定;包含方法有添加、获取、删除单个元素以及根据分值范围或成员来获取元素 |
String 是 Redis 中最基本的数据类型,一个 key 对应一个 value
String 类型是二进制安全的,意思是 Redis 的 String 类型可以包含任何数据,如数字,字符串,jpg 图片或者序列化对象。
下图是一个 String 类型的实力,其中键为 hello,值为 world

| 命令 | 简述 | 使用 |
|---|---|---|
| GET | 获取存储在给定键中的值 | GET name |
| SET | 设置存储在给定键中的值 | SET name value |
| DEL | 删除存储在给定键中的值 | DEL name |
| INCR | 将键存储的值加 1 | INCR key |
| DECR | 将键存储的值减 1 | DECR key |
| INCRBY | 将键存储的值加上整数 | INCRBY key amount |
| DECRBY | 将键存储的值减去整数 | DECRBY key amount |
127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> get hello
"world"
127.0.0.1:6379> del hello
(integer) 1
127.0.0.1:6379> get hello
(nil)
127.0.0.1:6379> set counter 2
OK
127.0.0.1:6379> get counter
"2"
127.0.0.1:6379> incr counter
(integer) 3
127.0.0.1:6379> get counter
"3"
127.0.0.1:6379> incrby counter 100
(integer) 103
127.0.0.1:6379> get counter
"103"
127.0.0.1:6379> decr counter
(integer) 102
127.0.0.1:6379> get counter
"102"
缓存:经典使用场景,把常用信息,字符串,图片或者视频等信息放入到 redis 中,redis 作为缓冲层,mysql 作为持久层,降低 mysql 的读写压力
计数器:redis 是单线程模型,一个命令执行完才会执行下一个命令,同时数据库可以一步落地到其他数据源
session:常见的 Spring session + redis 实现 session 共享
Redis 中的 List 其实就是链表(Redis 用双端链表实现 List)
使用 List 结构,我们可以轻松实现最新消息排队功能(比如微博中的 TimeLine)。LIst 的另外一个应用就是消息队列,可以利用 List 的 PUSH 操作,将任务存入到 LIst 中,然后再用 POP 操作将任务去除进行执行。

| 命令 | 简述 | 使用 |
|---|---|---|
| RPUSH | 将给定值推入到列表右端 | RPUSH key value |
| LPUSH | 将给定值推入到列表左端 | LPUSH key value |
| RPOP | 从列表的右端弹出一个值,并返回被弹出的值 | RPOP key |
| LPOP | 从列表的左端弹出一个值,并返回被弹出的值 | LPOP key |
| LRANGE | 获取列表在给定范围上的所有值 | LRANGE key 0 -1 |
| LINDEX | 通过索引获取列表中的元素。你也可以使用负数下标,以 -1 表示列表的最后一个元素,-2 表示列表的倒数第二个元素,以此类推。 | LINDEX key index |
lpush + lpop = Stack(栈)
lpush + rpop = Queue(队列)
lpush + ltrim = Capped Collection(有限集合)
lpush + brpop = Message Queue(消息队列)
127.0.0.1:6379> lpush mylist 1 2 ll ls mem
(integer) 5
127.0.0.1:6379> lrange mylist 0 -1
1) "mem"
2) "ls"
3) "ll"
4) "2"
5) "1"
127.0.0.1:6379> lindex mylist -1
"1"
127.0.0.1:6379> lindex mylist 10 # index不在 mylist 的区间范围内
(nil)
微博 TimeLine:有人发布微博,就用 lpush 加入时间轴,展示新的列表信息
消息队列
Redis 的 set 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。

| 命令 | 简述 | 使用 |
|---|---|---|
| SADD | 向集合添加一个或多个成员 | SADD key value |
| SCARD | 获取集合的成员数 | SCARD key |
| SMEMBERS | 返回集合中的所有成员 | SMEMBERS key member |
| SISMEMBER | 判断 member 元素是否是集合 key 的成员 | SISMEMBER key member |
其他一些集合操作,可以参考 这里
127.0.0.1:6379> sadd myset hao hao1 xiaohao hao
(integer) 3
127.0.0.1:6379> smembers myset
1) "xiaohao"
2) "hao1"
3) "hao"
127.0.0.1:6379> sismember myset hao
(integer) 1
标签(tag):给用户添加标签,或者用户给消息添加标签,这样有同一个标签或者类似标签的可以推送给关注的事或者人
点赞或点踩,收藏等,可以放到 set 中实现
Redis hash 是一个 String 类型的 field(字段)和 value(值)的映射表,hash 特别适合用于存储对象。

| 命令 | 简述 | 使用 |
|---|---|---|
| HSET | 添加键值对 | HSET hash-key sub-key1 value1 |
| HGET | 获取指定散列键的值 | HGET hash-key key1 |
| HGETALL | 获取散列中包含的所有键值对 | HGETALL hash-key |
| HDEL | 如果给定键存在于散列中,那么就移除这个键 | HDEL hash-key sub-key1 |
127.0.0.1:6379> hset user name1 hao
(integer) 1
127.0.0.1:6379> hset user email1 hao@163.com
(integer) 1
127.0.0.1:6379> hgetall user
1) "name1"
2) "hao"
3) "email1"
4) "hao@163.com"
127.0.0.1:6379> hget user user
(nil)
127.0.0.1:6379> hget user name1
"hao"
127.0.0.1:6379> hset user name2 xiaohao
(integer) 1
127.0.0.1:6379> hset user email2 xiaohao@163.com
(integer) 1
127.0.0.1:6379> hgetall user
1) "name1"
2) "hao"
3) "email1"
4) "hao@163.com"
5) "name2"
6) "xiaohao"
7) "email2"
8) "xiaohao@163.com"
Redis 有序集合和集合一样也是 String 类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个 double 类型的分数。redis 通过分数来为集合中的成员中进行从小到大的排序。
有序集合的成员是唯一的,但是分数(score)可以重复。有序集合是通过两种数据接口实现:
压缩列表(ziplist):ziplist 是为了提高存储效率而设计的一种特殊编码的双向链表。它可以存储字符串或者整数,存储整数时是采用整数的二进制而不是字符串形式存储。他能在 O(1) 的时间复杂度下完成 list 两端的 push 和 pop 操作。但是因为每次操作都需要重新分配 ziplist 内容,所以实际复杂度和 ziplist 的内存使用量有关
跳跃表(zSkiplist):跳跃表的性能可以保证在查找,删除,添加等操作时候在对数期望时间内完成,这个性能可以和平衡树相比较的,而且实现方面比平衡树要优雅。这也是采用跳跃表的主要原因。跳跃表的复杂度是 O(log(n))。

| 命令 | 简述 | 使用 |
|---|---|---|
| ZADD | 将一个带有给定分值的成员添加到有序集合里面 | ZADD zset-key 178 member1 |
| ZRANGE | 根据元素在有序集合中所处的位置,从有序集合中获取多个元素 | ZRANGE zset-key 0-1 withccores |
| ZREM | 如果给定元素成员存在于有序集合中,那么就移除这个元素 | ZREM zset-key member1 |
更多命令请参考 这里
127.0.0.1:6379> zadd myscoreset 100 hao 90 xiaohao
(integer) 2
127.0.0.1:6379> ZRANGE myscoreset 0 -1
1) "xiaohao"
2) "hao"
127.0.0.1:6379> ZSCORE myscoreset hao
"100"
举个例子,A =「1,2,3,4,5」,B =「3,5,6,7,9」;那么基数就是不重复的元素 = 1,2,4,6,7,9;(允许一定范围内的误差)
这个数据结构可以非常节省内存的去统计各种计数,比如注册的 IP 数,每日访问 IP 数,页面实时 UV,在线人数,共同好友等;
一个大型的网站,每天访问的 IP 是 100w,粗略计算一个 IP 消耗 15 字节,那么 100w 个 IP 就是 15M。而 HyperLogLogs 在 Redis 中的每个键占用内容是 12k,理论上存储近似 2^64 个值。这个值是有 0.81% 的标准错误的近似值。
127.0.0.1:6379> pfadd key1 a b c d e f g h i # 创建第一组元素
(integer) 1
127.0.0.1:6379> pfcount key1 # 统计元素的基数数量
(integer) 9
127.0.0.1:6379> pfadd key2 c j k l m e g a # 创建第二组元素
(integer) 1
127.0.0.1:6379> pfcount key2
(integer) 8
127.0.0.1:6379> pfmerge key3 key1 key2 # 合并两组:key1 key2 -> key3 并集
OK
127.0.0.1:6379> pfcount key3
(integer) 13
Bitmap 即位图数据结构,都是操作二进制数来进行记录,只有 0 和 1 两种状态。
比如:统计用户信息,登录!不登录!活跃!不活跃!打卡!不打卡!只要是两种状态都可以用 Bitmap 来存储。
比如存储一年的打卡状态需要多少内存呢?365 天 = 356bit 1 字节 = 8bit,相当于 46 个字节左右!
使用 bitmap 来记录 周一到周日的打卡!周一:1 周二:0 周三:0 周四:1 ……
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0
查看某天是否打卡!
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 5
(integer) 0
统计操作,统计打卡次数
127.0.0.1:6379> bitcount sign # 统计这周的打卡记录,就可以看到是否有全勤!
(integer) 3
Redis 的 Geo 在 3.2 版本推出,这个功能可以推算出地理位置的信息:两地之间的距离,方圆几里的人
127.0.0.1:6379> geoadd china:city 118.76 32.04 manjing 112.55 37.86 taiyuan 123.43 41.80 shenyang
(integer) 3
127.0.0.1:6379> geoadd china:city 144.05 22.52 shengzhen 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 3
规则
两级无法直接添加,我们一般会下载城市数据 (这个网址可以查询 GEO:http://www.jsons.cn/lngcode)!
有效的经度从 -180 度到 180 度。
有效的纬度从 -85.05112878 度到 85.05112878 度。
# 当坐标位置超出上述指定范围时,该命令将会返回一个错误。
127.0.0.1:6379> geoadd china:city 39.90 116.40 beijin
(error) ERR invalid longitude,latitude pair 39.900000,116.400000
127.0.0.1:6379> geopos china:city taiyuan manjing
1) 1) "112.54999905824661255"
2) "37.86000073876942196"
3) 1) "118.75999957323074341"
4) "32.03999960287850968"
获得当前定位, 一定是一个坐标值!
单位:m,km,mi(英里),ft(英尺)
127.0.0.1:6379> geodist china:city taiyuan shenyang m
"1026439.1070"
127.0.0.1:6379> geodist china:city taiyuan shenyang km
"1026.4391"
获得指定数量的人
127.0.0.1:6379> georadius china:city 110 30 1000 km #以110,30 这个坐标为中心,寻找半径为1000km的城市
1) "xian"
2) "hangzhou"
3) "manjing"
4) "taiyuan"
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "xian"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist
1) 1) "xian"
2) "483.8340"
127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist count 2
1) 1) "xian"
2) "483.8340"
3) 1) "108.96000176668167114"
4) "34.25999964418929977"
5) 1) "manjing"
6) "864.9816"
7) 1) "118.75999957323074341"
8) "32.03999960287850968"
127.0.0.1:6379> georadiusbymember china:city taiyuan 1000 km
1) "manjing"
2) "taiyuan"
3) "xian"
127.0.0.1:6379> georadiusbymember china:city taiyuan 1000 km withcoord withdist count 2
1) 1) "taiyuan"
2) "0.0000"
3) 1) "112.54999905824661255"
4) "37.86000073876942196"
5) 1) "xian"
6) "514.2264"
7) 1) "108.96000176668167114"
8) "34.25999964418929977"
参数与 georadius 一样
127.0.0.1:6379> geohash china:city taiyuan shenyang
1) "ww8p3hhqmp0"
2) "wxrvb9qyxk0"
如果这两个字符串差值越小,则距离越近
geo 底层的视线原理实际上就是 Zset,我们可以通过 Zset 的命令来操作 geo
127.0.0.1:6379> type china:city
zset
查看全部元素、删除指定元素
127.0.0.1:6379> zrange china:city 0 -1 withscores
1) "xian"
2) "4040115445396757"
3) "hangzhou"
4) "4054133997236782"
5) "manjing"
6) "4066006694128997"
7) "taiyuan"
8) "4068216047500484"
9) "shenyang"
10) "4072519231994779"
11) "shengzhen"
12) "4154606886655324"
127.0.0.1:6379> zrem china:city manjing
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "xian"
2) "hangzhou"
3) "taiyuan"
4) "shenyang"
5) "shengzhen"
Redis 5.0 添加了一种数据类型 Stream 数据类型,它借鉴了 Kafka 的设计,是一个新的强大的支持多播的可持久化的消息队列。
用过 Redis 做消息队列的都了解,基于 Redis 的消息队列实现有很多种,例如:
PUB/SUB,订阅/发布模式
但是发布订阅模式无法持久化,如果出现网络断开,Redis 宕机等,消息会被丢弃;
基于 List LPUSH+BRPOP 或者基于 Sorted-Set 的实现
只吃了持久化,但是不能多播,分组消费等
那么,我们就需要考虑一下满足消息队列的数据类型到底需要什么特性?
消息的生产
消息的消费
单播和多播(多对多)
阻塞和非阻塞读取
消息有序化
消息的持久化
还需要考虑其他什么特性,可以借助美团技术团队的文章,消息队列设计精要

从上面可以看出,Stream 的出现很有必要了。那么实际上 Stream 能满足上述那些要求呢?
消息 ID 的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
未完成消息的处理
消息队列的监控
…
每一个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 命令追加消息时自动创建。

上图解析:
Consumer group:消费组,使用 xgroup create 命令创建,一个消费组有多个消费者(Consumer),这些消费者是竞争关系。
last_delivered_id:游标,每个消费组会有一个游标 last_delivered_id,任意一个消费者读取了消息都会使游标往前移动。
pending_ids:消费者(Consumer)的状态变量,作用是维护消费者的未确认的 ID。pending_ids 记录了当前被客户端读取的消息,但是还没有 ack(Acknowledge character:确认字符)。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 中称为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了一次消息,而不会出现网络传输的中途丢失了没处理。
此外,我们还需要理解两点:
消息ID:消息 ID 的形式是 timestampInmillis-sequence,例如:1527846880527-5,他表示在 1527846880527 时刻产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以客户端自己指定。但是形式必须是整数 - 整数,而且必须后面的消息 ID 大于前面的消息 ID。
消息内容:消息内容就是键值对,形如 hash 结构的键值对,没有特别之处。
消息队列的相关命令:
XADD - 添加消息到末尾;
XTRIM - 对流进行修剪,限制长度;
XDEL - 删除消息;
XLEN - 获取流包含的元素数量,即消息长度;
XRANGE - 获取消息列表,会自动过滤已经删除的消息;
XREVRANGE - 反向获取消息列表,ID 从大到校
XREAD - 以阻塞或者非阻塞方式获取消息列表
# *号表示服务器自动生成ID,后面书序跟着一堆key/value
127.0.0.1:6379> xadd codehole * name laoqian age 30 #名字叫laoqian,年龄30岁
1527849609889-0 # 生成的消息ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
127.0.0.1:6379> xrange codehole - + # -表示最小值,+表示最大值
1) 1) 1527849609889-0
2) 1) "name"
3) "laoqian"
4) "age"
5) "30"
6) 1) 1527849629172-0
7) 1) "name"
8) "xiaoyu"
9) "age"
10) "29"
11) 1) 1527849637634-0
12) 1) "name"
13) "xiaoqian"
14) "age"
15) "1"
127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小消息ID的列表
1) 1) 1527849629172-0
2) 1) "name"
3) "xiaoyu"
4) "age"
5) "29"
6) 1) 1527849637634-0
7) 1) "name"
8) "xiaoqian"
9) "age"
10) "1"
127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大消息ID的列表
1) 1) 1527849609889-0
2) 1) "name"
3) "laoqian"
4) "age"
5) "30"
6) 1) 1527849629172-0
7) 1) "name"
8) "xiaoyu"
9) "age"
10) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
127.0.0.1:6379> xlen codehole # 长度不受影响
(integer) 3
127.0.0.1:6379> xrange codehole - + # 被删除的消息没了
1) 1) 1527849629172-0
2) 1) "name"
3) "xiaoyu"
4) "age"
5) "29"
6) 1) 1527849637634-0
7) 1) "name"
8) "xiaoqian"
9) "age"
10) "1"
127.0.0.1:6379> del codehole # 删除整个Stream
(integer) 1
我们可以再不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息是,甚至可以阻塞等待。Redis 设计了一个单独的消费指令 xread,可以将 Stream 当成普通的消息队列(list)来使用。使用 xread 时,我们可以完全忽略消费组(Cosumer Group)的存在,就好比 Stream 是一个普通的列表(list)。
# 从Stream头部读取两条消息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
2) 1) 1) 1527851486781-0
3) 1) "name"
4) "laoqian"
5) "age"
6) "30"
7) 1) 1527851493405-0
8) 1) "name"
9) "yurui"
10) "age"
11) "29"
# 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 我们从新打开一个窗口,在这个窗口往Stream里塞消息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
2) 1) 1) 1527852774092-0
3) 1) "name"
4) "youming"
5) "age"
6) "60"
(93.11s)
客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪了,也就是返回的消息 ID,下次继续调用 xread 时候,将上次最后一个 ID 传入,就可以继续消费后续的消息。
block 0 代表永远阻塞,直到消息到来,block 1000 表示阻塞 1s,如果 1s 内没有消息,则返回 nil。
127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)

相关命令:
XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为 “ 已处理 “
XGROUP SETID - 为消费者组设置新的最后递送消息 ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息
创建消费组
Stream 通过 xgroup create 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示从头开始消费
OK
# $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
127.0.0.1:6379> xinfo stream codehole # 获取Stream信息
1) length
2) (integer) 3 # 共3个消息
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2 # 两个消费组
9) first-entry # 第一个消息
10) 1) 1527851486781-0
11) 1) "name"
12) "laoqian"
13) "age"
14) "30"
15) last-entry # 最后一个消息
16) 1) 1527851498956-0
17) 1) "name"
18) "xiaoqian"
19) "age"
20) "1"
127.0.0.1:6379> xinfo groups codehole # 获取Stream的消费组信息
1) 1) name
2) "cg1"
3) consumers
4) (integer) 0 # 该消费组还没有消费者
5) pending
6) (integer) 0 # 该消费组没有正在处理的消息
7) 1) name
8) "cg2"
9) consumers # 该消费组还没有消费者
10) (integer) 0
11) pending
12) (integer) 0 # 该消费组没有正在处理的消息
Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
# >号表示从当前消费组的last_delivered_id后面开始读
# 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851486781-0
3) 1) "name"
4) "laoqian"
5) "age"
6) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851493405-0
3) 1) "name"
4) "yurui"
5) "age"
6) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851498956-0
3) 1) "name"
4) "xiaoqian"
5) "age"
6) "1"
7) 1) 1527852774092-0
8) 1) "name"
9) "youming"
10) "age"
11) "60"
# 再继续读取,就没有新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527854062442-0
3) 1) "name"
4) "lanying"
5) "age"
6) "61"
(36.54s)
127.0.0.1:6379> xinfo groups codehole # 观察消费组信息
1) 1) name
2) "cg1"
3) consumers
4) (integer) 1 # 一个消费者
5) pending
6) (integer) 5 # 共5条正在处理的信息还有没有ack
7) 1) name
8) "cg2"
9) consumers
10) (integer) 0 # 消费组cg2没有任何变化,因为前面我们一直在操纵cg1
11) pending
12) (integer) 0
# 如果同一个消费组有多个消费者,我们可以通过xinfo consumers指令观察每个消费者的状态
127.0.0.1:6379> xinfo consumers codehole cg1 # 目前还有1个消费者
1) 1) name
2) "c1"
3) pending
4) (integer) 5 # 共5条待处理消息
5) idle
6) (integer) 418715 # 空闲了多长时间ms没有读取消息了
# 接下来我们ack一条消息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 4 # 变成了5条
5) idle
6) (integer) 668504
# 下面ack所有消息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 0 # pel空了
5) idle
6) (integer) 745505
Stream 提供了 XINFO 用来实现对服务器信息的监控,可以查询:
127.0.0.1:6379> Xinfo stream mq
1) "length"
2) (integer) 7
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
13) 1) "msg"
14) "4"
15) "last-entry"
16) 1) "1553585533795-9"
17) 1) "msg"
18) "10"
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1553585533795-4"
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
2) "consumerA"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 18949894
7) 1) "name"
8) "consumerB"
9) "pending"
10) (integer) 1
11) "idle"
12) (integer) 3092719
13) 1) "name"
14) "consumerC"
15) "pending"
16) (integer) 1
17) "idle"
18) (integer) 23683256
至此,消息队列的操作说明大体结束!
可用作时通信等,大数据分析,异地数据备份等

客户端可以平滑扩展,提高处理能力

在 分布式算法 - ID算法 设计中, 一个常见的问题就是时间回拨问题,那么 Redis 的消息 ID 设计中是否考虑到这个问题呢?
XADD 生成的 1553439850328-0,就是 Redis 生成的消息 ID,由两部分组成:时间戳 - 序号。时间戳是毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个 64 位整型。
可以通过 multi 批处理,来验证序号的递增:
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"
由于一个 redis 命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。
为了保证消息是有序的,因此 Redis 生成的 ID 是单调递增有序的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。
强烈建议使用 Redis 的方案生成消息 ID,因为这种时间戳 + 序号的单调递增的 ID 方案,几乎可以满足你全部的需求。但同时,记住 ID 是支持自定义的,别忘了!
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令 XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
5) "3"
6) 1) "consumerB" # 消费者B有1个
7) "1"
8) 1) "consumerC" # 消费者C有1个
9) "1"
127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
2) "consumerA" # 消费者
3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
4) (integer) 5 # 消息被读取了5次,delivery counter
5) 1) "1553585533795-1"
6) "consumerA"
7) (integer) 1654355
8) (integer) 4
# 共5个,余下3个省略 ...
127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
2) "consumerA"
3) (integer) 1641083
4) (integer) 5
# 共3个,余下2个省略 ...
每个 Pending 的消息有 4 个属性:
消息 ID
所属消费者
IDLE,已读取时长
delivery counter,消息被读取次数
上面的结果我们可以看到,我们之前读取的消息,都被记录在 Pending 列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1
127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
5) "2"
6) 1) "consumerB"
7) "1"
8) 1) "consumerC"
9) "1"
127.0.0.1:6379>
有了这样一个 Pending 机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。
还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转义给其他的消费者处理,就是消息转移。
消息转移的操作时将某个消息转移到自己的 Pending 列表中。使用语法 XCLAIM 来实现,需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:
# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerA"
3) (integer) 15907787
4) (integer) 4
# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
2) 1) "msg"
3) "2"
# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
2) "consumerB"
3) (integer) 84404 # 注意IDLE,被重置了
4) (integer) 5 # 注意,读取次数也累加了1次
以上代码,完成了一次消息转移。转移除了要指定 ID 外,还需要指定 IDLE,保证是长时间未处理的才被转移。被转移的消息的 IDLE 会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了 IDLE,则可以避免后面的转移不会成功,因为 IDLE 不满足条件。例如下面的连续两条转移,第二条不会成功。
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
这就是消息转移。至此我们使用了一个 Pending 消息的 ID,所属消费者和 IDLE 的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。
正如上面所说,如果某个消息,不能被消费者处理,也就是不能被 XACK,这是要长时间处于 Pending 列表中,即使被反复的转移给各个消费者也是如此。此时该消息的 delivery counter 就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用 XDEL 语法,演示如下:
# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
2) 1) "msg"
3) "1"
4) 1) "1553585533795-2"
5) 1) "msg"
6) "3"
注意本例中,并没有删除 Pending 中的消息因此你查看 Pending,消息还会在。可以执行 XACK 标识其处理完毕!