Pipeline: Morsel 和 OlapScanOperator (1)

MorselQueue 用于存放本次待读取的数据源 Morsels 集合,继承派生体系如下:

Morsel-Queue

convert_scan_range_to_morsel_queue

convert_scan_range_to_morsel_queue 函数将本次带查询的数据源 scan_ranges 转换成 std::vector<pipeline::Morsel> 对象 morsels,并根据是否能开启 Tablet 选择具体的 MorselQueue 子类。

因此,区别在于 morsels 在 MorselQueue 内部怎么划分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
StatusOr<pipeline::MorselQueuePtr> OlapScanNode::convert_scan_range_to_morsel_queue(
const std::vector<TScanRangeParams>& scan_ranges,
int node_id, int32_t pipeline_dop,
bool enable_tablet_internal_parallel,
TTabletInternalParallelMode::type tablet_internal_parallel_mode,
size_t num_total_scan_ranges) {
pipeline::Morsels morsels;
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(node_id, scan_range));
}

// None tablet to read shouldn't use tablet internal parallel.
if (morsels.empty()) {
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
}

// Disable by the session variable shouldn't use tablet internal parallel.
if (!enable_tablet_internal_parallel) {
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
}

int64_t scan_dop;
int64_t splitted_scan_rows;
ASSIGN_OR_RETURN(auto could, _could_tablet_internal_parallel(
scan_ranges, pipeline_dop,
num_total_scan_ranges,
tablet_internal_parallel_mode,
&scan_dop, &splitted_scan_rows));
if (!could) {
return std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
}

// Split tablet physically.
ASSIGN_OR_RETURN(bool ok, _could_split_tablet_physically(scan_ranges));
if (ok) {
return std::make_unique<pipeline::PhysicalSplitMorselQueue>(
std::move(morsels), scan_dop, splitted_scan_rows);
}

return std::make_unique<pipeline::LogicalSplitMorselQueue>(
std::move(morsels), scan_dop, splitted_scan_rows);
}

_could_tablet_internal_parallel

变量 enable_tablet_internal_parallel 为 true 时,只是建议开启 tablet 并行,能不能真开启还要使用 _could_tablet_internal_parallel 函数根据实际多个因素判断。

  • 主键模型开启了持久化索引 use_pk_index 为 false

  • tablet_internal_parallel_mode 默认值是 TTabletInternalParallelMode::AUTO,如果不是主动设置为 FORCE_SPLIT,并且实际的读取的 num_total_scan_ranges >= 本次 Pipeline 的并行度 pipeline_dop, 也返回 false

    如果 num_total_scan_ranges >= pipeline_dop,则说明当前有非常多的 tablet 待读取,不应该再把划分,否则会导致很多竞争。

  • 统计本次需要读取的行数 num_table_rows

    config::tablet_internal_parallel_max_splitted_scan_bytes,默认是 512M,是单次从一个 segment 文件中读取的最大字节数,_estimated_scan_row_bytes 本次要读取的所有字段类型的大小(即一行数据的大小),二者相除的结果 splitted_scan_rows 即单次从 segment 文件中读取的最大行数。

    再将 splitted_scan_rows 限制在 [min_splitted_scan_rows, max_splitted_scan_rows] 区间,

    先估计本次的 scan_dop = num_table_rows / splitted_scan_rows,再将 scan_dop 限制到 [1, pipeline_dop] 区间。

    只要 scan_dop > pipeline_dop 或者 config::tablet_internal_parallel_min_scan_dop 本函数就返回 true。

因此,如果函数返回 false 上层就直接启用 FixedMorselQueue 来粗存储本次数据源 Morsels,返回 true 则会使用 SplitMorseQueue。

