主要是描述下 Segment 的多线程读流程是如何实现的。
主要是为了实现多处读取同一个 segment 时,能够把这些读取任务合并起来一次完成。
SegmentReadTask SegmentReadTask 表征的是读取一个 segment 不同 ranges 的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 struct SegmentReadTask { SegmentPtr segment; SegmentSnapshotPtr read_snapshot; RowKeyRanges ranges; SegmentReadTask (const SegmentPtr & segment_, const SegmentSnapshotPtr & read_snapshot_, const RowKeyRanges & ranges_); explicit SegmentReadTask (const SegmentPtr & segment_, const SegmentSnapshotPtr & read_snapshot_) ; ~SegmentReadTask (); std::pair<size_t , size_t > getRowsAndBytes () const ; void addRange (const RowKeyRange & range) { ranges.push_back (range); } void mergeRanges () { ranges = DM::tryMergeRanges (std::move (ranges), 1 ); } static SegmentReadTasks trySplitReadTasks (const SegmentReadTasks & tasks, size_t expected_size) ; };
SegmentReadTaskPool 表征的是读取某个 table_id 对应的表中的 segment 的任务集合,记录在成员 tasks 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class SegmentReadTaskPool : private boost::noncopyable{ private : const uint64_t pool_id; const int64_t table_id; DMContextPtr dm_context; ColumnDefines columns_to_read; RSOperatorPtr filter; const uint64_t max_version; const size_t expected_block_size; const bool is_raw; const bool do_range_filter_for_raw; SegmentReadTasks tasks; AfterSegmentRead after_segment_read; }
比如,在DeltaMergeStore::read
函数中相关代码片段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 SegmentReadTasks tasks = getReadTasksByRanges (*dm_context, sorted_ranges, num_streams, read_segments, !enable_read_thread); auto read_task_pool = std::make_shared <SegmentReadTaskPool>( physical_table_id, dm_context, columns_to_read, filter, max_version, expected_block_size, is_fast_mode, is_fast_mode, std::move (tasks), after_segment_read); SegmentReadTaskScheduler::instance ().add (read_task_pool);
SegmentReadTaskScheduler 前面说过,SegmentReadTaskPool 表征的是一个 table 级别要读取的任务信息,下面介绍如何调度这些读取任务。
下面来介绍调度器 SegmentReadTaskScheduler
。 其中,
read_pools
: 记录所有的 SegmentReadTaskPool 数据
merging_segments
: 记录的是 DeltaMergeStore 中某个 Segment 出现在了哪些 SegmentReadTaskPool 中,方便后续收集不同 SegmentReadTaskPool 中出现的相同 Segment。
merged_task_pool
: 是个cache,用于存放上次读取但是尚未完成的读取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class SegmentReadTaskScheduler { private : std::mutex mtx; CircularScanList<SegmentReadTaskPool> read_pools; std::unordered_map<int64_t , std::unordered_map<uint64_t , std::vector<uint64_t >>> merging_segments; MergedTaskPool merged_task_pool; std::atomic<bool > stop; std::thread sched_thread; };
add 该函数是用于添加新的读取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void SegmentReadTaskScheduler::add (const SegmentReadTaskPoolPtr & pool) { std::lock_guard lock (mtx) ; read_pools.add (pool); std::unordered_set<uint64_t > seg_ids; for (const auto & task : pool->getTasks ()) { auto seg_id = task->segment->segmentId (); merging_segments[pool->tableId ()][seg_id].push_back (pool->poolId ()); if (!seg_ids.insert (seg_id).second) { throw DB::Exception (fmt::format("Not support split segment task. seg_ids {} => seg_id {} already exist." , seg_ids, seg_id)); } } auto [unexpired, expired] = read_pools.count (pool->tableId ()); LOG_FMT_DEBUG (log, "add pool {} table {} block_slots {} segment count {} segments {} unexpired pool {} expired pool {}" , pool->poolId (), pool->tableId (), pool->getFreeBlockSlots (), seg_ids.size (), seg_ids, unexpired, expired); }
解释下 read_pools.count(pool->tableId());
的返回值
由于可能多处在读取同一个 table,即多处引用同一个 SegmentReadTaskPoolPtr 对象来读取同一个 DeltaMergeStore,当所有引用 SegmentReadTaskPoolPtr 来读取 DeltaMergeStore 的地方都完成了,则 SegmentReadTaskPoolPtr 则处于 invalid 状态,否则即处于 valid 状态。
返回值 [unexpired, expired] 分别表示 [valid, invalid] 两种状态的个数。
unexpired
: 有几个 UnorderedInputStream 对象正在读取同一个 table
SegmentReadTaskPool::valid 与 UnorderedInputStream
本应放在最后来讲,这里可以先看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 bool SegmentReadTaskPool::valid () const { return !exceptionHappened () && unordered_input_stream_ref_count.load (std::memory_order_relaxed) > 0 ; } class UnorderedInputStream : public IProfilingBlockInputStream{ static constexpr auto NAME = "UnorderedInputStream" ; public : UnorderedInputStream ( const SegmentReadTaskPoolPtr & task_pool_, const ColumnDefines & columns_to_read_, const int extra_table_id_index, const TableID physical_table_id, const String & req_id) : task_pool (task_pool_) , header (toEmptyBlock (columns_to_read_)) , extra_table_id_index (extra_table_id_index) , physical_table_id (physical_table_id) , log (Logger::get (NAME, req_id)) , ref_no (0 ) { if (extra_table_id_index != InvalidColumnID) { ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine (); ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn (), extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id, extra_table_id_col_define.default_value}; header.insert (extra_table_id_index, col); } ref_no = task_pool->increaseUnorderedInputStreamRefCount (); LOG_FMT_DEBUG (log, "pool {} ref {} created" , task_pool->poolId (), ref_no); } ~UnorderedInputStream () override { task_pool->decreaseUnorderedInputStreamRefCount (); LOG_FMT_DEBUG (log, "pool {} ref {} destroy" , task_pool->poolId (), ref_no); } };
SegmentReadTaskScheduler SegmentReadTaskScheduler 在构造函数中启动读取任务的调度功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 SegmentReadTaskScheduler::SegmentReadTaskScheduler () : stop (false ) , log (&Poco::Logger::get ("SegmentReadTaskScheduler" )) { sched_thread = std::thread (&SegmentReadTaskScheduler::schedThread, this ); } void SegmentReadTaskScheduler::schedThread () { while (!isStop ()) { if (!schedule ()) { using namespace std::chrono_literals; std::this_thread::sleep_for (2 ms); } } }
schedule schedule
函数:
将 read_pools 中读取相同 segment 的任务聚在一起
将 merged_task 算一个批次进行读取

