tidb: 2PC transaction 写入 tikv 详解

ExecuteStmt

ExecuteStmt 是执行一条 Stmt 的大致流程。

EnterNewTxn

是用来准备事务相关的信息

tidb-transaction-1

RunInNewTxn

tidb-transaction-2

tikvStore::Begin

在 tikvStroe 内部结构如下:

tidb-transaction-3

KVStroe::Begin

每个 client connection 对应着一个 session, 事务相关数据的放在了session中, 它包含了对 kv.Storage 和 KVTxn 接口的引用。
kv.Storage接口定义了 Begin/BeginWithOption 接口,用来创建开始一个事务,主要实现者为KVStore。

KVStore::Begin 开始一个事务,如果此时 start_ts为nil,则会从PD 获取一个时间戳start_ts,并且作为事务的唯一标识txn_id,会由 KVSnapshot 对象保存事务开始的快照信息,方便后续和 TiKV 进行交互。其中 KVStore::Begin 内部的关系如下:

tidb-transaction-4

Begin之后,就基于Begin中创建的 snapshot 元数据信息从 TiKV 获取 snapshot,并基于该 snapshot 执行用户的 DML 操作,在 tikvTxn::Commit 之后开始将DDL的结果 mutations 使用 2PC 算法提交到 TiKV。

DML

DML 操作都是基于 Next 执行框架完成,即在 Next 函数完成执行。 比如 InsertExec::Next 函数中完成将插入的数据封装成{key, value} 写入到 KVTxn的 MemDB 中。等到 Commit 时从 MemDB 中取出写入的数据,再进入 2PC 阶段向 TiKV 提交数据。

增删改的流程如下,在 Next 的执行框架中完成操作最终都是调用 Table 的 xxxRecord 接口,操作的结果缓存在 KVTxn.KVUnionStore.MemDB 中。

tidb-transaction-5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
var (
tablePrefix = []byte{'t'}
recordPrefixSep = []byte("_r")
indexPrefixSep = []byte("_i")
metaPrefix = []byte{'m'}
)

func appendTableRecordPrefix(buf []byte, tableID int64) []byte {
buf = append(buf, tablePrefix...)
buf = codec.EncodeInt(buf, tableID)
buf = append(buf, recordPrefixSep...)
return buf
}

// appendTableIndexPrefix appends table index prefix "t[tableID]_i".
func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
buf = append(buf, tablePrefix...)
buf = codec.EncodeInt(buf, tableID)
buf = append(buf, indexPrefixSep...)
return buf
}


// GenTableRecordPrefix composes record prefix with tableID: "t[tableID]_r".
func GenTableRecordPrefix(tableID int64) kv.Key {
buf := make([]byte, 0, len(tablePrefix)+8+len(recordPrefixSep))
return appendTableRecordPrefix(buf, tableID)
}

// GenTableIndexPrefix composes index prefix with tableID: "t[tableID]_i".
func GenTableIndexPrefix(tableID int64) kv.Key {
buf := make([]byte, 0, len(tablePrefix)+8+len(indexPrefixSep))
return appendTableIndexPrefix(buf, tableID)
}

func initTableCommon(t *TableCommon, tblInfo *model.TableInfo, physicalTableID int64, cols []*table.Column, allocs autoid.Allocators) {
t.tableID = tblInfo.ID
t.physicalTableID = physicalTableID
t.allocs = allocs
t.meta = tblInfo
t.Columns = cols
t.PublicColumns = t.Cols()
t.VisibleColumns = t.VisibleCols()
t.HiddenColumns = t.HiddenCols()
t.WritableColumns = t.WritableCols()
t.FullHiddenColsAndVisibleColumns = t.FullHiddenColsAndVisibleCols()
t.recordPrefix = tablecodec.GenTableRecordPrefix(physicalTableID)
t.indexPrefix = tablecodec.GenTableIndexPrefix(physicalTableID)
if tblInfo.IsSequence() {
t.sequence = &sequenceCommon{meta: tblInfo.Sequence}
}
}

