redis缓存双写

1.业务问题

新建一个springboot项目,写一个User的业务,看下列对redis的操作

package com.dreams.file.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dreams.file.domain.User;
import com.dreams.file.mapper.UserMapper;
import com.dreams.file.service.UserService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author PoemsAndDreams
 * @date 2023-11-28 20:42
 */
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
    //redis的key前缀
    public static final String CACHE_KEY_USER = "user:";
    @Resource
    private RedisTemplate redisTemplate;

    public User searchUserById(Integer id) {
        String key = CACHE_KEY_USER + id;
        User user = new User();
        //从redis查询
        user = (User) redisTemplate.opsForValue().get(key);
        if (user == null) {
            //redis没有查询数据库
            user = baseMapper.selectById(id);
            //mysql没有数据
            if (user == null) {
                //处理key,防止多次穿透,记录下这个null值的key,列入黑名单或者记录或者异常。
                //......
                return user;
            } else {
                //将mysql数据写入redis
                redisTemplate.opsForValue().set(key, user);
            }
        }
        return user;
    }
}
但是上述代码存在问题
如果数据量大,高并发:
  • 如果redis没有数据,就正常统统访问mysql,mysql负载瞬间变大
  • 如果高并发,可能出现多个请求都发现redis没有数据,然后同时访问mysql后,同时将mysql数据写入redis,可能出现数据覆盖。

2.解决方法

双检加锁:

这里只是举个例子,实际应该考虑更多问题,比如加锁会卡顿。

package com.dreams.file.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dreams.file.domain.User;
import com.dreams.file.mapper.UserMapper;
import com.dreams.file.service.UserService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * @author PoemsAndDreams
 * @date 2023-11-28 20:42
 */
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
    //redis的key前缀
    public static final String CACHE_KEY_USER = "user:";
    @Resource
    private RedisTemplate redisTemplate;

    public User searchUserById(Integer id) {
        String key = CACHE_KEY_USER + id;
        User user = new User();
        //从redis查询
        user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        } else {
            //应对高并发,加锁
            synchronized (UserServiceImpl.class) {
                //从redis查询,第二次检查,双检加锁
                user = (User) redisTemplate.opsForValue().get(key);
                //redis没有数据就查询mysql数据库
                user = baseMapper.selectById(id);
                //mysql没有数据
                if (user == null) {
                    //处理key,防止多次穿透,记录下这个null值的key,列入黑名单或者记录或者异常。

                    //......

                    return user;
                } else {
                    //将mysql数据写入redis,完成数据一次性操作,设置7天后过期,定期清理缓存并回写
                    redisTemplate.opsForValue().setIfAbsent(key, user, 7L, TimeUnit.DAYS);
                }
            }
        }
        return user;
    }
}

 

 

3.更新策略

以下为综合考虑,实际应考虑真实情况。

先更新数据库,再更新缓存

  • 容易读到脏数据,比如更新完数据库后,回写到Redis失败,或被请求数据但还来不及回写到Redis,就会读到脏数据。
  • 会出现数据覆盖,比如更新数据库为数据1,此时还来不及回写到Redis,另一个请求更新数据库为数据2,同时回写到Redis为数据2,再数据1才来得及回写到Redis,最终mysql里是数据1,Redis里是数据2,同样数据不一致。

先更新缓存,再更新数据库

  • 会出现数据覆盖,比如更新Redis为数据1,此时还来不及更新到数据库,另一个请求更新Redis为数据2,同时更新到数据库为数据2,再数据1才来得及更新到数据库,最终数据库里是数据1,Redis里是数据2,同样数据不一致。
  • 业务上一般把mysql作为底单数据库,保证最后解释,所以不推荐先更新缓存,再更新数据库。

先删除缓存,再更新数据库

  • 操作丢失,A请求删除redis缓存后,但是更新到mysql失败,同时B请求到来,但是redis已经delete了,就没有命中缓存,所以直接请求mysql,读到旧数据,同时会将旧数据回写到redis中,这样A请求的操作就丢失了。
  • 就算此时A线程更新mysql成功了,会发现redis里面的缓存是脏数据。其他请求命中缓存,使用的一直还是脏数据。
这里可以采用延时双删解决

延时双删

如下图,悲观认为一定存在第二个请求回写,所以延时,再删除第二次

public boolean deleteData(Integer id){
    String key=CACHE_KEY_USER+id;
    //第一次删除
    redisTemplate.delete(key);
    baseMapper.deleteById(id);
    //睡眠2秒,悲观认为一定存在第二个请求回写,线程Asleep的时间,需要大于线程B读取数据再写入缓存的时间。!
    try{
        TimeUnit.SECONDS.sleep(2);
    }catch(InterruptedException e){
        throw new RuntimeException(e);
    }
    //第二个删除,延时双删。
    redisTemplate.delete(key);
    return true;
}
注意,线程Asleep的时间,就需要大于线程B读取数据再写入缓存的时间。
不过sleep时间不好估算,可以经过实际测试或使用看门狗WatchDog。
优化:
  • 开启一个异步线程处理延时双删
  • 使用看门狗WatchDog

