risingwave_stream/executor/
lookup.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use async_trait::async_trait;
16
17mod cache;
18mod sides;
19use self::cache::LookupCache;
20use self::sides::*;
21mod impl_;
22
23pub use impl_::LookupExecutorParams;
24
25use crate::executor::prelude::*;
26
27#[cfg(test)]
28mod tests;
29
30/// `LookupExecutor` takes one input stream and one arrangement. It joins the input stream with the
31/// arrangement. Currently, it only supports inner join. See `LookupExecutorParams` for more
32/// information.
33///
34/// The output schema is `| stream columns | arrangement columns |`.
35/// The input is required to be first stream and then arrangement.
36pub struct LookupExecutor<S: StateStore> {
37    ctx: ActorContextRef,
38
39    /// the data types of the produced data chunk inside lookup (before reordering)
40    chunk_data_types: Vec<DataType>,
41
42    /// The join side of the arrangement
43    arrangement: ArrangeJoinSide<S>,
44
45    /// The join side of the stream
46    stream: StreamJoinSide,
47
48    /// The executor for arrangement.
49    arrangement_executor: Option<Executor>,
50
51    /// The executor for stream.
52    stream_executor: Option<Executor>,
53
54    /// The last received barrier.
55    last_barrier: Option<Barrier>,
56
57    /// Information of column reordering
58    column_mapping: Vec<usize>,
59
60    /// When we receive a row from the stream side, we will first convert it to join key, and then
61    /// map it to arrange side. For example, if we receive `[a, b, c]` from the stream side, where:
62    /// `stream_join_key = [1, 2]`, `arrange_join_key = [3, 2]` with order rules [2 ascending, 3
63    /// ascending].
64    ///
65    /// * We will first extract join key `[b, c]`,
66    /// * then map it to the order of arrangement join key `[c, b]`.
67    ///
68    /// This vector records such mapping.
69    key_indices_mapping: Vec<usize>,
70
71    /// The cache for arrangement side.
72    lookup_cache: LookupCache,
73    /// The maximum size of the chunk produced by executor at a time.
74    chunk_size: usize,
75}
76
77#[async_trait]
78impl<S: StateStore> Execute for LookupExecutor<S> {
79    fn execute(self: Box<Self>) -> BoxedMessageStream {
80        self.execute_inner().boxed()
81    }
82}