Skip to main content

// File: risingwave_stream/executor/lookup/impl_.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use futures::TryStreamExt;
18use itertools::Itertools;
19use risingwave_common::catalog::ColumnDesc;
20use risingwave_common::row::RowExt;
21use risingwave_common::util::iter_util::ZipEqDebug;
22use risingwave_common::util::sort_util::ColumnOrder;
23use risingwave_common_estimate_size::collections::EstimatedVec;
24use risingwave_storage::row_serde::value_serde::ValueRowSerde;
25use risingwave_storage::store::PrefetchOptions;
26
27use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
28use crate::common::metrics::MetricsInfo;
29use crate::common::table::state_table::ReplicatedStateTable;
30use crate::executor::join::builder::JoinStreamChunkBuilder;
31use crate::executor::lookup::LookupExecutor;
32use crate::executor::lookup::cache::LookupCache;
33use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
34use crate::executor::monitor::LookupExecutorMetrics;
35use crate::executor::prelude::*;
36
37/// Parameters for [`LookupExecutor`].
38pub struct LookupExecutorParams<S: StateStore, SD: ValueRowSerde> {
39    pub ctx: ActorContextRef,
40    pub info: ExecutorInfo,
41
42    /// The side for arrangement. Currently, it should be a
43    /// `MaterializeExecutor`.
44    pub arrangement: Executor,
45
46    /// The side for stream. It can be any stream, but it will generally be a
47    /// `MaterializeExecutor`.
48    pub stream: Executor,
49
50    /// Should be the same as [`ColumnDesc`] in the arrangement.
51    ///
52    /// From the perspective of arrangements, `arrangement_col_descs` include all columns of the
53    /// `MaterializeExecutor`. For example, if we already have a table with 3 columns: `a, b,
54    /// _row_id`, and we create an arrangement with join key `a` on it. `arrangement_col_descs`
55    /// should contain all 3 columns.
56    pub arrangement_col_descs: Vec<ColumnDesc>,
57
58    /// Should only contain [`ColumnOrder`] for arrange in the arrangement.
59    ///
60    /// Still using the above `a, b, _row_id` example. If we create an arrangement with join key
61    /// `a`, there will be 3 elements in `arrangement_col_descs`, and only 1 element in
62    /// `arrangement_order_rules`.
63    ///
64    /// * The only element is the order rule for `a`, which is the join key. Join keys should
65    ///   always come first.
66    ///
67    /// For the MV pk, they will only be contained in `arrangement_col_descs`, without being part
68    /// of this `arrangement_order_rules`.
69    pub arrangement_order_rules: Vec<ColumnOrder>,
70
71    /// By default, the output of [`LookupExecutor`] is `stream columns + arrangement columns`.
72    /// The executor will do a reorder of columns before producing output, so that data can be
73    /// consistent among A lookup B and B lookup A.
74    pub column_mapping: Vec<usize>,
75
76    /// Whether to use the current epoch of the arrangement to lookup.
77    ///
78    /// [`LookupExecutor`] will act differently on whether `use_current_epoch` is set to true. In a
79    /// nutshell, lookup in the previous epoch (this option set to false) will be more efficient.
80    /// Therefore, the optimizer should choose an optimal order for lookup executors.
81    ///
82    /// See [`stream_lookup_arrange_this_epoch`] and [`stream_lookup_arrange_prev_epoch`] for more
83    /// information.
84    pub use_current_epoch: bool,
85
86    /// The join keys on the stream side.
87    pub stream_join_key_indices: Vec<usize>,
88
89    /// The join keys on the arrangement side.
90    pub arrange_join_key_indices: Vec<usize>,
91
92    pub state_table: ReplicatedStateTable<S, SD>,
93
94    pub watermark_epoch: AtomicU64Ref,
95
96    pub chunk_size: usize,
97}
98
99impl<S: StateStore, SD: ValueRowSerde> LookupExecutor<S, SD> {
100    pub fn new(params: LookupExecutorParams<S, SD>) -> Self {
101        let LookupExecutorParams {
102            ctx,
103            info,
104            arrangement,
105            stream,
106            arrangement_col_descs,
107            arrangement_order_rules,
108            use_current_epoch,
109            stream_join_key_indices,
110            arrange_join_key_indices,
111            column_mapping,
112            state_table: storage_table,
113            watermark_epoch,
114            chunk_size,
115        } = params;
116
117        let output_column_length = stream.schema().len() + arrangement.schema().len();
118
119        // internal output schema: | stream | arrange |
120        // will be rewritten using `column_mapping`.
121
122        let schema_fields = stream
123            .schema()
124            .fields
125            .iter()
126            .chain(arrangement.schema().fields.iter())
127            .cloned()
128            .collect_vec();
129
130        assert_eq!(schema_fields.len(), output_column_length);
131
132        let schema = Schema::new(schema_fields);
133
134        let chunk_data_types = schema.data_types();
135        let arrangement_data_types = arrangement.schema().data_types();
136        let stream_data_types = stream.schema().data_types();
137
138        let arrangement_pk_indices = arrangement.stream_key().to_vec();
139        let stream_pk_indices = stream.stream_key().to_vec();
140
141        // check if arrange join key is exactly the same as order rules
142        {
143            let mut arrange_join_key_indices = arrange_join_key_indices.clone();
144            arrange_join_key_indices.sort_unstable();
145            let mut arrangement_order_types_indices = arrangement_order_rules
146                .iter()
147                .map(|x| x.column_index)
148                .collect_vec();
149            arrangement_order_types_indices.sort_unstable();
150            assert_eq!(
151                arrange_join_key_indices,
152                &arrangement_order_types_indices[0..arrange_join_key_indices.len()],
153                "invalid join key: arrange_join_key_indices = {:?}, order_rules: {:?}",
154                arrange_join_key_indices,
155                arrangement_order_rules
156            );
157        }
158
159        // check whether join keys are of the same length.
160        assert_eq!(
161            stream_join_key_indices.len(),
162            arrange_join_key_indices.len()
163        );
164
165        // resolve mapping from join keys in stream row -> joins keys for arrangement.
166        let key_indices_mapping = arrangement_order_rules
167            .iter()
168            .map(|x| x.column_index) // the required column idx in this position
169            .filter_map(|x| arrange_join_key_indices.iter().position(|y| *y == x)) // the position of the item in join keys
170            .map(|x| stream_join_key_indices[x]) // the actual column idx in stream
171            .collect_vec();
172
173        // check the inferred schema is really the same as the output schema of the lookup executor.
174        assert_eq!(
175            info.schema
176                .fields
177                .iter()
178                .map(|x| x.data_type())
179                .collect_vec(),
180            column_mapping
181                .iter()
182                .map(|x| schema.fields[*x].data_type())
183                .collect_vec(),
184            "mismatched output schema"
185        );
186
187        let metrics_info = MetricsInfo::new(
188            ctx.streaming_metrics.clone(),
189            storage_table.table_id(),
190            ctx.id,
191            "Lookup",
192        );
193
194        Self {
195            ctx,
196            chunk_data_types,
197            stream_executor: Some(stream),
198            arrangement_executor: Some(arrangement),
199            stream: StreamJoinSide {
200                key_indices: stream_join_key_indices,
201                pk_indices: stream_pk_indices,
202                col_types: stream_data_types,
203            },
204            arrangement: ArrangeJoinSide {
205                pk_indices: arrangement_pk_indices,
206                col_types: arrangement_data_types,
207                col_descs: arrangement_col_descs,
208                order_rules: arrangement_order_rules,
209                key_indices: arrange_join_key_indices,
210                use_current_epoch,
211                state_table: storage_table,
212            },
213            column_mapping,
214            key_indices_mapping,
215            lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
216            chunk_size,
217        }
218    }
219
220    /// Try produce one stream message from [`LookupExecutor`]. If there's no message to produce, it
221    /// will return `None`, and the `next` function of [`LookupExecutor`] will continuously polling
222    /// messages until there's one.
223    ///
224    /// If we can use `async_stream` to write this part, things could be easier.
225    #[try_stream(ok = Message, error = StreamExecutorError)]
226    pub async fn execute_inner(mut self: Box<Self>) {
227        let mut stream_input = self.stream_executor.take().unwrap().execute();
228        let mut arrangement_input = self.arrangement_executor.take().unwrap().execute();
229
230        let first_barrier = expect_first_barrier(&mut stream_input).await?;
231        let arrangement_first_barrier = expect_first_barrier(&mut arrangement_input).await?;
232        if first_barrier.epoch != arrangement_first_barrier.epoch {
233            return Err(StreamExecutorError::align_barrier(
234                first_barrier.clone(),
235                arrangement_first_barrier,
236            ));
237        }
238
239        let input = if self.arrangement.use_current_epoch {
240            stream_lookup_arrange_this_epoch(stream_input, arrangement_input).boxed()
241        } else {
242            stream_lookup_arrange_prev_epoch(stream_input, arrangement_input).boxed()
243        };
244
245        let metrics = self.ctx.streaming_metrics.new_lookup_executor_metrics(
246            self.arrangement.state_table.table_id(),
247            self.ctx.id,
248            self.ctx.fragment_id,
249        );
250
251        let (stream_to_output, arrange_to_output) = JoinStreamChunkBuilder::get_i2o_mapping(
252            &self.column_mapping,
253            self.stream.col_types.len(),
254            self.arrangement.col_types.len(),
255        );
256
257        let reorder_chunk_data_types = self
258            .column_mapping
259            .iter()
260            .map(|x| self.chunk_data_types[*x].clone())
261            .collect_vec();
262
263        // Yield the barrier before init_epoch: init_epoch calls wait_for_epoch(epoch.prev)
264        // which blocks until the previous epoch is committed, and that requires all actors
265        // to have already forwarded this barrier downstream.
266        yield Message::Barrier(first_barrier.clone());
267        self.arrangement
268            .state_table
269            .init_epoch(first_barrier.epoch)
270            .await?;
271
272        #[for_await]
273        for msg in input {
274            let msg = msg?;
275            self.lookup_cache.evict();
276            match msg {
277                ArrangeMessage::Barrier(barrier) => {
278                    barrier.assume_no_update_vnode_bitmap(self.ctx.id)?;
279                    self.arrangement
280                        .state_table
281                        .commit_assert_no_update_vnode_bitmap(barrier.epoch)
282                        .await?;
283
284                    if self.arrangement.use_current_epoch {
285                        // When lookup this epoch, stream side barrier always come after arrangement
286                        // ready, so we can forward barrier now.
287                        yield Message::Barrier(barrier.clone());
288                    }
289                }
290                ArrangeMessage::ArrangeReady(arrangement_chunks, barrier) => {
291                    // The arrangement is ready, and we will receive a bunch of stream messages for
292                    // the next poll.
293                    // TODO: apply chunk as soon as we receive them, instead of batching.
294                    for chunk in &arrangement_chunks {
295                        self.arrangement.state_table.write_chunk(chunk.clone());
296                    }
297
298                    for chunk in arrangement_chunks {
299                        self.lookup_cache
300                            .apply_batch(chunk, &self.arrangement.key_indices)
301                    }
302
303                    if !self.arrangement.use_current_epoch {
304                        // When look prev epoch, arrange ready will always come after stream
305                        // barrier. So we yield barrier now.
306                        yield Message::Barrier(barrier);
307                    }
308                }
309                ArrangeMessage::Stream(chunk) => {
310                    let chunk = chunk.compact_vis();
311                    let (chunk, ops) = chunk.into_parts();
312
313                    let mut builder = JoinStreamChunkBuilder::new(
314                        self.chunk_size,
315                        reorder_chunk_data_types.clone(),
316                        stream_to_output.clone(),
317                        arrange_to_output.clone(),
318                    );
319
320                    for (op, row) in ops.iter().zip_eq_debug(chunk.rows()) {
321                        for matched_row in self.lookup_one_row(&row, &metrics).await? {
322                            tracing::debug!(target: "events::stream::lookup::put", "{:?} {:?}", row, matched_row);
323
324                            if let Some(chunk) = builder.append_row(*op, row, &matched_row) {
325                                yield Message::Chunk(chunk);
326                            }
327                        }
328                        // TODO: support outer join (return null if no rows are matched)
329                    }
330
331                    if let Some(chunk) = builder.take() {
332                        yield Message::Chunk(chunk);
333                    }
334                }
335            }
336        }
337    }
338
339    /// Lookup all rows corresponding to a join key in the replicated state table.
340    async fn lookup_one_row(
341        &mut self,
342        stream_row: &RowRef<'_>,
343        metrics: &LookupExecutorMetrics,
344    ) -> StreamExecutorResult<Vec<OwnedRow>> {
345        // stream_row is the row from stream side, we need to transform into the correct order of
346        // the arrangement side.
347        let lookup_row = stream_row
348            .project(&self.key_indices_mapping)
349            .into_owned_row();
350
351        metrics.lookup_total_query_cache_count.inc();
352        if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
353            return Ok(result.iter().cloned().collect_vec());
354        }
355
356        // cache miss
357        metrics.lookup_cache_miss_count.inc();
358
359        tracing::debug!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
360
361        let mut all_rows = EstimatedVec::new();
362        {
363            let all_data_iter = self
364                .arrangement
365                .state_table
366                .iter_with_prefix(
367                    &lookup_row,
368                    &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
369                    PrefetchOptions::default(),
370                )
371                .await?;
372
373            pin_mut!(all_data_iter);
374            while let Some(row) = all_data_iter.try_next().await? {
375                all_rows.push(row);
376            }
377        }
378
379        tracing::debug!(target: "events::stream::lookup::result", "{:?} => {:?}", lookup_row, all_rows.inner());
380
381        self.lookup_cache.batch_update(lookup_row, all_rows.clone());
382
383        metrics
384            .lookup_cached_entry_count
385            .set(self.lookup_cache.len() as i64);
386
387        Ok(all_rows.into_inner())
388    }
389}