scan_dop, splitted_scan_rows 两个参数输出后再作为 SplitMorselQueue 的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
StatusOr<bool> OlapScanNode::_could_tablet_internal_parallel(
const std::vector<TScanRangeParams>& scan_ranges,
int32_t pipeline_dop, size_t num_total_scan_ranges,
TTabletInternalParallelMode::type tablet_internal_parallel_mode,
int64_t* scan_dop, int64_t* splitted_scan_rows) const {
if (_olap_scan_node.use_pk_index) {
return false;
}
bool force_split = tablet_internal_parallel_mode == TTabletInternalParallelMode::type::FORCE_SPLIT;
// The enough number of tablets shouldn't use tablet internal parallel.
if (!force_split && num_total_scan_ranges >= pipeline_dop) {
return false;
}

int64_t num_table_rows = 0;
for (const auto& tablet_scan_range : scan_ranges) {
ASSIGN_OR_RETURN(TabletSharedPtr tablet,
get_tablet(&(tablet_scan_range.scan_range.internal_scan_range)));
num_table_rows += static_cast<int64_t>(tablet->num_rows());
}

// splitted_scan_rows is restricted in the range [min_splitted_scan_rows, max_splitted_scan_rows].
*splitted_scan_rows = config::tablet_internal_parallel_max_splitted_scan_bytes / _estimated_scan_row_bytes;
*splitted_scan_rows =
std::max(config::tablet_internal_parallel_min_splitted_scan_rows,
std::min(*splitted_scan_rows, config::tablet_internal_parallel_max_splitted_scan_rows));
// scan_dop is restricted in the range [1, dop].
*scan_dop = num_table_rows / *splitted_scan_rows;
*scan_dop = std::max<int64_t>(1, std::min<int64_t>(*scan_dop, pipeline_dop));

if (force_split) {
return true;
}

bool could = *scan_dop >= pipeline_dop || *scan_dop >= config::tablet_internal_parallel_min_scan_dop;
return could;
}

FixedMorselQueue

顾名思义,FixedMorselQueue 存储是固定数据量的 Morsels,FixedMorselQueue::_morsels 都是 ScanMorsel 对象(不是 ScanMorsel 的子类),即是以 tablet 为级别,后续 TabletReader 要从存储层查询该 _morsels[i] 指向的数据时,是需要 FullScan。

当从 OlapScanOperator 从存储层获得数据后,才会调用 FixedMorselQueue::set_tablet_rowsets 函数将数据赋值给 _tablet_rowsets,进而 ScanOperator::pull 就可以从 FixedMorselQueue::try_get 函数从 _tablet_rowsets 中获得数据,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class FixedMorselQueue final : public MorselQueue {
public:
explicit FixedMorselQueue(Morsels&& morsels)
: _morsels(std::move(morsels)), _num_morsels(_morsels.size()), _pop_index(0) {}
~FixedMorselQueue() override = default;

std::vector<TInternalScanRange*> olap_scan_ranges() const override;

void set_tablet_rowsets(const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets) override {
_tablet_rowsets = tablet_rowsets;
}

size_t num_original_morsels() const override {
return _num_morsels;
}
size_t max_degree_of_parallelism() const override {
return _num_morsels;
}
bool empty() const override {
return _unget_morsel == nullptr && _pop_index >= _num_morsels;
}
StatusOr<MorselPtr> try_get() override;

std::string name() const override {
return "fixed_morsel_queue";
}

private:
Morsels _morsels;
const size_t _num_morsels;
std::atomic<size_t> _pop_index;
std::vector<std::vector<RowsetSharedPtr>> _tablet_rowsets;
};

FixedMorselQueue::try_get

FixedMorselQueue 是有可能在多个 Pipeline 线程之间共享,即共享数据源,多个 Pipelines 同时从 FixedMorselQueue 获取数据源信息。而 FixedMorselQueue::_morsels 大小在构造函数已经确定,且 FixedMorselQueue 内部不会再细分每个 morsel(和 SplitMorselQueue 的对比),因此可以将 FixedMorselQueue 设计成 LookFree 的数据结构,因此使用 _pop_index 来指向当前已经分配的 morsel 就能满足要求。

在 FixedMorselQueue::try_get 中,通 std::move 将 _morsels[idx]、_tablet_rowsets[idx] 中的数据转移给上层,这样不会破坏 _morsels、_tablet_rowsets 结构,在多线程只读场景下就不用 mutex 保护。当 _pop_index == _num_morsels, _tablet_rowsets 和 _morsels 就是大小为 _num_morsels 的空壳。如此,也就实现了 LookFree。

源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
StatusOr<MorselPtr> FixedMorselQueue::try_get() {
// 和 QueryCache 有关
if (_unget_morsel != nullptr) {
return std::move(_unget_morsel);
}

auto idx = _pop_index.load();
// prevent _num_morsels from superfluous addition
if (idx >= _num_morsels) {
return nullptr;
}
idx = _pop_index.fetch_add(1);
if (idx < _num_morsels) {
if (!_tablet_rowsets.empty()) {
_morsels[idx]->set_rowsets(std::move(_tablet_rowsets[idx]));
}
return std::move(_morsels[idx]);
} else {
return nullptr;
}
}

