// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
	ReadView
	// End marks the transaction as complete and ready to commit.
	End()
}
// TxnWrite is a transaction that can both read and write: it embeds TxnRead
// (including its End method) and WriteView.
type TxnWrite interface {
	TxnRead
	WriteView
	// Changes gets the changes made since opening the write txn.
	Changes() []mvccpb.KeyValue
}
// txnReadWrite coerces a read txn to a write, panicking on any write operation.
// It embeds TxnRead, so read operations are served by the wrapped txn.
type txnReadWrite struct{ TxnRead }
// storeTxnWrite is a write transaction on the store. It embeds storeTxnRead
// for its read side and holds a backend batch transaction in tx.
type storeTxnWrite struct {
	storeTxnRead
	tx backend.BatchTx
	// beginRev is the revision where the txn begins; it will write to the next revision.
	beginRev int64
	// changes holds the key-values recorded by this txn; End only advances
	// the store's currentRev when it is non-empty.
	changes []mvccpb.KeyValue
}
// End completes the write transaction.
//
// When the txn recorded changes, it write-locks revMu, advances the
// store-wide currentRev, unlocks the backend batch tx, and only then
// releases revMu. Otherwise it just unlocks the batch tx. In both cases it
// finally releases the read lock on s.mu (presumably taken when the txn was
// opened; confirm at the constructor).
func (tw *storeTxnWrite) End() {
	// only update index if the txn modifies the mvcc state.
	if len(tw.changes) != 0 {
		// hold revMu lock to prevent new read txns from opening until writeback.
		tw.s.revMu.Lock()
		tw.s.currentRev++ // changes exist: advance the global revision (not a per-key version)
	}
	tw.tx.Unlock()
	if len(tw.changes) != 0 {
		tw.s.revMu.Unlock()
	}
	tw.s.mu.RUnlock()
}
lock
store 中新增了两个锁:mu 和 revMu:
- mu 是一个 RWMutex:写锁用于 store 的非事务性状态变更,读锁用于事务;
- revMu 用于保证版本号(currentRev 与 compactMainRev)读写的线程安全。
// store is the MVCC store; only its locking and revision fields are shown
// here, the remaining fields are elided.
type store struct {
	// mu is read-locked by txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	// revMu protects currentRev and compactMainRev.
	// Locked at end of write txn and released after write txn unlock lock.
	// Locked before locking read txn and released after locking.
	revMu sync.RWMutex

	currentRev     int64 // current revision
	compactMainRev int64 // compaction revision
	//...
}
func(tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) { if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 { return n, tw.beginRev + 1 } return0, tw.beginRev }
func(tw *storeTxnWrite) End() { // only update index if the txn modifies the mvcc state. iflen(tw.changes) != 0 { // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ // 更新全局 revision } tw.tx.Unlock() iflen(tw.changes) != 0 { tw.s.revMu.Unlock() } tw.s.mu.RUnlock() }
// get gets the modified, created revision and version of the key that satisfies the given atRev.
// The returned modified revision is the latest one in the generation
// containing atRev whose main part is <= atRev.
// It panics (via lg.Panic) if the keyIndex is empty, and returns
// ErrRevisionNotFound if no generation contains atRev.
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
	if ki.isEmpty() {
		lg.Panic(
			"'get' got an unexpected empty keyIndex",
			zap.String("key", string(ki.key)),
		)
	}
	// First find the generation that atRev falls in.
	g := ki.findGeneration(atRev)
	if g.isEmpty() {
		return revision{}, revision{}, 0, ErrRevisionNotFound
	}

	// Scan g.revs from the end for the first revision with main <= atRev.
	n := g.walk(func(rev revision) bool { return rev.main > atRev })
	if n != -1 {
		// The version at index n is g.ver minus the number of entries after n.
		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
	}
	// NOTE(review): the snippet ends here; the full function still needs a
	// final return for the n == -1 case (presumably ErrRevisionNotFound) —
	// confirm against the upstream source.
walk 函数同样基于时间局部性原理,从 g.revs 的末尾向前遍历,返回第一个使 f 返回 false 的位置索引 n = l-i-1(l = len(g.revs),i 为已向前遍历的步数)。因此上层 keyIndex.get 返回的 ver = g.ver - (l - n - 1) = g.ver - i。
func(g *generation) walk(f func(rev revision)bool) int { l := len(g.revs) for i := range g.revs { ok := f(g.revs[l-i-1]) if !ok { return l - i - 1 } } return-1 }
// KeyValue is a key/value pair together with its revision, version and
// lease metadata.
message KeyValue {
  // key is the key in bytes. An empty key is not allowed.
  bytes key = 1;
  // create_revision is the revision of last creation on this key.
  int64 create_revision = 2;
  // mod_revision is the revision of last modification on this key.
  int64 mod_revision = 3;
  // version is the version of the key. A deletion resets
  // the version to zero and any modification of the key
  // increases its version.
  int64 version = 4;
  // value is the value held by the key, in bytes.
  bytes value = 5;
  // lease is the ID of the lease attached to the key.
  // When the attached lease expires, the key will be deleted.
  // If lease is 0, then no lease is attached to the key.
  int64 lease = 6;
}
func(tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { rev := tw.beginRev + 1 c := rev oldLease := lease.NoLease
// if the key exists before, use its previous created and // get its previous leaseID _, created, ver, err := tw.s.kvindex.Get(key, rev) if err == nil { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) tw.trace.Step("get key's previous created_revision and leaseID") } ibytes := newRevBytes() idxRev := revision{ main: rev, sub: int64(len(tw.changes)), } revToBytes(idxRev, ibytes)