func EncodeRecordKey(recordPrefix kv.Key, h kv.Handle) kv.Key {
buf := make([]byte, 0, len(recordPrefix)+h.Len())
buf = append(buf, recordPrefix...)
buf = append(buf, h.Encoded()...)
return buf
}

// 对键进行编码
func (t *TableCommon) RecordKey(h kv.Handle) kv.Key {
return tablecodec.EncodeRecordKey(t.recordPrefix, h)
}

COMMIT

twoPhaseCommitter

  1. 在 Commit 之前,数据会缓存在 KVUnionStore.MemDB 中
  2. 当执行用户的 COMMIT 时,在 KVTxn::Commit 函数中创建 twpPhaseCommitter,并调用 twoPhaseCommitter.initKeysAndMutations 函数遍历 KVUnionStore.MemDB 来初始化 memBuffMutations,作为后续的和 TiKV 交互的基础
  3. 在twoPhaseCommitter::execute 函数中,主要有五种操作:prewriteMutations、commitMutations、pessimisticLockMutations、pessimisticRollbackMutations 和 cleanupMutations。

他们输入都是 memBuffMutations ,然后都对 memBuffMutations 按照如下流程进行处理后再向 TiKV 发起请求:

  • 先基于 region 进行分组,将相同的 region 的数据分到一个 groupMutations
  • 再基于 batch_size 阈值,对 groupMutations 划分为多个 batchMutations,
  • 最终每个 batch 的数据经由各自的 action 实现的 handleSingleBatch 函数向 TiKV 发送数据。

有如下四个 actions,对应着事务的不同过程:

1
2
3
4
5
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error;
func (action actionPessimisticLock) handleSingleBatch(..);
func (action PessimisticRollback) handleSingleBatch(...);
func (action Commit) handleSingleBatch(..);
func (action Cleanup) handleSingleBatch(..);

如果 memBuffMutations 只有一个 region 的数据,那么就可以直接使用 1 phase commit 进行优化。

下面是分组分批提交的细节

doActionOnMutations

groupSortedMutationsByRegion

在 groupSortedMutationsByRegion,通过向 pd 查询每个 key 所属的region,直接对已排序的 key 进行分组,划分到不同的group中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// groupSortedMutationsByRegion separates keys into groups by their belonging Regions.
func groupSortedMutationsByRegion(c *locate.RegionCache, bo *retry.Backoffer, m CommitterMutations) ([]groupedMutations, error) {
var (
groups []groupedMutations
lastLoc *locate.KeyLocation
)
lastUpperBound := 0
for i := 0; i < m.Len(); i++ {
if lastLoc == nil || !lastLoc.Contains(m.GetKey(i)) {
if lastLoc != nil {
// 新生成一个 group
groups = append(groups, groupedMutations {
mutations: m.Slice(lastUpperBound, i), // [lastUpperBound, i) 区间的 mutations 属于一个 group
region: lastLoc.Region, // 这个区间的 mutations 所属的 region
})
lastUpperBound = i
}
var err error
lastLoc, err = c.LocateKey(bo, m.GetKey(i)) // 确定新的key所在的 region
if err != nil {
return nil, err
}
}
}
// 最后一个分组
if lastLoc != nil {
groups = append(groups, groupedMutations{
region: lastLoc.Region,
mutations: m.Slice(lastUpperBound, m.Len()),
})
}
return groups, nil
}

preSplitRegion

如果一个 groupedMutations 的数据量过大,tikv-client 会先向 tikv-server 发起 split_region gRPC,请求这个 groupedMutations 对应的 region 分裂,并等待 region 分裂完成。这样避免单个 region 的写负载太重错误: “too much write workload”, 避免了不必要的重试

如果确实分裂了,那么基于 groupSortedMutationsByRegion 函数得到的 region 信息就过时了,需要重新执行一次 groupSortedMutationsByRegion 函数,再执行写入操作。

tidb-transaction-6

groupMutations

完整的 groupMutations 逻辑如下:

tidb-transaction-7

doActionOnGroupMutations

doActionOnGroupMutations 会对每个 group 的 mutations 做进一步的分批处理,最终将所有的 groupMutations 统一划分成多个 batchMutations,其中只有一个 Primary BatchMutations,其他的是 Secondary BatchMutations。写入的时先同步执行 Primary BatchMutations 中的数据,再异步执行 Secondary BatchMutations 中的数据。

