抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

EasyDB 项目回顾

最近为简历头疼死了😐,总感觉之前投的简历没写好,而且挺久没看这个项目了,忽然想起来貌似之前有篇博客说要写篇关于这个项目的文章,好吧,那就回顾回顾正好写写

除了这个轮子项目还有一个微服务直播项目,有空也写写(立Flag小能手)🙃,那个业务上写的挺好的感觉

实现功能

俺就不像简历上的那样逐字逐句斟酌了,简单随随便便写写

列一下

  1. 事务持久化
  2. 计数缓存框架
  3. 日志持久化 + WAL机制
  4. 第一检验页面和普通页面 + 可靠数据检测恢复机制
  5. 页面索引(比较简陋)
  6. 数据共享类和全局统一数据类
  7. MVCC + 2PL + 等待图死锁检测 和基于此的调度序列可串行化 + 读已提交、可重复读事务隔离级别
  8. B+树结构的索引
  9. 分词器和SQL解析器
  10. 表字段、结构管理器
  11. 基于Socket的客户端服务端通信加密机制

小小说明

持久化

这里持久化都是放在文件中去做的,所以每次从持久层读都是做磁盘IO,是比较消耗资源的

计数缓存

于是就弄了个技术缓存框架,存放资源的引用情况,当某个资源释放到了引用为0时就直接写回磁盘文件,框架的意思就是提供静态方法直接使用和抽象方法让子类去实现,所以每个缓存实现类都有从数据源/持久层拿到资源的能力

日志 WAL

日志说的是数据库操作的日志,不是项目的日志,日志中存储的是数据操作的记录,WAL(Write-Ahead Logging)即预写式日志,每次数据修改之前先将修改记录落到日志,同时确保其已经刷新到磁盘,再做真正的数据修改

页面设计

页面设计的是固定大小,然后第一页有个特殊用处是做数据校验,每次启动数据库会先将某个字符串写入第一页的某个位置,然后正常关闭时在另一个位置写入同一条字符串;下一次启动时查看这两处的字符串是否一致,不一致就执行redo重做事务undo回滚事务

页面索引机制则是将页面划分为40个区间,然后以每个页面的空闲区间数量为关键字存到哈希表中,值则是一个页号列表,每次需要将数据写入页面中就从能够满足空间需求的页号列表中拿出一个页面写入

数据传递管理

因为Java取一个数组的分片时,时拷贝这段分片内存中的数据,所以设计一个数据共享类,数据放在一个字节数组中,每次将这个数据传递,并给使用这个数组不同的模块设置不同的起始、结束位置

并发控制调度序列

版本管理中,抽象出一个记录,每个记录会标识创建、删除本身的事务;再维护活跃事务快照;

针对每个事务,看到的都是某个时间点的数据库版本,记录当时的事务快照,这样MVCC多版本并发控制就可以实现;

再基于两段锁(2PL)协议,读和插入时不用在版本管理模块申请锁,而在记录被修改时删除原记录(标识,非真删)时才强制加锁,事务完成记录修改或者回滚时进行解锁,整个过程中相较于单纯的2PL就降低了阻塞概率;

另外加锁时还会利用等待图法,使用深度优先遍历方法进行环检测,有环就是有死锁,有死锁就会尝试回滚这个尝试加锁的事务

B+树索引

这里实现了B+树结构,当时写的时候感觉挺难搞

非叶子节点存储索引数据,叶子节点存储真实数据,同层节点以链表形式连接,单个节点设置有平衡因子,超过这个因子的2倍时进行分裂操作

SQL解析

先写了个分词器,根据特定语法结构将SQL语句进行分割并向外提供逐个读取的方法,然后解析器就利用分词器和字符串匹配,解析不同类型的语句并返回特殊定义的相应结果

其实按道理应该是使用语法树 + 过滤器

表字段、结构管理器

这里就根据不同类型的语句解析解析结果,调用下层的数据模块、版本模块、事务模块做数据持久化、缓存操作

通信

其实这里使用的就是Java原生的Socket,服务端和客户端各自创建ServerSocketClientSocket,然后去做连接,这里也比较简陋简单

整体架构模块

通用模块

  1. AbstractCache 抽象缓存层
  2. SubArray 数据共享层

核心层

  1. TransactionManager(TM)事物管理模块
  2. DataManager(DM)数据管理模块
  3. VersionManager(VM)版本管理模块
  4. IndexManager(IM)索引管理模块
  5. StatementParser(SP)语句解析模块
  6. TableManager(TBM)表管理模块

通信层

  1. Transpoter 数据传输器
  2. Server 服务端
  3. Client 客户端

模块解析

针对每个模块稍稍详细地写了些,贴了些源码,整个项目源码可以直接看仓库 -> EasyDB

缓存框架

这里使用的是计数缓存框架,不使用LRU是因为考虑到资源驱逐不可控,而使用计数缓存则可以让上层模块主动释放引用,确保模块中不存在这个资源的引用,再去释放资源

实现机制

1. 成员变量和构造函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 // 缓存数据
private final HashMap<Long, T> cacheData;

// 缓存中资源引用个数标记
private final HashMap<Long, Integer> referenceRecord;

// 从数据源/持久区中获取资源的记录情况
private final HashMap<Long, Boolean> acquisitionSituation;

// 缓存最大资源数
private final int maxResourceNum;

// 现有缓存资源数量
private int cacheCounter;

private final Lock lock;

public AbstractCache(int maxResourceNum) throws ErrorException {
if(maxResourceNum < 0){
Log.logErrorMessage(ErrorMessage.CACHE_RESOURCE_NUMBER_ERROR);
}
this.maxResourceNum = maxResourceNum;
this.cacheData = new HashMap<>();
this.referenceRecord = new HashMap<>();
this.acquisitionSituation = new HashMap<>();
this.lock = new ReentrantLock();
this.cacheCounter = 0;
}

其中最大缓存资源数设置为一个配置项

2. 抽象方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @Author: 711lxsky
* @Description: 当资源不在缓存中时,从数据源加载获取
*/
protected abstract T getCacheFromDataSourceByKey(long cacheKey) throws ErrorException, WarningException;

/**
* @Author: 711lxsky
* @Description: 释放缓存,并写回文件/磁盘
* 写回一般指的是将脏页数据写入到对应硬盘或者其他持久化存储设备中
*/
protected abstract void releaseCacheForObject(T Object) throws WarningException, ErrorException;

这两个抽象方法是具体的缓存实现类继承后去做的,分别是从数据源获取数据(类似从磁盘读)和释放缓存时的写回操作(写入持久化文件/磁盘)

3. 操作逻辑:

