use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Bound;

use await_tree::InstrumentAwait;
use futures::Stream;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::{OrderType, cmp_datum_iter};
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_common_rate_limit::RateLimit;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
use risingwave_storage::StateStore;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::table::collect_data_chunk_with_builder;

use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
use crate::executor::{
    Message, PkIndicesRef, StreamExecutorError, StreamExecutorResult, Watermark,
};

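/// Number of metadata columns in each per-vnode backfill state row: the leading `vnode`
/// column plus the trailing `is_finished` flag and `row_count`.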
pub const METADATA_STATE_LEN: usize = 3;

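/// In-memory backfill progress for all vnodes owned by this executor.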
#[derive(Clone, Debug)]
pub struct BackfillState {
    inner: HashMap<VirtualNode, BackfillStatePerVnode>,
}

impl BackfillState {
    pub(crate) fn has_progress(&self) -> bool {
        self.inner.values().any(|p| {
            matches!(
                p.current_state(),
                &BackfillProgressPerVnode::InProgress { .. }
            )
        })
    }

    pub(crate) fn get_current_state(
        &mut self,
        vnode: &VirtualNode,
    ) -> &mut BackfillProgressPerVnode {
        &mut self.inner.get_mut(vnode).unwrap().current_state
    }

    pub(crate) fn get_progress(
        &self,
        vnode: &VirtualNode,
    ) -> StreamExecutorResult<&BackfillProgressPerVnode> {
        match self.inner.get(vnode) {
            Some(p) => Ok(p.current_state()),
            None => bail!(
                "Backfill progress for vnode {:#?} not found, backfill_state not initialized properly",
                vnode,
            ),
        }
    }

    pub(crate) fn update_progress(
        &mut self,
        vnode: VirtualNode,
        new_pos: OwnedRow,
        snapshot_row_count_delta: u64,
    ) -> StreamExecutorResult<()> {
        let state = self.get_current_state(&vnode);
        match state {
            BackfillProgressPerVnode::NotStarted => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            } => {
                *state = BackfillProgressPerVnode::InProgress {
                    current_pos: new_pos,
                    snapshot_row_count: *snapshot_row_count + snapshot_row_count_delta,
                };
            }
            BackfillProgressPerVnode::Completed { .. } => unreachable!(),
        }
        Ok(())
    }

    pub(crate) fn finish_progress(&mut self, vnode: VirtualNode, pos_len: usize) {
        let finished_placeholder_position = construct_initial_finished_state(pos_len);
        let current_state = self.get_current_state(&vnode);
        let (new_pos, snapshot_row_count) = match current_state {
            BackfillProgressPerVnode::NotStarted => (finished_placeholder_position, 0),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => (current_pos.clone(), *snapshot_row_count),
            BackfillProgressPerVnode::Completed { .. } => {
                return;
            }
        };
        *current_state = BackfillProgressPerVnode::Completed {
            current_pos: new_pos,
            snapshot_row_count,
        };
    }

    /// Encodes the committed and current progress of `vnode` as state-table rows laid out as
    /// `vnode | pk... | is_finished | row_count`.
    fn get_commit_state(&self, vnode: &VirtualNode) -> Option<(Option<Vec<Datum>>, Vec<Datum>)> {
        let new_state = self.inner.get(vnode).unwrap().current_state().clone();
        let new_encoded_state = match new_state {
            BackfillProgressPerVnode::NotStarted => unreachable!(),
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(false.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let mut encoded_state = vec![None; current_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
                encoded_state[current_pos.len() + 1] = Some(true.into());
                encoded_state[current_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                encoded_state
            }
        };
        let old_state = self.inner.get(vnode).unwrap().committed_state().clone();
        let old_encoded_state = match old_state {
            BackfillProgressPerVnode::NotStarted => None,
            BackfillProgressPerVnode::InProgress {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(false.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
            BackfillProgressPerVnode::Completed {
                current_pos,
                snapshot_row_count,
            } => {
                let committed_pos = current_pos;
                let mut encoded_state = vec![None; committed_pos.len() + METADATA_STATE_LEN];
                encoded_state[0] = Some(vnode.to_scalar().into());
                encoded_state[1..committed_pos.len() + 1]
                    .clone_from_slice(committed_pos.as_inner());
                encoded_state[committed_pos.len() + 1] = Some(true.into());
                encoded_state[committed_pos.len() + 2] = Some((snapshot_row_count as i64).into());
                Some(encoded_state)
            }
        };
        Some((old_encoded_state, new_encoded_state))
    }

    /// Returns whether the progress of `vnode` has diverged from its committed state
    /// and therefore needs to be persisted.
    fn need_commit(&self, vnode: &VirtualNode) -> bool {
        let state = self.inner.get(vnode).unwrap();
        match state.current_state() {
            s @ BackfillProgressPerVnode::InProgress { .. }
            | s @ BackfillProgressPerVnode::Completed { .. } => s != state.committed_state(),
            BackfillProgressPerVnode::NotStarted => false,
        }
    }

    fn mark_committed(&mut self, vnode: VirtualNode) {
        let BackfillStatePerVnode {
            committed_state,
            current_state,
        } = self.inner.get_mut(&vnode).unwrap();

        assert!(matches!(
            current_state,
            BackfillProgressPerVnode::InProgress { .. }
                | BackfillProgressPerVnode::Completed { .. }
        ));
        *committed_state = current_state.clone();
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.inner
            .values()
            .map(|p| p.get_snapshot_row_count())
            .sum()
    }
}

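/// Per-vnode backfill progress: the state last committed to storage alongside the
/// current in-memory state.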
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackfillStatePerVnode {
    committed_state: BackfillProgressPerVnode,
    current_state: BackfillProgressPerVnode,
}

impl BackfillStatePerVnode {
    pub(crate) fn new(
        committed_state: BackfillProgressPerVnode,
        current_state: BackfillProgressPerVnode,
    ) -> Self {
        Self {
            committed_state,
            current_state,
        }
    }

    pub(crate) fn committed_state(&self) -> &BackfillProgressPerVnode {
        &self.committed_state
    }

    pub(crate) fn current_state(&self) -> &BackfillProgressPerVnode {
        &self.current_state
    }

    pub(crate) fn get_snapshot_row_count(&self) -> u64 {
        self.current_state().get_snapshot_row_count()
    }
}

impl From<Vec<(VirtualNode, BackfillStatePerVnode)>> for BackfillState {
    fn from(v: Vec<(VirtualNode, BackfillStatePerVnode)>) -> Self {
        Self {
            inner: v.into_iter().collect(),
        }
    }
}

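/// Progress of a single vnode: not started, in progress at `current_pos`, or completed.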
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum BackfillProgressPerVnode {
    /// No snapshot rows have been read for this vnode yet.
    NotStarted,
    InProgress {
        /// The latest position read from the snapshot.
        current_pos: OwnedRow,
        /// Number of snapshot rows read for this vnode so far.
        snapshot_row_count: u64,
    },
    Completed {
        /// The last position read before this vnode finished.
        current_pos: OwnedRow,
        /// Total number of snapshot rows read for this vnode.
        snapshot_row_count: u64,
    },
}

impl BackfillProgressPerVnode {
    fn get_snapshot_row_count(&self) -> u64 {
        match self {
            BackfillProgressPerVnode::NotStarted => 0,
            BackfillProgressPerVnode::InProgress {
                snapshot_row_count, ..
            }
            | BackfillProgressPerVnode::Completed {
                snapshot_row_count, ..
            } => *snapshot_row_count,
        }
    }
}

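/// Marks the visibility of an upstream chunk against a single backfill position:
/// rows whose pk is `<= current_pos` are kept, the rest are hidden.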
pub(crate) fn mark_chunk(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let chunk = chunk.compact();
    mark_chunk_inner(chunk, current_pos, pk_in_output_indices, pk_order)
}

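/// CDC variant of [`mark_chunk`]: a row is kept only if its change-log offset is at or
/// beyond `last_cdc_offset` and its pk is `<= current_pos`.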
pub(crate) fn mark_cdc_chunk(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
    last_cdc_offset: Option<CdcOffset>,
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.compact();
    mark_cdc_chunk_inner(
        offset_parse_func,
        chunk,
        current_pos,
        last_cdc_offset,
        pk_in_output_indices,
        pk_order,
    )
}

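/// Marks the visibility of an upstream chunk when backfill progress is tracked per vnode:
/// each row is routed to its vnode by pk, kept if that vnode has completed, hidden if it
/// has not started, and compared against the vnode's `current_pos` otherwise.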
pub(crate) fn mark_chunk_ref_by_vnode<S: StateStore, SD: ValueRowSerde>(
    chunk: &StreamChunk,
    backfill_state: &BackfillState,
    pk_in_output_indices: PkIndicesRef<'_>,
    upstream_table: &ReplicatedStateTable<S, SD>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let chunk = chunk.clone();
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let pk = row.project(pk_in_output_indices);
        let vnode = upstream_table.compute_vnode_by_pk(pk);
        let visible = match backfill_state.get_progress(&vnode)? {
            // This vnode has finished backfilling, so all upstream rows pass through.
            BackfillProgressPerVnode::Completed { .. } => true,
            // Not started yet: the snapshot read will still cover this row, so hide it.
            BackfillProgressPerVnode::NotStarted => false,
            // In progress: forward the row only if its pk has already been backfilled.
            BackfillProgressPerVnode::InProgress { current_pos, .. } => {
                cmp_datum_iter(pk.iter(), current_pos.iter(), pk_order.iter().copied()).is_le()
            }
        };
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    let chunk = StreamChunk::with_visibility(new_ops, columns, new_visibility.finish());
    Ok(chunk)
}

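/// Shared marking logic for [`mark_chunk`]: sets each row's visibility to whether its pk
/// is `<= current_pos`, normalizing `UpdateDelete`/`UpdateInsert` pairs along the way.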
fn mark_chunk_inner(
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamChunk {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());
    let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
    let mut unmatched_update_delete = false;
    let mut visible_update_delete = false;
    for (i, (op, row)) in ops.iter().zip_eq_debug(data.rows()).enumerate() {
        let lhs = row.project(pk_in_output_indices);
        let rhs = current_pos;
        let visible = cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le();
        new_visibility.append(visible);

        normalize_unmatched_updates(
            &mut new_ops,
            &mut unmatched_update_delete,
            &mut visible_update_delete,
            visible,
            i,
            op,
        );
    }
    let (columns, _) = data.into_parts();
    StreamChunk::with_visibility(new_ops, columns, new_visibility.finish())
}

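/// An `UpdateDelete`/`UpdateInsert` pair may end up with different visibility after marking
/// (for example when the update moves a row across the backfill position). When only one
/// half remains visible, rewrite it into a standalone `Delete` or `Insert` so the chunk
/// stays consistent.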
fn normalize_unmatched_updates(
    normalized_ops: &mut Cow<'_, [Op]>,
    unmatched_update_delete: &mut bool,
    visible_update_delete: &mut bool,
    current_visibility: bool,
    current_op_index: usize,
    current_op: &Op,
) {
    if *unmatched_update_delete {
        assert_eq!(*current_op, Op::UpdateInsert);
        let visible_update_insert = current_visibility;
        match (visible_update_delete, visible_update_insert) {
            (true, false) => {
                // Only the `UpdateDelete` is visible: downgrade it to a plain `Delete`.
                let ops = normalized_ops.to_mut();
                ops[current_op_index - 1] = Op::Delete;
            }
            (false, true) => {
                // Only the `UpdateInsert` is visible: upgrade it to a plain `Insert`.
                let ops = normalized_ops.to_mut();
                ops[current_op_index] = Op::Insert;
            }
            (true, true) | (false, false) => {}
        }
        *unmatched_update_delete = false;
    } else {
        match current_op {
            Op::UpdateDelete => {
                *unmatched_update_delete = true;
                *visible_update_delete = current_visibility;
            }
            Op::UpdateInsert => {
                unreachable!("UpdateInsert should not be present without UpdateDelete")
            }
            _ => {}
        }
    }
}

fn mark_cdc_chunk_inner(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: StreamChunk,
    current_pos: &OwnedRow,
    last_cdc_offset: Option<CdcOffset>,
    pk_in_output_indices: PkIndicesRef<'_>,
    pk_order: &[OrderType],
) -> StreamExecutorResult<StreamChunk> {
    let (data, ops) = chunk.into_parts();
    let mut new_visibility = BitmapBuilder::with_capacity(ops.len());

    // The change-log offset is carried in the last column of the chunk.
    let offset_col_idx = data.dimension() - 1;
    for v in data.rows().map(|row| {
        let offset_datum = row.datum_at(offset_col_idx).unwrap();
        let event_offset = (*offset_parse_func)(offset_datum.into_utf8())?;
        let visible = {
            let in_binlog_range = if let Some(binlog_low) = &last_cdc_offset {
                binlog_low <= &event_offset
            } else {
                true
            };

            if in_binlog_range {
                let lhs = row.project(pk_in_output_indices);
                let rhs = current_pos;
                cmp_datum_iter(lhs.iter(), rhs.iter(), pk_order.iter().copied()).is_le()
            } else {
                false
            }
        };
        Ok::<_, ConnectorError>(visible)
    }) {
        new_visibility.append(v?);
    }

    let (columns, _) = data.into_parts();
    Ok(StreamChunk::with_visibility(
        ops,
        columns,
        new_visibility.finish(),
    ))
}

pub(crate) fn mapping_chunk(chunk: StreamChunk, output_indices: &[usize]) -> StreamChunk {
    let (ops, columns, visibility) = chunk.into_inner();
    let mapped_columns = output_indices.iter().map(|&i| columns[i].clone()).collect();
    StreamChunk::with_visibility(ops, mapped_columns, visibility)
}

fn mapping_watermark(watermark: Watermark, upstream_indices: &[usize]) -> Option<Watermark> {
    watermark.transform_with_indices(upstream_indices)
}

pub(crate) fn mapping_message(msg: Message, upstream_indices: &[usize]) -> Option<Message> {
    match msg {
        Message::Barrier(_) => Some(msg),
        Message::Watermark(watermark) => {
            mapping_watermark(watermark, upstream_indices).map(Message::Watermark)
        }
        Message::Chunk(chunk) => Some(Message::Chunk(mapping_chunk(chunk, upstream_indices))),
    }
}

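/// Recovers the per-vnode backfill progress from the state table. Each state row is laid
/// out as `vnode | pk... | is_finished | row_count`; vnodes without a row have not started.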
pub(crate) async fn get_progress_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    state_table: &StateTableInner<S, BasicSerde, IS_REPLICATED>,
) -> StreamExecutorResult<Vec<(VirtualNode, BackfillStatePerVnode)>> {
    debug_assert!(!state_table.vnodes().is_empty());
    let vnodes = state_table.vnodes().iter_vnodes();
    let mut result = Vec::with_capacity(state_table.vnodes().len());
    let vnode_keys = vnodes.map(|vnode| {
        let datum: [Datum; 1] = [Some(vnode.to_scalar().into())];
        datum
    });
    let tasks = vnode_keys.map(|vnode_key| state_table.get_row(vnode_key));
    let state_for_vnodes = try_join_all(tasks).await?;
    for (vnode, state_for_vnode) in state_table
        .vnodes()
        .iter_vnodes()
        .zip_eq_debug(state_for_vnodes)
    {
        let backfill_progress = match state_for_vnode {
            Some(row) => {
                // Last column: number of snapshot rows read so far.
                let snapshot_row_count = row.as_inner().get(row.len() - 1).unwrap();
                let snapshot_row_count = (*snapshot_row_count.as_ref().unwrap().as_int64()) as u64;

                // Second to last column: whether this vnode has finished backfilling.
                let vnode_is_finished = row.as_inner().get(row.len() - 2).unwrap();
                let vnode_is_finished = vnode_is_finished.as_ref().unwrap();

                // Remaining columns: the current backfill position (pk).
                let current_pos = row.as_inner().get(..row.len() - 2).unwrap();
                let current_pos = current_pos.into_owned_row();

                if *vnode_is_finished.as_bool() {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::Completed {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::Completed {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                } else {
                    BackfillStatePerVnode::new(
                        BackfillProgressPerVnode::InProgress {
                            current_pos: current_pos.clone(),
                            snapshot_row_count,
                        },
                        BackfillProgressPerVnode::InProgress {
                            current_pos,
                            snapshot_row_count,
                        },
                    )
                }
            }
            None => BackfillStatePerVnode::new(
                BackfillProgressPerVnode::NotStarted,
                BackfillProgressPerVnode::NotStarted,
            ),
        };
        result.push((vnode, backfill_progress));
    }
    assert_eq!(result.len(), state_table.vnodes().count_ones());
    Ok(result)
}

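/// Writes the shared backfill state row to every vnode and commits the state table at
/// `epoch`: an `Update` per vnode when the state changed since the last flush, an `Insert`
/// per vnode when there is no previous state.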
pub(crate) async fn flush_data<S: StateStore, const IS_REPLICATED: bool>(
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    epoch: EpochPair,
    old_state: &mut Option<Vec<Datum>>,
    current_partial_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    let vnodes = table.vnodes().clone();
    if let Some(old_state) = old_state {
        if old_state[1..] != current_partial_state[1..] {
            vnodes.iter_vnodes_scalar().for_each(|vnode| {
                let datum = Some(vnode.into());
                current_partial_state[0].clone_from(&datum);
                old_state[0] = datum;
                table.write_record(Record::Update {
                    old_row: &old_state[..],
                    new_row: &(*current_partial_state),
                })
            });
        }
    } else {
        // No previous state: insert a fresh state row for every vnode.
        vnodes.iter_vnodes_scalar().for_each(|vnode| {
            let datum = Some(vnode.into());
            current_partial_state[0] = datum;
            table.write_record(Record::Insert {
                new_row: &(*current_partial_state),
            })
        });
    }
    table.commit_assert_no_update_vnode_bitmap(epoch).await
}

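/// Fills the reusable state row buffer in place with `current_pos`, the `is_finished` flag
/// and the `row_count`, leaving slot 0 free for the vnode to be patched in later.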
pub(crate) fn build_temporary_state(
    row_state: &mut [Datum],
    is_finished: bool,
    current_pos: &OwnedRow,
    row_count: u64,
) {
    row_state[1..current_pos.len() + 1].clone_from_slice(current_pos.as_inner());
    row_state[current_pos.len() + 1] = Some(is_finished.into());
    row_state[current_pos.len() + 2] = Some((row_count as i64).into());
}

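/// Advances the backfill position of `vnode` to the pk of the chunk's last row and adds
/// `snapshot_row_count_delta` to its snapshot row count.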
pub(crate) fn update_pos_by_vnode(
    vnode: VirtualNode,
    chunk: &StreamChunk,
    pk_in_output_indices: &[usize],
    backfill_state: &mut BackfillState,
    snapshot_row_count_delta: u64,
) -> StreamExecutorResult<()> {
    let new_pos = get_new_pos(chunk, pk_in_output_indices);
    assert_eq!(new_pos.len(), pk_in_output_indices.len());
    backfill_state.update_progress(vnode, new_pos, snapshot_row_count_delta)?;
    Ok(())
}

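/// Extracts the new backfill position from a snapshot chunk. Snapshot rows arrive in pk
/// order, so the pk of the last row is the furthest position read.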
pub(crate) fn get_new_pos(chunk: &StreamChunk, pk_in_output_indices: &[usize]) -> OwnedRow {
    chunk
        .rows()
        .last()
        .unwrap()
        .1
        .project(pk_in_output_indices)
        .into_owned_row()
}

pub(crate) fn get_cdc_chunk_last_offset(
    offset_parse_func: &CdcOffsetParseFunc,
    chunk: &StreamChunk,
) -> StreamExecutorResult<Option<CdcOffset>> {
    let row = chunk.rows().last().unwrap().1;
    let offset_col = row.iter().last().unwrap();
    let output =
        offset_col.map(|scalar| Ok::<_, ConnectorError>((*offset_parse_func)(scalar.into_utf8()))?);
    output.transpose().map_err(|e| e.into())
}

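/// Placeholder position for a vnode that finishes without ever recording a position
/// (for example when its snapshot is empty): a row of NULLs with the same arity as the pk.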
pub(crate) fn construct_initial_finished_state(pos_len: usize) -> OwnedRow {
    OwnedRow::new(vec![None; pos_len])
}

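/// Computes the scan range for the next snapshot read: resume exclusively after
/// `current_pos` if there is one, otherwise scan unbounded. Returns `None` when the
/// position is an empty row (a table without pk columns), since there is nothing left to
/// read in that case.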
pub(crate) fn compute_bounds(
    pk_indices: &[usize],
    current_pos: Option<OwnedRow>,
) -> Option<(Bound<OwnedRow>, Bound<OwnedRow>)> {
    if let Some(current_pos) = current_pos {
        if current_pos.is_empty() {
            // An empty position means the upstream table has no pk columns: it holds at
            // most one row, which has already been consumed, so no further scan is needed.
            assert!(pk_indices.is_empty());
            return None;
        }

        Some((Bound::Excluded(current_pos), Bound::Unbounded))
    } else {
        Some((Bound::Unbounded, Bound::Unbounded))
    }
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
where
    StreamExecutorError: From<E>,
    R: Row,
    S: Stream<Item = Result<R, E>> + Unpin + 'a,
{
    while let Some(data_chunk) = collect_data_chunk_with_builder(&mut iter, builder)
        .instrument_await("backfill_snapshot_read")
        .await?
    {
        debug_assert!(data_chunk.cardinality() > 0);
        let ops = vec![Op::Insert; data_chunk.capacity()];
        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
        yield stream_chunk;
    }
}

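/// Persists the per-vnode backfill progress that changed since the last commit. For each
/// such vnode this writes an `Update` (when a committed row already exists) or an `Insert`,
/// marks the vnode committed in memory, and finally commits the state table at `epoch`.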
pub(crate) async fn persist_state_per_vnode<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    backfill_state: &mut BackfillState,
    #[cfg(debug_assertions)] state_len: usize,
    vnodes: impl Iterator<Item = VirtualNode>,
) -> StreamExecutorResult<()> {
    for vnode in vnodes {
        if !backfill_state.need_commit(&vnode) {
            continue;
        }
        let (encoded_prev_state, encoded_current_state) =
            match backfill_state.get_commit_state(&vnode) {
                Some((old_state, new_state)) => (old_state, new_state),
                None => continue,
            };
        if let Some(encoded_prev_state) = encoded_prev_state {
            // A committed row already exists for this vnode: sanity-check it under debug
            // assertions, then issue an update.
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let old_row = table.get_row(pk).await?;
                match old_row {
                    Some(old_row) => {
                        let inner = old_row.as_inner();
                        assert_eq!(inner, &encoded_prev_state[1..]);
                        assert_ne!(inner, &encoded_current_state[1..]);
                        assert_eq!(old_row.len(), state_len - 1);
                        assert_eq!(encoded_current_state.len(), state_len);
                    }
                    None => {
                        bail!("row {:#?} not found", pk);
                    }
                }
            }
            table.write_record(Record::Update {
                old_row: &encoded_prev_state[..],
                new_row: &encoded_current_state[..],
            });
        } else {
            // First time persisting progress for this vnode: issue an insert.
            #[cfg(debug_assertions)]
            {
                let pk: &[Datum; 1] = &[Some(vnode.to_scalar().into())];
                let row = table.get_row(pk).await?;
                assert!(row.is_none(), "row {:#?}", row);
                assert_eq!(encoded_current_state.len(), state_len);
            }
            table.write_record(Record::Insert {
                new_row: &encoded_current_state[..],
            });
        }
        backfill_state.mark_committed(vnode);
    }

    table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    Ok(())
}

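/// Persists the single shared backfill position (non-per-vnode backfill). If a position
/// exists, it is encoded into `current_state` and flushed; otherwise the state table is
/// only committed at `epoch`.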
pub(crate) async fn persist_state<S: StateStore, const IS_REPLICATED: bool>(
    epoch: EpochPair,
    table: &mut StateTableInner<S, BasicSerde, IS_REPLICATED>,
    is_finished: bool,
    current_pos: &Option<OwnedRow>,
    row_count: u64,
    old_state: &mut Option<Vec<Datum>>,
    current_state: &mut [Datum],
) -> StreamExecutorResult<()> {
    if let Some(current_pos_inner) = current_pos {
        build_temporary_state(current_state, is_finished, current_pos_inner, row_count);
        flush_data(table, epoch, old_state, current_state).await?;
        *old_state = Some(current_state.into());
    } else {
        table.commit_assert_no_update_vnode_bitmap(epoch).await?;
    }
    Ok(())
}

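/// Creates the `DataChunkBuilder` used for snapshot reads. A fixed rate limit smaller than
/// `chunk_size` takes precedence and caps the batch size.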
pub fn create_builder(
    rate_limit: RateLimit,
    chunk_size: usize,
    data_types: Vec<DataType>,
) -> DataChunkBuilder {
    let batch_size = match rate_limit {
        RateLimit::Disabled | RateLimit::Pause => chunk_size,
        RateLimit::Fixed(limit) if limit.get() as usize >= chunk_size => chunk_size,
        RateLimit::Fixed(limit) => limit.get() as usize,
    };
    DataChunkBuilder::new(data_types, batch_size)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn test_normalizing_unmatched_updates() {
        let ops = vec![
            Op::UpdateDelete,
            Op::UpdateInsert,
            Op::UpdateDelete,
            Op::UpdateInsert,
        ];
        let ops: Arc<[Op]> = ops.into();

        {
            // Both halves of the update are visible: ops are left untouched.
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            // Both halves are hidden: ops are left untouched.
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            // Only the `UpdateDelete` is visible: it is rewritten to a plain `Delete`.
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = true;
            let current_visibility = false;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::Delete,
                    Op::UpdateInsert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
        {
            // Only the `UpdateInsert` is visible: it is rewritten to a plain `Insert`.
            let mut new_ops: Cow<'_, [Op]> = Cow::Borrowed(ops.as_ref());
            let mut unmatched_update_delete = true;
            let mut visible_update_delete = false;
            let current_visibility = true;
            normalize_unmatched_updates(
                &mut new_ops,
                &mut unmatched_update_delete,
                &mut visible_update_delete,
                current_visibility,
                1,
                &Op::UpdateInsert,
            );
            assert_eq!(
                &new_ops[..],
                vec![
                    Op::UpdateDelete,
                    Op::Insert,
                    Op::UpdateDelete,
                    Op::UpdateInsert
                ]
            );
        }
    }
}