// One scheduling round: build a merged task and hand it to a reader pool.
// Returns false only when there is nothing runnable (caller then sleeps).
bool SegmentReadTaskScheduler::schedule()
{
    Stopwatch sw;
    auto [merged_task, run_sche] = scheduleMergedTask();
    if (merged_task != nullptr)
    {
        LOG_FMT_DEBUG(log, "scheduleMergedTask seg_id {} pools {} => {} ms", merged_task->getSegmentId(), merged_task->getPoolIds(), sw.elapsedMilliseconds());
        SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
    }
    return run_sche;
}
scheduleMergedTask 执行流程如下:
scheduleSegmentReadTaskPoolUnlock
函数先选出一个处于 valid
状态的 SegmentReadTaskPool
从 merged_task_pool
查看是否之前已经读取过,但是上次读取没有完成, merged_task_pool 在此处是个 cache
如果仍然没有,则调用 scheduleSegmentUnlock
函数选择一个 segment 来读取,
如果存在待读取的 segment, 则调用 getPoolsUnlock
函数,获取 segment
所存在的 SegmentReadTaskPool 对象 pools
取出 pools
中的同一个 segment
读取任务
详细见代码注释。
顺带解释下,只有在第一个分支才会返回 {nullptr, false}
,重点是 false,因为只有这个时候才能说明整个队列中没有可执行的读请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 std::pair<MergedTaskPtr, bool > SegmentReadTaskScheduler::scheduleMergedTask () { std::lock_guard lock (mtx) ; auto pool = scheduleSegmentReadTaskPoolUnlock (); if (pool == nullptr ) { return {nullptr , false }; } auto merged_task = merged_task_pool.pop (pool->poolId ()); if (merged_task != nullptr ) { GET_METRIC (tiflash_storage_read_thread_counter, type_sche_from_cache).Increment (); return {merged_task, true }; } auto segment = scheduleSegmentUnlock (pool); if (!segment) { GET_METRIC (tiflash_storage_read_thread_counter, type_sche_no_segment).Increment (); return {nullptr , true }; } SegmentReadTaskPools pools = getPoolsUnlock (segment->second); if (pools.empty ()) { return {nullptr , true }; } std::vector<MergedUnit> units; units.reserve (pools.size ()); for (auto & pool : pools) { units.emplace_back (pool, pool->getTask (segment->first)); } GET_METRIC (tiflash_storage_read_thread_counter, type_sche_new_task).Increment (); return {std::make_shared <MergedTask>(segment->first, std::move (units)), true }; }
scheduleSegmentUnlock