类中提供两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
 /**
* @Author: 711lxsky
* @Description: 获取某个资源,可能是从缓存中拿到,也可能是去获取,然后放入缓存
* 所以这个方法本身也就是在获取缓存数据
*/
protected T getResource(long key) throws WarningException, ErrorException {
while(true){
this.lock.lock();
// 如果请求的资源一直在被其他线程获取,就反复等待尝试
if(this.acquisitionSituation.containsKey(key)){
this.lock.unlock();
try {
Thread.sleep(ThreadSetting.CACHE_GET_SLEEP_TIME);
} catch (InterruptedException e) {
Log.logWarningMessage(e.getMessage());
continue;
}
continue;
}
// 如果资源在缓存中,直接返回
if(this.cacheData.containsKey(key)){
T object = this.cacheData.get(key);
// 拿到资源,给引用数+1
this.referenceRecord.put(key, this.referenceRecord.get(key) + 1);
this.lock.unlock();
return object;
}
// 尝试获取该资源并放入缓存
if(this.maxResourceNum > 0 && this.cacheCounter == this.maxResourceNum){
// 缓存已经满了
this.lock.unlock();
Log.logWarningMessage(WarningMessage.CACHE_FULL);
return null;
}
// 缓存没满,在资源获取中注册一下,准备从数据源获取资源
this.acquisitionSituation.put(key, true);
this.lock.unlock();
break;
}
this.lock.lock();
T object;
try {
object = this.getCacheFromDataSourceByKey(key);
this.cacheData.put(key, object);
this.cacheCounter ++;
this.referenceRecord.put(key, 1);
this.acquisitionSituation.remove(key);
}
finally {
this.lock.unlock();
}
return object;
}

/**
* @Author: 711lxsky
* @Description: 释放一个资源引用
*/
protected void releaseOneReference(long key) throws WarningException, ErrorException {
this.lock.lock();
try {
// 把引用数 - 1
int referenceNum = this.referenceRecord.get(key) - 1;
// 如果接下来资源没有被引用,就释放写回
if(referenceNum == 0){
T obj = cacheData.get(key);
this.releaseCacheForObject(obj);
this.referenceRecord.remove(key);
this.cacheData.remove(key);
this.cacheCounter --;
}
else {
this.referenceRecord.put(key, referenceNum);
}
}
finally {
this.lock.unlock();
}
}
  • getResource()方法先在一个while循环中不断申请资源,如果在资源获取情况记录表中别的资源正在从数据源拿到这个资源就让线程sleep,直到没有其他线程获取这个资源,然后如果缓存就在缓存资源表中就直接拿到返回,同时将引用次数 + 1;如果不在其中,就先判断是否达到最大缓存数量,再在资源获取表中注册一下,调用实现类的获取资源方法,跳出循环,如果获取成功就放入缓存。
  • releaseOneReference()方法释放资源引用时,先将引用表中的引用数 - 1,如果降到0了,说明这个资源没有被引用,可以被释放了,就调用实现类的释放资源写回持久区的方法,同时移除删除这个缓存
  • 这两个方法执行的过程中都有加解锁操作,就是为了保证线程安全,缓存资源数据不会出现紊乱
4. 安全关闭

此外,还有一个安全关闭策略,用于缓存关闭时强制将资源全部写回持久区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @Author: 711lxsky
* @Description: 安全关闭缓存,并将资源数据写回
*/
protected void close() throws ErrorException, WarningException {
this.lock.lock();
try{
Set<Long> keys = this.cacheData.keySet();
for(long key: keys){
T obj = this.cacheData.get(key);
this.releaseCacheForObject(obj);
this.cacheData.remove(key);
this.referenceRecord.remove(key);
}
}
finally {
this.lock.unlock();
}
}

SubArray 数据共享层

因为Java中数组分片截取时,是做一个元素复制,而不是指向同一片内存,所以针对截取部分数组的修改对原数组不可见,这里封装了一个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SubArray {

// 原始数据
public byte[] rawData;

// 数据开始位置标记
public int start;

// 数据结束位置标记
public int end;

public SubArray(byte[] rawData, int start, int end) {
this.rawData = rawData;
this.start = start;
this.end = end;
}
}

传递类实例的时候,对数据进行修改了可以感知到

TransactionManager(TM)

概述

作为事物管理模块,以XID为第一关键字,将所有事务信息持久化在.xid文件中。并提供接口供其他模块查询某个事务的状态

事务状态

设为全局常量,放在一个常量配置类中,有三种:

1
2
3
4
// 事务状态,正在执行,已提交,已撤销
public static final byte TRANSACTION_ACTIVE = 0;
public static final byte TRANSACTION_COMMITTED = 1;
public static final byte TRANSACTION_ABORTED = 2;

特殊

  1. 提供一个超级事务,这个事务的XID为0,可以在没有申请的事务的情况下执行某些操作。且超级事务状态永远是committed
  2. TM模块只负责记录、维护某个事务状态,涉及事务数据提交、回滚另有数据管理模块做

.xid文件结构

每个事务都有一个XID,这个XID唯一标识此事务,且XID从1开始自增,不可重复(相对单个的.xid文件而言,如果不在同一个文件自然可以重复)。
文件头部有一个8字节大小的数字,记录当前文件中的事务数量,然后每个事务的状态占据1个字节

所以结构是这样:
[t_cnt 事务个数(8字节)][t_status 事务状态(1字节)]…
某个XID = x_id的事务状态存储在(x_id - 1) + 8字节位置(XID=0的超级事务不需记录)

