1use std::borrow::Cow;
16use std::cmp::{max, min};
17use std::collections::HashMap;
18use std::ops::Bound;
19
20use await_tree::InstrumentAwait;
21use futures::Stream;
22use futures::future::try_join_all;
23use futures_async_stream::try_stream;
24use risingwave_common::array::stream_record::Record;
25use risingwave_common::array::{DataChunk, Op, StreamChunk};
26use risingwave_common::bail;
27use risingwave_common::bitmap::BitmapBuilder;
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, Row, RowExt};
30use risingwave_common::types::{DataType, Datum};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::epoch::EpochPair;
33use risingwave_common::util::iter_util::ZipEqDebug;
34use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
35use risingwave_common::util::value_encoding::BasicSerde;
36use risingwave_common_rate_limit::RateLimit;
37use risingwave_connector::error::ConnectorError;
38use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
39use risingwave_storage::StateStore;
40use risingwave_storage::row_serde::value_serde::ValueRowSerde;
41use risingwave_storage::table::collect_data_chunk_with_builder;
42
43use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
44use crate::executor::{Message, StreamExecutorError, StreamExecutorResult, Watermark};
45
/// Number of non-position columns in an encoded backfill state row: the leading vnode
/// column plus the trailing `is_finished` flag and snapshot row count.
pub const METADATA_STATE_LEN: usize = 3;
48
49#[derive(Clone, Debug)]
50pub struct BackfillState {
51 inner: HashMap<VirtualNode, BackfillStatePerVnode>,
54}
55
56impl BackfillState {
57 pub(crate) fn has_progress(&self) -> bool {
58 self.inner.values().any(|p| {
59 matches!(
60 p.current_state(),
61 &BackfillProgressPerVnode::InProgress { .. }
62 )
63 })
64 }
65
66 pub(crate) fn get_current_state(
67 &mut self,
68 vnode: &VirtualNode,
69 ) -> &mut BackfillProgressPerVnode {
70 &mut self.inner.get_mut(vnode).unwrap().current_state
71 }
72
73 pub(crate) fn get_progress(
75 &self,
76 vnode: &VirtualNode,
77 ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
78 match self.inner.get(vnode) {
79 Some(p) => Ok(p.current_state()),
80 None => bail!(
81 "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
82 vnode,
83 ),
84 }
85 }
86
87 pub(crate) fn update_progress(
88 &mut self,
89 vnode: VirtualNode,
90 new_pos: OwnedRow,
91 snapshot_row_count_delta: u64,
92 ) -> StreamExecutorResult<()> {
93 let state = self.get_current_state(&vnode);
94 match state {
95 BackfillProgressPerVnode::NotStarted => {
96 *state = BackfillProgressPerVnode::InProgress {
97 current_pos: new_pos,
98 snapshot_row_count: snapshot_row_count_delta,
99 };
100 }
101 BackfillProgressPerVnode::InProgress {
102 snapshot_row_count, ..
103 } => {
104 *state = BackfillProgressPerVnode::InProgress {
105 current_pos: new_pos,
106 snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
107 };
108 }
109 BackfillProgressPerVnode::Completed { .. } => unreachable!(),
110 }
111 Ok(())
112 }
113
114 pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
115 let finished_placeholder_position = construct_initial_finished_state(pos_len);
116 let current_state = self.get_current_state(&vnode);
117 let (new_pos, snapshot_row_count) = match current_state {
118 BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
119 BackfillProgressPerVnode::InProgress {
120 current_pos,
121 snapshot_row_count,
122 } => (current_pos.clone(), *snapshot_row_count),
123 BackfillProgressPerVnode::Completed { .. } => {
124 return;
125 }
126 };
127 *current_state = BackfillProgressPerVnode::Completed {
128 current_pos: new_pos,
129 snapshot_row_count,
130 };
131 }
132
133 fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
135 let new_state = self.inner.get(vnode).unwrap().current_state().clone();
136 let new_encoded_state = match new_state {
137 BackfillProgressPerVnode::NotStarted => unreachable!(),
138 BackfillProgressPerVnode::InProgress {
139 current_pos,
140 snapshot_row_count,
141 } => {
142 let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
143 encoded_state[0] = Some(vnode.to_scalar().into());
144 encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
145 encoded_state[current_pos.len() + 1] = Some(false.into());
146 encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
147 encoded_state
148 }
149 BackfillProgressPerVnode::Completed {
150 current_pos,
151 snapshot_row_count,
152 } => {
153 let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
154 encoded_state[0] = Some(vnode.to_scalar().into());
155 encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
156 encoded_state[current_pos.len() + 1] = Some(true.into());
157 encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
158 encoded_state
159 }
160 };
161 let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
162 let old_encoded_state = match old_state {
163 BackfillProgressPerVnode::NotStarted => None,
164 BackfillProgressPerVnode::InProgress {
165 current_pos,
166 snapshot_row_count,
167 } => {
168 let committed_pos = current_pos;
169 let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
170 encoded_state[0] = Some(vnode.to_scalar().into());
171 encoded_state[1..committed_pos.len() + 1]
172 .clone_from_slice(committed_pos.as_inner());
173 encoded_state[committed_pos.len() + 1] = Some(false.into());
174 encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
175 Some(encoded_state)
176 }
177 BackfillProgressPerVnode::Completed {
178 current_pos,
179 snapshot_row_count,
180 } => {
181 let committed_pos = current_pos;
182 let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
183 encoded_state[0] = Some(vnode.to_scalar().into());
184 encoded_state[1..committed_pos.len() + 1]
185 .clone_from_slice(committed_pos.as_inner());
186 encoded_state[committed_pos.len() + 1] = Some(true.into());
187 encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
188 Some(encoded_state)
189 }
190 };
191 Some((old_encoded_state, new_encoded_state))
192 }
193
194 fn need_commit(&self, vnode: &VirtualNode) -> bool {
197 let state = self.inner.get(vnode).unwrap();
198 match state.current_state() {
199 s @ BackfillProgressPerVnode::InProgress { .. }
201 | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
202 BackfillProgressPerVnode::NotStarted => false,
203 }
204 }
205
206 fn mark_committed(&mut self, vnode: VirtualNode) {
207 let BackfillStatePerVnode {
208 committed_state,
209 current_state,
210 } = self.inner.get_mut(&vnode).unwrap();
211
212 assert!(matches!(
213 current_state,
214 BackfillProgressPerVnode::InProgress { .. }
215 | BackfillProgressPerVnode::Completed { .. }
216 ));
217 *committed_state = current_state.clone();
218 }
219
220 pub(crate) fn get_snapshot_row_count(&self) -> u64 {
221 self.inner
222 .values()
223 .map(|p| p.get_snapshot_row_count())
224 .sum()
225 }
226}
227
228#[derive(Clone, Debug, PartialEq, Eq)]
229pub struct BackfillStatePerVnode {
230 committed_state: BackfillProgressPerVnode,
231 current_state: BackfillProgressPerVnode,
232}
233
234impl BackfillStatePerVnode {
235 pub(crate) fn new(
236 committed_state: BackfillProgressPerVnode,
237 current_state: BackfillProgressPerVnode,
238 ) -> Self {
239 Self {
240 committed_state,
241 current_state,
242 }
243 }
244
245 pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
246 &self.committed_state
247 }
248
249 pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
250 &self.current_state
251 }
252
253 pub(crate) fn get_snapshot_row_count(&self) -> u64 {
254 self.current_state().get_snapshot_row_count()
255 }
256}
257
258impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
259 fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
260 Self {
261 inner: v.into_iter().collect(),
262 }
263 }
264}
265
266#[derive(Clone, Eq, PartialEq, Debug)]
269pub enum BackfillProgressPerVnode {
270 NotStarted,
272 InProgress {
273 current_pos: OwnedRow,
275 snapshot_row_count: u64,
277 },
278 Completed {
279 current_pos: OwnedRow,
281 snapshot_row_count: u64,
283 },
284}
285
286impl BackfillProgressPerVnode {
287 fn get_snapshot_row_count(&self) -> u64 {
288 match self {
289 BackfillProgressPerVnode::NotStarted => 0,
290 BackfillProgressPerVnode::InProgress {
291 snapshot_row_count, ..
292 }
293 | BackfillProgressPerVnode::Completed {
294 snapshot_row_count, ..
295 } => *snapshot_row_count,
296 }
297 }
298}
299
300pub(crate) fn mark_chunk(
301 chunk: StreamChunk,
302 current_pos: &OwnedRow,
303 pk_in_output_indices: &[usize],
304 pk_order: &[OrderType],
305) -> StreamChunk {
306 let chunk = chunk.compact_vis();
307 mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
308}
309
310pub(crate) fn mark_cdc_chunk(
311 offset_parse_func: &CdcOffsetParseFunc,
312 chunk: StreamChunk,
313 current_pos: &OwnedRow,
314 pk_in_output_indices: &[usize],
315 pk_order: &[OrderType],
316 last_cdc_offset: Option<CdcOffset>,
317) -> StreamExecutorResult<StreamChunk> {
318 let chunk = chunk.compact_vis();
319 mark_cdc_chunk_inner(
320 offset_parse_func,
321 chunk,
322 current_pos,
323 last_cdc_offset,
324 pk_in_output_indices,
325 pk_order,
326 )
327}
328
329pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
334 chunk: &StreamChunk,
335 backfill_state: &BackfillState,
336 pk_in_output_indices: &[usize],
337 upstream_table: &ReplicatedStateTable<S, SD>,
338 pk_order: &[OrderType],
339) -> StreamExecutorResult<StreamChunk> {
340 let chunk = chunk.clone();
341 let (data, ops) = chunk.into_parts();
342 let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
343
344 let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
345 let mut unmatched_update_delete = false;
346 let mut visible_update_delete = false;
347 for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
348 let pk = row.project(pk_in_output_indices);
349 let vnode = upstream_table.compute_vnode_by_pk(pk);
350 let visible = match backfill_state.get_progress(&vnode)? {
351 BackfillProgressPerVnode::Completed { .. } => true,
353 BackfillProgressPerVnode::NotStarted => false,
355 BackfillProgressPerVnode::InProgress { current_pos, .. } => {
357 cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
358 }
359 };
360 if !visible {
361 tracing::trace!(
362 source = "upstream",
363 state = "process_barrier",
364 action = "mark_chunk",
365 ?vnode,
366 ?op,
367 ?pk,
368 ?row,
369 "update_filtered",
370 );
371 }
372 new_visibility.append(visible);
373
374 normalize_unmatched_updates(
375 &mut new_ops,
376 &mut unmatched_update_delete,
377 &mut visible_update_delete,
378 visible,
379 i,
380 op,
381 );
382 }
383 let (columns, _) = data.into_parts();
384 let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
385 Ok(chunk)
386}
387
388fn mark_chunk_inner(
392 chunk: StreamChunk,
393 current_pos: &OwnedRow,
394 pk_in_output_indices: &[usize],
395 pk_order: &[OrderType],
396) -> StreamChunk {
397 let (data, ops) = chunk.into_parts();
398 let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
399 let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
400 let mut unmatched_update_delete = false;
401 let mut visible_update_delete = false;
402 for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
403 let lhs = row.project(pk_in_output_indices);
404 let rhs = current_pos;
405 let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
406 new_visibility.append(visible);
407
408 normalize_unmatched_updates(
409 &mut new_ops,
410 &mut unmatched_update_delete,
411 &mut visible_update_delete,
412 visible,
413 i,
414 op,
415 );
416 }
417 let (columns, _) = data.into_parts();
418 StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
419}
420
421fn normalize_unmatched_updates(
429 normalized_ops: &mut Cow<'_, [Op]>,
430 unmatched_update_delete: &mut bool,
431 visible_update_delete: &mut bool,
432 current_visibility: bool,
433 current_op_index: usize,
434 current_op: &Op,
435) {
436 if *unmatched_update_delete {
437 assert_eq!(*current_op, Op::UpdateInsert);
438 let visible_update_insert = current_visibility;
439 match (visible_update_delete, visible_update_insert) {
440 (true, false) => {
441 let ops = normalized_ops.to_mut();
443 ops[current_op_index - 1] = Op::Delete;
444 }
445 (false, true) => {
446 let ops = normalized_ops.to_mut();
448 ops[current_op_index] = Op::Insert;
449 }
450 (true, true) | (false, false) => {}
451 }
452 *unmatched_update_delete = false;
453 } else {
454 match current_op {
455 Op::UpdateDelete => {
456 *unmatched_update_delete = true;
457 *visible_update_delete = current_visibility;
458 }
459 Op::UpdateInsert => {
460 unreachable!("UpdateInsert should not be present without UpdateDelete")
461 }
462 _ => {}
463 }
464 }
465}
466
467fn mark_cdc_chunk_inner(
468 offset_parse_func: &CdcOffsetParseFunc,
469 chunk: StreamChunk,
470 current_pos: &OwnedRow,
471 last_cdc_offset: Option<CdcOffset>,
472 pk_in_output_indices: &[usize],
473 pk_order: &[OrderType],
474) -> StreamExecutorResult<StreamChunk> {
475 let (data, ops) = chunk.into_parts();
476 let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
477
478 let offset_col_idx = data.dimension() - 1;
480 for v in data.rows().map(|row| {
481 let offset_datum = row.datum_at(offset_col_idx).unwrap();
482 let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
483 let visible = {
484 let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
486 binlog_low <= &event_offset
487 } else {
488 true
489 };
490
491 if in_binlog_range {
492 let lhs = row.project(pk_in_output_indices);
493 let rhs = current_pos;
494 cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
495 } else {
496 false
497 }
498 };
499 Ok::<_, ConnectorError>(visible)
500 }) {
501 new_visibility.append(v?);
502 }
503
504 let (columns, _) = data.into_parts();
505 Ok(StreamChunk::with_visibility(
506 ops,
507 columns,
508 new_visibility.finish(),
509 ))
510}
511
512pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
514 let (ops, columns, visibility) = chunk.into_inner();
515 let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
516 StreamChunk::with_visibility(ops, mapped_columns, visibility)
517}
518
519fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
520 watermark.transform_with_indices(upstream_indices)
521}
522
523pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
524 match msg {
525 Message::Barrier(_) => Some(msg),
526 Message::Watermark(watermark) => {
527 mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
528 }
529 Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
530 }
531}
532
/// Returns `true` when `lhs` and `rhs` hold the same column indices, ignoring order.
///
/// Both slices are compared as multisets, so `[0, 1]` and `[1, 0]` match while
/// `[0, 0]` and `[0, 1]` do not. A one-directional membership check would wrongly
/// accept the latter pair.
fn same_key_columns(lhs: &[usize], rhs: &[usize]) -> bool {
    if lhs.len() != rhs.len() {
        return false;
    }
    let mut lhs_sorted = lhs.to_vec();
    let mut rhs_sorted = rhs.to_vec();
    lhs_sorted.sort_unstable();
    rhs_sorted.sort_unstable();
    lhs_sorted == rhs_sorted
}
536
537pub(super) struct UpstreamStreamKeyUpdateNormalizer {
544 current_stream_key_indices: Option<Vec<usize>>,
545}
546
547impl UpstreamStreamKeyUpdateNormalizer {
548 pub(super) fn new(
563 input_stream_key_indices: &[usize],
564 current_stream_key_indices: Vec<usize>,
565 ) -> Self {
566 let current_stream_key_indices =
567 (!same_key_columns(input_stream_key_indices, ¤t_stream_key_indices))
568 .then_some(current_stream_key_indices);
569 Self {
570 current_stream_key_indices,
571 }
572 }
573
574 pub(super) fn normalize_chunk(&self, chunk: StreamChunk) -> Option<StreamChunk> {
575 if let Some(current_stream_key_indices) = &self.current_stream_key_indices {
576 normalize_update_chunk_by_key(chunk, current_stream_key_indices)
577 } else {
578 Some(chunk)
579 }
580 }
581
582 pub(super) fn normalize_message(&self, msg: Message) -> Option<Message> {
583 match msg {
584 Message::Chunk(chunk) => self.normalize_chunk(chunk).map(Message::Chunk),
585 msg => Some(msg),
586 }
587 }
588}
589
590fn normalize_update_chunk_by_key(chunk: StreamChunk, key_indices: &[usize]) -> Option<StreamChunk> {
591 let (data_chunk, ops) = chunk.into_parts();
592 let mut update_indices = vec![];
593 let mut row_idx = data_chunk.next_visible_row_idx(0);
594 while let Some(idx) = row_idx {
595 let row = data_chunk.row_at_unchecked_vis(idx);
596 match ops[idx] {
597 Op::UpdateDelete => {
598 let next_idx = data_chunk
599 .next_visible_row_idx(idx + 1)
600 .unwrap_or_else(|| panic!("expect a U+ after U-\nU- row: {}", row.display()));
601 let next_row = data_chunk.row_at_unchecked_vis(next_idx);
602 debug_assert_eq!(
603 ops[next_idx],
604 Op::UpdateInsert,
605 "expect a U+ after U-\nU- row: {}\nrow after U-: {}",
606 row.display(),
607 next_row.display()
608 );
609 if row.project(key_indices) != next_row.project(key_indices) {
610 update_indices.push((idx, next_idx));
611 }
612 row_idx = data_chunk.next_visible_row_idx(next_idx + 1);
613 }
614 Op::UpdateInsert => panic!("expect a U- before U+\nU+ row: {}", row.display()),
615 Op::Insert | Op::Delete => {
616 row_idx = data_chunk.next_visible_row_idx(idx + 1);
617 }
618 }
619 }
620
621 if update_indices.is_empty() {
622 return Some(StreamChunk::from_parts(ops, data_chunk));
623 }
624
625 let (columns, visibility) = data_chunk.into_parts();
626 let mut ops = ops.to_vec();
627 for (delete_idx, insert_idx) in update_indices {
628 ops[delete_idx] = Op::Delete;
629 ops[insert_idx] = Op::Insert;
630 }
631 Some(StreamChunk::from_parts(
632 ops,
633 DataChunk::new(columns, visibility),
634 ))
635}
636
637pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
640 state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
641) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
642 debug_assert!(!state_table.vnodes().is_empty());
643 let vnodes = state_table.vnodes().iter_vnodes();
644 let mut result = Vec::with_capacity(state_table.vnodes().len());
645 let vnode_keys = vnodes.map(|vnode| {
647 let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
648 datum
649 });
650 let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
651 let state_for_vnodes = try_join_all(tasks).await?;
655 for (vnode, state_for_vnode) in state_table
656 .vnodes()
657 .iter_vnodes()
658 .zip_eq_debug(state_for_vnodes)
659 {
660 let backfill_progress = match state_for_vnode {
661 Some(row) => {
663 let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
666 let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;
667
668 let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
671 let vnode_is_finished = vnode_is_finished.as_ref().unwrap();
672
673 let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
675 let current_pos = current_pos.into_owned_row();
676
677 if *vnode_is_finished.as_bool() {
679 BackfillStatePerVnode::new(
680 BackfillProgressPerVnode::Completed {
681 current_pos: current_pos.clone(),
682 snapshot_row_count,
683 },
684 BackfillProgressPerVnode::Completed {
685 current_pos,
686 snapshot_row_count,
687 },
688 )
689 } else {
690 BackfillStatePerVnode::new(
691 BackfillProgressPerVnode::InProgress {
692 current_pos: current_pos.clone(),
693 snapshot_row_count,
694 },
695 BackfillProgressPerVnode::InProgress {
696 current_pos,
697 snapshot_row_count,
698 },
699 )
700 }
701 }
702 None => BackfillStatePerVnode::new(
704 BackfillProgressPerVnode::NotStarted,
705 BackfillProgressPerVnode::NotStarted,
706 ),
707 };
708 result.push((vnode, backfill_progress));
709 }
710 assert_eq!(result.len(), state_table.vnodes().count_ones());
711 Ok(result)
712}
713
714pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
716 table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
717 epoch: EpochPair,
718 old_state: &mut Option<Vec<Datum>>,
719 current_partial_state: &mut [Datum],
720) -> StreamExecutorResult<()> {
721 let vnodes = table.vnodes().clone();
722 if let Some(old_state) = old_state {
723 if old_state[1..] != current_partial_state[1..] {
724 vnodes.iter_vnodes_scalar().for_each(|vnode| {
725 let datum = Some(vnode.into());
726 current_partial_state[0].clone_from(&datum);
727 old_state[0] = datum;
728 table.write_record(Record::Update {
729 old_row: &old_state[..],
730 new_row: &(*current_partial_state),
731 })
732 });
733 }
734 } else {
735 vnodes.iter_vnodes_scalar().for_each(|vnode| {
737 let datum = Some(vnode.into());
738 current_partial_state[0] = datum;
740 table.write_record(Record::Insert {
741 new_row: &(*current_partial_state),
742 })
743 });
744 }
745 table.commit_assert_no_update_vnode_bitmap(epoch).await
746}
747
748pub(crate) fn build_temporary_state(
755 row_state: &mut [Datum],
756 is_finished: bool,
757 current_pos: &OwnedRow,
758 row_count: u64,
759) {
760 row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
761 row_state[current_pos.len() + 1] = Some(is_finished.into());
762 row_state[current_pos.len() + 2] = Some((row_count as i64).into());
763}
764
765pub(crate) fn update_pos_by_vnode(
767 vnode: VirtualNode,
768 chunk: &StreamChunk,
769 pk_in_output_indices: &[usize],
770 backfill_state: &mut BackfillState,
771 snapshot_row_count_delta: u64,
772) -> StreamExecutorResult<()> {
773 let new_pos = get_new_pos(chunk, pk_in_output_indices);
774 assert_eq!(new_pos.len(), pk_in_output_indices.len());
775 backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
776 Ok(())
777}
778
779pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
782 chunk
783 .rows()
784 .last()
785 .unwrap()
786 .1
787 .project(pk_in_output_indices)
788 .into_owned_row()
789}
790
791pub(crate) fn get_cdc_chunk_last_offset(
792 offset_parse_func: &CdcOffsetParseFunc,
793 chunk: &StreamChunk,
794) -> StreamExecutorResult<Option<CdcOffset>> {
795 let row = chunk.rows().last().unwrap().1;
796 let offset_col = row.iter().last().unwrap();
797 let output =
798 offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
799 output.transpose().map_err(|e| e.into())
800}
801
802pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
808 OwnedRow::new(vec![None; pos_len])
809}
810
811pub(crate) fn compute_bounds(
812 pk_indices: &[usize],
813 current_pos: Option<OwnedRow>,
814) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
815 if let Some(current_pos) = current_pos {
818 if current_pos.is_empty() {
822 assert!(pk_indices.is_empty());
823 return None;
824 }
825
826 Some((Bound::Excluded(current_pos), Bound::Unbounded))
827 } else {
828 Some((Bound::Unbounded, Bound::Unbounded))
829 }
830}
831
832#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
833pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
834where
835 StreamExecutorError: From<E>,
836 R: Row,
837 S: Stream<Item = Result<R, E>> + Unpin + 'a,
838{
839 while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
840 .instrument_await("backfill_snapshot_read")
841 .await?
842 {
843 debug_assert!(data_chunk.cardinality() > 0);
844 let ops = vec![Op::Insert; data_chunk.capacity()];
845 let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
846 yield stream_chunk;
847 }
848}
849
/// Persists the backfill progress of each vnode in `vnodes` into `table`, then commits
/// `table` at `epoch`.
///
/// Vnodes for which `need_commit` is false (not started, or unchanged since their last
/// commit) are skipped. For the rest, the encoded current state is written as an
/// `Update` over the previously committed row. If nothing was committed before
/// (committed state `NotStarted`), it is written as an `Insert` instead. Each written
/// vnode is then marked committed in `backfill_state`.
///
/// In debug builds, the row currently stored for each vnode is read back and checked
/// against the expected previous state. `state_len` is the expected length of an encoded
/// state row, including its leading vnode column.
pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    #[cfg(debug_assertions)] state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()> {
    for vnode in vnodes {
        if !backfill_state.need_commit(&vnode) {
            continue;
        }
        let (encoded_prev_state, encoded_current_state) =
            match backfill_state.get_commit_state(&vnode) {
                Some((old_state, new_state)) => (old_state, new_state),
                None => continue,
            };
        if let Some(encoded_prev_state) = encoded_prev_state {
            // Debug-only check that the stored row equals the previously committed state.
            // The `[1..]` comparisons and the `state_len - 1` length check indicate that
            // `get_row` returns the row without its leading vnode column.
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let old_row = table.get_row(pk).await?;
                match old_row {
                    Some(old_row) => {
                        let inner = old_row.as_inner();
                        assert_eq!(inner, &encoded_prev_state[1..]);
                        assert_ne!(inner, &encoded_current_state[1..]);
                        assert_eq!(old_row.len(), state_len - 1);
                        assert_eq!(encoded_current_state.len(), state_len);
                    }
                    None => {
                        bail!("row {:#?} not found", pk);
                    }
                }
            }
            table.write_record(Record::Update {
                old_row: &encoded_prev_state[..],
                new_row: &encoded_current_state[..],
            });
        } else {
            // Nothing committed yet for this vnode; debug builds verify no row exists.
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let row = table.get_row(pk).await?;
                assert!(row.is_none(), "row {:#?}", row);
                assert_eq!(encoded_current_state.len(), state_len);
            }
            table.write_record(Record::Insert {
                new_row: &encoded_current_state[..],
            });
        }
        backfill_state.mark_committed(vnode);
    }

    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    Ok(())
}
933
934pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
940 epoch: EpochPair,
941 table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
942 is_finished: bool,
943 current_pos: &Option<OwnedRow>,
944 row_count: u64,
945 old_state: &mut Option<Vec<Datum>>,
946 current_state: &mut [Datum],
947) -> StreamExecutorResult<()> {
948 if let Some(current_pos_inner) = current_pos {
949 build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
951 flush_data(table, epoch, old_state, current_state).await?;
952 *old_state = Some(current_state.into());
953 } else {
954 table.commit_assert_no_update_vnode_bitmap(epoch).await?;
955 }
956 Ok(())
957}
958
959pub fn create_builder(
963 rate_limit: RateLimit,
964 chunk_size: usize,
965 data_types: Vec<DataType>,
966) -> DataChunkBuilder {
967 let batch_size = match rate_limit {
968 RateLimit::Disabled | RateLimit::Pause => chunk_size,
969 RateLimit::Fixed(limit) => min(limit.get() as usize, chunk_size),
970 };
971 let batch_size = max(2, batch_size);
973 DataChunkBuilder::new(data_types, batch_size)
974}
975
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Runs `normalize_unmatched_updates` on the `U+` at index 1 for every combination
    /// of (pending `U-` visible, current `U+` visible).
    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops: Arc<[Op]> = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ]
        .into();

        // (U- visible, U+ visible, expected ops afterwards)
        let cases = [
            (
                true,
                true,
                [Op::UpdateDelete, Op::UpdateInsert, Op::UpdateDelete, Op::UpdateInsert],
            ),
            (
                false,
                false,
                [Op::UpdateDelete, Op::UpdateInsert, Op::UpdateDelete, Op::UpdateInsert],
            ),
            (
                true,
                false,
                [Op::Delete, Op::UpdateInsert, Op::UpdateDelete, Op::UpdateInsert],
            ),
            (
                false,
                true,
                [Op::UpdateDelete, Op::Insert, Op::UpdateDelete, Op::UpdateInsert],
            ),
        ];

        for (delete_visible, insert_visible, expected) in cases {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = delete_visible;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                insert_visible,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(&new_ops[..], expected);
        }
    }
}