risingwave_stream/executor/mview/cache.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::ops::Deref as _;

use bytes::Bytes;
use futures::{StreamExt as _, stream};
use itertools::Itertools as _;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{ConflictBehavior, checked_conflict_behaviors};
use risingwave_common::row::{CompactedRow, OwnedRow, Row as _};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::iter_util::ZipEqFast as _;
use risingwave_common::util::sort_util::{OrderType, cmp_datum};
use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer as _};
use risingwave_storage::StateStore;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;

use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTableInner;
use crate::executor::StreamExecutorResult;
use crate::executor::monitor::MaterializeCacheMetrics;
use crate::task::AtomicU64Ref;

/// A cache for materialize executors.
pub struct MaterializeCache {
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    version_column_indices: Vec<u32>,
    conflict_behavior: ConflictBehavior,
    toastable_column_indices: Option<Vec<usize>>,
    metrics: MaterializeCacheMetrics,
}

type CacheValue = Option<CompactedRow>;
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;

/// Check whether the given row is below the committed watermark. If so, we may want to treat
/// it as non-existent. This extra step is necessary for TTL-ed tables because:
///
/// - the storage may clean expired rows asynchronously
/// - an expired row may still be in the cache while we expect a consistent view
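///
/// # Example
///
/// A minimal sketch (not part of the original source), assuming the watermark column holds an
/// integer and the committed watermark is `100`:
///
/// ```ignore
/// // cached row with watermark column value 42  -> below 100, treated as non-existent
/// assert!(row_below_committed_watermark(&row_serde, &table, &expired_row)?);
/// // cached row with watermark column value 150 -> not below 100, kept as-is
/// assert!(!row_below_committed_watermark(&row_serde, &table, &live_row)?);
/// ```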
fn row_below_committed_watermark<S: StateStore, SD: ValueRowSerde>(
    row_serde: &BasicSerde,
    table: &StateTableInner<S, SD>,
    row: &CompactedRow,
) -> StreamExecutorResult<bool> {
    let Some(clean_watermark_index) = table.clean_watermark_index else {
        return Ok(false);
    };
    let Some(committed_watermark) = table.get_committed_watermark() else {
        return Ok(false);
    };
    let deserialized = row_serde.deserializer.deserialize(row.row.clone())?;
    Ok(cmp_datum(
        deserialized.datum_at(clean_watermark_index),
        Some(committed_watermark.as_scalar_ref_impl()),
        OrderType::ascending(),
    ) == std::cmp::Ordering::Less)
}

impl MaterializeCache {
    /// Create a new `MaterializeCache`.
    ///
    /// Returns `None` if the conflict behavior is `NoCheck`.
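    ///
    /// # Example
    ///
    /// A hypothetical call site (all arguments assumed to be in scope), showing that no cache
    /// is created when conflicts are not checked:
    ///
    /// ```ignore
    /// let cache = MaterializeCache::new(
    ///     watermark_sequence,
    ///     metrics_info,
    ///     row_serde,
    ///     vec![],                    // no version columns
    ///     ConflictBehavior::NoCheck, // no conflict handling
    ///     None,                      // no TOAST-able columns
    ///     materialize_cache_metrics,
    /// );
    /// assert!(cache.is_none());
    /// ```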
    pub fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_indices: Vec<u32>,
        conflict_behavior: ConflictBehavior,
        toastable_column_indices: Option<Vec<usize>>,
        materialize_cache_metrics: MaterializeCacheMetrics,
    ) -> Option<Self> {
        match conflict_behavior {
            checked_conflict_behaviors!() => {}
            ConflictBehavior::NoCheck => {
                assert!(
                    toastable_column_indices.is_none(),
                    "when there are toastable columns, conflict handling must be enabled"
                );
                return None;
            }
        }

        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info);
        Some(Self {
            lru_cache,
            row_serde,
            version_column_indices,
            conflict_behavior,
            toastable_column_indices,
            metrics: materialize_cache_metrics,
        })
    }

    /// First populate the cache from `table`, and then calculate a [`ChangeBuffer`].
    /// `table` will not be written in this method.
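    ///
    /// # Example
    ///
    /// A hypothetical call site in the materialize executor (`cache`, `chunk` and `state_table`
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// let change_buffer = cache.handle_new(chunk, &state_table).await?;
    /// // the caller is responsible for applying `change_buffer` to the state table afterwards
    /// ```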
    pub async fn handle_new<S: StateStore, SD: ValueRowSerde>(
        &mut self,
        chunk: StreamChunk,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        if table.value_indices().is_some() {
            // TODO(st1page): when materializing partial columns, we should
            // construct some columns from the pk
            unimplemented!("materialize cache cannot handle conflicts with partial table values");
        };

        let (data_chunk, ops) = chunk.clone().into_parts();
        let values = data_chunk.serialize();

        let key_chunk = data_chunk.project(table.pk_indices());

        let pks = {
            let mut pks = vec![vec![]; data_chunk.capacity()];
            key_chunk
                .rows_with_holes()
                .zip_eq_fast(pks.iter_mut())
                .for_each(|(r, vnode_and_pk)| {
                    if let Some(r) = r {
                        table.pk_serde().serialize(r, vnode_and_pk);
                    }
                });
            pks
        };
        let (_, vis) = key_chunk.into_parts();
        let row_ops = ops
            .iter()
            .zip_eq_fast(pks.into_iter())
            .zip_eq_fast(values.into_iter())
            .zip_eq_fast(vis.iter())
            .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
            .collect_vec();

        self.handle_inner(row_ops, table).await
    }

    async fn handle_inner<S: StateStore, SD: ValueRowSerde>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        // Populate the LRU cache with the keys in the input chunk.
        // For keys not present in the table, the cached value is set to `None`.
        self.fetch_keys(key_set.iter().map(|v| v.deref()), table)
            .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_indices = self.version_column_indices.clone();
        for (op, key, row) in row_ops {
            // Use a macro instead of a method to work around partial borrow.
            macro_rules! get_expected {
                () => {
                    self.lru_cache.get(&key).unwrap_or_else(|| {
                        panic!(
                            "the key {:?} has not been fetched in the materialize executor's cache",
                            key
                        )
                    })
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert => {
                    let Some(old_row) = get_expected!() else {
                        // the key does not exist before, meaning no conflict; simply insert
                        let new_row_deserialized =
                            row_serde.deserializer.deserialize(row.clone())?;
                        change_buffer.insert(key.clone(), new_row_deserialized);
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    // now conflict happens, handle it according to the specified behavior
                    match self.conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;

                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                // no version column specified, just overwrite
                                true
                            };

                            if need_overwrite {
                                if let Some(toastable_indices) = &self.toastable_column_indices {
                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
                                    let final_row = toast::handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized,
                                        &new_row_deserialized,
                                        toastable_indices,
                                    );

                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        final_row.clone(),
                                    );
                                    let final_row_bytes =
                                        Bytes::from(row_serde.serializer.serialize(final_row));
                                    self.lru_cache.put(
                                        key.clone(),
                                        Some(CompactedRow {
                                            row: final_row_bytes,
                                        }),
                                    );
                                } else {
                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        new_row_deserialized,
                                    );
                                    self.lru_cache
                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
                                }
                            };
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // ignore conflict, do nothing
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.

                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                let mut row_deserialized_vec =
                                    old_row_deserialized.clone().into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized.clone(),
                                );
                                let mut updated_row = OwnedRow::new(row_deserialized_vec);

                                // Apply TOAST column fix for CDC tables with TOAST columns
                                if let Some(toastable_indices) = &self.toastable_column_indices {
                                    // Note: re-deserialize the old row here so that `old_row_deserialized`
                                    // remains available for `change_buffer.update` below.
                                    let old_row_deserialized_again =
                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
                                    updated_row = toast::handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized_again,
                                        &updated_row,
                                        toastable_indices,
                                    );
                                }

                                change_buffer.update(
                                    key.clone(),
                                    old_row_deserialized,
                                    updated_row.clone(),
                                );
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        ConflictBehavior::NoCheck => unreachable!(),
                    };
                }

                Op::UpdateDelete
                    if matches!(
                        self.conflict_behavior,
                        ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
                    ) =>
                {
                    // For `UpdateDelete`s, we skip processing them but directly handle the following `UpdateInsert`
                    // instead. This is because...
                    //
                    // - For `Overwrite`, we only care about the new row.
                    // - For `DoUpdateIfNotNull`, we don't want the whole row to be deleted, but instead perform
                    //   column-wise replacement when handling the `UpdateInsert`.
                    //
                    // However, for `IgnoreConflict`, we still need to delete the old row first, otherwise the row
                    // cannot be updated at all.
                }

                Op::Delete | Op::UpdateDelete => {
                    if let Some(old_row) = get_expected!() {
                        let old_row_deserialized =
                            row_serde.deserializer.deserialize(old_row.row.clone())?;
                        change_buffer.delete(key.clone(), old_row_deserialized);
                        // put a None into the cache to represent deletion
                        self.lru_cache.put(key, None);
                    } else {
                        // delete a non-existent value
                        // this is allowed in the case of mview conflict, so ignore
                    }
                }
            }
        }
        Ok(change_buffer)
    }

    async fn fetch_keys<'a, S: StateStore, SD: ValueRowSerde>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            self.metrics.materialize_cache_total_count.inc();

            if let Some(cached) = self.lru_cache.get(key) {
                self.metrics.materialize_cache_hit_count.inc();
                if let Some(row) = cached {
                    if row_below_committed_watermark(&self.row_serde, table, row)? {
                        self.lru_cache.put(key.to_vec(), None);
                    } else {
                        self.metrics.materialize_data_exist_count.inc();
                    }
                }
                continue;
            }

            let row_serde = self.row_serde.clone();
            futures.push(async move {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                let row = match row {
                    Some(row) if row_below_committed_watermark(&row_serde, table, &row)? => None,
                    other => other,
                };
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            if row.is_some() {
                self.metrics.materialize_data_exist_count.inc();
            }
            // for keys that are not in the table, `row` is `None`
            match self.conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }

    /// Evict the LRU cache entries that are lower than the watermark.
    pub fn evict(&mut self) {
        self.lru_cache.evict()
    }

    /// Clear the LRU cache.
    pub fn clear(&mut self) {
        self.lru_cache.clear()
    }
}

/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
/// column value in the replacement row is not null.
///
/// # Example
///
/// ```ignore
/// let mut row = vec![Some(1), None, Some(3)];
/// let replacement = vec![Some(10), Some(20), None];
/// replace_if_not_null(&mut row, replacement);
/// ```
///
/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
        if let Some(new_value) = new_col {
            *old_col = Some(new_value);
        }
    }
}

/// Compare multiple version columns lexicographically.
/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
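///
/// # Example
///
/// A minimal sketch (not part of the original source), assuming the version columns sit at
/// indices `[1, 2]` and hold integer datums:
///
/// ```ignore
/// // old row versions (2, 5) vs. new row versions (2, 7):
/// //   index 1 compares equal, index 2 compares 5 < 7, so the new row is newer and wins.
/// assert!(versions_are_newer_or_equal(&old_row, &new_row, &[1, 2]));
/// ```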
fn versions_are_newer_or_equal(
    old_row: &OwnedRow,
    new_row: &OwnedRow,
    version_column_indices: &[u32],
) -> bool {
    if version_column_indices.is_empty() {
        // No version columns specified, always consider new version as newer
        return true;
    }

    for &idx in version_column_indices {
        let old_value = old_row.datum_at(idx as usize);
        let new_value = new_row.datum_at(idx as usize);

        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
            std::cmp::Ordering::Less => return true,     // new is newer
            std::cmp::Ordering::Greater => return false, // old is newer
            std::cmp::Ordering::Equal => continue,       // equal, check next column
        }
    }

    // All version columns are equal, consider new version as equal (should overwrite)
    true
}

/// TOAST column handling for CDC tables with TOAST columns.
mod toast {
    use risingwave_common::row::Row as _;
    use risingwave_common::types::DEBEZIUM_UNAVAILABLE_VALUE;

    use super::*;

    /// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
    /// Optimized by checking length first to avoid expensive string comparison.
    fn is_unavailable_value_str(s: &str) -> bool {
        s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
    }

    /// Check if a datum represents Debezium's unavailable value placeholder.
    /// This function handles both scalar types and one-dimensional arrays.
    fn is_debezium_unavailable_value(
        datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
    ) -> bool {
        match datum {
            Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => {
                is_unavailable_value_str(val)
            }
            Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
                // For jsonb type, check if it's a string containing the unavailable value
                jsonb_ref
                    .as_str()
                    .map(is_unavailable_value_str)
                    .unwrap_or(false)
            }
            Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
                // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
                // This is because when processing bytea from Debezium, we convert the base64-encoded string
                // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
                if let Ok(bytea_str) = std::str::from_utf8(bytea) {
                    is_unavailable_value_str(bytea_str)
                } else {
                    false
                }
            }
            Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
                // For list type, check if it contains exactly one element with the unavailable value
                // This is because when any element in an array triggers TOAST, Debezium treats the entire
                // array as unchanged and sends a placeholder array with only one element
                if list_ref.len() == 1 {
                    if let Some(Some(element)) = list_ref.get(0) {
                        // Recursively check the array element
                        is_debezium_unavailable_value(&Some(element))
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            _ => false,
        }
    }

    /// Fix TOAST columns by replacing unavailable values with old row values.
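    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original source), assuming column 1 is TOAST-able and
    /// the new row carries Debezium's placeholder there:
    ///
    /// ```ignore
    /// // old_row: ("id-1", "large toasted payload")
    /// // new_row: ("id-1", DEBEZIUM_UNAVAILABLE_VALUE)
    /// let fixed = handle_toast_columns_for_postgres_cdc(&old_row, &new_row, &[1]);
    /// // fixed:   ("id-1", "large toasted payload")  -- placeholder replaced by the old value
    /// ```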
    pub fn handle_toast_columns_for_postgres_cdc(
        old_row: &OwnedRow,
        new_row: &OwnedRow,
        toastable_indices: &[usize],
    ) -> OwnedRow {
        let mut fixed_row_data = new_row.as_inner().to_vec();

        for &toast_idx in toastable_indices {
            // Check if the new value is Debezium's unavailable value placeholder
            let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
            if is_unavailable {
                // Replace with old row value if available
                if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
                    fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
                }
            }
        }

        OwnedRow::new(fixed_row_data)
    }
}