// Pick, for `pool`'s table, the segment to read next and the pool-ids that
// will read it together. Returns nullopt when the table has no pending segment.
std::optional<std::pair<uint64_t, std::vector<uint64_t>>> SegmentReadTaskScheduler::scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool)
{
    auto [unexpired, expired] = read_pools.count(pool->tableId());
    auto expected_merge_seg_count = std::min(unexpired, 2);
    auto itr = merging_segments.find(pool->tableId());
    if (itr == merging_segments.end())
    {
        return std::nullopt;
    }
    std::optional<std::pair<uint64_t, std::vector<uint64_t>>> result;
    auto & segments = itr->second;
    auto target = pool->scheduleSegment(segments, expected_merge_seg_count);
    if (target != segments.end())
    {
        if (MergedTask::getPassiveMergedSegments() < 100 || target->second.size() == 1)
        {
            // Take the whole entry: every pool of this segment reads together.
            result = *target;
            segments.erase(target);
            if (segments.empty())
            {
                merging_segments.erase(itr);
            }
        }
        else
        {
            // Too many passively merged segments already: let only this pool
            // read it, and remove this pool from the entry (swap-with-back erase).
            result = std::pair{target->first, std::vector<uint64_t>(1, pool->poolId())};
            auto mutable_target = segments.find(target->first);
            auto itr = std::find(mutable_target->second.begin(), mutable_target->second.end(), pool->poolId());
            *itr = mutable_target->second.back();
            mutable_target->second.resize(mutable_target->second.size() - 1);
        }
    }
    return result;
}
scheduleSegment

// Choose which of this pool's segments to read next: prefer the one shared by
// the most pools, stopping early once `expected_merge_count` sharers are found.
// Returns segments.end() when the pool has no free active-segment slot.
std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator SegmentReadTaskPool::scheduleSegment(const std::unordered_map<uint64_t, std::vector<uint64_t>> & segments, uint64_t expected_merge_count)
{
    auto target = segments.end();
    std::lock_guard lock(mutex);
    if (getFreeActiveSegmentCountUnlock() <= 0)
    {
        return target;
    }
    for (const auto & task : tasks)
    {
        auto itr = segments.find(task->segment->segmentId());
        if (itr == segments.end())
        {
            // Invariant: every task's segment must be registered by add().
            throw DB::Exception(fmt::format("seg_id {} not found from merging segments", task->segment->segmentId()));
        }
        if (std::find(itr->second.begin(), itr->second.end(), poolId()) == itr->second.end())
        {
            throw DB::Exception(fmt::format("pool {} not found from merging segment {}=>{}", poolId(), itr->first, itr->second));
        }
        // Keep the segment wanted by the largest number of pools.
        if (target == segments.end() || itr->second.size() > target->second.size())
        {
            target = itr;
        }
        if (target->second.size() >= expected_merge_count)
        {
            break;
        }
    }
    return target;
}
SegmentReaderPoolManager SegmentReader
用于真正的读取segment。SegmentReaderPool
是一组 SegmentReader
1 2 3 4 5 6 void SegmentReaderPoolManager::addTask (MergedTaskPtr && task) { static std::hash<uint64_t > hash_func; auto idx = hash_func (task->getSegmentId ()) % reader_pools.size (); reader_pools[idx]->addTask (std::move (task)); }
SegmentReaderPool::addTask

