risingwave_stream/executor/mview/
cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Deref as _;
17
18use bytes::Bytes;
19use futures::{StreamExt as _, stream};
20use itertools::Itertools as _;
21use risingwave_common::array::{Op, StreamChunk};
22use risingwave_common::catalog::{ConflictBehavior, checked_conflict_behaviors};
23use risingwave_common::row::{CompactedRow, OwnedRow, Row as _};
24use risingwave_common::types::ScalarImpl;
25use risingwave_common::util::iter_util::ZipEqFast as _;
26use risingwave_common::util::sort_util::{OrderType, cmp_datum};
27use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer as _};
28use risingwave_storage::StateStore;
29use risingwave_storage::row_serde::value_serde::ValueRowSerde;
30
31use crate::cache::ManagedLruCache;
32use crate::common::metrics::MetricsInfo;
33use crate::common::table::state_table::StateTableInner;
34use crate::executor::StreamExecutorResult;
35use crate::executor::monitor::MaterializeCacheMetrics;
36use crate::task::AtomicU64Ref;
37
/// A cache for materialize executors.
///
/// Rows are cached by their serialized primary key so that conflicts between incoming changes
/// and existing rows can be resolved according to the configured [`ConflictBehavior`].
pub struct MaterializeCache {
    /// LRU cache from serialized primary key to the cached row. A `None` value records that the
    /// key currently has no row (never existed in the table, or has been deleted).
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Serde used to deserialize cached/incoming row bytes and to re-serialize merged rows.
    row_serde: BasicSerde,
    /// Indices of the version columns, compared in order to decide whether a conflicting row may
    /// replace the existing one. When empty, a conflicting row always replaces the existing one.
    version_column_indices: Vec<u32>,
    /// How a conflict on an existing key is resolved. `new` returns `None` for `NoCheck`, so a
    /// constructed cache always holds one of the checked behaviors.
    conflict_behavior: ConflictBehavior,
    /// Indices of TOAST-able columns. When set, Debezium's unavailable-value placeholders found
    /// in these columns of a new row are replaced with the values from the old row.
    toastable_column_indices: Option<Vec<usize>>,
    /// Counters updated while looking up keys in the cache.
    metrics: MaterializeCacheMetrics,
}
47
/// Value stored in the LRU cache for a key: `Some` holds the serialized row, while `None` records
/// that no row exists for the key (not found in the table, or deleted).
type CacheValue = Option<CompactedRow>;
/// Change buffer keyed by serialized primary key and carrying deserialized rows.
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
50
51impl MaterializeCache {
52    /// Create a new `MaterializeCache`.
53    ///
54    /// Returns `None` if the conflict behavior is `NoCheck`.
55    pub fn new(
56        watermark_sequence: AtomicU64Ref,
57        metrics_info: MetricsInfo,
58        row_serde: BasicSerde,
59        version_column_indices: Vec<u32>,
60        conflict_behavior: ConflictBehavior,
61        toastable_column_indices: Option<Vec<usize>>,
62        materialize_cache_metrics: MaterializeCacheMetrics,
63    ) -> Option<Self> {
64        match conflict_behavior {
65            checked_conflict_behaviors!() => {}
66            ConflictBehavior::NoCheck => {
67                assert!(
68                    toastable_column_indices.is_none(),
69                    "when there are toastable columns, conflict handling must be enabled"
70                );
71                return None;
72            }
73        }
74
75        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
76            ManagedLruCache::unbounded(watermark_sequence, metrics_info);
77        Some(Self {
78            lru_cache,
79            row_serde,
80            version_column_indices,
81            conflict_behavior,
82            toastable_column_indices,
83            metrics: materialize_cache_metrics,
84        })
85    }
86
87    /// First populate the cache from `table`, and then calculate a [`ChangeBuffer`].
88    /// `table` will not be written in this method.
89    pub async fn handle_new<S: StateStore, SD: ValueRowSerde>(
90        &mut self,
91        chunk: StreamChunk,
92        table: &StateTableInner<S, SD>,
93    ) -> StreamExecutorResult<ChangeBuffer> {
94        if table.value_indices().is_some() {
95            // TODO(st1page): when materialize partial columns(), we should
96            // construct some columns in the pk
97            unimplemented!("materialize cache cannot handle conflicts with partial table values");
98        };
99
100        let (data_chunk, ops) = chunk.clone().into_parts();
101        let values = data_chunk.serialize();
102
103        let key_chunk = data_chunk.project(table.pk_indices());
104
105        let pks = {
106            let mut pks = vec![vec![]; data_chunk.capacity()];
107            key_chunk
108                .rows_with_holes()
109                .zip_eq_fast(pks.iter_mut())
110                .for_each(|(r, vnode_and_pk)| {
111                    if let Some(r) = r {
112                        table.pk_serde().serialize(r, vnode_and_pk);
113                    }
114                });
115            pks
116        };
117        let (_, vis) = key_chunk.into_parts();
118        let row_ops = ops
119            .iter()
120            .zip_eq_fast(pks.into_iter())
121            .zip_eq_fast(values.into_iter())
122            .zip_eq_fast(vis.iter())
123            .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
124            .collect_vec();
125
126        self.handle_inner(row_ops, table).await
127    }
128
129    async fn handle_inner<S: StateStore, SD: ValueRowSerde>(
130        &mut self,
131        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
132        table: &StateTableInner<S, SD>,
133    ) -> StreamExecutorResult<ChangeBuffer> {
134        let key_set: HashSet<Box<[u8]>> = row_ops
135            .iter()
136            .map(|(_, k, _)| k.as_slice().into())
137            .collect();
138
139        // Populate the LRU cache with the keys in input chunk.
140        // For new keys, row values are set to None.
141        self.fetch_keys(key_set.iter().map(|v| v.deref()), table)
142            .await?;
143
144        let mut change_buffer = ChangeBuffer::new();
145        let row_serde = self.row_serde.clone();
146        let version_column_indices = self.version_column_indices.clone();
147        for (op, key, row) in row_ops {
148            // Use a macro instead of method to workaround partial borrow.
149            macro_rules! get_expected {
150                () => {
151                    self.lru_cache.get(&key).unwrap_or_else(|| {
152                        panic!(
153                            "the key {:?} has not been fetched in the materialize executor's cache",
154                            key
155                        )
156                    })
157                };
158            }
159
160            match op {
161                Op::Insert | Op::UpdateInsert => {
162                    let Some(old_row) = get_expected!() else {
163                        // not exists before, meaning no conflict, simply insert
164                        let new_row_deserialized =
165                            row_serde.deserializer.deserialize(row.clone())?;
166                        change_buffer.insert(key.clone(), new_row_deserialized);
167                        self.lru_cache.put(key, Some(CompactedRow { row }));
168                        continue;
169                    };
170
171                    // now conflict happens, handle it according to the specified behavior
172                    match self.conflict_behavior {
173                        ConflictBehavior::Overwrite => {
174                            let old_row_deserialized =
175                                row_serde.deserializer.deserialize(old_row.row.clone())?;
176                            let new_row_deserialized =
177                                row_serde.deserializer.deserialize(row.clone())?;
178
179                            let need_overwrite = if !version_column_indices.is_empty() {
180                                versions_are_newer_or_equal(
181                                    &old_row_deserialized,
182                                    &new_row_deserialized,
183                                    &version_column_indices,
184                                )
185                            } else {
186                                // no version column specified, just overwrite
187                                true
188                            };
189
190                            if need_overwrite {
191                                if let Some(toastable_indices) = &self.toastable_column_indices {
192                                    // For TOAST-able columns, replace Debezium's unavailable value placeholder with old row values.
193                                    let final_row = toast::handle_toast_columns_for_postgres_cdc(
194                                        &old_row_deserialized,
195                                        &new_row_deserialized,
196                                        toastable_indices,
197                                    );
198
199                                    change_buffer.update(
200                                        key.clone(),
201                                        old_row_deserialized,
202                                        final_row.clone(),
203                                    );
204                                    let final_row_bytes =
205                                        Bytes::from(row_serde.serializer.serialize(final_row));
206                                    self.lru_cache.put(
207                                        key.clone(),
208                                        Some(CompactedRow {
209                                            row: final_row_bytes,
210                                        }),
211                                    );
212                                } else {
213                                    // No TOAST columns, use the original row bytes directly to avoid unnecessary serialization
214                                    change_buffer.update(
215                                        key.clone(),
216                                        old_row_deserialized,
217                                        new_row_deserialized,
218                                    );
219                                    self.lru_cache
220                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
221                                }
222                            };
223                        }
224                        ConflictBehavior::IgnoreConflict => {
225                            // ignore conflict, do nothing
226                        }
227                        ConflictBehavior::DoUpdateIfNotNull => {
228                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
229
230                            let old_row_deserialized =
231                                row_serde.deserializer.deserialize(old_row.row.clone())?;
232                            let new_row_deserialized =
233                                row_serde.deserializer.deserialize(row.clone())?;
234                            let need_overwrite = if !version_column_indices.is_empty() {
235                                versions_are_newer_or_equal(
236                                    &old_row_deserialized,
237                                    &new_row_deserialized,
238                                    &version_column_indices,
239                                )
240                            } else {
241                                true
242                            };
243
244                            if need_overwrite {
245                                let mut row_deserialized_vec =
246                                    old_row_deserialized.clone().into_inner().into_vec();
247                                replace_if_not_null(
248                                    &mut row_deserialized_vec,
249                                    new_row_deserialized.clone(),
250                                );
251                                let mut updated_row = OwnedRow::new(row_deserialized_vec);
252
253                                // Apply TOAST column fix for CDC tables with TOAST columns
254                                if let Some(toastable_indices) = &self.toastable_column_indices {
255                                    // Note: we need to use old_row_deserialized again, but it was moved above
256                                    // So we re-deserialize the old row
257                                    let old_row_deserialized_again =
258                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
259                                    updated_row = toast::handle_toast_columns_for_postgres_cdc(
260                                        &old_row_deserialized_again,
261                                        &updated_row,
262                                        toastable_indices,
263                                    );
264                                }
265
266                                change_buffer.update(
267                                    key.clone(),
268                                    old_row_deserialized,
269                                    updated_row.clone(),
270                                );
271                                let updated_row_bytes =
272                                    Bytes::from(row_serde.serializer.serialize(updated_row));
273                                self.lru_cache.put(
274                                    key.clone(),
275                                    Some(CompactedRow {
276                                        row: updated_row_bytes,
277                                    }),
278                                );
279                            }
280                        }
281                        ConflictBehavior::NoCheck => unreachable!(),
282                    };
283                }
284
285                Op::UpdateDelete
286                    if matches!(
287                        self.conflict_behavior,
288                        ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
289                    ) =>
290                {
291                    // For `UpdateDelete`s, we skip processing them but directly handle the following `UpdateInsert`
292                    // instead. This is because...
293                    //
294                    // - For `Overwrite`, we only care about the new row.
295                    // - For `DoUpdateIfNotNull`, we don't want the whole row to be deleted, but instead perform
296                    //   column-wise replacement when handling the `UpdateInsert`.
297                    //
298                    // However, for `IgnoreConflict`, we still need to delete the old row first, otherwise the row
299                    // cannot be updated at all.
300                }
301
302                Op::Delete | Op::UpdateDelete => {
303                    if let Some(old_row) = get_expected!() {
304                        let old_row_deserialized =
305                            row_serde.deserializer.deserialize(old_row.row.clone())?;
306                        change_buffer.delete(key.clone(), old_row_deserialized);
307                        // put a None into the cache to represent deletion
308                        self.lru_cache.put(key, None);
309                    } else {
310                        // delete a non-existent value
311                        // this is allowed in the case of mview conflict, so ignore
312                    }
313                }
314            }
315        }
316        Ok(change_buffer)
317    }
318
319    async fn fetch_keys<'a, S: StateStore, SD: ValueRowSerde>(
320        &mut self,
321        keys: impl Iterator<Item = &'a [u8]>,
322        table: &StateTableInner<S, SD>,
323    ) -> StreamExecutorResult<()> {
324        let mut futures = vec![];
325        for key in keys {
326            self.metrics.materialize_cache_total_count.inc();
327
328            if self.lru_cache.contains(key) {
329                if self.lru_cache.get(key).unwrap().is_some() {
330                    self.metrics.materialize_data_exist_count.inc();
331                }
332                self.metrics.materialize_cache_hit_count.inc();
333                continue;
334            }
335            futures.push(async {
336                let key_row = table.pk_serde().deserialize(key).unwrap();
337                let row = table.get_row(key_row).await?.map(CompactedRow::from);
338                StreamExecutorResult::Ok((key.to_vec(), row))
339            });
340        }
341
342        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
343        while let Some(result) = buffered.next().await {
344            let (key, row) = result?;
345            if row.is_some() {
346                self.metrics.materialize_data_exist_count.inc();
347            }
348            // for keys that are not in the table, `value` is None
349            match self.conflict_behavior {
350                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
351                _ => unreachable!(),
352            };
353        }
354
355        Ok(())
356    }
357
358    /// Evict the LRU cache entries that are lower than the watermark.
359    pub fn evict(&mut self) {
360        self.lru_cache.evict()
361    }
362
363    /// Clear the LRU cache.
364    pub fn clear(&mut self) {
365        self.lru_cache.clear()
366    }
367}
368
369/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
370/// column value in the replacement row is not null.
371///
372/// # Example
373///
374/// ```ignore
375/// let mut row = vec![Some(1), None, Some(3)];
376/// let replacement = vec![Some(10), Some(20), None];
377/// replace_if_not_null(&mut row, replacement);
378/// ```
379///
380/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
381fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
382    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
383        if let Some(new_value) = new_col {
384            *old_col = Some(new_value);
385        }
386    }
387}
388
389/// Compare multiple version columns lexicographically.
390/// Returns true if `new_row` has a newer or equal version compared to `old_row`.
391fn versions_are_newer_or_equal(
392    old_row: &OwnedRow,
393    new_row: &OwnedRow,
394    version_column_indices: &[u32],
395) -> bool {
396    if version_column_indices.is_empty() {
397        // No version columns specified, always consider new version as newer
398        return true;
399    }
400
401    for &idx in version_column_indices {
402        let old_value = old_row.datum_at(idx as usize);
403        let new_value = new_row.datum_at(idx as usize);
404
405        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
406            std::cmp::Ordering::Less => return true,     // new is newer
407            std::cmp::Ordering::Greater => return false, // old is newer
408            std::cmp::Ordering::Equal => continue,       // equal, check next column
409        }
410    }
411
412    // All version columns are equal, consider new version as equal (should overwrite)
413    true
414}
415
416/// TOAST column handling for CDC tables with TOAST columns.
417mod toast {
418    use risingwave_common::row::Row as _;
419    use risingwave_common::types::DEBEZIUM_UNAVAILABLE_VALUE;
420
421    use super::*;
422
423    /// Fast string comparison to check if a string equals `DEBEZIUM_UNAVAILABLE_VALUE`.
424    /// Optimized by checking length first to avoid expensive string comparison.
425    fn is_unavailable_value_str(s: &str) -> bool {
426        s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
427    }
428
429    /// Check if a datum represents Debezium's unavailable value placeholder.
430    /// This function handles both scalar types and one-dimensional arrays.
431    fn is_debezium_unavailable_value(
432        datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
433    ) -> bool {
434        match datum {
435            Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => {
436                is_unavailable_value_str(val)
437            }
438            Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
439                // For jsonb type, check if it's a string containing the unavailable value
440                jsonb_ref
441                    .as_str()
442                    .map(is_unavailable_value_str)
443                    .unwrap_or(false)
444            }
445            Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
446                // For bytea type, we need to check if it contains the string bytes of DEBEZIUM_UNAVAILABLE_VALUE
447                // This is because when processing bytea from Debezium, we convert the base64-encoded string
448                // to `DEBEZIUM_UNAVAILABLE_VALUE` in the json.rs parser to maintain consistency
449                if let Ok(bytea_str) = std::str::from_utf8(bytea) {
450                    is_unavailable_value_str(bytea_str)
451                } else {
452                    false
453                }
454            }
455            Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
456                // For list type, check if it contains exactly one element with the unavailable value
457                // This is because when any element in an array triggers TOAST, Debezium treats the entire
458                // array as unchanged and sends a placeholder array with only one element
459                if list_ref.len() == 1 {
460                    if let Some(Some(element)) = list_ref.get(0) {
461                        // Recursively check the array element
462                        is_debezium_unavailable_value(&Some(element))
463                    } else {
464                        false
465                    }
466                } else {
467                    false
468                }
469            }
470            _ => false,
471        }
472    }
473
474    /// Fix TOAST columns by replacing unavailable values with old row values.
475    pub fn handle_toast_columns_for_postgres_cdc(
476        old_row: &OwnedRow,
477        new_row: &OwnedRow,
478        toastable_indices: &[usize],
479    ) -> OwnedRow {
480        let mut fixed_row_data = new_row.as_inner().to_vec();
481
482        for &toast_idx in toastable_indices {
483            // Check if the new value is Debezium's unavailable value placeholder
484            let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
485            if is_unavailable {
486                // Replace with old row value if available
487                if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
488                    fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
489                }
490            }
491        }
492
493        OwnedRow::new(fixed_row_data)
494    }
495}