risingwave_stream/executor/mview/cache.rs

use std::collections::HashSet;
use std::ops::Deref as _;

use bytes::Bytes;
use futures::{StreamExt as _, stream};
use itertools::Itertools as _;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{ConflictBehavior, checked_conflict_behaviors};
use risingwave_common::row::{CompactedRow, OwnedRow, Row as _};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::iter_util::ZipEqFast as _;
use risingwave_common::util::sort_util::{OrderType, cmp_datum};
use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer as _};
use risingwave_storage::StateStore;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;

use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTableInner;
use crate::executor::StreamExecutorResult;
use crate::executor::monitor::MaterializeCacheMetrics;
use crate::task::AtomicU64Ref;
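/// LRU cache over the materialized table's rows, keyed by the serialized primary key. It keeps
/// the latest value (or a negative entry) for each recently touched key so that conflict
/// handling in the materialize executor can avoid a storage lookup per key.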
pub struct MaterializeCache {
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    version_column_indices: Vec<u32>,
    conflict_behavior: ConflictBehavior,
    toastable_column_indices: Option<Vec<usize>>,
    metrics: MaterializeCacheMetrics,
}
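/// Cached value for a key: `Some` holds the latest materialized row in compacted form, while
/// `None` is a negative entry recording that the key is absent from the table.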
type CacheValue = Option<CompactedRow>;
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
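/// Returns whether the row's value in the watermark-cleaned column is strictly below the
/// table's committed watermark. Such rows are treated as absent by the cache (see
/// `fetch_keys`), since state cleaning may already have removed them from storage.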
fn row_below_committed_watermark<S: StateStore, SD: ValueRowSerde>(
    row_serde: &BasicSerde,
    table: &StateTableInner<S, SD>,
    row: &CompactedRow,
) -> StreamExecutorResult<bool> {
    let Some(clean_watermark_index) = table.clean_watermark_index else {
        return Ok(false);
    };
    let Some(committed_watermark) = table.get_committed_watermark() else {
        return Ok(false);
    };
    let deserialized = row_serde.deserializer.deserialize(row.row.clone())?;
    Ok(cmp_datum(
        deserialized.datum_at(clean_watermark_index),
        Some(committed_watermark.as_scalar_ref_impl()),
        OrderType::ascending(),
    ) == std::cmp::Ordering::Less)
}

impl MaterializeCache {
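    /// Creates the cache if the configured `conflict_behavior` requires conflict handling.
    /// Returns `None` for `ConflictBehavior::NoCheck`, in which case conflict resolution (and
    /// this cache) is not needed; TOAST handling requires conflict handling to be enabled.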
    pub fn new(
        watermark_sequence: AtomicU64Ref,
        metrics_info: MetricsInfo,
        row_serde: BasicSerde,
        version_column_indices: Vec<u32>,
        conflict_behavior: ConflictBehavior,
        toastable_column_indices: Option<Vec<usize>>,
        materialize_cache_metrics: MaterializeCacheMetrics,
    ) -> Option<Self> {
        match conflict_behavior {
            checked_conflict_behaviors!() => {}
            ConflictBehavior::NoCheck => {
                assert!(
                    toastable_column_indices.is_none(),
                    "when there are toastable columns, conflict handling must be enabled"
                );
                return None;
            }
        }

        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
            ManagedLruCache::unbounded(watermark_sequence, metrics_info);
        Some(Self {
            lru_cache,
            row_serde,
            version_column_indices,
            conflict_behavior,
            toastable_column_indices,
            metrics: materialize_cache_metrics,
        })
    }
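    /// Resolves conflicts for one input `StreamChunk`: serializes each row's primary key and
    /// value, keeps only the visible rows, and delegates to `handle_inner`, returning the
    /// effective changes as a [`ChangeBuffer`].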
    pub async fn handle_new<S: StateStore, SD: ValueRowSerde>(
        &mut self,
        chunk: StreamChunk,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        if table.value_indices().is_some() {
            unimplemented!("materialize cache cannot handle conflicts with partial table values");
        }

        let (data_chunk, ops) = chunk.clone().into_parts();
        let values = data_chunk.serialize();

        let key_chunk = data_chunk.project(table.pk_indices());

        let pks = {
            let mut pks = vec![vec![]; data_chunk.capacity()];
            key_chunk
                .rows_with_holes()
                .zip_eq_fast(pks.iter_mut())
                .for_each(|(r, vnode_and_pk)| {
                    if let Some(r) = r {
                        table.pk_serde().serialize(r, vnode_and_pk);
                    }
                });
            pks
        };
        let (_, vis) = key_chunk.into_parts();
        let row_ops = ops
            .iter()
            .zip_eq_fast(pks.into_iter())
            .zip_eq_fast(values.into_iter())
            .zip_eq_fast(vis.iter())
            .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
            .collect_vec();

        self.handle_inner(row_ops, table).await
    }
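    /// Core conflict-resolution loop. For each `(op, serialized_pk, serialized_row)`, it
    /// consults the cache (populated up front by `fetch_keys`) and, according to the
    /// configured `ConflictBehavior`, records the effective insert/update/delete in a
    /// [`ChangeBuffer`] while keeping the cache in sync.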
    async fn handle_inner<S: StateStore, SD: ValueRowSerde>(
        &mut self,
        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<ChangeBuffer> {
        let key_set: HashSet<Box<[u8]>> = row_ops
            .iter()
            .map(|(_, k, _)| k.as_slice().into())
            .collect();

        // Make sure every key touched by this batch has a cache entry before resolving
        // conflicts.
        self.fetch_keys(key_set.iter().map(|v| v.deref()), table)
            .await?;

        let mut change_buffer = ChangeBuffer::new();
        let row_serde = self.row_serde.clone();
        let version_column_indices = self.version_column_indices.clone();
        for (op, key, row) in row_ops {
            // Returns the cached entry for `key`; `fetch_keys` above guarantees it exists.
            macro_rules! get_expected {
                () => {
                    self.lru_cache.get(&key).unwrap_or_else(|| {
                        panic!(
                            "the key {:?} has not been fetched in the materialize executor's cache",
                            key
                        )
                    })
                };
            }
            match op {
                Op::Insert | Op::UpdateInsert => {
                    let Some(old_row) = get_expected!() else {
                        // No existing row under this key: record a plain insert and cache it.
                        let new_row_deserialized =
                            row_serde.deserializer.deserialize(row.clone())?;
                        change_buffer.insert(key.clone(), new_row_deserialized);
                        self.lru_cache.put(key, Some(CompactedRow { row }));
                        continue;
                    };

                    // The key already has a row: resolve according to the conflict behavior.
                    match self.conflict_behavior {
                        ConflictBehavior::Overwrite => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;

                            // Honor version columns if configured: only overwrite when the new
                            // row's version is not older than the cached one.
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                // No version columns: always overwrite.
                                true
                            };

                            if need_overwrite {
                                if let Some(toastable_indices) = &self.toastable_column_indices {
                                    // Carry over old values for TOAST columns whose new value is
                                    // the Debezium "unavailable value" placeholder.
                                    let final_row = toast::handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized,
                                        &new_row_deserialized,
                                        toastable_indices,
                                    );

                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        final_row.clone(),
                                    );
                                    let final_row_bytes =
                                        Bytes::from(row_serde.serializer.serialize(final_row));
                                    self.lru_cache.put(
                                        key.clone(),
                                        Some(CompactedRow {
                                            row: final_row_bytes,
                                        }),
                                    );
                                } else {
                                    change_buffer.update(
                                        key.clone(),
                                        old_row_deserialized,
                                        new_row_deserialized,
                                    );
                                    self.lru_cache
                                        .put(key.clone(), Some(CompactedRow { row: row.clone() }));
                                }
                            }
                        }
                        ConflictBehavior::IgnoreConflict => {
                            // Keep the existing row; the incoming one is silently dropped.
                        }
                        ConflictBehavior::DoUpdateIfNotNull => {
                            let old_row_deserialized =
                                row_serde.deserializer.deserialize(old_row.row.clone())?;
                            let new_row_deserialized =
                                row_serde.deserializer.deserialize(row.clone())?;
                            let need_overwrite = if !version_column_indices.is_empty() {
                                versions_are_newer_or_equal(
                                    &old_row_deserialized,
                                    &new_row_deserialized,
                                    &version_column_indices,
                                )
                            } else {
                                true
                            };

                            if need_overwrite {
                                // Start from the old row and overwrite only the columns whose
                                // new value is non-NULL.
                                let mut row_deserialized_vec =
                                    old_row_deserialized.clone().into_inner().into_vec();
                                replace_if_not_null(
                                    &mut row_deserialized_vec,
                                    new_row_deserialized.clone(),
                                );
                                let mut updated_row = OwnedRow::new(row_deserialized_vec);

                                if let Some(toastable_indices) = &self.toastable_column_indices {
                                    // Carry over old values for TOAST columns whose new value is
                                    // the Debezium "unavailable value" placeholder.
                                    let old_row_deserialized_again =
                                        row_serde.deserializer.deserialize(old_row.row.clone())?;
                                    updated_row = toast::handle_toast_columns_for_postgres_cdc(
                                        &old_row_deserialized_again,
                                        &updated_row,
                                        toastable_indices,
                                    );
                                }

                                change_buffer.update(
                                    key.clone(),
                                    old_row_deserialized,
                                    updated_row.clone(),
                                );
                                let updated_row_bytes =
                                    Bytes::from(row_serde.serializer.serialize(updated_row));
                                self.lru_cache.put(
                                    key.clone(),
                                    Some(CompactedRow {
                                        row: updated_row_bytes,
                                    }),
                                );
                            }
                        }
                        ConflictBehavior::NoCheck => unreachable!(),
                    };
                }

                Op::UpdateDelete
                    if matches!(
                        self.conflict_behavior,
                        ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
                    ) =>
                {
                    // Skipped: for these behaviors the paired `UpdateInsert` that follows on the
                    // same key records the full update against the cached old row, so the delete
                    // half is not needed here.
                }

                Op::Delete | Op::UpdateDelete => {
                    if let Some(old_row) = get_expected!() {
                        let old_row_deserialized =
                            row_serde.deserializer.deserialize(old_row.row.clone())?;
                        change_buffer.delete(key.clone(), old_row_deserialized);
                        self.lru_cache.put(key, None);
                    } else {
                        // The key is not materialized; there is nothing to delete.
                    }
                }
            }
        }
        Ok(change_buffer)
    }
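    /// Ensures every key in `keys` has a cache entry, reading missing rows from the state
    /// table with bounded concurrency. Rows below the committed clean watermark are cached as
    /// `None`, i.e. treated as absent.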
    async fn fetch_keys<'a, S: StateStore, SD: ValueRowSerde>(
        &mut self,
        keys: impl Iterator<Item = &'a [u8]>,
        table: &StateTableInner<S, SD>,
    ) -> StreamExecutorResult<()> {
        let mut futures = vec![];
        for key in keys {
            self.metrics.materialize_cache_total_count.inc();

            if let Some(cached) = self.lru_cache.get(key) {
                self.metrics.materialize_cache_hit_count.inc();
                if let Some(row) = cached {
                    if row_below_committed_watermark(&self.row_serde, table, row)? {
                        // The cached row has fallen below the committed watermark: demote it to
                        // a negative entry so it is treated as absent.
                        self.lru_cache.put(key.to_vec(), None);
                    } else {
                        self.metrics.materialize_data_exist_count.inc();
                    }
                }
                continue;
            }

            // Cache miss: look the row up in the state table.
            let row_serde = self.row_serde.clone();
            futures.push(async move {
                let key_row = table.pk_serde().deserialize(key).unwrap();
                let row = table.get_row(key_row).await?.map(CompactedRow::from);
                let row = match row {
                    Some(row) if row_below_committed_watermark(&row_serde, table, &row)? => None,
                    other => other,
                };
                StreamExecutorResult::Ok((key.to_vec(), row))
            });
        }

        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
        while let Some(result) = buffered.next().await {
            let (key, row) = result?;
            if row.is_some() {
                self.metrics.materialize_data_exist_count.inc();
            }
            match self.conflict_behavior {
                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
                _ => unreachable!(),
            };
        }

        Ok(())
    }
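    /// Evicts stale entries from the underlying LRU cache.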
    pub fn evict(&mut self) {
        self.lru_cache.evict()
    }
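    /// Drops all cached entries.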
    pub fn clear(&mut self) {
        self.lru_cache.clear()
    }
}
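/// Overwrites each column of `row` with the corresponding value from `replacement`, but only
/// where the replacement value is non-NULL (`DoUpdateIfNotNull` semantics).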
fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
        if let Some(new_value) = new_col {
            *old_col = Some(new_value);
        }
    }
}
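/// Compares the version columns of the two rows lexicographically and returns `true` if the
/// new row's version is greater than or equal to the old row's (or if no version columns are
/// configured), i.e. the new row should take effect.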
fn versions_are_newer_or_equal(
    old_row: &OwnedRow,
    new_row: &OwnedRow,
    version_column_indices: &[u32],
) -> bool {
    if version_column_indices.is_empty() {
        return true;
    }

    for &idx in version_column_indices {
        let old_value = old_row.datum_at(idx as usize);
        let new_value = new_row.datum_at(idx as usize);

        match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
            std::cmp::Ordering::Less => return true,
            std::cmp::Ordering::Greater => return false,
            std::cmp::Ordering::Equal => continue,
        }
    }

    // All compared version columns are equal.
    true
}
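/// Handling of PostgreSQL TOAST columns ingested via Debezium CDC: when a TOASTed value is not
/// shipped in an update, Debezium emits an "unavailable value" placeholder, and the previously
/// materialized value must be carried over instead.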
mod toast {
    use risingwave_common::row::Row as _;
    use risingwave_common::types::DEBEZIUM_UNAVAILABLE_VALUE;

    use super::*;
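    /// Returns whether `s` is exactly Debezium's "unavailable value" placeholder. The length
    /// check is a cheap fast path before the full string comparison.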
    fn is_unavailable_value_str(s: &str) -> bool {
        s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
    }
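    /// Returns whether `datum` carries Debezium's "unavailable value" placeholder, checking the
    /// representations handled here: text, jsonb, bytea, and single-element lists of those.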
    fn is_debezium_unavailable_value(
        datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
    ) -> bool {
        match datum {
            Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => {
                is_unavailable_value_str(val)
            }
            Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
                // A jsonb value that is the placeholder string counts as unavailable.
                jsonb_ref
                    .as_str()
                    .map(is_unavailable_value_str)
                    .unwrap_or(false)
            }
            Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
                // A bytea value that is the UTF-8 encoding of the placeholder counts as well.
                if let Ok(bytea_str) = std::str::from_utf8(bytea) {
                    is_unavailable_value_str(bytea_str)
                } else {
                    false
                }
            }
            Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
                // A single-element list whose only element is the placeholder is also treated
                // as unavailable (array columns).
                if list_ref.len() == 1 {
                    if let Some(Some(element)) = list_ref.get(0) {
                        is_debezium_unavailable_value(&Some(element))
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            _ => false,
        }
    }
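    /// Builds the row to materialize for a Postgres CDC update: starts from `new_row` and, for
    /// every TOASTable column whose new value is the Debezium "unavailable value" placeholder,
    /// falls back to the corresponding value from `old_row`.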
    pub fn handle_toast_columns_for_postgres_cdc(
        old_row: &OwnedRow,
        new_row: &OwnedRow,
        toastable_indices: &[usize],
    ) -> OwnedRow {
        let mut fixed_row_data = new_row.as_inner().to_vec();

        for &toast_idx in toastable_indices {
            let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
            if is_unavailable {
                // Fall back to the previously materialized value; if the old value is NULL,
                // the new value is kept unchanged.
                if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
                    fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
                }
            }
        }

        OwnedRow::new(fixed_row_data)
    }
}