risingwave_stream/from_proto/
agg_common.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::util::sort_util::ColumnOrder;
20
21use super::*;
22use crate::common::StateTableColumnMapping;
23use crate::common::table::state_table::StateTable;
24use crate::executor::aggregate::AggStateStorage;
25
26/// Parse from stream proto plan agg call states, generate state tables and column mappings.
27/// The `vnodes` is generally `Some` for Hash Agg and `None` for Simple Agg.
28pub async fn build_agg_state_storages_from_proto<S: StateStore>(
29    agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
30    store: S,
31    vnodes: Option<Arc<Bitmap>>,
32) -> Vec<AggStateStorage<S>> {
33    use risingwave_pb::stream_plan::agg_call_state;
34
35    let mut result = vec![];
36    for agg_call_state in agg_call_states {
37        let agg_state_store = match agg_call_state.get_inner().unwrap() {
38            agg_call_state::Inner::ValueState(..) => AggStateStorage::Value,
39            agg_call_state::Inner::MaterializedInputState(state) => {
40                let table = StateTable::from_table_catalog(
41                    state.get_table().unwrap(),
42                    store.clone(),
43                    vnodes.clone(),
44                )
45                .await;
46                let mapping = StateTableColumnMapping::new(
47                    state
48                        .get_included_upstream_indices()
49                        .iter()
50                        .map(|idx| *idx as usize)
51                        .collect(),
52                    Some(
53                        state
54                            .get_table_value_indices()
55                            .iter()
56                            .map(|idx| *idx as usize)
57                            .collect(),
58                    ),
59                );
60                let order_columns = state
61                    .order_columns
62                    .iter()
63                    .map(ColumnOrder::from_protobuf)
64                    .collect();
65                AggStateStorage::MaterializedInput {
66                    table,
67                    mapping,
68                    order_columns,
69                }
70            }
71        };
72
73        result.push(agg_state_store)
74    }
75
76    result
77}
78
79pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
80    dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
81    store: S,
82    vnodes: Option<Arc<Bitmap>>,
83) -> HashMap<usize, StateTable<S>> {
84    if dedup_tables.is_empty() {
85        return HashMap::new();
86    }
87    futures::future::join_all(dedup_tables.iter().map(|(distinct_col, table_pb)| async {
88        let table = StateTable::from_table_catalog(table_pb, store.clone(), vnodes.clone()).await;
89        (*distinct_col as usize, table)
90    }))
91    .await
92    .into_iter()
93    .collect()
94}