当前位置: 首页 > news >正文

分布式ID自增算法snowflake机器标识分配优化

前言

image

雪花算法的构成为

  • 第一位固定为0,表示正数
  • 41位表示时间戳,一共可以使用69年
  • 5位表示数据中心节点,5位表示机器标识,一共可以支持1024个节点
  • 12位表示一毫秒内的序列号,共4096个

如果多个服务实例使用的数据中心和机器标识都一样,那么在高并发情况下会生成重复的ID,这个问题是很严重的,所以我们需要尽量保证数据中心和机器标识给每个服务分配的不一样,这里我使用redis来存储服务ID和机器标识的映射关系,使用数据库也是可以的。

代码实现

分配器

package com.framework.manager;import com.framework.base.log.LogTreadLocal;
import com.framework.base.util.IpUtil;
import com.framework.redis.CustomRedisTemplate;
import com.framework.redis.RedisLock;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Slf4j
public class SnowflakeIdWorkerAllocator implements InitializingBean {//可用的id池private static final String AVAILABLE_IDS_KEY = "snowflake:available_ids";//已分配的id池private static final String ASSIGNED_IDS_KEY = "snowflake:assigned_ids";private static final String HEARTBEAT_KEY = "snowflake:heartbeat";//操作可用ID池的分布式锁private static final String AVAILABLE_ID_LOCK = "snowflake:available_id:lock";// 心跳间隔1小时(毫秒)private static final long HEARTBEAT_INTERVAL = 3600 * 1000;// 超时时间5小时(毫秒)private static final long TIMEOUT_THRESHOLD = 5 * 3600 * 1000;@Autowiredprivate CustomRedisTemplate customRedisTemplate;@Autowiredprivate RedisLock redisLock;@Autowiredprivate ApplicationContext applicationContext;/*** 当前应用分配的ID*/@Getterprivate String currentAssignedId;/*** 当前应用的实例ID*/private String currentInstanceId;@Overridepublic void afterPropertiesSet() {this.currentInstanceId = generateInstanceId();assignWorkerId();startHeartbeat();}/*** 分配机器ID*/private void assignWorkerId() {if (StringUtils.isBlank(currentInstanceId)) {log.error("snowflakeID分配过程异常,获取实例ID失败");return;}if (!customRedisTemplate.hasKey(AVAILABLE_IDS_KEY)) {log.error("snowflakeID分配过程异常,前置初始化失败,instanceId: {}", currentInstanceId);return;}String finalAssignedId;String existsAssignedId = customRedisTemplate.getHashValue(ASSIGNED_IDS_KEY, currentInstanceId);if (StringUtils.isBlank(existsAssignedId)) {String newAssignedId = (String) customRedisTemplate.popFromSet(AVAILABLE_IDS_KEY);if (Objects.isNull(newAssignedId)) {// 分配异常,配置告警log.error("snowflakeID分配过程异常,No available datacenter and worker IDs,instanceId: {}", currentInstanceId);return;}finalAssignedId = newAssignedId;} else {// 要么服务启动异常重启了,要么pod name重复了,告警,并继续往下走log.error("snowflakeID分配过程异常,当前实例已分配过 instanceId: {},assignedId: {}", currentInstanceId, existsAssignedId);finalAssignedId = existsAssignedId;}this.currentAssignedId = finalAssignedId;// 记录分配信息customRedisTemplate.setHashValue(ASSIGNED_IDS_KEY, currentInstanceId, finalAssignedId);customRedisTemplate.setHashValue(HEARTBEAT_KEY, currentInstanceId, System.currentTimeMillis());log.info("snowflakeID分配过程,Assigned datacenter and worker ID {} to instance {}", finalAssignedId, currentInstanceId);}/*** 启动心跳任务*/private void startHeartbeat() {if (StringUtils.isBlank(currentAssignedId)) {return;}ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();heartbeatExecutor.scheduleAtFixedRate(() -> {LogTreadLocal.setTrackingNo(UUID.randomUUID().toString());log.info("snowflakeID心跳任务开始处理");try {customRedisTemplate.setHashValue(HEARTBEAT_KEY, currentInstanceId, System.currentTimeMillis());reclaimExpiredIds();} catch (Exception e) {log.error("snowflakeID心跳任务处理异常", e);}}, 10000, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);}/*** 回收过期ID*/private void reclaimExpiredIds() {String lockKey = AVAILABLE_ID_LOCK;String lockVal = UUID.randomUUID().toString();boolean lockSuccess = redisLock.tryLock(lockKey, lockVal, 10);if (!lockSuccess) {log.info("snowflakeID心跳任务,有其他应用在处理");return;}log.info("snowflakeID心跳任务,开始检查过期时间");try {long now = System.currentTimeMillis();Map<Object, Object> entries = customRedisTemplate.getHashEntries(HEARTBEAT_KEY);List<String> expiredInstanceIds = new ArrayList<>();for (Map.Entry<Object, Object> entry : entries.entrySet()) {String instanceId = (String) entry.getKey();Long lastHeartbeat = (Long) entry.getValue();if (lastHeartbeat != null && (now - lastHeartbeat) > TIMEOUT_THRESHOLD) {expiredInstanceIds.add(instanceId);}}if (CollectionUtils.isNotEmpty(expiredInstanceIds)) {List<String> expiredAssignedIds = customRedisTemplate.getHashValueList(ASSIGNED_IDS_KEY, expiredInstanceIds);releaseAssignedIds(expiredInstanceIds, expiredAssignedIds);log.info("snowflakeID心跳任务,过期的实例ID为 {},要归还的ID为 {}", expiredInstanceIds, expiredAssignedIds);}} finally {redisLock.releaseLock(lockKey, lockVal);}}/*** 释放占用的ID*/private void releaseAssignedIds(List<String> instanceIds, List<String> assignedIds) {if (CollectionUtils.isNotEmpty(instanceIds)) {customRedisTemplate.deleteHashValue(ASSIGNED_IDS_KEY, instanceIds);customRedisTemplate.deleteHashValue(HEARTBEAT_KEY, instanceIds);}if (CollectionUtils.isNotEmpty(assignedIds)) {customRedisTemplate.addForSet(AVAILABLE_IDS_KEY, assignedIds);}}/*** 生成实例ID*/private String generateInstanceId() {//获取pod名称String instanceId = applicationContext.getEnvironment().getProperty("HOSTNAME");if (StringUtils.isBlank(instanceId)) {instanceId = applicationContext.getEnvironment().getProperty("POD_NAME");}if (StringUtils.isBlank(instanceId)) {//降级为服务名称+pod的IPinstanceId = applicationContext.getEnvironment().getProperty("spring.application.name") + IpUtil.getLocalIP();}log.info("snowflakeID分配过程,获取的实例ID为 {}", instanceId);return instanceId;}
}