提供接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* @Author: 711lxsky
* @Description: 开启新事务
*/
long begin() throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 提交新事务
*/
void commit(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 取消事务
*/
void abort(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 查询某个事务状态是否为活动状态
*/
boolean isActive(long xid) throws ErrorException;

/**
* @Author: 711lxsky
* @Description: 查询某个事务状态是否为已提交状态
*/
boolean isCommitted(long xid) throws ErrorException;

/**
* @Author: 711lxsky
* @Description: 查询某个事务状态是否为已回滚状态
*/
boolean isAborted(long xid) throws ErrorException;

/**
* @Author: 711lxsky
* @Description: 关闭事务管理器
*/
void close() throws ErrorException;


/**
* @Author: 711lxsky
* @Description: 根据某个路径创建一个新的事务管理器
*/
static TransactionManagerImpl create(String xidFileFullName) throws WarningException, ErrorException {
// 创建基础文件
File newFile = FileManager.createFile(xidFileFullName + TMSetting.XID_FILE_SUFFIX);
return buildTMWithFile(newFile, false);
}

/**
* @Author: 711lxsky
* @Description: 根据某个路径打开一个事务管理器
*/
static TransactionManagerImpl open(String xidFileFullName) throws WarningException, ErrorException {
// 创建基础文件
File newFile = FileManager.openFile(xidFileFullName + TMSetting.XID_FILE_SUFFIX);
return buildTMWithFile(newFile, true);
}

实现逻辑要点

  • NIO、文件
.xid文件类型和读取

.xid文件使用的类型使用的是RandomAccessFile,文件读写基于从其中拿到的FileChannel

  • 首先,RandomAccessFile提供文件的随机访问能力,允许程序直接跳转到文件的任意位置进行读写操作。
  • 其次,FileChannelJava NIO的一部分,支持直接缓冲区内存映射文件,这允许操作系统在不涉及Java堆的情况下处理大文件,减少数据拷贝,提高性能;另外还提供文件锁定功能,实现文件或文件区域的独占访问,在多线程环境下保证数据一致性
计数器校验

打开.xid文件并创建一个TransactionManager实现类实例时,会校验.xid文件头,先是看文件长度会不会小于计数器长度,然后根据计数器算出事务数量以及相应需要的文件长度,和实际文件长度对比

计数器操作

实现类中有一个锁成员变量,用以在修改计数器时进行加解锁操作,防止其他线程修改,造成数据不一致

接口实现
  • begin(): 先将计数器 + 1 位置的事务状态设置为执行中,再自增计数器
  • close(): 关闭RandomAccessFileFileChannel
  • 其他方法就是根据XID从相应位置读出状态或者更新状态 or 加以判断(中间包含校验超级事务逻辑)

DM 数据管理模块

DM模块管理数据库DB文件和日志文件。

主要职责:

  • 分页管理DB文件,并缓存
  • 管理日志文件,保证在发生错误时可以根据日志进行恢复
  • 抽象DB文件DataItem供上层模块调用

所以,DM模块是上层模块和文件系统至之间的抽象层,向下读写文件,向上提供数据包装,再加上一个日志功能

对外提供接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @Author: 711lxsky
* @Description: 以DataItem形式读取并返回数据
*/
DataItem readDataItem(long uid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 插入数据,先包裹成DataRecord格式,然后再借助页面索引插入到相应的页中,返回uid
*/
long insertData(long xid, byte[] data) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 调用Logger的writeLog方法,将日志写入到日志文件中
*/
void writeLog(byte[] log) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 释放一个DataItem对象
*/
void releaseOneDataItem(long uid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 关闭相应的资源
*/
void close() throws ErrorException, WarningException;

除了这些,还有create()和open()方法,前者会调用PageCache和Logger进行相应文件创建,后者则是打开并校验数据

实现逻辑

页面

针对文件系统,将其抽象成为页面,每次对文件系统的读写都是以页面为单位,且这里将单个页面的大小设置为8KB,放在配置类中,可修改

页面缓存

有一个页面缓存的实现,这个缓存对外提供接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* @Author: 711lxsky
* @Description: 拿到数据库文件中的页数,表示有多少个页面
*/
int getPagesNumber();

/**
* @Author: 711lxsky
* @Description: 新建一个页面到数据库文件,并放入需要存放的数据,返回页号
* 这里并没有自动放到缓存里
*/
int buildNewPageWithData(byte[] initData) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 根据页号,从缓存中获取页面
*/
Page getPageByPageNumber(int pageNumber) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 释放一个页面资源引用
*/
void releaseOneReference(Page page) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 这个方法是用来恢复数据或者其他场景下,截断页面文件
*/
void truncatePageWithMPageNum(int maxPageNumber) throws WarningException;

/**
* @Author: 711lxsky
* @Description: 刷新页面缓存到文件中
*/
void flushPage(Page page) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 关闭页面缓存,这里是基于RandomAccessFile实现,需要关闭资源
*/
void close() throws ErrorException;

同时类似于TM,会创建或者打开一个.pg文件,用以做页面数据的持久化,不同的是加入了一个long类型memory内存大小参数,用以计算数据页资源的最大缓存数量

实现类中,成员变量有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 页面数据文件
*/
private final RandomAccessFile pageDataFile;

/**
* 数据文件通道
*/
private final FileChannel pageFileChannel;

/**
* 记录当前打开的页面数据文件所含页面的页数,
* 此数据在打开时就会被计算,并在创建页面时自增
*/
private final AtomicInteger pageNumbers;

/**
* 放锁,防止多个线程同时操作文件造成数据不一致
*/
private final Lock pageFileLock;

所以新建一个页面时,就自增原子数据,然后将数据包裹为通用Page对象,写入刷新到.pg文件

前面说了单个页面数据的大小,所以需要拿到某个页面的数据时,偏移量就是 (long) (pageNumber - 1) * PageSetting.PAGE_SIZE;

至于从数据源拿到缓存资源,这里的key就是页面号,算到偏移量之后直接读数据;释放资源写回文件操作是,判断这个page是否是脏页,是的话就写回

截断页面方法,是为了为恢复数据作铺垫,直接将.pg文件截断到某个长度

页面对象

这里首先有一个通用的页面接口Page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @Author: 711lxsky
* @Description: 加锁,或者这个页面读写的时候上锁,保证数据一致性
*/
void lock();

/**
* @Author: 711lxsky
* @Description: 释放锁
*/
void unlock();

/**
* @Author: 711lxsky
* @Description: 释放页面缓存
*/
void releaseOneReference() throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 设置页面数据的脏标记
*/
void setDirtyStatus(Boolean status);

/**
* @Author: 711lxsky
* @Description: 判断页面是不是脏数据
*/
boolean isDirty();

/**
* @Author: 711lxsky
* @Description: 获取页面的页号
*/
int getPageNumber();

/**
* @Author: 711lxsky
* @Description: 获取页面数据的字节形式的原始数据
*/
byte[] getPageData();

落到实现类上去也很简单,成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 页面页号,从 1 开始
*/
private final int pageNumber;

/**
* 页面包含的字节数组形式数据
*/
private final byte[] data;

/**
* 脏状态标记
* 脏的话意味着缓存中的数据和内存/持久层中的数据不一致,缓存驱逐时务必写回
*/
private boolean dirtyStatus;

/**
* 页面缓存
*/
private final PageCache pageCache;

/**
* 页面锁,资源控制
*/
private final Lock lock;

唯一需要注意的就是PageCache的使用,释放一个资源时,就是释放本身

其次是第一页,这个页面用以做数据库启动时的数据校验,DB启动时,给100~107字节处填入一个随机字节ValidCheckDB关闭时再将其拷贝到108~115字节处(这个位置可调), 数据库每次启动时,会检查两处字节是否相同,以此判断上一次是否正常关闭。如果非正常关闭,就需要执行数据恢复

再就是普通页,这个页在文件中的结构是 [页头][存储数据] 页头是个2字节的无符号整形,记录了当前页的空闲空间的偏移量
此类就提供一些数据修改的方法

注意: 第一页和普通页提供的都是静态方法,数据都是持久化在.pg文件中的,是由通用Page控制的

日志
对外提供接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @Author: 711lxsky
* @Description: 写入日志
*/
void writeLog(byte[] data) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 定位指针到日志数据起始位置
*/
void rewind();

/**
* @Author: 711lxsky
* @Description: 读取下一条日志信息
*/
byte [] readNextLogData() throws ErrorException, WarningException;

/**
* @Author: 711lxsky
* @Description: 截断日志文件到指定长度
*/
void truncate(long length) throws WarningException;

/**
* @Author: 711lxsky
* @Description: 关闭日志
*/
void close() throws ErrorException;

持久化也是基于一个.log文件

具体实现

实现类的成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 日志文件
*/
private final RandomAccessFile logFile;

/**
* 日志文件通道
*/
private final FileChannel logFileChannel;

/**
* 资源锁
*/
private final Lock lock;

/**
* 日志文件位置指针
*/
private long logFileLocationPointer;

/**
* 日志文件原始长度,读取日志的时候不去改动
*/
private long logFileOriginLength;

/**
* 日志文件总校验和
*/
private int logsChecksum;

日志文件的格式:
[LogsChecksum] [Log1] [Log2] … [LogN] [BadTail]

  • LogsChecksum为后续所有日志计算的Checksum4字节int类型
  • Log1...LogN是常规日志数据
  • BadTail是在数据库崩溃时,没有来得及写完的日志数据,这个BadTail不一定存在

单条日志记录格式:

  • [Size][Checksum][Data]
  • Size标记Data字段的字节数, 4字节int类型
  • Checksum是该条数据的校验和, 4字节int类型
  • Data是实际的数据

日志数据类型分为插入和更新,没有删除是因为直接将这条数据的有效标志位设为invalidate即可

单条日志的校验和基于一个种子算出:

1
2
3
4
5
6
7
8
9
10
/**
* @Author: 711lxsky
* @Description: 根据种子计算校验和
*/
private int calculateChecksum(int logChecksum, byte[] log){
for(byte littleData : log){
logChecksum = logChecksum * LoggerSetting.LOGGER_SEED + littleData;
}
return logChecksum;
}

实现类有一个用于读取下一条日志的方法,可以为其他模块提供迭代读取日志数据的功能

另外还提供一个截断方法,将FileChannel截断到指定位置

恢复策略

有一个Recover类,专门用于进行数据恢复

调用日志模块,迭代读取每条日志,并拿到最大的事务xid,以此xid为基准,截断页面文件,再从前到后顺序redo重做所有状态不是活跃(也就是已提交和撤销)的事务;从后到前undo逆序回滚所有未完成(也就是活跃)的事务

页面索引

页面空间在页面索引管理视角下,是被分割成40个(默认,可调)小区间的,
初始时空闲区间数量就是40, 然后用着用着就会减少空间,空闲区间数量会减少
所以页面索引就以空闲区间的数量为基准,管理页面,每次写一个页面时,就会按照需要的空间大小去找合适的页面

在启动时,就会遍历所有的页面信息,获取页面的空闲空间,安排到这40个区间中
insert在请求一个页时,会首先将所需的空间向上取整,映射到某一个区间,随后取出这个区间的任何一页,都可以满足需求
实现的逻辑是在能满足空间要求的情况下,优先去找空闲空间更小的页面

注意:插入时被选择的页会被直接从PageIndex中暂时移除,上层模块调用完之后再重新插入

DataItem

DM层向上提供的数据抽象接口,上层模块通过地址,向DM请求到相应的DataItem,再获取数据
DataItem中保存的数据结构DataRecord格式:
[Valid][DataSize][Data]

  • Valid: 1字节, 用于标记数据是否有效,1有效0无效
  • DataSize: 2字节, 标识Data的大小
提供接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/**
* @Author: 711lxsky
* @Description: 获取DataRecord数据记录,非完整原始数据
*/
SubArray getDataRecord();

/**
* @Author: 711lxsky
* @Description: 获取完整原始数据记录
*/
SubArray getRawDataRecord();

/**
* @Author: 711lxsky
* @Description: 检查数据是否有效
*/
boolean isValid();

/**
* @Author: 711lxsky
* @Description: 在修改数据之前进行的操作
*/
void beforeModify();

/**
* @Author: 711lxsky
* @Description: 撤销修改操作
*/
void unBeforeModify();

/**
* @Author: 711lxsky
* @Description: 数据修改之后的操作
*/
void afterModify(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 释放一个引用
*/
void releaseOneReference() throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 读锁申请
*/
void readLock();

/**
* @Author: 711lxsky
* @Description: 读锁释放
*/
void readUnlock();

/**
* @Author: 711lxsky
* @Description: 写锁申请
*/
void writeLock();

/**
* @Author: 711lxsky
* @Description: 写锁释放
*/
void writeUnlock();

/**
* @Author: 711lxsky
* @Description: 拿到数据页
*/
Page getPage();

/**
* @Author: 711lxsky
* @Description: 获取DataItem的唯一标识
*/
long getUid();

/**
* @Author: 711lxsky
* @Description: 获取原始数据记录
*/
byte[] getOldDataRecord();

/**
* @Author: 711lxsky
* @Description: 包裹Data构建DataRecord
*/
static byte[] buildDataRecord(byte[] data){
byte[] valid = new byte[]{DataItemSetting.DATA_VALID};
byte[] size = ByteParser.shortToBytes((short)data.length);
return Bytes.concat(valid, size, data);
}

/**
* @Author: 711lxsky
* @Description: 构建DataItem
*/
static DataItem buildDataItem(Page page, short offset, DataManager dm){
byte[] rawData = page.getPageData();
// 注意这里是获取DataARecord中的DataSize
byte[] dataItemDataSizeBytes = Arrays.copyOfRange(rawData, offset + DataItemSetting.DATA_SIZE_OFFSET, offset + DataItemSetting.DATA_DATA_OFFSET);
short dataItemDataSize = ByteParser.parseBytesToShort(dataItemDataSizeBytes);
// 得到整个DataRecord的大小
short dataRecordLength = (short)(DataItemSetting.DATA_DATA_OFFSET + dataItemDataSize);
long uid = Logger.parsePageNumberAndOffsetToUid(page.getPageNumber(), offset);
// 转换成SubArray的形式进行构建DataItem
SubArray dataRecord = new SubArray(rawData, offset, offset + dataRecordLength);
return new DataItemImpl(dataRecord, new byte[dataRecordLength], page, uid, dm);
}

/**
* @Author: 711lxsky
* @Description: 设置DataRecord为无效
*/
static void setDataRecordInvalid(byte[] dataRecord){
dataRecord[DataItemSetting.DATA_VALID_OFFSET] = DataItemSetting.DATA_INVALID;
}

上层模块需要对DataItem进行修改操作时,需要先调用beforeModify()方法,然后修改数据,最后调用afterModify()方法将记录写入日志;如果要撤销修改,就调用unBeforeModify()方法,将备份数据拷回

另外,与之相关的缓存键是由页号和偏移量组成的8 字节无符号整数

VersionManager 版本控制模块

VM模块是事务额数据版本的管理核心

实现MVCC多版本并发控制
DM向外提供DataItem,而VM通过管理所有数据项,向上层提供记录,上层模块操作数据的最小单位就是记录,然后VM在内部为每个记录维护了多个版本,每当上层模块对某个记录进行修改时,VM就会为这个记录创建一个新版本

采用两段锁协议2PL实现调度序列的可串行化,同时利用MVCC降低事务阻塞概率

实现事务隔离级别的读已提交和可重复读

事务抽象

针对事务操作,实现一个事务类

成员变量
1
2
3
4
5
6
7
8
9
10
11
// 抽象事务的XID
private long xid;

// 事务隔离级别
private final int transactionIsolationLevel;

// 当前事务执行(开始)时的活跃事务XID快照集合
private Set<Long> snapshotXIDsForActiveTransaction;

// 意外终止标志,后续出现问题,这个成员变量会被设置为true,表示事务选择中止
private boolean accidentalTermination;

snapshotXIDsForActiveTransaction是专门用以记录某个事务执行时活跃的事务xid,方便可重复读级别下进行判断某个事务是否在当前事务的快照中

同时还有一个事务意外终止判断,如果发生意外终止就将此成员变量设为true

记录Record

这个类用以维护记录结构,且一个这个对象只有一个版本,一条记录存储在一个DataItem

结构

[XMIN][XMAX][Data]

  • XMIN表示的是创建这个记录的事务XID,也就是在此事务之后的事务才有可能拿到这个记录
  • XMAX表示的是删除这个记录的事务XID,也就是在此事务之前的事务才有可能拿到这个记录,前两者都是8字节long
  • Data是这个事务持有的数据
逻辑操作
  1. 数据读取时,需要获取读锁
  2. 修改数据XMAX时,需要调用DataItem的方法

这里只有一个构建方法,用以根据xiddata包裹成一个Record字节数组数据,然后就是设置XMAX方法data数据修改逻辑交给TBM管理

事务可见性判断

这里有一个类VisibilityJudge实现了针对读已提交和可重复读的事务可见性判读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* @Author: 711lxsky
* @Description: 针对特定事务隔离级别,判断记录对事务是否可见
*/
public static boolean judgeVisibility(TransactionManager tm, Transaction transaction, Record record) throws WarningException, ErrorException {
long transactionXid = transaction.getXid();
long recordXmin = record.getXMIN();
long recordXmax = record.getXMAX();
// 记录由当前事务创建且未被删除,可见
if(transactionXid == recordXmin && recordXmax == VMSetting.RECORD_XMAX_DEFAULT) {
return true;
}
switch (transaction.getTransactionIsolationLevel()){
case VMSetting.TRANSACTION_ISOLATION_LEVEL_READ_COMMITTED:
return judgeForReadCommitted(tm, recordXmin, recordXmax);
case VMSetting.TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ:
return judgeForRepeatableRead(tm, transaction, recordXmin, recordXmax);
default:
Log.logWarningMessage(WarningMessage.TRANSACTION_ISOLATION_LEVEL_UNKNOWN);
return false;
}
}

/**
* @Author: 711lxsky
* @Description: 读已提交级别判断
*/
private static boolean judgeForReadCommitted(TransactionManager tm, long recordXmin, long recordXmax) throws ErrorException {
// 如果记录由某个已经提交的事务创建
if(tm.isCommitted(recordXmax)){
// 如果还未被删除,则可见
if(recordXmax == VMSetting.RECORD_XMAX_DEFAULT){
return true;
}
// 如果记录已被删除,则判断删除事务是否已经提交
// 未提交则可见, 已提交则不可见
if(recordXmax != recordXmin){
return ! tm.isCommitted(recordXmax);
}
}
// 其他情况不可见
return false;
}

/**
* @Author: 711lxsky
* @Description: 重复读级别判断
*/
private static boolean judgeForRepeatableRead(TransactionManager tm, Transaction transaction, long recordXmin, long recordXmax) throws ErrorException {
long transactionXid = transaction.getXid();
// 如果记录由某个已经提交的事务创建,且该事务在当前事务执行之前提交
if(tm.isCommitted(recordXmin) && (recordXmin < transactionXid && !transaction.isInSnapshot(recordXmin))){
// 未被删除,可见
if(recordXmax == VMSetting.RECORD_XMAX_DEFAULT){
return true;
}
// 被删除,则判断删除事务是否已经提交,或者在当前事务执行之后执行/提交
if(recordXmax != recordXmin){
return ! tm.isCommitted(recordXmax) || recordXmax > transactionXid || transaction.isInSnapshot(recordXmax);
}
}
// 其他情况不可见
return false;
}

读已提交的问题: 不可重复读和幻读

不可重复读(Non-repeatable Read):
发生在同一个事务内,当事务在不同时间点读取相同数据时,由于其他事务对数据的更新(修改或删除),导致事务内部的两次读取结果不一致。
例子:假设事务A读取一行记录,然后事务B更新了该记录并提交,事务A再次读取同一行时,会发现数据已被修改,尽管事务A自己没有进行任何写操作。

幻读(Phantom Read):
也是在同一个事务内,当事务执行两次相同的查询(比如范围查询),由于其他事务在两次查询之间插入了新的记录,使得第二次查询的结果包含了第一次查询时不存在的记录。
例子:事务A首次执行一个区间查询,得到一定范围内的记录。随后,事务B在该范围内插入新的记录并提交,事务A再次执行相同的查询,会发现新的记录出现在结果集中,仿佛是“幻影”般突然出现。

不可重复读关注的是数据行本身的修改,即数据值的变更,而幻读关注的是数据集的完整性,即在查询范围内行数的增减

在此基础上,可重复读需要忽略:在当前事务之后开始的事务数据、本事务开始时还是active状态事务的数据

版本跳跃问题:事务读取数据时跳过了某些版本,实际上问题不是很大,中间事务的修改看不到了而已,最终保存的是最后事务的提交结果
读已提交允许版本跳跃,可重复读不允许
解决版本跳跃: 如果当前事务Ti需要修改某个数据X,但是X已经被当前事务Tj不可见的事务修改,那么就要求Tj回滚。至于不可见的条件,就是TjTi之后执行且已提交或者TjTi的快照中(Ti开始时Tj活跃)

1
2
3
4
5
6
7
8
9
10
11
/**
* @Author: 711lxsky
* @Description: 判断记录是否出现版本跳跃
*/
public static boolean judgeVersionHopping(TransactionManager tm, Transaction transaction, Record record) throws ErrorException {
if(transaction.getTransactionIsolationLevel() == VMSetting.TRANSACTION_ISOLATION_LEVEL_READ_COMMITTED){
return false;
}
long recordXmax = record.getXMAX();
return tm.isCommitted(recordXmax) && (recordXmax > transaction.getXid() || transaction.isInSnapshot(recordXmax));
}

死锁检测

因为2PL会阻塞事务,直到持有锁的线程释放锁。为了检测死锁,可以将锁的等待关系抽象成有向边,查看这个图中是否存在环

VersionLockManager

这里实现了一个VersionLockManager类,用以在内存中维护图
每次出现等待时,就尝试向图中增加一条边,并进行死锁检测,如果检测死锁就撤销这条边,不允许添加,并撤销事务

成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 某个XID事务已经控制的Record记录列表, 一个XID事务可以控制多个Record记录
*/
private final Map<Long, List<Long>> transactionControlledRecords;

/**
* 某个Record记录被哪个XID事务持有,一个Record记录只能被一个XID事务持有
*/
private final Map<Long, Long> recordControlledByTransaction;

/**
* 等待获取某个Record记录的XID事务列表,一个Record记录可以被多个XID事务等待
*/
private final Map<Long, List<Long>> recordWaitByTransactions;

/**
* 某个XID事务,在等待获取目标Record记录, 一个XID事务只能等待一个Record记录
*/
private final Map<Long, Long> transactionWaitForRecord;

/**
* 正在等待资源的XID事务,携带锁
*/
private final Map<Long, Lock> transactionWaitWithLock;

/**
* 内部进程资源锁
*/
private final Lock selfLock;
逻辑设计

此类中有一个方法,尝试获取记录资源,如果资源没有被任何事务持久则在成员变量表中注册一下,返回null;如果已经被持有,将当前事务xid放入一个等待队列中,返回一个ReentrantLock锁

死锁检测方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* @Author: 711lxsky
* @Description: 死锁检测
*/
private boolean detectDeadlock() throws ErrorException {
this.transactionStamp = new HashMap<>();
this.stampMark = VMSetting.VERSION_LOCK_DEADLOCK_DETECT_RING_STAMP_DEFAULT;
// 从已经获取资源的事务中寻找
for(long transactionXid : this.transactionControlledRecords.keySet()){
// 当前事务戳
Integer xidStamp = this.transactionStamp.get(transactionXid);
// 该事务已经被标记过,跳过检测
if(Objects.nonNull(xidStamp) && xidStamp > VMSetting.VERSION_LOCK_DEADLOCK_DETECT_RING_STAMP_DEFAULT){
continue;
}
// 自增,dfs测环
this.stampMark ++;
if(this.deepFirstSearchForDeadLock(transactionXid)){
return true;
}
}
return false;
}

/**
* @Author: 711lxsky
* @Description: 深度优先搜索检测死锁
*/
private boolean deepFirstSearchForDeadLock(long searchXid) throws ErrorException {
Integer searchXidStamp = this.transactionStamp.get(searchXid);
// 存在环,有死锁
if(Objects.nonNull(searchXidStamp) && searchXidStamp == this.stampMark){
return true;
}
// 已经被检查,且不在当前检查路径中
if(Objects.nonNull(searchXidStamp) && searchXidStamp < this.stampMark){
return false;
}
// 标记事务戳
this.transactionStamp.put(searchXid, this.stampMark);
Long waitingRecordUid = this.transactionWaitForRecord.get(searchXid);
// 看当前事务有没有正在等待的Record记录
if(Objects.isNull(waitingRecordUid)){
return false;
}
// 向上定位资源(相当于有向图的一条边指向后继节点)
Long tarRecordControlledTransactionXid = recordControlledByTransaction.get(waitingRecordUid);
if(Objects.isNull(tarRecordControlledTransactionXid)){
Log.logErrorMessage(ErrorMessage.VERSION_CONTROL_RESOURCE_ERROR);
}
// 看获取目标记录的事务是否会造成死锁
return this.deepFirstSearchForDeadLock(tarRecordControlledTransactionXid);
}

有环的话就会回到某个节点,访问到戳

另外,有一个方法用以释放资源锁,当某个事务提交或者撤销后,就可以释放其持有的锁,并在图中注销

同时,还有一个方法用以选择某个方法来获取被释放的资源,按等待的顺序即FIFO来选择事务持有这个资源

VM对外提供接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* @Author: 711lxsky
* @Description: 事务开始
*/
long begin(int transactionIsolationLevel) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 读取记录中的数据
*/
byte[] read(long xid, long uid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 插入记录数据
*/
long insert(long xid, byte[] data) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 删除记录
* 实现的是将记录的XMAX设置为当前事务XID,这样后续事务就无法读取到这条记录
*/
boolean delete(long xid, long uid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 事务提交
*/
void commit(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 事务撤销
*/
void abort(long xid) throws WarningException, ErrorException;

static VersionManagerImpl buildVersionManager(TransactionManager tm, DataManager dm) throws ErrorException {
return new VersionManagerImpl(tm, dm);
}

实现类

这个实现类同时实现了Record的缓存

成员变量
1
2
3
4
5
6
7
8
9
10
11
12
private final TransactionManager tm;

private final DataManager dm;

/**
* 档期那活跃的事务快照
*/
private final Map<Long, Transaction> activeTransactions;

private final VersionLockManager vlm;

private final Lock selfLock;
逻辑操作
  1. begin事务开始
    创建一个新事务,然后将事务xid返回,同时在活跃事务表中进行注册
  2. read读数据
    先根据xid拿到事务,再调用缓存方法拿到Record,如果检测事务可见性通过就返回数据,最后释放缓存资源引用
  3. insert插入数据
    直接将需要插入的数据包装为Record的字节数组形式,并利用DM持久化
  4. delete删除记录
    拿到事务后检测事务可见性,再使用VersionLockManager进行资源获取尝试,再检测是否发生版本跳跃,再将目标记录的XMAX设置为当前事务xid
  5. commit提交事务
    拿到事务后,将活跃快照表中的数据删掉,再调用VersionLockManager进行资源释放,再调用TM提交事务持久化
  6. abort撤销事务
    拿到事务后调用VersionLockManager进行资源释放,再调用TM撤销事务持久化

之所以撤销事务比较容易,是因为对于其他事务而言,只能看到处于已提交状态的事务所产生的数据,撤销后的事务不会对其他事务产生影响

IndexManager 索引管理

提供基于B+树的聚簇索引,索引数据直接插入数据库文件,而不需要经过版本管理

B+树节点

这里是有一个类BPlusTreeNode实现了B+树节点

结构
  1. NodeHead: [LeafFlag][KeysCount][SiblingUID]
    • LeafFlag byte类型,标识当前节点是否是叶子节点
    • KeyCount short类型,标识当前节点的关键字数量
    • SiblingUID long类型,标识当前节点的兄弟节点UID
  2. NodeBody: [SonNode0Uid][Key0][SonNode1Uid]…[SonNodeNUid][KeyN]
  • SonNodeUid子节点Uid(唯一标识)
  • Key 索引关键字

注意

  • 在叶子节点中uidkey一一对应,存储的就是底层数据
  • 而在非叶子节点中,uid0是没有配对值的,因为默认其左侧是无限小,key0是和uid1配对的, 是uid1子节点中的最小数据,而keyNMAX_VALUE无限大,以方便查找
  • 每个Node都存储在一条DataItem
  • 这里和Mysql中的索引结构是有区别的,innodb引擎中的节点之间使用的是双向链表,可以比较方便的从后往前进行范围查询
成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 存放一个B+树的引用,方便使用dm
*/
private BPlusTree bPlusTree;

/**
* 方便管理数据
*/
private DataItem dataItem;

/**
* 节点数据
*/
private SubArray nodeData;

/**
* 当前节点的唯一标识uid
*/
long nodeUid;
实现逻辑

配置类中设置有一个平衡因子,当节点的子节点数量达到平衡因子的两倍时,进行分裂

所以新建一个节点时,会直接申请完整的所需空间NODE_SIZE = NODE_HEAD_SIZE + NODE_SON_COUPLE_SIZE * (NODE_BALANCE_NUMBER + 1) * 2;

新建节点

提供新建根节点和叶子节点的字节数组形式方法

  • 根节点放入左右子节点的uid,同时设置叶子标志为false,以及节点数量为2
  • 叶子节点设置叶子标志为true,节点数量为0
查找
  1. 精确查找
    参数是一个Key,表示目标值,这是实现的是遍历查找(也能改成二分)
    • 如果在这一层找到了,就返回下一层的uid值,非叶子节点就返回下一层节点;叶子节点就是返回实际存储数据位置的持久层位置
    • 如果没有找到,就返回兄弟节点uid,让调用方可以切到右边去找
  2. 范围查找
    参数是两个边界值,直接下到节点中去,先定位到大于等于leftKey的位置,然后往右走拿到后续符合条件的uid
    • 如果这个节点还没走完就超出了右边界就返回
    • 反之就同时返回兄弟节点uid
插入

参数是插入的uidkey

  • 先找到第一个 >= key 的位置,如果没找到而且当前节点有兄弟节点就切到兄弟节点去插
  • 如果没有找到且没有兄弟节点,或者是找到了,就在当前节点中插入
  1. 如果插入的节点是叶子节点,直接将uidkey插入即可
  2. 如果插入的是非叶子节点,因为其uidkey不是对应关系,所以是将kth位置的keykth + 1位置uid数据覆盖

插入之后,会立即判断是否需要分裂,如果节点数达到2 * 平衡因子就进行分裂

分裂

逻辑也比较好理解:

  1. 创建新节点
  2. 复制当前节点的叶子/非叶子属性、兄弟节点信息给新节点
  3. 将新节点的子节点数量设置为平衡因子
  4. 将右侧的半边数据分给新节点
  5. 将新节点持久化
  6. 更新当前节点的兄弟节点为新节点

B+树

有一个B+树类,索引的数据直接被插入到数据库文件进行持久化,不经过版本管理

成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @Description: 数据管理
*/
private DataManager dm;

/**
* @Description: 用以对 rootUid 数据进行处理
*/
private DataItem rootUidDataItem;

/**
* @Description: 自身资源锁
*/
private Lock selfLock;
实现逻辑
构建和加载B+树

初始构建时,创建一个叶子节点作为初始根节点,然后利用DM持久化,并拿到这个节点的标识uid

加载B+树时,则利用DM从持久层拿到数据,然后包裹成B+树对象

更新节点

这个方法用以更新一个节点为真正的根节点,构建一个字节数组形式的根节点之后持久化,并设置为根节点(也可以理解为非叶子节点)

数据查找
  1. 提供一个方法用以搜索下一层能够满足条件的节点
    借助B+树节点类型实现,直接查找满足特定条件的节点
  2. 提供一个方法查找查找符合条件的叶子节点
  3. 提供方法在叶子节点中进行范围查找
    也是借助B+树节点类实现
插入数据

暴露给外部模块以进行数据的插入

  • 如果插入位置是叶子节点,下到内部插入方法,借助B+树节点进行插入,然后直接返回结果
  • 如果插入位置是非叶子节点,就还需要借助精确查找定位到下一层去,进行递归插入,如果插入完成了且发生分裂,还需要更新分裂节点的插入

上面完成后返回插入结果,如果发生分裂,就需要更新根节点

注意:可能需要进行多次节点分裂更新,所以这里比较绕

StatementParser 语句解析模块

这个模块用以进行SQL语句的解析

逻辑实现

Tokenizer 分词器

这个类用以将文本分割为一些最小单位的token

成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 语句
*/
private final byte[] statement;

/**
* 当前在语句中的分析定位
*/
private int curAnalysisPos;

/**
* 当前解析出的token
*/
private String curToken;

/**
* 是否需要刷新token,即将下一个token解析出来
*/
private boolean needFlushToken;
实现方法
  • Tokenizer(byte[] statement)
    构造函数,接收一个字节数组作为需要分割的语句,并初始化相关成员变量
  • String peek() throws WarningException
    获取当前解析出的token,如果需要刷新token,则先解析下一个token
  • void pop()
    强制要求刷新token,即将下一个token解析出来
  • String nextMetaState() throws WarningException
    解析下一个token,具体实现根据不同的字符类型进行处理
  • String nextQuoteState() throws WarningException
    处理引号包含的toke
  • String nextTokenState()
    处理只含字母、数字、下划线的token
  • Byte peekByte()
    获取当前分析位置的字符
  • void popByte()
    将当前分析位置向后移动一位
  • String StateAnalysisWrong() throws WarningException
    处理解析错误的情况
  • byte[] getStatement()
    获取完整的解析语句

此类的分词方法结合字符串配置类实现

StatementParser 语句解析器

借助分词器,逐步解析语句,并返回相应的解析结果类,用以供上层模块使用,其中还包含解析where语句,但是比较简陋

这里的判断是写死的,所以如果改动的话会比较麻烦,正规的做法应该是使用语法树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# begin事务开始语句,设置事务隔离级别
begin [isolation level] [read committed | repeatable read]

# commit事务提交语句
commit

# abort事务回滚语句
abort

# create table 建表语句,指定字段和索引
create table [tableName]
[fieldName1][fieldType1],
...
[fieldNameN][fieldTypeN]
(index [indexName1]...[indexNameM])

# drop table 删除表语句
drop table [tableName]

# select 查询语句, where语句是可选的
select * | [fieldName1], [fieldName2], ...
from [tableName]
where ...

# insert 插入语句
insert into [tableName] values [value1], [value2], ...

# delete 删除语句
delete from [tableName] where ...

# update 更新语句
update [tableName] set [fieldName] = [value] where ...

TableManager 表管理模块

这个模块实现的是表、字段管理,当上层模块调用语法解析器解析出结果后面,就会调用这个模块进行表的创建、数据修改

表结构实现的是在用一个数据库/架构下的表使用链表的形式进行串联

提供接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* @Author: 711lxsky
* @Description: 开启事务
*/
TBMSetting.BeginResult begin(SPSetting.Begin begin) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 提交事务
*/
byte[] commit(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 撤销事务
*/
byte[] abort(long xid) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 创建表
*/
byte[] create(long xid, SPSetting.Create create) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 插入数据
*/
byte[] insert(long xid, SPSetting.Insert insert) throws WarningException, ErrorException;

byte[] drop(long xid, SPSetting.Drop drop) throws WarningException;

/**
* @Author: 711lxsky
* @Description: 查询数据
*/
byte[] select(long xid, SPSetting.Select select) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 删除数据
*/
byte[] delete(long xid, SPSetting.Delete delete) throws WarningException, ErrorException;

/**
* @Author: 711lxsky
* @Description: 更新数据
*/
byte[] update(long xid, SPSetting.Update update) throws WarningException, ErrorException;

逻辑实现

Field 表字段实体类

此类是表字段的模型,记录有字段的uid唯一标识,字段类型,索引uid,同时成员变量中有一个字段索引关联的B+树

持久化时会调用DM写入数据,同时提供取出数据转化为本类的方法

Table实体类

这个类是表的模型,存储有当前表的uid下一个表的uid,以及该表字段列表,同样持久化时也是调用DM写入,实现字节数据解析为当前类方法

TBM实现的表数据管理部分就是基于这个实体类在做,调用这个类的InsertUpdateSelect方法

Booter启动类

这个类时表管理器的启动类,存储有第一张表(头表)的信息,更新此文件时会先创建一个临时文件再替换为实际文件

TableManager 实现

提供创建方法和打开方法,分别用以创建启动类和打开启动类文件拿到表信息

实现类中包含表缓存信息,以及针对某个事务的一系列表缓存,一开始就会将表信息放入缓存中

创建表也会放入缓存,修改表会进行相应持久化,删除表则会将缓存中数据删除,持久层不用删

传输模块

这个模块用以在创建的Socket基础上进行数据传输

自定义数据包

这里自定义了一个数据包实现类,成员变量是一个字节数组和一个异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 原始数据
byte[] data;

// 异常信息
Exception exception;

public DataPackage(byte[] data, Exception exception) {
this.data = data;
this.exception = exception;
}

public byte[] getData() {
return data;
}

public Exception getException() {
return exception;
}

编解码器

通信数据使用的是二进制,这个编解码器负责将自定义数据包和字节数组之间的相互转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @Author: 711lxsky
* @Description: 数据编码,将数据包转换成字节数组
*/
public static byte[] dataEncode(DataPackage dataPackage){
Exception dataException = dataPackage.getException();
if(Objects.nonNull(dataException)){
return Bytes.concat(new byte[]{TransportSetting.DATA_IS_EXCEPTION_TRUE}, dataException.getMessage().getBytes());
}
else {
return Bytes.concat(new byte[]{TransportSetting.DATA_IS_EXCEPTION_FALSE}, dataPackage.getData());
}
}

/**
* @Author: 711lxsky
* @Description: 数据解码,将字节数组转换成数据包
*/
public static DataPackage dataDecode(byte[] data) throws WarningException {
if(data.length < TransportSetting.DATA_MIN_LENGTH_DEFAULT){
Log.logWarningMessage(WarningMessage.DATA_ERROR);
}
// 正常数据
if(data[0] == TransportSetting.DATA_IS_EXCEPTION_FALSE){
return new DataPackage(Arrays.copyOfRange(data, TransportSetting.DATA_EXCEPTION_MARK_OFFSET, data.length), null);
}
// 异常信息
else if(data[0] == TransportSetting.DATA_IS_EXCEPTION_TRUE){
return new DataPackage(null, new RuntimeException(new String(Arrays.copyOfRange(data, TransportSetting.DATA_EXCEPTION_MARK_OFFSET, data.length))));
}
else {
Log.logWarningMessage(WarningMessage.DATA_ERROR);
return null;
}
}

编码时,会根据是有异常,如果有异常就将异常信息放到字节数组中发出,并设置一个校验位;解码时,根据校验位来判断是将二进制数据转换为data还是异常

Transporter 传输器

传输器实现类基于socket,先构建获取自socket读流、写流

同时提供方法实现十六进制数据和字节数组之间的互相转换

写时,将字节数据转换为十六进制数据
读时,将十六进制数据转换为字节数据

向外提供 Packager 数据包装传输器

暴露一个Packager类,以传输器为成员变量,提供发送数据、接受数据方法

服务端

Server类

Server类中以TBM和一个int类型的端口为成员变量,只有一个启动方法,先尝试创建一个监听特定端口的ServerSocket,然后构建一个线程池,之后进入一个true条件的while循环,不断接收客户端的连接请求,对每个连接进行处理,然后创建一个SocketHandler处理器,传入tbmsocket连接,并交给之前构建的线程池去执行

SocketHandler 处理器

这个类继承了Runnable,算是一个线程类,先打印和客户端的连接信息,然后创建一个传输模块提供的Packager,然后创建一个Executor执行器,利用packager接收客户端数据,再交给执行器执行SQL语句,得到结果后封装返回

Executor 服务端SQL语句解析执行器

这个类以xidtbm为成员变量,调用SP模块静态方法解析SQL语句,然后根据返回的类型调用tbm做实际的数据操作

Launcher 服务端启动类

这个类用以进行服务端的启动,借助 org.apache.commons.cli创建命令行交互,指导创建或者打开数据库,去调用TMDMVMTBM模块方法,如果是打开数据库就会启动Server,准备监听客户端

客户端

Client类

Packager为成员变量,实现有一个execute方法以进行数据的收发

Shell 命令行类

Client为成员变量,比较简陋地读取用户输入,然后调用Client以发送、拿到返回的数据,同时执行Clientclose方法,关闭数据包

Launcher 客户端启动类

同样,客户端也有一个启动类,先创建一个ClientSocket,然后构建Packager,最后构建Shell,然后启动Shell

结语

整体来说项目难度还是不低的,比起业务上的设计我感觉更考验Java基础,而且项目整体设计和把握也需要一定的架构能力

哎哎,还是得继续撸码🤥

评论