对象编程五大原则(如何模拟会话级别的信号量)

现在,因为种种因素,你必须对一个请求或者方法进行频率上的访问限制。

比如, 你对外提供了一个API接口,注册用户每秒钟最多可以调用100次,非注册用户每秒钟最多可以调用10次。

比如, 有一个非常吃服务器资源的方法,在同一时刻不能超过10个人调用这个方法,否则服务器满载。

比如, 有一些特殊的页面,访客并不能频繁的访问或发言。

比如, 秒杀活动等进行。

比如 ,防范DDOS,当达到一定频率后调用脚本iis服务器ip黑名单,防火墙黑名单。

如上种种的举例,也就是说,如何从一个面向切面的方式对调用的方法进行频率上的限制。而对频率限制,服务器层面都有最直接的解决方法,现在我说的则是代码层面上的频率管控。

本文给出两个示例,一个是基于单机环境的实现,第二个则是基于分布式的redis实现

以第一个API接口需求为例,先说下单机环境下的实现。

按照惯性思维,我们自然会想到缓存的过期策略这种方法,但是严格来讲就HttpRuntime.Cache而言,通过缓存的过期策略来对请求进行频率的并发控制是不合适的。

HttpRuntime.Cache 是应用程序级别的Asp.Net的缓存技术,通过这个技术可以申明多个缓存对象,可以为每个对象设置过期时间,当过期时间到达后该缓存对象就会消失(也就是当你访问该对象的时候为Null)

为什么这样说呢?比如对某个方法(方法名:GetUserList)我们要进行1秒钟最多10次的限制,现在我们就新建一个int型的Cache对象,然后设置1秒钟后过期消失。那么每当访问GetUserList方法前,我们就先判断这个Cache对象的值是否大于10,如果大于10就不执行GetUserList方法,如果小于10则允许执行。每当访问该对象的时候如果不存在或者过期就新建,这样周而复始,则该对象永远不可能超过10。

if ((int)HttpRuntime.Cache["GetUserListNum"] > 10) //大于10请求失败 { Console.WriteLine("禁止请求"); } else { HttpRuntime.Cache["GetUserListNum"] = (int)HttpRuntime.Cache["GetUserListNum"] 1; //否则该缓存对象的值 1 Console.WriteLine("允许请求"); }

这样的思想及实现相对来说非常简单,但是基于这样的一个模型设定,那么就会出现这种情况:

对象编程五大原则(如何模拟会话级别的信号量)(1)

如上图,每个点代表一次访问请求,我在0秒的时候 新建了一个名字为GetUserListNum的缓存对象。

在0~0.5秒期间 我访问了3次,在0.5~1秒期间,我们访问了7次。此时,该对象消失,然后我们接着访问,该对象重置为0.

在第1~1.5秒期间,还是访问了7次,在第1.5秒~2秒期间访问了3次。

基于这种简单缓存过期策略的模型,在这2秒钟内,我们虽然平均每秒钟都访问了10次,满足这个规定,但是如果我们从中取一个期间段,0.5秒~1.5秒期间,也是1秒钟,但是却实实在在的访问了14次!远远超过了我们设置的 1秒钟最多访问10次的 限制。

那么如何科学的来解决上面的问题呢?我们可以通过模拟会话级别的信号量这一手段,这也就是我们今天的主题了。

什么是信号量?

仅就以代码而言, static SemaphoreSlim semaphoreSlim = new SemaphoreSlim(5); 它的意思就代表在多线程情况下,在任何一时刻,只能同时5个线程去访问。

4容器4线程模型

现在,在实现代码的之前我们先设计一个模型。

对象编程五大原则(如何模拟会话级别的信号量)(2)

假设我们有一个用户A的管道,这个管道里装着用户A的请求,比如用户A在一秒钟发出了10次请求,那么每一个请求过来,管道里的元素都会多一个。但是我们设定这个管道最多只能容纳10个元素,而且每个元素的存活期为1秒,1秒后则该元素消失。那么这样设计的话,无论是速率还是数量的突进,都会有管道长度的限制。这样一来,无论从哪一个时间节点或者时间间隔出发,这个管道都能满足我们的频率限制需求。

