1.添加缓存
思路:标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入redis。

代码思路:如果缓存有,则直接返回,如果缓存不存在,则查询数据库,然后存入redis。
//添加redis缓存
@Override
public Result queryById(Long id){
String key =CACHE_SHOP_KEY + id;
//1.从redis查询数据缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = BeanUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
if(shop == null){
//5.数据库不存在,返回错误
return Result.fail("店铺不存在");
}
//6.存在,写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
//7.返回数据
return Result.ok(shop);
}
数据库和缓存不一致采用什么方案? ---------采用先操作数据库,在删除缓存。
设置redis缓存时添加过期时间
2.缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
(用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力)。
常见的解决方案有两种:
-
缓存空对象
-
缓存空对象思路分析:当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了
-
优点:实现简单,维护方便
-
缺点:
-
额外的内存消耗
-
可能造成短期的不一致
-
-
-
布隆过滤
-
布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,
-
优点:内存占用较少,没有多余key
- 缺点:
-
-
实现复杂
-
存在误判可能

-
-
编码解决商品查询的缓存穿透问题:
核心思路如下:
在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的
现在的逻辑中:如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。
//缓存穿透 public Shop queryWithPassThrough(Long id){ String key =CACHE_SHOP_KEY + id; //1.从redis查询数据缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); //2.判断时是否存在 if(StrUtil.isNotBlank(shopJson)){ //3.存在直接返回 Shop shop = BeanUtil.toBean(shopJson, Shop.class); return shop; } if(shopJson != null){ return null; } //4.不存在,根据id查询数据库 Shop shop = getById(id); //5.数据库不存在,空值存入redis if(shop == null){ stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); } //6.数据库存在,写入redis, stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); stringRedisTemplate.expire(key, RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); //7.返回数据 return shop; }
3.缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
-
互斥锁
-
逻辑过期
逻辑分析:假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

解决方案一、使用锁来解决:
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。
假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

利用互斥锁解决缓存击穿问题
核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

