use std::borrow::Cow;
use std::cmp::{max, min};
use std::collections::HashMap;
use std::ops::Bound;

use await_tree::InstrumentAwait;
use futures::Stream;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::StateStore;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::collect_data_chunk_with_builder;

use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};

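/// Number of metadata columns in the backfill state table row:
/// `vnode`, `is_finished` and `row_count`, one column each.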
pub const METADATA_STATE_LEN: usize = 3;

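/// In-memory backfill progress for every vnode owned by this executor, tracking
/// both the last committed state and the latest (yet-to-be-committed) state.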
#[derive(Clone, Debug)]
pub struct BackfillState {
    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
}

impl BackfillState {
    pub(crate) fn has_progress(&self) -> bool {
        self.inner.values().any(|p| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::InProgress { .. }
            )
        })
    }

    pub(crate) fn get_current_state(
        &mut self,
        vnode: &VirtualNode,
    ) -> &mut BackfillProgressPerVnode {
        &mut self.inner.get_mut(vnode).unwrap().current_state
    }

    pub(crate) fn get_progress(
        &self,
        vnode: &VirtualNode,
    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
        match self.inner.get(vnode) {
            Some(p) => Ok(p.current_state()),
            None => bail!(
                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
                vnode,
            ),
        }
    }

    pub(crate) fn update_progress(
        &mut self,
        vnode: VirtualNode,
        new_pos: OwnedRow,
        snapshot_row_count_delta: u64,
    ) -> StreamExecutorResult<()> {
        let state = self.get_current_state(&vnode);
        match state {
            BackfillProgressPerVnode::NotStarted => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            } => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
        }
        Ok(())
    }

    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
        let finished_placeholder_position = construct_initial_finished_state(pos_len);
        let current_state = self.get_current_state(&vnode);
        let (new_pos, snapshot_row_count) = match current_state {
            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => (current_pos.clone(), *snapshot_row_count),
            BackfillProgressPerVnode::Completed { .. } => {
                return;
            }
        };
        *current_state = BackfillProgressPerVnode::Completed {
            current_pos: new_pos,
            snapshot_row_count,
        };
    }

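    /// Encodes the committed (old) and current (new) progress of `vnode` as state
    /// table rows with layout `| vnode | pk ... | is_finished | row_count |`.
    /// The old state is `None` if nothing has been committed for this vnode yet.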
    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
        let new_encoded_state = match new_state {
            BackfillProgressPerVnode::NotStarted => unreachable!(),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(false.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(true.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
        };
        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
        let old_encoded_state = match old_state {
            BackfillProgressPerVnode::NotStarted => None,
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(false.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(true.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
        };
        Some((old_encoded_state, new_encoded_state))
    }

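    /// Returns whether the in-memory progress of `vnode` has diverged from its
    /// committed state and therefore needs to be persisted.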
    fn need_commit(&self, vnode: &VirtualNode) -> bool {
        let state = self.inner.get(vnode).unwrap();
        match state.current_state() {
            s @ BackfillProgressPerVnode::InProgress { .. }
            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
            BackfillProgressPerVnode::NotStarted => false,
        }
    }

    fn mark_committed(&mut self, vnode: VirtualNode) {
        let BackfillStatePerVnode {
            committed_state,
            current_state,
        } = self.inner.get_mut(&vnode).unwrap();

        assert!(matches!(
            current_state,
            BackfillProgressPerVnode::InProgress { .. }
                | BackfillProgressPerVnode::Completed { .. }
        ));
        *committed_state = current_state.clone();
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.inner
            .values()
            .map(|p| p.get_snapshot_row_count())
            .sum()
    }
}

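/// Backfill state of a single vnode: the progress that has been committed to the
/// state table alongside the latest in-memory progress.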
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackfillStatePerVnode {
    committed_state: BackfillProgressPerVnode,
    current_state: BackfillProgressPerVnode,
}

impl BackfillStatePerVnode {
    pub(crate) fn new(
        committed_state: BackfillProgressPerVnode,
        current_state: BackfillProgressPerVnode,
    ) -> Self {
        Self {
            committed_state,
            current_state,
        }
    }

    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
        &self.committed_state
    }

    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
        &self.current_state
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.current_state().get_snapshot_row_count()
    }
}

impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
        Self {
            inner: v.into_iter().collect(),
        }
    }
}

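/// Backfill progress of a single vnode.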
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum BackfillProgressPerVnode {
    /// Backfill has not started for this vnode.
    NotStarted,
    /// Backfill is in progress for this vnode.
    InProgress {
        /// The latest position read from the snapshot.
        current_pos: OwnedRow,
        /// Number of snapshot rows read for this vnode so far.
        snapshot_row_count: u64,
    },
    /// Backfill has finished for this vnode.
    Completed {
        /// The position at which backfill finished.
        current_pos: OwnedRow,
        /// Total number of snapshot rows read for this vnode.
        snapshot_row_count: u64,
    },
}

impl BackfillProgressPerVnode {
    fn get_snapshot_row_count(&self) -> u64 {
        match self {
            BackfillProgressPerVnode::NotStarted => 0,
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            }
            | BackfillProgressPerVnode::Completed {
                snapshot_row_count, ..
            } => *snapshot_row_count,
        }
    }
}

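/// Marks an upstream chunk against the backfill position: a row stays visible
/// only if its pk is `<= current_pos`, i.e. it falls in the range that has
/// already been backfilled from the snapshot.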
pub(crate) fn mark_chunk(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let chunk = chunk.compact();
    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
}

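/// CDC variant of [`mark_chunk`]: in addition to the pk comparison, rows whose
/// binlog offset is earlier than `last_cdc_offset` are filtered out.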
pub(crate) fn mark_cdc_chunk(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
    last_cdc_offset: Option<CdcOffset>,
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.compact();
    mark_cdc_chunk_inner(
        offset_parse_func,
        chunk,
        current_pos,
        last_cdc_offset,
        pk_in_output_indices,
        pk_order,
    )
}

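/// Per-vnode variant of [`mark_chunk`], used when backfill progress is tracked
/// independently for each vnode: a row is visible if its vnode has completed
/// backfill, or if backfill of that vnode is in progress and the row's pk is
/// `<= current_pos`.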
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
    chunk: &StreamChunk,
    backfill_state: &BackfillState,
    pk_in_output_indices: PkIndicesRef<'_>,
    upstream_table: &ReplicatedStateTable<S, SD>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.clone();
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let pk = row.project(pk_in_output_indices);
        let vnode = upstream_table.compute_vnode_by_pk(pk);
        let visible = match backfill_state.get_progress(&vnode)? {
            // This vnode has finished backfill: forward the row.
            BackfillProgressPerVnode::Completed { .. } => true,
            // Backfill has not started for this vnode: filter the row out.
            BackfillProgressPerVnode::NotStarted => false,
            // In progress: only rows up to the current position have been backfilled.
            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
            }
        };
        if !visible {
            tracing::trace!(
                source = "upstream",
                state = "process_barrier",
                action = "mark_chunk",
                ?vnode,
                ?op,
                ?pk,
                ?row,
                "update_filtered",
            );
        }
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
    Ok(chunk)
}

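/// Marks a chunk against a single backfill position, additionally normalizing
/// update pairs whose two halves end up with different visibility.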
fn mark_chunk_inner(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let lhs = row.project(pk_in_output_indices);
        let rhs = current_pos;
        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
}

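/// When visibility is applied, the two halves of an update (`UpdateDelete` /
/// `UpdateInsert`) may end up on different sides of the backfill boundary and
/// thus get different visibility. In that case the surviving half is rewritten
/// into a plain `Delete` or `Insert` so that the marked chunk stays well-formed.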
fn normalize_unmatched_updates(
    normalized_ops: &mut Cow<'_, [Op]>,
    unmatched_update_delete: &mut bool,
    visible_update_delete: &mut bool,
    current_visibility: bool,
    current_op_index: usize,
    current_op: &Op,
) {
    if *unmatched_update_delete {
        assert_eq!(*current_op, Op::UpdateInsert);
        let visible_update_insert = current_visibility;
        match (visible_update_delete, visible_update_insert) {
            (true, false) => {
                // Only the `UpdateDelete` survives: rewrite it into a plain `Delete`.
                let ops = normalized_ops.to_mut();
                ops[current_op_index - 1] = Op::Delete;
            }
            (false, true) => {
                // Only the `UpdateInsert` survives: rewrite it into a plain `Insert`.
                let ops = normalized_ops.to_mut();
                ops[current_op_index] = Op::Insert;
            }
            (true, true) | (false, false) => {}
        }
        *unmatched_update_delete = false;
    } else {
        match current_op {
            Op::UpdateDelete => {
                *unmatched_update_delete = true;
                *visible_update_delete = current_visibility;
            }
            Op::UpdateInsert => {
                unreachable!("UpdateInsert should not be present without UpdateDelete")
            }
            _ => {}
        }
    }
}

fn mark_cdc_chunk_inner(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    last_cdc_offset: Option<CdcOffset>,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    // The offset column is the last column of the chunk.
    let offset_col_idx = data.dimension() - 1;
    for v in data.rows().map(|row| {
        let offset_datum = row.datum_at(offset_col_idx).unwrap();
        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
        let visible = {
            // Drop events whose binlog offset is earlier than `last_cdc_offset`.
            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
                binlog_low <= &event_offset
            } else {
                true
            };

            if in_binlog_range {
                let lhs = row.project(pk_in_output_indices);
                let rhs = current_pos;
                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
            } else {
                false
            }
        };
        Ok::<_, ConnectorError>(visible)
    }) {
        new_visibility.append(v?);
    }

    let (columns, _) = data.into_parts();
    Ok(StreamChunk::with_visibility(
        ops,
        columns,
        new_visibility.finish(),
    ))
}

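/// Projects a chunk onto the given output column indices.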
pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
    let (ops, columns, visibility) = chunk.into_inner();
    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
    StreamChunk::with_visibility(ops, mapped_columns, visibility)
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    watermark.transform_with_indices(upstream_indices)
}

pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
    match msg {
        Message::Barrier(_) => Some(msg),
        Message::Watermark(watermark) => {
            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
        }
        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
    }
}

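/// Recovers the per-vnode backfill progress from the state table. Vnodes that
/// have no state row yet are reported as `NotStarted`.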
pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
    debug_assert!(!state_table.vnodes().is_empty());
    let vnodes = state_table.vnodes().iter_vnodes();
    let mut result = Vec::with_capacity(state_table.vnodes().len());
    let vnode_keys = vnodes.map(|vnode| {
        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
        datum
    });
    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
    let state_for_vnodes = try_join_all(tasks).await?;
    for (vnode, state_for_vnode) in state_table
        .vnodes()
        .iter_vnodes()
        .zip_eq_debug(state_for_vnodes)
    {
        let backfill_progress = match state_for_vnode {
            Some(row) => {
                // Row layout (vnode key excluded): | pk ... | is_finished | row_count |
                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;

                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();

                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
                let current_pos = current_pos.into_owned_row();

                if *vnode_is_finished.as_bool() {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::Completed {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::Completed {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                } else {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::InProgress {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::InProgress {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                }
            }
            None => BackfillStatePerVnode::new(
                BackfillProgressPerVnode::NotStarted,
                BackfillProgressPerVnode::NotStarted,
            ),
        };
        result.push((vnode, backfill_progress));
    }
    assert_eq!(result.len(), state_table.vnodes().count_ones());
    Ok(result)
}

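/// Flushes the backfill metadata row to the state table, writing one copy per
/// vnode in the table's bitmap (with the vnode column substituted). When an old
/// state exists, an update is written only if the non-vnode columns changed.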
pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    epoch: EpochPair,
    old_state: &mut Option<Vec<Datum>>,
    current_partial_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    let vnodes = table.vnodes().clone();
    if let Some(old_state) = old_state {
        if old_state[1..] != current_partial_state[1..] {
            vnodes.iter_vnodes_scalar().for_each(|vnode| {
                let datum = Some(vnode.into());
                current_partial_state[0].clone_from(&datum);
                old_state[0] = datum;
                table.write_record(Record::Update {
                    old_row: &old_state[..],
                    new_row: &(*current_partial_state),
                })
            });
        }
    } else {
        vnodes.iter_vnodes_scalar().for_each(|vnode| {
            let datum = Some(vnode.into());
            current_partial_state[0] = datum;
            table.write_record(Record::Insert {
                new_row: &(*current_partial_state),
            })
        });
    }
    table.commit_assert_no_update_vnode_bitmap(epoch).await
}

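/// Fills `row_state` (schema: `| vnode | pk ... | is_finished | row_count |`)
/// with everything except the vnode column, which is substituted per vnode when
/// the state is flushed.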
pub(crate) fn build_temporary_state(
    row_state: &mut [Datum],
    is_finished: bool,
    current_pos: &OwnedRow,
    row_count: u64,
) {
    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
    row_state[current_pos.len() + 1] = Some(is_finished.into());
    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
}

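/// Advances the backfill progress of `vnode` to the position of the last row in
/// the chunk, adding `snapshot_row_count_delta` to its snapshot row count.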
pub(crate) fn update_pos_by_vnode(
    vnode: VirtualNode,
    chunk: &StreamChunk,
    pk_in_output_indices: &[usize],
    backfill_state: &mut BackfillState,
    snapshot_row_count_delta: u64,
) -> StreamExecutorResult<()> {
    let new_pos = get_new_pos(chunk, pk_in_output_indices);
    assert_eq!(new_pos.len(), pk_in_output_indices.len());
    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
    Ok(())
}

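/// Gets the backfill position (pk) of the last row in the chunk.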
pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
    chunk
        .rows()
        .last()
        .unwrap()
        .1
        .project(pk_in_output_indices)
        .into_owned_row()
}

pub(crate) fn get_cdc_chunk_last_offset(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: &StreamChunk,
) -> StreamExecutorResult<Option<CdcOffset>> {
    let row = chunk.rows().last().unwrap().1;
    let offset_col = row.iter().last().unwrap();
    let output =
        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
    output.transpose().map_err(|e| e.into())
}

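/// Builds an all-NULL placeholder position, used as `current_pos` when a vnode
/// finishes backfill without ever having read a snapshot row.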
pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
    OwnedRow::new(vec![None; pos_len])
}

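/// Computes the snapshot scan bounds: resume strictly after `current_pos` if one
/// exists, otherwise scan the full range. Returns `None` when `current_pos` is an
/// empty row (no pk columns), meaning the snapshot has already been consumed.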
pub(crate) fn compute_bounds(
    pk_indices: &[usize],
    current_pos: Option<OwnedRow>,
) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
    if let Some(current_pos) = current_pos {
        // An empty `current_pos` (no pk columns) means the single possible row has
        // already been consumed; there is nothing left to scan.
        if current_pos.is_empty() {
            assert!(pk_indices.is_empty());
            return None;
        }

        // Resume the snapshot read strictly after the last recorded position.
        Some((Bound::Excluded(current_pos), Bound::Unbounded))
    } else {
        Some((Bound::Unbounded, Bound::Unbounded))
    }
}

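/// Reads rows from `iter` and yields them as `Insert`-only stream chunks, batched
/// by `builder`.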
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
where
    StreamExecutorError: From<E>,
    R: Row,
    S: Stream<Item = Result<R, E>> + Unpin + 'a,
{
    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
        .instrument_await("backfill_snapshot_read")
        .await?
    {
        debug_assert!(data_chunk.cardinality() > 0);
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
        yield stream_chunk;
    }
}

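/// Persists the backfill progress of every given vnode whose state changed since
/// the last commit, then commits the state table at `epoch`. In debug builds, the
/// row currently stored for each vnode is cross-checked against the in-memory
/// committed state before it is overwritten.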
pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    #[cfg(debug_assertions)] state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()> {
    for vnode in vnodes {
        if !backfill_state.need_commit(&vnode) {
            continue;
        }
        let (encoded_prev_state, encoded_current_state) =
            match backfill_state.get_commit_state(&vnode) {
                Some((old_state, new_state)) => (old_state, new_state),
                None => continue,
            };
        if let Some(encoded_prev_state) = encoded_prev_state {
            // A committed state already exists: the write must be an `Update`.
            #[cfg(debug_assertions)]
            {
                // Sanity check: the stored row must match the in-memory committed
                // state (sans the vnode key) and differ from the new state.
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let old_row = table.get_row(pk).await?;
                match old_row {
                    Some(old_row) => {
                        let inner = old_row.as_inner();
                        assert_eq!(inner, &encoded_prev_state[1..]);
                        assert_ne!(inner, &encoded_current_state[1..]);
                        assert_eq!(old_row.len(), state_len - 1);
                        assert_eq!(encoded_current_state.len(), state_len);
                    }
                    None => {
                        bail!("row {:#?} not found", pk);
                    }
                }
            }
            table.write_record(Record::Update {
                old_row: &encoded_prev_state[..],
                new_row: &encoded_current_state[..],
            });
        } else {
            // No committed state yet: the write must be an `Insert`.
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let row = table.get_row(pk).await?;
                assert!(row.is_none(), "row {:#?}", row);
                assert_eq!(encoded_current_state.len(), state_len);
            }
            table.write_record(Record::Insert {
                new_row: &encoded_current_state[..],
            });
        }
        backfill_state.mark_committed(vnode);
    }

    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    Ok(())
}

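/// Persists the (non-per-vnode) backfill state: if there is a current position,
/// the temporary state row is rebuilt and flushed; otherwise the state table is
/// simply committed at `epoch`.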
pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    is_finished: bool,
    current_pos: &Option<OwnedRow>,
    row_count: u64,
    old_state: &mut Option<Vec<Datum>>,
    current_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    if let Some(current_pos_inner) = current_pos {
        // There is progress to persist: (re)build the state row and flush it.
        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
        flush_data(table, epoch, old_state, current_state).await?;
        *old_state = Some(current_state.into());
    } else {
        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    }
    Ok(())
}

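/// Creates the `DataChunkBuilder` used for snapshot reads. A fixed rate limit
/// smaller than `chunk_size` takes precedence, and the batch size is clamped to
/// at least 2 so that a full update (`U-` + `U+`) pair can fit in a single chunk.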
pub fn create_builder(
    rate_limit: RateLimit,
    chunk_size: usize,
    data_types: Vec<DataType>,
) -> DataChunkBuilder {
    let batch_size = match rate_limit {
        RateLimit::Disabled | RateLimit::Pause => chunk_size,
        RateLimit::Fixed(limit) => min(limit.get() as usize, chunk_size),
    };
    let batch_size = max(2, batch_size);
    DataChunkBuilder::new(data_types, batch_size)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ];
        let ops: Arc<[Op]> = ops.into();

        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::Delete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::Insert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
    }
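
    // An illustrative test added during editing (not part of the original suite):
    // a minimal sketch checking that `compute_bounds` resumes strictly after an
    // existing position and falls back to a full scan otherwise.
    #[test]
    fn test_compute_bounds_resumes_after_current_pos() {
        // With a current position, the lower bound excludes it.
        let pos = OwnedRow::new(vec![Some(1i64.into())]);
        let (start, end) = compute_bounds(&[0], Some(pos.clone())).unwrap();
        assert_eq!(start, Bound::Excluded(pos));
        assert_eq!(end, Bound::Unbounded);

        // Without a current position, the whole snapshot range is scanned.
        let (start, end) = compute_bounds(&[0], None).unwrap();
        assert_eq!(start, Bound::Unbounded);
        assert_eq!(end, Bound::Unbounded);
    }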
}