Skip to main content

risingwave_stream/from_proto/
lookup.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::{ColumnDesc, ColumnId};
18use risingwave_common::util::sort_util::ColumnOrder;
19use risingwave_common::util::value_encoding::BasicSerde;
20use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
21use risingwave_pb::plan_common::StorageTableDesc;
22use risingwave_pb::stream_plan::LookupNode;
23
24use super::*;
25use crate::common::table::state_table::{StateTableBuilder, StateTableOpConsistencyLevel};
26use crate::executor::{LookupExecutor, LookupExecutorParams};
27
28pub struct LookupExecutorBuilder;
29
30impl_stream_node_body!(Lookup(LookupNode) => LookupExecutorBuilder);
31
32impl ExecutorBuilder for LookupExecutorBuilder {
33    type Node = LookupNode;
34
35    async fn new_boxed_executor(
36        params: ExecutorParams,
37        node: &Self::Node,
38        store: impl StateStore,
39    ) -> StreamResult<Executor> {
40        let lookup = node;
41
42        let [stream, arrangement]: [_; 2] = params.input.try_into().unwrap();
43
44        let arrangement_order_rules = lookup
45            .get_arrangement_table_info()?
46            .arrange_key_orders
47            .iter()
48            .map(ColumnOrder::from_protobuf)
49            .collect();
50
51        let arrangement_col_descs = lookup
52            .get_arrangement_table_info()?
53            .column_descs
54            .iter()
55            .map(ColumnDesc::from)
56            .collect();
57
58        let table_desc: &StorageTableDesc = lookup
59            .get_arrangement_table_info()?
60            .table_desc
61            .as_ref()
62            .unwrap();
63
64        let column_ids = lookup
65            .get_arrangement_table_info()?
66            .get_output_col_idx()
67            .iter()
68            .map(|&idx| ColumnId::new(table_desc.columns[idx as usize].column_id))
69            .collect_vec();
70
71        let versioned = table_desc.versioned;
72        let vnodes = params.vnode_bitmap.clone().map(Arc::new);
73
74        macro_rules! build_lookup {
75            ($SD:ident) => {{
76                let state_table =
77                    StateTableBuilder::<_, $SD, true, _>::new_from_storage_table_desc(
78                        table_desc,
79                        store.clone(),
80                        vnodes.clone(),
81                        params.fragment_id,
82                    )
83                    .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
84                    .with_output_column_ids(column_ids.clone())
85                    .forbid_preload_all_rows()
86                    .build()
87                    .await;
88
89                let exec = LookupExecutor::new(LookupExecutorParams {
90                    ctx: params.actor_context,
91                    info: params.info.clone(),
92                    arrangement,
93                    stream,
94                    arrangement_col_descs,
95                    arrangement_order_rules,
96                    use_current_epoch: lookup.use_current_epoch,
97                    stream_join_key_indices: lookup
98                        .stream_key
99                        .iter()
100                        .map(|x| *x as usize)
101                        .collect(),
102                    arrange_join_key_indices: lookup
103                        .arrange_key
104                        .iter()
105                        .map(|x| *x as usize)
106                        .collect(),
107                    column_mapping: lookup.column_mapping.iter().map(|x| *x as usize).collect(),
108                    state_table,
109                    watermark_epoch: params.watermark_epoch,
110                    chunk_size: params.config.developer.chunk_size,
111                });
112                Ok((params.info, exec).into())
113            }};
114        }
115
116        if versioned {
117            build_lookup!(ColumnAwareSerde)
118        } else {
119            build_lookup!(BasicSerde)
120        }
121    }
122}