数据库 发布日期:2025/1/2 浏览次数:1
本系列将和大家分享Redis分布式缓存,本章主要简单介绍下Redis中的List类型,以及如何使用Redis解决博客数据分页、生产者消费者模型和发布订阅等问题。
Redis List的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用这个数据结构。
List类型主要用于队列和栈,先进先出,后进先出等。
存储形式:key--LinkList<value>
首先先给大家Show一波Redis中与List类型相关的API:
using System; using System.Collections.Generic; using ServiceStack.Redis; namespace TianYa.Redis.Service { /// <summary> /// Redis List的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销, /// Redis内部的很多实现,包括发送缓冲队列等也都是用这个数据结构。 /// </summary> public class RedisListService : RedisBase { #region Queue队列(先进先出) /// <summary> /// 入队 /// </summary> /// <param name="listId">集合Id</param> /// <param name="value">入队的值</param> public void EnqueueItemOnList(string listId, string value) { base._redisClient.EnqueueItemOnList(listId, value); } /// <summary> /// 出队 /// </summary> /// <param name="listId">集合Id</param> /// <returns>出队的值</returns> public string DequeueItemFromList(string listId) { return base._redisClient.DequeueItemFromList(listId); } /// <summary> /// 出队(阻塞) /// </summary> /// <param name="listId">集合Id</param> /// <param name="timeOut">阻塞时间(超时时间)</param> /// <returns>出队的值</returns> public string BlockingDequeueItemFromList(string listId, TimeSpan"listIds">集合Id</param> /// <param name="timeOut">阻塞时间(超时时间)</param> /// <returns>返回出队的 listId & Item</returns> public ItemRef BlockingDequeueItemFromLists(string[] listIds, TimeSpan"listId">集合Id</param> /// <param name="value">入栈的值</param> public void PushItemToList(string listId, string value) { base._redisClient.PushItemToList(listId, value); } /// <summary> /// 入栈,并设置过期时间 /// </summary> /// <param name="listId">集合Id</param> /// <param name="value">入栈的值</param> /// <param name="expireAt">过期时间</param> public void PushItemToList(string listId, string value, DateTime expireAt) { base._redisClient.PushItemToList(listId, value); base._redisClient.ExpireEntryAt(listId, expireAt); } /// <summary> /// 入栈,并设置过期时间 /// </summary> /// <param name="listId">集合Id</param> /// <param name="value">入栈的值</param> /// <param name="expireIn">过期时间</param> public void PushItemToList(string listId, string value, TimeSpan expireIn) { base._redisClient.PushItemToList(listId, value); base._redisClient.ExpireEntryIn(listId, expireIn); } /// <summary> /// 出栈 /// </summary> /// <param name="listId">集合Id</param> /// <returns>出栈的值</returns> public string PopItemFromList(string listId) { return base._redisClient.PopItemFromList(listId); } /// <summary> /// 出栈(阻塞) /// </summary> /// <param name="listId">集合Id</param> /// <param name="timeOut">阻塞时间(超时时间)</param> /// <returns>出栈的值</returns> public string BlockingPopItemFromList(string listId, TimeSpan"listIds">集合Id</param> /// <param name="timeOut">阻塞时间(超时时间)</param> /// <returns>返回出栈的 listId & Item</returns> public ItemRef BlockingPopItemFromLists(string[] listIds, TimeSpan"fromListId">出栈集合Id</param> /// <param name="toListId">入栈集合Id</param> /// <returns>返回移动的值</returns> public string PopAndPushItemBetweenLists(string fromListId, string toListId) { return base._redisClient.PopAndPushItemBetweenLists(fromListId, toListId); } /// <summary> /// 从fromListId集合出栈并入栈到toListId集合(阻塞) /// </summary> /// <param name="fromListId">出栈集合Id</param> /// <param name="toListId">入栈集合Id</param> /// <param name="timeOut">阻塞时间(超时时间)</param> /// <returns>返回移动的值</returns> public string BlockingPopAndPushItemBetweenLists(string fromListId, string toListId, TimeSpan"listId">集合Id</param> /// <param name="keepStartingFrom">保留起点</param> /// <param name="keepEndingAt">保留终点</param> public void TrimList(string listId, int keepStartingFrom, int keepEndingAt) { base._redisClient.TrimList(listId, keepStartingFrom, keepEndingAt); } #endregion 其它 #region 发布订阅 /// <summary> /// 发布 /// </summary> /// <param name="channel">频道</param> /// <param name="message">消息</param> public void Publish(string channel, string message) { base._redisClient.PublishMessage(channel, message); } /// <summary> /// 订阅 /// </summary> /// <param name="channel">频道</param> /// <param name="actionOnMessage"></param> public void Subscribe(string channel, Action<string, string, IRedisSubscription> actionOnMessage) { var subscription = base._redisClient.CreateSubscription(); subscription.OnSubscribe = c => { Console.WriteLine($"订阅频道{c}"); Console.WriteLine(); }; //取消订阅 subscription.OnUnSubscribe = c => { Console.WriteLine($"取消订阅 {c}"); Console.WriteLine(); }; subscription.OnMessage += (c, s) => { actionOnMessage(c, s, subscription); }; Console.WriteLine($"开始启动监听 {channel}"); subscription.SubscribeToChannels(channel); //blocking } /// <summary> /// 取消订阅 /// </summary> /// <param name="channel">频道</param> public void UnSubscribeFromChannels(string channel) { var subscription = base._redisClient.CreateSubscription(); subscription.UnSubscribeFromChannels(channel); } #endregion 发布订阅 } }
使用如下:
/// <summary> /// Redis List的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销, /// Redis内部的很多实现,包括发送缓冲队列等也都是用这个数据结构。 /// 队列/栈/生产者消费者模型/发布订阅 /// </summary> public static void ShowList() { using (RedisListService service = new RedisListService()) { service.FlushAll(); service.AddItemToList("article", "张三"); service.AddItemToList("article", "李四"); service.AddItemToList("article", "王五"); service.PrependItemToList("article", "赵六"); service.PrependItemToList("article", "钱七"); var result1 = service.GetAllItemsFromList("article"); //一次性获取所有的数据 var result2 = service.GetRangeFromList("article", 0, 3); //可以按照添加顺序自动排序,而且可以分页获取 Console.WriteLine($"result1={JsonConvert.SerializeObject(result1)}"); Console.WriteLine($"result2={JsonConvert.SerializeObject(result2)}"); Console.WriteLine("====================================================="); //栈:后进先出 service.FlushAll(); service.PushItemToList("article", "张三"); //入栈 service.PushItemToList("article", "李四"); service.PushItemToList("article", "王五"); service.PushItemToList("article", "赵六"); service.PushItemToList("article", "钱七"); for (int i = 0; i < 5; i++) { Console.WriteLine(service.PopItemFromList("article")); //出栈 } Console.WriteLine("====================================================="); //队列:先进先出,生产者消费者模型 //MSMQ---RabbitMQ---ZeroMQ---RedisList 学习成本、技术成本 service.FlushAll(); service.EnqueueItemOnList("article", "张三"); //入队 service.EnqueueItemOnList("article", "李四"); service.EnqueueItemOnList("article", "王五"); service.EnqueueItemOnList("article", "赵六"); service.EnqueueItemOnList("article", "钱七"); for (int i = 0; i < 5; i++) { Console.WriteLine(service.DequeueItemFromList("article")); //出队 } //分布式缓存,多服务器都可以访问到,多个生产者,多个消费者,任何产品只被消费一次 } }
运行结果如下所示:
下面我们就来看下如何使用上面的API来解决一些具体的问题:
一、博客数据分页
应用场景:
博客网站每天新增的随笔和文章可能都是几千几万的,表里面是几千万数据。首页要展示最新的随笔,还有前20页是很多人访问的。
这种情况下如果首页分页数据每次都去查询数据库,那么就会有很大的性能问题。
解决方案:
每次写入数据库的时候,把 ID_标题 写入到Redis的List中(后面搞个TrimList,只要最近的200个)。
这样的话用户每次刷页面就不需要去访问数据库了,直接读取Redis中的数据。
第一页(当然也可以是前几页)的时候可以不体现总记录数,只拿最新数据展示,这样就能避免访问数据库了。
还有一种就是水平分表了,数据存到Redis的时候可以保存 ID_表名称_标题
使用List主要是解决数据量大,变化快的数据分页问题。
二八原则:80%的访问集中在20%的数据,List里面只用保存大概的量就够用了。
using TianYa.Redis.Service; namespace MyRedis.Scene { /// <summary> /// 博客数据分页 /// /// 应用场景: /// 博客网站每天新增的随笔和文章可能都是几千几万的,表里面是几千万数据。首页要展示最新的随笔,还有前20页是很多人访问的。 /// 这种情况下如果首页分页数据每次都去查询数据库,那么就会有很大的性能问题。 /// /// 解决方案: /// 每次写入数据库的时候,把 ID_标题 写入到Redis的List中(后面搞个TrimList,只要最近的200个)。 /// 这样的话用户每次刷页面就不需要去访问数据库了,直接读取Redis中的数据。 /// 第一页(当然也可以是前几页)的时候可以不体现总记录数,只拿最新数据展示,这样就能避免访问数据库了。 /// /// 还有一种就是水平分表了,数据存到Redis的时候可以保存 ID_表名称_标题 /// /// 使用List主要是解决数据量大,变化快的数据分页问题。 /// 二八原则:80%的访问集中在20%的数据,List里面只用保存大概的量就够用了。 /// </summary> public class BlogPageList { public static void Show() { using (RedisListService service = new RedisListService()) { service.AddItemToList("newBlog", "10001_IOC容器的实现原理"); service.AddItemToList("newBlog", "10002_AOP面向切面编程"); service.AddItemToList("newBlog", "10003_行为型设计模式"); service.AddItemToList("newBlog", "10004_结构型设计模式"); service.AddItemToList("newBlog", "10005_创建型设计模式"); service.AddItemToList("newBlog", "10006_GC垃圾回收"); service.TrimList("newBlog", 0, 200); //保留最新的201个(一个List最多只能存放2的32次方-1个) var result1 = service.GetRangeFromList("newBlog", 0, 9); //第一页 var result2 = service.GetRangeFromList("newBlog", 10, 19); //第二页 var result3 = service.GetRangeFromList("newBlog", 20, 29); //第三页 } } } }
二、生产者消费者模型
分布式缓存,多服务器都可以访问到,多个生产者,多个消费者,任何产品只被消费一次。(使用队列实现)
其中一个(或多个)程序写入,另外一个(或多个)程序读取消费。按照时间顺序,数据失败了还可以放回去下次重试。
下面我们来看个例子:
Demo中添加了2个控制台应用程序,分别模拟生产者和消费者:
using System; using TianYa.Redis.Service; namespace TianYa.Producer { /// <summary> /// 模拟生产者 /// </summary> class Program { static void Main(string[] args) { Console.WriteLine("生产者程序启动了。。。"); using (RedisListService service = new RedisListService()) { Console.WriteLine("开始生产test产品"); for (int i = 1; i <= 20; i++) { service.EnqueueItemOnList("test", $"产品test{i}"); } Console.WriteLine("开始生产task产品"); for (int i = 1; i <= 20; i++) { service.EnqueueItemOnList("task", $"产品task{i}"); } Console.WriteLine("模拟生产结束"); while (true) { Console.WriteLine("************请输入数据************"); string testTask = Console.ReadLine(); service.EnqueueItemOnList("test", testTask); } } } } }
using System; using System.Threading; using TianYa.Redis.Service; namespace TianYa.Consumer { /// <summary> /// 模拟消费者 /// </summary> class Program { static void Main(string[] args) { Console.WriteLine("消费者程序启动了。。。"); using (RedisListService service = new RedisListService()) { while (true) { var result = service.BlockingDequeueItemFromLists(new string[] { "test", "task" }, TimeSpan.FromHours(1)); Thread.Sleep(100); Console.WriteLine($"消费者消费了 {result.Id} {result.Item}"); } } } } }
接下来我们使用.NET Core CLI来启动2个消费者实例和1个生产者实例,运行结果如下所示:
像这种异步队列在项目中有什么价值呢?
PS:此处事务是一个很大问题,真实项目中需根据实际情况决定是否采用异步队列。
三、发布订阅
发布订阅:
发布一个数据,全部的订阅者都能收到。
观察者,一个数据源,多个接收者,只要订阅了就可以收到的,能被多个数据源共享。
观察者模式:微信订阅号---群聊天---数据同步。。。
下面我们来看个小Demo:
/// <summary> /// 发布订阅 /// 发布一个数据,全部的订阅者都能收到。 /// 观察者,一个数据源,多个接收者,只要订阅了就可以收到的,能被多个数据源共享。 /// 观察者模式:微信订阅号---群聊天---数据同步。。。 /// </summary> public static void ShowPublishAndSubscribe() { Task.Run(() => { using (RedisListService service = new RedisListService()) { service.Subscribe("TianYa", (c, message, iRedisSubscription) => { Console.WriteLine($"注册{1}{c}:{message},Dosomething else"); if (message.Equals("exit")) iRedisSubscription.UnSubscribeFromChannels("TianYa"); });//blocking } }); Task.Run(() => { using (RedisListService service = new RedisListService()) { service.Subscribe("TianYa", (c, message, iRedisSubscription) => { Console.WriteLine($"注册{2}{c}:{message},Dosomething else"); if (message.Equals("exit")) iRedisSubscription.UnSubscribeFromChannels("TianYa"); });//blocking } }); Task.Run(() => { using (RedisListService service = new RedisListService()) { service.Subscribe("Twelve", (c, message, iRedisSubscription) => { Console.WriteLine($"注册{3}{c}:{message},Dosomething else"); if (message.Equals("exit")) iRedisSubscription.UnSubscribeFromChannels("Twelve"); });//blocking } }); using (RedisListService service = new RedisListService()) { Thread.Sleep(1000); service.Publish("TianYa", "TianYa1"); Thread.Sleep(1000); service.Publish("TianYa", "TianYa2"); Thread.Sleep(1000); service.Publish("TianYa", "TianYa3"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve1"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve2"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve3"); Thread.Sleep(1000); Console.WriteLine("**********************************************"); Thread.Sleep(1000); service.Publish("TianYa", "exit"); Thread.Sleep(1000); service.Publish("TianYa", "TianYa6"); Thread.Sleep(1000); service.Publish("TianYa", "TianYa7"); Thread.Sleep(1000); service.Publish("TianYa", "TianYa8"); Thread.Sleep(1000); service.Publish("Twelve", "exit"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve6"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve7"); Thread.Sleep(1000); service.Publish("Twelve", "Twelve8"); Thread.Sleep(1000); Console.WriteLine("结束"); } }
运行结果如下所示:
至此本文就全部介绍完了,如果觉得对您有所启发请记得点个赞哦!!!
Demo源码:
链接: https://pan.baidu.com/s/1_kEMCtbf2iT5pLV7irxR5Q 提取码: v4sr
此文由博主精心撰写转载请保留此原文链接:https://www.cnblogs.com/xyh9039/p/14022264.html