1use std::collections::HashMap;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use either::Either;
20use itertools::Itertools;
21use prometheus::core::{AtomicU64, GenericCounter};
22use risingwave_common::array::ArrayRef;
23use risingwave_common::catalog::{ColumnId, TableId};
24use risingwave_common::metrics::{GLOBAL_ERROR_METRICS, LabelGuardedMetric};
25use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::epoch::{Epoch, EpochPair};
28use risingwave_connector::parser::schema_change::SchemaChangeEnvelope;
29use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
30use risingwave_connector::source::reader::reader::SourceReader;
31use risingwave_connector::source::{
32 ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitImpl, SplitMetaData,
33 StreamChunkWithState, WaitCheckpointTask,
34};
35use risingwave_hummock_sdk::HummockReadEpoch;
36use risingwave_storage::store::TryWaitEpochOptions;
37use serde_json;
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
40use tokio::sync::{mpsc, oneshot};
41use tokio::time::Instant;
42
43use super::executor_core::StreamSourceCore;
44use super::{barrier_to_message_stream, get_split_offset_col_idx, prune_additional_cols};
45use crate::common::rate_limit::limited_chunk_size;
46use crate::executor::UpdateMutation;
47use crate::executor::prelude::*;
48use crate::executor::source::reader_stream::StreamReaderBuilder;
49use crate::executor::stream_reader::StreamReaderWithPause;
50use crate::task::LocalBarrierManager;
51
/// Multiplier applied to the system parameter `barrier_interval_ms` to obtain the longest
/// time (in ms) the executor keeps forwarding chunks without seeing a barrier. Past that
/// threshold `execute_inner` pauses the reader ("self pause") until the next barrier arrives.
pub const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
55
56fn extract_split_offset(split: &SplitImpl) -> Option<u64> {
61 match split {
62 SplitImpl::PostgresCdc(pg_split) => {
63 let offset_str = pg_split.start_offset().as_ref()?;
64 extract_pg_cdc_lsn_from_offset(offset_str)
65 }
66 _ => None,
67 }
68}
69
70fn extract_pg_cdc_lsn_from_offset(offset_str: &str) -> Option<u64> {
73 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
74 let source_offset = offset.get("sourceOffset")?;
75 let lsn = source_offset.get("lsn")?;
76 lsn.as_u64()
77}
78
/// Streaming executor that reads from a source connector and emits the data as
/// [`Message::Chunk`]s, interleaved with the barriers it receives for its actor.
pub struct SourceExecutor<S: StateStore> {
    /// Actor context: actor/fragment ids, streaming config and (optional) meta client.
    actor_ctx: ActorContextRef,

    /// Source-level state: id, name, split assignment and the split state table.
    stream_source_core: StreamSourceCore<S>,

    /// Streaming metrics (row counts, split change counts, CDC LSN gauges, ...).
    metrics: Arc<StreamingMetrics>,

    /// Barrier input for this actor; taken (left as `None`) when execution starts.
    barrier_receiver: Option<UnboundedReceiver<Barrier>>,

    /// System parameters; `barrier_interval_ms` is read from here for the self-pause logic.
    system_params: SystemParamsReaderRef,

    /// Current rate limit (presumably rows per second, per the name); updated by
    /// `Throttle` mutations.
    rate_limit_rps: Option<u32>,

    /// Passed to `fetch_latest_splits` and, together with "newly added", decides the
    /// seek flag given to `into_retry_stream` on startup.
    is_shared_non_cdc: bool,