而这里的管道,就必须和会话Id来对应了。每当有新会话进来的时候就生成一个新管道。这个会话id根据自己场景所定,可以是sessionId,可以是ip,也可以是token。

那么既然这个管道是会话级别的,我们肯定得需要一个容器,来装这些管道。现在,我们以IP来命名会话管道,并把所有的管道都装载在一个容器中,如图

对象编程五大原则(如何模拟会话级别的信号量)(3)

而基于刚才的设定,我们还需要对容器内的每条管道的元素进行处理,把过期的给剔除掉,为此,还需要单独为该容器开辟出一个线程来为每条管道进行元素的清理。而当管道的元素为0时,我们就清掉该管道,以便节省容器空间。

对象编程五大原则(如何模拟会话级别的信号量)(4)

当然,由于用户量多,一个容器内可能存在上万个管道,这个时候仅仅用一个容器来装载来清理,在效率上显然是不够的。这个时候,我们就得对容器进行横向扩展了。

比如,我们可以根据Cpu核心数自动生成对应的数量的容器,然后根据一个算法,对IP来进行导流。我当前cpu是4个逻辑核心,就生成了4个容器,每当用户访问的时候,都会最先经过一个算法,这个算法会对IP进行处理,如192.168.1.11~192.168.1.13这个Ip段进第一个容器,xxx~xxx进第二个容器,依次类推,相应的,也就有了4个线程去分别处理4个容器中的管道。

对象编程五大原则(如何模拟会话级别的信号量)(5)

那么,最终就形成了我们的4容器4线程模型了。

现在,着眼于编码实现:

首先我们需要一个能承载这些容器的载体,这个载体类似于连接池的概念,可以根据一些需要自动生成适应数量的容器,如果有特殊要求的话,还可以在容器上切出一个容器管理的面,在线程上切出一个线程管理的面以便于实时监控和调度。如果真要做这样一个系统,那么 容器的调度 和 线程的调度功能 是必不可少的,而本Demo则是完成了主要功能,像容器和线程在代码中我也没剥离开来,为了更好的直观的体现demo的含义,算法也是直接写死的,集合容器的内存管理与元素的选型待优化,实际设计中,这些都得优化,还有多线程模型中,怎样上锁才能让效率最大化也是重中之重的。

而这里为了案例的直观就直接写死成4个容器。

public static List<Container> ContainerList = new List<Container>(); //容器载体 static Factory() { for (int i = 0; i < 4; i ) { ContainerList.Add(new Container(i)); //遍历4次 生成4个容器 } foreach (var item in ContainerList) { item.Run(); //开启线程 } }

现在,我们假定 有编号为 0 到 40 这样的 41个用户。那么这个导流算法 我也就直接写死,编号0至9的用户 将他们的请求给抛转到第一个容器,编号10~19的用户 放到第二个容器,编号20~29放到第三个容器,编号30~40的用户放到第四个容器。

那么这个代码就是这样的:

static Container GetContainer(int userId, out int i) //获取容器的算法 { if (0 <= userId && userId < 10) //编号0至9的用户 返回第一个容器 依次类推 { i = 0; return ContainerList[0]; } if (10 <= userId && userId < 20) { i = 1; return ContainerList[1]; } if (20 <= userId && userId < 30) { i = 2; return ContainerList[2]; } i = 3; return ContainerList[3]; }

当我们的会话请求经过算法的导流之后,都必须调用一个方法,用于辨别管道数量。如果管道数量已经大于10,则请求失败,否则成功

public static void Add(int userId) { if (GetContainer(userId, out int i).Add(userId)) Console.WriteLine("容器" i " 用户" userId " 发起请求"); else Console.WriteLine("容器" i " 用户" userId " 被拦截"); }

接下来就是容器Container的代码了。

这里,对容器的选型用线程安全的ConcurrentDictionary类。

