Pipeline: Morsel 和 OlapScanOperator (1)
MorselQueue 用于存放本次待读取的数据源 Morsels 集合,继承派生体系如下:
convert_scan_range_to_morsel_queue
convert_scan_range_to_morsel_queue
函数将本次带查询的数据源 scan_ranges
转换成 std::vector<pipeline::Morsel>
对象 morsels
,并根据是否能开启 Tablet
选择具体的 MorselQueue 子类。
因此,区别在于 morsels
在 MorselQueue 内部怎么划分。
1 | StatusOr<pipeline::MorselQueuePtr> OlapScanNode::convert_scan_range_to_morsel_queue( |
_could_tablet_internal_parallel
变量 enable_tablet_internal_parallel
为 true 时,只是建议开启 tablet 并行,能不能真开启还要使用 _could_tablet_internal_parallel 函数根据实际多个因素判断。
主键模型开启了持久化索引 use_pk_index 为 false
tablet_internal_parallel_mode 默认值是 TTabletInternalParallelMode::AUTO,如果不是主动设置为 FORCE_SPLIT,并且实际的读取的 num_total_scan_ranges >= 本次 Pipeline 的并行度 pipeline_dop, 也返回 false
如果 num_total_scan_ranges >= pipeline_dop,则说明当前有非常多的 tablet 待读取,不应该再把划分,否则会导致很多竞争。
统计本次需要读取的行数 num_table_rows
config::tablet_internal_parallel_max_splitted_scan_bytes,默认是 512M,是单次从一个 segment 文件中读取的最大字节数,_estimated_scan_row_bytes 本次要读取的所有字段类型的大小(即一行数据的大小),二者相除的结果 splitted_scan_rows 即单次从 segment 文件中读取的最大行数。
再将 splitted_scan_rows 限制在 [min_splitted_scan_rows, max_splitted_scan_rows] 区间,
先估计本次的 scan_dop = num_table_rows / splitted_scan_rows,再将 scan_dop 限制到 [1, pipeline_dop] 区间。
只要 scan_dop > pipeline_dop 或者 config::tablet_internal_parallel_min_scan_dop 本函数就返回 true。
因此,如果函数返回 false 上层就直接启用 FixedMorselQueue 来粗存储本次数据源 Morsels,返回 true 则会使用 SplitMorseQueue。
scan_dop, splitted_scan_rows 两个参数输出后再作为 SplitMorselQueue 的参数。
1 | StatusOr<bool> OlapScanNode::_could_tablet_internal_parallel( |
FixedMorselQueue
顾名思义,FixedMorselQueue
存储是固定数据量的 Morsels,FixedMorselQueue::_morsels
都是 ScanMorsel 对象(不是 ScanMorsel 的子类),即是以 tablet 为级别,后续 TabletReader 要从存储层查询该 _morsels[i] 指向的数据时,是需要 FullScan。
当从 OlapScanOperator 从存储层获得数据后,才会调用 FixedMorselQueue::set_tablet_rowsets
函数将数据赋值给 _tablet_rowsets
,进而 ScanOperator::pull 就可以从 FixedMorselQueue::try_get
函数从 _tablet_rowsets 中获得数据,
1 | class FixedMorselQueue final : public MorselQueue { |
FixedMorselQueue::try_get
FixedMorselQueue 是有可能在多个 Pipeline 线程之间共享,即共享数据源,多个 Pipelines 同时从 FixedMorselQueue 获取数据源信息。而 FixedMorselQueue::_morsels 大小在构造函数已经确定,且 FixedMorselQueue 内部不会再细分每个 morsel(和 SplitMorselQueue 的对比),因此可以将 FixedMorselQueue 设计成 LookFree 的数据结构,因此使用 _pop_index 来指向当前已经分配的 morsel 就能满足要求。
在 FixedMorselQueue::try_get 中,通 std::move 将 _morsels[idx]、_tablet_rowsets[idx] 中的数据转移给上层,这样不会破坏 _morsels、_tablet_rowsets 结构,在多线程只读场景下就不用 mutex 保护。当 _pop_index == _num_morsels, _tablet_rowsets 和 _morsels 就是大小为 _num_morsels 的空壳。如此,也就实现了 LookFree。
源码如下。
1 | StatusOr<MorselPtr> FixedMorselQueue::try_get() { |
SplitMorselQueue
scan_dop 传递给 SplitMorselQueue::_degree_of_parallelism,后续用于生成 MorselQueueFactory。
1 | SplitMorselQueue::SplitMorselQueue(Morsels&& morsels, |
SplitMorselQueue 有两个子类,根据 _could_split_tablet_physically 函数来选择。只有在表类型是聚合模型(即 AGG_KEYS 或者 UNIQUE_KEYS)且数据尚未聚合完成(skip_aggr 为 false)时,该函数才会返回 false。
返回 true 时,选择 PhysicalSplitMorselQueue,否则选择 LogicalSplitMorselQueue 对象。
1 | StatusOr<bool> OlapScanNode::_could_split_tablet_physically(const std::vector<TScanRangeParams>& scan_ranges) const { |
PhysicalSplitMorselQueue::try_get
StarRocks 中数据持久化后文件的基本单位是 Segment,一个 Tablet 数据组织结构是 Tablet/Rowset/Segment,每次从 Segment 中读取的行数是 _splitted_scan_rows,因此可以将 PhysicalSplitMorselQueue::try_get 分为两个部分:
检测 _cur_segment 是否为空,以及 _cur_segment 是还有剩余可读数据
如果没有,则需要从磁盘中能读取下一个 Segment 文件,将其加载到内存中,生成 SegemntIterator 对象 _segment_range_iter,再进行下一步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26StatusOr<MorselPtr> PhysicalSplitMorselQueue::try_get() {
std::lock_guard<std::mutex> lock(_mutex);
if (_tablet_idx >= _tablets.size()) {
return nullptr;
}
// When it hasn't initialized any segment,
// or _segment_idx exceeds the segments of the current rowset,
// or current segment is empty or finished,
// we should pick up the next segment and init it.
while (!_has_init_any_segment
|| _cur_segment() == nullptr
|| _cur_segment()->num_rows() == 0 ||
!_segment_range_iter.has_more()) {
if (!_next_segment()) {
return nullptr;
}
if (auto status = _init_segment(); !status.ok()) {
// Morsel_queue cannot generate morsels after errors occurring.
_tablet_idx = _tablets.size();
return status;
}
}
//...
}当仍有待读取的数据,则通过 _segment_range_iter 获取下一个批次数据(即 _splitted_scan_rows 行),如果剩余的数据不足 _splitted_scan_rows 行,则将剩余的数据合并到当前批次,(合并)生成一个 PhysicalSplitScanMorsel 对象,其中本次读取的元信息 {rowset, segment_id, token_range} 精确指示了本次要读取的数据位置,最终在读取数据时,用于初始化 OlapChunkSource::_params 参数的 rowid_range_option 字段。
这里的 _tablet_rowsets[_tablet_idx] 就不能用 std::move 赋值给 Morsel 了,因为其他线程也要使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28StatusOr<MorselPtr> PhysicalSplitMorselQueue::try_get() {
std::lock_guard<std::mutex> lock(_mutex);
//... above code
vectorized::SparseRange taken_range;
_segment_range_iter.next_range(_splitted_scan_rows, &taken_range);
_num_segment_rest_rows -= taken_range.span_size();
if (_num_segment_rest_rows < _splitted_scan_rows) {
// If there are too few rows left in the segment, take them all this time.
_segment_range_iter.next_range(_splitted_scan_rows, &taken_range);
_num_segment_rest_rows = 0;
}
auto* scan_morsel = _cur_scan_morsel();
auto* rowset = _cur_rowset();
auto rowid_range = std::make_shared<vectorized::RowidRangeOption>(
rowset->rowset_id(),
rowset->segments()[_segment_idx]->id(),
std::move(taken_range));
MorselPtr morsel = std::make_unique<PhysicalSplitScanMorsel>(
scan_morsel->get_plan_node_id(),
*(scan_morsel->get_scan_range()),
std::move(rowid_range));
morsel->set_rowsets(_tablet_rowsets[_tablet_idx]);
_inc_num_splits(_is_last_split_of_current_morsel());
return morsel;
}
_next_segment
寻找下个 segment 是比较简单的,依次按照 segment_id/rowset_id/tablet_id 递增的顺序。比如,如果当前 rowset 的 segments 读取完,则 ++rowset_id,切换到当前 tablet 的下一个 rowset,如果当前 tablet 的所有 rowsets 都读取完毕,则 ++tablet_id,切换到下一个 tablet。
当 tablet_id >= _tablets.size(),_next_segment 函数返回 false,表示所有数据都已经读取完毕。
这部分代码比较简单。
1 | Rowset* PhysicalSplitMorselQueue::_cur_rowset() { |
_inti_segment
初始化 segment 要复杂点。但是也可以大致分为两个步骤:
加载元数据
针对每个 Tablet 都有一个查询范围 **{_range_start_key, _Range_end_key}**,两边是开闭还是闭区间由 {_range_start_op, _range_end_op} 表示。比如,如果 _range_xxx_op 中包含等于操作即闭区间,否则是开区间,这两个符号定义如下:
1
2enum class RangeStartOperation { GT = 0, GE, EQ };
enum class RangeEndOperation { LT = 0, LE, EQ };第一步主要是将 OlapScanRange 类型的 {_range_start_key, _range_end_key} 转化为 SeekRange 类型的 _tablet_seek_ranges,这样方便后续遍历。
注意:这里的 _range_start_key 和 _range_end_key 都是 std::vector 类型,可能包含多个查询范围,比如 1 < x <= 2, 10 < y < 20,而起始条件都在 _range_start_key 中,终止条件都在 _Range_end_key 中。
顺带提下,在读过程中,数据的内存分配方式基本都是基于内存池 MemPool。这个 MemPool 只是缓存作用,实际内部分配内存的操作是由 ChunkAllocator 完成,其分配内存特点是先在线程所在 core 上分配,如果线程所在 core 上分配的内存超过了限制(默认2G)则会 cross-core 分配,这样可以减少线程跨 core 通信。
因为数据读取的单位是 Segment,当 segment == 0 即表示需要从一个新的 Rowset 中读取数据,因此需要先调用 Rowset::load 函数初始化 Rowset::_segments,来获悉所有 semgnet 信息。
如果,当前 rowset 没有可读的数据,则 return。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Status PhysicalSplitMorselQueue::_init_segment() {
// Load the meta of the new rowset and the index of the new segment。
if (0 == _segment_idx) {
// Read a new tablet.
if (0 == _rowset_idx) {
_tablet_seek_ranges.clear();
_mempool.clear();
RETURN_IF_ERROR(vectorized::TabletReader::parse_seek_range(
_tablets[_tablet_idx], _range_start_op, _range_end_op,
_range_start_key, _range_end_key,
&_tablet_seek_ranges, &_mempool));
}
// Read a new rowset.
RETURN_IF_ERROR(_cur_rowset()->load());
}
_num_segment_rest_rows = 0;
_segment_scan_range.clear();
auto* segment = _cur_segment();
// The new rowset doesn't contain any segment.
if (segment == nullptr || segment->num_rows() == 0) {
return Status::OK();
}
//...step(1) 过后,得到可迭代的 _tablet_seek_ranges,下面就需要将其转化为在 Segment 中的所有读取范围,并合并到 _segment_scan_range 中。
而且 _segment_scan_range 中每一个 range 的起始位置 {lower_rowid, upper_rowid} 都待读取数据的行号,因此后续读取效率会非常高。
比如找某个 key 在 segment 的上限位置,这是由于 _upper_bound_ordinal 函数:尝试将该 key 根据表的 short_keys 编码成 index_key,再在该 segment 中利用二分搜索定位到 key 在 segment 文件中的上限位置 end。
1
2
3
4
5
6
7
8
9
10
11rowid_t PhysicalSplitMorselQueue::_upper_bound_ordinal(Segment* segment, const vectorized::SeekTuple& key, bool lower,
rowid_t end) const {
std::string index_key =
key.short_key_encode(segment->num_short_keys(), lower ? KEY_MINIMAL_MARKER : KEY_MAXIMAL_MARKER);
// 定位到 key 在 segment 中的上限
auto end_iter = segment->upper_bound(index_key);
if (end_iter.valid()) {
end = end_iter.ordinal() * segment->num_rows_per_block();
}
return end;
}所以, _upper_bound_ordinal 函数也说明了step(1) 中将 OlapScanRange 转为 SeekTuple 类型的原因,因为 SeekTuple 包含每一行的 Schema,可以对查找的 key 基于 short_keys 进行编码。最后得到的
_segment_scan_range
即表征了 curr_segment 要读取的数据范围。第二部分完整的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37Status PhysicalSplitMorselQueue::_init_segment() {
//...above code
// Find the rowid range of each key range in this segment.
if (_tablet_seek_ranges.empty()) {
_segment_scan_range.add(vectorized::Range(0, segment->num_rows()));
} else {
RETURN_IF_ERROR(segment->load_index());
for (const auto& range : _tablet_seek_ranges) {
rowid_t lower_rowid = 0;
rowid_t upper_rowid = segment->num_rows();
// 上限
if (!range.upper().empty()) {
upper_rowid = _upper_bound_ordinal(segment,
range.upper(),
!range.inclusive_upper(),
segment->num_rows());
}
// 下限
if (!range.lower().empty() && upper_rowid > 0) {
lower_rowid = _lower_bound_ordinal(segment,
range.lower(),
range.inclusive_lower());
}
// 在 segment 中, 一个可读取的 range 就产生了
if (lower_rowid <= upper_rowid) {
_segment_scan_range.add(vectorized::Range{lower_rowid, upper_rowid});
}
}
}
_segment_range_iter = _segment_scan_range.new_iterator();
_num_segment_rest_rows = _segment_scan_range.span_size();
return Status::OK();
}
MorselQueueFactory
MorselQueue 创建完后,就要准备创建 MorselQueueFactory。因为后续由 MorselQueueFactory 将 MorselQueue 分配给 PipelineDriver。
顾名思义,SharedMorselQueueFactory 即所有的 PipelineDrivers 共享一个 MorselQueue,此时就要求该 MorselQueue 具有较好的并发性能,比如FixedMorselQueue,而 IndividualMorselQueueFactory 则为每个 PipelineDriver 都分配一个 MorselQueue。
通过 Factory 创建 MorselQueue 的函数如下:
1 | MorselQueue* SharedMorselQueueFactory::create(int driver_sequence) override { |
convert_scan_range_to_morsel_queue_factory
在创建 Factory 前,需要先由 convert_scan_range_to_morsel_queue 函数获得 MorselQueue,再根据 frontend 优化器是否设置了 scan_ranges_per_driver_seq 来做一些决策。
如果 scan_ranges_per_driver_seq 为空,则为全局要读取的数据范围 global_scan_ranges 创建对应的 MorselQueue。
这种情况是会发生 LocalShuffle。
MorselQueue::max_degree_of_parallelism() 表征同时支持的读取并行度,比如 FixedMorseQueue 的 max_degree_of_parallelism 即 _morsel_size,最多只允许 _morsel_size 一起读取。而 SplitMorselQueue 的 max_degree_of_parallelism 则是在 _could_tablet_internal_parallel 函数中计算出来的 scan_dop。
到底是什么规模想要用 FixedMorselQueue
scan_ranges_per_driver_seq 不为空,是因为 BE 不需要再次进行 LocalShuffle,即每个 PipelineDriver 在 FE 的 Coordinator 中已经分配好了自己要读取的 scan_range(对应某个 tablet),则直接使用 convert_scan_range_to_morsel_queue 函数每个 PipelineDriver 生成 MorselQueue,并添加到 pipeline::IndividualMorselQueueFactory 对象中暂存。
这种情况,一般是 HashAggreagte、ColocationJOIN、BucketShuffleJOIN 时才有可能发生。
这部分代码如下。
1 | StatusOr<pipeline::MorselQueueFactoryPtr> ScanNode::convert_scan_range_to_morsel_queue_factory( |
下一部分从 SourceOperator 角度看怎么获取 Morsels 去存储层获取数据。