tiflash: 多线程读取流程

主要是描述下 Segment 的多线程读流程是如何实现的。

就是为了实现多线程并发读取同一个 segment。

SegmentReadTask

SegmentReadTask 表征的是读取一个 segment 不同 ranges 的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Represents the work of reading a set of row-key ranges from one segment.
// Bundles the segment, a consistent snapshot of it, and the ranges to read.
struct SegmentReadTask
{
SegmentPtr segment;
SegmentSnapshotPtr read_snapshot;
RowKeyRanges ranges;

SegmentReadTask(const SegmentPtr & segment_, //
const SegmentSnapshotPtr & read_snapshot_,
const RowKeyRanges & ranges_);

explicit SegmentReadTask(const SegmentPtr & segment_, const SegmentSnapshotPtr & read_snapshot_);

~SegmentReadTask();

// Returns {rows, bytes} of this task's data.
std::pair<size_t, size_t> getRowsAndBytes() const;

// Append one more range to read from this segment.
void addRange(const RowKeyRange & range) { ranges.push_back(range); }

// Merge adjacent/overlapping ranges to reduce the number of scans.
void mergeRanges() { ranges = DM::tryMergeRanges(std::move(ranges), 1); }

// Split `tasks` into >= expected_size tasks.
static SegmentReadTasks trySplitReadTasks(const SegmentReadTasks & tasks, size_t expected_size);
};

SegmentReadTaskPool

表征的是读取某个 table_id 中的 segment 的任务集合,相关任务记录在 tasks 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// Represents all pending read tasks of one table (`table_id`), kept in `tasks`.
// One pool corresponds to one read request on a table.
class SegmentReadTaskPool : private boost::noncopyable
{
private:
    const uint64_t pool_id;                // unique id of this pool
    const int64_t table_id;                // id of the table being read
    DMContextPtr dm_context;
    ColumnDefines columns_to_read;
    RSOperatorPtr filter;                  // pushed-down filter for the scan
    const uint64_t max_version;            // read version upper bound
    const size_t expected_block_size;
    const bool is_raw;
    const bool do_range_filter_for_raw;
    SegmentReadTasks tasks;                // pending SegmentReadTask objects
    AfterSegmentRead after_segment_read;   // callback run after a segment is read
    ///...
};  // NOTE: the original excerpt was missing this terminating ';' — a C++ class definition must end with one.

比如,在DeltaMergeStore::read 函数中相关代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 1. Get the segments to read and their key ranges.
SegmentReadTasks tasks = getReadTasksByRanges(*dm_context,
sorted_ranges,
num_streams,
read_segments,
!enable_read_thread);

// 2. read_task_pool represents all read-task information of this table.
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
physical_table_id,
dm_context,
columns_to_read,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_mode,
/* do_delete_mark_filter_for_raw = */ is_fast_mode,
std::move(tasks),
after_segment_read);

// 3. Register the new read tasks with the scheduler.
SegmentReadTaskScheduler::instance().add(read_task_pool);

SegmentReadTaskScheduler

SegmentReadTaskPool 表征的是一个 table 级别要读取的任务信息,下面就是如何调度这些读取任务。

下面来介绍调度器 SegmentReadTaskScheduler。 其中,

  • read_pools: 记录的所有的 SegmentReadTaskPool 数据
  • merging_segments: 记录的是 DeltaMergeStore 中某个 Segment 出现在了哪些 SegmentReadTaskPool 中,方便后续收集不同 SegmentReadTaskPool 中出现的相同 Segment。
  • merged_task_pool: 是个cache,用于存放上次读取但是尚未完成的读取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Scheduler that merges reads of the same segment across pools and
// dispatches them; accessed via instance() (see DeltaMergeStore::read above).
class SegmentReadTaskScheduler
{
//...
private:
std::mutex mtx;
// All registered SegmentReadTaskPool objects.
CircularScanList<SegmentReadTaskPool> read_pools;
// table_id -> {seg_id -> pool_ids, seg_id -> pool_ids, ...}
std::unordered_map<int64_t, std::unordered_map<uint64_t, std::vector<uint64_t>>> merging_segments;

// Cache of merged tasks scheduled before but not yet finished.
MergedTaskPool merged_task_pool;

std::atomic<bool> stop;
std::thread sched_thread;  // background thread running schedThread()
};

