size_t cur_rows_offset = rows_offset; size_t cur_deletes_offset = deletes_offset; auto flush_task = std::make_shared<ColumnFileFlushTask>(context, shared_from_this(), flush_version); for (auto & column_file : column_files) { auto & task = flush_task->addColumnFile(column_file); if (auto * m_file = column_file->tryToInMemoryFile(); m_file) { // If the ColumnFile is not yet persisted in the disk, it will contain block data. // In this case, let's write the block data in the flush process as well. task.rows_offset = cur_rows_offset; // 每个file的起始地址 task.deletes_offset = cur_deletes_offset; task.block_data = m_file->readDataForFlush(); } cur_rows_offset += column_file->getRows(); cur_deletes_offset += column_file->getDeletes(); } return flush_task; }
/// Try to build an independent copy of this DeltaIndex.
///
/// The existing tree is only reused when this index has placed no more than
/// `placed_deletes_limit` deletes; the original author notes that placing more
/// delete ranges than the limit can break the MVCC view. When the tree cannot
/// be reused (or there is no tree), a default-constructed DeltaIndex is returned.
///
/// @param placed_deletes_limit  Upper bound on `placed_deletes` for the current
///                              tree to be considered reusable.
/// @param updates               Optional updates applied to the copy via
///                              `applyUpdates` before it is returned; ignored
///                              when null or when an empty index is returned.
/// @return A new DeltaIndex: either a copy of the current state or an empty one.
DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
{
    DeltaTreePtr tree_snapshot;
    size_t rows_snapshot = 0;
    size_t deletes_snapshot = 0;

    // Snapshot the three fields together while holding the mutex; the lock is
    // released before the tree itself is copied.
    {
        std::scoped_lock lock(mutex);
        const bool exceeds_limit = placed_deletes > placed_deletes_limit;
        if (!exceeds_limit)
        {
            // Safe to reuse the existing DeltaIndex state.
            tree_snapshot = delta_tree;
            rows_snapshot = placed_rows;
            deletes_snapshot = placed_deletes;
        }
    }

    // Nothing reusable was captured: hand back an empty DeltaIndex.
    if (!tree_snapshot)
        return std::make_shared<DeltaIndex>();

    // Copy-construct a separate tree so the returned index does not share it.
    auto cloned_tree = std::make_shared<DefaultDeltaTree>(*tree_snapshot);
    auto cloned_index = std::make_shared<DeltaIndex>(cloned_tree, rows_snapshot, deletes_snapshot);

    // Apply pending updates to the copy before returning it, if requested.
    if (updates != nullptr)
        cloned_index->applyUpdates(*updates);

    return cloned_index;
}
// applyUpdates
voidapplyUpdates(const Updates & updates) { for (constauto & update : updates) { // Current index does not contain any inserts which go shuffled. if (placed_rows <= update.rows_offset) return;
// Current index contains part of inserts which go shuffled, they should be removed. if (placed_rows < update.rows_offset + update.idx_mapping.size()) { delta_tree->removeInsertsStartFrom(update.rows_offset); placed_rows = update.rows_offset; return; } // Current index contains all inserts which go shuffled, let's update them directly. delta_tree->updateTupleId(update.idx_mapping, update.rows_offset); } }