当前位置: 首页 > news >正文

实现kvstore的持久化功能:全量持久化和增量持久化

目录

一、设计思路

二、核心代码

三、测试功能


一、设计思路

数据结构:使用哈希表(链式地址法解决哈希冲突)存储键值对
全量持久化和增量持久化的核心流程:
增删改操作:先写 WAL 日志 → 再更新内存哈希表
Checkpoint:全量持久化内存数据 → 截断 WAL 日志(清空)
重启恢复:加载全量文件 → 重放 WAL 日志 → 恢复增量数据
异步 Checkpoint:后台线程执行全量持久化,避免阻塞前台写操作。

全量持久化(Checkpoint)的触发条件:
后台线程checkpoint_worker循环检查触发条件,满足任一条件则执行 Checkpoint:
定时触发:每隔CHECKPOINT_INTERVAL(30 秒)执行一次。
条件触发:WAL 文件大小≥10MB。

二、核心代码

实现全量持久化的两个函数:

int kvs_hash_persist(hashtable_t *hash) { if (hash == NULL) return -1; FILE *fp = fopen(PERSIST_FILE, "w"); if (fp == NULL) return -1; // 遍历所有哈希桶 for (size_t i = 0; i < hash->max_slots; i++) { hashnode_t *curr = hash->nodes[i]; while (curr != NULL) { // 写入格式:key value\n if (fprintf(fp, "%s %s\n", curr->key, curr->value) < 0) { fclose(fp); return -1; } curr = curr->next; } } fflush(fp);//用户态缓冲区→内核缓冲区 的刷新,参数是流指针FILE * fclose(fp); return 0; } int kvs_hash_load(hashtable_t *hash) { if (hash == NULL) return -1; FILE *fp = fopen(PERSIST_FILE, "r"); if (fp == NULL) return -1; char line[4096]; // 每行最大长度(可根据需求调整) while (fgets(line, sizeof(line), fp) != NULL) { // 去除换行符 int len = strlen(line); if(len==0)continue;//跳过空行 if (len > 0 && line[len - 1] == '\n') { line[len - 1] = '\0'; } // 分割键和值(空格" "为分隔符) char *key = strtok(line, " "); char *value = strtok(NULL, " "); if (key == NULL || value == NULL) { /*fclose(fp); return -1;*/ continue; } // 插入到KVStore,hash存储引擎先初始化才能调用如下函数,否则报段错误 if (kvs_hash_set(hash, key, value) != 0) { /*fclose(fp); return -1;*/ continue; } } fclose(fp); return 0; }

实现增量持久化的两个函数:

int kvs_hash_logpersist(char *op_type,char *key,char *value){ FILE *fp=fopen(WAL_FILE,"a");//权限a:只追加写;权限a+:追加写+可读 if(!fp)return -1; if(value){//增/改操作 fprintf(fp,"%s %s %s\n",op_type,key,value); }else {//删除操作 fprintf(fp,"%s %s\n",op_type,key); } fflush(fp); //fsync(fp); fclose(fp); } int kvs_hash_logreplay(hashtable_t *hash){ FILE *fp=fopen(WAL_FILE,"r"); if(!fp)return -1; char line[1024];//行缓冲区 int count=0;//重放计数器 //逐行读取WAL日志文件 while(fgets(line,sizeof(line),fp)){ // 去除行末的换行符 line[strcspn(line,"\n")]='\0'; //跳过空行 if(strlen(line)==0)continue; char *data=strdup(line);//strtok会改变原字符串 char *op_type=strtok(data," "); char *key=strtok(NULL," "); char *value=strtok(NULL," "); if(!op_type||!key)continue; if(strcmp(op_type,"HSET")==0||strcmp(op_type,"HMOD")==0){ if(value){ kvs_hash_set(hash,key,value); count++; } }else if(strcmp(op_type,"HDEL")==0){ kvs_hash_del(hash,key); count++; } free(data); } fclose(fp); return count; }

后台线程实现全量持久化的入口函数:

// -------------------------- Checkpoint后台线程 -------------------------- //获取文件大小 static int get_file_size(char *filename){ struct stat stat_buf; if(stat(filename,&stat_buf)==-1)return -1; return (int)stat_buf.st_size; } // 检查Checkpoint触发条件 static int is_checkpoint_needed(hashtable_t *hash) { // 1. 检查WAL文件大小 int wal_size = get_file_size(WAL_FILE); if (wal_size >= WAL_SIZE_THRESHOLD) return 1; return 0; } // Checkpoint后台线程函数 void *checkpoint_worker(void *arg) { hashtable_t *hash = (hashtable_t *)arg; time_t last_checkpoint = time(NULL);//返回的单位是秒s while (ENABLE_PERSIST) { time_t now = time(NULL); //以下两者都是满足条件,进行全量持久化并截断WAL日志(清空) // 触发条件1:定时(间隔30秒) if (difftime(now, last_checkpoint) >= CHECKPOINT_INTERVAL) { kvs_hash_persist(hash); FILE *fp=fopen(WAL_FILE,"w+"); if(fp==NULL)return NULL; fclose(fp); last_checkpoint = now; sleep(1); // 避免频繁触发 continue; } // 触发条件2:WAL大小/ KV数量阈值 if (is_checkpoint_needed(hash)) { kvs_hash_persist(hash); FILE *fp=fopen(WAL_FILE,"w+"); if(fp==NULL)return NULL; fclose(fp); last_checkpoint = now; sleep(1); continue; } // 每秒检查一次 sleep(1); } return NULL; }

创建线程进行异步全量持久化,并在重新运行kvstore时恢复数据(加载全量和增量文件)

int init_persist(void){ //创建线程 pthread_t pid; pthread_create(&pid,NULL,checkpoint_worker,(void *)&Hash); pthread_detach(pid); //恢复数据(加载全量 + 重放WAL增量日志) kvstore_hash_load(); kvstore_hash_logreplay(); }

kvstore解析协议函数,增删改操作时先进行增量持久化再进行内存写操作

//“CRUD” 对应 Create(创建)、Read(读取)、Update(更新)、Delete(删除) 四个关键动作 int kvstore_parser_protocol(struct conn_item *item,char **tokens,int count){ if(item==NULL||tokens==NULL||count==0)return -1; int cmd; for(cmd=KVS_CMD_START;cmd<KVS_CMD_SIZE;cmd++){ if(strcmp(commands[cmd],tokens[0])==0){//strcmp当中传的就是地址,自动根据地址比较值 break; } } char *msg=item->wbuffer; //memset(msg,0,BUFFER_LENGTH); int len=strlen(msg); char *key=tokens[1]; char *value=tokens[2]; //printf("cmd=%d,commands=%s\n",cmd,commands[cmd]); switch(cmd){ 。。。。。。 //hash case KVS_CMD_HSET:{ //printf("set\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_set(key,value); //需要给客户端回信息 if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n");//指定缓冲区最大容量,避免缓冲区溢出 }else{ snprintf(msg+len,BUFFER_LENGTH-len,"FAILED\r\n"); } break; } case KVS_CMD_HGET:{ //printf("get\n"); char *val=kvstore_hash_get(key); if(val){//value不为空 snprintf(msg+len,BUFFER_LENGTH-len,"%s\r\n",val); }else{ snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } //LOG("get:%s\n",val); break; } case KVS_CMD_HDEL:{ //printf("del\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_del(key); if(res<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } break; } case KVS_CMD_HMOD:{ //printf("mod\n"); #if ENABLE_PERSIST kvs_hash_logpersist(tokens[0],key,value); #endif int res=kvstore_hash_mod(key,value); if(res<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else if(res==0){ snprintf(msg+len,BUFFER_LENGTH-len,"SUCCESS\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"NO EXIST\r\n"); } break; } case KVS_CMD_HCOUNT:{ int count=kvstore_hash_count(); if(count<0){ snprintf(msg+len,BUFFER_LENGTH-len,"ERROR\r\n"); }else { snprintf(msg+len,BUFFER_LENGTH-len,"%d\r\n",count);//"%d",count } break; } default:{ //LOG("cmd:%s\n",commands[cmd]); assert(0); } } }

三、测试功能

上次运行kvstore服务器是已通过协议存入key-value对并进行了持久化:1-2,2-3,3-4

重新运行,测试恢复数据情况

http://www.rkmt.cn/news/83754.html

相关文章:

  • 摄影师必备Lightroom修图软件最新版下载与安装指南
  • unity运行后笔记本风扇声音太大的解决办法
  • 故障处理:Oracle ADG 主库想备库传输日志的归档路径禁用的报错
  • 5种必知的前端数据加密防护技术:从React安全到浏览器原生方案
  • Windows11安装docker
  • Cameralink采集软件-Espeedgrab软件应用【2.存储图片和视频】
  • AcWing 846:树的重心 ← 类似“东方博宜OJ 2190:树的重心”代码
  • 容器化部署在软件许可优化中的应用:跨部门资源共享实践
  • 2025年可观测平台选型指南:头部厂商综合测评与推荐
  • docker启动mysql及部分命令回顾
  • Teams Agent开发避坑指南,90%新手都会忽略的3大陷阱
  • 直播带货APP开发的核心流程:推流端、观看端与运营端后台搭建指南
  • 用循环神经网络生成0^n 1^n形式的简单序列
  • AcWing 846:树的重心 ← 链式前向星 or 邻接表
  • 251211
  • Python自然语言处理的未来:技术栈与开发范式
  • 观察者模式
  • 2025年东莞优质的铝门窗批发选哪家,安全门窗/铝门窗/慕莎尼奥门窗/窗纱一体铝门窗/门窗/铝门窗品牌选哪家 - 品牌推荐师
  • 2025.12.11总结
  • 124_尚硅谷_闭包的基本介绍
  • One Year XTOOL D9S Update Service: Keep Diagnostics Up-to-Date for EU US Vehicles
  • 2025年数控车床品牌新格局,机械手集成能力排行揭晓,动力刀塔数控车/牙科配件数控车床/新能源数控车床/军工配件数控机床数控车床设计怎么选择 - 品牌推荐师
  • 如何确定arm固件的加载地址
  • 2025年国内靠谱的门窗源头厂家推荐,全屋门窗/环保门窗/复古门窗/极简门窗/欧式门窗/智能门窗/门窗直销厂家找哪家 - 品牌推荐师
  • 基于协同过滤推荐算法的求职招聘推荐系统u1ydn3f4(程序、源码、数据库、调试部署优秀的方案及开发环境)系统界面展示及获取方式置于文档末尾,可供参考。
  • 12.11笔记
  • 中国人工智能学会推荐国际学术会议和国际/国内期刊目录
  • 蓝桥杯-Python-题目整理2
  • 喵喵喵 XI
  • 深度学习方法在语音识别中的全面解析