热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

LevelDB顺序存储操作Golang模块封装及持久化队列实现

LevelDB介绍Leveldb是一个google实现的非常高效的kv数据库,能够支持billion级别的数据量。在这个数量级别下还有着非常高的性能。LevelDB是单进程的服务,


LevelDB介绍

Leveldb是一个google实现的非常高效的kv数据库,能够支持billion级别的数据量。 在这个数量级别下还有着非常高的性能。

LevelDB 是单进程的服务,性能非常之高,在一台4个Q6600的CPU机器上,每秒钟写数据超过40w,而随机读的性能每秒钟超过10w。

内部LSM 树算法实现。

LSM 大致结构如上图所示。

LSM 树而且通过批量存储技术规避磁盘随机写入问题。 LSM 树的设计思想非常朴素, 它的原理是把一颗大树拆分成N棵小树, 它首先写入到内存中(内存没有寻道速度的问题,随机写的性能得到大幅提升),在内存中构建一颗有序小树,随着小树越来越大,内存的小树会flush到磁盘上。磁盘中的树定期可以做 merge 操作,合并成一棵大树,以优化读性能【读数据的过程可能需要从内存 memtable 到磁盘 sstfile 读取多次,称之为读放大】。LevelDB 的 LSM 体现在多 level 文件格式上,最热最新的数据尽在 L0 层,数据在内存中,最冷最老的数据尽在 LN 层,数据在磁盘或者固态盘上。还有一种日志文件叫做 manifest,用于记录对 sstfile 的更改,可以认为是 LevelDB 的 GIF。

LevelDB是Google的 Jeff Dean和Sanjay Ghemawat设计开发的key-value存储引擎。

LevelDB底层存储利用了LSM tree的思想, RocksDB是Facebook基于LevelDB开发的存储引擎,针对LevelDB做了很多优化,但是大部分模块的实现机制是一样的。

LevelDB是一个持久化存储的KV系统,和Redis这种内存型的KV系统不同,LevelDB不会像Redis一样狂吃内存,而是将大部分数据存储到磁盘上。LevleDB在存储数据时,是根据记录的key值有序存储的,就是说相邻的key值在存储文件中是依次顺序存储的,而应用可以自定义key大小比较函数,LevleDB会按照用户定义的比较函数依序存储这些记录。

像大多数KV系统一样,LevelDB的操作接口简单,基本操作包括写记录、读记录以及删除记录。另外,LevelDB支持数据快照(snapshot)功能,使得读取操作不受写操作影响,可以在读操作过程中始终看到一致的数据。除此之外,LevelDB还支持数据压缩等操作,这对于减小存储空间以及增快IO效率都有直接的帮助。


对LevelDB进一步封装的理由

虽然LevelDB很强大,但是有点儿底层的味道了,操作不够友好,就几个简单的put,get接口。比如想实现个持久化的顺序操作的队列,想按顺序存储和读取记录,比如不用关心底层的操作,仅使用接口。比如如果哪天想替换底层存储,可以灵活一点儿,不用改动业务......

这里对LevelDB记录存储做一个封装,并实现了持久化的消息队列。同时也可以用在嵌入式linux上,替代sqllite做一些记录存储的模块功能。

封装后可供使用的接口:

// Recorder 操作记录的接口声明
```
type Recorder interface {
// 初始化记录区(会清空所有数据!)

InitRecAreas() error
// 打开记录区(开机必须先打开一次)
OpenRecAreas() (err error)
// 保存记录 areaID,存储区ID,取值从1--至--MAXRECAREAS(相当于一个表)
SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error)
// 更新记录
UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error)
// 删除记录
DeleteRec(areaID RecArea, num int) (err error)
// 获取未上传记录数量
GetNoUploadNum(areaID RecArea) int
// 按数据库中的ID读取一条记录
ReadRecByID(areaID RecArea, id int) (p *Records, err error)
// 顺序读取未上传的记录
ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error)
// 倒数读取记录(如sn=1代表最后一次写入的记录)
ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error)
// 最后一条记录流水
GetLastRecNO(areaID RecArea) int
// 获取当前的读、写ID
GetReadWriteID(areaID RecArea) (rid, wid int)
}
```

实现原理

单独维护一个key,类似于目录的概念,作为全局使用,记录当前的写的位置,读的位置,是否是循环覆盖写。

如图所示,RecDirTB相当于目录表,这里分配了三个对应可以有三个不同的表/存储区/队列供使用。