操作锁的代码:
核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。
private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); }
//缓存击穿 public Shop queryWithMutex(Long id){ // 1.从redis查询数据缓存 String key = CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); // 2.判断缓存是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3.存在直接返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return shop; } if(shopJson!=null){ return null; } // 4.实现缓存重建 // 4.1.判断是否获取互斥锁 String lockKey = RedisConstants.LOCK_SHOP_KEY + id; Shop shop = null; try { boolean isLock = tryLock(lockKey); if(!isLock){ // 4.2.获取锁失败,休眠重试 Thread.sleep(50); return queryWithMutex(id); } //4.3成功,根据id查询数据库 shop = getById(id); // 5.不存在,返回错误 if(shop == null){ //将空值写入redis stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); //返回错误信息 return null; } // 6.存在,写入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { //7.释放锁 unLock(lockKey); } return shop; }
4.缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
-
给不同的Key的TTL添加随机值
-
利用Redis集群提高服务的可用性
-
给缓存业务添加降级限流策略
-
给业务添加多级缓存
5.封装Redis工具类
基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
-
方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
-
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
-
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
-
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
将逻辑进行封装
@Slf4j @Component public class CacheClient {
private final StringRedisTemplate stringRedisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); public CacheClient(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public void set(String key, Object value, Long time, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit); } public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) { // 设置逻辑过期 RedisData redisData = new RedisData(); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); // 写入Redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <R,ID> R queryWithPassThrough( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){ String key = keyPrefix + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isNotBlank(json)) { // 3.存在,直接返回 return JSONUtil.toBean(json, type); } // 判断命中的是否是空值 if (json != null) { // 返回一个错误信息 return null; } // 4.不存在,根据id查询数据库 R r = dbFallback.apply(id); // 5.不存在,返回错误 if (r == null) { // 将空值写入redis stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回错误信息 return null; } // 6.存在,写入redis this.set(key, r, time, unit); return r; } public <R, ID> R queryWithLogicalExpire( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isBlank(json)) { // 3.存在,直接返回 return null; } // 4.命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); // 5.判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { // 5.1.未过期,直接返回店铺信息 return r; } // 5.2.已过期,需要缓存重建 // 6.缓存重建 // 6.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2.判断是否获取锁成功 if (isLock){ // 6.3.成功,开启独立线程,实现缓存重建 CACHE_REBUILD_EXECUTOR.submit(() -> { try { // 查询数据库 R newR = dbFallback.apply(id); // 重建缓存 this.setWithLogicalExpire(key, newR, time, unit); } catch (Exception e) { throw new RuntimeException(e); }finally { // 释放锁 unlock(lockKey); } }); } // 6.4.返回过期的商铺信息 return r; } public <R, ID> R queryWithMutex( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; // 1.从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3.存在,直接返回 return JSONUtil.toBean(shopJson, type); } // 判断命中的是否是空值 if (shopJson != null) { // 返回一个错误信息 return null; } // 4.实现缓存重建 // 4.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; R r = null; try { boolean isLock = tryLock(lockKey); // 4.2.判断是否获取成功 if (!isLock) { // 4.3.获取锁失败,休眠并重试 Thread.sleep(50); return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit); } // 4.4.获取锁成功,根据id查询数据库 r = dbFallback.apply(id); // 5.不存在,返回错误 if (r == null) { // 将空值写入redis stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回错误信息 return null; } // 6.存在,写入redis this.set(key, r, time, unit); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { // 7.释放锁 unlock(lockKey); } // 8.返回 return r; } private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); } }
在ShopServiceImpl 中
@Resource private CacheClient cacheClient; @Override public Result queryById(Long id) { // 解决缓存穿透 Shop shop = cacheClient .queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES); // 互斥锁解决缓存击穿 // Shop shop = cacheClient // .queryWithMutex(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES); // 逻辑过期解决缓存击穿 // Shop shop = cacheClient // .queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 20L, TimeUnit.SECONDS); if (shop == null) { return Result.fail("店铺不存在!"); } // 7.返回 return Result.ok(shop); }
6.优惠卷秒杀
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题。
场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:唯一性、高可用性、高性能、递增性、安全性。

Redis实现全局唯一Id
@Component public class RedisIdWorker { /** * 开始时间戳 */ private static final long BEGIN_TIMESTAMP = 1640995200L; /** * 序列号的位数 */ private static final int COUNT_BITS = 32; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public long nextId(String keyPrefix) { // 1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 2.生成序列号 // 2.1.获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 2.2.自增长 long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 3.拼接并返回 return timestamp << COUNT_BITS | count; } }
7.库存超卖问题
原有代码
if (voucher.getStock() < 1) { // 库存不足 return Result.fail("库存不足!"); } //5,扣减库存 boolean success = seckillVoucherService.update() .setSql("stock= stock -1") .eq("voucher_id", voucherId).update(); if (!success) { //扣减库存 return Result.fail("库存不足!"); }
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:

悲观锁:
悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
乐观锁:
乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,当然乐观锁还有一些变种的处理方式比如cas
乐观锁的典型代表:就是cas,利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值
乐观锁解决超卖问题
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0
优惠券秒杀一人一单:
利用setnx方法进行加锁,同时增加过期时间,防止死锁,此方法可以保证加锁和增加过期时间具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId()
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
-
释放锁逻辑
SimpleRedisLock
释放锁,防止删除别人的锁
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
修改业务代码
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象(新增代码)
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁对象
boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
小总结:
基于Redis的分布式锁实现思路:
-
利用set nx ex获取锁,并设置过期时间,保存线程标示
-
释放锁时先判断线程标示是否与自己一致,一致则删除锁
-
特性:
-
利用set nx满足互斥性
-
利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
-
利用Redis集群保证高可用和高并发特性
-
-
8.分布式锁-redission功能介绍
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能
快速入门:
引入依赖:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
共有 0 条评论