    /// Used to report load completion of a refreshing batch source.
    barrier_manager: LocalBarrierManager,
}
102
103impl<S: StateStore> SourceExecutor<S> {
104 #[expect(clippy::too_many_arguments)]
105 pub fn new(
106 actor_ctx: ActorContextRef,
107 stream_source_core: StreamSourceCore<S>,
108 metrics: Arc<StreamingMetrics>,
109 barrier_receiver: UnboundedReceiver<Barrier>,
110 system_params: SystemParamsReaderRef,
111 rate_limit_rps: Option<u32>,
112 is_shared_non_cdc: bool,
113 barrier_manager: LocalBarrierManager,
114 ) -> Self {
115 Self {
116 actor_ctx,
117 stream_source_core,
118 metrics,
119 barrier_receiver: Some(barrier_receiver),
120 system_params,
121 rate_limit_rps,
122 is_shared_non_cdc,
123 barrier_manager,
124 }
125 }
126
127 fn stream_reader_builder(&self, source_desc: SourceDesc) -> StreamReaderBuilder {
128 StreamReaderBuilder {
129 source_desc,
130 rate_limit: self.rate_limit_rps,
131 source_id: self.stream_source_core.source_id,
132 source_name: self.stream_source_core.source_name.clone(),
133 is_auto_schema_change_enable: self.is_auto_schema_change_enable(),
134 actor_ctx: self.actor_ctx.clone(),
135 reader_stream: None,
136 }
137 }
138
139 async fn spawn_wait_checkpoint_worker(
140 core: &StreamSourceCore<S>,
141 source_reader: SourceReader,
142 metrics: Arc<StreamingMetrics>,
143 ) -> StreamExecutorResult<Option<WaitCheckpointTaskBuilder>> {
144 let Some(initial_task) = source_reader.create_wait_checkpoint_task().await? else {
145 return Ok(None);
146 };
147 let (wait_checkpoint_tx, wait_checkpoint_rx) = mpsc::unbounded_channel();
148 let wait_checkpoint_worker = WaitCheckpointWorker {
149 wait_checkpoint_rx,
150 state_store: core.split_state_store.state_table().state_store().clone(),
151 table_id: core.split_state_store.state_table().table_id().into(),
152 metrics,
153 };
154 tokio::spawn(wait_checkpoint_worker.run());
155 Ok(Some(WaitCheckpointTaskBuilder {
156 wait_checkpoint_tx,
157 source_reader,
158 building_task: initial_task,
159 }))
160 }
161
162 pub fn prepare_source_stream_build(
164 &self,
165 source_desc: &SourceDesc,
166 ) -> (Vec<ColumnId>, SourceContext) {
167 let column_ids = source_desc
168 .columns
169 .iter()
170 .map(|column_desc| column_desc.column_id)
171 .collect_vec();
172
173 let (schema_change_tx, mut schema_change_rx) =
174 mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16);
175 let schema_change_tx = if self.is_auto_schema_change_enable() {
176 let meta_client = self.actor_ctx.meta_client.clone();
177 let _join_handle = tokio::task::spawn(async move {
179 while let Some((schema_change, finish_tx)) = schema_change_rx.recv().await {
180 let table_ids = schema_change.table_ids();
181 tracing::info!(
182 target: "auto_schema_change",
183 "recv a schema change event for tables: {:?}", table_ids);
184 if let Some(ref meta_client) = meta_client {
186 match meta_client
187 .auto_schema_change(schema_change.to_protobuf())
188 .await
189 {
190 Ok(_) => {
191 tracing::info!(
192 target: "auto_schema_change",
193 "schema change success for tables: {:?}", table_ids);
194 finish_tx.send(()).unwrap();
195 }
196 Err(e) => {
197 tracing::error!(
198 target: "auto_schema_change",
199 error = ?e.as_report(), "schema change error");
200 finish_tx.send(()).unwrap();
201 }
202 }
203 }
204 }
205 });
206 Some(schema_change_tx)
207 } else {
208 info!("auto schema change is disabled in config");
209 None
210 };
211 let source_ctx = SourceContext::new(
212 self.actor_ctx.id,
213 self.stream_source_core.source_id,
214 self.actor_ctx.fragment_id,
215 self.stream_source_core.source_name.clone(),
216 source_desc.metrics.clone(),
217 SourceCtrlOpts {
218 chunk_size: limited_chunk_size(self.rate_limit_rps),
219 split_txn: self.rate_limit_rps.is_some(), },
221 source_desc.source.config.clone(),
222 schema_change_tx,
223 );
224
225 (column_ids, source_ctx)
226 }
227
228 fn is_batch_source(&self) -> bool {
230 self.stream_source_core.is_batch_source
231 }
232
233 fn refresh_batch_splits(&mut self) -> StreamExecutorResult<Vec<SplitImpl>> {
235 debug_assert!(self.is_batch_source());
236 let core = &self.stream_source_core;
237 let mut split = core.get_batch_split();
238 split.refresh();
239 Ok(vec![split.into()])
240 }
241
242 fn is_auto_schema_change_enable(&self) -> bool {
243 self.actor_ctx
244 .streaming_config
245 .developer
246 .enable_auto_schema_change
247 }
248
249 #[inline]
251 fn get_metric_labels(&self) -> [String; 4] {
252 [
253 self.stream_source_core.source_id.to_string(),
254 self.stream_source_core.source_name.clone(),
255 self.actor_ctx.id.to_string(),
256 self.actor_ctx.fragment_id.to_string(),
257 ]
258 }
259
260 async fn apply_split_change_after_yield_barrier<const BIASED: bool>(
269 &mut self,
270 barrier_epoch: EpochPair,
271 source_desc: &SourceDesc,
272 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
273 apply_mutation: ApplyMutationAfterBarrier<'_>,
274 ) -> StreamExecutorResult<()> {
275 {
276 let mut should_rebuild_stream = false;
277 match apply_mutation {
278 ApplyMutationAfterBarrier::SplitChange {
279 target_splits,
280 should_trim_state,
281 split_change_count,
282 } => {
283 split_change_count.inc();
284 if self
285 .update_state_if_changed(barrier_epoch, target_splits, should_trim_state)
286 .await?
287 {
288 should_rebuild_stream = true;
289 }
290 }
291 ApplyMutationAfterBarrier::RefreshBatchSplits(splits) => {
292 self.stream_source_core.latest_split_info =
294 splits.into_iter().map(|s| (s.id(), s)).collect();
295 should_rebuild_stream = true;
296 }
297 ApplyMutationAfterBarrier::ConnectorPropsChange => {
298 should_rebuild_stream = true;
299 }
300 }
301
302 if should_rebuild_stream {
303 self.rebuild_stream_reader(source_desc, stream)?;
304 }
305 }
306
307 Ok(())
308 }
309
/// Reconciles this actor's split assignment with `target_splits`.
///
/// For each target split:
/// - a split already in `latest_split_info` keeps its current in-memory state;
/// - a newly assigned split starts from its committed state in the split state table if
///   there is one (read at `barrier_epoch`), otherwise from the split as given. It is also
///   queued in `updated_splits_in_epoch` so it gets persisted.
///
/// Splits present in `latest_split_info` but absent from the target are treated as dropped.
/// If anything changed, pending updates for dropped splits are discarded and the dropped
/// splits are removed. When `should_trim_state` is set, their rows are also trimmed from
/// the state table. Finally, `latest_split_info` is replaced by the new assignment.
///
/// Returns `true` iff a split was added or dropped.
async fn update_state_if_changed(
    &mut self,
    barrier_epoch: EpochPair,
    target_splits: Vec<SplitImpl>,
    should_trim_state: bool,
) -> StreamExecutorResult<bool> {
    let core = &mut self.stream_source_core;

    let target_splits: HashMap<_, _> = target_splits
        .into_iter()
        .map(|split| (split.id(), split))
        .collect();

    let mut target_state: HashMap<SplitId, SplitImpl> =
        HashMap::with_capacity(target_splits.len());

    let mut split_changed = false;

    let committed_reader = core
        .split_state_store
        .new_committed_reader(barrier_epoch)
        .await?;

    for (split_id, split) in target_splits {
        if let Some(s) = core.latest_split_info.get(&split_id) {
            // Already owned by this actor: keep the in-memory state (it may be newer
            // than the split passed in by the mutation).
            target_state.insert(split_id, s.clone());
        } else {
            split_changed = true;
            // Newly assigned: prefer committed state, fall back to the split as given.
            let initial_state = if let Some(recover_state) = committed_reader
                .try_recover_from_state_store(&split)
                .await?
            {
                recover_state
            } else {
                split
            };

            // Queue the initial state for persistence at the next barrier, without
            // overwriting an entry that is already pending.
            core.updated_splits_in_epoch
                .entry(split_id.clone())
                .or_insert_with(|| initial_state.clone());

            target_state.insert(split_id, initial_state);
        }
    }

    for existing_split_id in core.latest_split_info.keys() {
        if !target_state.contains_key(existing_split_id) {
            tracing::info!("split dropping detected: {}", existing_split_id);
            split_changed = true;
        }
    }

    if split_changed {
        tracing::info!(
            actor_id = self.actor_ctx.id,
            state = ?target_state,
            "apply split change"
        );

        // Do not persist pending updates of splits that are no longer assigned.
        core.updated_splits_in_epoch
            .retain(|split_id, _| target_state.contains_key(split_id));

        let dropped_splits = core
            .latest_split_info
            .extract_if(|split_id, _| !target_state.contains_key(split_id))
            .map(|(_, split)| split)
            .collect_vec();

        if should_trim_state && !dropped_splits.is_empty() {
            core.split_state_store.trim_state(&dropped_splits).await?;
        }

        core.latest_split_info = target_state;
    }

    Ok(split_changed)
}
395
396 fn rebuild_stream_reader_from_error<const BIASED: bool>(
398 &mut self,
399 source_desc: &SourceDesc,
400 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
401 e: StreamExecutorError,
402 ) -> StreamExecutorResult<()> {
403 let core = &mut self.stream_source_core;
404 tracing::error!(
405 error = ?e.as_report(),
406 actor_id = self.actor_ctx.id,
407 source_id = %core.source_id,
408 "stream source reader error",
409 );
410 GLOBAL_ERROR_METRICS.user_source_error.report([
411 e.variant_name().to_owned(),
412 core.source_id.to_string(),
413 core.source_name.to_owned(),
414 self.actor_ctx.fragment_id.to_string(),
415 ]);
416
417 self.rebuild_stream_reader(source_desc, stream)
418 }
419
420 fn rebuild_stream_reader<const BIASED: bool>(
421 &mut self,
422 source_desc: &SourceDesc,
423 stream: &mut StreamReaderWithPause<BIASED, StreamChunkWithState>,
424 ) -> StreamExecutorResult<()> {
425 let core = &mut self.stream_source_core;
426 let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();
427
428 tracing::info!(
429 "actor {:?} apply source split change to {:?}",
430 self.actor_ctx.id,
431 target_state
432 );
433
434 let reader_stream_builder = self.stream_reader_builder(source_desc.clone());
436 let reader_stream =
437 reader_stream_builder.into_retry_stream(Some(target_state.clone()), false);
438
439 stream.replace_data_stream(reader_stream);
440
441 Ok(())
442 }
443
444 async fn persist_state_and_clear_cache(
445 &mut self,
446 epoch: EpochPair,
447 ) -> StreamExecutorResult<HashMap<SplitId, SplitImpl>> {
448 let core = &mut self.stream_source_core;
449
450 let cache = core
451 .updated_splits_in_epoch
452 .values()
453 .map(|split_impl| split_impl.to_owned())
454 .collect_vec();
455
456 if !cache.is_empty() {
457 tracing::debug!(state = ?cache, "take snapshot");
458
459 let source_id = core.source_id.to_string();
461 for split_impl in &cache {
462 if let Some(state_table_lsn_value) = extract_split_offset(split_impl) {
464 self.metrics
465 .pg_cdc_state_table_lsn
466 .with_guarded_label_values(&[&source_id])
467 .set(state_table_lsn_value as i64);
468 }
469 }
470
471 core.split_state_store.set_states(cache).await?;
472 }
473
474 core.split_state_store.commit(epoch).await?;
476
477 let updated_splits = core.updated_splits_in_epoch.clone();
478
479 core.updated_splits_in_epoch.clear();
480
481 Ok(updated_splits)
482 }
483
484 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
486 let core = &mut self.stream_source_core;
487 core.split_state_store.try_flush().await?;
488
489 Ok(())
490 }
491
/// Main loop of the source executor: consumes barriers and reader output and yields
/// [`Message`]s.
///
/// Startup:
/// - Reads the first barrier and takes this actor's initial split assignment from it.
/// - Yields that barrier, then builds the `SourceDesc` and spawns the optional
///   wait-checkpoint worker.
/// - Recovers each assigned split from the committed split state table. Splits without
///   committed state are queued in `updated_splits_in_epoch` so they get persisted.
///
/// Main loop:
/// - Barrier:
///   - resumes a self-paused reader, unless it is paused by command;
///   - reports load completion of a refreshing batch source;
///   - handles the mutation;
///   - persists split state;
///   - on checkpoint barriers, hands a task to the wait-checkpoint worker;
///   - yields the barrier, and only then applies deferred split/props changes.
/// - Chunk:
///   - feeds the offset column to the wait-checkpoint task;
///   - self-pauses the reader if no barrier arrived within
///     `barrier_interval_ms * WAIT_BARRIER_MULTIPLE_TIMES`;
///   - updates the in-memory split state;
///   - yields the chunk with the split/offset columns pruned.
/// - Reader error: sleeps one second and rebuilds the reader.
///
/// The input stream ending is unexpected; it is only logged.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
    let mut barrier_receiver = self.barrier_receiver.take().unwrap();
    let first_barrier = barrier_receiver
        .recv()
        .instrument_await("source_recv_first_barrier")
        .await
        .ok_or_else(|| {
            anyhow!(
                "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
                self.actor_ctx.id,
                self.stream_source_core.source_id
            )
        })?;
    let first_epoch = first_barrier.epoch;
    // Splits assigned to this actor by the first barrier (empty if none).
    let mut boot_state =
        if let Some(splits) = first_barrier.initial_split_assignment(self.actor_ctx.id) {
            tracing::debug!(?splits, "boot with splits");
            splits.to_vec()
        } else {
            Vec::default()
        };
    let is_pause_on_startup = first_barrier.is_pause_on_startup();
    let mut is_uninitialized = first_barrier.is_newly_added(self.actor_ctx.id);

    yield Message::Barrier(first_barrier);

    // Temporarily move the core out of `self` while it is initialized.
    let mut core = self.stream_source_core;
    let source_id = core.source_id;

    let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap();
    let mut source_desc = source_desc_builder
        .build()
        .map_err(StreamExecutorError::connector_error)?;

    let mut wait_checkpoint_task_builder = Self::spawn_wait_checkpoint_worker(
        &core,
        source_desc.source.clone(),
        self.metrics.clone(),
    )
    .await?;

    let (Some(split_idx), Some(offset_idx)) = get_split_offset_col_idx(&source_desc.columns)
    else {
        unreachable!("Partition and offset columns must be set.");
    };

    core.split_state_store.init_epoch(first_epoch).await?;
    {
        let committed_reader = core
            .split_state_store
            .new_committed_reader(first_epoch)
            .await?;
        for ele in &mut boot_state {
            if let Some(recover_state) =
                committed_reader.try_recover_from_state_store(ele).await?
            {
                // Committed state exists, so this actor is not starting from scratch.
                *ele = recover_state;
                is_uninitialized = false;
            } else {
                // No committed state yet: persist the initial state at the next barrier.
                core.updated_splits_in_epoch.insert(ele.id(), ele.clone());
            }
        }
    }

    core.init_split_state(boot_state.clone());

    self.stream_source_core = core;

    let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
    tracing::debug!(state = ?recover_state, "start with state");

    let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
    let mut reader_stream_builder = self.stream_reader_builder(source_desc.clone());
    let mut latest_splits = None;
    if is_uninitialized {
        let create_split_reader_result = reader_stream_builder
            .fetch_latest_splits(recover_state.clone(), self.is_shared_non_cdc)
            .await?;
        latest_splits = create_split_reader_result.latest_splits;
    }

    if let Some(latest_splits) = latest_splits {
        // Queue the fetched split states for persistence at the next barrier.
        self.stream_source_core
            .updated_splits_in_epoch
            .extend(latest_splits.into_iter().map(|s| (s.id(), s)));
    }
    // `true` = biased: the barrier side is polled first.
    let mut stream = StreamReaderWithPause::<true, StreamChunkWithState>::new(
        barrier_stream,
        reader_stream_builder
            .into_retry_stream(recover_state, is_uninitialized && self.is_shared_non_cdc),
    );
    // Paused by a `Pause` mutation (or on startup), as opposed to self-paused below.
    let mut command_paused = false;

    if is_pause_on_startup {
        tracing::info!("source paused on startup");
        stream.pause_stream();
        command_paused = true;
    }

    let mut max_wait_barrier_time_ms =
        self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES;
    let mut last_barrier_time = Instant::now();
    // Paused by this executor because barriers stopped arriving in time.
    let mut self_paused = false;

    let source_output_row_count = self
        .metrics
        .source_output_row_count
        .with_guarded_label_values(&self.get_metric_labels());

    let source_split_change_count = self
        .metrics
        .source_split_change_count
        .with_guarded_label_values(&self.get_metric_labels());

    // Set by `RefreshStart`, cleared once the batch split reports it has finished.
    let mut is_refreshing = false;

    while let Some(msg) = stream.next().await {
        let Ok(msg) = msg else {
            // Back off before rebuilding the reader from the last known split state.
            tokio::time::sleep(Duration::from_millis(1000)).await;
            self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err())?;
            continue;
        };

        match msg {
            Either::Left(Message::Barrier(barrier)) => {
                last_barrier_time = Instant::now();

                if self_paused {
                    self_paused = false;
                    // Only resume if no `Pause` command is in effect.
                    if !command_paused {
                        stream.resume_stream();
                    }
                }

                let epoch = barrier.epoch;

                if barrier.is_checkpoint() && self.is_batch_source() && is_refreshing {
                    let batch_split = self.stream_source_core.get_batch_split();
                    if batch_split.finished() {
                        tracing::info!(?epoch, "emitting load finish");
                        // NOTE(review): source_id is passed for both id arguments — confirm
                        // this matches `report_source_load_finished`'s parameters.
                        self.barrier_manager.report_source_load_finished(
                            epoch,
                            self.actor_ctx.id,
                            source_id.table_id(),
                            source_id.table_id(),
                        );
                        is_refreshing = false;
                    }
                }

                // Change to apply after the barrier has been yielded downstream.
                let mut split_change = None;

                if let Some(mutation) = barrier.mutation.as_deref() {
                    match mutation {
                        Mutation::Pause => {
                            command_paused = true;
                            stream.pause_stream()
                        }
                        Mutation::Resume => {
                            command_paused = false;
                            stream.resume_stream()
                        }
                        Mutation::SourceChangeSplit(actor_splits) => {
                            tracing::info!(
                                actor_id = self.actor_ctx.id,
                                actor_splits = ?actor_splits,
                                "source change split received"
                            );

                            // Dropped splits' state is trimmed for this mutation kind.
                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: true,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }

                        Mutation::ConnectorPropsChange(maybe_mutation) => {
                            if let Some(new_props) = maybe_mutation.get(&source_id.table_id()) {
                                tracing::info!(
                                    "updating source properties from {:?} to {:?}",
                                    source_desc.source.config,
                                    new_props
                                );
                                source_desc.update_reader(new_props.clone())?;
                                split_change = Some((
                                    &source_desc,
                                    &mut stream,
                                    ApplyMutationAfterBarrier::ConnectorPropsChange,
                                ));
                            }
                        }

                        Mutation::Update(UpdateMutation { actor_splits, .. }) => {
                            // Unlike `SourceChangeSplit`, dropped splits' state is kept.
                            split_change = actor_splits.get(&self.actor_ctx.id).cloned().map(
                                |target_splits| {
                                    (
                                        &source_desc,
                                        &mut stream,
                                        ApplyMutationAfterBarrier::SplitChange {
                                            target_splits,
                                            should_trim_state: false,
                                            split_change_count: &source_split_change_count,
                                        },
                                    )
                                },
                            );
                        }
                        Mutation::Throttle(actor_to_apply) => {
                            // Applied immediately (before yielding the barrier), and only
                            // when the limit actually changes.
                            if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                                && *new_rate_limit != self.rate_limit_rps
                            {
                                tracing::info!(
                                    "updating rate limit from {:?} to {:?}",
                                    self.rate_limit_rps,
                                    *new_rate_limit
                                );
                                self.rate_limit_rps = *new_rate_limit;
                                self.rebuild_stream_reader(&source_desc, &mut stream)?;
                            }
                        }
                        Mutation::RefreshStart {
                            table_id: _,
                            associated_source_id,
                        } if *associated_source_id == source_id => {
                            debug_assert!(self.is_batch_source());
                            is_refreshing = true;

                            if let Ok(new_splits) = self.refresh_batch_splits() {
                                tracing::info!(
                                    actor_id = self.actor_ctx.id,
                                    %associated_source_id,
                                    new_splits_count = new_splits.len(),
                                    "RefreshStart triggered split re-enumeration"
                                );
                                split_change = Some((
                                    &source_desc,
                                    &mut stream,
                                    ApplyMutationAfterBarrier::RefreshBatchSplits(new_splits),
                                ));
                            } else {
                                tracing::warn!(
                                    actor_id = self.actor_ctx.id,
                                    %associated_source_id,
                                    "Failed to refresh splits during RefreshStart"
                                );
                            }
                        }
                        _ => {}
                    }
                }

                let updated_splits = self.persist_state_and_clear_cache(epoch).await?;

                if barrier.kind.is_checkpoint()
                    && let Some(task_builder) = &mut wait_checkpoint_task_builder
                {
                    task_builder.update_task_on_checkpoint(updated_splits);

                    // The worker waits for the previous epoch to be committed.
                    tracing::debug!("epoch to wait {:?}", epoch);
                    task_builder.send(Epoch(epoch.prev)).await?
                }

                let barrier_epoch = barrier.epoch;
                yield Message::Barrier(barrier);

                if let Some((source_desc, stream, to_apply_mutation)) = split_change {
                    self.apply_split_change_after_yield_barrier(
                        barrier_epoch,
                        source_desc,
                        stream,
                        to_apply_mutation,
                    )
                    .await?;
                }
            }
            Either::Left(_) => {
                // The barrier side of the stream only carries barriers.
                unreachable!();
            }

            Either::Right((chunk, latest_state)) => {
                if let Some(task_builder) = &mut wait_checkpoint_task_builder {
                    let offset_col = chunk.column_at(offset_idx);
                    task_builder.update_task_on_chunk(offset_col.clone());
                }
                if last_barrier_time.elapsed().as_millis() > max_wait_barrier_time_ms {
                    // Barriers are late: stop reading until the next barrier arrives.
                    self_paused = true;
                    tracing::warn!(
                        "source paused, wait barrier for {:?}",
                        last_barrier_time.elapsed()
                    );
                    stream.pause_stream();

                    // Refresh the threshold in case `barrier_interval_ms` changed.
                    max_wait_barrier_time_ms = self.system_params.load().barrier_interval_ms()
                        as u128
                        * WAIT_BARRIER_MULTIPLE_TIMES;
                }

                // Only splits still assigned to this actor are updated in place.
                latest_state.iter().for_each(|(split_id, new_split_impl)| {
                    if let Some(split_impl) =
                        self.stream_source_core.latest_split_info.get_mut(split_id)
                    {
                        *split_impl = new_split_impl.clone();
                    }
                });

                self.stream_source_core
                    .updated_splits_in_epoch
                    .extend(latest_state);

                let card = chunk.cardinality();
                source_output_row_count.inc_by(card as u64);
                let chunk =
                    prune_additional_cols(&chunk, split_idx, offset_idx, &source_desc.columns);
                yield Message::Chunk(chunk);
                self.try_flush_data().await?;
            }
        }
    }

    tracing::error!(
        actor_id = self.actor_ctx.id,
        "source executor exited unexpectedly"
    )
}
866}
867
/// A change decided while handling a barrier's mutation. It is applied by
/// `apply_split_change_after_yield_barrier` only after that barrier has been yielded.
#[derive(Debug, Clone)]
enum ApplyMutationAfterBarrier<'a> {
    /// New split assignment for this actor.
    SplitChange {
        target_splits: Vec<SplitImpl>,
        /// Whether the state of splits no longer assigned is trimmed from the state table.
        should_trim_state: bool,
        /// Counter incremented each time a split change is applied.
        split_change_count: &'a LabelGuardedMetric<GenericCounter<AtomicU64>>,
    },
    /// Refreshed splits of a batch source (after `RefreshStart`); replaces the split info.
    RefreshBatchSplits(Vec<SplitImpl>),
    /// Connector properties changed; the reader must be rebuilt.
    ConnectorPropsChange,
}
878
879impl<S: StateStore> Execute for SourceExecutor<S> {
880 fn execute(self: Box<Self>) -> BoxedMessageStream {
881 self.execute_inner().boxed()
882 }
883}
884
885impl<S: StateStore> Debug for SourceExecutor<S> {
886 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
887 f.debug_struct("SourceExecutor")
888 .field("source_id", &self.stream_source_core.source_id)
889 .field("column_ids", &self.stream_source_core.column_ids)
890 .finish()
891 }
892}
893
/// Accumulates the [`WaitCheckpointTask`] for the current checkpoint interval. After each
/// checkpoint barrier it sends the task to the [`WaitCheckpointWorker`].
struct WaitCheckpointTaskBuilder {
    /// Channel to the worker; each message pairs a task with the epoch to wait for.
    wait_checkpoint_tx: UnboundedSender<(Epoch, WaitCheckpointTask)>,
    /// Reader used to create a fresh task after each send.
    source_reader: SourceReader,
    /// Task being filled during the current interval.
    building_task: WaitCheckpointTask,
}
899
900impl WaitCheckpointTaskBuilder {
901 fn update_task_on_chunk(&mut self, offset_col: ArrayRef) {
902 match &mut self.building_task {
903 WaitCheckpointTask::AckPubsubMessage(_, arrays) => {
904 arrays.push(offset_col);
905 }
906 WaitCheckpointTask::AckNatsJetStream(_, arrays, _) => {
907 arrays.push(offset_col);
908 }
909 WaitCheckpointTask::CommitCdcOffset(_) => {}
910 }
911 }
912
913 fn update_task_on_checkpoint(&mut self, updated_splits: HashMap<SplitId, SplitImpl>) {
914 #[expect(clippy::single_match)]
915 match &mut self.building_task {
916 WaitCheckpointTask::CommitCdcOffset(offsets) => {
917 if !updated_splits.is_empty() {
918 assert_eq!(1, updated_splits.len());
920 for (split_id, split_impl) in updated_splits {
921 if split_impl.is_cdc_split() {
922 *offsets = Some((split_id, split_impl.get_cdc_split_offset()));
923 } else {
924 unreachable!()
925 }
926 }
927 }
928 }
929 _ => {}
930 }
931 }
932
933 async fn send(&mut self, epoch: Epoch) -> Result<(), anyhow::Error> {
935 let new_task = self
936 .source_reader
937 .create_wait_checkpoint_task()
938 .await?
939 .expect("wait checkpoint task should be created");
940 self.wait_checkpoint_tx
941 .send((epoch, std::mem::replace(&mut self.building_task, new_task)))
942 .expect("wait_checkpoint_tx send should succeed");
943 Ok(())
944 }
945}
946
947struct WaitCheckpointWorker<S: StateStore> {
973 wait_checkpoint_rx: UnboundedReceiver<(Epoch, WaitCheckpointTask)>,
974 state_store: S,
975 table_id: TableId,
976 metrics: Arc<StreamingMetrics>,
977}
978
979impl<S: StateStore> WaitCheckpointWorker<S> {
980 pub async fn run(mut self) {
981 tracing::debug!("wait epoch worker start success");
982 loop {
983 match self.wait_checkpoint_rx.recv().await {
985 Some((epoch, task)) => {
986 tracing::debug!("start to wait epoch {}", epoch.0);
987 let ret = self
988 .state_store
989 .try_wait_epoch(
990 HummockReadEpoch::Committed(epoch.0),
991 TryWaitEpochOptions {
992 table_id: self.table_id,
993 },
994 )
995 .await;
996
997 match ret {
998 Ok(()) => {
999 tracing::debug!(epoch = epoch.0, "wait epoch success");
1000
1001 task.run_with_on_commit_success(|source_id: u64, offset| {
1003 if let Some(lsn_value) = extract_pg_cdc_lsn_from_offset(offset) {
1004 self.metrics
1005 .pg_cdc_jni_commit_offset_lsn
1006 .with_guarded_label_values(&[&source_id.to_string()])
1007 .set(lsn_value as i64);
1008 }
1009 })
1010 .await;
1011 }
1012 Err(e) => {
1013 tracing::error!(
1014 error = %e.as_report(),
1015 "wait epoch {} failed", epoch.0
1016 );
1017 }
1018 }
1019 }
1020 None => {
1021 tracing::error!("wait epoch rx closed");
1022 break;
1023 }
1024 }
1025 }
1026 }
1027}
1028
#[cfg(test)]
mod tests {
    use maplit::{btreemap, convert_args, hashmap};
    use risingwave_common::catalog::{ColumnId, Field, TableId};
    use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::util::epoch::{EpochExt, test_epoch};
    use risingwave_connector::source::datagen::DatagenSplit;
    use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::PbRowFormatType;
    use risingwave_storage::memory::MemoryStateStore;
    use tokio::sync::mpsc::unbounded_channel;
    use tracing_test::traced_test;

