mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

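/// Per-job barrier state machine for a streaming job created via snapshot
/// backfill. It drives the job's own partial graph: injecting barriers,
/// tracking snapshot and log-store progress, and deciding when the job can
/// be merged back into the upstream database graph.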
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
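    /// Sets up barrier control for a freshly created job: registers a partial
    /// graph on the control stream and injects the initial checkpoint barrier
    /// carrying the `Add` mutation that creates the job's actors.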
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            info.definition.clone(),
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [],
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        let initial_mutation = Mutation::Add(AddMutation {
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits:
                build_pb_actor_cdc_table_snapshot_splits_with_generation(
                    info.cdc_table_snapshot_split_assignment.clone(),
                )
                .into(),
            new_upstream_sinks: Default::default(),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &fragment_infos,
            Some(&fragment_infos),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            fragment_infos,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

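    /// Rebuilds the sequence of upstream barriers that the job still has to
    /// replay from the log store: skips log epochs up to and including
    /// `committed_epoch`, then emits one [`BarrierInfo`] per remaining epoch,
    /// closing with a checkpoint barrier at `upstream_curr_epoch`.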
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

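    /// Rebuilds the `ConsumingSnapshot` state for a job recovered before its
    /// snapshot finished (i.e. `committed_epoch < backfill_epoch`), queuing
    /// all upstream barriers from `backfill_epoch` onwards as pending.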
    fn recover_consuming_snapshot(
        job_id: JobId,
        definition: &String,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            definition.clone(),
            fragment_infos,
            Default::default(),
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    fragment_infos,
                )
                .collect(),
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

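    /// Rebuilds the `ConsumingLogStore` state for a job recovered after its
    /// snapshot finished: the first replayed barrier is turned into an
    /// `Initial` barrier and the rest are queued for injection.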
    fn recover_consuming_log_store(
        job_id: JobId,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

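    /// Restores the control state of a creating job during meta recovery,
    /// choosing between snapshot and log-store consumption based on how far
    /// the job had been committed, and re-injects the first barrier together
    /// with the actors to be recreated.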
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                &fragment_infos,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                &fragment_infos,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &fragment_infos,
            Some(&fragment_infos),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            fragment_infos,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

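    /// Returns whether the job can survive an error on `worker_id`, judging by
    /// its in-flight barriers and, for a finishing job, whether the worker
    /// hosts any of its fragments.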
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(self.fragment_infos.values(), worker_id))
    }

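    /// Renders a human-readable [`DdlProgress`] for the job, labeled by the
    /// current phase: `Snapshot`, `LogStore`, or `Finishing`.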
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_ddl_progress();
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

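    /// The upstream log epoch that must stay pinned for this job, i.e. the
    /// latest collected epoch (at least `backfill_epoch`), or `None` once the
    /// job is finishing.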
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

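    /// Injects `barrier_info` into the job's partial graph through the control
    /// stream and enqueues the epoch in `barrier_control` so its collection
    /// can be tracked.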
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &HashMap<FragmentId, InflightFragmentInfo>,
        applied_graph_info: Option<&HashMap<FragmentId, InflightFragmentInfo>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.values(),
            applied_graph_info
                .map(|graph_info| graph_info.values())
                .into_iter()
                .flatten(),
            new_actors,
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

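    /// Reacts to a new upstream barrier: updates the upstream-lag metric, and
    /// either switches the job to consuming the upstream (on a matching
    /// `MergeSnapshotBackfillStreamingJobs` command) or injects whatever
    /// barriers the current status derives from the upstream epoch.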
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let job_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&job_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                job_id = %self.job_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.fragment_infos,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.fragment_infos,
                    Some(&self.fragment_infos),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }

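    /// Collects a barrier-complete response, folds it into the progress
    /// trackers, and reports whether the job is now ready to merge into the
    /// upstream graph.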
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

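    /// Returns the job's fragment infos when log-store consumption has fully
    /// caught up and no barriers remain to inject, meaning the job can be
    /// merged back into the upstream graph; otherwise `None`.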
    pub(super) fn should_merge_to_upstream(
        &self,
    ) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.fragment_infos)
        } else {
            None
        }
    }
}

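/// Classifies how a completed epoch of a creating job is committed: as the
/// job's very first commit, as an ordinary commit, or as the commit that
/// finishes the job.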
pub(super) enum CompleteJobType {
    First,
    Normal,
    Finished,
}

impl CreatingStreamingJobControl {
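    /// Picks the next collected epoch to commit, bounded by the upstream's
    /// minimum in-flight epoch so the job never commits ahead of the upstream,
    /// and classifies the commit via [`CompleteJobType`].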
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

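    /// Whether the job is still consuming its snapshot or the upstream log
    /// store, i.e. has not yet entered the finishing phase.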
    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos.values())
    }

    pub fn graph_info(&self) -> &HashMap<FragmentId, InflightFragmentInfo> {
        &self.fragment_infos
    }

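    /// Consumes the control state of a finished job, yielding the
    /// [`TrackingJob`] used for the final commit; panics unless the job has
    /// reached `Finishing` with no in-flight barriers left.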
    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }
}