add

该函数是用于添加新的读取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Register a new read task pool and record, per segment, which pools want
// to read it (merging_segments) so reads of the same segment can be merged.
void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
{
std::lock_guard lock(mtx);

// 1. Register the pool itself.
read_pools.add(pool);

// 2. Record metadata: segment id -> ids of pools reading it.
std::unordered_set<uint64_t> seg_ids;
for (const auto & task : pool->getTasks())
{
auto seg_id = task->segment->segmentId();
merging_segments[pool->tableId()][seg_id].push_back(pool->poolId());
// Different ranges of the same segment must be represented as a single
// SegmentReadTask; a duplicated segment id in one pool is not supported.
if (!seg_ids.insert(seg_id).second)
{
throw DB::Exception(fmt::format("Not support split segment task. seg_ids {} => seg_id {} already exist.", seg_ids, seg_id));
}
}

auto [unexpired, expired] = read_pools.count(pool->tableId());
LOG_FMT_DEBUG(log, "add pool {} table {} block_slots {} segment count {} segments {} unexpired pool {} expired pool {}", //
pool->poolId(),
pool->tableId(),
pool->getFreeBlockSlots(),
seg_ids.size(),
seg_ids,
unexpired,
expired);
}

解释下 read_pools.count(pool->tableId()); 的返回值

由于可能多处在读取同一个 table,即多处引用同一个 SegmentReadTaskPoolPtr 对象来读取同一个 DeltaMergeStore,当所有引用 SegmentReadTaskPoolPtr 来读取 DeltaMergeStore 的地方都完成了,则 SegmentReadTaskPoolPtr 则处于 invalid 状态,否则即处于 valid 状态。

返回值 [unexpired, expired] 分别表示 [valid, invalid] 两种状态的个数。

  • unexpired: 有几个 UnorderedInputStream 对象正在读取同一个 table

SegmentReadTaskPool::valid

UnorderedInputStream 放在最后来讲,可以先看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// A pool is valid while no exception has occurred and at least one
// UnorderedInputStream still holds a reference to it.
bool SegmentReadTaskPool::valid() const
{
    if (exceptionHappened())
        return false;
    const auto refs = unordered_input_stream_ref_count.load(std::memory_order_relaxed);
    return refs > 0;
}

// Input stream that pulls blocks from a shared SegmentReadTaskPool.
// Construction/destruction maintain the pool's stream reference count,
// which backs SegmentReadTaskPool::valid() above.
class UnorderedInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "UnorderedInputStream";

public:
UnorderedInputStream(
const SegmentReadTaskPoolPtr & task_pool_,
const ColumnDefines & columns_to_read_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
: task_pool(task_pool_)
, header(toEmptyBlock(columns_to_read_))
, extra_table_id_index(extra_table_id_index)
, physical_table_id(physical_table_id)
, log(Logger::get(NAME, req_id))
, ref_no(0)
{
// Optionally insert an extra column holding the physical table id into the header.
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{extra_table_id_col_define.type->createColumn(),
extra_table_id_col_define.type,
extra_table_id_col_define.name,
extra_table_id_col_define.id,
extra_table_id_col_define.default_value};
header.insert(extra_table_id_index, col);
}
// Count this stream as a live reader of the pool.
ref_no = task_pool->increaseUnorderedInputStreamRefCount();
LOG_FMT_DEBUG(log, "pool {} ref {} created", task_pool->poolId(), ref_no);
}

~UnorderedInputStream() override
{
// Release this stream's reference; when all references are gone the pool
// becomes invalid (see SegmentReadTaskPool::valid).
task_pool->decreaseUnorderedInputStreamRefCount();
LOG_FMT_DEBUG(log, "pool {} ref {} destroy", task_pool->poolId(), ref_no);
}
};

SegmentReadTaskScheduler

SegmentReadTaskScheduler 在构造函数中启动读取任务的调度功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// The scheduler starts its scheduling thread in the constructor.
SegmentReadTaskScheduler::SegmentReadTaskScheduler()
: stop(false) , log(&Poco::Logger::get("SegmentReadTaskScheduler"))
{
sched_thread = std::thread(&SegmentReadTaskScheduler::schedThread, this);
}

