risingwave_stream/from_proto/
locality_provider.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::LocalityProviderNode;
18use risingwave_storage::StateStore;
19
20use super::*;
21use crate::common::table::state_table::StateTableBuilder;
22use crate::executor::Executor;
23use crate::executor::locality_provider::LocalityProviderExecutor;
24
25impl_stream_node_body!(LocalityProvider(LocalityProviderNode) => LocalityProviderBuilder);
26
27impl ExecutorBuilder for LocalityProviderBuilder {
28 type Node = LocalityProviderNode;
29
30 async fn new_boxed_executor(
31 params: ExecutorParams,
32 node: &Self::Node,
33 store: impl StateStore,
34 ) -> StreamResult<Executor> {
35 let [input]: [_; 1] = params.input.try_into().unwrap();
36
37 let locality_columns = node
38 .locality_columns
39 .iter()
40 .map(|&i| i as usize)
41 .collect::<Vec<_>>();
42
43 let input_schema = input.schema().clone();
44
45 let vnodes = Some(Arc::new(
46 params
47 .vnode_bitmap
48 .expect("vnodes not set for locality provider"),
49 ));
50
51 let state_table = StateTableBuilder::new(
53 node.get_state_table().unwrap(),
54 store.clone(),
55 vnodes.clone(),
56 )
57 .enable_preload_all_rows_by_config(¶ms.config)
58 .build()
59 .await;
60
61 let progress_table =
63 StateTableBuilder::new(node.get_progress_table().unwrap(), store, vnodes)
64 .enable_preload_all_rows_by_config(¶ms.config)
65 .build()
66 .await;
67
68 let progress = params
69 .local_barrier_manager
70 .register_create_mview_progress(¶ms.actor_context);
71
72 let exec = LocalityProviderExecutor::new(
73 input,
74 locality_columns,
75 state_table,
76 progress_table,
77 input_schema,
78 progress,
79 params.executor_stats.clone(),
80 params.config.developer.chunk_size,
81 params.actor_context.fragment_id,
82 );
83
84 Ok((params.info, exec).into())
85 }
86}
87
/// Unit type that implements `ExecutorBuilder` for `LocalityProviderNode`.
///
/// It carries no data; everything needed to construct the executor is passed
/// to `new_boxed_executor`.
pub struct LocalityProviderBuilder;