最近工作中使用到了消息中間件,另外一個(gè)組的同事經(jīng)過(guò)評(píng)估選擇了Redis stream作為最終選擇。我自己寫(xiě)的性能測(cè)試框架自然也需要接入這套消息系統(tǒng)。所以我也抓緊學(xué)習(xí)起來(lái)。
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。 Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來(lái)實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無(wú)法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開(kāi)、Redis 宕機(jī)等,消息就會(huì)被丟棄。
之前還沒(méi)發(fā)現(xiàn)Redis還有這種使用方法,著實(shí)有點(diǎn)少見(jiàn)過(guò)怪了。照例我后面會(huì)進(jìn)行一些基本功能的性能測(cè)試,下面分享基本功能的使用演示。
準(zhǔn)備工作
依賴(lài)
如果想自己操作的話,請(qǐng)注意這個(gè)版本,因?yàn)樵谖艺屹Y料的過(guò)程中發(fā)現(xiàn),不同版本的API有不少的差異,算是踩了一些坑。如果你使用其他版本的redis.clients遇到代碼無(wú)法運(yùn)行的時(shí)候,可以直接翻看源碼查看相關(guān)參數(shù)類(lèi)型。
Maven依賴(lài):
redis.clients jedis 4.2.3
Gradle依賴(lài):
// https://mvnrepository.com/artifact/redis.clients/jedisimplementation group: ‘redis.clients’, name: ‘jedis’, version: ‘4.2.3’
Redis server版本:Redis 6.2.5。
XADD – 添加消息到末尾
如果key對(duì)應(yīng)的隊(duì)列不存在,則會(huì)自動(dòng)創(chuàng)建。
首先我們需要?jiǎng)?chuàng)建一個(gè)redis.clients.jedis.params.XAddParams,顧名思義就是查詢(xún)參數(shù),這里面有重要的參數(shù):redis.clients.jedis.params.XAddParams#maxLen表示設(shè)置隊(duì)列的長(zhǎng)度,但是不常用。語(yǔ)法如下:
def len = XAddParams.xAddParams()
xadd API使用方式如下:
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() def len = XAddParams.xAddParams() def map = new HashMap() map.put(“FunTester”, Time.getDate() + TAB + 325) jedis.xadd(“fun”, len, map) jedis.close() }
XTRIM – 對(duì)流進(jìn)行修剪,限制長(zhǎng)度
這個(gè)API就是設(shè)置隊(duì)列長(zhǎng)度。使用方式也非常簡(jiǎn)單。
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() def xtrim = jedis.xtrim(“fun”, XTrimParams.xTrimParams().maxLen(10)) output(xtrim) jedis.close() }
返回值是丟棄的消息的數(shù)量。
XDEL – 刪除消息
這個(gè)就是刪除某個(gè)消息,使用更簡(jiǎn)單了。
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() jedis.xdel(“fun”,new StreamEntryID(1653129389004,1)) jedis.close() }
XLEN – 獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度
話不多說(shuō)了,使用如下:
jedis.xlen(“fun”)
XREAD – 以阻塞或非阻塞方式獲取消息列表
這個(gè)要著重介紹一下,因?yàn)槲矣玫木褪沁@個(gè),首先我們需要?jiǎng)?chuàng)建一個(gè)redis.clients.jedis.params.XReadParams,這里有兩個(gè)參數(shù):redis.clients.jedis.params.XReadParams#count和redis.clients.jedis.params.XReadParams#block。前者控制返回?cái)?shù)量,后者控制阻塞時(shí)間,如果時(shí)間小于0則認(rèn)為不阻塞,等于0則一直會(huì)阻塞,小于0會(huì)報(bào)錯(cuò)。不設(shè)置該參數(shù)責(zé)任無(wú)非阻塞模式。PS:數(shù)量不足不會(huì)造成阻塞。示例如下:
def block = XReadParams.xReadParams().count(3).block(1000)
還有我們需要redis.clients.jedis.Jedis#xread(redis.clients.jedis.params.XReadParams, java.util.Map)第二個(gè)參數(shù),這里常用的兩種:
Map entry = [“fun”: new StreamEntryID()]//獲取歷史消息 Map entry = [“fun”: StreamEntryID.LAST_ENTRY]//獲取在請(qǐng)求之后添加的消息
遍歷消息:
List<Map.Entry> xread = jedis.xread(block, entry) output(xread.size()) Map.Entry get = xread.get(0) def value = get.getValue() value.each { println(it.getID()) println(it.getFields().get(“FunTester”)) }
控制臺(tái)響應(yīng)如下:
16:40:56.065 main redis連接池IP:127.0.0.1,端口:6379,超時(shí)設(shè)置:500016:40:56.280 main 11653725282325-02022-05-28 16:08:02 3251653725282325-12022-05-28 16:08:02 3251653725282325-22022-05-28 16:08:02 325
XRANGE – 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息
這個(gè)API獲取某個(gè)范圍內(nèi)的消息,有個(gè)start和end的參數(shù),可以傳String類(lèi)型的消息ID,也可以傳redis.clients.jedis.StreamEntryID,方法重載的比較多,有興趣可以翻一翻源碼。
jedis.xrange(“fun”, “1653129389045-0”, “1653129389047-0”)
后面會(huì)對(duì)Redis stream API進(jìn)行性能測(cè)試,歡迎繼續(xù)關(guān)注FunTester。