sn是记录流水号,wid当前写的位置,rid当前读(消费)的位置。flag是否循环覆盖写标识。

可指定容量,比如为100W,这样可做到记录循环覆盖写,记录留有底可查。

对应的记录区1的表名为Rec01TB,,后面的|xx为顺序存储的记录的id.

value为存储的json格式的内容。

格式为id,sn(记录流水),type(记录类型),time(操作时间),data(记录内容,任意的json格式数据),ext,res留作备注用。

 

测试的写入性能,封装后测试,并发写入1000条记录,总共耗时30ms左右。 

github地址:https://github.com/yangyongzhen/dbmod


简单的使用demo:

package main
import "fmt"
var (
recApi dbmod.Recorder
dataType int
)
// RequestQrcode模拟写入的数据内容json
type RequestQrcode struct {
ChanNO string `json:"chanNO"`
TermID string `json:"termID"`
Qrcode string `json:"qrcode"`
Money uint32 `json:"money"`
Recsn uint32 `json:"recsn"`
Orderno string `json:"orderno"`
Dealtime string `json:"dealtime"`
}
func main() {

//每次必须先打开存储区
err := recApi.OpenRecAreas()
if err != nil {
fmt.Printf("OpenRecAreas error,%s\n", err.Error())
}

data := RequestQrcode{}
data.ChanNO = "YS_CHANaaa"
data.TermID = "12345678"
data.Recsn = uint32(sn)
data.Qrcode = "6225882618789"
data.MOney= 1
dataType = 0x0A
// 按队列顺序写入一条记录,data为interface{},会序列化为json存储
id, err := recApi.SaveRec(dbmod.RecArea01, data, datatype)
if err != nil {
fmt.Printf("SaveRec error,%s\n", err.Error())
}
//按队列顺序读取一条记录
rec, err := recApi.ReadRecNotServer(dbmod.RecArea01, 1)
if err != nil {
fmt.Printf("ReadRecNotServer error,%s\n", err.Error())
}
fmt.Printf("rec:%#v\n", rec)

//按队列顺序删除一条记录(注:只更新标记)
recApi.DeleteRec(dbmod.RecArea01, 1)

//获取队列中未上传/消费的记录数量
num := recApi.GetNoUploadNum(dbmod.RecArea01)
fmt.Printf("GetNoUploadNum:%d\n", num)
}

 部分源码:

package dbmod
import (
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
)
var (
//IsDebug 是否调试
IsDebug = true
recDir [MAXRECAREAS]RecDir
lockSave = sync.Mutex{}
lockDel = sync.Mutex{}
once sync.Once
singleintance *Records
)
// Records ...
type Records struct {
ID int `json:"id"`
RecNo int `json:"sn"`
RecType int `json:"type" `
RecTime string `json:"time" `
Data interface{} `json:"data" `
Ext string `json:"ext" `
Res string `json:"res" `
}
// InitRecAreas 初始化记录存储区
func (rec Records) InitRecAreas() error {
// 清空数据
err := DelAllData()
if err != nil {
log.Fatal(err.Error())
return err
}
// 初始化目录表
err = InitRecDir()
if err != nil {
log.Fatal(err.Error())
return err
}
return err
}
// OpenRecAreas 打开记录存储区,每次开机,需要先打开一下
func (rec *Records) OpenRecAreas() (err error) {
//加载RecDir
for i := 0; i log.Printf("LoadDirs %02d \n", i+1)
err = recDir[i].LoadDirs(RecArea(i) + 1)
if err != nil {
log.Println(err.Error())
return
}
log.Printf("LoadDirs %02d ok!\n", i+1)
}
//log.Println(recDir)
return err
}
func saveRecToLDB(areaID RecArea, rec *Records, wid int) (id int, err error) {
t := time.Now()
rec.RecTime = t.Format("20060102150405")
rec.ID = wid
key := fmt.Sprintf("Rec%02dTB|%d", areaID, wid)
bv, err := json.Marshal(rec)
if err != nil {
log.Println("saveRecToLDB Marshal Error:", err)
return id, err
}
err = PutData(key, bv)
if err != nil {
log.Println("saveRecToLDB PutData Error:", err)
return id, err
}
return id, err
}
// SaveRec 保存记录
func (rec *Records) SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error) {
lockSave.Lock()
defer lockSave.Unlock()
//log.Printf("SaveRec,area=%02d \n", areaID)
if (areaID <= 0) || (areaID > MAXRECAREAS) {
err = fmt.Errorf("area id %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS)
log.Println(err.Error())
return
}
rec.RecNo = recDir[areaID-1].RecNo
rec.Data = data
rec.RecType = recType
//记录是否存储满,判断
if (recDir[areaID-1].WriteID + 1) > (MAXRECCOUNTS) {
if recDir[areaID-1].ReadID == 0 {
err = fmt.Errorf("rec area %02d is full", areaID)
log.Println(err.Error())
return
}
if (recDir[areaID-1].WriteID + 1 - (MAXRECCOUNTS)) == recDir[areaID-1].ReadID {
err = fmt.Errorf("rec area %02d is full", areaID)
log.Println(err.Error())
return
}
//保存记录
recDir[areaID-1].RecNo++
recDir[areaID-1].WriteID = 1
recDir[areaID-1].Flag = "1"
id = 1
_, err = saveRecToLDB(areaID, rec, id)
if err != nil {
log.Println("saveRecToLDB Error:", err)
return
}
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Println("SaveRec UpdateDirs Error:", err)
return
}
//log.Printf("SaveRec,area=%02d ok!\n", areaID)
return id, err
}
if recDir[areaID-1].Flag == "1" {
//记录是否满判断
if (recDir[areaID-1].WriteID + 1) == recDir[areaID-1].ReadID {
err = fmt.Errorf("rec area %02d is full", areaID)
log.Println(err.Error())
return
}
rec.RecNo += 1
id = recDir[areaID-1].WriteID + 1
_, err = saveRecToLDB(areaID, rec, id)
if err != nil {
log.Println("saveRecToLDB Error:", err)
return
}
recDir[areaID-1].RecNo++
recDir[areaID-1].WriteID = id
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return 0, err
}
//log.Printf("SaveRec,area=%02d ok!\n", areaID)
return id, err
}
rec.RecNo += 1
id = recDir[areaID-1].WriteID + 1
_, err = saveRecToLDB(areaID, rec, id)
if err != nil {
log.Println("saveRecToLDB Error:", err)
return
}
recDir[areaID-1].RecNo++
recDir[areaID-1].WriteID = id
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return 0, err
}
//log.Printf("SaveRec,area=%02d ok!\n", areaID)
return id, err
}
// UpdateRec 更新记录
func (rec *Records) UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error) {
if (areaID <= 0) || (areaID > MAXRECAREAS) {
err = fmt.Errorf("area id %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS)
log.Println(err.Error())
return
}
rec.Data = data
rec.RecType = recType
id, err = saveRecToLDB(areaID, rec, recID)
return id, err
}
// DeleteRec 删除记录(并不是真正删除表里记录,而是清除该记录的上传标记)
// areaID:记录区 num:删除的数量
func (rec Records) DeleteRec(areaID RecArea, num int) (err error) {
lockDel.Lock()
defer lockDel.Unlock()
if (areaID <= 0) || (areaID > MAXRECAREAS) {
err = errors.New("area id is not right")
log.Fatal(err.Error())
return
}
id := recDir[areaID-1].ReadID
//如果写的位置等于读的位置,说明记录已上传完,没有要删除的了
if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID {
return
}
//如果要删除的数量大于了最大的记录数
if (id + num) > MAXRECCOUNTS {
if (id + num - MAXRECCOUNTS) > recDir[areaID-1].WriteID {
recDir[areaID-1].ReadID = recDir[areaID-1].WriteID
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return err
}
return
}
//更新读指针(读的位置)
recDir[areaID-1].ReadID = id + num - MAXRECCOUNTS
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return err
}
return
}
//如果当前写的位置大于读的位置
if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID {
if id+num > recDir[areaID-1].WriteID {
//更新读指针(读的位置)
recDir[areaID-1].ReadID = recDir[areaID-1].WriteID
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return err
}
return
}
}
//更新读指针(读的位置)
recDir[areaID-1].ReadID = id + num
err = recDir[areaID-1].UpdateDirs(areaID)
if err != nil {
log.Fatal(err.Error())
return err
}
return
}
//GetNoUploadNum 获取未上传记录数量
func (rec Records) GetNoUploadNum(areaID RecArea) int {
num := 0
if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID {
num = 0
return num
}
if recDir[areaID-1].Flag != "1" {
num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID)
} else {
if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID {
num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID)
} else {
num = int(MAXRECCOUNTS - recDir[areaID-1].ReadID + recDir[areaID-1].WriteID)
}
}
return num
}
// ReadRecByID 按数据库ID读取记录
func (rec Records) ReadRecByID(areaID RecArea, rid int) (p *Records, err error) {
var rec1 Records
if (areaID <= 0) || (areaID > MAXRECAREAS) {
err = errors.New("area id is not right")
log.Fatal(err.Error())
return
}
key := fmt.Sprintf("Rec%02dTB|%d", areaID, rid)
bv, err := GetData(key)
err = json.Unmarshal(bv, &rec1)
if err != nil {
log.Println("ReadRecByID Unmarshal Error:", key, err)
}
return &rec1, nil
}
//ReadRecNotServer 读取未上传的记录数据,顺序读取第SN条未上传的记录
//sn取值 1-到-->未上传记录数目
func (rec Records) ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error) {
if (areaID <= 0) || (areaID > MAXRECAREAS) {
err = errors.New("area id is not right")
log.Fatal(err.Error())
return
}
id := recDir[areaID-1].ReadID
//fmt.Printf("id=%d\n", id)
if (int(id) + sn) > MAXRECCOUNTS {
if int(id)+sn-MAXRECCOUNTS > int(recDir[areaID-1].WriteID) {
return nil, errors.New("no records")
}
p, err = rec.ReadRecByID(areaID, int(id)+sn-MAXRECCOUNTS)
} else {
if recDir[areaID-1].ReadID <= recDir[areaID-1].WriteID {
if (int(id) + sn) > int(recDir[areaID-1].WriteID) {
return nil, errors.New("no records")
}
}
p, err = rec.ReadRecByID(areaID, int(recDir[areaID-1].ReadID)+sn)
}
return p, err
}
// ReadRecWriteNot 倒数读取第SN条写入的记录
//读取一条记录 倒数读取第SN条写入的记录
func (rec Records) ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error) {
id := int(recDir[areaID-1].WriteID)
if (id - sn) <0 {
if recDir[areaID-1].Flag == "1" {
p, err = rec.ReadRecByID(areaID, MAXRECCOUNTS-(sn-id-1))
} else {
return nil, errors.New("no records")
}
} else {
p, err = rec.ReadRecByID(areaID, (id - sn + 1))
}
return
}
// GetLastRecNO 获取最后一条记录流水号
func (rec Records) GetLastRecNO(areaID RecArea) int {
if (areaID <= 0) || (areaID > MAXRECAREAS) {
log.Println("area id is not right")
return 0
}
id := recDir[areaID-1].RecNo
return id
}
// GetReadWriteID 获取当前的读、写ID
func (rec Records) GetReadWriteID(areaID RecArea) (rid, wid int) {
if (areaID <= 0) || (areaID > MAXRECAREAS) {
log.Println("area id is not right")
return 0, 0
}
rid = recDir[areaID-1].ReadID
wid = recDir[areaID-1].WriteID
return rid, wid
}
// NewRecords ...
func NewRecords(debug bool) *Records {
IsDebug = debug
if singleintance == nil {
once.Do(func() {
fmt.Println("Init singleintance Record operation ")
singleintance = new(Records)
})
}
return singleintance
}

 



推荐阅读
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • vue使用
    关键词: ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 本文介绍了sqlserver云存储和本地存储的区别,云存储是将数据存储在网络上,方便查看和调用;本地存储是将数据存储在电脑磁盘上,只能在存储的电脑上查看。同时提供了几种启动sqlserver的方法。此外,还介绍了如何导出数据库的步骤和工具。 ... [详细]
  • Linux的uucico命令使用方法及工作模式介绍
    本文介绍了Linux的uucico命令的使用方法和工作模式,包括主动模式和附属模式。uucico是用来处理uucp或uux送到队列的文件传输工具,具有操作简单快捷、实用性强的特点。文章还介绍了uucico命令的参数及其说明,包括-c或--quiet、-C或--ifwork、-D或--nodetach、-e或--loop、-f或--force、-i或--stdin、-I--config、-l或--prompt等。通过本文的学习,读者可以更好地掌握Linux的uucico命令的使用方法。 ... [详细]
  • 在Oracle11g以前版本中的的DataGuard物理备用数据库,可以以只读的方式打开数据库,但此时MediaRecovery利用日志进行数据同步的过 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
author-avatar
手机用户2502908935
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有