risingwave_stream/from_proto/
locality_provider.rs1use std::sync::Arc;
16
17use risingwave_pb::stream_plan::LocalityProviderNode;
18use risingwave_storage::StateStore;
19
20use super::*;
21use crate::common::table::state_table::StateTableBuilder;
22use crate::executor::Executor;
23use crate::executor::locality_provider::LocalityProviderExecutor;
24
25impl ExecutorBuilder for LocalityProviderBuilder {
26 type Node = LocalityProviderNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 let [input]: [_; 1] = params.input.try_into().unwrap();
34
35 let locality_columns = node
36 .locality_columns
37 .iter()
38 .map(|&i| i as usize)
39 .collect::<Vec<_>>();
40
41 let input_schema = input.schema().clone();
42
43 let vnodes = Some(Arc::new(
44 params
45 .vnode_bitmap
46 .expect("vnodes not set for locality provider"),
47 ));
48
49 let state_table = StateTableBuilder::new(
51 node.get_state_table().unwrap(),
52 store.clone(),
53 vnodes.clone(),
54 )
55 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
56 .build()
57 .await;
58
59 let progress_table =
61 StateTableBuilder::new(node.get_progress_table().unwrap(), store, vnodes)
62 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
63 .build()
64 .await;
65
66 let progress = params
67 .local_barrier_manager
68 .register_create_mview_progress(params.actor_context.id);
69
70 let exec = LocalityProviderExecutor::new(
71 input,
72 locality_columns,
73 state_table,
74 progress_table,
75 input_schema,
76 progress,
77 params.executor_stats.clone(),
78 params.env.config().developer.chunk_size,
79 params.actor_context.fragment_id,
80 );
81
82 Ok((params.info, exec).into())
83 }
84}
85
/// Unit type that carries the [`ExecutorBuilder`] implementation used to
/// construct a `LocalityProviderExecutor` from a `LocalityProviderNode`.
pub struct LocalityProviderBuilder;