分布式锁的实现方式有很多,本篇文章讲述一下使用Redis实现分布式锁。网上有很多使用Redis实现分布式锁的代码,但是这些代码或多或少都有问题。这篇文章会写一个实现,同时标明一些注意点。

场景

为了便于阐述,这里假设一个游戏场景,用户A有开山斧一把,价值500元宝,用户B有800元宝,想买A的开山斧,这些数据都存在Redis中。需要编写代码成功的实现该笔交易。

问题

Redis实现分布式锁,需要考虑如下问题:

持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知晓这一点,甚至还可能会错误地释放掉了其他进程持有的锁。一个持有锁并打算执行长时间操作的进程已经崩溃,但其他想要获取锁的进程不知道哪个进程持有着锁,也无法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。在一个进程持有的锁过期之后,其他多个进程同时尝试去获取锁,并且都获得了锁。

三个特性

实现一个最低保障的分布式锁,需要具备三个特性

安全属性(Safety property): 独享(相互排斥)。在任意一个时刻,只有一个客户端持有锁。活性A(Liveness property A): 无死锁。即便持有锁的客户端崩溃(crashed)或者网络被分裂(gets partitioned),锁仍然可以被获取。活性B(Liveness property B): 容错。 只要大部分Redis节点都活着,客户端就可以获取和释放锁.

命令

使用Redis实现分布式锁,一般使用SETNX或者SET命令,SETNX不能同时设置过期时间,如果使用的版本大于等于2.6.12,可以使用SET命令,可以使用这个命令原子性的实现SETNX和EXPIRE的功能,下面是两个命令的简介

SETNX

命令格式:SETNX key value

时间复杂度:O(1)

说明:将key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做。SETNX是”SETifNot eXists”的简写。

返回值

1如果key被设置了如果key没有被设置

SET

命令格式:SET key value [EX seconds] [PX milliseconds] [NX|XX]

时间复杂度:O(1)

说明:将键key设定为指定的“字符串”值。如果 key 已经保存了一个值,那么这个操作会直接覆盖原来的值,并且忽略原始类型。当set命令执行成功之后,之前设置的过期时间都将失效。

选项

从2.6.12版本开始,redis为SET命令增加了一系列选项:

EXseconds– 设置键key的过期时间,单位时秒PXmilliseconds– 设置键key的过期时间,单位是毫秒NX– 只有键key不存在的时候才会设置key的值XX– 只有键key存在的时候才会设置key的值

实现

此处使用SETNX实现,毕竟有的公司Redis版本可能较低,QQ号码出售使用SETNX可以实现,SET更加没有问题。

代码如下:

<?phpfunction uuid($prefix = ''){
$chars = md5(uniqid(mt_rand(), true));
$uuid = substr($chars, , 8) . '-';
$uuid .= substr($chars, 8, 4) . '-';
$uuid .= substr($chars, 12, 4) . '-';
$uuid .= substr($chars, 16, 4) . '-';
$uuid .= substr($chars, 20, 12);
$ret = $prefix . $uuid;
return strtoupper($ret);}function acquireLock($redis,$lockName, $acquireTime = 10, $lockTime = 10){
$lockKey = 'lock:' + $lockName;
$identifier = uuid('identify');
$end = time() + $acquireTime;
while (time() < $end) {
if ($redis->setnx($lockKey, $identifier)) {
$redis->expire($lockKey, $lockTime);
return $identifier;
} elseif ($redis->ttl($lockKey) == -1) {
$redis->expire($lockKey, $lockTime);
}
usleep(1000);
}
return false;}function process(){
$redis = new Redis();
$lockName = 'market';
//1.获取锁 $locked = acquireLock($redis,$lockName);
if($locked === false){
return false;
}
//2.进行交易 //判断A和B是否满足交易条件 //使用管道,对A和B进行操作
//3.释放锁 $releaseRes = releaseLock($redis,$lockName,$locked);
if($releaseRes === false){
return false;
}}function releaseLock($redis,$lockName,$identifier){
$lockKey = 'lock:' + $lockName;
$redis->watch($lockKey);
if($redis->get($lockKey) === $identifier){
$redis->multi();
$redis->del($lockKey);
$redis->exec();
return true;
}
$redis->unwatch();
return false;}

说明:

获取锁:
创建唯一的$identifier,这个值用于删除key的时候,判断是否为当前客户端获取的锁,以免删除其它客户端的锁while循环用于在一段时间内不停的获取锁如果能够获取锁便获取,同时设置超时时间,防止线程运行时崩溃,锁永远无法释放如果没能成功获取到锁,检查当前锁的过期时间,如果未设置过期时间,进行设置,防止其他线程获得锁后立即崩溃,没有设置过期时间

2. 处理业务

需要先判断交易双方是否都满足条件,因为锁定的是整个市场,所以一旦获得锁,交易双方的状态都不会再进行改变使用管道能保证整个交易能像事务一样被处理,而且性能会比用redis的事务更好

3. 释放锁

使用watch监控锁,一旦key被变更,删除key的事务不会被执行需要判断key的值是否和本线程记录的$identifier一样,只有一致才能进行删除使用事务来做删除key的操作,使用事务的原因是防止中途该锁被别的线程获取如果失败,记得unwatch

4. 其他问题

可重入问题:可重入指的是,线程可以再次获取到锁。实现方法比较简单,只需要在acquireLock的时候,传入identifier,判断当前锁的identifier和传入的是否一致,如果一致则可以进行操作线程未执行完毕,锁的超时时间已过,其他线程获取到锁:解决该问题的一个方案是,当获取到锁后,在超时时间经过一半的时候检查锁是否存在或者被修改,如果没有变化且线程正常运行,则延长超时时间


思考

如果基于Redis单实例,假设这个单实例总是可用,这种方法已经足够安全。

但有两种特殊情况大家需要关注一下:

主从结构中存在明显的竞态:

客户端A从master获取到锁在master将锁同步到slave之前,master宕掉了。slave节点被晋级为master节点客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!

在Redis的分布式环境中,有N个Redis master

这种情况可以使用Redlock算法

总结

本文讲述了怎样用Redis实现分布式锁,并写了具体实现和相关的分析。要用Redis实现分布式锁,有很多细节需要思考,大家可以根据自己的业务形态设计符合自己要求的锁,在复杂度和安全性上做好折中。


更多相关文章

  1. 阅读代码:Spark 与 Flink 中的 RPC 实现
  2. django中的session实现
  3. 用node和express连接mysql实现登录注册
  4. 使用面向对象方法实现用户信息增删改查
  5. PHP用数据库类实现数据库增删改查
  6. 如何实现一个iOS AOP框架?
  7. ES5、ES6 如何实现继承
  8. 用小鸟云服务器如何实现Nginx静态资源配置?
  9. 实现一个简单的计算器功能

随机推荐

  1. Android(安卓)- 时间 日期相关组件
  2. Android JNI 分析
  3. android之datepicker控件用法
  4. Android Lights笔记
  5. android:layout_width="match_parent"
  6. android java和js交互
  7. Android视频桌面,动态桌面开发
  8. 在Android上模拟MetroUI
  9. android 音乐信息乱码处理
  10. Android模拟器无法启动