batched 的定义如下

1
2
3
4
5
type batched struct {
batches []batchMutations // 保存所有的数据
primaryIdx int // 记录 Primary BatchMutations 所属 batch 的位置索引
primaryKey []byte // 记录 Primary Key
}
batched::appendBatchMutationsBySize

对 groupMutations 进行分批是由 appendBatchMutationsBySize 函数实现:

  • 对每个 groupMutations 进行划分成 batchMutations,每个 batchMutations 的大小限制在 16 * 1024 字节大小。
  • 同时,在batch分组中,会寻找 groupMutations 中是否存在「主键」,如果存在则记录主键所属的 batch的在整个batches中的索引位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type batched struct {
batches []batchMutations
primaryIdx int
primaryKey []byte
}

func (b *batched) appendBatchMutationsBySize(region locate.RegionVerID, mutations CommitterMutations, sizeFn func(k, v []byte) int, limit int) {
var start, end int
// 将 mutations 按照大小阈值 limit 进行划分
for start = 0; start < mutations.Len(); start = end {
var size int
for end = start; end < mutations.Len() && size < limit; end++ {
var k, v []byte
k = mutations.GetKey(end)
v = mutations.GetValue(end)
size += sizeFn(k, v)
if b.primaryIdx < 0 && bytes.Equal(k, b.primaryKey) {
// 更新 PrimaryIdx 为 PrimaryKey 在 batches 中所属 batch 的位置
b.primaryIdx = len(b.batches)
}
}
b.batches = append(b.batches, batchMutations{
region: region,
mutations: mutations.Slice(start, end),
})
}
}
batched::setPrimary

分批结束后,使用 setPrimary 函数将 primaryIdx 对应 batchMutations 放到首位,方便后面先发送 Primary BatchMutations 中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (b *batched) setPrimary() bool {
// If the batches include the primary key, put it to the first
if b.primaryIdx >= 0 {
if len(b.batches) > 0 {
b.batches[b.primaryIdx].isPrimary = true
b.batches[0], b.batches[b.primaryIdx] = b.batches[b.primaryIdx], b.batches[0] // 互换
b.primaryIdx = 0
}
return true
}

return false
}

完整 doActionOnGroupMutations 的核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
// 1.每个分组内的再分批
batchBuilder := newBatched(c.primary())
for _, group := range groups {
batchBuilder.appendBatchMutationsBySize(group.region, group.mutations, sizeFunc, txnCommitBatchSize)
}

firstIsPrimary := batchBuilder.setPrimary()

//2. 先同步提交 primary batchMutations
if firstIsPrimary &&
((actionIsCommit && !c.isAsyncCommit()) || actionIsCleanup || actionIsPessimiticLock) {
// primary should be committed(not async commit)/cleanup/pessimistically locked first
err = c.doActionOnBatches(bo, action, batchBuilder.primaryBatch())
//...
batchBuilder.forgetPrimary() // 去掉 batches 中的 primary batch
}
//...

//3. 再异步提交 Secondary BatchMutations
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
//...
go func() {
// 其它的action异步提交
e := c.doActionOnBatches(secondaryBo, action, batchBuilder.allBatches())
}
} else {
err = c.doActionOnBatches(bo, action, batchBuilder.allBatches())
}
//...
}

doActionOnBatches

内部调用的 handleSingleBatch,不同的 actions 实现不同,下面结合乐观事务和悲观事务进行讲解。

乐观事务

在 COMMIT 阶段,TiDB 开始两阶段提交:

  • TiDB 并发地向所有涉及的 TiKV 发起 prewrite 请求。TiKV 收到 prewrite 数据后,检查数据版本信息是否存在冲突或已过期。符合条件的数据会被加锁,将锁信息写入 CF_LOCK,{key_start_ts},数据写入 CF_DAFAULT
  • TiDB 收到所有 prewrite 响应且所有 prewrite 都成功。
  • TiDB 向 PD 获取第二个全局唯一递增版本号,定义为本次事务的 commit_ts。
  • TiDB 向 Primary Key 所在 TiKV 发起第二阶段提交。TiKV 收到 commit 操作后,检查数据合法性,向 CF_WRITE中写入信息,并清理 prewrite 阶段留下的锁,即清理 CF_LOCK中的。
  • TiDB 收到两阶段提交成功的信息。

