risingwave_stream/executor/lookup.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16
17mod cache;
18mod sides;
19use self::cache::LookupCache;
20use self::sides::*;
21mod impl_;
22
23pub use impl_::LookupExecutorParams;
24
25use crate::executor::prelude::*;
26
27#[cfg(test)]
28mod tests;
29
/// `LookupExecutor` takes one input stream and one arrangement, and joins the input stream with
/// the arrangement. Currently, it only supports inner join. See [`LookupExecutorParams`] for more
/// information.
///
/// The output schema is `| stream columns | arrangement columns |`.
/// The inputs are required to be ordered as: the stream first, then the arrangement.
pub struct LookupExecutor<S: StateStore> {
    /// The actor context (`ActorContextRef`) this executor runs with.
    ctx: ActorContextRef,

    /// The data types of the data chunk produced inside the lookup, i.e. before the column
    /// reordering described by `column_mapping` is applied.
    chunk_data_types: Vec<DataType>,

    /// The join side of the arrangement.
    arrangement: ArrangeJoinSide<S>,

    /// The join side of the stream.
    stream: StreamJoinSide,

    /// The upstream executor for the arrangement side.
    /// NOTE(review): held in an `Option`, presumably so it can be taken out once execution
    /// starts — TODO confirm in `impl_`.
    arrangement_executor: Option<Executor>,

    /// The upstream executor for the stream side.
    /// NOTE(review): held in an `Option`, presumably for the same reason as
    /// `arrangement_executor` — TODO confirm in `impl_`.
    stream_executor: Option<Executor>,

    /// The last received barrier (`None` presumably means no barrier has been seen yet).
    last_barrier: Option<Barrier>,

    /// Column reordering information, used to turn the chunk described by `chunk_data_types`
    /// into the output schema.
    /// NOTE(review): the direction of the indices (output -> chunk or chunk -> output) is not
    /// visible here; confirm in `impl_`.
    column_mapping: Vec<usize>,

    /// When we receive a row from the stream side, we first extract its join key and then map it
    /// into the key order of the arrangement side. For example, suppose we receive `[a, b, c]`
    /// from the stream side, with `stream_join_key = [1, 2]`, `arrange_join_key = [3, 2]`, and
    /// the arrangement ordered by `[2 ascending, 3 ascending]`:
    ///
    /// * We first extract the join key `[b, c]` (stream columns 1 and 2).
    /// * Stream column `b` joins arrangement column 3 and `c` joins arrangement column 2. Because
    ///   the arrangement is ordered by column 2 first, the key is mapped to `[c, b]`.
    ///
    /// This vector records such mapping (presumably as positions into the extracted stream join
    /// key, i.e. `[1, 0]` in this example — TODO confirm against `impl_`).
    key_indices_mapping: Vec<usize>,

    /// The cache for the arrangement side.
    lookup_cache: LookupCache,

    /// The maximum size of the chunk produced by the executor at a time.
    chunk_size: usize,
}
76
77#[async_trait]
78impl<S: StateStore> Execute for LookupExecutor<S> {
79 fn execute(self: Box<Self>) -> BoxedMessageStream {
80 self.execute_inner().boxed()
81 }
82}