// risingwave_stream/executor/lookup.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;

mod cache;
mod sides;
use self::cache::LookupCache;
use self::sides::*;
mod impl_;

pub use impl_::LookupExecutorParams;

use crate::executor::prelude::*;

#[cfg(test)]
mod tests;

/// `LookupExecutor` takes one input stream and one arrangement and joins the input stream with
/// the arrangement. Currently, it only supports inner join. See [`LookupExecutorParams`] for
/// more information.
///
/// The output schema is `| stream columns | arrangement columns |`. `chunk_data_types` records
/// this layout before any reordering; `column_mapping` holds the reordering information.
///
/// The inputs must be supplied in a fixed order: the stream input first, then the arrangement.
pub struct LookupExecutor<S: StateStore> {
    ctx: ActorContextRef,

    /// Data types of the chunk produced inside the lookup, before column reordering is applied.
    chunk_data_types: Vec<DataType>,

    /// Join-side state for the arrangement input.
    arrangement: ArrangeJoinSide<S>,

    /// Join-side state for the stream input.
    stream: StreamJoinSide,

    /// The upstream executor producing the arrangement. Wrapped in `Option`, presumably so it
    /// can be taken out once execution starts (TODO confirm in `impl_`).
    arrangement_executor: Option<Executor>,

    /// The upstream executor producing the stream input. Wrapped in `Option`, presumably for
    /// the same reason as `arrangement_executor` (TODO confirm in `impl_`).
    stream_executor: Option<Executor>,

    /// The most recently received barrier, if any.
    last_barrier: Option<Barrier>,

    /// Column reordering applied to chunks laid out as described by `chunk_data_types`.
    /// NOTE(review): confirm the direction of the mapping (output position -> source index).
    column_mapping: Vec<usize>,

    /// Maps a stream-side join key into the key order used by the arrangement.
    ///
    /// Each stream row is first projected to its join key, then permuted into the order of the
    /// arrangement's join key. For example, suppose we receive `[a, b, c]` from the stream side,
    /// where `stream_join_key = [1, 2]` and `arrange_join_key = [3, 2]`, and the arrangement is
    /// ordered by `[2 ascending, 3 ascending]`:
    ///
    /// * we first extract the stream join key `[b, c]` (columns 1 and 2);
    /// * `b` pairs with arrangement column 3 and `c` with arrangement column 2, and the
    ///   arrangement orders column 2 before column 3, so the key becomes `[c, b]`.
    ///
    /// This vector records that permutation.
    key_indices_mapping: Vec<usize>,

    /// The cache for the arrangement side.
    lookup_cache: LookupCache,
    /// The maximum size of a chunk produced by the executor at a time.
    chunk_size: usize,
}

#[async_trait]
impl<S: StateStore> Execute for LookupExecutor<S> {
    /// Consumes the boxed executor and returns the stream built by `execute_inner`, boxed into
    /// a `BoxedMessageStream`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        let message_stream = self.execute_inner();
        message_stream.boxed()
    }
}