mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, FragmentId};
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BackfillOrderState, BarrierKind, CreateStreamingJobCommandInfo, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

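/// Control state of a single snapshot-backfill streaming job that is still being
/// created. It owns the job's dedicated partial graph: it injects barriers into
/// the job, tracks their collection, and reports DDL progress until the job is
/// ready to merge back into the upstream graph.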
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: JobId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    state_table_ids: HashSet<TableId>,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
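    /// Creates the control state for a new snapshot-backfill job and injects its
    /// initial barrier, which carries the `Add` mutation and the actors to create.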
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = info.streaming_job.database_id();
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let fragment_infos = info
            .stream_job_fragments
            .new_fragment_info(&info.init_split_assignment)
            .collect();
        let snapshot_backfill_actors =
            InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos).collect();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            &info.fragment_backfill_ordering,
            &info.stream_job_fragments,
            info.locality_fragment_state_table_mapping.clone(),
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            info.definition.clone(),
            &fragment_infos,
            backfill_order_state,
            version_stat,
        );

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
                |(fragment_id, node, actors)| {
                    (
                        fragment_id,
                        node,
                        actors,
                        [],
                    )
                },
            ));

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids().collect();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();

        assert!(
            info.cdc_table_snapshot_splits.is_none(),
            "should not have cdc backfill for snapshot backfill job"
        );

        let initial_mutation = Mutation::Add(AddMutation {
            // the job's actors are newly created, so there are no dispatcher
            // updates for upstream actors
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // the cluster is assumed not to be paused when a snapshot backfill
            // job starts
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: None,
            new_upstream_sinks: Default::default(),
        });

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                fragment_infos,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
            node_actors,
            state_table_ids,
        })
    }

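    /// Reconstructs the sequence of `BarrierInfo` that the job still has to replay
    /// from the upstream table log, starting right after `committed_epoch` and
    /// ending with a checkpoint barrier at `upstream_curr_epoch`.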
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        // log epochs are aligned across the upstream tables, so following an
        // arbitrary one is sufficient
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        // skip forward to the epoch that the job has already committed
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

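    /// Rebuilds the `ConsumingSnapshot` state for a job whose committed epoch is
    /// still earlier than its backfill epoch, returning the state together with
    /// the first (`Initial`) barrier to inject.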
    fn recover_consuming_snapshot(
        job_id: JobId,
        definition: &str,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            job_id,
            definition.to_owned(),
            &fragment_infos,
            Default::default(),
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors: InflightStreamingJobInfo::snapshot_backfill_actor_ids(
                    &fragment_infos,
                )
                .collect(),
                fragment_infos,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

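    /// Rebuilds the `ConsumingLogStore` state for a job that has already finished
    /// consuming its snapshot, replaying the remaining upstream log epochs. Returns
    /// the state together with the first (`Initial`) barrier to inject.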
    fn recover_consuming_log_store(
        job_id: JobId,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                tracking_job: TrackingJob::recovered(job_id, &fragment_infos),
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    InflightStreamingJobInfo::snapshot_backfill_actor_ids(&fragment_infos),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
                fragment_infos,
            },
            first_barrier,
        ))
    }

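    /// Recovers the control state of an in-flight snapshot-backfill job after a
    /// meta failover, resuming from either the snapshot or the log-store phase
    /// depending on how far the job had progressed before the failure.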
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: JobId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(fragment_infos.values());
        let state_table_ids =
            InflightFragmentInfo::existing_table_ids(fragment_infos.values()).collect();

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                fragment_infos,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                job_id,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                fragment_infos,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));

        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &node_actors,
            Some(&state_table_ids),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            node_actors,
            state_table_ids,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

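    /// Checks whether the job's in-flight barrier state remains valid after an
    /// error on `worker_id`, taking the placement of its fragments into account.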
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && self
                .status
                .fragment_infos()
                .map(|fragment_infos| {
                    InflightFragmentInfo::contains_worker(fragment_infos.values(), worker_id)
                })
                .unwrap_or(true)
    }

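    /// Renders the job's current phase (snapshot, log store, or finishing) as a
    /// `DdlProgress` entry.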
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.is_finished() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker.gen_ddl_progress();
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(..) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        DdlProgress {
            id: self.job_id.as_raw_id() as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

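    /// The upstream log epoch that this job still pins, i.e. the epoch up to which
    /// the upstream log must be retained; `None` once the job is finishing.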
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            // pin at least the backfill epoch until a newer epoch has been collected
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

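    /// Injects one barrier into the job's partial graph via the control stream and
    /// enqueues it in `barrier_control` so that its collection can be tracked.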
    fn inject_barrier(
        database_id: DatabaseId,
        job_id: JobId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        node_actors: &HashMap<WorkerId, HashSet<ActorId>>,
        state_table_ids: Option<&HashSet<TableId>>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(job_id),
            mutation,
            &barrier_info,
            node_actors,
            state_table_ids.into_iter().flatten().copied(),
            new_actors,
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

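    /// Transitions the job into the phase where it consumes upstream barriers
    /// directly, injecting `barrier_info` as the first barrier of that phase, and
    /// returns the job's fragment infos so they can be merged into the upstream
    /// graph.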
    pub(super) fn start_consume_upstream(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<HashMap<FragmentId, InflightFragmentInfo>> {
        info!(
            job_id = %self.job_id,
            prev_epoch = barrier_info.prev_epoch(),
            "start consuming upstream"
        );
        let fragment_infos = self.status.start_consume_upstream(barrier_info);
        Self::inject_barrier(
            self.database_id,
            self.job_id,
            control_stream_manager,
            &mut self.barrier_control,
            &self.node_actors,
            None,
            barrier_info.clone(),
            None,
            None,
        )?;
        Ok(fragment_infos)
    }

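    /// Handles a new upstream barrier: updates the upstream-lag metric and injects
    /// whatever barriers the current status derives from this upstream epoch.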
    pub(super) fn on_new_upstream_barrier(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.node_actors,
                Some(&self.state_table_ids),
                barrier_to_inject,
                None,
                mutation,
            )?;
        }
        Ok(())
    }

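    /// Collects a `BarrierCompleteResponse` from a worker, updates backfill
    /// progress, and returns whether the job has become ready to merge back into
    /// the upstream graph.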
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

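    /// Returns the upstream tables to merge back into once the job has fully
    /// caught up with the upstream log, or `None` if it is not ready yet.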
    pub(super) fn should_merge_to_upstream(&self) -> Option<&HashSet<TableId>> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
            ..
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.snapshot_backfill_upstream_tables)
        } else {
            None
        }
    }
}

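/// Classifies a completed epoch of a creating job for the caller of
/// `start_completing`.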
pub(super) enum CompleteJobType {
    /// The first barrier of the job to be committed.
    First,
    Normal,
    /// The barrier whose completion finishes the job.
    Finished,
}

impl CreatingStreamingJobControl {
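    /// Starts completing the next collected epoch, bounded by the minimum
    /// in-flight upstream epoch, and returns the epoch, its collected responses,
    /// and how the completion should be classified.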
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch, _) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

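    /// Returns whether the job is still consuming (snapshot or log store), i.e.
    /// has not yet entered the finishing phase.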
    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(..) => false,
            CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!()
            }
        }
    }

    pub fn state_table_ids(&self) -> &HashSet<TableId> {
        &self.state_table_ids
    }

    pub fn fragment_infos_with_job_id(
        &self,
    ) -> impl Iterator<Item = (&InflightFragmentInfo, JobId)> + '_ {
        self.status
            .fragment_infos()
            .into_iter()
            .flat_map(|fragments| fragments.values().map(|fragment| (fragment, self.job_id)))
    }

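    /// Consumes the control state of a finished job and yields its `TrackingJob`
    /// for the final commit; panics unless the job is in the finishing phase with
    /// no in-flight barriers.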
    pub fn into_tracking_job(self) -> TrackingJob {
        assert!(self.barrier_control.is_empty());
        match self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. }
            | CreatingStreamingJobStatus::PlaceHolder => {
                unreachable!("expect finish")
            }
            CreatingStreamingJobStatus::Finishing(_, tracking_job) => tracking_job,
        }
    }
}