risingwave_stream/from_proto/
agg_common.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::config::StreamingConfig;
20use risingwave_common::util::sort_util::ColumnOrder;
21
22use super::*;
23use crate::common::StateTableColumnMapping;
24use crate::common::table::state_table::{StateTable, StateTableBuilder};
25use crate::executor::aggregate::AggStateStorage;
26
27/// Parse from stream proto plan agg call states, generate state tables and column mappings.
28/// The `vnodes` is generally `Some` for Hash Agg and `None` for Simple Agg.
29pub async fn build_agg_state_storages_from_proto<S: StateStore>(
30    agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
31    store: S,
32    vnodes: Option<Arc<Bitmap>>,
33    streaming_config: &StreamingConfig,
34) -> Vec<AggStateStorage<S>> {
35    use risingwave_pb::stream_plan::agg_call_state;
36
37    let mut result = vec![];
38    for agg_call_state in agg_call_states {
39        let agg_state_store = match agg_call_state.get_inner().unwrap() {
40            agg_call_state::Inner::ValueState(..) => AggStateStorage::Value,
41            agg_call_state::Inner::MaterializedInputState(state) => {
42                let table = StateTableBuilder::new(
43                    state.get_table().unwrap(),
44                    store.clone(),
45                    vnodes.clone(),
46                )
47                .enable_preload_all_rows_by_config(streaming_config)
48                .build()
49                .await;
50                let mapping = StateTableColumnMapping::new(
51                    state
52                        .get_included_upstream_indices()
53                        .iter()
54                        .map(|idx| *idx as usize)
55                        .collect(),
56                    Some(
57                        state
58                            .get_table_value_indices()
59                            .iter()
60                            .map(|idx| *idx as usize)
61                            .collect(),
62                    ),
63                );
64                let order_columns = state
65                    .order_columns
66                    .iter()
67                    .map(ColumnOrder::from_protobuf)
68                    .collect();
69                AggStateStorage::MaterializedInput {
70                    table,
71                    mapping,
72                    order_columns,
73                }
74            }
75        };
76
77        result.push(agg_state_store)
78    }
79
80    result
81}
82
83pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
84    dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
85    store: S,
86    vnodes: Option<Arc<Bitmap>>,
87    streaming_config: &StreamingConfig,
88) -> HashMap<usize, StateTable<S>> {
89    if dedup_tables.is_empty() {
90        return HashMap::new();
91    }
92    futures::future::join_all(dedup_tables.iter().map(|(distinct_col, table_pb)| async {
93        let table = StateTableBuilder::new(table_pb, store.clone(), vnodes.clone())
94            .enable_preload_all_rows_by_config(streaming_config)
95            .build()
96            .await;
97        (*distinct_col as usize, table)
98    }))
99    .await
100    .into_iter()
101    .collect()
102}