risingwave_stream/from_proto/
lookup.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::{ColumnDesc, ColumnId};
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::LookupNode;
23
24use super::*;
25use crate::common::table::state_table::{StateTableBuilder, StateTableOpConsistencyLevel};
26use crate::executor::{LookupExecutor, LookupExecutorParams};
27
28pub struct LookupExecutorBuilder;
29
30impl_stream_node_body!(Lookup(LookupNode) => LookupExecutorBuilder);
31
32impl ExecutorBuilder for LookupExecutorBuilder {
33 type Node = LookupNode;
34
35 async fn new_boxed_executor(
36 params: ExecutorParams,
37 node: &Self::Node,
38 store: impl StateStore,
39 ) -> StreamResult<Executor> {
40 let lookup = node;
41
42 let [stream, arrangement]: [_; 2] = params.input.try_into().unwrap();
43
44 let arrangement_order_rules = lookup
45 .get_arrangement_table_info()?
46 .arrange_key_orders
47 .iter()
48 .map(ColumnOrder::from_protobuf)
49 .collect();
50
51 let arrangement_col_descs = lookup
52 .get_arrangement_table_info()?
53 .column_descs
54 .iter()
55 .map(ColumnDesc::from)
56 .collect();
57
58 let table_desc: &StorageTableDesc = lookup
59 .get_arrangement_table_info()?
60 .table_desc
61 .as_ref()
62 .unwrap();
63
64 let column_ids = lookup
65 .get_arrangement_table_info()?
66 .get_output_col_idx()
67 .iter()
68 .map(|&idx| ColumnId::new(table_desc.columns[idx as usize].column_id))
69 .collect_vec();
70
71 let versioned = table_desc.versioned;
72 let vnodes = params.vnode_bitmap.clone().map(Arc::new);
73
74 macro_rules! build_lookup {
75 ($SD:ident) => {{
76 let state_table =
77 StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
78 table_desc,
79 store.clone(),
80 vnodes.clone(),
81 params.fragment_id,
82 )
83 .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
84 .with_output_column_ids(column_ids.clone())
85 .forbid_preload_all_rows()
86 .build()
87 .await;
88
89 let exec = LookupExecutor::new(LookupExecutorParams {
90 ctx: params.actor_context,
91 info: params.info.clone(),
92 arrangement,
93 stream,
94 arrangement_col_descs,
95 arrangement_order_rules,
96 use_current_epoch: lookup.use_current_epoch,
97 stream_join_key_indices: lookup
98 .stream_key
99 .iter()
100 .map(|x| *x as usize)
101 .collect(),
102 arrange_join_key_indices: lookup
103 .arrange_key
104 .iter()
105 .map(|x| *x as usize)
106 .collect(),
107 column_mapping: lookup.column_mapping.iter().map(|x| *x as usize).collect(),
108 state_table,
109 watermark_epoch: params.watermark_epoch,
110 chunk_size: params.config.developer.chunk_size,
111 });
112 Ok((params.info, exec).into())
113 }};
114 }
115
116 if versioned {
117 build_lookup!(ColumnAwareSerde)
118 } else {
119 build_lookup!(BasicSerde)
120 }
121 }
122}