SplitMorselQueue

scan_dop 传递给 SplitMorselQueue::_degree_of_parallelism,后续用于生成 MorselQueueFactory。

1
2
3
4
5
6
7
SplitMorselQueue::SplitMorselQueue(Morsels&& morsels,
int64_t degree_of_parallelism,
int64_t splitted_scan_rows)
: _morsels(std::move(morsels)),
_num_original_morsels(_morsels.size()),
_degree_of_parallelism(degree_of_parallelism),
_splitted_scan_rows(splitted_scan_rows) {}

SplitMorselQueue 有两个子类,根据 _could_split_tablet_physically 函数来选择。只有在表类型是聚合模型(即 AGG_KEYS 或者 UNIQUE_KEYS)且数据尚未聚合完成(skip_aggr 为 false)时,该函数才会返回 false。

返回 true 时,选择 PhysicalSplitMorselQueue,否则选择 LogicalSplitMorselQueue 对象。

1
2
3
4
5
6
7
8
StatusOr<bool> OlapScanNode::_could_split_tablet_physically(const std::vector<TScanRangeParams>& scan_ranges) const {
// Keys type needn't merge or aggregate.
ASSIGN_OR_RETURN(TabletSharedPtr first_tablet, get_tablet(&(scan_ranges[0].scan_range.internal_scan_range)));
KeysType keys_type = first_tablet->tablet_schema().keys_type();
const auto skip_aggr = thrift_olap_scan_node().is_preaggregation;
return keys_type == PRIMARY_KEYS || keys_type == DUP_KEYS ||
((keys_type == UNIQUE_KEYS || keys_type == AGG_KEYS) && skip_aggr);
}

PhysicalSplitMorselQueue::try_get

StarRocks 中数据持久化后文件的基本单位是 Segment,一个 Tablet 数据组织结构是 Tablet/Rowset/Segment,每次从 Segment 中读取的行数是 _splitted_scan_rows,因此可以将 PhysicalSplitMorselQueue::try_get 分为两个部分:

  1. 检测 _cur_segment 是否为空,以及 _cur_segment 是还有剩余可读数据

    如果没有,则需要从磁盘中能读取下一个 Segment 文件,将其加载到内存中,生成 SegemntIterator 对象 _segment_range_iter,再进行下一步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    StatusOr<MorselPtr> PhysicalSplitMorselQueue::try_get() {
    std::lock_guard<std::mutex> lock(_mutex);
    if (_tablet_idx >= _tablets.size()) {
    return nullptr;
    }

    // When it hasn't initialized any segment,
    // or _segment_idx exceeds the segments of the current rowset,
    // or current segment is empty or finished,
    // we should pick up the next segment and init it.
    while (!_has_init_any_segment
    || _cur_segment() == nullptr
    || _cur_segment()->num_rows() == 0 ||
    !_segment_range_iter.has_more()) {
    if (!_next_segment()) {
    return nullptr;
    }

    if (auto status = _init_segment(); !status.ok()) {
    // Morsel_queue cannot generate morsels after errors occurring.
    _tablet_idx = _tablets.size();
    return status;
    }
    }
    //...
    }
  2. 当仍有待读取的数据,则通过 _segment_range_iter 获取下一个批次数据(即 _splitted_scan_rows 行),如果剩余的数据不足 _splitted_scan_rows 行,则将剩余的数据合并到当前批次,(合并)生成一个 PhysicalSplitScanMorsel 对象,其中本次读取的元信息 {rowset, segment_id, token_range} 精确指示了本次要读取的数据位置,最终在读取数据时,用于初始化 OlapChunkSource::_params 参数的 rowid_range_option 字段。

    这里的 _tablet_rowsets[_tablet_idx] 就不能用 std::move 赋值给 Morsel 了,因为其他线程也要使用。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
     StatusOr<MorselPtr> PhysicalSplitMorselQueue::try_get() {
    std::lock_guard<std::mutex> lock(_mutex);
    //... above code

    vectorized::SparseRange taken_range;
    _segment_range_iter.next_range(_splitted_scan_rows, &taken_range);
    _num_segment_rest_rows -= taken_range.span_size();
    if (_num_segment_rest_rows < _splitted_scan_rows) {
    // If there are too few rows left in the segment, take them all this time.
    _segment_range_iter.next_range(_splitted_scan_rows, &taken_range);
    _num_segment_rest_rows = 0;
    }

    auto* scan_morsel = _cur_scan_morsel();
    auto* rowset = _cur_rowset();
    auto rowid_range = std::make_shared<vectorized::RowidRangeOption>(
    rowset->rowset_id(),
    rowset->segments()[_segment_idx]->id(),
    std::move(taken_range));

    MorselPtr morsel = std::make_unique<PhysicalSplitScanMorsel>(
    scan_morsel->get_plan_node_id(),
    *(scan_morsel->get_scan_range()),
    std::move(rowid_range));
    morsel->set_rowsets(_tablet_rowsets[_tablet_idx]);
    _inc_num_splits(_is_last_split_of_current_morsel());
    return morsel;
    }