tidb-transaction-8

actionPrewrite

TiKV-Client

TiDB 向 TiKV发送 Prewrite 请求,

  • 遇到了 lock error, 则尝试 ResolveLock 去解决锁冲突, 然后重试;
  • 如果遇到了regionError,说明之前获取的 region 信息错误,则需要重新调用 doActionONMutations 重新分组分批,重新尝试。

如果没有 keyError,并且 Primary BatchMutations. 则启动一个 tllManager,给 txn 的 primary lock续命,ttlManager 会定期的向 TiKV 发送txnHeartbeat, 更新 primary lock 的 ttl。

悲观事务

TiDB 在3.0版本引入了悲观事务,在两阶段提交之前增加了 Acquire Pessimistic Lock 获取悲观锁阶段,

  1. (同乐观锁)TiDB 收到来自客户端的 begin 请求,获取当前时间戳作为本事务的 StartTS。
  2. TiDB 收到来自客户端的更新数据的请求:TiDB 向 TiKV 发起加悲观锁请求,该锁持久化到 TiKV。
  3. (同乐观锁)客户端发起 commit,TiDB 开始执行与乐观锁一样的两阶段提交。

tidb-transaction-9

MvccTxn

TiKV 有三个 ColumnFamily(CF)保存事务信息:

  • CF_LOCK:用于保存事务的锁信息,表示有事务正在写这个 key,key => lock_info
  • CF_DEFAULT:用于保存 MVCC 数据,(user_key, start_ts) => value
  • CF_WRITE :用于保存 Value 可见的版本控制,(key, commit_ts)=> write_info

这些数据经由MvccTxn类,写入 modifies,最终都会进入raft状态机,由leader节点同步给follower,超过半数节点确认后才会回应 client。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
impl MvccTxn {
pub fn new(start_ts: TimeStamp, concurrency_manager: ConcurrencyManager) -> MvccTxn {
MvccTxn {
start_ts,
write_size: 0,
modifies: vec![],
locks_for_1pc: Vec::new(),
concurrency_manager,
guards: vec![],
}
}

// {key, lock_info}
pub(crate) fn put_lock(&mut self, key: Key, lock: &Lock) {
let write = Modify::Put(CF_LOCK, key, lock.to_bytes());
self.write_size += write.size();
self.modifies.push(write); // 相同的 key 会覆盖之前
}

// ts 是 start_ts
// {<key, start_ts>, value}
pub(crate) fn put_value(&mut self, key: Key, ts: TimeStamp, value: Value) {
let write = Modify::Put(CF_DEFAULT, key.append_ts(ts), value);
self.write_size += write.size();
self.modifies.push(write);
}

// ts 是 commit_ts
// {<key, commit_ts>, value}
pub(crate) fn put_write(&mut self, key: Key, ts: TimeStamp, value: Value) {
let write = Modify::Put(CF_WRITE, key.append_ts(ts), value);
self.write_size += write.size();
self.modifies.push(write);
}
///...
}

acquire_pessimistic_lock

TiKV 使用 acquire_pessimistic_lock 函数来获得每个 key 的锁,即 向 CF_LOCK 中写入当前 key 的锁信息,用来占位,等待 prewrite 阶段再向 CF_LOCK 和 CF_DEFAULT 中分别写入具体的 lock_info 和数据。

