risingwave_stream/from_proto/
agg_common.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bitmap::Bitmap;
19use risingwave_common::util::sort_util::ColumnOrder;
20
21use super::*;
22use crate::common::StateTableColumnMapping;
23use crate::common::table::state_table::StateTable;
24use crate::executor::aggregate::AggStateStorage;
25
26pub async fn build_agg_state_storages_from_proto<S: StateStore>(
29 agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
30 store: S,
31 vnodes: Option<Arc<Bitmap>>,
32) -> Vec<AggStateStorage<S>> {
33 use risingwave_pb::stream_plan::agg_call_state;
34
35 let mut result = vec![];
36 for agg_call_state in agg_call_states {
37 let agg_state_store = match agg_call_state.get_inner().unwrap() {
38 agg_call_state::Inner::ValueState(..) => AggStateStorage::Value,
39 agg_call_state::Inner::MaterializedInputState(state) => {
40 let table = StateTable::from_table_catalog(
41 state.get_table().unwrap(),
42 store.clone(),
43 vnodes.clone(),
44 )
45 .await;
46 let mapping = StateTableColumnMapping::new(
47 state
48 .get_included_upstream_indices()
49 .iter()
50 .map(|idx| *idx as usize)
51 .collect(),
52 Some(
53 state
54 .get_table_value_indices()
55 .iter()
56 .map(|idx| *idx as usize)
57 .collect(),
58 ),
59 );
60 let order_columns = state
61 .order_columns
62 .iter()
63 .map(ColumnOrder::from_protobuf)
64 .collect();
65 AggStateStorage::MaterializedInput {
66 table,
67 mapping,
68 order_columns,
69 }
70 }
71 };
72
73 result.push(agg_state_store)
74 }
75
76 result
77}
78
79pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
80 dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
81 store: S,
82 vnodes: Option<Arc<Bitmap>>,
83) -> HashMap<usize, StateTable<S>> {
84 if dedup_tables.is_empty() {
85 return HashMap::new();
86 }
87 futures::future::join_all(dedup_tables.iter().map(|(distinct_col, table_pb)| async {
88 let table = StateTable::from_table_catalog(table_pb, store.clone(), vnodes.clone()).await;
89 (*distinct_col as usize, table)
90 }))
91 .await
92 .into_iter()
93 .collect()
94}