EasyDB 项目回顾
最近为简历头疼死了😐,总感觉之前投的简历没写好,而且挺久没看这个项目了,忽然想起来貌似之前有篇博客说要写篇关于这个项目的文章,好吧,那就回顾回顾正好写写
除了这个轮子项目还有一个微服务直播项目,有空也写写(立Flag小能手)🙃,那个业务上写的挺好的感觉
实现功能
俺就不像简历上的那样逐字逐句斟酌了,简单随随便便写写
列一下
- 事务持久化
- 计数缓存框架
- 日志持久化 +
WAL机制
- 第一检验页面和普通页面 + 可靠数据检测恢复机制
- 页面索引(比较简陋)
- 数据共享类和全局统一数据类
MVCC + 2PL + 等待图死锁检测 和基于此的调度序列可串行化 + 读已提交、可重复读事务隔离级别
B+树结构的索引
- 分词器和
SQL解析器
- 表字段、结构管理器
- 基于
Socket的客户端服务端通信加密机制
小小说明
持久化
这里持久化都是放在文件中去做的,所以每次从持久层读都是做磁盘IO,是比较消耗资源的
计数缓存
于是就弄了个技术缓存框架,存放资源的引用情况,当某个资源释放到了引用为0时就直接写回磁盘文件,框架的意思就是提供静态方法直接使用和抽象方法让子类去实现,所以每个缓存实现类都有从数据源/持久层拿到资源的能力
日志 WAL
日志说的是数据库操作的日志,不是项目的日志,日志中存储的是数据操作的记录,WAL(Write-Ahead Logging)即预写式日志,每次数据修改之前先将修改记录落到日志,同时确保其已经刷新到磁盘,再做真正的数据修改
页面设计
页面设计的是固定大小,然后第一页有个特殊用处是做数据校验,每次启动数据库会先将某个字符串写入第一页的某个位置,然后正常关闭时在另一个位置写入同一条字符串;下一次启动时查看这两处的字符串是否一致,不一致就执行redo重做事务和undo回滚事务
页面索引机制则是将页面划分为40个区间,然后以每个页面的空闲区间数量为关键字存到哈希表中,值则是一个页号列表,每次需要将数据写入页面中就从能够满足空间需求的页号列表中拿出一个页面写入
数据传递管理
因为Java取一个数组的分片时,时拷贝这段分片内存中的数据,所以设计一个数据共享类,数据放在一个字节数组中,每次将这个数据传递,并给使用这个数组不同的模块设置不同的起始、结束位置
并发控制调度序列
版本管理中,抽象出一个记录,每个记录会标识创建、删除本身的事务;再维护活跃事务快照;
针对每个事务,看到的都是某个时间点的数据库版本,记录当时的事务快照,这样MVCC多版本并发控制就可以实现;
再基于两段锁(2PL)协议,读和插入时不用在版本管理模块申请锁,而在记录被修改时删除原记录(标识,非真删)时才强制加锁,事务完成记录修改或者回滚时进行解锁,整个过程中相较于单纯的2PL就降低了阻塞概率;
另外加锁时还会利用等待图法,使用深度优先遍历方法进行环检测,有环就是有死锁,有死锁就会尝试回滚这个尝试加锁的事务
B+树索引
这里实现了B+树结构,当时写的时候感觉挺难搞
非叶子节点存储索引数据,叶子节点存储真实数据,同层节点以链表形式连接,单个节点设置有平衡因子,超过这个因子的2倍时进行分裂操作
SQL解析
先写了个分词器,根据特定语法结构将SQL语句进行分割并向外提供逐个读取的方法,然后解析器就利用分词器和字符串匹配,解析不同类型的语句并返回特殊定义的相应结果
其实按道理应该是使用语法树 + 过滤器
表字段、结构管理器
这里就根据不同类型的语句解析解析结果,调用下层的数据模块、版本模块、事务模块做数据持久化、缓存操作
通信
其实这里使用的就是Java原生的Socket,服务端和客户端各自创建ServerSocket和ClientSocket,然后去做连接,这里也比较简陋简单
整体架构模块
通用模块
- AbstractCache 抽象缓存层
- SubArray 数据共享层
核心层
- TransactionManager(TM)事物管理模块
- DataManager(DM)数据管理模块
- VersionManager(VM)版本管理模块
- IndexManager(IM)索引管理模块
- StatementParser(SP)语句解析模块
- TableManager(TBM)表管理模块
通信层
- Transpoter 数据传输器
- Server 服务端
- Client 客户端
模块解析
针对每个模块稍稍详细地写了些,贴了些源码,整个项目源码可以直接看仓库 -> EasyDB
缓存框架
这里使用的是计数缓存框架,不使用LRU是因为考虑到资源驱逐不可控,而使用计数缓存则可以让上层模块主动释放引用,确保模块中不存在这个资源的引用,再去释放资源
实现机制
1. 成员变量和构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| private final HashMap<Long, T> cacheData;
private final HashMap<Long, Integer> referenceRecord;
private final HashMap<Long, Boolean> acquisitionSituation;
private final int maxResourceNum;
private int cacheCounter;
private final Lock lock;
public AbstractCache(int maxResourceNum) throws ErrorException { if(maxResourceNum < 0){ Log.logErrorMessage(ErrorMessage.CACHE_RESOURCE_NUMBER_ERROR); } this.maxResourceNum = maxResourceNum; this.cacheData = new HashMap<>(); this.referenceRecord = new HashMap<>(); this.acquisitionSituation = new HashMap<>(); this.lock = new ReentrantLock(); this.cacheCounter = 0; }
|
其中最大缓存资源数设置为一个配置项
2. 抽象方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
protected abstract T getCacheFromDataSourceByKey(long cacheKey) throws ErrorException, WarningException;
protected abstract void releaseCacheForObject(T Object) throws WarningException, ErrorException;
|
这两个抽象方法是具体的缓存实现类继承后去做的,分别是从数据源获取数据(类似从磁盘读)和释放缓存时的写回操作(写入持久化文件/磁盘)
3. 操作逻辑:
类中提供两个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
protected T getResource(long key) throws WarningException, ErrorException { while(true){ this.lock.lock(); if(this.acquisitionSituation.containsKey(key)){ this.lock.unlock(); try { Thread.sleep(ThreadSetting.CACHE_GET_SLEEP_TIME); } catch (InterruptedException e) { Log.logWarningMessage(e.getMessage()); continue; } continue; } if(this.cacheData.containsKey(key)){ T object = this.cacheData.get(key); this.referenceRecord.put(key, this.referenceRecord.get(key) + 1); this.lock.unlock(); return object; } if(this.maxResourceNum > 0 && this.cacheCounter == this.maxResourceNum){ this.lock.unlock(); Log.logWarningMessage(WarningMessage.CACHE_FULL); return null; } this.acquisitionSituation.put(key, true); this.lock.unlock(); break; } this.lock.lock(); T object; try { object = this.getCacheFromDataSourceByKey(key); this.cacheData.put(key, object); this.cacheCounter ++; this.referenceRecord.put(key, 1); this.acquisitionSituation.remove(key); } finally { this.lock.unlock(); } return object; }
protected void releaseOneReference(long key) throws WarningException, ErrorException { this.lock.lock(); try { int referenceNum = this.referenceRecord.get(key) - 1; if(referenceNum == 0){ T obj = cacheData.get(key); this.releaseCacheForObject(obj); this.referenceRecord.remove(key); this.cacheData.remove(key); this.cacheCounter --; } else { this.referenceRecord.put(key, referenceNum); } } finally { this.lock.unlock(); } }
|
getResource()方法先在一个while循环中不断申请资源,如果在资源获取情况记录表中别的资源正在从数据源拿到这个资源就让线程sleep,直到没有其他线程获取这个资源,然后如果缓存就在缓存资源表中就直接拿到返回,同时将引用次数 + 1;如果不在其中,就先判断是否达到最大缓存数量,再在资源获取表中注册一下,调用实现类的获取资源方法,跳出循环,如果获取成功就放入缓存。
releaseOneReference()方法释放资源引用时,先将引用表中的引用数 - 1,如果降到0了,说明这个资源没有被引用,可以被释放了,就调用实现类的释放资源写回持久区的方法,同时移除删除这个缓存
- 这两个方法执行的过程中都有加解锁操作,就是为了保证线程安全,缓存资源数据不会出现紊乱
4. 安全关闭
此外,还有一个安全关闭策略,用于缓存关闭时强制将资源全部写回持久区
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
protected void close() throws ErrorException, WarningException { this.lock.lock(); try{ Set<Long> keys = this.cacheData.keySet(); for(long key: keys){ T obj = this.cacheData.get(key); this.releaseCacheForObject(obj); this.cacheData.remove(key); this.referenceRecord.remove(key); } } finally { this.lock.unlock(); } }
|
SubArray 数据共享层
因为Java中数组分片截取时,是做一个元素复制,而不是指向同一片内存,所以针对截取部分数组的修改对原数组不可见,这里封装了一个类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class SubArray {
public byte[] rawData;
public int start;
public int end;
public SubArray(byte[] rawData, int start, int end) { this.rawData = rawData; this.start = start; this.end = end; } }
|
传递类实例的时候,对数据进行修改了可以感知到
TransactionManager(TM)
概述
作为事物管理模块,以XID为第一关键字,将所有事务信息持久化在.xid文件中。并提供接口供其他模块查询某个事务的状态
事务状态
设为全局常量,放在一个常量配置类中,有三种:
1 2 3 4
| public static final byte TRANSACTION_ACTIVE = 0; public static final byte TRANSACTION_COMMITTED = 1; public static final byte TRANSACTION_ABORTED = 2;
|
特殊
- 提供一个超级事务,这个事务的
XID为0,可以在没有申请的事务的情况下执行某些操作。且超级事务状态永远是committed
- TM模块只负责记录、维护某个事务状态,涉及事务数据提交、回滚另有数据管理模块做
.xid文件结构
每个事务都有一个XID,这个XID唯一标识此事务,且XID从1开始自增,不可重复(相对单个的.xid文件而言,如果不在同一个文件自然可以重复)。
文件头部有一个8字节大小的数字,记录当前文件中的事务数量,然后每个事务的状态占据1个字节
所以结构是这样:
[t_cnt 事务个数(8字节)][t_status 事务状态(1字节)]…
某个XID = x_id的事务状态存储在(x_id - 1) + 8字节位置(XID=0的超级事务不需记录)
提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
long begin() throws WarningException, ErrorException;
void commit(long xid) throws WarningException, ErrorException;
void abort(long xid) throws WarningException, ErrorException;
boolean isActive(long xid) throws ErrorException;
boolean isCommitted(long xid) throws ErrorException;
boolean isAborted(long xid) throws ErrorException;
void close() throws ErrorException;
static TransactionManagerImpl create(String xidFileFullName) throws WarningException, ErrorException { File newFile = FileManager.createFile(xidFileFullName + TMSetting.XID_FILE_SUFFIX); return buildTMWithFile(newFile, false); }
static TransactionManagerImpl open(String xidFileFullName) throws WarningException, ErrorException { File newFile = FileManager.openFile(xidFileFullName + TMSetting.XID_FILE_SUFFIX); return buildTMWithFile(newFile, true); }
|
实现逻辑要点
.xid文件类型和读取
.xid文件使用的类型使用的是RandomAccessFile,文件读写基于从其中拿到的FileChannel
- 首先,
RandomAccessFile提供文件的随机访问能力,允许程序直接跳转到文件的任意位置进行读写操作。
- 其次,
FileChannel是Java NIO的一部分,支持直接缓冲区和内存映射文件,这允许操作系统在不涉及Java堆的情况下处理大文件,减少数据拷贝,提高性能;另外还提供文件锁定功能,实现文件或文件区域的独占访问,在多线程环境下保证数据一致性
计数器校验
打开.xid文件并创建一个TransactionManager实现类实例时,会校验.xid文件头,先是看文件长度会不会小于计数器长度,然后根据计数器算出事务数量以及相应需要的文件长度,和实际文件长度对比
计数器操作
实现类中有一个锁成员变量,用以在修改计数器时进行加解锁操作,防止其他线程修改,造成数据不一致
接口实现
begin(): 先将计数器 + 1 位置的事务状态设置为执行中,再自增计数器
close(): 关闭RandomAccessFile和FileChannel
- 其他方法就是根据
XID从相应位置读出状态或者更新状态 or 加以判断(中间包含校验超级事务逻辑)
DM 数据管理模块
DM模块管理数据库DB文件和日志文件。
主要职责:
- 分页管理
DB文件,并缓存
- 管理日志文件,保证在发生错误时可以根据日志进行恢复
- 抽象
DB文件为DataItem供上层模块调用
所以,DM模块是上层模块和文件系统至之间的抽象层,向下读写文件,向上提供数据包装,再加上一个日志功能
对外提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
DataItem readDataItem(long uid) throws WarningException, ErrorException;
long insertData(long xid, byte[] data) throws WarningException, ErrorException;
void writeLog(byte[] log) throws WarningException, ErrorException;
void releaseOneDataItem(long uid) throws WarningException, ErrorException;
void close() throws ErrorException, WarningException;
|
除了这些,还有create()和open()方法,前者会调用PageCache和Logger进行相应文件创建,后者则是打开并校验数据
实现逻辑
页面
针对文件系统,将其抽象成为页面,每次对文件系统的读写都是以页面为单位,且这里将单个页面的大小设置为8KB,放在配置类中,可修改
页面缓存
有一个页面缓存的实现,这个缓存对外提供接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
int getPagesNumber();
int buildNewPageWithData(byte[] initData) throws WarningException, ErrorException;
Page getPageByPageNumber(int pageNumber) throws WarningException, ErrorException;
void releaseOneReference(Page page) throws WarningException, ErrorException;
void truncatePageWithMPageNum(int maxPageNumber) throws WarningException;
void flushPage(Page page) throws WarningException, ErrorException;
void close() throws ErrorException;
|
同时类似于TM,会创建或者打开一个.pg文件,用以做页面数据的持久化,不同的是加入了一个long类型的memory内存大小参数,用以计算数据页资源的最大缓存数量
实现类中,成员变量有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
private final RandomAccessFile pageDataFile;
private final FileChannel pageFileChannel;
private final AtomicInteger pageNumbers;
private final Lock pageFileLock;
|
所以新建一个页面时,就自增原子数据,然后将数据包裹为通用Page对象,写入刷新到.pg文件中
前面说了单个页面数据的大小,所以需要拿到某个页面的数据时,偏移量就是 (long) (pageNumber - 1) * PageSetting.PAGE_SIZE;
至于从数据源拿到缓存资源,这里的key就是页面号,算到偏移量之后直接读数据;释放资源写回文件操作是,判断这个page是否是脏页,是的话就写回
截断页面方法,是为了为恢复数据作铺垫,直接将.pg文件截断到某个长度
页面对象
这里首先有一个通用的页面接口Page
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
void lock();
void unlock();
void releaseOneReference() throws WarningException, ErrorException;
void setDirtyStatus(Boolean status);
boolean isDirty();
int getPageNumber();
byte[] getPageData();
|
落到实现类上去也很简单,成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
private final int pageNumber;
private final byte[] data;
private boolean dirtyStatus;
private final PageCache pageCache;
private final Lock lock;
|
唯一需要注意的就是PageCache的使用,释放一个资源时,就是释放本身
其次是第一页,这个页面用以做数据库启动时的数据校验,DB启动时,给100~107字节处填入一个随机字节ValidCheck,DB关闭时再将其拷贝到108~115字节处(这个位置可调), 数据库每次启动时,会检查两处字节是否相同,以此判断上一次是否正常关闭。如果非正常关闭,就需要执行数据恢复
再就是普通页,这个页在文件中的结构是 [页头][存储数据] 页头是个2字节的无符号整形,记录了当前页的空闲空间的偏移量
此类就提供一些数据修改的方法
注意: 第一页和普通页提供的都是静态方法,数据都是持久化在.pg文件中的,是由通用Page控制的
日志
对外提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
void writeLog(byte[] data) throws WarningException, ErrorException;
void rewind();
byte [] readNextLogData() throws ErrorException, WarningException;
void truncate(long length) throws WarningException;
void close() throws ErrorException;
|
持久化也是基于一个.log文件
具体实现
实现类的成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
private final RandomAccessFile logFile;
private final FileChannel logFileChannel;
private final Lock lock;
private long logFileLocationPointer;
private long logFileOriginLength;
private int logsChecksum;
|
日志文件的格式:
[LogsChecksum] [Log1] [Log2] … [LogN] [BadTail]
LogsChecksum为后续所有日志计算的Checksum,4字节int类型
Log1...LogN是常规日志数据
BadTail是在数据库崩溃时,没有来得及写完的日志数据,这个BadTail不一定存在
单条日志记录格式:
- [Size][Checksum][Data]
Size标记Data字段的字节数, 4字节int类型
Checksum是该条数据的校验和, 4字节int类型
Data是实际的数据
日志数据类型分为插入和更新,没有删除是因为直接将这条数据的有效标志位设为invalidate即可
单条日志的校验和基于一个种子算出:
1 2 3 4 5 6 7 8 9 10
|
private int calculateChecksum(int logChecksum, byte[] log){ for(byte littleData : log){ logChecksum = logChecksum * LoggerSetting.LOGGER_SEED + littleData; } return logChecksum; }
|
实现类有一个用于读取下一条日志的方法,可以为其他模块提供迭代读取日志数据的功能
另外还提供一个截断方法,将FileChannel截断到指定位置
恢复策略
有一个Recover类,专门用于进行数据恢复
调用日志模块,迭代读取每条日志,并拿到最大的事务xid,以此xid为基准,截断页面文件,再从前到后顺序redo重做所有状态不是活跃(也就是已提交和撤销)的事务;从后到前undo逆序回滚所有未完成(也就是活跃)的事务
页面索引
页面空间在页面索引管理视角下,是被分割成40个(默认,可调)小区间的,
初始时空闲区间数量就是40, 然后用着用着就会减少空间,空闲区间数量会减少
所以页面索引就以空闲区间的数量为基准,管理页面,每次写一个页面时,就会按照需要的空间大小去找合适的页面
在启动时,就会遍历所有的页面信息,获取页面的空闲空间,安排到这40个区间中
insert在请求一个页时,会首先将所需的空间向上取整,映射到某一个区间,随后取出这个区间的任何一页,都可以满足需求
实现的逻辑是在能满足空间要求的情况下,优先去找空闲空间更小的页面
注意:插入时被选择的页会被直接从PageIndex中暂时移除,上层模块调用完之后再重新插入
DataItem
DM层向上提供的数据抽象接口,上层模块通过地址,向DM请求到相应的DataItem,再获取数据
DataItem中保存的数据结构DataRecord格式:
[Valid][DataSize][Data]
Valid: 1字节, 用于标记数据是否有效,1有效, 0无效
DataSize: 2字节, 标识Data的大小
提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
SubArray getDataRecord();
SubArray getRawDataRecord();
boolean isValid();
void beforeModify();
void unBeforeModify();
void afterModify(long xid) throws WarningException, ErrorException;
void releaseOneReference() throws WarningException, ErrorException;
void readLock();
void readUnlock();
void writeLock();
void writeUnlock();
Page getPage();
long getUid();
byte[] getOldDataRecord();
static byte[] buildDataRecord(byte[] data){ byte[] valid = new byte[]{DataItemSetting.DATA_VALID}; byte[] size = ByteParser.shortToBytes((short)data.length); return Bytes.concat(valid, size, data); }
static DataItem buildDataItem(Page page, short offset, DataManager dm){ byte[] rawData = page.getPageData(); byte[] dataItemDataSizeBytes = Arrays.copyOfRange(rawData, offset + DataItemSetting.DATA_SIZE_OFFSET, offset + DataItemSetting.DATA_DATA_OFFSET); short dataItemDataSize = ByteParser.parseBytesToShort(dataItemDataSizeBytes); short dataRecordLength = (short)(DataItemSetting.DATA_DATA_OFFSET + dataItemDataSize); long uid = Logger.parsePageNumberAndOffsetToUid(page.getPageNumber(), offset); SubArray dataRecord = new SubArray(rawData, offset, offset + dataRecordLength); return new DataItemImpl(dataRecord, new byte[dataRecordLength], page, uid, dm); }
static void setDataRecordInvalid(byte[] dataRecord){ dataRecord[DataItemSetting.DATA_VALID_OFFSET] = DataItemSetting.DATA_INVALID; }
|
上层模块需要对DataItem进行修改操作时,需要先调用beforeModify()方法,然后修改数据,最后调用afterModify()方法将记录写入日志;如果要撤销修改,就调用unBeforeModify()方法,将备份数据拷回
另外,与之相关的缓存键是由页号和偏移量组成的8 字节无符号整数
VersionManager 版本控制模块
VM模块是事务额数据版本的管理核心
实现MVCC多版本并发控制
DM向外提供DataItem,而VM通过管理所有数据项,向上层提供记录,上层模块操作数据的最小单位就是记录,然后VM在内部为每个记录维护了多个版本,每当上层模块对某个记录进行修改时,VM就会为这个记录创建一个新版本
采用两段锁协议2PL实现调度序列的可串行化,同时利用MVCC降低事务阻塞概率
实现事务隔离级别的读已提交和可重复读
事务抽象
针对事务操作,实现一个事务类
成员变量
1 2 3 4 5 6 7 8 9 10 11
| private long xid;
private final int transactionIsolationLevel;
private Set<Long> snapshotXIDsForActiveTransaction;
private boolean accidentalTermination;
|
snapshotXIDsForActiveTransaction是专门用以记录某个事务执行时活跃的事务xid,方便可重复读级别下进行判断某个事务是否在当前事务的快照中
同时还有一个事务意外终止判断,如果发生意外终止就将此成员变量设为true
记录Record
这个类用以维护记录结构,且一个这个对象只有一个版本,一条记录存储在一个DataItem中
结构
[XMIN][XMAX][Data]
XMIN表示的是创建这个记录的事务XID,也就是在此事务之后的事务才有可能拿到这个记录
XMAX表示的是删除这个记录的事务XID,也就是在此事务之前的事务才有可能拿到这个记录,前两者都是8字节long
Data是这个事务持有的数据
逻辑操作
- 数据读取时,需要获取读锁
- 修改数据
XMAX时,需要调用DataItem的方法
这里只有一个构建方法,用以根据xid和data包裹成一个Record字节数组数据,然后就是设置XMAX方法,data数据修改逻辑交给TBM管理
事务可见性判断
这里有一个类VisibilityJudge实现了针对读已提交和可重复读的事务可见性判读
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
public static boolean judgeVisibility(TransactionManager tm, Transaction transaction, Record record) throws WarningException, ErrorException { long transactionXid = transaction.getXid(); long recordXmin = record.getXMIN(); long recordXmax = record.getXMAX(); if(transactionXid == recordXmin && recordXmax == VMSetting.RECORD_XMAX_DEFAULT) { return true; } switch (transaction.getTransactionIsolationLevel()){ case VMSetting.TRANSACTION_ISOLATION_LEVEL_READ_COMMITTED: return judgeForReadCommitted(tm, recordXmin, recordXmax); case VMSetting.TRANSACTION_ISOLATION_LEVEL_REPEATABLE_READ: return judgeForRepeatableRead(tm, transaction, recordXmin, recordXmax); default: Log.logWarningMessage(WarningMessage.TRANSACTION_ISOLATION_LEVEL_UNKNOWN); return false; } }
private static boolean judgeForReadCommitted(TransactionManager tm, long recordXmin, long recordXmax) throws ErrorException { if(tm.isCommitted(recordXmax)){ if(recordXmax == VMSetting.RECORD_XMAX_DEFAULT){ return true; } if(recordXmax != recordXmin){ return ! tm.isCommitted(recordXmax); } } return false; }
private static boolean judgeForRepeatableRead(TransactionManager tm, Transaction transaction, long recordXmin, long recordXmax) throws ErrorException { long transactionXid = transaction.getXid(); if(tm.isCommitted(recordXmin) && (recordXmin < transactionXid && !transaction.isInSnapshot(recordXmin))){ if(recordXmax == VMSetting.RECORD_XMAX_DEFAULT){ return true; } if(recordXmax != recordXmin){ return ! tm.isCommitted(recordXmax) || recordXmax > transactionXid || transaction.isInSnapshot(recordXmax); } } return false; }
|
读已提交的问题: 不可重复读和幻读
不可重复读(Non-repeatable Read):
发生在同一个事务内,当事务在不同时间点读取相同数据时,由于其他事务对数据的更新(修改或删除),导致事务内部的两次读取结果不一致。
例子:假设事务A读取一行记录,然后事务B更新了该记录并提交,事务A再次读取同一行时,会发现数据已被修改,尽管事务A自己没有进行任何写操作。
幻读(Phantom Read):
也是在同一个事务内,当事务执行两次相同的查询(比如范围查询),由于其他事务在两次查询之间插入了新的记录,使得第二次查询的结果包含了第一次查询时不存在的记录。
例子:事务A首次执行一个区间查询,得到一定范围内的记录。随后,事务B在该范围内插入新的记录并提交,事务A再次执行相同的查询,会发现新的记录出现在结果集中,仿佛是“幻影”般突然出现。
不可重复读关注的是数据行本身的修改,即数据值的变更,而幻读关注的是数据集的完整性,即在查询范围内行数的增减
在此基础上,可重复读需要忽略:在当前事务之后开始的事务数据、本事务开始时还是active状态事务的数据
版本跳跃问题:事务读取数据时跳过了某些版本,实际上问题不是很大,中间事务的修改看不到了而已,最终保存的是最后事务的提交结果
读已提交允许版本跳跃,可重复读不允许
解决版本跳跃: 如果当前事务Ti需要修改某个数据X,但是X已经被当前事务Tj不可见的事务修改,那么就要求Tj回滚。至于不可见的条件,就是Tj在Ti之后执行且已提交或者Tj在Ti的快照中(Ti开始时Tj活跃)
1 2 3 4 5 6 7 8 9 10 11
|
public static boolean judgeVersionHopping(TransactionManager tm, Transaction transaction, Record record) throws ErrorException { if(transaction.getTransactionIsolationLevel() == VMSetting.TRANSACTION_ISOLATION_LEVEL_READ_COMMITTED){ return false; } long recordXmax = record.getXMAX(); return tm.isCommitted(recordXmax) && (recordXmax > transaction.getXid() || transaction.isInSnapshot(recordXmax)); }
|
死锁检测
因为2PL会阻塞事务,直到持有锁的线程释放锁。为了检测死锁,可以将锁的等待关系抽象成有向边,查看这个图中是否存在环
VersionLockManager
这里实现了一个VersionLockManager类,用以在内存中维护图
每次出现等待时,就尝试向图中增加一条边,并进行死锁检测,如果检测死锁就撤销这条边,不允许添加,并撤销事务
成员变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
private final Map<Long, List<Long>> transactionControlledRecords;
private final Map<Long, Long> recordControlledByTransaction;
private final Map<Long, List<Long>> recordWaitByTransactions;
private final Map<Long, Long> transactionWaitForRecord;
private final Map<Long, Lock> transactionWaitWithLock;
private final Lock selfLock;
|
逻辑设计
此类中有一个方法,尝试获取记录资源,如果资源没有被任何事务持久则在成员变量表中注册一下,返回null;如果已经被持有,将当前事务xid放入一个等待队列中,返回一个ReentrantLock锁
死锁检测方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
private boolean detectDeadlock() throws ErrorException { this.transactionStamp = new HashMap<>(); this.stampMark = VMSetting.VERSION_LOCK_DEADLOCK_DETECT_RING_STAMP_DEFAULT; for(long transactionXid : this.transactionControlledRecords.keySet()){ Integer xidStamp = this.transactionStamp.get(transactionXid); if(Objects.nonNull(xidStamp) && xidStamp > VMSetting.VERSION_LOCK_DEADLOCK_DETECT_RING_STAMP_DEFAULT){ continue; } this.stampMark ++; if(this.deepFirstSearchForDeadLock(transactionXid)){ return true; } } return false; }
private boolean deepFirstSearchForDeadLock(long searchXid) throws ErrorException { Integer searchXidStamp = this.transactionStamp.get(searchXid); if(Objects.nonNull(searchXidStamp) && searchXidStamp == this.stampMark){ return true; } if(Objects.nonNull(searchXidStamp) && searchXidStamp < this.stampMark){ return false; } this.transactionStamp.put(searchXid, this.stampMark); Long waitingRecordUid = this.transactionWaitForRecord.get(searchXid); if(Objects.isNull(waitingRecordUid)){ return false; } Long tarRecordControlledTransactionXid = recordControlledByTransaction.get(waitingRecordUid); if(Objects.isNull(tarRecordControlledTransactionXid)){ Log.logErrorMessage(ErrorMessage.VERSION_CONTROL_RESOURCE_ERROR); } return this.deepFirstSearchForDeadLock(tarRecordControlledTransactionXid); }
|
有环的话就会回到某个节点,访问到戳
另外,有一个方法用以释放资源锁,当某个事务提交或者撤销后,就可以释放其持有的锁,并在图中注销
同时,还有一个方法用以选择某个方法来获取被释放的资源,按等待的顺序即FIFO来选择事务持有这个资源
VM对外提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
long begin(int transactionIsolationLevel) throws WarningException, ErrorException;
byte[] read(long xid, long uid) throws WarningException, ErrorException;
long insert(long xid, byte[] data) throws WarningException, ErrorException;
boolean delete(long xid, long uid) throws WarningException, ErrorException;
void commit(long xid) throws WarningException, ErrorException;
void abort(long xid) throws WarningException, ErrorException;
static VersionManagerImpl buildVersionManager(TransactionManager tm, DataManager dm) throws ErrorException { return new VersionManagerImpl(tm, dm); }
|
实现类
这个实现类同时实现了Record的缓存
成员变量
1 2 3 4 5 6 7 8 9 10 11 12
| private final TransactionManager tm;
private final DataManager dm;
private final Map<Long, Transaction> activeTransactions;
private final VersionLockManager vlm;
private final Lock selfLock;
|
逻辑操作
begin事务开始
创建一个新事务,然后将事务xid返回,同时在活跃事务表中进行注册
read读数据
先根据xid拿到事务,再调用缓存方法拿到Record,如果检测事务可见性通过就返回数据,最后释放缓存资源引用
insert插入数据
直接将需要插入的数据包装为Record的字节数组形式,并利用DM持久化
delete删除记录
拿到事务后检测事务可见性,再使用VersionLockManager进行资源获取尝试,再检测是否发生版本跳跃,再将目标记录的XMAX设置为当前事务xid
commit提交事务
拿到事务后,将活跃快照表中的数据删掉,再调用VersionLockManager进行资源释放,再调用TM提交事务持久化
abort撤销事务
拿到事务后调用VersionLockManager进行资源释放,再调用TM撤销事务持久化
之所以撤销事务比较容易,是因为对于其他事务而言,只能看到处于已提交状态的事务所产生的数据,撤销后的事务不会对其他事务产生影响
IndexManager 索引管理
提供基于B+树的聚簇索引,索引数据直接插入数据库文件,而不需要经过版本管理
B+树节点
这里是有一个类BPlusTreeNode实现了B+树节点
结构
- NodeHead: [LeafFlag][KeysCount][SiblingUID]
LeafFlag byte类型,标识当前节点是否是叶子节点
KeyCount short类型,标识当前节点的关键字数量
SiblingUID long类型,标识当前节点的兄弟节点UID
- NodeBody: [SonNode0Uid][Key0][SonNode1Uid]…[SonNodeNUid][KeyN]
SonNodeUid子节点Uid(唯一标识)
Key 索引关键字
注意:
- 在叶子节点中
uid和key一一对应,存储的就是底层数据
- 而在非叶子节点中,
uid0是没有配对值的,因为默认其左侧是无限小,key0是和uid1配对的, 是uid1子节点中的最小数据,而keyN是MAX_VALUE无限大,以方便查找
- 每个
Node都存储在一条DataItem中
- 这里和
Mysql中的索引结构是有区别的,innodb引擎中的节点之间使用的是双向链表,可以比较方便的从后往前进行范围查询
成员变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
private BPlusTree bPlusTree;
private DataItem dataItem;
private SubArray nodeData;
long nodeUid;
|
实现逻辑
配置类中设置有一个平衡因子,当节点的子节点数量达到平衡因子的两倍时,进行分裂
所以新建一个节点时,会直接申请完整的所需空间NODE_SIZE = NODE_HEAD_SIZE + NODE_SON_COUPLE_SIZE * (NODE_BALANCE_NUMBER + 1) * 2;
新建节点
提供新建根节点和叶子节点的字节数组形式方法
- 根节点放入左右子节点的
uid,同时设置叶子标志为false,以及节点数量为2
- 叶子节点设置叶子标志为
true,节点数量为0
查找
- 精确查找
参数是一个Key,表示目标值,这是实现的是遍历查找(也能改成二分)
- 如果在这一层找到了,就返回下一层的
uid值,非叶子节点就返回下一层节点;叶子节点就是返回实际存储数据位置的持久层位置
- 如果没有找到,就返回兄弟节点
uid,让调用方可以切到右边去找
- 范围查找
参数是两个边界值,直接下到节点中去,先定位到大于等于leftKey的位置,然后往右走拿到后续符合条件的uid
- 如果这个节点还没走完就超出了右边界就返回
- 反之就同时返回兄弟节点
uid
插入
参数是插入的uid和key
- 先找到第一个
>= key 的位置,如果没找到而且当前节点有兄弟节点就切到兄弟节点去插
- 如果没有找到且没有兄弟节点,或者是找到了,就在当前节点中插入
- 如果插入的节点是叶子节点,直接将
uid和key插入即可
- 如果插入的是非叶子节点,因为其
uid和key不是对应关系,所以是将kth位置的key和kth + 1位置uid数据覆盖
插入之后,会立即判断是否需要分裂,如果节点数达到2 * 平衡因子就进行分裂
分裂
逻辑也比较好理解:
- 创建新节点
- 复制当前节点的叶子/非叶子属性、兄弟节点信息给新节点
- 将新节点的子节点数量设置为平衡因子
- 将右侧的半边数据分给新节点
- 将新节点持久化
- 更新当前节点的兄弟节点为新节点
B+树
有一个B+树类,索引的数据直接被插入到数据库文件进行持久化,不经过版本管理
成员变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
private DataManager dm;
private DataItem rootUidDataItem;
private Lock selfLock;
|
实现逻辑
构建和加载B+树
初始构建时,创建一个叶子节点作为初始根节点,然后利用DM持久化,并拿到这个节点的标识uid
加载B+树时,则利用DM从持久层拿到数据,然后包裹成B+树对象
更新节点
这个方法用以更新一个节点为真正的根节点,构建一个字节数组形式的根节点之后持久化,并设置为根节点(也可以理解为非叶子节点)
数据查找
- 提供一个方法用以搜索下一层能够满足条件的节点
借助B+树节点类型实现,直接查找满足特定条件的节点
- 提供一个方法查找查找符合条件的叶子节点
- 提供方法在叶子节点中进行范围查找
也是借助B+树节点类实现
插入数据
暴露给外部模块以进行数据的插入
- 如果插入位置是叶子节点,下到内部插入方法,借助
B+树节点进行插入,然后直接返回结果
- 如果插入位置是非叶子节点,就还需要借助精确查找定位到下一层去,进行递归插入,如果插入完成了且发生分裂,还需要更新分裂节点的插入
上面完成后返回插入结果,如果发生分裂,就需要更新根节点
注意:可能需要进行多次节点分裂更新,所以这里比较绕
StatementParser 语句解析模块
这个模块用以进行SQL语句的解析
逻辑实现
Tokenizer 分词器
这个类用以将文本分割为一些最小单位的token
成员变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
private final byte[] statement;
private int curAnalysisPos;
private String curToken;
private boolean needFlushToken;
|
实现方法
Tokenizer(byte[] statement):
构造函数,接收一个字节数组作为需要分割的语句,并初始化相关成员变量
String peek() throws WarningException:
获取当前解析出的token,如果需要刷新token,则先解析下一个token
void pop():
强制要求刷新token,即将下一个token解析出来
String nextMetaState() throws WarningException:
解析下一个token,具体实现根据不同的字符类型进行处理
String nextQuoteState() throws WarningException:
处理引号包含的toke
String nextTokenState():
处理只含字母、数字、下划线的token
Byte peekByte():
获取当前分析位置的字符
void popByte():
将当前分析位置向后移动一位
String StateAnalysisWrong() throws WarningException:
处理解析错误的情况
byte[] getStatement():
获取完整的解析语句
此类的分词方法结合字符串配置类实现
StatementParser 语句解析器
借助分词器,逐步解析语句,并返回相应的解析结果类,用以供上层模块使用,其中还包含解析where语句,但是比较简陋
这里的判断是写死的,所以如果改动的话会比较麻烦,正规的做法应该是使用语法树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| # begin事务开始语句,设置事务隔离级别 begin [isolation level] [read committed | repeatable read]
# commit事务提交语句 commit
# abort事务回滚语句 abort
# create table 建表语句,指定字段和索引 create table [tableName] [fieldName1][fieldType1], ... [fieldNameN][fieldTypeN] (index [indexName1]...[indexNameM])
# drop table 删除表语句 drop table [tableName]
# select 查询语句, where语句是可选的 select * | [fieldName1], [fieldName2], ... from [tableName] where ...
# insert 插入语句 insert into [tableName] values [value1], [value2], ...
# delete 删除语句 delete from [tableName] where ...
# update 更新语句 update [tableName] set [fieldName] = [value] where ...
|
TableManager 表管理模块
这个模块实现的是表、字段管理,当上层模块调用语法解析器解析出结果后面,就会调用这个模块进行表的创建、数据修改
表结构实现的是在用一个数据库/架构下的表使用链表的形式进行串联
提供接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
TBMSetting.BeginResult begin(SPSetting.Begin begin) throws WarningException, ErrorException;
byte[] commit(long xid) throws WarningException, ErrorException;
byte[] abort(long xid) throws WarningException, ErrorException;
byte[] create(long xid, SPSetting.Create create) throws WarningException, ErrorException;
byte[] insert(long xid, SPSetting.Insert insert) throws WarningException, ErrorException;
byte[] drop(long xid, SPSetting.Drop drop) throws WarningException;
byte[] select(long xid, SPSetting.Select select) throws WarningException, ErrorException;
byte[] delete(long xid, SPSetting.Delete delete) throws WarningException, ErrorException;
byte[] update(long xid, SPSetting.Update update) throws WarningException, ErrorException;
|
逻辑实现
Field 表字段实体类
此类是表字段的模型,记录有字段的uid唯一标识,字段类型,索引uid,同时成员变量中有一个字段索引关联的B+树
持久化时会调用DM写入数据,同时提供取出数据转化为本类的方法
Table实体类
这个类是表的模型,存储有当前表的uid,下一个表的uid,以及该表字段列表,同样持久化时也是调用DM写入,实现字节数据解析为当前类方法
TBM实现的表数据管理部分就是基于这个实体类在做,调用这个类的Insert、Update、Select方法
Booter启动类
这个类时表管理器的启动类,存储有第一张表(头表)的信息,更新此文件时会先创建一个临时文件再替换为实际文件
TableManager 实现
提供创建方法和打开方法,分别用以创建启动类和打开启动类文件拿到表信息
实现类中包含表缓存信息,以及针对某个事务的一系列表缓存,一开始就会将表信息放入缓存中
创建表也会放入缓存,修改表会进行相应持久化,删除表则会将缓存中数据删除,持久层不用删
传输模块
这个模块用以在创建的Socket基础上进行数据传输
自定义数据包
这里自定义了一个数据包实现类,成员变量是一个字节数组和一个异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| byte[] data;
Exception exception;
public DataPackage(byte[] data, Exception exception) { this.data = data; this.exception = exception; }
public byte[] getData() { return data; }
public Exception getException() { return exception; }
|
编解码器
通信数据使用的是二进制,这个编解码器负责将自定义数据包和字节数组之间的相互转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
public static byte[] dataEncode(DataPackage dataPackage){ Exception dataException = dataPackage.getException(); if(Objects.nonNull(dataException)){ return Bytes.concat(new byte[]{TransportSetting.DATA_IS_EXCEPTION_TRUE}, dataException.getMessage().getBytes()); } else { return Bytes.concat(new byte[]{TransportSetting.DATA_IS_EXCEPTION_FALSE}, dataPackage.getData()); } }
public static DataPackage dataDecode(byte[] data) throws WarningException { if(data.length < TransportSetting.DATA_MIN_LENGTH_DEFAULT){ Log.logWarningMessage(WarningMessage.DATA_ERROR); } if(data[0] == TransportSetting.DATA_IS_EXCEPTION_FALSE){ return new DataPackage(Arrays.copyOfRange(data, TransportSetting.DATA_EXCEPTION_MARK_OFFSET, data.length), null); } else if(data[0] == TransportSetting.DATA_IS_EXCEPTION_TRUE){ return new DataPackage(null, new RuntimeException(new String(Arrays.copyOfRange(data, TransportSetting.DATA_EXCEPTION_MARK_OFFSET, data.length)))); } else { Log.logWarningMessage(WarningMessage.DATA_ERROR); return null; } }
|
编码时,会根据是有异常,如果有异常就将异常信息放到字节数组中发出,并设置一个校验位;解码时,根据校验位来判断是将二进制数据转换为data还是异常
Transporter 传输器
传输器实现类基于socket,先构建获取自socket读流、写流
同时提供方法实现十六进制数据和字节数组之间的互相转换
写时,将字节数据转换为十六进制数据
读时,将十六进制数据转换为字节数据
向外提供 Packager 数据包装传输器
暴露一个Packager类,以传输器为成员变量,提供发送数据、接受数据方法
服务端
Server类
Server类中以TBM和一个int类型的端口为成员变量,只有一个启动方法,先尝试创建一个监听特定端口的ServerSocket,然后构建一个线程池,之后进入一个true条件的while循环,不断接收客户端的连接请求,对每个连接进行处理,然后创建一个SocketHandler处理器,传入tbm和socket连接,并交给之前构建的线程池去执行
SocketHandler 处理器
这个类继承了Runnable,算是一个线程类,先打印和客户端的连接信息,然后创建一个传输模块提供的Packager,然后创建一个Executor执行器,利用packager接收客户端数据,再交给执行器执行SQL语句,得到结果后封装返回
Executor 服务端SQL语句解析执行器
这个类以xid和tbm为成员变量,调用SP模块静态方法解析SQL语句,然后根据返回的类型调用tbm做实际的数据操作
Launcher 服务端启动类
这个类用以进行服务端的启动,借助 org.apache.commons.cli创建命令行交互,指导创建或者打开数据库,去调用TM、DM、VM、TBM模块方法,如果是打开数据库就会启动Server,准备监听客户端
客户端
Client类
以Packager为成员变量,实现有一个execute方法以进行数据的收发
Shell 命令行类
以Client为成员变量,比较简陋地读取用户输入,然后调用Client以发送、拿到返回的数据,同时执行Client的close方法,关闭数据包
Launcher 客户端启动类
同样,客户端也有一个启动类,先创建一个ClientSocket,然后构建Packager,最后构建Shell,然后启动Shell
结语
整体来说项目难度还是不低的,比起业务上的设计我感觉更考验Java基础,而且项目整体设计和把握也需要一定的架构能力
哎哎,还是得继续撸码🤥