_next_segment

寻找下个 segment 是比较简单的,依次按照 segment_id/rowset_id/tablet_id 递增的顺序。比如,如果当前 rowset 的 segments 读取完,则 ++rowset_id,切换到当前 tablet 的下一个 rowset,如果当前 tablet 的所有 rowsets 都读取完毕,则 ++tablet_id,切换到下一个 tablet。

当 tablet_id >= _tablets.size(),_next_segment 函数返回 false,表示所有数据都已经读取完毕。

这部分代码比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Rowset* PhysicalSplitMorselQueue::_cur_rowset() {
return _tablet_rowsets[_tablet_idx][_rowset_idx].get();
}

Segment* PhysicalSplitMorselQueue::_cur_segment() {
const auto& segments = _cur_rowset()->segments();
return _segment_idx >= segments.size() ? nullptr : segments[_segment_idx].get();
}

bool PhysicalSplitMorselQueue::_next_segment() {
DCHECK(_num_segment_rest_rows == 0);
if (!_has_init_any_segment) {
_has_init_any_segment = true;
} else {
// Read the next segment of the current rowset.
if (++_segment_idx >= _cur_rowset()->segments().size()) {
_segment_idx = 0;
// Read the next rowset of the current tablet.
if (++_rowset_idx >= _tablet_rowsets[_tablet_idx].size()) {
_rowset_idx = 0;
// Read the next tablet.
++_tablet_idx;
}
}
}

return _tablet_idx < _tablets.size();
}

_inti_segment

