mod barrier_control;
mod status;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::Bound::{Excluded, Unbounded};

use barrier_control::CreatingStreamingJobBarrierControl;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits;
use risingwave_meta_model::{CreateType, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_plan::barrier::PbBarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use status::CreatingStreamingJobStatus;
use tracing::{debug, info};

use crate::MetaResult;
use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{
    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::build_actor_connector_splits;

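/// Barrier-level control state for a single streaming job that is being created via
/// snapshot backfill. It owns the job's inflight graph (`graph_info`), tracks the barriers
/// injected into that partial graph (`barrier_control`), and records which phase of
/// creation the job is currently in (`status`).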
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    definition: String,
    create_type: CreateType,
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    backfill_epoch: u64,

    graph_info: InflightStreamingJobInfo,

    barrier_control: CreatingStreamingJobBarrierControl,
    status: CreatingStreamingJobStatus,

    upstream_lag: LabelGuardedIntGauge,
}

impl CreatingStreamingJobControl {
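    /// Sets up barrier control and progress tracking for a newly created snapshot backfill
    /// job, registers its partial graph with the control stream manager, and injects the
    /// first (checkpoint) barrier carrying an [`AddMutation`] that creates the job's actors.
    /// The job starts in the `ConsumingSnapshot` status.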
    pub(super) fn new(
        info: &CreateStreamingJobCommandInfo,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        backfill_epoch: u64,
        version_stat: &HummockVersionStats,
        control_stream_manager: &mut ControlStreamManager,
        edges: &mut FragmentEdgeBuildResult,
    ) -> MetaResult<Self> {
        let job_id = info.stream_job_fragments.stream_job_id();
        let database_id = DatabaseId::new(info.streaming_job.database_id());
        debug!(
            %job_id,
            definition = info.definition,
            "new creating job"
        );
        let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
        let backfill_nodes_to_pause =
            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
                .into_iter()
                .collect();
        let backfill_order_state = BackfillOrderState::new(
            info.fragment_backfill_ordering.clone(),
            &info.stream_job_fragments,
        );
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    info.definition.clone(),
                    &*info.stream_job_fragments,
                    backfill_order_state,
                ),
            )],
            version_stat,
        );
        let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();

        let actors_to_create =
            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());

        let graph_info = InflightStreamingJobInfo {
            job_id,
            fragment_infos,
        };

        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);

        let mut prev_epoch_fake_physical_time = 0;
        let mut pending_non_checkpoint_barriers = vec![];

        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Checkpoint,
        );

        let added_actors = info.stream_job_fragments.actor_ids();
        let actor_splits = info
            .init_split_assignment
            .values()
            .flat_map(build_actor_connector_splits)
            .collect();
        let initial_mutation = Mutation::Add(AddMutation {
            // The initial barrier of the job's own partial graph carries no updates to
            // upstream dispatchers.
            actor_dispatchers: Default::default(),
            added_actors,
            actor_splits,
            // The newly created actors start unpaused.
            pause: false,
            subscriptions_to_add: Default::default(),
            backfill_nodes_to_pause,
            actor_cdc_table_snapshot_splits: build_pb_actor_cdc_table_snapshot_splits(
                info.cdc_table_snapshot_split_assignment.clone(),
            ),
        });

        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            initial_barrier_info,
            Some(actors_to_create),
            Some(initial_mutation),
        )?;

        assert!(pending_non_checkpoint_barriers.is_empty());

        Ok(Self {
            database_id,
            definition: info.definition.clone(),
            create_type: info.create_type.into(),
            job_id,
            snapshot_backfill_upstream_tables,
            barrier_control,
            backfill_epoch,
            graph_info,
            status: CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: vec![],
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

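    /// Rebuilds the [`BarrierInfo`]s that still need to be replayed from the upstream log,
    /// starting right after `committed_epoch` and ending with a checkpoint barrier at
    /// `upstream_curr_epoch`.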
    fn resolve_upstream_log_epochs(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
    ) -> MetaResult<Vec<BarrierInfo>> {
        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
        // Resolve the epochs from an arbitrary upstream table and skip ahead to the
        // already committed epoch.
        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
        loop {
            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
            if *checkpoint_epoch < committed_epoch {
                continue;
            }
            assert_eq!(*checkpoint_epoch, committed_epoch);
            break;
        }
        let mut ret = vec![];
        let mut prev_epoch = committed_epoch;
        let mut pending_non_checkpoint_barriers = vec![];
        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
            for (i, epoch) in non_checkpoint_epochs
                .iter()
                .chain([checkpoint_epoch])
                .enumerate()
            {
                assert!(*epoch > prev_epoch);
                pending_non_checkpoint_barriers.push(prev_epoch);
                ret.push(BarrierInfo {
                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
                    kind: if i == 0 {
                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
                    } else {
                        BarrierKind::Barrier
                    },
                });
                prev_epoch = *epoch;
            }
        }
        ret.push(BarrierInfo {
            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
        });
        Ok(ret)
    }

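    /// Restores a job that failed over while it was still consuming the upstream snapshot
    /// (i.e. `committed_epoch < backfill_epoch`). Progress tracking is recovered and an
    /// `Initial` fake barrier is prepared as the first barrier to inject.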
    fn recover_consuming_snapshot(
        job_id: TableId,
        definition: &String,
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
        let mut pending_non_checkpoint_barriers = vec![];
        let create_mview_tracker = CreateMviewProgressTracker::recover(
            [(
                job_id,
                (
                    definition.clone(),
                    &stream_job_fragments,
                    Default::default(),
                ),
            )],
            version_stat,
        );
        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
            &mut prev_epoch_fake_physical_time,
            &mut pending_non_checkpoint_barriers,
            PbBarrierKind::Initial,
        );
        Ok((
            CreatingStreamingJobStatus::ConsumingSnapshot {
                prev_epoch_fake_physical_time,
                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
                    snapshot_backfill_upstream_tables,
                    upstream_table_log_epochs,
                    backfill_epoch,
                    upstream_curr_epoch,
                )?,
                version_stats: version_stat.clone(),
                create_mview_tracker,
                snapshot_backfill_actors,
                backfill_epoch,
                pending_non_checkpoint_barriers,
            },
            barrier_info,
        ))
    }

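    /// Restores a job that had already finished the snapshot phase and was consuming the
    /// upstream log store. The remaining upstream epochs are resolved into barriers to
    /// inject, with the first one re-marked as an `Initial` barrier.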
    fn recover_consuming_log_store(
        snapshot_backfill_upstream_tables: &HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        stream_job_fragments: StreamJobFragments,
    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
        let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
            snapshot_backfill_upstream_tables,
            upstream_table_log_epochs,
            committed_epoch,
            upstream_curr_epoch,
        )?;
        let mut first_barrier = barriers_to_inject.remove(0);
        assert!(first_barrier.kind.is_checkpoint());
        first_barrier.kind = BarrierKind::Initial;
        Ok((
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
                    snapshot_backfill_actors.into_iter(),
                    barriers_to_inject
                        .last()
                        .map(|info| info.prev_epoch() - committed_epoch)
                        .unwrap_or(0),
                ),
                barriers_to_inject: Some(barriers_to_inject),
            },
            first_barrier,
        ))
    }

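    /// Recovers the control state of a creating snapshot backfill job after a meta failover,
    /// choosing between the snapshot-consuming and log-store-consuming paths based on how far
    /// the job had progressed, then re-registers its partial graph and re-injects the first
    /// barrier together with the actors to create.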
    #[expect(clippy::too_many_arguments)]
    pub(crate) fn recover(
        database_id: DatabaseId,
        job_id: TableId,
        definition: String,
        snapshot_backfill_upstream_tables: HashSet<TableId>,
        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        backfill_epoch: u64,
        committed_epoch: u64,
        upstream_curr_epoch: u64,
        graph_info: InflightStreamingJobInfo,
        stream_job_fragments: StreamJobFragments,
        version_stat: &HummockVersionStats,
        new_actors: StreamJobActorsToCreate,
        initial_mutation: Mutation,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<Self> {
        debug!(
            %job_id,
            definition,
            "recovered creating job"
        );
        let mut barrier_control =
            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);

        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
            Self::recover_consuming_snapshot(
                job_id,
                &definition,
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                backfill_epoch,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
                version_stat,
            )?
        } else {
            Self::recover_consuming_log_store(
                &snapshot_backfill_upstream_tables,
                upstream_table_log_epochs,
                committed_epoch,
                upstream_curr_epoch,
                stream_job_fragments,
            )?
        };
        control_stream_manager.add_partial_graph(database_id, Some(job_id));
        Self::inject_barrier(
            database_id,
            job_id,
            control_stream_manager,
            &mut barrier_control,
            &graph_info,
            Some(&graph_info),
            first_barrier_info,
            Some(new_actors),
            Some(initial_mutation),
        )?;
        Ok(Self {
            database_id,
            job_id,
            definition,
            create_type: CreateType::Background,
            snapshot_backfill_upstream_tables,
            backfill_epoch,
            graph_info,
            barrier_control,
            status,
            upstream_lag: GLOBAL_META_METRICS
                .snapshot_backfill_lag
                .with_guarded_label_values(&[&format!("{}", job_id)]),
        })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.barrier_control.is_empty()
    }

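    /// Returns whether this creating job can continue without triggering a full recovery
    /// after the given worker fails.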
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        self.barrier_control.is_valid_after_worker_err(worker_id)
            && (!self.status.is_finishing()
                || InflightFragmentInfo::contains_worker(
                    self.graph_info.fragment_infos(),
                    worker_id,
                ))
    }

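    /// Renders the DDL progress shown to users, prefixed with the current phase:
    /// `Snapshot [..]`, `LogStore [..]`, or `Finishing [..]`.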
    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
        let progress = match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot {
                create_mview_tracker,
                ..
            } => {
                if create_mview_tracker.has_pending_finished_jobs() {
                    "Snapshot finished".to_owned()
                } else {
                    let progress = create_mview_tracker
                        .gen_ddl_progress()
                        .remove(&self.job_id.table_id)
                        .expect("should exist");
                    format!("Snapshot [{}]", progress.progress)
                }
            }
            CreatingStreamingJobStatus::ConsumingLogStore {
                log_store_progress_tracker,
                ..
            } => {
                format!(
                    "LogStore [{}]",
                    log_store_progress_tracker.gen_ddl_progress()
                )
            }
            CreatingStreamingJobStatus::Finishing(_) => {
                format!(
                    "Finishing [epoch count: {}]",
                    self.barrier_control.inflight_barrier_count()
                )
            }
        };
        DdlProgress {
            id: self.job_id.table_id as u64,
            statement: self.definition.clone(),
            create_type: self.create_type.as_str().to_owned(),
            progress,
        }
    }

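    /// Returns the upstream log epoch that should stay pinned for this job (at least the
    /// backfill epoch), or `None` once the job is finishing and no longer reads the
    /// upstream log.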
    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
        if self.status.is_finishing() {
            None
        } else {
            Some(max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            ))
        }
    }

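    /// Injects a barrier into the job's own partial graph via the control stream manager and
    /// enqueues the epoch into `barrier_control` so that its collection can be tracked.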
    fn inject_barrier(
        database_id: DatabaseId,
        table_id: TableId,
        control_stream_manager: &mut ControlStreamManager,
        barrier_control: &mut CreatingStreamingJobBarrierControl,
        pre_applied_graph_info: &InflightStreamingJobInfo,
        applied_graph_info: Option<&InflightStreamingJobInfo>,
        barrier_info: BarrierInfo,
        new_actors: Option<StreamJobActorsToCreate>,
        mutation: Option<Mutation>,
    ) -> MetaResult<()> {
        let node_to_collect = control_stream_manager.inject_barrier(
            database_id,
            Some(table_id),
            mutation,
            &barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info
                .map(|graph_info| graph_info.fragment_infos())
                .into_iter()
                .flatten(),
            new_actors,
            vec![],
            vec![],
        )?;
        barrier_control.enqueue_epoch(
            barrier_info.prev_epoch(),
            node_to_collect,
            barrier_info.kind.clone(),
        );
        Ok(())
    }

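    /// Handles a new upstream barrier: updates the upstream lag metric and either forwards the
    /// upstream barrier into the job's partial graph once the job starts consuming the upstream
    /// (on a [`Command::MergeSnapshotBackfillStreamingJobs`] that includes this job), or lets
    /// the current status decide which barriers to inject.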
    pub(super) fn on_new_command(
        &mut self,
        control_stream_manager: &mut ControlStreamManager,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
    ) -> MetaResult<()> {
        let table_id = self.job_id;
        let start_consume_upstream =
            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
                jobs_to_merge.contains_key(&table_id)
            } else {
                false
            };
        if start_consume_upstream {
            info!(
                table_id = self.job_id.table_id,
                prev_epoch = barrier_info.prev_epoch(),
                "start consuming upstream"
            );
        }
        let progress_epoch =
            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
                max(max_collected_epoch, self.backfill_epoch)
            } else {
                self.backfill_epoch
            };
        self.upstream_lag.set(
            barrier_info
                .prev_epoch
                .value()
                .0
                .saturating_sub(progress_epoch) as _,
        );
        if start_consume_upstream {
            self.status.start_consume_upstream(barrier_info);
            Self::inject_barrier(
                self.database_id,
                self.job_id,
                control_stream_manager,
                &mut self.barrier_control,
                &self.graph_info,
                None,
                barrier_info.clone(),
                None,
                None,
            )?;
        } else {
            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
                Self::inject_barrier(
                    self.database_id,
                    self.job_id,
                    control_stream_manager,
                    &mut self.barrier_control,
                    &self.graph_info,
                    Some(&self.graph_info),
                    barrier_to_inject,
                    None,
                    mutation,
                )?;
            }
        }
        Ok(())
    }

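    /// Collects a barrier-complete response from a worker, updates backfill progress, and
    /// returns whether the job is now ready to be merged back into the upstream graph.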
    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        self.status.update_progress(&resp.create_mview_progress);
        self.barrier_control.collect(resp);
        self.should_merge_to_upstream().is_some()
    }

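    /// Returns the job's graph info when log store consumption has caught up and there are no
    /// more barriers pending injection, i.e. when the job can be merged into the upstream graph.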
    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
        if let CreatingStreamingJobStatus::ConsumingLogStore {
            log_store_progress_tracker,
            barriers_to_inject,
        } = &self.status
            && barriers_to_inject.is_none()
            && log_store_progress_tracker.is_finished()
        {
            Some(&self.graph_info)
        } else {
            None
        }
    }
}

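/// How a completed epoch of a creating job should be treated when it is committed.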
pub(super) enum CompleteJobType {
    /// The first barrier of the job to be committed.
    First,
    Normal,
    /// The last barrier; the job is finished once this epoch is committed.
    Finished,
}

impl CreatingStreamingJobControl {
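    /// Picks the next collected epoch to commit (bounded by the upstream's minimum inflight
    /// epoch), together with its barrier-complete responses and whether it is the first,
    /// a normal, or the final commit of this creating job.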
    pub(super) fn start_completing(
        &mut self,
        min_upstream_inflight_epoch: Option<u64>,
    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
        let (finished_at_epoch, epoch_end_bound) = match &self.status {
            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
                let epoch_end_bound = min_upstream_inflight_epoch
                    .map(|upstream_epoch| {
                        if upstream_epoch < *finish_at_epoch {
                            Excluded(upstream_epoch)
                        } else {
                            Unbounded
                        }
                    })
                    .unwrap_or(Unbounded);
                (Some(*finish_at_epoch), epoch_end_bound)
            }
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
                None,
                min_upstream_inflight_epoch
                    .map(Excluded)
                    .unwrap_or(Unbounded),
            ),
        };
        self.barrier_control.start_completing(epoch_end_bound).map(
            |(epoch, resps, is_first_commit)| {
                let status = if let Some(finish_at_epoch) = finished_at_epoch {
                    assert!(!is_first_commit);
                    if epoch == finish_at_epoch {
                        self.barrier_control.ack_completed(epoch);
                        assert!(self.barrier_control.is_empty());
                        CompleteJobType::Finished
                    } else {
                        CompleteJobType::Normal
                    }
                } else if is_first_commit {
                    CompleteJobType::First
                } else {
                    CompleteJobType::Normal
                };
                (epoch, resps, status)
            },
        )
    }

    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        self.barrier_control.ack_completed(completed_epoch);
    }

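    /// Returns whether the job has entered the finishing phase and all of its barriers have
    /// been collected and completed.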
    pub(super) fn is_finished(&self) -> bool {
        self.barrier_control.is_empty() && self.status.is_finishing()
    }

    pub fn is_consuming(&self) -> bool {
        match &self.status {
            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
            CreatingStreamingJobStatus::Finishing(_) => false,
        }
    }

    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        self.graph_info.existing_table_ids()
    }

    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
        &self.graph_info
    }
}