1.业务问题
新建一个springboot项目,写一个User的业务,看下列对redis的操作
package com.dreams.file.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dreams.file.domain.User;
import com.dreams.file.mapper.UserMapper;
import com.dreams.file.service.UserService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author PoemsAndDreams
* @date 2023-11-28 20:42
*/
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
//redis的key前缀
public static final String CACHE_KEY_USER = "user:";
@Resource
private RedisTemplate redisTemplate;
public User searchUserById(Integer id) {
String key = CACHE_KEY_USER + id;
User user = new User();
//从redis查询
user = (User) redisTemplate.opsForValue().get(key);
if (user == null) {
//redis没有查询数据库
user = baseMapper.selectById(id);
//mysql没有数据
if (user == null) {
//处理key,防止多次穿透,记录下这个null值的key,列入黑名单或者记录或者异常。
//......
return user;
} else {
//将mysql数据写入redis
redisTemplate.opsForValue().set(key, user);
}
}
return user;
}
}但是上述代码存在问题
如果数据量大,高并发:
- 如果redis没有数据,就正常统统访问mysql,mysql负载瞬间变大
- 如果高并发,可能出现多个请求都发现redis没有数据,然后同时访问mysql后,同时将mysql数据写入redis,可能出现数据覆盖。
2.解决方法
双检加锁:
这里只是举个例子,实际应该考虑更多问题,比如加锁会卡顿。
package com.dreams.file.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dreams.file.domain.User;
import com.dreams.file.mapper.UserMapper;
import com.dreams.file.service.UserService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author PoemsAndDreams
* @date 2023-11-28 20:42
*/
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
//redis的key前缀
public static final String CACHE_KEY_USER = "user:";
@Resource
private RedisTemplate redisTemplate;
public User searchUserById(Integer id) {
String key = CACHE_KEY_USER + id;
User user = new User();
//从redis查询
user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
} else {
//应对高并发,加锁
synchronized (UserServiceImpl.class) {
//从redis查询,第二次检查,双检加锁
user = (User) redisTemplate.opsForValue().get(key);
//redis没有数据就查询mysql数据库
user = baseMapper.selectById(id);
//mysql没有数据
if (user == null) {
//处理key,防止多次穿透,记录下这个null值的key,列入黑名单或者记录或者异常。
//......
return user;
} else {
//将mysql数据写入redis,完成数据一次性操作,设置7天后过期,定期清理缓存并回写
redisTemplate.opsForValue().setIfAbsent(key, user, 7L, TimeUnit.DAYS);
}
}
}
return user;
}
}
3.更新策略
以下为综合考虑,实际应考虑真实情况。
先更新数据库,再更新缓存
- 容易读到脏数据,比如更新完数据库后,回写到Redis失败,或被请求数据但还来不及回写到Redis,就会读到脏数据。
- 会出现数据覆盖,比如更新数据库为数据1,此时还来不及回写到Redis,另一个请求更新数据库为数据2,同时回写到Redis为数据2,再数据1才来得及回写到Redis,最终mysql里是数据1,Redis里是数据2,同样数据不一致。
先更新缓存,再更新数据库
- 会出现数据覆盖,比如更新Redis为数据1,此时还来不及更新到数据库,另一个请求更新Redis为数据2,同时更新到数据库为数据2,再数据1才来得及更新到数据库,最终数据库里是数据1,Redis里是数据2,同样数据不一致。
- 业务上一般把mysql作为底单数据库,保证最后解释,所以不推荐先更新缓存,再更新数据库。
先删除缓存,再更新数据库
- 操作丢失,A请求删除redis缓存后,但是更新到mysql失败,同时B请求到来,但是redis已经delete了,就没有命中缓存,所以直接请求mysql,读到旧数据,同时会将旧数据回写到redis中,这样A请求的操作就丢失了。
- 就算此时A线程更新mysql成功了,会发现redis里面的缓存是脏数据。其他请求命中缓存,使用的一直还是脏数据。
这里可以采用延时双删解决
延时双删
如下图,悲观认为一定存在第二个请求回写,所以延时,再删除第二次
public boolean deleteData(Integer id){
String key=CACHE_KEY_USER+id;
//第一次删除
redisTemplate.delete(key);
baseMapper.deleteById(id);
//睡眠2秒,悲观认为一定存在第二个请求回写,线程Asleep的时间,需要大于线程B读取数据再写入缓存的时间。!
try{
TimeUnit.SECONDS.sleep(2);
}catch(InterruptedException e){
throw new RuntimeException(e);
}
//第二个删除,延时双删。
redisTemplate.delete(key);
return true;
}注意,线程Asleep的时间,就需要大于线程B读取数据再写入缓存的时间。
不过sleep时间不好估算,可以经过实际测试或使用看门狗WatchDog。
优化:
- 开启一个异步线程处理延时双删
- 使用看门狗WatchDog
先更新数据库,再删除缓存
A请求更新mysql时,同时B请求到来,命中缓存,而使用redis的数据,此时A请求才更新完mysql数据,很明显会读取到旧数据。不过A请求更新完mysql数据后会更新redis缓存的数据,所以仅仅只是B请求暂时读到旧数据,危险系数最小。
不过可能会出现删除缓存失败情况。所以可以考虑使用消息中间件。
比如:
- 先更新数据库数据
- 数据库会将操作信息写入binlog日志当中
- 消息中间件订阅程序提取出所需要的数据以及key。
- 如果删除缓存操作,发现删除失败。重新从消息队列中获得该数据,重试操作。
- 如果成功地删除或更新,将数据从消息队列中去除,避免重复操作。
4.回顾mysql主从复制
MySQL的主从复制步骤:
- master服务器将所有的数据变更操作(插入、更新、删除等)以二进制格式记录在二进制日志(binlog日志)中。
- salve从服务器会在一定时间间隔内对master主服务器上的二进制日志进行检测,如果发生数据变更操作,就开始一个I/OThread获取主服务器上的二进制日志文件,然后解析并应用其中的复制事件到自己的数据库上。
- 同时master主服务器为每个I/OThread启动一个dump Thread,用于向其发送二进制事件日志;
- slave从服务器将接收到的二进制事件日志保存至自己本地的中继日志(Relay Log)文件中,并启动另一个复制线程(SQL 线程),该线程从中继日志中读取复制事件,在从服务器上执行相同的数据更改操作,以保持与主服务器的数据一致。
- 执行完成后,I/OThread和SQL Thread进入睡眠状态,等待下一次被唤醒。

