risingwave_stream/executor/
lookup.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use risingwave_storage::row_serde::value_serde::ValueRowSerde;
17
18mod cache;
19mod sides;
20use self::cache::LookupCache;
21use self::sides::*;
22mod impl_;
23
24pub use impl_::LookupExecutorParams;
25
26use crate::executor::prelude::*;
27
28#[cfg(test)]
29mod tests;
30
/// `LookupExecutor` takes one input stream and one arrangement, and joins the input stream with
/// the arrangement. Currently, it only supports inner join. See `LookupExecutorParams` for more
/// information.
///
/// The output schema is `| stream columns | arrangement columns |`.
/// The inputs must be supplied in that order: first the stream, then the arrangement.
pub struct LookupExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Reference to the context of the actor that owns this executor.
    ctx: ActorContextRef,

    /// The data types of the data chunk produced inside lookup (before reordering).
    chunk_data_types: Vec<DataType>,

    /// The join side of the arrangement.
    arrangement: ArrangeJoinSide<S, SD>,

    /// The join side of the stream.
    stream: StreamJoinSide,

    /// The executor for the arrangement.
    ///
    /// NOTE(review): presumably wrapped in `Option` so it can be moved out once execution
    /// starts -- confirm against `impl_`.
    arrangement_executor: Option<Executor>,

    /// The executor for the stream.
    ///
    /// NOTE(review): presumably wrapped in `Option` for the same reason as
    /// `arrangement_executor` -- confirm against `impl_`.
    stream_executor: Option<Executor>,

    /// Column reordering information.
    ///
    /// NOTE(review): presumably maps the chunk layout described by `chunk_data_types` to the
    /// final output column order -- confirm against `impl_`.
    column_mapping: Vec<usize>,

    /// When we receive a row from the stream side, we first convert it to the join key, and then
    /// map it to the arrange side. For example, if we receive `[a, b, c]` from the stream side,
    /// where `stream_join_key = [1, 2]` and `arrange_join_key = [3, 2]` with order rules
    /// `[2 ascending, 3 ascending]`:
    ///
    /// * We first extract the join key `[b, c]`,
    /// * then map it to the order of the arrangement join key, `[c, b]`.
    ///
    /// This vector records that mapping.
    key_indices_mapping: Vec<usize>,

    /// The cache for the arrangement side.
    lookup_cache: LookupCache,
    /// The maximum size of a chunk produced by the executor at a time.
    chunk_size: usize,
}
74
75#[async_trait]
76impl<S: StateStore, SD: ValueRowSerde> Execute for LookupExecutor<S, SD> {
77    fn execute(self: Box<Self>) -> BoxedMessageStream {
78        self.execute_inner().boxed()
79    }
80}