线程安全:当多个线程同时读写同一个共享元素的时候,就会出现数据错乱,迭代报错等安全问提

ConcurrentDictionary:除了GetOrAdd方法要慎用外,是.Net4.0专为解决Dictionary线程安全而出的新类型

ReaderWriterlockSlim:较ReaderWriterLock优化的读写锁,多个线程同时访问读锁 或 一个线程访问写锁

private ReaderWriterLockSlim obj = new ReaderWriterLockSlim(); //在每个容器中申明一个读写锁 public ConcurrentDictionary<string, ConcurrentList<DateTime>> dic = new ConcurrentDictionary<string, ConcurrentList<DateTime>>(); //创建该容器 dic

然后当你向容器添加一条管道中的数据是通过这个方法:

public bool Add(int userId) { obj.EnterReadLock();//挂读锁,允许多个线程同时写入该方法 try { ConcurrentList<DateTime> dtList = dic.GetOrAdd(userId.ToString(),t=>{ new ConcurrentList<DateTime>()}); //如果不存在就新建 ConcurrentList return dtList.CounterAdd(10, DateTime.Now); //管道容量10,当临界管道容量后 返回false } finally { obj.ExitReadLock(); } }

这里,为了在后面的线程遍历删除ConcurrentList的管道的时候保证ConcurrentList的安全性,所以此处要加读锁。

而ConcurrentList,因为.Net没有推出List集合类的线程安全(这里我申明下:之所以不用ConcurrentBag是因为要保证count和add的一致性,这里补充一下),所以自己新建了一个继承于List<T>的安全类型,在这里 封装了3个需要使用的方法。

public class ConcurrentList<T> : List<T> { private object obj = new object(); public bool CounterAdd(int num, T value) { lock (obj) { if (base.Count >= num) return false; else base.Add(value); return true; } } public new bool Remove(T value) { lock (obj) { base.Remove(value); return true; } } public new T[] ToArray() { lock (obj) { return base.ToArray(); } } }

最后就是线程的运行方法:

public void Run() { ThreadPool.QueueUserWorkItem(c => { while (true) { if (dic.Count > 0) { foreach (var item in dic.ToArray()) { ConcurrentList<DateTime> list = item.Value; foreach (DateTime dt in list.ToArray()) { if (DateTime.Now.AddSeconds(-3) > dt) { list.Remove(dt); Console.WriteLine("容器" seat " 已删除用户" item.Key "管道中的一条数据"); } } if (list.Count == 0) { obj.EnterWriteLock(); try { if (list.Count == 0) { if (dic.TryRemove(item.Key, out ConcurrentList<DateTime> i)) { Console.WriteLine("容器" seat " 已清除用户" item.Key "的List管道"); } } } finally { obj.ExitWriteLock(); } } } } else { Thread.Sleep(100); } } } ); }

最后,是效果图,一个是基于控制台的,还一个是基于Signalr的。

对象编程五大原则(如何模拟会话级别的信号量)(6)

对象编程五大原则(如何模拟会话级别的信号量)(7)

分布式下Redis

上面介绍了一种频率限制的模型,分布式与单机相比,无非就是载体不同,我们只要把这个容器的载体从程序上移植出来,来弄成一个单独的服务或者直接借用Redis也是可行的。

这里就介绍分布式情况下,Redis的实现。

不同于Asp.Net的多线程模型,大概因为Redis的各种类型的元素非常粒度的操作导致各种加锁的复杂性,所以在网络请求处理这块Redis是单线程的,基于Redis的实现则因为单线程的缘故在编码角度不用太多考虑到与逻辑无关的问题。

简单介绍下,Redis是一个内存数据库,这个数据库属于非关系型数据库,它的概念不同于一般的我们认知的Mysql Oracle SqlServer关系型数据库,它没有Sql没有字段名没有表名这些概念,它和HttpRunTime.Cache的概念差不多一样,首先从操作上属于键值对模式,就如 Cache["键名"] 这样就能获取到值类似,而且可以对每个Key设置过期策略,而Redis中的Key所对应的值并不是想存啥就存啥的,它支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及sorted set(有序集合)。

今天要说的是Sorted set有序集合,有序集合相比其它的集合类型的特殊点在于,使用有序集合的时候还能给插入的元素指定一个 积分score,我们把这个积分score理解为排序列,它内部会对积分进行排序,积分允许重复,而有序集合中的元素则是唯一。

还是同样的思路,每当有用户访问的时候,都对该用户的 管道(有序集合)中添加一个元素,然后设置该元素的积分为当前时间。接着在程序中开个线程,来对管道中积分小于约定时间的元素进行清理。因为规定有序集合中的元素只能是唯一值,所以在赋值方面只要是满足uuid即可。

对象编程五大原则(如何模拟会话级别的信号量)(8)

那么用Redis来实现的代码那就是类似这种:

对象编程五大原则(如何模拟会话级别的信号量)(9)

通过using语法糖实现IDisposable而包装的Redis分布式锁,然后里面正常的逻辑判断。

这样的代码虽然也能完成功能,但不够友好。Redis是个基于内存的数据库,于性能而言,瓶颈在于网络 IO 上,与Get一次发出一次请求相比,能不能通过一段脚本来实现大部分逻辑呢?

有的,Redis支持 Lua脚本:

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

大致意思就是,直接向Redis发送一段脚本或者让它直接本地读取一段脚本从而直接实现所有的逻辑。

/// <summary> /// 如果 大于10(AccountNum) 就返回1 否则就增加一条集合中的元素 并返回 空 /// </summary> /// <param name="zcardKey"></param> /// <param name="score"></param> /// <param name="zcardValue"></param> /// <param name="AccountNum"></param> /// <returns></returns> public string LuaAddAccoundSorted(string zcardKey, double score, string zcardValue, int AccountNum) { string str = "local uu = redis.call('zcard',@zcardKey) if (uu >=tonumber(@AccountNum)) then return 1 else redis.call('zadd',@zcardKey,@score,@zcardValue) end"; var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str), new { zcardKey = zcardKey, score = score, zcardValue = zcardValue, AccountNum=AccountNum }); return re.ToString(); }

local uu就是申明一个为名uu的变量的意思,redis.call就是redis命令,这段脚本意思就是如果 大于10(AccountNum) 就返回1 否则就增加一条集合中的元素 并返回 空。

管道内元素处理的方法就是:

/// <summary> /// 遍历当前所有前缀的有序集合,如果数量为0,那么就返回1 否则 就删除 满足最大分值条件区间的元素,如果该集合个数为0则消失 /// </summary> /// <param name="zcardPrefix"></param> /// <param name="score"></param> /// <returns></returns> public string LuaForeachRemove(string zcardPrefix, double score) { StringBuilder str = new StringBuilder(); str.Append("local uu = redis.call('keys',@zcardPrefix) "); //声明一个变量 去获取 模糊查询的结果集合 str.Append("if(#uu==0) then"); //如果集合长度=0 str.Append(" return 1 "); str.Append("else "); str.Append(" for i=1,#uu do "); //遍历 str.Append(" redis.call('ZREMRANGEBYSCORE',uu[i],0,@score) "); //删除从0 到 该score 积分区间的元素 str.Append(" if(redis.call('zcard',uu[i])==0) then "); //如果管道长度=0 str.Append(" redis.call('del',uu[i]) "); //删除 str.Append(" end "); str.Append(" end "); str.Append("end "); var re = _instance.GetDatabase(_num).ScriptEvaluate(LuaScript.Prepare(str.ToString()), new { zcardPrefix = zcardPrefix "*", score = score }); return re.ToString();

这2段代码通过发送Lua脚本的形式来完成了整个过程,因为Redis的网络模型原因,所以把LuaForeachRemove方法给提出来做个服务来单独处理即可。至于那种多容器多线程的实现,则完全可以开多个Redis的实例来实现。最后放上效果图。

对象编程五大原则(如何模拟会话级别的信号量)(10)

希望这篇文档对大家有一定的启发和帮助。

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页