risingwave_stream/from_proto/
agg_common.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::bitmap::Bitmap;
use risingwave_common::util::sort_util::ColumnOrder;

use super::*;
use crate::common::table::state_table::StateTable;
use crate::common::StateTableColumnMapping;
use crate::executor::aggregation::AggStateStorage;

/// Parse from stream proto plan agg call states, generate state tables and column mappings.
/// The `vnodes` is generally `Some` for Hash Agg and `None` for Simple Agg.
pub async fn build_agg_state_storages_from_proto<S: StateStore>(
    agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
    store: S,
    vnodes: Option<Arc<Bitmap>>,
) -> Vec<AggStateStorage<S>> {
    use risingwave_pb::stream_plan::agg_call_state;

    let mut result = vec![];
    for agg_call_state in agg_call_states {
        let agg_state_store = match agg_call_state.get_inner().unwrap() {
            agg_call_state::Inner::ValueState(..) => AggStateStorage::Value,
            agg_call_state::Inner::MaterializedInputState(state) => {
                let table = StateTable::from_table_catalog(
                    state.get_table().unwrap(),
                    store.clone(),
                    vnodes.clone(),
                )
                .await;
                let mapping = StateTableColumnMapping::new(
                    state
                        .get_included_upstream_indices()
                        .iter()
                        .map(|idx| *idx as usize)
                        .collect(),
                    Some(
                        state
                            .get_table_value_indices()
                            .iter()
                            .map(|idx| *idx as usize)
                            .collect(),
                    ),
                );
                let order_columns = state
                    .order_columns
                    .iter()
                    .map(ColumnOrder::from_protobuf)
                    .collect();
                AggStateStorage::MaterializedInput {
                    table,
                    mapping,
                    order_columns,
                }
            }
        };

        result.push(agg_state_store)
    }

    result
}

pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
    dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
) -> HashMap<usize, StateTable<S>> {
    if dedup_tables.is_empty() {
        return HashMap::new();
    }
    futures::future::join_all(dedup_tables.iter().map(|(distinct_col, table_pb)| async {
        let table = StateTable::from_table_catalog(table_pb, store.clone(), vnodes.clone()).await;
        (*distinct_col as usize, table)
    }))
    .await
    .into_iter()
    .collect()
}