1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::stream_plan::AddMutation;
31use risingwave_pb::stream_plan::barrier::PbBarrierKind;
32use risingwave_pb::stream_plan::barrier_mutation::Mutation;
33use risingwave_pb::stream_service::BarrierCompleteResponse;
34use status::CreatingStreamingJobStatus;
35use tracing::{debug, info};
36
37use crate::MetaResult;
38use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
39use crate::barrier::edge_builder::FragmentEdgeBuildResult;
40use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
41use crate::barrier::progress::CreateMviewProgressTracker;
42use crate::barrier::rpc::ControlStreamManager;
43use crate::barrier::{BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch};
44use crate::controller::fragment::InflightFragmentInfo;
45use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
46use crate::rpc::metrics::GLOBAL_META_METRICS;
47use crate::stream::build_actor_connector_splits;
48
/// Barrier-level control of a streaming job that is being created with snapshot backfill.
///
/// The job runs in its own partial graph (registered via `add_partial_graph` with
/// `Some(job_id)`) until it is merged into the upstream database graph.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    /// Database the job belongs to.
    database_id: DatabaseId,
    /// Id of the streaming job being created.
    pub(super) job_id: TableId,
    /// Job definition; reported as the `statement` of the job's `DdlProgress`.
    definition: String,
    /// Upstream tables whose change log the job replays after the snapshot.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch of the backfill snapshot. Recovery treats the job as still consuming the
    /// snapshot while its committed epoch is below this value.
    backfill_epoch: u64,

    /// Fragments of the job.
    graph_info: InflightStreamingJobInfo,

    /// Tracks the barriers injected into the job's partial graph.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase: consuming snapshot, consuming log store, or finishing.
    status: CreatingStreamingJobStatus,

    /// Gauge of how far the job's progress epoch lags behind the upstream `prev_epoch`.
    upstream_lag: LabelGuardedIntGauge,
}
64
65impl CreatingStreamingJobControl {
66 pub(super) fn new(
67 info: &CreateStreamingJobCommandInfo,
68 snapshot_backfill_upstream_tables: HashSet<TableId>,
69 backfill_epoch: u64,
70 version_stat: &HummockVersionStats,
71 control_stream_manager: &mut ControlStreamManager,
72 edges: &mut FragmentEdgeBuildResult,
73 ) -> MetaResult<Self> {
74 let job_id = info.stream_job_fragments.stream_job_id();
75 let database_id = DatabaseId::new(info.streaming_job.database_id());
76 debug!(
77 %job_id,
78 definition = info.definition,
79 "new creating job"
80 );
81 let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
82 let create_mview_tracker = CreateMviewProgressTracker::recover(
83 [(
84 job_id,
85 (info.definition.clone(), &*info.stream_job_fragments),
86 )],
87 version_stat,
88 );
89 let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();
90
91 let actors_to_create =
92 edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
93
94 let graph_info = InflightStreamingJobInfo {
95 job_id,
96 fragment_infos,
97 };
98
99 let mut barrier_control =
100 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
101
102 let mut prev_epoch_fake_physical_time = 0;
103 let mut pending_non_checkpoint_barriers = vec![];
104
105 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
106 &mut prev_epoch_fake_physical_time,
107 &mut pending_non_checkpoint_barriers,
108 PbBarrierKind::Checkpoint,
109 );
110
111 let added_actors = info.stream_job_fragments.actor_ids();
112 let actor_splits = info
113 .init_split_assignment
114 .values()
115 .flat_map(build_actor_connector_splits)
116 .collect();
117
118 let initial_mutation = Mutation::Add(AddMutation {
119 actor_dispatchers: Default::default(),
121 added_actors,
122 actor_splits,
123 pause: false,
125 subscriptions_to_add: Default::default(),
126 });
127
128 control_stream_manager.add_partial_graph(database_id, Some(job_id));
129 Self::inject_barrier(
130 database_id,
131 job_id,
132 control_stream_manager,
133 &mut barrier_control,
134 &graph_info,
135 Some(&graph_info),
136 initial_barrier_info,
137 Some(actors_to_create),
138 Some(initial_mutation),
139 )?;
140
141 assert!(pending_non_checkpoint_barriers.is_empty());
142
143 Ok(Self {
144 database_id,
145 definition: info.definition.clone(),
146 job_id,
147 snapshot_backfill_upstream_tables,
148 barrier_control,
149 backfill_epoch,
150 graph_info,
151 status: CreatingStreamingJobStatus::ConsumingSnapshot {
152 prev_epoch_fake_physical_time,
153 pending_upstream_barriers: vec![],
154 version_stats: version_stat.clone(),
155 create_mview_tracker,
156 snapshot_backfill_actors,
157 backfill_epoch,
158 pending_non_checkpoint_barriers,
159 },
160 upstream_lag: GLOBAL_META_METRICS
161 .snapshot_backfill_lag
162 .with_guarded_label_values(&[&format!("{}", job_id)]),
163 })
164 }
165
166 fn resolve_upstream_log_epochs(
167 snapshot_backfill_upstream_tables: &HashSet<TableId>,
168 upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
169 committed_epoch: u64,
170 upstream_curr_epoch: u64,
171 ) -> MetaResult<Vec<BarrierInfo>> {
172 let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
174 let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
175 loop {
176 let epochs = epochs_iter.next().expect("not reach committed epoch yet");
177 if *epochs.last().unwrap() < committed_epoch {
178 continue;
179 }
180 assert_eq!(*epochs.last().unwrap(), committed_epoch);
181 break;
182 }
183 let mut ret = vec![];
184 let mut prev_epoch = committed_epoch;
185 let mut pending_non_checkpoint_barriers = vec![];
186 for epochs in epochs_iter {
187 for (i, epoch) in epochs.iter().enumerate() {
188 assert!(*epoch > prev_epoch);
189 pending_non_checkpoint_barriers.push(prev_epoch);
190 ret.push(BarrierInfo {
191 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
192 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
193 kind: if i == 0 {
194 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
195 } else {
196 BarrierKind::Barrier
197 },
198 });
199 prev_epoch = *epoch;
200 }
201 }
202 ret.push(BarrierInfo {
203 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
204 curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
205 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
206 });
207 Ok(ret)
208 }
209
210 fn recover_consuming_snapshot(
211 job_id: TableId,
212 definition: &String,
213 snapshot_backfill_upstream_tables: &HashSet<TableId>,
214 upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
215 backfill_epoch: u64,
216 committed_epoch: u64,
217 upstream_curr_epoch: u64,
218 stream_job_fragments: StreamJobFragments,
219 version_stat: &HummockVersionStats,
220 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
221 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
222 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
223 let mut pending_non_checkpoint_barriers = vec![];
224 let create_mview_tracker = CreateMviewProgressTracker::recover(
225 [(job_id, (definition.clone(), &stream_job_fragments))],
226 version_stat,
227 );
228 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
229 &mut prev_epoch_fake_physical_time,
230 &mut pending_non_checkpoint_barriers,
231 PbBarrierKind::Initial,
232 );
233 Ok((
234 CreatingStreamingJobStatus::ConsumingSnapshot {
235 prev_epoch_fake_physical_time,
236 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
237 snapshot_backfill_upstream_tables,
238 upstream_table_log_epochs,
239 backfill_epoch,
240 upstream_curr_epoch,
241 )?,
242 version_stats: version_stat.clone(),
243 create_mview_tracker,
244 snapshot_backfill_actors,
245 backfill_epoch,
246 pending_non_checkpoint_barriers,
247 },
248 barrier_info,
249 ))
250 }
251
252 fn recover_consuming_log_store(
253 snapshot_backfill_upstream_tables: &HashSet<TableId>,
254 upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
255 committed_epoch: u64,
256 upstream_curr_epoch: u64,
257 stream_job_fragments: StreamJobFragments,
258 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
259 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
260 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
261 snapshot_backfill_upstream_tables,
262 upstream_table_log_epochs,
263 committed_epoch,
264 upstream_curr_epoch,
265 )?;
266 let mut first_barrier = barriers_to_inject.remove(0);
267 assert!(first_barrier.kind.is_checkpoint());
268 first_barrier.kind = BarrierKind::Initial;
269 Ok((
270 CreatingStreamingJobStatus::ConsumingLogStore {
271 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
272 snapshot_backfill_actors.into_iter(),
273 barriers_to_inject
274 .last()
275 .map(|info| info.prev_epoch() - committed_epoch)
276 .unwrap_or(0),
277 ),
278 barriers_to_inject: Some(barriers_to_inject),
279 },
280 first_barrier,
281 ))
282 }
283
284 #[expect(clippy::too_many_arguments)]
285 pub(crate) fn recover(
286 database_id: DatabaseId,
287 job_id: TableId,
288 definition: String,
289 snapshot_backfill_upstream_tables: HashSet<TableId>,
290 upstream_table_log_epochs: &HashMap<TableId, Vec<Vec<u64>>>,
291 backfill_epoch: u64,
292 committed_epoch: u64,
293 upstream_curr_epoch: u64,
294 graph_info: InflightStreamingJobInfo,
295 stream_job_fragments: StreamJobFragments,
296 version_stat: &HummockVersionStats,
297 new_actors: StreamJobActorsToCreate,
298 initial_mutation: Mutation,
299 control_stream_manager: &mut ControlStreamManager,
300 ) -> MetaResult<Self> {
301 debug!(
302 %job_id,
303 definition,
304 "recovered creating job"
305 );
306 let mut barrier_control =
307 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
308
309 let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
310 Self::recover_consuming_snapshot(
311 job_id,
312 &definition,
313 &snapshot_backfill_upstream_tables,
314 upstream_table_log_epochs,
315 backfill_epoch,
316 committed_epoch,
317 upstream_curr_epoch,
318 stream_job_fragments,
319 version_stat,
320 )?
321 } else {
322 Self::recover_consuming_log_store(
323 &snapshot_backfill_upstream_tables,
324 upstream_table_log_epochs,
325 committed_epoch,
326 upstream_curr_epoch,
327 stream_job_fragments,
328 )?
329 };
330 control_stream_manager.add_partial_graph(database_id, Some(job_id));
331 Self::inject_barrier(
332 database_id,
333 job_id,
334 control_stream_manager,
335 &mut barrier_control,
336 &graph_info,
337 Some(&graph_info),
338 first_barrier_info,
339 Some(new_actors),
340 Some(initial_mutation),
341 )?;
342 Ok(Self {
343 database_id,
344 job_id,
345 definition,
346 snapshot_backfill_upstream_tables,
347 backfill_epoch,
348 graph_info,
349 barrier_control,
350 status,
351 upstream_lag: GLOBAL_META_METRICS
352 .snapshot_backfill_lag
353 .with_guarded_label_values(&[&format!("{}", job_id)]),
354 })
355 }
356
357 pub(crate) fn is_empty(&self) -> bool {
358 self.barrier_control.is_empty()
359 }
360
361 pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
362 self.barrier_control.is_valid_after_worker_err(worker_id)
363 && (!self.status.is_finishing()
364 || InflightFragmentInfo::contains_worker(
365 self.graph_info.fragment_infos(),
366 worker_id,
367 ))
368 }
369
370 pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
371 let progress = match &self.status {
372 CreatingStreamingJobStatus::ConsumingSnapshot {
373 create_mview_tracker,
374 ..
375 } => {
376 if create_mview_tracker.has_pending_finished_jobs() {
377 "Snapshot finished".to_owned()
378 } else {
379 let progress = create_mview_tracker
380 .gen_ddl_progress()
381 .remove(&self.job_id.table_id)
382 .expect("should exist");
383 format!("Snapshot [{}]", progress.progress)
384 }
385 }
386 CreatingStreamingJobStatus::ConsumingLogStore {
387 log_store_progress_tracker,
388 ..
389 } => {
390 format!(
391 "LogStore [{}]",
392 log_store_progress_tracker.gen_ddl_progress()
393 )
394 }
395 CreatingStreamingJobStatus::Finishing(_) => {
396 format!(
397 "Finishing [epoch count: {}]",
398 self.barrier_control.inflight_barrier_count()
399 )
400 }
401 };
402 DdlProgress {
403 id: self.job_id.table_id as u64,
404 statement: self.definition.clone(),
405 progress,
406 }
407 }
408
409 pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
410 if self.status.is_finishing() {
411 None
412 } else {
413 Some(max(
415 self.barrier_control.max_collected_epoch().unwrap_or(0),
416 self.backfill_epoch,
417 ))
418 }
419 }
420
421 fn inject_barrier(
422 database_id: DatabaseId,
423 table_id: TableId,
424 control_stream_manager: &mut ControlStreamManager,
425 barrier_control: &mut CreatingStreamingJobBarrierControl,
426 pre_applied_graph_info: &InflightStreamingJobInfo,
427 applied_graph_info: Option<&InflightStreamingJobInfo>,
428 barrier_info: BarrierInfo,
429 new_actors: Option<StreamJobActorsToCreate>,
430 mutation: Option<Mutation>,
431 ) -> MetaResult<()> {
432 let node_to_collect = control_stream_manager.inject_barrier(
433 database_id,
434 Some(table_id),
435 mutation,
436 &barrier_info,
437 pre_applied_graph_info.fragment_infos(),
438 applied_graph_info
439 .map(|graph_info| graph_info.fragment_infos())
440 .into_iter()
441 .flatten(),
442 new_actors,
443 vec![],
444 vec![],
445 )?;
446 barrier_control.enqueue_epoch(
447 barrier_info.prev_epoch(),
448 node_to_collect,
449 barrier_info.kind.clone(),
450 );
451 Ok(())
452 }
453
454 pub(super) fn on_new_command(
455 &mut self,
456 control_stream_manager: &mut ControlStreamManager,
457 command: Option<&Command>,
458 barrier_info: &BarrierInfo,
459 ) -> MetaResult<()> {
460 let table_id = self.job_id;
461 let start_consume_upstream =
462 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
463 jobs_to_merge.contains_key(&table_id)
464 } else {
465 false
466 };
467 if start_consume_upstream {
468 info!(
469 table_id = self.job_id.table_id,
470 prev_epoch = barrier_info.prev_epoch(),
471 "start consuming upstream"
472 );
473 }
474 let progress_epoch =
475 if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
476 max(max_collected_epoch, self.backfill_epoch)
477 } else {
478 self.backfill_epoch
479 };
480 self.upstream_lag.set(
481 barrier_info
482 .prev_epoch
483 .value()
484 .0
485 .saturating_sub(progress_epoch) as _,
486 );
487 if start_consume_upstream {
488 self.status.start_consume_upstream(barrier_info);
489 Self::inject_barrier(
490 self.database_id,
491 self.job_id,
492 control_stream_manager,
493 &mut self.barrier_control,
494 &self.graph_info,
495 None,
496 barrier_info.clone(),
497 None,
498 None,
499 )?;
500 } else {
501 for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
502 Self::inject_barrier(
503 self.database_id,
504 self.job_id,
505 control_stream_manager,
506 &mut self.barrier_control,
507 &self.graph_info,
508 Some(&self.graph_info),
509 barrier_to_inject,
510 None,
511 None,
512 )?;
513 }
514 }
515 Ok(())
516 }
517
518 pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
519 self.status.update_progress(&resp.create_mview_progress);
520 self.barrier_control.collect(resp);
521 self.should_merge_to_upstream().is_some()
522 }
523
524 pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
525 if let CreatingStreamingJobStatus::ConsumingLogStore {
526 log_store_progress_tracker,
527 barriers_to_inject,
528 } = &self.status
529 && barriers_to_inject.is_none()
530 && log_store_progress_tracker.is_finished()
531 {
532 Some(&self.graph_info)
533 } else {
534 None
535 }
536 }
537}
538
539pub(super) enum CompleteJobType {
540 First,
542 Normal,
543 Finished,
545}
546
547impl CreatingStreamingJobControl {
548 pub(super) fn start_completing(
549 &mut self,
550 min_upstream_inflight_epoch: Option<u64>,
551 ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
552 let (finished_at_epoch, epoch_end_bound) = match &self.status {
553 CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
554 let epoch_end_bound = min_upstream_inflight_epoch
555 .map(|upstream_epoch| {
556 if upstream_epoch < *finish_at_epoch {
557 Excluded(upstream_epoch)
558 } else {
559 Unbounded
560 }
561 })
562 .unwrap_or(Unbounded);
563 (Some(*finish_at_epoch), epoch_end_bound)
564 }
565 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
566 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
567 None,
568 min_upstream_inflight_epoch
569 .map(Excluded)
570 .unwrap_or(Unbounded),
571 ),
572 };
573 self.barrier_control.start_completing(epoch_end_bound).map(
574 |(epoch, resps, is_first_commit)| {
575 let status = if let Some(finish_at_epoch) = finished_at_epoch {
576 assert!(!is_first_commit);
577 if epoch == finish_at_epoch {
578 self.barrier_control.ack_completed(epoch);
579 assert!(self.barrier_control.is_empty());
580 CompleteJobType::Finished
581 } else {
582 CompleteJobType::Normal
583 }
584 } else if is_first_commit {
585 CompleteJobType::First
586 } else {
587 CompleteJobType::Normal
588 };
589 (epoch, resps, status)
590 },
591 )
592 }
593
594 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
595 self.barrier_control.ack_completed(completed_epoch);
596 }
597
598 pub(super) fn is_finished(&self) -> bool {
599 self.barrier_control.is_empty() && self.status.is_finishing()
600 }
601
602 pub fn is_consuming(&self) -> bool {
603 match &self.status {
604 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
605 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
606 CreatingStreamingJobStatus::Finishing(_) => false,
607 }
608 }
609
610 pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
611 self.graph_info.existing_table_ids()
612 }
613}