初始化 segment 要复杂点。但是也可以大致分为两个步骤:

  1. 加载元数据

    针对每个 Tablet 都有一个查询范围 **{_range_start_key, _Range_end_key}**,两边是开闭还是闭区间由 {_range_start_op, _range_end_op} 表示。比如,如果 _range_xxx_op 中包含等于操作即闭区间,否则是开区间,这两个符号定义如下:

    1
    2
    enum class RangeStartOperation { GT = 0, GE, EQ };
    enum class RangeEndOperation { LT = 0, LE, EQ };

    第一步主要是将 OlapScanRange 类型的 {_range_start_key, _range_end_key} 转化为 SeekRange 类型的 _tablet_seek_ranges,这样方便后续遍历。

    注意:这里的 _range_start_key_range_end_key 都是 std::vector 类型,可能包含多个查询范围,比如 1 < x <= 2, 10 < y < 20,而起始条件都在 _range_start_key 中,终止条件都在 _Range_end_key 中。

    顺带提下,在读过程中,数据的内存分配方式基本都是基于内存池 MemPool。这个 MemPool 只是缓存作用,实际内部分配内存的操作是由 ChunkAllocator 完成,其分配内存特点是先在线程所在 core 上分配,如果线程所在 core 上分配的内存超过了限制(默认2G)则会 cross-core 分配,这样可以减少线程跨 core 通信。

    因为数据读取的单位是 Segment,当 segment == 0 即表示需要从一个新的 Rowset 中读取数据,因此需要先调用 Rowset::load 函数初始化 Rowset::_segments,来获悉所有 semgnet 信息。

    如果,当前 rowset 没有可读的数据,则 return。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    Status PhysicalSplitMorselQueue::_init_segment() {
    // Load the meta of the new rowset and the index of the new segment。
    if (0 == _segment_idx) {
    // Read a new tablet.
    if (0 == _rowset_idx) {
    _tablet_seek_ranges.clear();
    _mempool.clear();
    RETURN_IF_ERROR(vectorized::TabletReader::parse_seek_range(
    _tablets[_tablet_idx], _range_start_op, _range_end_op,
    _range_start_key, _range_end_key,
    &_tablet_seek_ranges, &_mempool));
    }
    // Read a new rowset.
    RETURN_IF_ERROR(_cur_rowset()->load());
    }


    _num_segment_rest_rows = 0;
    _segment_scan_range.clear();

    auto* segment = _cur_segment();
    // The new rowset doesn't contain any segment.
    if (segment == nullptr || segment->num_rows() == 0) {
    return Status::OK();
    }
    //...
  2. step(1) 过后,得到可迭代的 _tablet_seek_ranges,下面就需要将其转化为在 Segment 中的所有读取范围,并合并到 _segment_scan_range 中。

    而且 _segment_scan_range 中每一个 range 的起始位置 {lower_rowid, upper_rowid} 都待读取数据的行号,因此后续读取效率会非常高。

    比如找某个 key 在 segment 的上限位置,这是由于 _upper_bound_ordinal 函数:尝试将该 key 根据表的 short_keys 编码成 index_key,再在该 segment 中利用二分搜索定位到 key 在 segment 文件中的上限位置 end

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    rowid_t PhysicalSplitMorselQueue::_upper_bound_ordinal(Segment* segment, const vectorized::SeekTuple& key, bool lower,
    rowid_t end) const {
    std::string index_key =
    key.short_key_encode(segment->num_short_keys(), lower ? KEY_MINIMAL_MARKER : KEY_MAXIMAL_MARKER);
    // 定位到 key 在 segment 中的上限
    auto end_iter = segment->upper_bound(index_key);
    if (end_iter.valid()) {
    end = end_iter.ordinal() * segment->num_rows_per_block();
    }
    return end;
    }

    所以, _upper_bound_ordinal 函数也说明了step(1) 中将 OlapScanRange 转为 SeekTuple 类型的原因,因为 SeekTuple 包含每一行的 Schema,可以对查找的 key 基于 short_keys 进行编码。最后得到的 _segment_scan_range 即表征了 curr_segment 要读取的数据范围。

    第二部分完整的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
     Status PhysicalSplitMorselQueue::_init_segment() {
    //...above code

    // Find the rowid range of each key range in this segment.
    if (_tablet_seek_ranges.empty()) {
    _segment_scan_range.add(vectorized::Range(0, segment->num_rows()));
    } else {
    RETURN_IF_ERROR(segment->load_index());
    for (const auto& range : _tablet_seek_ranges) {
    rowid_t lower_rowid = 0;
    rowid_t upper_rowid = segment->num_rows();

    // 上限
    if (!range.upper().empty()) {
    upper_rowid = _upper_bound_ordinal(segment,
    range.upper(),
    !range.inclusive_upper(),
    segment->num_rows());
    }
    // 下限
    if (!range.lower().empty() && upper_rowid > 0) {
    lower_rowid = _lower_bound_ordinal(segment,
    range.lower(),
    range.inclusive_lower());
    }
    // 在 segment 中, 一个可读取的 range 就产生了
    if (lower_rowid <= upper_rowid) {
    _segment_scan_range.add(vectorized::Range{lower_rowid, upper_rowid});
    }
    }
    }

    _segment_range_iter = _segment_scan_range.new_iterator();
    _num_segment_rest_rows = _segment_scan_range.span_size();

    return Status::OK();
    }

MorselQueueFactory

MorselQueue 创建完后,就要准备创建 MorselQueueFactory。因为后续由 MorselQueueFactory 将 MorselQueue 分配给 PipelineDriver。

pipeline-morsel-2

顾名思义,SharedMorselQueueFactory 即所有的 PipelineDrivers 共享一个 MorselQueue,此时就要求该 MorselQueue 具有较好的并发性能,比如FixedMorselQueue,而 IndividualMorselQueueFactory 则为每个 PipelineDriver 都分配一个 MorselQueue。

通过 Factory 创建 MorselQueue 的函数如下:

1
2
3
4
5
6
7
8
9
MorselQueue* SharedMorselQueueFactory::create(int driver_sequence) override { 
return _queue.get();
}

// 每个 PipelineDrvier 都有一个 driver_sequence
MorselQueue* IndividualMorselQueueFactory::create(int driver_sequence) override {
DCHECK_LT(driver_sequence, _queue_per_driver_seq.size());
return _queue_per_driver_seq[driver_sequence].get();
}

convert_scan_range_to_morsel_queue_factory

在创建 Factory 前,需要先由 convert_scan_range_to_morsel_queue 函数获得 MorselQueue,再根据 frontend 优化器是否设置了 scan_ranges_per_driver_seq 来做一些决策。

  • 如果 scan_ranges_per_driver_seq 为空,则为全局要读取的数据范围 global_scan_ranges 创建对应的 MorselQueue。

    这种情况是会发生 LocalShuffle。

    MorselQueue::max_degree_of_parallelism() 表征同时支持的读取并行度,比如 FixedMorseQueue 的 max_degree_of_parallelism 即 _morsel_size,最多只允许 _morsel_size 一起读取。而 SplitMorselQueue 的 max_degree_of_parallelism 则是在 _could_tablet_internal_parallel 函数中计算出来的 scan_dop。

    到底是什么规模想要用 FixedMorselQueue

  • scan_ranges_per_driver_seq 不为空,是因为 BE 不需要再次进行 LocalShuffle,即每个 PipelineDriver 在 FE 的 Coordinator 中已经分配好了自己要读取的 scan_range(对应某个 tablet),则直接使用 convert_scan_range_to_morsel_queue 函数每个 PipelineDriver 生成 MorselQueue,并添加到 pipeline::IndividualMorselQueueFactory 对象中暂存。

    这种情况,一般是 HashAggreagte、ColocationJOIN、BucketShuffleJOIN 时才有可能发生。

这部分代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
StatusOr<pipeline::MorselQueueFactoryPtr> ScanNode::convert_scan_range_to_morsel_queue_factory(
const std::vector<TScanRangeParams>& global_scan_ranges,
const std::map<int32_t, std::vector<TScanRangeParams>>& scan_ranges_per_driver_seq,
int node_id, int pipeline_dop,
bool enable_tablet_internal_parallel,
TTabletInternalParallelMode::type tablet_internal_parallel_mode) {
if (scan_ranges_per_driver_seq.empty()) {
ASSIGN_OR_RETURN(auto morsel_queue,
convert_scan_range_to_morsel_queue(
global_scan_ranges, node_id, pipeline_dop,
enable_tablet_internal_parallel,
tablet_internal_parallel_mode,
global_scan_ranges.size()));
int scan_dop = std::min<int>(std::max<int>(1,
morsel_queue->max_degree_of_parallelism()),
pipeline_dop);
int io_parallelism = scan_dop * io_tasks_per_scan_operator();

// If not so much morsels, try to assign morsel uniformly among operators to avoid data skew
if (scan_dop > 1 && dynamic_cast<pipeline::FixedMorselQueue*>(morsel_queue.get()) &&
morsel_queue->num_original_morsels() <= io_parallelism) {
auto morsel_queue_map = uniform_distribute_morsels(std::move(morsel_queue), scan_dop);
return std::make_unique<pipeline::IndividualMorselQueueFactory>(
std::move(morsel_queue_map), /*could_local_shuffle*/ true);
} else {
return std::make_unique<pipeline::SharedMorselQueueFactory>(
std::move(morsel_queue), scan_dop);
}
} else {
size_t num_total_scan_ranges = 0;
for (const auto& [_, scan_ranges] : scan_ranges_per_driver_seq) {
num_total_scan_ranges += scan_ranges.size();
}

std::map<int, pipeline::MorselQueuePtr> queue_per_driver_seq;
for (const auto& [dop, scan_ranges] : scan_ranges_per_driver_seq) {
ASSIGN_OR_RETURN(auto queue,
convert_scan_range_to_morsel_queue(
scan_ranges, node_id, pipeline_dop,
enable_tablet_internal_parallel,
tablet_internal_parallel_mode,
num_total_scan_ranges));
queue_per_driver_seq.emplace(dop, std::move(queue));
}

return std::make_unique<pipeline::IndividualMorselQueueFactory>(
std::move(queue_per_driver_seq), /*could_local_shuffle*/ false);
}
}

下一部分从 SourceOperator 角度看怎么获取 Morsels 去存储层获取数据。