risingwave_stream/executor/lookup.rs
1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use risingwave_storage::row_serde::value_serde::ValueRowSerde;
17
18mod cache;
19mod sides;
20use self::cache::LookupCache;
21use self::sides::*;
22mod impl_;
23
24pub use impl_::LookupExecutorParams;
25
26use crate::executor::prelude::*;
27
28#[cfg(test)]
29mod tests;
30
31/// `LookupExecutor` takes one input stream and one arrangement. It joins the input stream with the
32/// arrangement. Currently, it only supports inner join. See `LookupExecutorParams` for more
33/// information.
34///
35/// The output schema is `| stream columns | arrangement columns |`.
36/// The input is required to be first stream and then arrangement.
37pub struct LookupExecutor<S: StateStore, SD: ValueRowSerde> {
38 ctx: ActorContextRef,
39
40 /// the data types of the produced data chunk inside lookup (before reordering)
41 chunk_data_types: Vec<DataType>,
42
43 /// The join side of the arrangement
44 arrangement: ArrangeJoinSide<S, SD>,
45
46 /// The join side of the stream
47 stream: StreamJoinSide,
48
49 /// The executor for arrangement.
50 arrangement_executor: Option<Executor>,
51
52 /// The executor for stream.
53 stream_executor: Option<Executor>,
54
55 /// Information of column reordering
56 column_mapping: Vec<usize>,
57
58 /// When we receive a row from the stream side, we will first convert it to join key, and then
59 /// map it to arrange side. For example, if we receive `[a, b, c]` from the stream side, where:
60 /// `stream_join_key = [1, 2]`, `arrange_join_key = [3, 2]` with order rules [2 ascending, 3
61 /// ascending].
62 ///
63 /// * We will first extract join key `[b, c]`,
64 /// * then map it to the order of arrangement join key `[c, b]`.
65 ///
66 /// This vector records such mapping.
67 key_indices_mapping: Vec<usize>,
68
69 /// The cache for arrangement side.
70 lookup_cache: LookupCache,
71 /// The maximum size of the chunk produced by executor at a time.
72 chunk_size: usize,
73}
74
75#[async_trait]
76impl<S: StateStore, SD: ValueRowSerde> Execute for LookupExecutor<S, SD> {
77 fn execute(self: Box<Self>) -> BoxedMessageStream {
78 self.execute_inner().boxed()
79 }
80}