// Scheduling loop: keep scheduling until asked to stop; when a round finds
// nothing runnable, back off briefly before trying again.
void SegmentReadTaskScheduler::schedThread()
{
    using namespace std::chrono_literals;
    while (!isStop())
    {
        const bool did_work = schedule();
        if (!did_work)
            std::this_thread::sleep_for(2ms);
    }
}

schedule

schedule 函数:

  • 将 read_pools 中读取相同 segment 的任务聚在一起
  • 将 merged_task 算一个批次进行读取
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // One scheduling round:
    //  - merge tasks reading the same segment across pools into one MergedTask
    //  - hand the MergedTask to SegmentReaderPoolManager for execution
    // Returns false when nothing was runnable (the caller may sleep).
    bool SegmentReadTaskScheduler::schedule()
    {
    Stopwatch sw;
    auto [merged_task, run_sche] = scheduleMergedTask();
    if (merged_task != nullptr)
    {
    LOG_FMT_DEBUG(log, "scheduleMergedTask seg_id {} pools {} => {} ms",
    merged_task->getSegmentId(),
    merged_task->getPoolIds(),
    sw.elapsedMilliseconds());
    SegmentReaderPoolManager::instance().addTask(std::move(merged_task));
    }
    return run_sche;
    }

scheduleMergedTask

执行流程如下:

  • scheduleSegmentReadTaskPoolUnlock 函数先选出一个处于 valid 状态的 SegmentReadTaskPool
  • merged_task_pool 查看是否之前已经读取过,但是上次读取没有完成, merged_task_pool 在此处是个 cache
  • 如果仍然没有,则调用 scheduleSegmentUnlock 函数选择一个 segment 来读取,
  • 如果存在待读取的 segment, 则调用 getPoolsUnlock 函数,获取 segment 所存在的 SegmentReadTaskPool 对象 pools
  • 取出 pools 中读取同一个 segment 的读取任务

详细见代码注释。

顺带解释下,只有在第一个分支才会返回 {nullptr, false},重点是 false,因为只有这个时候才能说明整个队列中没有可执行的读请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Pick work for one scheduling round.
// Returns {merged_task, run_sche}. run_sche is false only in branch 1 —
// the only case proving the whole queue has no runnable read request.
std::pair<MergedTaskPtr, bool> SegmentReadTaskScheduler::scheduleMergedTask()
{
std::lock_guard lock(mtx);

// 1. Find a SegmentReadTaskPool in valid state; none means all read
//    requests are done (or every pool's block queue is at its limit).
auto pool = scheduleSegmentReadTaskPoolUnlock();
if (pool == nullptr)
{
// No SegmentReadTaskPool to schedule. Maybe no read request or
// block queue of each SegmentReadTaskPool reaching the limit.
return {nullptr, false};
}

// 2. Cache hit: a task of this pool was scheduled before but not finished.
auto merged_task = merged_task_pool.pop(pool->poolId());
if (merged_task != nullptr)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_from_cache).Increment();
return {merged_task, true};
}

// 3. Choose a segment of this pool to read.
auto segment = scheduleSegmentUnlock(pool);
if (!segment)
{
// The number of active segments reaches the limit.
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment();
return {nullptr, true};
}

// 4. All pools that also contain this segment.
SegmentReadTaskPools pools = getPoolsUnlock(segment->second);
if (pools.empty())
{
// Maybe SegmentReadTaskPools are expired because of upper threads finish the request.
return {nullptr, true};
}

// 5. From each pool, take its task for this segment_id (segment->first).
std::vector<MergedUnit> units;
units.reserve(pools.size());
for (auto & pool : pools)
{
units.emplace_back(pool, pool->getTask(segment->first));
}
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_new_task).Increment();

return {std::make_shared<MergedTask>(segment->first, std::move(units)), true};
}

scheduleSegmentUnlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Choose one segment of `pool` to schedule next.
// Returns {segment_id, pool_ids} or nullopt when this table has no segment
// left (or the pool has no free active-segment slot).
std::optional<std::pair<uint64_t, std::vector<uint64_t>>> 
SegmentReadTaskScheduler::scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool)
{
auto [unexpired, expired] = read_pools.count(pool->tableId());
auto expected_merge_seg_count = std::min(unexpired, 2);
auto itr = merging_segments.find(pool->tableId());
if (itr == merging_segments.end())
{
// No segment of tableId left.
return std::nullopt;
}
std::optional<std::pair<uint64_t, std::vector<uint64_t>>> result;
auto & segments = itr->second; // segment_id --> pool_ids
auto target = pool->scheduleSegment(segments, expected_merge_seg_count);
if (target != segments.end())
{
// < 100: simple heuristic — current passive-merge read pressure is low,
// so schedule the segment for all pools that want it.
if (MergedTask::getPassiveMergedSegments() < 100 || target->second.size() == 1)
{
result = *target;
segments.erase(target);
if (segments.empty())
{
merging_segments.erase(itr);
}
}
else
{
// Too much reading in flight: schedule this segment for the current pool only.
result = std::pair{target->first, std::vector<uint64_t>(1, pool->poolId())};
// Below: remove pool->poolId() from segments[target->first]
// (swap-with-last, then shrink by one).
auto mutable_target = segments.find(target->first);
auto itr = std::find(mutable_target->second.begin(), mutable_target->second.end(), pool->poolId());
*itr = mutable_target->second.back();
mutable_target->second.resize(mutable_target->second.size() - 1);
}
}
return result;
}

scheduleSegment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Choose a segment to read.
// Returns <segment_id, pool_ids>.
std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator
SegmentReadTaskPool::scheduleSegment(const std::unordered_map<uint64_t, std::vector<uint64_t>> & segments, uint64_t expected_merge_count)
{
auto target = segments.end();
std::lock_guard lock(mutex);
// No free active-segment slot: nothing can be scheduled for this pool.
if (getFreeActiveSegmentCountUnlock() <= 0)
{
return target;
}
for (const auto & task : tasks)
{
auto itr = segments.find(task->segment->segmentId());
if (itr == segments.end())
{
throw DB::Exception(fmt::format("seg_id {} not found from merging segments", task->segment->segmentId()));
}
if (std::find(itr->second.begin(), itr->second.end(), poolId()) == itr->second.end())
{
throw DB::Exception(fmt::format("pool {} not found from merging segment {}=>{}", poolId(), itr->first, itr->second));
}
if (target == segments.end() || itr->second.size() > target->second.size()) // prefer the segment wanted by the most pools
{
target = itr;
}

// Stop at the first segment that satisfies the condition:
// number of pools reading this segment >= expected_merge_count.
if (target->second.size() >= expected_merge_count)
{
break;
}
}
return target;
}

SegmentReaderPoolManager

SegmentReader 用于真正地读取 segment。SegmentReaderPool 则管理一组 SegmentReader。

1
2
3
4
5
6
// Dispatch a merged task to one of the reader pools. The pool is chosen by
// hashing the segment id, so tasks for the same segment deterministically
// land in the same SegmentReaderPool.
void SegmentReaderPoolManager::addTask(MergedTaskPtr && task)
{
    const auto pool_index = std::hash<uint64_t>{}(task->getSegmentId()) % reader_pools.size();
    reader_pools[pool_index]->addTask(std::move(task));
}

SegmentReaderPool::addTask

1
2
3
4
5
6
7
8
// Enqueue a merged task for the readers of this pool; throws when the
// queue rejects the push.
void SegmentReaderPool::addTask(MergedTaskPtr && task)
{
    const bool pushed = task_queue.push(std::move(task), nullptr);
    if (pushed)
        return;
    throw Exception("addTask fail");
}