5.canal的使用
canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
比如:

官网:Home · alibaba/canal Wiki · GitHub
canal的使用ClientExample · alibaba/canal Wiki · GitHub
查看主机二进制日志
show master status

是否开放权限访问
show variables like 'log_bin';

如果未开启,执行下列操作,
开启MySQL的binlog写入功能
如果在window下,进入安装目录下

在[mysqld]下添加配置:
log-bin=mysql-bin #开启 binlog binlog-format=ROW #选择 ROW 模式 server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

其中binlog-format的值有3种模式:
- ROW模式:在该模式下,MySQL会将每个受影响的行的变化记录到二进制日志文件中。即记录sql语句之外,还会记录每个字段的变化情况,但会占用较多的空间。
- STATEMENT模式:在该模式下,MySQL会将每个执行的语句记录到二进制日志文件中。即记录sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
- MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;
再重启即可
net stop mysql net start mysql

授权canal连接MySQL账号,执行以下sql即可。
DROP USER IF EXISTS 'canal'@'%'; CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456'; GRANT ALL PRIVILEGES ON *.* to 'canal'@'%'; FLUSH PRIVILEGES; SELECT * FROM mysql.user;
再执行就可以了。
show variables like 'log_bin';

下载canal

下载完成后放到linux上


解压
tar -zxvf canal.deployer-1.1.6.tar.gz
解压后如图,多出几个文件夹

修改配置,在安装目录下执行命令
编辑文件
vim conf/example/instance.properties
修改mysql主机密码

修改canal账户,密码

保存退出即可。
启动canal
./bin/startup.sh

如果启动成功,会产生日志文件
cat logs/canal/canal.log

查看实例日志
cat logs/example/example.log

创建sql,在数据库canal_redis下建一个user表
create database canal_redis; use canal_redis; CREATE TABLE `user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(100) NOT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB AUTO_INCREMENT = 10 DEFAULT CHARSET = utf8mb4;
创建一个maven项目redis_demo
创建一个工具类来连接redis
package com.dreams.file.util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtils {
public static final String REDIS_IP_ADDR = "192.168.188.201";
public static final String REDIS_pwd = "123456";
public static JedisPool jedisPool;
static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_pwd);
}
public static Jedis getJedis() throws Exception {
if (null != jedisPool) {
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}一个核心类,在main方法中监控,printEntry方法打印,redisInsert,redisDelete,redisUpdate只是存储,删除,更新redis。
package com.dreams.file;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.dreams.file.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class RedisCanalClientExample {
public static final Integer _60SECONDS = 60;
public static final String REDIS_IP_ADDR = "192.168.188.201";
private static void redisInsert(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void redisDelete(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.del(columns.get(0).getValue());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void redisUpdate(List<Column> columns) {
JSONObject jsonObject = new JSONObject();
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
jsonObject.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
try (Jedis jedis = RedisUtils.getJedis()) {
jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
//获取变更的row数据
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
}
//获取变动类型
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else {//EventType.UPDATE
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
public static void main(String[] args) {
System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
//=================================
// 创建链接canal服务端
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
11111), "example", "", "");
int batchSize = 1000;
//空闲空转计数器
int emptyCount = 0;
System.out.println("---------------------canal init OK,开始监听mysql变化------");
try {
connector.connect();
//connector.subscribe(".*\\..*");
connector.subscribe("canal_redis.user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount) {
System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//计数器重新置零
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
} finally {
connector.disconnect();
}
}
}这里不填,默认使用配置文件instance.properties中配置的值

一般我们配置为监控指定表

监控所有库所有表
connector.subscribe(".*\\..*");监控指定库所有表
connector.subscribe("canal_redis\\..*");监控指定库指定表
connector.subscribe("canal_redis.user");监控多规则
connector.subscribe("database1\\\\..*,database2.table1,database3.table2");
或在配置文件instance.properties中配置

先看看redis有没有数据

启动RedisCanalClientExample类

比如添加一条数据

日志如下:

这里输出的格式为这里的代码定义的

更新数据

日志如下:

在redis中
这里因为有代码操控了redis,所以存储进去了


删除数据



搞定!


