risingwave_stream/from_proto/
lookup.rs1use risingwave_common::catalog::{ColumnDesc, ColumnId};
16use risingwave_common::util::sort_util::ColumnOrder;
17use risingwave_pb::plan_common::StorageTableDesc;
18use risingwave_pb::stream_plan::LookupNode;
19use risingwave_storage::table::batch_table::BatchTable;
20
21use super::*;
22use crate::executor::{LookupExecutor, LookupExecutorParams};
23
24pub struct LookupExecutorBuilder;
25
26impl ExecutorBuilder for LookupExecutorBuilder {
27 type Node = LookupNode;
28
29 async fn new_boxed_executor(
30 params: ExecutorParams,
31 node: &Self::Node,
32 store: impl StateStore,
33 ) -> StreamResult<Executor> {
34 let lookup = node;
35
36 let [stream, arrangement]: [_; 2] = params.input.try_into().unwrap();
37
38 let arrangement_order_rules = lookup
39 .get_arrangement_table_info()?
40 .arrange_key_orders
41 .iter()
42 .map(ColumnOrder::from_protobuf)
43 .collect();
44
45 let arrangement_col_descs = lookup
46 .get_arrangement_table_info()?
47 .column_descs
48 .iter()
49 .map(ColumnDesc::from)
50 .collect();
51
52 let table_desc: &StorageTableDesc = lookup
53 .get_arrangement_table_info()?
54 .table_desc
55 .as_ref()
56 .unwrap();
57
58 let column_ids = lookup
59 .get_arrangement_table_info()?
60 .get_output_col_idx()
61 .iter()
62 .map(|&idx| ColumnId::new(table_desc.columns[idx as usize].column_id))
63 .collect_vec();
64
65 let storage_table = BatchTable::new_partial(
66 store,
67 column_ids,
68 params.vnode_bitmap.map(Into::into),
69 table_desc,
70 );
71
72 let exec = LookupExecutor::new(LookupExecutorParams {
73 ctx: params.actor_context,
74 info: params.info.clone(),
75 arrangement,
76 stream,
77 arrangement_col_descs,
78 arrangement_order_rules,
79 use_current_epoch: lookup.use_current_epoch,
80 stream_join_key_indices: lookup.stream_key.iter().map(|x| *x as usize).collect(),
81 arrange_join_key_indices: lookup.arrange_key.iter().map(|x| *x as usize).collect(),
82 column_mapping: lookup.column_mapping.iter().map(|x| *x as usize).collect(),
83 batch_table: storage_table,
84 watermark_epoch: params.watermark_epoch,
85 chunk_size: params.env.config().developer.chunk_size,
86 });
87 Ok((params.info, exec).into())
88 }
89}