use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Bound;

use await_tree::InstrumentAwait;
use futures::Stream;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::StateStore;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::collect_data_chunk_with_builder;

use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};

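/// Number of metadata columns in a backfill state row, in addition to the position columns:
/// the vnode, the `backfill_finished` flag and the `row_count`.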
pub const METADATA_STATE_LEN: usize = 3;

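/// Tracks backfill progress per vnode: both the state committed to the state table and the
/// in-memory state accumulated since the last commit.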
#[derive(Clone, Debug)]
pub struct BackfillState {
    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
}

impl BackfillState {
    pub(crate) fn has_progress(&self) -> bool {
        self.inner.values().any(|p| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::InProgress { .. }
            )
        })
    }

    pub(crate) fn get_current_state(
        &mut self,
        vnode: &VirtualNode,
    ) -> &mut BackfillProgressPerVnode {
        &mut self.inner.get_mut(vnode).unwrap().current_state
    }

    pub(crate) fn get_progress(
        &self,
        vnode: &VirtualNode,
    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
        match self.inner.get(vnode) {
            Some(p) => Ok(p.current_state()),
            None => bail!(
                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
                vnode,
            ),
        }
    }

    pub(crate) fn update_progress(
        &mut self,
        vnode: VirtualNode,
        new_pos: OwnedRow,
        snapshot_row_count_delta: u64,
    ) -> StreamExecutorResult<()> {
        let state = self.get_current_state(&vnode);
        match state {
            BackfillProgressPerVnode::NotStarted => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            } => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
        }
        Ok(())
    }

    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
        let finished_placeholder_position = construct_initial_finished_state(pos_len);
        let current_state = self.get_current_state(&vnode);
        let (new_pos, snapshot_row_count) = match current_state {
            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => (current_pos.clone(), *snapshot_row_count),
            BackfillProgressPerVnode::Completed { .. } => {
                return;
            }
        };
        *current_state = BackfillProgressPerVnode::Completed {
            current_pos: new_pos,
            snapshot_row_count,
        };
    }

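    /// Returns the `(old encoded state, new encoded state)` pair for `vnode`, each encoded as
    /// `vnode | current_pos | is_finished | snapshot_row_count`, ready to be written to the
    /// state table.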
    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
        let new_encoded_state = match new_state {
            BackfillProgressPerVnode::NotStarted => unreachable!(),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(false.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(true.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
        };
        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
        let old_encoded_state = match old_state {
            BackfillProgressPerVnode::NotStarted => None,
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(false.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(true.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
        };
        Some((old_encoded_state, new_encoded_state))
    }

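    /// Returns whether the in-memory progress for `vnode` has diverged from its committed state
    /// and therefore needs to be flushed to the state table.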
    fn need_commit(&self, vnode: &VirtualNode) -> bool {
        let state = self.inner.get(vnode).unwrap();
        match state.current_state() {
            s @ BackfillProgressPerVnode::InProgress { .. }
            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
            BackfillProgressPerVnode::NotStarted => false,
        }
    }

    fn mark_committed(&mut self, vnode: VirtualNode) {
        let BackfillStatePerVnode {
            committed_state,
            current_state,
        } = self.inner.get_mut(&vnode).unwrap();

        assert!(matches!(
            current_state,
            BackfillProgressPerVnode::InProgress { .. }
                | BackfillProgressPerVnode::Completed { .. }
        ));
        *committed_state = current_state.clone();
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.inner
            .values()
            .map(|p| p.get_snapshot_row_count())
            .sum()
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackfillStatePerVnode {
    committed_state: BackfillProgressPerVnode,
    current_state: BackfillProgressPerVnode,
}

impl BackfillStatePerVnode {
    pub(crate) fn new(
        committed_state: BackfillProgressPerVnode,
        current_state: BackfillProgressPerVnode,
    ) -> Self {
        Self {
            committed_state,
            current_state,
        }
    }

    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
        &self.committed_state
    }

    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
        &self.current_state
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.current_state().get_snapshot_row_count()
    }
}

impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
        Self {
            inner: v.into_iter().collect(),
        }
    }
}

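/// Snapshot-read progress of a single vnode: not yet started, in progress at `current_pos`, or
/// completed, together with the number of snapshot rows read so far.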
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum BackfillProgressPerVnode {
    NotStarted,
    InProgress {
        current_pos: OwnedRow,
        snapshot_row_count: u64,
    },
    Completed {
        current_pos: OwnedRow,
        snapshot_row_count: u64,
    },
}

impl BackfillProgressPerVnode {
    fn get_snapshot_row_count(&self) -> u64 {
        match self {
            BackfillProgressPerVnode::NotStarted => 0,
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            }
            | BackfillProgressPerVnode::Completed {
                snapshot_row_count, ..
            } => *snapshot_row_count,
        }
    }
}

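/// Marks the chunk against a single backfill position: only rows whose pk is not beyond
/// `current_pos` remain visible, since later rows will still be covered by the snapshot read.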
pub(crate) fn mark_chunk(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let chunk = chunk.compact();
    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
}

pub(crate) fn mark_cdc_chunk(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
    last_cdc_offset: Option<CdcOffset>,
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.compact();
    mark_cdc_chunk_inner(
        offset_parse_func,
        chunk,
        current_pos,
        last_cdc_offset,
        pk_in_output_indices,
        pk_order,
    )
}

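/// Marks a chunk against per-vnode progress: each row's vnode is computed from its pk, and the
/// row stays visible only if that vnode has completed backfill, or is in progress and the row's
/// pk is not beyond the vnode's `current_pos`.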
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
    chunk: &StreamChunk,
    backfill_state: &BackfillState,
    pk_in_output_indices: PkIndicesRef<'_>,
    upstream_table: &ReplicatedStateTable<S, SD>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.clone();
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let pk = row.project(pk_in_output_indices);
        let vnode = upstream_table.compute_vnode_by_pk(pk);
        let visible = match backfill_state.get_progress(&vnode)? {
            BackfillProgressPerVnode::Completed { .. } => true,
            BackfillProgressPerVnode::NotStarted => false,
            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
            }
        };
        if !visible {
            tracing::trace!(
                source = "upstream",
                state = "process_barrier",
                action = "mark_chunk",
                ?vnode,
                ?op,
                ?pk,
                ?row,
                "update_filtered",
            );
        }
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
    Ok(chunk)
}

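/// Applies the visibility rule for a single backfill position: a row is visible iff its pk is
/// not greater than `current_pos` under `pk_order`.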
fn mark_chunk_inner(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let lhs = row.project(pk_in_output_indices);
        let rhs = current_pos;
        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
}

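/// When marking visibility leaves only one half of an `UpdateDelete` / `UpdateInsert` pair
/// visible, the visible half is rewritten into a plain `Delete` or `Insert` so the chunk stays
/// well-formed. Called once per op while iterating the chunk; the two boolean flags carry the
/// state of a pending `UpdateDelete` over to the `UpdateInsert` that follows it.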
fn normalize_unmatched_updates(
    normalized_ops: &mut Cow<'_, [Op]>,
    unmatched_update_delete: &mut bool,
    visible_update_delete: &mut bool,
    current_visibility: bool,
    current_op_index: usize,
    current_op: &Op,
) {
    if *unmatched_update_delete {
        assert_eq!(*current_op, Op::UpdateInsert);
        let visible_update_insert = current_visibility;
        match (visible_update_delete, visible_update_insert) {
            (true, false) => {
                let ops = normalized_ops.to_mut();
                ops[current_op_index - 1] = Op::Delete;
            }
            (false, true) => {
                let ops = normalized_ops.to_mut();
                ops[current_op_index] = Op::Insert;
            }
            (true, true) | (false, false) => {}
        }
        *unmatched_update_delete = false;
    } else {
        match current_op {
            Op::UpdateDelete => {
                *unmatched_update_delete = true;
                *visible_update_delete = current_visibility;
            }
            Op::UpdateInsert => {
                unreachable!("UpdateInsert should not be present without UpdateDelete")
            }
            _ => {}
        }
    }
}

fn mark_cdc_chunk_inner(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    last_cdc_offset: Option<CdcOffset>,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    let offset_col_idx = data.dimension() - 1;
    for v in data.rows().map(|row| {
        let offset_datum = row.datum_at(offset_col_idx).unwrap();
        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
        let visible = {
            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
                binlog_low <= &event_offset
            } else {
                true
            };

            if in_binlog_range {
                let lhs = row.project(pk_in_output_indices);
                let rhs = current_pos;
                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
            } else {
                false
            }
        };
        Ok::<_, ConnectorError>(visible)
    }) {
        new_visibility.append(v?);
    }

    let (columns, _) = data.into_parts();
    Ok(StreamChunk::with_visibility(
        ops,
        columns,
        new_visibility.finish(),
    ))
}

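/// Projects the chunk's columns onto `output_indices`, keeping ops and visibility unchanged.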
pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
    let (ops, columns, visibility) = chunk.into_inner();
    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
    StreamChunk::with_visibility(ops, mapped_columns, visibility)
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    watermark.transform_with_indices(upstream_indices)
}

pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
    match msg {
        Message::Barrier(_) => Some(msg),
        Message::Watermark(watermark) => {
            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
        }
        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
    }
}

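/// Recovers per-vnode backfill progress from the state table, reading one row per vnode keyed by
/// the vnode itself. A missing row means that vnode has not started backfilling yet.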
pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
    debug_assert!(!state_table.vnodes().is_empty());
    let vnodes = state_table.vnodes().iter_vnodes();
    let mut result = Vec::with_capacity(state_table.vnodes().len());
    let vnode_keys = vnodes.map(|vnode| {
        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
        datum
    });
    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
    let state_for_vnodes = try_join_all(tasks).await?;
    for (vnode, state_for_vnode) in state_table
        .vnodes()
        .iter_vnodes()
        .zip_eq_debug(state_for_vnodes)
    {
        let backfill_progress = match state_for_vnode {
            Some(row) => {
                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;

                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();

                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
                let current_pos = current_pos.into_owned_row();

                if *vnode_is_finished.as_bool() {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::Completed {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::Completed {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                } else {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::InProgress {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::InProgress {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                }
            }
            None => BackfillStatePerVnode::new(
                BackfillProgressPerVnode::NotStarted,
                BackfillProgressPerVnode::NotStarted,
            ),
        };
        result.push((vnode, backfill_progress));
    }
    assert_eq!(result.len(), state_table.vnodes().count_ones());
    Ok(result)
}

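/// Flushes the single-position backfill state, writing one row per vnode in the current bitmap.
/// Issues an `Update` if a previous state was persisted and has changed, otherwise an `Insert`,
/// then commits the epoch.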
pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    epoch: EpochPair,
    old_state: &mut Option<Vec<Datum>>,
    current_partial_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    let vnodes = table.vnodes().clone();
    if let Some(old_state) = old_state {
        if old_state[1..] != current_partial_state[1..] {
            vnodes.iter_vnodes_scalar().for_each(|vnode| {
                let datum = Some(vnode.into());
                current_partial_state[0].clone_from(&datum);
                old_state[0] = datum;
                table.write_record(Record::Update {
                    old_row: &old_state[..],
                    new_row: &(*current_partial_state),
                })
            });
        }
    } else {
        vnodes.iter_vnodes_scalar().for_each(|vnode| {
            let datum = Some(vnode.into());
            current_partial_state[0] = datum;
            table.write_record(Record::Insert {
                new_row: &(*current_partial_state),
            })
        });
    }
    table.commit_assert_no_update_vnode_bitmap(epoch).await
}

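/// Fills the in-memory state row as `_ | current_pos | is_finished | row_count`; the vnode at
/// index 0 is left untouched and filled in per vnode when the state is flushed.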
pub(crate) fn build_temporary_state(
    row_state: &mut [Datum],
    is_finished: bool,
    current_pos: &OwnedRow,
    row_count: u64,
) {
    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
    row_state[current_pos.len() + 1] = Some(is_finished.into());
    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
}

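/// Advances the progress of `vnode` to the pk of the last row in `chunk` and adds the number of
/// newly read snapshot rows to its row count.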
pub(crate) fn update_pos_by_vnode(
    vnode: VirtualNode,
    chunk: &StreamChunk,
    pk_in_output_indices: &[usize],
    backfill_state: &mut BackfillState,
    snapshot_row_count_delta: u64,
) -> StreamExecutorResult<()> {
    let new_pos = get_new_pos(chunk, pk_in_output_indices);
    assert_eq!(new_pos.len(), pk_in_output_indices.len());
    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
    Ok(())
}

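/// Gets the pk of the last row in the chunk, which becomes the new backfill position.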
pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
    chunk
        .rows()
        .last()
        .unwrap()
        .1
        .project(pk_in_output_indices)
        .into_owned_row()
}

pub(crate) fn get_cdc_chunk_last_offset(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: &StreamChunk,
) -> StreamExecutorResult<Option<CdcOffset>> {
    let row = chunk.rows().last().unwrap().1;
    let offset_col = row.iter().last().unwrap();
    let output =
        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
    output.transpose().map_err(|e| e.into())
}

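/// Builds an all-`NULL` placeholder position of length `pos_len`, used when a vnode finishes
/// backfill without ever recording a real position (e.g. an empty snapshot).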
pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
    OwnedRow::new(vec![None; pos_len])
}

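/// Computes the key range for the next snapshot read: the full range when there is no position
/// yet, the open range strictly after `current_pos` otherwise, and `None` when `current_pos` is
/// empty (the table has no pk, so there is nothing left to scan).
///
/// A minimal sketch of the expected shapes (illustrative values only, hence `ignore`):
///
/// ```ignore
/// // No position yet: scan the whole range.
/// assert!(matches!(
///     compute_bounds(&[0], None),
///     Some((Bound::Unbounded, Bound::Unbounded))
/// ));
/// // Resume strictly after the last read pk.
/// let pos = OwnedRow::new(vec![Some(1i64.into())]);
/// assert!(matches!(
///     compute_bounds(&[0], Some(pos)),
///     Some((Bound::Excluded(_), Bound::Unbounded))
/// ));
/// ```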
pub(crate) fn compute_bounds(
    pk_indices: &[usize],
    current_pos: Option<OwnedRow>,
) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
    if let Some(current_pos) = current_pos {
        if current_pos.is_empty() {
            assert!(pk_indices.is_empty());
            return None;
        }

        Some((Bound::Excluded(current_pos), Bound::Unbounded))
    } else {
        Some((Bound::Unbounded, Bound::Unbounded))
    }
}

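/// Collects rows from `iter` into `StreamChunk`s of `Insert` ops using `builder`, yielding a
/// chunk whenever the builder fills up and a final partial chunk at the end of the stream.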
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
where
    StreamExecutorError: From<E>,
    R: Row,
    S: Stream<Item = Result<R, E>> + Unpin + 'a,
{
    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
        .instrument_await("backfill_snapshot_read")
        .await?
    {
        debug_assert!(data_chunk.cardinality() > 0);
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
        yield stream_chunk;
    }
}

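/// Persists the per-vnode backfill state for every vnode whose in-memory progress differs from
/// the committed one, then commits the epoch. In debug builds, the row currently stored in the
/// state table is cross-checked against the expected previous state before writing.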
pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    #[cfg(debug_assertions)] state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()> {
    for vnode in vnodes {
        if !backfill_state.need_commit(&vnode) {
            continue;
        }
        let (encoded_prev_state, encoded_current_state) =
            match backfill_state.get_commit_state(&vnode) {
                Some((old_state, new_state)) => (old_state, new_state),
                None => continue,
            };
        if let Some(encoded_prev_state) = encoded_prev_state {
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let old_row = table.get_row(pk).await?;
                match old_row {
                    Some(old_row) => {
                        let inner = old_row.as_inner();
                        assert_eq!(inner, &encoded_prev_state[1..]);
                        assert_ne!(inner, &encoded_current_state[1..]);
                        assert_eq!(old_row.len(), state_len - 1);
                        assert_eq!(encoded_current_state.len(), state_len);
                    }
                    None => {
                        bail!("row {:#?} not found", pk);
                    }
                }
            }
            table.write_record(Record::Update {
                old_row: &encoded_prev_state[..],
                new_row: &encoded_current_state[..],
            });
        } else {
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let row = table.get_row(pk).await?;
                assert!(row.is_none(), "row {:#?}", row);
                assert_eq!(encoded_current_state.len(), state_len);
            }
            table.write_record(Record::Insert {
                new_row: &encoded_current_state[..],
            });
        }
        backfill_state.mark_committed(vnode);
    }

    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    Ok(())
}

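/// Persists the single-position backfill state if there is a current position; otherwise just
/// commits the epoch so the state table keeps advancing.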
pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    is_finished: bool,
    current_pos: &Option<OwnedRow>,
    row_count: u64,
    old_state: &mut Option<Vec<Datum>>,
    current_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    if let Some(current_pos_inner) = current_pos {
        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
        flush_data(table, epoch, old_state, current_state).await?;
        *old_state = Some(current_state.into());
    } else {
        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    }
    Ok(())
}

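/// Creates a `DataChunkBuilder` whose batch size is `chunk_size`, capped by the rate limit when
/// a fixed limit smaller than `chunk_size` is configured.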
pub fn create_builder(
    rate_limit: RateLimit,
    chunk_size: usize,
    data_types: Vec<DataType>,
) -> DataChunkBuilder {
    let batch_size = match rate_limit {
        RateLimit::Disabled | RateLimit::Pause => chunk_size,
        RateLimit::Fixed(limit) if limit.get() as usize >= chunk_size => chunk_size,
        RateLimit::Fixed(limit) => limit.get() as usize,
    };
    DataChunkBuilder::new(data_types, batch_size)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ];
        let ops: Arc<[Op]> = ops.into();

        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::Delete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::Insert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
    }
}