分配池初始化接口

package com.domservice.controller;import com.framework.redis.CustomRedisTemplate;
import com.framework.redis.RedisLock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;@RestController
@RequestMapping("/snowflake")
@Slf4j
public class SnowflakeController {@Autowiredprivate CustomRedisTemplate customRedisTemplate;@Autowiredprivate RedisLock redisLock;@PostMapping("/workerid/init")public String initWorkerId(@RequestHeader() {String AVAILABLE_IDS_KEY = "snowflake:available_ids";boolean exists = customRedisTemplate.hasKey(AVAILABLE_IDS_KEY);if (!exists) {try {List<String> allIds = new ArrayList<>();for (int i = 0; i <= 31; i++) {for (int j = 0; j <= 31; j++) {String idStr = i + ":" + j;allIds.add(idStr);}}customRedisTemplate.addForSet(AVAILABLE_IDS_KEY, allIds);log.info("snowflakeID初始化过程,Initialized available datacenter and worker IDs: {}", allIds);} catch (Exception e) {log.error("snowflakeID初始化失败", e);return "init failed";}} else {return "already inited";}return "success";}
}

我们服务部署在k8s下,使用 HOSTNAME 来获取每个pod的唯一标识,分配过程如下

  1. 首先初始化分配池,大小为1024,数据依次为 0:0,0:1,..0:15..15:15,分别表示数据中心和机器标识,表示最大支持1024个pod。
  2. 每个pod启动时从池子中随机拿出一个,并记录当前pod实例的ID->分配ID的映射关系,再记录当前实例ID的最后心跳时间。
  3. 开启一个定时心跳任务,更新当前实例ID的最后心跳时间,并检查当前已经分配的所有实例ID的最后心跳时间,是否已经过期了(最后心跳时间-当前时间>5小时),过期了需要回收此实例ID已经分配的分配ID,放到分配池。服务重启或者发版都会造成过期。

其他

实际上雪花算法还有时钟回拨的问题,不过这个问题完全解决很麻烦,我们可以部分解决,如果回拨时间小于指定时间(比如10ms),那就sleep等待系统时间追回,并添加日志告警。美团开源的 Leaf 项目就使用了这种方式。

参考

深入解析Java系统设计中的分布式ID生成方案:从Snowflake到Leaf-segment

http://www.rkmt.cn/news/183027.html

相关文章:

  • 2025.11.7
  • prometheus监控
  • Miniconda环境下安装PyTorch Text进行NLP实验
  • 利用Miniconda环境实现PyTorch模型的批量推理任务
  • C#之return
  • 【inductor】scheduler学习
  • 如何通过Docker Run命令加载Miniconda镜像并启用GPU支持
  • 解决‘CondaValueError: prefix already exists’冲突提示
  • C#之ref与out
  • PyTorch,MNIST,DataLoader,Transformer
  • 合作文章|ChIP-seq联合RNA-seq揭示FOXS1-BSCL2轴调控胆固醇代谢与炎症的新机制
  • Miniconda环境版本控制:Git跟踪environment.yml
  • 【Week2_Day5】【软件测试学习记录与反思】【坚定职业规划、数据库的了解、navicat操作、MairaDB配置、创建远程登录用户、连接服务器数据库、SQL语句练习】
  • 解码GPIO、寄存器与蜂鸣器(三极管)
  • Conda安装包冲突怎么办?用Miniconda-Python3.10构建隔离环境
  • HTML Canvas动态绘图:实时显示Miniconda训练指标
  • conda install pytorch torchvision torchaudio -c pytorch 完整命令解析
  • Jupyter Voilà将Notebook转换为独立Web应用
  • 我的私密知识库探索:为什么选择了访答
  • 【扣子Coze教程】智能出题工作流,一键生成试卷(零代码)
  • Docker diff查看Miniconda容器文件变更记录
  • GitHub Pages发布技术博客:分享Miniconda使用心得
  • SSH免密登录配置:提升频繁连接Miniconda容器效率
  • Linux nice命令调整Miniconda进程优先级
  • 对抗样本攻击详解:如何让AI模型产生错误判断
  • 精选天猫超市卡回收优质平台 - 京顺回收
  • KEDA 自动伸缩管理实践指南
  • 解决‘No space left on device’:清理Miniconda缓存
  • Java日记12月
  • 读书笔记6-11.20