TiKV 需要先从 CF_LOCK 中检测是否已经存在该 key 的锁信息:

  • Yes: 此时就存在一些冲突了,需要解决这个冲突

    • 如果 lock.ts != reader.start_ts:则说明不是同一个事务。即设当前事务是 T1,则在 T1 之前已经有个事务 T0 抢占了这个 key 的锁,而且 T0 尚未 commit,那么就向 client 返回 ErrorInner::KeyIsLocked 错误,client 接受到这个错误就会进行 ResolveLock 处理本次冲突。
    • 如果 lock.lock_type != LockType::Pessimistic:则已存在的 T0 事务和 T1 不是同一个类型的事务,则直接返 ErrorInner::LockTypeNotMatch 错误信息。

    如果上述两个 check 都通过了,则视为同一个事务 T0,此时会尝试更新 lock 的 for_update_ts 信息,然后根据 return_value、need_check_existence、need_old_value 等标志位来决定是否需要从 RocksDB 中读取数据返回给 client。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    pub fn acquire_pessimistic_lock<S: Snapshot>(
    txn: &mut MvccTxn,
    reader: &mut SnapshotReader<S>,
    key: Key,
    primary: &[u8],
    should_not_exist: bool, // 针对插入操作,应该不存在
    lock_ttl: u64,
    for_update_ts: TimeStamp,
    need_value: bool,
    need_check_existence: bool,
    min_commit_ts: TimeStamp,
    need_old_value: bool,
    ) -> MvccResult<(Option<Value>, OldValue)> {
    //... 检测 CF_LOCK 中是否有锁记录
    if let Some(lock) = reader.load_lock(&key)? {
    // CF_LOCK
    if lock.ts != reader.start_ts {
    return Err(ErrorInner::KeyIsLocked(lock.into_lock_info(key.into_raw()?)).into()); // lock
    }
    // 不是同一个事务
    // 上一个必须也是 LockType::Pessimistic
    if lock.lock_type != LockType::Pessimistic {
    return Err(ErrorInner::LockTypeNotMatch {
    start_ts: reader.start_ts,
    key: key.into_raw()?,
    pessimistic: false,
    }
    .into());
    }
    if need_load_value {
    val = reader.get(&key, for_update_ts)?;
    } else if need_check_existence {
    val = reader.get_write(&key, for_update_ts)?.map(|_| vec![]);
    }
    // Pervious write is not loaded.
    let (prev_write_loaded, prev_write) = (false, None);
    let old_value = load_old_value(
    need_old_value,
    need_load_value,
    val.as_ref(),
    reader,
    &key,
    for_update_ts,
    prev_write_loaded,
    prev_write,
    )?;

    // Overwrite the lock with small for_update_ts
    if for_update_ts > lock.for_update_ts {
    let lock = PessimisticLock {
    primary: primary.into(),
    start_ts: reader.start_ts,
    ttl: lock_ttl,
    for_update_ts,
    min_commit_ts,
    };
    txn.put_pessimistic_lock(key, lock); // 更新 lock 信息
    } else {
    MVCC_DUPLICATE_CMD_COUNTER_VEC
    .acquire_pessimistic_lock
    .inc();
    }
    // 视为同一个事务的重复请求
    return Ok((ret_val(need_value, need_check_existence, val), old_value));
    }
    //...
    }
  • No: 如果 CF_LOCK 中没有 key 的锁信息,即 reader.load_lock(&key) 返回的是 None,此时仍需要去 CF_WRTIE 中检测是否有对应的写记录,进一步判断当前事务 T1 的状态。

    因为当前事务 T1 可能被其他事务清除了,或者 TTL 过期。当删除事务时会删除 Key 在 CF_LOCK、CF_DEFAULT 中的记录,在 CF_WRITE 中留下一条 Rollback 记录。

    reader.seek_write(&key, TimeStamp::max()) 寻找的是 key 在 CF_WRITE 中最新的写入记录:

    • 如果返回不为 None 且假设为 Some(commit_ts, write),则有两种可能:

      1. 上次 committed 记录:正常情况下, T0 的 commit_ts > T1 的 for_update_ts,如果违背了,则产生了一次写冲突
      2. 上次 rollback 记录

      如果 commit_ts == reader.start_ts 并且满足 write.write_type == WriteType::Rollback || write.has_overlapped_rollback,那么说明当前事务 T1 已经被删除了,只留下一条 rollback 记录,不能再获取锁了,则返回 ErrorInner::PessimisticLockRolledBack 错误给 client。

      如果 commit_ts > reader.start_ts,则说明当前事务已经过期,去检测 reader.start_ts 上一个事务的的状态,如果也是 rollback,则也向 client 回复 ErrorInner::PessimisticLockRolledBack 错误。

    • 返回 None,则表示 CF_WRITE 中不存在 key 的记录,那么当前事物 T1 可以顺利获取Key的锁,则会向 CF_LOCK 中插入当前 key 的 lock 信息,表示获得该 key 的锁成功了。

    这部分代码如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    pub fn acquire_pessimistic_lock<S: Snapshot>( /*..ingore params.*/ ) -> MvccResult<(Option<Value>, OldValue)> {
    /// ...
    /// 当前没有 lock,即没有别的任务在写


    let (prev_write_loaded, mut prev_write) = (true, None);
    // 进一步在 CF_WRITE 中寻找
    if let Some((commit_ts, write)) = reader.seek_write(&key, TimeStamp::max())? {
    // Find a previous write.
    if need_old_value {
    prev_write = Some(write.clone());
    }

    // 上个提交的版本,比当前事务的时间戳大
    // 产生了写冲突
    if commit_ts > for_update_ts {
    return Err(ErrorInner::WriteConflict {
    start_ts: reader.start_ts,
    conflict_start_ts: write.start_ts,
    conflict_commit_ts: commit_ts,
    key: key.into_raw()?,
    primary: primary.to_vec(),
    }
    .into());
    }

    // Handle rollback.
    // The rollback information may come from either a Rollback record or a record with
    // `has_overlapped_rollback` flag.
    if commit_ts == reader.start_ts
    && (write.write_type == WriteType::Rollback || write.has_overlapped_rollback)
    {
    assert!(write.has_overlapped_rollback || write.start_ts == commit_ts);
    return Err(ErrorInner::PessimisticLockRolledBack {
    start_ts: reader.start_ts,
    key: key.into_raw()?,
    }
    .into());
    }

    // If `commit_ts` we seek is already before `start_ts`, the rollback must not exist.
    // 可能是一个反复重试的
    if commit_ts > reader.start_ts {
    if let Some((older_commit_ts, older_write)) =
    reader.seek_write(&key, reader.start_ts)?
    {
    if older_commit_ts == reader.start_ts
    && (older_write.write_type == WriteType::Rollback || older_write.has_overlapped_rollback)
    {
    return Err(ErrorInner::PessimisticLockRolledBack {
    start_ts: reader.start_ts,
    key: key.into_raw()?,
    }
    .into());
    }
    }
    }
    //...
    }

    let lock = PessimisticLock {
    primary: primary.into(),
    start_ts: reader.start_ts,
    ttl: lock_ttl,
    for_update_ts,
    min_commit_ts,
    };

    // 写入的是 lock 信息,此 lock 信息稍后会统一写入存储层
    txn.put_pessimistic_lock(key, lock);
    // TODO don't we need to commit the modifies in txn?

    Ok((ret_val(need_value, need_check_existence, val), old_value))
    }

