risingwave_stream/executor/lookup/impl_.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::row::RowExt;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common_estimate_size::collections::EstimatedVec;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::TableIter;
use risingwave_storage::table::batch_table::BatchTable;

use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
use crate::cache::cache_may_stale;
use crate::common::metrics::MetricsInfo;
use crate::executor::join::builder::JoinStreamChunkBuilder;
use crate::executor::lookup::LookupExecutor;
use crate::executor::lookup::cache::LookupCache;
use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
use crate::executor::monitor::LookupExecutorMetrics;
use crate::executor::prelude::*;
/// Parameters for [`LookupExecutor`].
pub struct LookupExecutorParams<S: StateStore> {
    pub ctx: ActorContextRef,
    pub info: ExecutorInfo,

    /// The side for arrangement. Currently, it should be a
    /// `MaterializeExecutor`.
    pub arrangement: Executor,

    /// The side for stream. It can be any stream, but it will generally be a
    /// `MaterializeExecutor`.
    pub stream: Executor,

    /// Should be the same as the [`ColumnDesc`]s in the arrangement.
    ///
    /// From the perspective of arrangements, `arrangement_col_descs` includes all columns of the
    /// `MaterializeExecutor`. For example, if we already have a table with 3 columns `a, b,
    /// _row_id` and we create an arrangement with join key `a` on it, then
    /// `arrangement_col_descs` should contain all 3 columns.
    pub arrangement_col_descs: Vec<ColumnDesc>,

    /// Should only contain the [`ColumnOrder`]s of the arrange keys in the arrangement.
    ///
    /// Still using the `a, b, _row_id` example above: if we create an arrangement with join key
    /// `a`, there will be 3 elements in `arrangement_col_descs`, but only 1 element in
    /// `arrangement_order_rules`.
    ///
    /// * The only element is the order rule for `a`, which is the join key. Join keys should
    ///   always come first.
    ///
    /// The MV pk columns are only contained in `arrangement_col_descs`; they are not part of
    /// `arrangement_order_rules`.
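    ///
    /// A minimal sketch of the two fields for that example (the `desc_of` helper and the
    /// ascending order are purely illustrative, not taken from an actual plan):
    ///
    /// ```ignore
    /// // All 3 columns of the arrangement are described...
    /// let arrangement_col_descs = vec![desc_of("a"), desc_of("b"), desc_of("_row_id")];
    /// // ...but only the join key `a` gets an order rule.
    /// let arrangement_order_rules = vec![ColumnOrder::new(0, OrderType::ascending())];
    /// ```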
    pub arrangement_order_rules: Vec<ColumnOrder>,

    /// By default, the output of [`LookupExecutor`] is `stream columns + arrangement columns`.
    /// The executor will reorder the columns before producing output, so that the data is
    /// consistent between A lookup B and B lookup A.
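    ///
    /// For example (indices purely illustrative): with 2 stream columns and 2 arrangement
    /// columns, the internal output is `| s0 | s1 | a0 | a1 |`; a `column_mapping` of
    /// `[2, 3, 0, 1]` would emit `| a0 | a1 | s0 | s1 |` instead.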
    pub column_mapping: Vec<usize>,

    /// Whether to use the current epoch of the arrangement for lookups.
    ///
    /// [`LookupExecutor`] will act differently depending on whether `use_current_epoch` is set to
    /// true. In a nutshell, looking up in the previous epoch (this option set to false) is more
    /// efficient. Therefore, the optimizer should choose an optimal order for lookup executors.
    ///
    /// See [`stream_lookup_arrange_this_epoch`] and [`stream_lookup_arrange_prev_epoch`] for more
    /// information.
    pub use_current_epoch: bool,

    /// The join keys on the stream side.
    pub stream_join_key_indices: Vec<usize>,

    /// The join keys on the arrangement side.
    pub arrange_join_key_indices: Vec<usize>,

    pub batch_table: BatchTable<S>,

    pub watermark_epoch: AtomicU64Ref,

    pub chunk_size: usize,
}

impl<S: StateStore> LookupExecutor<S> {
    pub fn new(params: LookupExecutorParams<S>) -> Self {
        let LookupExecutorParams {
            ctx,
            info,
            arrangement,
            stream,
            arrangement_col_descs,
            arrangement_order_rules,
            use_current_epoch,
            stream_join_key_indices,
            arrange_join_key_indices,
            column_mapping,
            batch_table: storage_table,
            watermark_epoch,
            chunk_size,
        } = params;

        let output_column_length = stream.schema().len() + arrangement.schema().len();

        // internal output schema: | stream | arrange |
        // will be rewritten using `column_mapping`.

        let schema_fields = stream
            .schema()
            .fields
            .iter()
            .chain(arrangement.schema().fields.iter())
            .cloned()
            .collect_vec();

        assert_eq!(schema_fields.len(), output_column_length);

        let schema = Schema::new(schema_fields);

        let chunk_data_types = schema.data_types();
        let arrangement_data_types = arrangement.schema().data_types();
        let stream_data_types = stream.schema().data_types();

        let arrangement_pk_indices = arrangement.pk_indices().to_vec();
        let stream_pk_indices = stream.pk_indices().to_vec();

        // check that the arrange join keys are consistent with the order rules: join keys
        // should come first.
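        //
        // For instance (indices purely illustrative): with order rules on columns [1, 0, 2],
        // arrange join keys [1, 0] pass this check, while join keys [0, 2] would not.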
        {
            let mut arrange_join_key_indices = arrange_join_key_indices.clone();
            arrange_join_key_indices.sort_unstable();
            let mut arrangement_order_types_indices = arrangement_order_rules
                .iter()
                .map(|x| x.column_index)
                .collect_vec();
            arrangement_order_types_indices.sort_unstable();
            assert_eq!(
                arrange_join_key_indices,
                &arrangement_order_types_indices[0..arrange_join_key_indices.len()],
                "invalid join key: arrange_join_key_indices = {:?}, order_rules: {:?}",
                arrange_join_key_indices,
                arrangement_order_rules
            );
        }

        // check that the join keys on both sides have the same length.
        assert_eq!(
            stream_join_key_indices.len(),
            arrange_join_key_indices.len()
        );

        // resolve the mapping from join keys in the stream row -> join keys for the arrangement.
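        //
        // For instance (indices purely illustrative): with order rules on arrangement columns
        // [1, 0], arrange join keys [0, 1], and stream join keys [5, 3], the mapping resolves
        // to [3, 5]: arrangement key column 1 is fed from stream column 3, and column 0 from
        // stream column 5.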
        let key_indices_mapping = arrangement_order_rules
            .iter()
            .map(|x| x.column_index) // the required column idx in this position
            .filter_map(|x| arrange_join_key_indices.iter().position(|y| *y == x)) // the position of the item in join keys
            .map(|x| stream_join_key_indices[x]) // the actual column idx in stream
            .collect_vec();

        // check the inferred schema is really the same as the output schema of the lookup executor.
        assert_eq!(
            info.schema
                .fields
                .iter()
                .map(|x| x.data_type())
                .collect_vec(),
            column_mapping
                .iter()
                .map(|x| schema.fields[*x].data_type())
                .collect_vec(),
            "mismatched output schema"
        );

        let metrics_info = MetricsInfo::new(
            ctx.streaming_metrics.clone(),
            storage_table.table_id().table_id(),
            ctx.id,
            "Lookup",
        );
        Self {
            ctx,
            chunk_data_types,
            last_barrier: None,
            stream_executor: Some(stream),
            arrangement_executor: Some(arrangement),
            stream: StreamJoinSide {
                key_indices: stream_join_key_indices,
                pk_indices: stream_pk_indices,
                col_types: stream_data_types,
            },
            arrangement: ArrangeJoinSide {
                pk_indices: arrangement_pk_indices,
                col_types: arrangement_data_types,
                col_descs: arrangement_col_descs,
                order_rules: arrangement_order_rules,
                key_indices: arrange_join_key_indices,
                use_current_epoch,
                batch_table: storage_table,
            },
            column_mapping,
            key_indices_mapping,
            lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
            chunk_size,
        }
    }

    /// Try to produce one stream message from [`LookupExecutor`]. If there's no message to
    /// produce, it will return `None`, and the `next` function of [`LookupExecutor`] will keep
    /// polling messages until there's one.
    ///
    /// If we could use `async_stream` to write this part, things would be easier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_inner(mut self: Box<Self>) {
        let input = if self.arrangement.use_current_epoch {
            stream_lookup_arrange_this_epoch(
                self.stream_executor.take().unwrap(),
                self.arrangement_executor.take().unwrap(),
            )
            .boxed()
        } else {
            stream_lookup_arrange_prev_epoch(
                self.stream_executor.take().unwrap(),
                self.arrangement_executor.take().unwrap(),
            )
            .boxed()
        };
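        // Roughly speaking (see the `sides` module for the exact protocol): when looking up in
        // the current epoch, stream chunks are delivered only after the arrangement of that
        // epoch is ready; when looking up in the previous epoch, stream chunks flow through
        // immediately and the arrangement data arrives after the stream barrier.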

        let metrics = self.ctx.streaming_metrics.new_lookup_executor_metrics(
            self.arrangement.batch_table.table_id(),
            self.ctx.id,
            self.ctx.fragment_id,
        );

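        // Split the output `column_mapping` into per-side input-to-output index mappings, which
        // the chunk builder below uses to place stream-side and arrangement-side columns.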
        let (stream_to_output, arrange_to_output) = JoinStreamChunkBuilder::get_i2o_mapping(
            &self.column_mapping,
            self.stream.col_types.len(),
            self.arrangement.col_types.len(),
        );

        let reorder_chunk_data_types = self
            .column_mapping
            .iter()
            .map(|x| self.chunk_data_types[*x].clone())
            .collect_vec();

        #[for_await]
        for msg in input {
            let msg = msg?;
            self.lookup_cache.evict();
            match msg {
                ArrangeMessage::Barrier(barrier) => {
                    self.process_barrier(&barrier);

                    if self.arrangement.use_current_epoch {
                        // When looking up in this epoch, the stream-side barrier always comes
                        // after the arrangement is ready, so we can forward the barrier now.
                        yield Message::Barrier(barrier);
                    }
                }
                ArrangeMessage::ArrangeReady(arrangement_chunks, barrier) => {
                    // The arrangement is ready, and we will receive a bunch of stream messages for
                    // the next poll.
                    // TODO: apply chunks as soon as we receive them, instead of batching.

                    for chunk in arrangement_chunks {
                        self.lookup_cache
                            .apply_batch(chunk, &self.arrangement.key_indices)
                    }

                    if !self.arrangement.use_current_epoch {
                        // When looking up in the previous epoch, the arrangement-ready message
                        // always comes after the stream barrier, so we yield the barrier now.
                        yield Message::Barrier(barrier);
                    }
                }
                ArrangeMessage::Stream(chunk) => {
                    let chunk = chunk.compact();
                    let (chunk, ops) = chunk.into_parts();

                    let mut builder = JoinStreamChunkBuilder::new(
                        self.chunk_size,
                        reorder_chunk_data_types.clone(),
                        stream_to_output.clone(),
                        arrange_to_output.clone(),
                    );

                    for (op, row) in ops.iter().zip_eq_debug(chunk.rows()) {
                        for matched_row in self
                            .lookup_one_row(
                                &row,
                                self.last_barrier.as_ref().unwrap().epoch,
                                &metrics,
                            )
                            .await?
                        {
                            tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);

                            if let Some(chunk) = builder.append_row(*op, row, &matched_row) {
                                yield Message::Chunk(chunk);
                            }
                        }
                        // TODO: support outer join (return null if no rows are matched)
                    }

                    if let Some(chunk) = builder.take() {
                        yield Message::Chunk(chunk);
                    }
                }
            }
        }
    }

    /// Process the barrier and apply changes if necessary.
    fn process_barrier(&mut self, barrier: &Barrier) {
        if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
            let previous_vnode_bitmap = self
                .arrangement
                .batch_table
                .update_vnode_bitmap(vnode_bitmap.clone());

            // Manipulate the cache if necessary.
            if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
                self.lookup_cache.clear();
            }
        }

        // Use the new stream barrier epoch as the new cache epoch.
        self.last_barrier = Some(barrier.clone());
    }

    /// Look up all rows corresponding to a join key in the shared buffer.
    async fn lookup_one_row(
        &mut self,
        stream_row: &RowRef<'_>,
        epoch_pair: EpochPair,
        metrics: &LookupExecutorMetrics,
    ) -> StreamExecutorResult<Vec<OwnedRow>> {
        // `stream_row` is the row from the stream side; we need to transform it into the key
        // order of the arrangement side.
        let lookup_row = stream_row
            .project(&self.key_indices_mapping)
            .into_owned_row();

        metrics.lookup_total_query_cache_count.inc();
        if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
            return Ok(result.iter().cloned().collect_vec());
        }

        // cache miss
        metrics.lookup_cache_miss_count.inc();

        tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);

        let mut all_rows = EstimatedVec::new();
        // Scope the storage iterator so that it is dropped once all rows are collected.
        {
            // Read from the current or the previous epoch of the arrangement, depending on
            // `use_current_epoch`.
            let read_epoch = if self.arrangement.use_current_epoch {
                HummockReadEpoch::NoWait(epoch_pair.curr)
            } else {
                HummockReadEpoch::NoWait(epoch_pair.prev)
            };
            let all_data_iter = self
                .arrangement
                .batch_table
                .batch_iter_with_pk_bounds(
                    read_epoch,
                    &lookup_row,
                    ..,
                    false,
                    PrefetchOptions::default(),
                )
                .await?;

            pin_mut!(all_data_iter);
            while let Some(row) = all_data_iter.next_row().await? {
                // Only the value is needed (it includes the storage pk).
                all_rows.push(row);
            }
        }

        tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());

        self.lookup_cache.batch_update(lookup_row, all_rows.clone());

        metrics
            .lookup_cached_entry_count
            .set(self.lookup_cache.len() as i64);

        Ok(all_rows.into_inner())
    }
}