risingwave_stream/executor/mview/
cache.rs1use std::collections::HashSet;
16use std::ops::Deref as _;
17
18use bytes::Bytes;
19use futures::{StreamExt as _, stream};
20use itertools::Itertools as _;
21use risingwave_common::array::{Op, StreamChunk};
22use risingwave_common::catalog::{ConflictBehavior, checked_conflict_behaviors};
23use risingwave_common::row::{CompactedRow, OwnedRow, Row as _};
24use risingwave_common::types::ScalarImpl;
25use risingwave_common::util::iter_util::ZipEqFast as _;
26use risingwave_common::util::sort_util::{OrderType, cmp_datum};
27use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer as _};
28use risingwave_storage::StateStore;
29use risingwave_storage::row_serde::value_serde::ValueRowSerde;
30
31use crate::cache::ManagedLruCache;
32use crate::common::metrics::MetricsInfo;
33use crate::common::table::state_table::StateTableInner;
34use crate::executor::StreamExecutorResult;
35use crate::executor::monitor::MaterializeCacheMetrics;
36use crate::task::AtomicU64Ref;
37
/// Cache of the rows currently stored in a materialized table, keyed by the serialized primary
/// key, used by the materialize executor to detect and resolve primary-key conflicts.
pub struct MaterializeCache {
    /// Serialized pk -> cached value; a `None` value records that no row exists for the key.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Value-encoding serde used to decode cached row bytes and to encode merged rows.
    row_serde: BasicSerde,
    /// Indices of the version columns compared before an existing row is overwritten; an empty
    /// list means the incoming row always wins.
    version_column_indices: Vec<u32>,
    /// How conflicting inserts are resolved; never `NoCheck` (`MaterializeCache::new` returns
    /// `None` for that behavior).
    conflict_behavior: ConflictBehavior,
    /// Columns whose Debezium "unavailable value" placeholders are replaced by the old row's value.
    toastable_column_indices: Option<Vec<usize>>,
    /// Counters for cache lookups, cache hits and keys that have an existing row.
    metrics: MaterializeCacheMetrics,
}
47
48type CacheValue = Option<CompactedRow>;
49type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
50
51impl MaterializeCache {
52 pub fn new(
56 watermark_sequence: AtomicU64Ref,
57 metrics_info: MetricsInfo,
58 row_serde: BasicSerde,
59 version_column_indices: Vec<u32>,
60 conflict_behavior: ConflictBehavior,
61 toastable_column_indices: Option<Vec<usize>>,
62 materialize_cache_metrics: MaterializeCacheMetrics,
63 ) -> Option<Self> {
64 match conflict_behavior {
65 checked_conflict_behaviors!() => {}
66 ConflictBehavior::NoCheck => {
67 assert!(
68 toastable_column_indices.is_none(),
69 "when there are toastable columns, conflict handling must be enabled"
70 );
71 return None;
72 }
73 }
74
75 let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
76 ManagedLruCache::unbounded(watermark_sequence, metrics_info);
77 Some(Self {
78 lru_cache,
79 row_serde,
80 version_column_indices,
81 conflict_behavior,
82 toastable_column_indices,
83 metrics: materialize_cache_metrics,
84 })
85 }
86
87 pub async fn handle_new<S: StateStore, SD: ValueRowSerde>(
90 &mut self,
91 chunk: StreamChunk,
92 table: &StateTableInner<S, SD>,
93 ) -> StreamExecutorResult<ChangeBuffer> {
94 if table.value_indices().is_some() {
95 unimplemented!("materialize cache cannot handle conflicts with partial table values");
98 };
99
100 let (data_chunk, ops) = chunk.clone().into_parts();
101 let values = data_chunk.serialize();
102
103 let key_chunk = data_chunk.project(table.pk_indices());
104
105 let pks = {
106 let mut pks = vec![vec![]; data_chunk.capacity()];
107 key_chunk
108 .rows_with_holes()
109 .zip_eq_fast(pks.iter_mut())
110 .for_each(|(r, vnode_and_pk)| {
111 if let Some(r) = r {
112 table.pk_serde().serialize(r, vnode_and_pk);
113 }
114 });
115 pks
116 };
117 let (_, vis) = key_chunk.into_parts();
118 let row_ops = ops
119 .iter()
120 .zip_eq_fast(pks.into_iter())
121 .zip_eq_fast(values.into_iter())
122 .zip_eq_fast(vis.iter())
123 .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
124 .collect_vec();
125
126 self.handle_inner(row_ops, table).await
127 }
128
129 async fn handle_inner<S: StateStore, SD: ValueRowSerde>(
130 &mut self,
131 row_ops: Vec<(Op, Vec<u8>, Bytes)>,
132 table: &StateTableInner<S, SD>,
133 ) -> StreamExecutorResult<ChangeBuffer> {
134 let key_set: HashSet<Box<[u8]>> = row_ops
135 .iter()
136 .map(|(_, k, _)| k.as_slice().into())
137 .collect();
138
139 self.fetch_keys(key_set.iter().map(|v| v.deref()), table)
142 .await?;
143
144 let mut change_buffer = ChangeBuffer::new();
145 let row_serde = self.row_serde.clone();
146 let version_column_indices = self.version_column_indices.clone();
147 for (op, key, row) in row_ops {
148 macro_rules! get_expected {
150 () => {
151 self.lru_cache.get(&key).unwrap_or_else(|| {
152 panic!(
153 "the key {:?} has not been fetched in the materialize executor's cache",
154 key
155 )
156 })
157 };
158 }
159
160 match op {
161 Op::Insert | Op::UpdateInsert => {
162 let Some(old_row) = get_expected!() else {
163 let new_row_deserialized =
165 row_serde.deserializer.deserialize(row.clone())?;
166 change_buffer.insert(key.clone(), new_row_deserialized);
167 self.lru_cache.put(key, Some(CompactedRow { row }));
168 continue;
169 };
170
171 match self.conflict_behavior {
173 ConflictBehavior::Overwrite => {
174 let old_row_deserialized =
175 row_serde.deserializer.deserialize(old_row.row.clone())?;
176 let new_row_deserialized =
177 row_serde.deserializer.deserialize(row.clone())?;
178
179 let need_overwrite = if !version_column_indices.is_empty() {
180 versions_are_newer_or_equal(
181 &old_row_deserialized,
182 &new_row_deserialized,
183 &version_column_indices,
184 )
185 } else {
186 true
188 };
189
190 if need_overwrite {
191 if let Some(toastable_indices) = &self.toastable_column_indices {
192 let final_row = toast::handle_toast_columns_for_postgres_cdc(
194 &old_row_deserialized,
195 &new_row_deserialized,
196 toastable_indices,
197 );
198
199 change_buffer.update(
200 key.clone(),
201 old_row_deserialized,
202 final_row.clone(),
203 );
204 let final_row_bytes =
205 Bytes::from(row_serde.serializer.serialize(final_row));
206 self.lru_cache.put(
207 key.clone(),
208 Some(CompactedRow {
209 row: final_row_bytes,
210 }),
211 );
212 } else {
213 change_buffer.update(
215 key.clone(),
216 old_row_deserialized,
217 new_row_deserialized,
218 );
219 self.lru_cache
220 .put(key.clone(), Some(CompactedRow { row: row.clone() }));
221 }
222 };
223 }
224 ConflictBehavior::IgnoreConflict => {
225 }
227 ConflictBehavior::DoUpdateIfNotNull => {
228 let old_row_deserialized =
231 row_serde.deserializer.deserialize(old_row.row.clone())?;
232 let new_row_deserialized =
233 row_serde.deserializer.deserialize(row.clone())?;
234 let need_overwrite = if !version_column_indices.is_empty() {
235 versions_are_newer_or_equal(
236 &old_row_deserialized,
237 &new_row_deserialized,
238 &version_column_indices,
239 )
240 } else {
241 true
242 };
243
244 if need_overwrite {
245 let mut row_deserialized_vec =
246 old_row_deserialized.clone().into_inner().into_vec();
247 replace_if_not_null(
248 &mut row_deserialized_vec,
249 new_row_deserialized.clone(),
250 );
251 let mut updated_row = OwnedRow::new(row_deserialized_vec);
252
253 if let Some(toastable_indices) = &self.toastable_column_indices {
255 let old_row_deserialized_again =
258 row_serde.deserializer.deserialize(old_row.row.clone())?;
259 updated_row = toast::handle_toast_columns_for_postgres_cdc(
260 &old_row_deserialized_again,
261 &updated_row,
262 toastable_indices,
263 );
264 }
265
266 change_buffer.update(
267 key.clone(),
268 old_row_deserialized,
269 updated_row.clone(),
270 );
271 let updated_row_bytes =
272 Bytes::from(row_serde.serializer.serialize(updated_row));
273 self.lru_cache.put(
274 key.clone(),
275 Some(CompactedRow {
276 row: updated_row_bytes,
277 }),
278 );
279 }
280 }
281 ConflictBehavior::NoCheck => unreachable!(),
282 };
283 }
284
285 Op::UpdateDelete
286 if matches!(
287 self.conflict_behavior,
288 ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull
289 ) =>
290 {
291 }
301
302 Op::Delete | Op::UpdateDelete => {
303 if let Some(old_row) = get_expected!() {
304 let old_row_deserialized =
305 row_serde.deserializer.deserialize(old_row.row.clone())?;
306 change_buffer.delete(key.clone(), old_row_deserialized);
307 self.lru_cache.put(key, None);
309 } else {
310 }
313 }
314 }
315 }
316 Ok(change_buffer)
317 }
318
319 async fn fetch_keys<'a, S: StateStore, SD: ValueRowSerde>(
320 &mut self,
321 keys: impl Iterator<Item = &'a [u8]>,
322 table: &StateTableInner<S, SD>,
323 ) -> StreamExecutorResult<()> {
324 let mut futures = vec![];
325 for key in keys {
326 self.metrics.materialize_cache_total_count.inc();
327
328 if self.lru_cache.contains(key) {
329 if self.lru_cache.get(key).unwrap().is_some() {
330 self.metrics.materialize_data_exist_count.inc();
331 }
332 self.metrics.materialize_cache_hit_count.inc();
333 continue;
334 }
335 futures.push(async {
336 let key_row = table.pk_serde().deserialize(key).unwrap();
337 let row = table.get_row(key_row).await?.map(CompactedRow::from);
338 StreamExecutorResult::Ok((key.to_vec(), row))
339 });
340 }
341
342 let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
343 while let Some(result) = buffered.next().await {
344 let (key, row) = result?;
345 if row.is_some() {
346 self.metrics.materialize_data_exist_count.inc();
347 }
348 match self.conflict_behavior {
350 checked_conflict_behaviors!() => self.lru_cache.put(key, row),
351 _ => unreachable!(),
352 };
353 }
354
355 Ok(())
356 }
357
358 pub fn evict(&mut self) {
360 self.lru_cache.evict()
361 }
362
363 pub fn clear(&mut self) {
365 self.lru_cache.clear()
366 }
367}
368
369fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
382 for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
383 if let Some(new_value) = new_col {
384 *old_col = Some(new_value);
385 }
386 }
387}
388
389fn versions_are_newer_or_equal(
392 old_row: &OwnedRow,
393 new_row: &OwnedRow,
394 version_column_indices: &[u32],
395) -> bool {
396 if version_column_indices.is_empty() {
397 return true;
399 }
400
401 for &idx in version_column_indices {
402 let old_value = old_row.datum_at(idx as usize);
403 let new_value = new_row.datum_at(idx as usize);
404
405 match cmp_datum(old_value, new_value, OrderType::ascending_nulls_first()) {
406 std::cmp::Ordering::Less => return true, std::cmp::Ordering::Greater => return false, std::cmp::Ordering::Equal => continue, }
410 }
411
412 true
414}
415
416mod toast {
418 use risingwave_common::row::Row as _;
419 use risingwave_common::types::DEBEZIUM_UNAVAILABLE_VALUE;
420
421 use super::*;
422
423 fn is_unavailable_value_str(s: &str) -> bool {
426 s.len() == DEBEZIUM_UNAVAILABLE_VALUE.len() && s == DEBEZIUM_UNAVAILABLE_VALUE
427 }
428
429 fn is_debezium_unavailable_value(
432 datum: &Option<risingwave_common::types::ScalarRefImpl<'_>>,
433 ) -> bool {
434 match datum {
435 Some(risingwave_common::types::ScalarRefImpl::Utf8(val)) => {
436 is_unavailable_value_str(val)
437 }
438 Some(risingwave_common::types::ScalarRefImpl::Jsonb(jsonb_ref)) => {
439 jsonb_ref
441 .as_str()
442 .map(is_unavailable_value_str)
443 .unwrap_or(false)
444 }
445 Some(risingwave_common::types::ScalarRefImpl::Bytea(bytea)) => {
446 if let Ok(bytea_str) = std::str::from_utf8(bytea) {
450 is_unavailable_value_str(bytea_str)
451 } else {
452 false
453 }
454 }
455 Some(risingwave_common::types::ScalarRefImpl::List(list_ref)) => {
456 if list_ref.len() == 1 {
460 if let Some(Some(element)) = list_ref.get(0) {
461 is_debezium_unavailable_value(&Some(element))
463 } else {
464 false
465 }
466 } else {
467 false
468 }
469 }
470 _ => false,
471 }
472 }
473
474 pub fn handle_toast_columns_for_postgres_cdc(
476 old_row: &OwnedRow,
477 new_row: &OwnedRow,
478 toastable_indices: &[usize],
479 ) -> OwnedRow {
480 let mut fixed_row_data = new_row.as_inner().to_vec();
481
482 for &toast_idx in toastable_indices {
483 let is_unavailable = is_debezium_unavailable_value(&new_row.datum_at(toast_idx));
485 if is_unavailable {
486 if let Some(old_datum_ref) = old_row.datum_at(toast_idx) {
488 fixed_row_data[toast_idx] = Some(old_datum_ref.into_scalar_impl());
489 }
490 }
491 }
492
493 OwnedRow::new(fixed_row_data)
494 }
495}