prewrite

prewrite 目的是在 CF_LOCK 和 CF_DEFAULT 写入记录,如果成功,则客户端能顺利发起 Commit 请求。当然,如果是悲观事务,则在 acquire_pessimistic_lock 阶段就已经在 CF_LOCK 中写入记录,那么在 prewrite 阶段就肯定能成功。
因此,在 prewrite 函数中,先尝试从 CF_LOCK 中读取 lock 的信息:

  1. 有 lock,则需要在 PrewriteMutation::check_lock 函数中进一步 check;
  2. 没有 lock,但是当前又是悲观事务,则需要在 CF_LOCK 中加上 lock_info:这是TiKV 在v6.1引入 pipeline write 设计可能出现的 corner case
  3. 返回None,即乐观事务

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pub fn prewrite<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
txn_props: &TransactionProperties<'_>,
mutation: Mutation,
secondary_keys: &Option<Vec<Vec<u8>>>,
is_pessimistic_lock: bool,
) -> Result<(TimeStamp, OldValue)> {
let mut mutation = PrewriteMutation::from_mutation(
mutation,
secondary_keys,
is_pessimistic_lock,
txn_props)?;

// Update max_ts for Insert operation to guarante linearizability and snapshot isolation
if mutation.should_not_exist {
txn.concurrency_manager.update_max_ts(txn_props.start_ts);
}

let mut lock_amended = false;

let lock_status = match reader.load_lock(&mutation.key)? {
Some(lock) => mutation.check_lock(lock, is_pessimistic_lock)?,
None if is_pessimistic_lock => {
amend_pessimistic_lock(&mutation, reader)?;
lock_amended = true;
LockStatus::None
}
None => LockStatus::None,
};
//...
}