SegmentReader::readSegments

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Pop one MergedTask from the queue and read blocks from it until either all
// of its streams finish, a round makes no progress, or the reader is stopped.
// Unfinished tasks are pushed back to the scheduler's merged-task cache.
void SegmentReader::readSegments()
{
MergedTaskPtr merged_task;
try
{
if (!task_queue.pop(merged_task))
{
LOG_FMT_INFO(log, "pop fail, stop {}", isStop());
return;
}

// On every exit path: if the task is not fully finished, hand it back to
// the scheduler so it can be rescheduled later (the merged_task_pool cache).
SCOPE_EXIT({
if (!merged_task->allStreamsFinished())
{
SegmentReadTaskScheduler::instance().pushMergedTask(merged_task);
}
});

int read_count = 0;
while (!merged_task->allStreamsFinished() && !isStop())
{
auto c = merged_task->readBlock();
read_count += c;
// c <= 0: no stream produced a block this round; stop and let the
// scheduler decide when to resume this task.
if (c <= 0)
{
break;
}
}
if (read_count <= 0)
{
LOG_FMT_DEBUG(log, "pool {} seg_id {} read_count {}", merged_task->getPoolIds(), merged_task->getSegmentId(), read_count);
}
}
catch (DB::Exception & e)
{
LOG_FMT_ERROR(log, "ErrMsg: {} StackTrace {}", e.message(), e.getStackTrace().toString());
// Propagate the failure to the task so readers of the pools see it.
if (merged_task != nullptr)
{
merged_task->setException(e);
}
}
catch (std::exception & e)
{
LOG_FMT_ERROR(log, "ErrMsg: {}", e.what());
if (merged_task != nullptr)
{
merged_task->setException(DB::Exception(e.what()));
}
}
catch (...)
{
tryLogCurrentException("exception thrown in SegmentReader");
if (merged_task != nullptr)
{
merged_task->setException(DB::Exception("unknown exception thrown in SegmentReader"));
}
}
}

MergedTask::readOneBlock

读取相同segment的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Read at most one block for each unfinished unit (pool/task/stream triple)
// of this merged task. Returns the number of blocks read in this round.
int MergedTask::readOneBlock()
{
int read_block_count = 0;
for (cur_idx = 0; cur_idx < static_cast<int>(units.size()); cur_idx++)
{
if (isStreamFinished(cur_idx))
{
continue;
}

auto & [pool, task, stream] = units[cur_idx];

// The pool's readers are gone (or it hit an exception): mark done.
if (!pool->valid())
{
setStreamFinished(cur_idx);
continue;
}

// The pool's block queue is full: skip this unit for now, retry later.
if (pool->getFreeBlockSlots() <= 0)
{
continue;
}

if (pool->readOneBlock(stream, task->segment))
{
read_block_count++;
}
else
{
// Stream exhausted.
setStreamFinished(cur_idx);
}
}
return read_block_count;
}

SegmentReadTaskPool::readOneBlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Pull one block from `stream` and push it into this pool's block queue.
// Returns true when a block was produced; when the stream is exhausted,
// marks the segment finished and returns false.
bool SegmentReadTaskPool::readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg)
{
    MemoryTrackerSetter setter(true, mem_tracker);
    auto block = stream->read();
    if (!block)
    {
        finishSegment(seg);
        return false;
    }
    pushBlock(std::move(block));
    return true;
}

UnorderedInputStream::readImpl

上面的流程执行完毕后,UnorderedInputStream::read 即可读取上述流程产生的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Pop blocks from the shared task pool until a non-empty one arrives.
// A falsy block from popBlock marks end-of-stream (`done`).
// NOTE(review): the snippet shows `override` on a name-qualified definition,
// which is not valid outside the class body — likely a copy artifact; the
// real definition presumably lives inside the class. Verify against source.
Block UnorderedInputStream::readImpl(FilterPtr & /*res_filter*/, bool /*return_filter*/) override
{
if (done)
return {};
while (true)
{
FAIL_POINT_PAUSE(FailPoints::pause_when_reading_from_dt_stream);

Block res;
task_pool->popBlock(res);
if (res)
{
// Optionally attach a constant column carrying the physical table id.
if (extra_table_id_index != InvalidColumnID)
{
ColumnDefine extra_table_id_col_define = getExtraTableIDColumnDefine();
ColumnWithTypeAndName col{{},
extra_table_id_col_define.type,
extra_table_id_col_define.name,
extra_table_id_col_define.id};
size_t row_number = res.rows();
col.column = col.type->createColumnConst(row_number, Field(physical_table_id));
res.insert(extra_table_id_index, std::move(col));
}
// Skip empty blocks; only return blocks that contain rows.
if (!res.rows())
{
continue;
}
else
{
total_rows += res.rows();
return res;
}
}
else
{
// Pool signalled end of data: remember it and return an empty block.
done = true;
return {};
}
}
}