先更新数据库,再删除缓存

A请求更新mysql时,同时B请求到来,命中缓存,而使用redis的数据,此时A请求才更新完mysql数据,很明显会读取到旧数据。不过A请求更新完mysql数据后会更新redis缓存的数据,所以仅仅只是B请求暂时读到旧数据,危险系数最小。
不过可能会出现删除缓存失败情况。所以可以考虑使用消息中间件
比如:
  1. 先更新数据库数据
  2. 数据库会将操作信息写入binlog日志当中
  3. 消息中间件订阅程序提取出所需要的数据以及key。
  4. 如果删除缓存操作,发现删除失败。重新从消息队列中获得该数据,重试操作。
  5. 如果成功地删除或更新,将数据从消息队列中去除,避免重复操作。

 

4.回顾mysql主从复制

 

MySQL的主从复制步骤:

  1. master服务器将所有的数据变更操作(插入、更新、删除等)以二进制格式记录在二进制日志(binlog日志)中。
  2. salve从服务器会在一定时间间隔内对master主服务器上的二进制日志进行检测,如果发生数据变更操作,就开始一个I/OThread获取主服务器上的二进制日志文件,然后解析并应用其中的复制事件到自己的数据库上。
  3. 同时master主服务器为每个I/OThread启动一个dump Thread,用于向其发送二进制事件日志;
  4. slave从服务器将接收到的二进制事件日志保存至自己本地的中继日志(Relay Log)文件中,并启动另一个复制线程(SQL 线程),该线程从中继日志中读取复制事件,在从服务器上执行相同的数据更改操作,以保持与主服务器的数据一致。
  5. 执行完成后,I/OThread和SQL Thread进入睡眠状态,等待下一次被唤醒。

 

5.canal的使用

canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

比如:

官网:Home · alibaba/canal Wiki · GitHub

canal的使用ClientExample · alibaba/canal Wiki · GitHub

 

查看主机二进制日志

show master status

 

是否开放权限访问

show variables like 'log_bin';

如果未开启,执行下列操作,

开启MySQL的binlog写入功能

如果在window下,进入安装目录下

在[mysqld]下添加配置:

log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复

 

 

其中binlog-format的值有3种模式:

  • ROW模式:在该模式下,MySQL会将每个受影响的行的变化记录到二进制日志文件中。即记录sql语句之外,还会记录每个字段的变化情况,但会占用较多的空间。
  • STATEMENT模式:在该模式下,MySQL会将每个执行的语句记录到二进制日志文件中。即记录sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
  • MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

 

再重启即可

net stop mysql
net start mysql

 

授权canal连接MySQL账号,执行以下sql即可。

DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
GRANT ALL PRIVILEGES ON *.* to 'canal'@'%';
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

 

再执行就可以了。

show variables like 'log_bin';

下载canal
下载完成后放到linux上
解压
tar -zxvf canal.deployer-1.1.6.tar.gz
解压后如图,多出几个文件夹
修改配置,在安装目录下执行命令
编辑文件
vim conf/example/instance.properties
修改mysql主机密码
修改canal账户,密码
保存退出即可。
启动canal
./bin/startup.sh
如果启动成功,会产生日志文件
cat logs/canal/canal.log
查看实例日志
cat logs/example/example.log
创建sql,在数据库canal_redis下建一个user表
create database canal_redis;
use canal_redis;
CREATE TABLE `user`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 10
DEFAULT CHARSET = utf8mb4;
创建一个maven项目redis_demo
创建一个工具类来连接redis
package com.dreams.file.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class RedisUtils {
    public static final String REDIS_IP_ADDR = "192.168.188.201";
    public static final String REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if (null != jedisPool) {
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }
}
一个核心类,在main方法中监控,printEntry方法打印,redisInsert,redisDelete,redisUpdate只是存储,删除,更新redis。
package com.dreams.file;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.dreams.file.util.RedisUtils;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class RedisCanalClientExample {
    public static final Integer _60SECONDS = 60;
    public static final String REDIS_IP_ADDR = "192.168.188.201";

    private static void redisInsert(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "  update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.del(columns.get(0).getValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "  update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
                11111), "example", "", "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("canal_redis.user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}
这里不填,默认使用配置文件instance.properties中配置的值
一般我们配置为监控指定表
监控所有库所有表
connector.subscribe(".*\\..*");
监控指定库所有表
connector.subscribe("canal_redis\\..*");
监控指定库指定表
connector.subscribe("canal_redis.user");
监控多规则
connector.subscribe("database1\\\\..*,database2.table1,database3.table2");

 

或在配置文件instance.properties中配置

先看看redis有没有数据
启动RedisCanalClientExample类
比如添加一条数据
日志如下:
这里输出的格式为这里的代码定义的
更新数据
日志如下:
在redis中
这里因为有代码操控了redis,所以存储进去了
删除数据
搞定!
暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