北大青鸟小游网:值得大家信赖的游戏下载站!
发布时间:2021-06-10 15:17:11来源:北大青鸟手游网作者:北大青鸟手游网
这篇文章主要介绍了Redis怎么实现分布式锁和等待序列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。
背景
最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)
分析
redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式
丢弃
等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用
代码
直接上代码 其实直接redis的工具类就可以解决了
package com.testimport redis.clients.jedis.Jedis;import java.util.Collections;import java.util.List;/** * @desc redis队列实现方式 * @anthor * @date **/public class RedisUcUitl { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private RedisUcUitl() { } /** * logger **/ /** * 存储redis队列顺序存储 在队列首部存入 * * @param key 字节类型 * @param value 字节类型 */ public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) { return jedis.lpush(key, value); } /** * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时 * * @param srckey * @param dstkey * @param timeout 0 表示永不超时 * @return */ public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) { return jedis.brpoplpush(srckey, dstkey, timeout); } /** * 返回制定的key,起始位置的redis数据 * @param redisKey * @param start * @param end -1 表示到最后 * @return */ public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) { return jedis.lrange(redisKey, start, end); } /** * 删除key * @param redisKey */ public static void delete(Jedis jedis, final byte[] redisKey) { return jedis.del(redisKey); } /** * 尝试加锁 * @param lockKey key名称 * @param requestId 身份标识 * @param expireTime 过期时间 * @return */ public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } /** * 释放锁 * @param lockKey key名称 * @param requestId 身份标识 * @return */ public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) { final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return RELEASE_SUCCESS.equals(result); } }
业务逻辑主要代码如下
1.先消耗队列中的
while(true){ // 消费队列 try{ // 被放入redis队列的数据 序列化后的 byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1); if(bytes == null || bytes.isEmpty()){ // 队列中没数据时退出 break; } // 反序列化对象 Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes); // 塞入唯一的值 防止被其他线程误解锁 String requestId = UUID.randomUUID().toString(); boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100); if(lockGetFlag){ // 成功获取锁 进行业务处理 //TODO // 处理完毕释放锁 boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId); }else{ // 未能获得锁放入等待队列 RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param)); } }catch(Exception e){ break; } }
2.处理最新接到的数据
同样是走尝试获取锁,获取不到放入队列的流程
一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下
public class ObjectSerialUtil { private ObjectSerialUtil() {// 工具类 } /** * 将Object对象序列化为byte[] * * @param obj 对象 * @return byte数组 * @throws Exception */ public static byte[] objectToBytes(Object obj) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); byte[] bytes = bos.toByteArray(); bos.close(); oos.close(); return bytes; } /** * 将bytes数组还原为对象 * * @param bytes * @return * @throws Exception */ public static Object bytesToObject(byte[] bytes) { try { ByteArrayInputStream bin = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bin); return ois.readObject(); } catch (Exception e) { throw new BaseException("反序列化出错!", e); } } }
上一篇:redis中队列消息实现应用解耦
和平精英通讯塔是什么 和平精英通讯塔玩法详细解析
跑跑卡丁车手游制霸赛场挑战任务全攻略
神雕侠侣2手游氪金玩家消费指南
王牌战士幽灵实战技巧讲解
王者荣耀王者模拟战即将上线 王者模拟战玩法介绍
王牌战士团战如何切入详细讲解
第五人格先知天赋怎么加点
崩坏3精英工坊新加入了什么武器圣痕 精英工坊新武器圣痕一览
古今江湖童姥牌组搭配使用技巧攻略
坠落星界
其它游戏
炽姬无双
角色扮演
王者荣耀
角色扮演
和平精英
枪战射击
邪恶疯人院
休闲益智
神雕侠侣2
角色扮演
一刀传世
角色扮演
九州天空城3D
角色扮演
斗罗大陆手游
角色扮演