PrewriteMutation::check_lock

check_lock 的逻辑和 acquire_pessimistic_lock 有点类似,仍假设当前事务为 T1,之前的事务为 T0
进入这个函数已经确定 CF_LOCK 中存在锁:

  • 若 lock.ts != self.txn_props.start_ts,则说明已经存在事务 T0:
  • 若 T1 是悲观事务,则之前在 acquire_pessimistc_lock 阶段写入的 lock 不存在了,可能被清除了,则返回 ErrorInner::PessimisticLockNotFound 错误;
  • 否则 T1 即乐观事务,由于乐观锁只是在 prewrite 阶段尝试上锁,如果相同key上的锁已经被其他事务 T0 占据,则返回 ErrorInner::KeyIsLocked, 则让客户端发起 ResolveLock 来解决这个锁冲突
  • 如果 lock.ts == start_ts, 则再检测 lock.lock_type 是否一致:
    • 如果不一致,则返回 ErrorInner::LockTypeNotMatch 错位。
    • 如果一致:
      • 悲观事务:更新 lock_ttl、min_commit_ts
      • 乐观事务:那么当前这次 prewrite 就是个重复指令,无需更新任何数,幂等操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/// Check whether the current key is locked at any timestamp.
fn check_lock(&mut self, lock: Lock, is_pessimistic_lock: bool) -> Result<LockStatus> {
// 相同的事务的 start_ts 相同, 因此,不等则说明已经被其他事务锁住
if lock.ts != self.txn_props.start_ts {
if is_pessimistic_lock {
return Err(ErrorInner::PessimisticLockNotFound {
start_ts: self.txn_props.start_ts,
key: self.key.to_raw()?,
}
.into());
}

// 乐观事务
return Err(ErrorInner::KeyIsLocked(self.lock_info(lock)?).into());
}

if lock.lock_type == LockType::Pessimistic {
if !self.txn_props.is_pessimistic() {
return Err(ErrorInner::LockTypeNotMatch {
start_ts: self.txn_props.start_ts,
key: self.key.to_raw()?,
pessimistic: true,
}
.into());
}

// The lock is pessimistic and owned by this txn, go through to overwrite it.
// The ttl and min_commit_ts of the lock may have been pushed forward.
self.lock_ttl = std::cmp::max(self.lock_ttl, lock.ttl);
self.min_commit_ts = std::cmp::max(self.min_commit_ts, lock.min_commit_ts); // 更新

return Ok(LockStatus::Pessimistic);
}

// Duplicated command. No need to overwrite the lock and data.
let min_commit_ts = if lock.use_async_commit {
lock.min_commit_ts
} else {
TimeStamp::zero()
};
Ok(LockStatus::Locked(min_commit_ts))
}

PrewriteMutation::write_lock

上述 check 都通过了,则将 CF_LOCK 和 CF_DEFAULT 中写入数据,流程如下:

  • 如果 sizoef(value) < 256,即 is_short_value(&value) 为 true,则直接会有个优化,直接将 value 写入到 CF_LOCK 中,否则value需要写入 CF_DEFAULT 中;

  • 如果开启了 async_commit,则会在 key 中记录所有的 secondaries key,方便后续查询所有 secondaries 的状态。

    如果开启了 async_commit,这里需要基于所有的 secondaries 计算 final_min_commit_ts,返回给 tikv-client,让 client 使用这个 final_min_commit_ts 来作为进行异步 commit 的时间戳

  • 最后将向 txn 中写入 lock 信息
    因此, prewtite 阶段成功后,Rocksdb 中就记录了 锁信息和真正的数据,就等待 commit 中的 write 信息了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