// Enqueue a merged task for the readers of this pool.
void SegmentReaderPool::addTask(MergedTaskPtr && task)
{
    if (!task_queue.push(std::move(task), nullptr))
    {
        throw Exception("addTask fail");
    }
}
SegmentReader::readSegments

// Pop one merged task and read it for one round; unfinished tasks are pushed
// back into the scheduler's cache so they can be resumed later.
void SegmentReader::readSegments()
{
    MergedTaskPtr merged_task;
    try
    {
        if (!task_queue.pop(merged_task))
        {
            LOG_FMT_INFO(log, "pop fail, stop {}", isStop());
            return;
        }

        // On every exit path, re-queue the task if some stream is unfinished.
        SCOPE_EXIT({
            if (!merged_task->allStreamsFinished())
            {
                SegmentReadTaskScheduler::instance().pushMergedTask(merged_task);
            }
        });

        int read_count = 0;
        while (!merged_task->allStreamsFinished() && !isStop())
        {
            auto c = merged_task->readBlock();
            read_count += c;
            // No block could be produced this round (e.g. no free slots): stop.
            if (c <= 0)
            {
                break;
            }
        }
        if (read_count <= 0)
        {
            LOG_FMT_DEBUG(log, "pool {} seg_id {} read_count {}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
        }
    }
    catch (DB::Exception & e)
    {
        LOG_FMT_ERROR(log, "ErrMsg: {} StackTrace {}", e.message(), e.getStackTrace().toString());
        if (merged_task != nullptr)
        {
            merged_task->setException(e);
        }
    }
    catch (std::exception & e)
    {
        LOG_FMT_ERROR(log, "ErrMsg: {}", e.what());
        if (merged_task != nullptr)
        {
            merged_task->setException(DB::Exception(e.what()));
        }
    }
    catch (...)
    {
        tryLogCurrentException("exception thrown in SegmentReader");
        if (merged_task != nullptr)
        {
            merged_task->setException(DB::Exception("unknown exception thrown in SegmentReader"));
        }
    }
}
MergedTask::readOneBlock 读取相同segment的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 int MergedTask::readOneBlock () { int read_block_count = 0 ; for (cur_idx = 0 ; cur_idx < static_cast <int >(units.size ()); cur_idx++) { if (isStreamFinished (cur_idx)) { continue ; } auto & [pool, task, stream] = units[cur_idx]; if (!pool->valid ()) { setStreamFinished (cur_idx); continue ; } if (pool->getFreeBlockSlots () <= 0 ) { continue ; } if (pool->readOneBlock (stream, task->segment)) { read_block_count++; } else { setStreamFinished (cur_idx); } } return read_block_count; }
SegmentReadTaskPool::readOneBlock

// Read one block from `stream` into the pool's block queue; when the stream is
// exhausted, mark the segment finished and return false.
bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg)
{
    MemoryTrackerSetter setter(true, mem_tracker);
    auto block = stream->read();
    if (block)
    {
        pushBlock(std::move(block));
        return true;
    }
    else
    {
        finishSegment(seg);
        return false;
    }
}
上面的流程执行完后,UnorderedInputStream::readImpl 负责从 task_pool 中取出已经读取好的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 Block UnorderedInputStream::readImpl (FilterPtr & , bool ) override { if (done) return {}; while (true ) { FAIL_POINT_PAUSE (FailPoints::pause_when_reading_from_dt_stream); Block res; task_pool->popBlock (res); if (res) { if (extra_table_id_index != InvalidColumnID) { ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine (); ColumnWithTypeAndName col{{}, extra_table_id_col_define.type, extra_table_id_col_define.name, extra_table_id_col_define.id}; size_t row_number = res.rows (); col.column = col.type->createColumnConst (row_number, Field (physical_table_id)); res.insert (extra_table_id_index, std::move (col)); } if (!res.rows ()) { continue ; } else { total_rows += res.rows (); return res; } } else { done = true ; return {}; } } }