Redis/MySQL 中间件深度优化与生产选型
Redis/MySQL 中间件深度优化与生产选型
一、场景痛点:中间件性能瓶颈
Redis 和 MySQL 是互联网应用中使用最广泛的中间件,承担着缓存和存储的核心职责。在高并发场景下,这两个中间件往往成为系统的性能瓶颈。
Redis 的常见问题:内存碎片化、bigkey 导致阻塞、hotkey 引发数据倾斜、持久化阻塞主线程。
MySQL 的常见问题:慢查询、锁竞争、连接池耗尽、主从延迟、索引失效。
这些问题在低并发下可能完全不会显现,但在流量高峰时会突然爆发,导致服务雪崩。本文将深入探讨 Redis 和 MySQL 的深度优化策略和选型建议。
二、底层机制与原理深度剖析
2.1 Redis 内存管理与持久化机制
flowchart TD A[客户端请求] --> B[命令解析] B --> C[内存分配] C --> D{data} subgraph 内存分配器 E[jemalloc] --> F[small class] E --> G[large class] E --> F[ huge class] end D --> E subgraph 持久化 H[RDB] --> I[BGSAVE] J[AOF] --> K[fsync] H --> L[子进程快照] end style E fill:#b8d4ff style H fill:#FFE4B5 style J fill:#FFE4B52.2 MySQL 查询优化器原理
flowchart LR A[SQL 语句] --> B[解析器] B --> C[语法树] C --> D[查询优化器] D --> E{成本估算} E -->|基于规则| F[RBO] E -->|基于代价| G[CBO] F --> H[执行计划] G --> H三、生产级代码实现与最佳实践
3.1 Redis 连接池与性能优化
// ==================== Redis 连接池配置 ==================== @Configuration public class RedisConfig { @Bean public RedisConnectionFactory redisConnectionFactory() { GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); // 连接池配置 poolConfig.setMaxTotal(200); // 最大连接数 poolConfig.setMaxIdle(50); // 最大空闲连接 poolConfig.setMinIdle(10); // 最小空闲连接 poolConfig.setMaxWait(Duration.ofMillis(3000)); // 空闲检测 poolConfig.setTestWhileIdle(true); poolConfig.setTestOnBorrow(true); // 借出检测 poolConfig.setTestOnReturn(false); poolConfig.setTestOnCreate(true); // 创建检测 // 空闲时检测 poolConfig.setMinEvictableIdleTime(Duration.ofMinutes(5)); poolConfig.setEvictableIdleTime(Duration.ofMinutes(10)); poolConfig.setNumTestsPerEvictionRun(10); // 连接耗尽策略 poolConfig.setBlockWhenExhausted(true); // Redis Cluster 配置 RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(); clusterConfig.setClusterNodes(Arrays.asList( new RedisNode("10.0.0.1", 6379), new RedisNode("10.0.0.2", 6379), new RedisNode("10.0.0.3", 6379) )); clusterConfig.setMaxRedirects(3); clusterConfig.setPassword(RedisPassword.of("password")); return new LettuceConnectionFactory(clusterConfig, ClientConfiguration.builder() .useSsl() .connectTimeout(Duration.ofSeconds(5)) .readTimeout(Duration.ofSeconds(3)) .commandTimeout(Duration.ofSeconds(3)) .build()); } @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); // 开启事务 template.setExecutePipeline(true); return template; } } // ==================== Redis 管道批量操作 ==================== @Component public class RedisBatchOperations { private final StringRedisTemplate redisTemplate; public RedisBatchOperations(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } /** * 批量获取(Pipeline) * 减少网络往返次数 */ public List<String> batchGet(List<String> keys) { if (keys.isEmpty()) return Collections.emptyList(); return redisTemplate.executePipelined((RedisCallback<List<String>>) connection -> { for (String key : keys) { connection.stringCommands().get(key.getBytes()); } return null; }); } /** * 批量写入(Pipeline) */ public void batchSet(Map<String, String> keyValues) { if (keyValues.isEmpty()) return; redisTemplate.executePipelined((RedisCallback<Object>) connection -> { for (Map.Entry<String, String> entry : keyValues.entrySet()) { connection.stringCommands() .set(entry.getKey().getBytes(), entry.getValue().getBytes()); } return null; }); } /** * Lua 脚本保证原子性 */ public Long incrementIfNotExists(String key, long delta, Duration ttl) { String luaScript = """ local current = redis.call('GET', KEYS[1]) if current == false then redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) return 1 else return 0 end """; return redisTemplate.execute( new DefaultRedisScript<>(luaScript, Long.class), List.of(key), String.valueOf(delta), String.valueOf(ttl.toMillis()) ); } }3.2 MySQL 连接池与查询优化
# ==================== Druid 连接池配置 ==================== spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/app?useUnicode=true&characterEncoding=utf8 username: root password: password # Druid 连接池配置 druid: # 初始连接数 initial-size: 10 # 最大连接数 max-active: 100 # 最小空闲连接 min-idle: 10 # 获取连接最大等待时间 max-wait: 60000 # 连接泄漏检测 remove-abandoned: true remove-abandoned-timeout: 60 # 连接有效性检测 validation-query: SELECT 1 test-while-idle: true test-on-borrow: false test-on-return: false # 检测间隔 time-between-eviction-runs-millis: 60000 # 最小保持空闲时间 min-evictable-idle-time-millis: 300000 # 监控配置 filter: stat: enabled: true log-slow-sql: true slow-sql-millis: 2000 wall: enabled: true config: multi-statement-allow: true-- ==================== 慢查询优化示例 ==================== -- 1. 检查慢查询 SELECT query, db, exec_count, total_latency, avg_latency, rows_sent, rows_scanned FROM performance_schema.events_statements_summary_by_digest WHERE schema = 'app' ORDER BY total_latency DESC LIMIT 10; -- 2. 分析执行计划 EXPLAIN ANALYZE SELECT u.*, o.* FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.created_at > '2024-01-01' AND o.status = 'completed'; -- 3. 创建合适索引 CREATE INDEX idx_users_created_at ON users(created_at); CREATE INDEX idx_orders_user_id_status ON orders(user_id, status); -- 4. 分页优化(延迟关联) SELECT u.* FROM users u INNER JOIN ( SELECT id FROM users WHERE created_at > '2024-01-01' ORDER BY created_at LIMIT 100 OFFSET 1000 ) t ON u.id = t.id; -- 5. 覆盖索引避免回表 CREATE INDEX idx_user_covered ON users(id, name, email, created_at);3.3 MySQL 分库分表策略
// ==================== ShardingSphere 分库分表配置 ==================== @Configuration public class ShardingConfig { @Bean public DataSource dataSource() { Map<String, DataSource> dataSourceMap = new HashMap<>(); // 配置分库 dataSourceMap.put("ds_0", createDataSource("10.0.0.1:3306")); dataSourceMap.put("ds_1", createDataSource("10.0.0.2:3306")); // 分片规则配置 ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration(); // 订单表分片规则 TableRuleConfiguration orderTableRule = new TableRuleConfiguration("orders", "ds_${0..1}.orders_${0..15}"); orderTableRule.setTableShardingStrategyConfig( new StandardShardingStrategyConfiguration( "user_id", new PreciseShardingAlgorithm<Long>() { @Override public String doSharding( Collection<String> availableTargetNames, PreciseShardingValue<Long> value) { long shardId = value.getValue() % 16; int dbIndex = (int) (shardId / 8); return "ds_" + dbIndex; } }, new RangeShardingAlgorithm<Long>() { @Override public Collection<String> doSharding( Collection<String> availableTargetNames, RangeShardingValue<Long> value) { // 范围查询广播到所有库 return availableTargetNames; } } ) ); shardingRuleConfig.getTableRuleConfigs().add(orderTableRule); // 默认数据库策略 shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig( new InlineShardingStrategyConfiguration( "user_id", "ds_${user_id % 2}" ) ); // 绑定表(避免跨库 join) shardingRuleConfig.getBindingTableGroups().add("orders,order_items"); try { return ShardingDataSourceFactory.createDataSource( dataSourceMap, shardingRuleConfig, new Properties() ); } catch (Exception e) { throw new RuntimeException(e); } } }3.4 缓存与数据库一致性
// ==================== Cache-Aside 模式实现 ==================== @Service public class UserService { private final StringRedisTemplate redisTemplate; private final UserMapper userMapper; private static final String USER_CACHE_KEY = "user:"; private static final Duration CACHE_TTL = Duration.ofMinutes(30); /** * Cache-Aside 读流程 * 1. 先查缓存 * 2. 缓存未命中,查数据库 * 3. 写入缓存 */ public User getUserById(Long userId) { String cacheKey = USER_CACHE_KEY + userId; // 1. 查缓存 User cached = getCachedUser(cacheKey); if (cached != null) { return cached; } // 2. 查数据库 User user = userMapper.selectById(userId); if (user == null) { return null; } // 3. 写缓存 cacheUser(cacheKey, user); return user; } /** * 更新时删除缓存(而非更新) * 避免并发下的缓存不一致 */ @Transactional public void updateUser(User user) { // 1. 更新数据库 userMapper.updateById(user); // 2. 删除缓存 String cacheKey = USER_CACHE_KEY + user.getId(); redisTemplate.delete(cacheKey); // 3. 延迟双删确保一致性 CompletableFuture.runAsync(() -> { try { Thread.sleep(200); redisTemplate.delete(cacheKey); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } /** * 分布式锁保证缓存重建的原子性 */ public User getUserWithLock(Long userId) { String cacheKey = USER_CACHE_KEY + userId; // 1. 尝试从缓存获取 User cached = getCachedUser(cacheKey); if (cached != null) { return cached; } String lockKey = "lock:" + cacheKey; String lockValue = UUID.randomUUID().toString(); // 2. 获取分布式锁 Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(lockKey, lockValue, Duration.ofSeconds(10)); if (Boolean.TRUE.equals(acquired)) { try { // Double Check cached = getCachedUser(cacheKey); if (cached != null) { return cached; } // 3. 查数据库 User user = userMapper.selectById(userId); // 4. 写入缓存 if (user != null) { cacheUser(cacheKey, user); } else { // 缓存空结果,避免缓存穿透 cacheEmpty(cacheKey); } return user; } finally { // 5. 释放锁 redisTemplate.delete(lockKey); } } else { // 等待其他线程重建缓存 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return getUserById(userId); } } private User getCachedUser(String key) { String json = redisTemplate.opsForValue().get(key); if (json == null) { return null; } if ("{}".equals(json)) { return null; // 空结果 } return JSON.parseObject(json, User.class); } private void cacheUser(String key, User user) { redisTemplate.opsForValue().set(key, JSON.toJSONString(user), CACHE_TTL); } private void cacheEmpty(String key) { // 短过期时间的空缓存,防止缓存穿透 redisTemplate.opsForValue().set(key, "{}", Duration.ofMinutes(5)); } }四、边界分析与 Trade-offs
4.1 Redis vs Memcached
| 维度 | Redis | Memcached |
|---|---|---|
| 数据类型 | 多种 | 字符串 |
| 持久化 | 支持 | 不支持 |
| 集群 | 原生 Cluster | 需客户端分片 |
| 性能 | 稍低 | 稍高 |
| 内存管理 | 更复杂 | 简单 |
4.2 MySQL 存储引擎选择
| 引擎 | 适用场景 | 特点 |
|---|---|---|
| InnoDB | OLTP | 事务支持、行锁 |
| MyISAM | 读多写少 | 表锁、空间节省 |
| Memory | 临时表 | 内存存储 |
| TokuDB | 大数据 | 压缩率高 |
五、总结
Redis 和 MySQL 的深度优化是后端工程师的必备技能:
- 连接池管理:合理配置连接池参数,避免连接耗尽
- 索引优化:基于 Explain 分析创建合适索引
- 缓存策略:合理使用缓存,避免缓存穿透/击穿/雪崩
- 分库分表:根据业务增长规划数据分片
- 一致性保证:在性能和一致性间权衡
中间件优化没有银弹,需要基于业务场景和数据特点进行针对性调优。