    use super::*;
    use crate::executor::AddMutation;
    use crate::executor::source::{SourceStateTableHandler, default_source_internal_table};
    use crate::task::LocalBarrierManager;

    /// Source name used by the test executors.
    const MOCK_SOURCE_NAME: &str = "mock_source";

    /// A datagen source with one split yields the initial barrier, then a first chunk
    /// containing the sequence values 11, 12, 13.
    #[tokio::test]
    async fn test_source_executor() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "sequence_int")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let column_ids = vec![0].into_iter().map(ColumnId::from).collect();

        // Datagen emitting a sequence starting at 11, three rows per second.
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "datagen.rows.per.second" => "3",
            "fields.sequence_int.kind" => "sequence",
            "fields.sequence_int.start" => "11",
            "fields.sequence_int.end" => "11111",
        ));
        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            MemoryStateStore::new(),
        )
        .await;
        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids,
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut executor = executor.boxed().execute();

        // The first barrier assigns a single datagen split to the test actor.
        let init_barrier =
            Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 1,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message is the initial barrier itself.
        executor.next().await.unwrap().unwrap();

        // The next message is the first data chunk.
        let msg = executor.next().await.unwrap().unwrap();

        assert_eq!(
            msg.into_chunk().unwrap(),
            StreamChunk::from_pretty(
                " i
                + 11
                + 12
                + 13"
            )
        );
    }

    /// Split assignment changes are persisted, and dropping splits via
    /// `SourceChangeSplit` trims their state.
    ///
    /// The test:
    /// - grows the assignment from 1 to 3 splits and checks that a new split's state
    ///   was written;
    /// - pauses and resumes the source;
    /// - shrinks the assignment to the last split;
    /// - checks that only that split's committed state remains.
    #[traced_test]
    #[tokio::test]
    async fn test_split_change_mutation() {
        let table_id = TableId::default();
        let schema = Schema {
            fields: vec![Field::with_name(DataType::Int32, "v1")],
        };
        let row_id_index = None;
        let source_info = StreamSourceInfo {
            row_format: PbRowFormatType::Native as i32,
            ..Default::default()
        };
        let properties = convert_args!(btreemap!(
            "connector" => "datagen",
            "fields.v1.kind" => "sequence",
            "fields.v1.start" => "11",
            "fields.v1.end" => "11111",
        ));

        let source_desc_builder =
            create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]);
        // Shared with the assertions below, which re-open the same state table.
        let mem_state_store = MemoryStateStore::new();

        let column_ids = vec![ColumnId::from(0)];
        let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();
        let split_state_store = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let core = StreamSourceCore::<MemoryStateStore> {
            source_id: table_id,
            column_ids: column_ids.clone(),
            source_desc_builder: Some(source_desc_builder),
            latest_split_info: HashMap::new(),
            split_state_store,
            updated_splits_in_epoch: HashMap::new(),
            source_name: MOCK_SOURCE_NAME.to_owned(),
            is_batch_source: false,
        };

        let system_params_manager = LocalSystemParamsManager::for_test();

        let executor = SourceExecutor::new(
            ActorContext::for_test(0),
            core,
            Arc::new(StreamingMetrics::unused()),
            barrier_rx,
            system_params_manager.get_params(),
            None,
            false,
            LocalBarrierManager::for_test(),
        );
        let mut handler = executor.boxed().execute();

        // Start with split 0 of 3.
        let mut epoch = test_epoch(1);
        let init_barrier =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::Add(AddMutation {
                splits: hashmap! {
                    ActorId::default() => vec![
                        SplitImpl::Datagen(DatagenSplit {
                            split_index: 0,
                            split_num: 3,
                            start_offset: None,
                        }),
                    ],
                },
                ..Default::default()
            }));
        barrier_tx.send(init_barrier).unwrap();

        // The first message is the initial barrier.
        handler
            .next()
            .await
            .unwrap()
            .unwrap()
            .into_barrier()
            .unwrap();

        let mut ready_chunks = handler.ready_chunks(10);

        let _ = ready_chunks.next().await.unwrap();

        // Grow the assignment to all three splits.
        let new_assignment = vec![
            SplitImpl::Datagen(DatagenSplit {
                split_index: 0,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 1,
                split_num: 3,
                start_offset: None,
            }),
            SplitImpl::Datagen(DatagenSplit {
                split_index: 2,
                split_num: 3,
                start_offset: None,
            }),
        ];

        epoch.inc_epoch();
        let change_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(change_split_mutation).unwrap();

        let _ = ready_chunks.next().await.unwrap();
        // One more barrier so the new splits' state is persisted and committed.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();
        // Re-open the state table and check that newly assigned split 1 has state.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        source_state_handler
            .init_epoch(EpochPair::new_test_epoch(epoch))
            .await
            .unwrap();
        source_state_handler
            .get(&new_assignment[1].id())
            .await
            .unwrap()
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;

        let _ = ready_chunks.next().await.unwrap();

        // Pause, then resume, the source.
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Pause);
        barrier_tx.send(barrier).unwrap();

        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch).with_mutation(Mutation::Resume);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();

        // Shrink the assignment to split 2 only; splits 0 and 1 are dropped.
        let prev_assignment = new_assignment;
        let new_assignment = vec![prev_assignment[2].clone()];

        epoch.inc_epoch();
        let drop_split_mutation =
            Barrier::new_test_barrier(epoch).with_mutation(Mutation::SourceChangeSplit(hashmap! {
                ActorId::default() => new_assignment.clone()
            }));

        barrier_tx.send(drop_split_mutation).unwrap();

        ready_chunks.next().await.unwrap();
        epoch.inc_epoch();
        let barrier = Barrier::new_test_barrier(epoch);
        barrier_tx.send(barrier).unwrap();

        ready_chunks.next().await.unwrap();
        // Re-open the state table: only the still-assigned split 2 should have state.
        let mut source_state_handler = SourceStateTableHandler::from_table_catalog(
            &default_source_internal_table(0x2333),
            mem_state_store.clone(),
        )
        .await;

        let new_epoch = EpochPair::new_test_epoch(epoch);
        source_state_handler.init_epoch(new_epoch).await.unwrap();

        let committed_reader = source_state_handler
            .new_committed_reader(new_epoch)
            .await
            .unwrap();
        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[0])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[1])
                .await
                .unwrap()
                .is_none()
        );

        assert!(
            committed_reader
                .try_recover_from_state_store(&prev_assignment[2])
                .await
                .unwrap()
                .is_some()
        );
    }
}