fn write_lock(self, lock_status: LockStatus, txn: &mut MvccTxn) -> Result<TimeStamp> {
let mut try_one_pc = self.try_one_pc();

let mut lock = Lock::new(
self.lock_type.unwrap(),
self.txn_props.primary.to_vec(),
self.txn_props.start_ts,
self.lock_ttl,
None,
self.txn_props.for_update_ts(),
self.txn_props.txn_size,
self.min_commit_ts,
);

if let Some(value) = self.value {
if is_short_value(&value) {
// If the value is short, embed it in Lock.
lock.short_value = Some(value);
} else {
// value is long
// 向 CF_DEFAULT 中写入数据
txn.put_value(self.key.clone(), self.txn_props.start_ts, value);
}
}

if let Some(secondary_keys) = self.secondary_keys {
lock.use_async_commit = true;
lock.secondaries = secondary_keys.to_owned();
}

let final_min_commit_ts = if lock.use_async_commit || try_one_pc {
let res = async_commit_timestamps(
&self.key,
&mut lock,
self.txn_props.start_ts,
self.txn_props.for_update_ts(),
self.txn_props.max_commit_ts(),
txn,
);
if let Err(Error(box ErrorInner::CommitTsTooLarge { .. })) = &res {
try_one_pc = false;
lock.use_async_commit = false;
lock.secondaries = Vec::new();
}
res
} else {
Ok(TimeStamp::zero())
};

if try_one_pc {
txn.put_locks_for_1pc(self.key, lock, lock_status.has_pessimistic_lock());
} else {
txn.put_lock(self.key, &lock); // 向 CF_LOCK 中写入 lock 信息
}

final_min_commit_ts
}

Commit

第一阶段 prewrite 成功,TiDB 向 PD 再获取一个单调递增的时间戳 commit_ts 用于第二段提交,在 CF_WRITE 中写入提交信息,并清除第一阶段prewrite中写入的 lock 信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
pub fn commit<S: Snapshot>(
txn: &mut MvccTxn,
reader: &mut SnapshotReader<S>,
key: Key,
commit_ts: TimeStamp,
) -> MvccResult<Option<ReleasedLock>> {
let mut lock = match reader.load_lock(&key)? {
Some(mut lock) if lock.ts == reader.start_ts => {
// lock.ts == reader.start_ts => 确实是需要 commit 的 lock
// commit_ts < lock.min_commit_ts => 计算出来的 commit_ts 不满足要求

// A lock with larger min_commit_ts than current commit_ts can't be committed
if commit_ts < lock.min_commit_ts {
return Err(ErrorInner::CommitTsExpired {
start_ts: reader.start_ts,
commit_ts,
key: key.into_raw()?,
min_commit_ts: lock.min_commit_ts,
}
.into());
}

if lock.lock_type == LockType::Pessimistic {
// Commit with WriteType::Lock.
lock.lock_type = LockType::Lock; // commit 更改 lock 类型
}
lock
}
_ => {
// lock 缺失
return match reader.get_txn_commit_record(&key)?.info() {
// 一条 rollback 记录找不到了 => TxnLockNotFound
// Rollback 自然就无法 commit
Some((_, WriteType::Rollback)) | None => {
// None: related Rollback has been collapsed.
// Rollback: rollback by concurrent transaction.
Err(ErrorInner::TxnLockNotFound {
start_ts: reader.start_ts,
commit_ts,
key: key.into_raw()?,
}
.into())
}
// Committed by concurrent transaction.
Some((_, WriteType::Put))
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
Ok(None)
}
};
}
};
let mut write = Write::new(
WriteType::from_lock_type(lock.lock_type).unwrap(),
reader.start_ts,
lock.short_value.take(),
);

for ts in &lock.rollback_ts {
if *ts == commit_ts {
write = write.set_overlapped_rollback(true, None);
break;
}
}

// 在 CF_WRITE 中写入数据才算 {key, commit} => write
txn.put_write(key.clone(), commit_ts, write.as_ref().to_bytes());
// 再将 CF_LOCK 中删除 lock
Ok(txn.unlock_key(key, lock.is_pessimistic_txn()))
}

Others