// risingwave_stream/from_proto/agg_common.rs
use std::collections::HashMap;
use std::sync::Arc;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::util::sort_util::ColumnOrder;
use super::*;
use crate::common::table::state_table::StateTable;
use crate::common::StateTableColumnMapping;
use crate::executor::aggregation::AggStateStorage;
/// Builds the aggregation state storages described by `agg_call_states`.
///
/// The returned vector holds one entry per element of `agg_call_states`, in the same order:
///
/// - a `ValueState` entry becomes [`AggStateStorage::Value`];
/// - a `MaterializedInputState` entry becomes [`AggStateStorage::MaterializedInput`], made of a
///   [`StateTable`] created from the state's table catalog (with clones of `store` and
///   `vnodes`), a [`StateTableColumnMapping`] built from the included upstream indices and the
///   table value indices, and the order columns converted from their protobuf form.
///
/// Call states are handled one at a time: each state table creation is awaited before the next
/// call state is looked at.
///
/// # Panics
///
/// Panics if `get_inner()` on a call state or `get_table()` on a materialized-input state does
/// not yield a value, since both results are unwrapped.
pub async fn build_agg_state_storages_from_proto<S: StateStore>(
    agg_call_states: &[risingwave_pb::stream_plan::AggCallState],
    store: S,
    vnodes: Option<Arc<Bitmap>>,
) -> Vec<AggStateStorage<S>> {
    use risingwave_pb::stream_plan::agg_call_state::Inner;

    // Exactly one storage is produced per call state, so the final length is known up front.
    let mut storages = Vec::with_capacity(agg_call_states.len());

    for call_state in agg_call_states {
        let storage = match call_state.get_inner().unwrap() {
            Inner::ValueState(..) => AggStateStorage::Value,
            Inner::MaterializedInputState(materialized) => {
                let catalog = materialized.get_table().unwrap();
                let table =
                    StateTable::from_table_catalog(catalog, store.clone(), vnodes.clone()).await;

                AggStateStorage::MaterializedInput {
                    table,
                    mapping: StateTableColumnMapping::new(
                        materialized
                            .get_included_upstream_indices()
                            .iter()
                            .map(|&idx| idx as usize)
                            .collect(),
                        Some(
                            materialized
                                .get_table_value_indices()
                                .iter()
                                .map(|&idx| idx as usize)
                                .collect(),
                        ),
                    ),
                    order_columns: materialized
                        .order_columns
                        .iter()
                        .map(ColumnOrder::from_protobuf)
                        .collect(),
                }
            }
        };
        storages.push(storage);
    }

    storages
}
/// Builds the dedup state tables for distinct aggregations.
///
/// For every `(distinct_col, table_pb)` entry of `dedup_tables`, a [`StateTable`] is created from
/// `table_pb` using clones of `store` and `vnodes`. The result maps each `distinct_col`, cast to
/// `usize`, to its state table.
///
/// All table creations are driven together through `join_all` rather than one after another.
/// An empty `dedup_tables` short-circuits to an empty map.
pub async fn build_distinct_dedup_table_from_proto<S: StateStore>(
    dedup_tables: &HashMap<u32, risingwave_pb::catalog::Table>,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
) -> HashMap<usize, StateTable<S>> {
    if dedup_tables.is_empty() {
        return HashMap::new();
    }

    let table_futures = dedup_tables.iter().map(|(distinct_col, table_pb)| {
        // Each future owns its own handles, so it does not borrow from this function's locals.
        let store = store.clone();
        let vnodes = vnodes.clone();
        async move {
            let dedup_table = StateTable::from_table_catalog(table_pb, store, vnodes).await;
            (*distinct_col as usize, dedup_table)
        }
    });

    let built_tables = futures::future::join_all(table_futures).await;
    built_tables.into_iter().collect()
}