1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::stream_plan::AddMutation;
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use status::CreatingStreamingJobStatus;
36use tracing::{debug, info};
37
38use crate::MetaResult;
39use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
40use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
41use crate::barrier::edge_builder::FragmentEdgeBuildResult;
42use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
43use crate::barrier::progress::CreateMviewProgressTracker;
44use crate::barrier::rpc::ControlStreamManager;
45use crate::barrier::{
46 BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
47};
48use crate::controller::fragment::InflightFragmentInfo;
49use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
50use crate::rpc::metrics::GLOBAL_META_METRICS;
51use crate::stream::build_actor_connector_splits;
52
/// Control state of a streaming job that is still being created: the barriers injected for
/// the job, the phase the job is in, and its lag behind the upstream.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to; passed along on every barrier injection.
    database_id: DatabaseId,
    /// Id of the streaming job under creation.
    pub(super) job_id: TableId,
    /// Definition of the job, reported as the `statement` of its DDL progress.
    definition: String,
    /// Create type reported in the DDL progress.
    create_type: CreateType,
    /// Upstream tables of the job. On recovery, the log epochs of one of these tables are
    /// used to rebuild the barriers that still have to be injected.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Lower bound of both the pinned upstream log epoch and the progress epoch used to
    /// compute the upstream lag.
    backfill_epoch: u64,

    /// Fragments of the job; passed as the graph on barrier injection.
    graph_info: InflightStreamingJobInfo,

    /// Bookkeeping of the barriers injected for this job.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase of the job: consuming snapshot, consuming log store, or finishing.
    status: CreatingStreamingJobStatus,

    /// `snapshot_backfill_lag` gauge labelled with the job id, refreshed in `on_new_command`.
    upstream_lag: LabelGuardedIntGauge,
}
69
70impl CreatingStreamingJobControl {
71 pub(super) fn new(
72 info: &CreateStreamingJobCommandInfo,
73 snapshot_backfill_upstream_tables: HashSet<TableId>,
74 backfill_epoch: u64,
75 version_stat: &HummockVersionStats,
76 control_stream_manager: &mut ControlStreamManager,
77 edges: &mut FragmentEdgeBuildResult,
78 ) -> MetaResult<Self> {
79 let job_id = info.stream_job_fragments.stream_job_id();
80 let database_id = DatabaseId::new(info.streaming_job.database_id());
81 debug!(
82 %job_id,
83 definition = info.definition,
84 "new creating job"
85 );
86 let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
87 let backfill_nodes_to_pause =
88 get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
89 .into_iter()
90 .collect();
91 let backfill_order_state = BackfillOrderState::new(
92 info.fragment_backfill_ordering.clone(),
93 &info.stream_job_fragments,
94 );
95 let create_mview_tracker = CreateMviewProgressTracker::recover(
96 [(
97 job_id,
98 (
99 info.definition.clone(),
100 &*info.stream_job_fragments,
101 backfill_order_state,
102 ),
103 )],
104 version_stat,
105 );
106
107 let fragment_infos: HashMap<_, _> = info
108 .stream_job_fragments
109 .new_fragment_info(&info.init_split_assignment)
110 .collect();
111
112 let actors_to_create =
113 edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
114
115 let graph_info = InflightStreamingJobInfo {
116 job_id,
117 fragment_infos,
118 };
119
120 let mut barrier_control =
121 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
122
123 let mut prev_epoch_fake_physical_time = 0;
124 let mut pending_non_checkpoint_barriers = vec![];
125
126 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
127 &mut prev_epoch_fake_physical_time,
128 &mut pending_non_checkpoint_barriers,
129 PbBarrierKind::Checkpoint,
130 );
131
132 let added_actors = info.stream_job_fragments.actor_ids();
133 let actor_splits = info
134 .init_split_assignment
135 .values()
136 .flat_map(build_actor_connector_splits)
137 .collect();
138
139 let initial_mutation = Mutation::Add(AddMutation {
140 actor_dispatchers: Default::default(),
142 added_actors,
143 actor_splits,
144 pause: false,
146 subscriptions_to_add: Default::default(),
147 backfill_nodes_to_pause,
148 actor_cdc_table_snapshot_splits:
149 build_pb_actor_cdc_table_snapshot_splits_with_generation(
150 info.cdc_table_snapshot_split_assignment.clone(),
151 )
152 .into(),
153 new_upstream_sinks: Default::default(),
154 });
155
156 control_stream_manager.add_partial_graph(database_id, Some(job_id));
157 Self::inject_barrier(
158 database_id,
159 job_id,
160 control_stream_manager,
161 &mut barrier_control,
162 &graph_info,
163 Some(&graph_info),
164 initial_barrier_info,
165 Some(actors_to_create),
166 Some(initial_mutation),
167 )?;
168
169 assert!(pending_non_checkpoint_barriers.is_empty());
170
171 Ok(Self {
172 database_id,
173 definition: info.definition.clone(),
174 create_type: info.create_type.into(),
175 job_id,
176 snapshot_backfill_upstream_tables,
177 barrier_control,
178 backfill_epoch,
179 graph_info,
180 status: CreatingStreamingJobStatus::ConsumingSnapshot {
181 prev_epoch_fake_physical_time,
182 pending_upstream_barriers: vec![],
183 version_stats: version_stat.clone(),
184 create_mview_tracker,
185 snapshot_backfill_actors,
186 backfill_epoch,
187 pending_non_checkpoint_barriers,
188 },
189 upstream_lag: GLOBAL_META_METRICS
190 .snapshot_backfill_lag
191 .with_guarded_label_values(&[&format!("{}", job_id)]),
192 })
193 }
194
195 fn resolve_upstream_log_epochs(
196 snapshot_backfill_upstream_tables: &HashSet<TableId>,
197 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
198 committed_epoch: u64,
199 upstream_curr_epoch: u64,
200 ) -> MetaResult<Vec<BarrierInfo>> {
201 let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
203 let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
204 loop {
205 let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
206 if *checkpoint_epoch < committed_epoch {
207 continue;
208 }
209 assert_eq!(*checkpoint_epoch, committed_epoch);
210 break;
211 }
212 let mut ret = vec![];
213 let mut prev_epoch = committed_epoch;
214 let mut pending_non_checkpoint_barriers = vec![];
215 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
216 for (i, epoch) in non_checkpoint_epochs
217 .iter()
218 .chain([checkpoint_epoch])
219 .enumerate()
220 {
221 assert!(*epoch > prev_epoch);
222 pending_non_checkpoint_barriers.push(prev_epoch);
223 ret.push(BarrierInfo {
224 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
225 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
226 kind: if i == 0 {
227 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
228 } else {
229 BarrierKind::Barrier
230 },
231 });
232 prev_epoch = *epoch;
233 }
234 }
235 ret.push(BarrierInfo {
236 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
237 curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
238 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
239 });
240 Ok(ret)
241 }
242
243 fn recover_consuming_snapshot(
244 job_id: TableId,
245 definition: &String,
246 snapshot_backfill_upstream_tables: &HashSet<TableId>,
247 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
248 backfill_epoch: u64,
249 committed_epoch: u64,
250 upstream_curr_epoch: u64,
251 stream_job_fragments: StreamJobFragments,
252 version_stat: &HummockVersionStats,
253 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
254 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
255 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
256 let mut pending_non_checkpoint_barriers = vec![];
257 let create_mview_tracker = CreateMviewProgressTracker::recover(
258 [(
259 job_id,
260 (
261 definition.clone(),
262 &stream_job_fragments,
263 Default::default(),
264 ),
265 )],
266 version_stat,
267 );
268 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
269 &mut prev_epoch_fake_physical_time,
270 &mut pending_non_checkpoint_barriers,
271 PbBarrierKind::Initial,
272 );
273 Ok((
274 CreatingStreamingJobStatus::ConsumingSnapshot {
275 prev_epoch_fake_physical_time,
276 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
277 snapshot_backfill_upstream_tables,
278 upstream_table_log_epochs,
279 backfill_epoch,
280 upstream_curr_epoch,
281 )?,
282 version_stats: version_stat.clone(),
283 create_mview_tracker,
284 snapshot_backfill_actors,
285 backfill_epoch,
286 pending_non_checkpoint_barriers,
287 },
288 barrier_info,
289 ))
290 }
291
292 fn recover_consuming_log_store(
293 snapshot_backfill_upstream_tables: &HashSet<TableId>,
294 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
295 committed_epoch: u64,
296 upstream_curr_epoch: u64,
297 stream_job_fragments: StreamJobFragments,
298 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
299 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
300 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
301 snapshot_backfill_upstream_tables,
302 upstream_table_log_epochs,
303 committed_epoch,
304 upstream_curr_epoch,
305 )?;
306 let mut first_barrier = barriers_to_inject.remove(0);
307 assert!(first_barrier.kind.is_checkpoint());
308 first_barrier.kind = BarrierKind::Initial;
309 Ok((
310 CreatingStreamingJobStatus::ConsumingLogStore {
311 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
312 snapshot_backfill_actors.into_iter(),
313 barriers_to_inject
314 .last()
315 .map(|info| info.prev_epoch() - committed_epoch)
316 .unwrap_or(0),
317 ),
318 barriers_to_inject: Some(barriers_to_inject),
319 },
320 first_barrier,
321 ))
322 }
323
324 #[expect(clippy::too_many_arguments)]
325 pub(crate) fn recover(
326 database_id: DatabaseId,
327 job_id: TableId,
328 definition: String,
329 snapshot_backfill_upstream_tables: HashSet<TableId>,
330 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
331 backfill_epoch: u64,
332 committed_epoch: u64,
333 upstream_curr_epoch: u64,
334 graph_info: InflightStreamingJobInfo,
335 stream_job_fragments: StreamJobFragments,
336 version_stat: &HummockVersionStats,
337 new_actors: StreamJobActorsToCreate,
338 initial_mutation: Mutation,
339 control_stream_manager: &mut ControlStreamManager,
340 ) -> MetaResult<Self> {
341 debug!(
342 %job_id,
343 definition,
344 "recovered creating job"
345 );
346 let mut barrier_control =
347 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
348
349 let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
350 Self::recover_consuming_snapshot(
351 job_id,
352 &definition,
353 &snapshot_backfill_upstream_tables,
354 upstream_table_log_epochs,
355 backfill_epoch,
356 committed_epoch,
357 upstream_curr_epoch,
358 stream_job_fragments,
359 version_stat,
360 )?
361 } else {
362 Self::recover_consuming_log_store(
363 &snapshot_backfill_upstream_tables,
364 upstream_table_log_epochs,
365 committed_epoch,
366 upstream_curr_epoch,
367 stream_job_fragments,
368 )?
369 };
370 control_stream_manager.add_partial_graph(database_id, Some(job_id));
371 Self::inject_barrier(
372 database_id,
373 job_id,
374 control_stream_manager,
375 &mut barrier_control,
376 &graph_info,
377 Some(&graph_info),
378 first_barrier_info,
379 Some(new_actors),
380 Some(initial_mutation),
381 )?;
382 Ok(Self {
383 database_id,
384 job_id,
385 definition,
386 create_type: CreateType::Background,
387 snapshot_backfill_upstream_tables,
388 backfill_epoch,
389 graph_info,
390 barrier_control,
391 status,
392 upstream_lag: GLOBAL_META_METRICS
393 .snapshot_backfill_lag
394 .with_guarded_label_values(&[&format!("{}", job_id)]),
395 })
396 }
397
    /// Returns whether the barrier control of this job reports itself as empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }
401
402 pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
403 self.barrier_control.is_valid_after_worker_err(worker_id)
404 && (!self.status.is_finishing()
405 || InflightFragmentInfo::contains_worker(
406 self.graph_info.fragment_infos(),
407 worker_id,
408 ))
409 }
410
411 pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
412 let progress = match &self.status {
413 CreatingStreamingJobStatus::ConsumingSnapshot {
414 create_mview_tracker,
415 ..
416 } => {
417 if create_mview_tracker.has_pending_finished_jobs() {
418 "Snapshot finished".to_owned()
419 } else {
420 let progress = create_mview_tracker
421 .gen_ddl_progress()
422 .remove(&self.job_id.table_id)
423 .expect("should exist");
424 format!("Snapshot [{}]", progress.progress)
425 }
426 }
427 CreatingStreamingJobStatus::ConsumingLogStore {
428 log_store_progress_tracker,
429 ..
430 } => {
431 format!(
432 "LogStore [{}]",
433 log_store_progress_tracker.gen_ddl_progress()
434 )
435 }
436 CreatingStreamingJobStatus::Finishing(_) => {
437 format!(
438 "Finishing [epoch count: {}]",
439 self.barrier_control.inflight_barrier_count()
440 )
441 }
442 };
443 DdlProgress {
444 id: self.job_id.table_id as u64,
445 statement: self.definition.clone(),
446 create_type: self.create_type.as_str().to_owned(),
447 progress,
448 }
449 }
450
451 pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
452 if self.status.is_finishing() {
453 None
454 } else {
455 Some(max(
457 self.barrier_control.max_collected_epoch().unwrap_or(0),
458 self.backfill_epoch,
459 ))
460 }
461 }
462
463 fn inject_barrier(
464 database_id: DatabaseId,
465 table_id: TableId,
466 control_stream_manager: &mut ControlStreamManager,
467 barrier_control: &mut CreatingStreamingJobBarrierControl,
468 pre_applied_graph_info: &InflightStreamingJobInfo,
469 applied_graph_info: Option<&InflightStreamingJobInfo>,
470 barrier_info: BarrierInfo,
471 new_actors: Option<StreamJobActorsToCreate>,
472 mutation: Option<Mutation>,
473 ) -> MetaResult<()> {
474 let node_to_collect = control_stream_manager.inject_barrier(
475 database_id,
476 Some(table_id),
477 mutation,
478 &barrier_info,
479 pre_applied_graph_info.fragment_infos(),
480 applied_graph_info
481 .map(|graph_info| graph_info.fragment_infos())
482 .into_iter()
483 .flatten(),
484 new_actors,
485 vec![],
486 vec![],
487 )?;
488 barrier_control.enqueue_epoch(
489 barrier_info.prev_epoch(),
490 node_to_collect,
491 barrier_info.kind.clone(),
492 );
493 Ok(())
494 }
495
496 pub(super) fn on_new_command(
497 &mut self,
498 control_stream_manager: &mut ControlStreamManager,
499 command: Option<&Command>,
500 barrier_info: &BarrierInfo,
501 ) -> MetaResult<()> {
502 let table_id = self.job_id;
503 let start_consume_upstream =
504 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
505 jobs_to_merge.contains_key(&table_id)
506 } else {
507 false
508 };
509 if start_consume_upstream {
510 info!(
511 table_id = self.job_id.table_id,
512 prev_epoch = barrier_info.prev_epoch(),
513 "start consuming upstream"
514 );
515 }
516 let progress_epoch =
517 if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
518 max(max_collected_epoch, self.backfill_epoch)
519 } else {
520 self.backfill_epoch
521 };
522 self.upstream_lag.set(
523 barrier_info
524 .prev_epoch
525 .value()
526 .0
527 .saturating_sub(progress_epoch) as _,
528 );
529 if start_consume_upstream {
530 self.status.start_consume_upstream(barrier_info);
531 Self::inject_barrier(
532 self.database_id,
533 self.job_id,
534 control_stream_manager,
535 &mut self.barrier_control,
536 &self.graph_info,
537 None,
538 barrier_info.clone(),
539 None,
540 None,
541 )?;
542 } else {
543 for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
544 Self::inject_barrier(
545 self.database_id,
546 self.job_id,
547 control_stream_manager,
548 &mut self.barrier_control,
549 &self.graph_info,
550 Some(&self.graph_info),
551 barrier_to_inject,
552 None,
553 mutation,
554 )?;
555 }
556 }
557 Ok(())
558 }
559
560 pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
561 self.status.update_progress(&resp.create_mview_progress);
562 self.barrier_control.collect(resp);
563 self.should_merge_to_upstream().is_some()
564 }
565
566 pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
567 if let CreatingStreamingJobStatus::ConsumingLogStore {
568 log_store_progress_tracker,
569 barriers_to_inject,
570 } = &self.status
571 && barriers_to_inject.is_none()
572 && log_store_progress_tracker.is_finished()
573 {
574 Some(&self.graph_info)
575 } else {
576 None
577 }
578 }
579}
580
/// Kind of an epoch completion reported by `CreatingStreamingJobControl::start_completing`.
pub(super) enum CompleteJobType {
    /// The first commit of a job that is not yet finishing.
    First,
    /// Any completion that is neither the first commit nor the finishing one.
    Normal,
    /// The completed epoch is the epoch at which the job finishes.
    Finished,
}
588
589impl CreatingStreamingJobControl {
590 pub(super) fn start_completing(
591 &mut self,
592 min_upstream_inflight_epoch: Option<u64>,
593 ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
594 let (finished_at_epoch, epoch_end_bound) = match &self.status {
595 CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
596 let epoch_end_bound = min_upstream_inflight_epoch
597 .map(|upstream_epoch| {
598 if upstream_epoch < *finish_at_epoch {
599 Excluded(upstream_epoch)
600 } else {
601 Unbounded
602 }
603 })
604 .unwrap_or(Unbounded);
605 (Some(*finish_at_epoch), epoch_end_bound)
606 }
607 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
608 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
609 None,
610 min_upstream_inflight_epoch
611 .map(Excluded)
612 .unwrap_or(Unbounded),
613 ),
614 };
615 self.barrier_control.start_completing(epoch_end_bound).map(
616 |(epoch, resps, is_first_commit)| {
617 let status = if let Some(finish_at_epoch) = finished_at_epoch {
618 assert!(!is_first_commit);
619 if epoch == finish_at_epoch {
620 self.barrier_control.ack_completed(epoch);
621 assert!(self.barrier_control.is_empty());
622 CompleteJobType::Finished
623 } else {
624 CompleteJobType::Normal
625 }
626 } else if is_first_commit {
627 CompleteJobType::First
628 } else {
629 CompleteJobType::Normal
630 };
631 (epoch, resps, status)
632 },
633 )
634 }
635
    /// Acknowledges to the barrier control that `completed_epoch` has been completed.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }
639
640 pub(super) fn is_finished(&self) -> bool {
641 self.barrier_control.is_empty() && self.status.is_finishing()
642 }
643
644 pub fn is_consuming(&self) -> bool {
645 match &self.status {
646 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
647 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
648 CreatingStreamingJobStatus::Finishing(_) => false,
649 }
650 }
651
    /// Iterates over the ids returned by `existing_table_ids` of the job's graph.
    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }
655
    /// Returns the graph (fragment infos) of this job.
    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
        &self.graph_info
    }
659}