1mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::WorkerId;
28use risingwave_pb::ddl_service::DdlProgress;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::stream_plan::AddMutation;
31use risingwave_pb::stream_plan::barrier::PbBarrierKind;
32use risingwave_pb::stream_plan::barrier_mutation::Mutation;
33use risingwave_pb::stream_service::BarrierCompleteResponse;
34use status::CreatingStreamingJobStatus;
35use tracing::{debug, info};
36
37use crate::MetaResult;
38use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
39use crate::barrier::edge_builder::FragmentEdgeBuildResult;
40use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
41use crate::barrier::progress::CreateMviewProgressTracker;
42use crate::barrier::rpc::ControlStreamManager;
43use crate::barrier::{BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch};
44use crate::controller::fragment::InflightFragmentInfo;
45use crate::model::{StreamJobActorsToCreate, StreamJobFragments};
46use crate::rpc::metrics::GLOBAL_META_METRICS;
47use crate::stream::build_actor_connector_splits;
48
49#[derive(Debug)]
50pub(crate) struct CreatingStreamingJobControl {
51 database_id: DatabaseId,
52 pub(super) job_id: TableId,
53 definition: String,
54 pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
55 backfill_epoch: u64,
56
57 graph_info: InflightStreamingJobInfo,
58
59 barrier_control: CreatingStreamingJobBarrierControl,
60 status: CreatingStreamingJobStatus,
61
62 upstream_lag: LabelGuardedIntGauge,
63}
64
65impl CreatingStreamingJobControl {
66 pub(super) fn new(
67 info: &CreateStreamingJobCommandInfo,
68 snapshot_backfill_upstream_tables: HashSet<TableId>,
69 backfill_epoch: u64,
70 version_stat: &HummockVersionStats,
71 control_stream_manager: &mut ControlStreamManager,
72 edges: &mut FragmentEdgeBuildResult,
73 ) -> MetaResult<Self> {
74 let job_id = info.stream_job_fragments.stream_job_id();
75 let database_id = DatabaseId::new(info.streaming_job.database_id());
76 debug!(
77 %job_id,
78 definition = info.definition,
79 "new creating job"
80 );
81 let snapshot_backfill_actors = info.stream_job_fragments.snapshot_backfill_actor_ids();
82 let create_mview_tracker = CreateMviewProgressTracker::recover(
84 [(
85 job_id,
86 (info.definition.clone(), &*info.stream_job_fragments),
87 )],
88 version_stat,
89 );
90 let fragment_infos: HashMap<_, _> = info.stream_job_fragments.new_fragment_info().collect();
91
92 let actors_to_create =
93 edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create());
94
95 let graph_info = InflightStreamingJobInfo {
96 job_id,
97 fragment_infos,
98 };
99
100 let mut barrier_control =
101 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
102
103 let mut prev_epoch_fake_physical_time = 0;
104 let mut pending_non_checkpoint_barriers = vec![];
105
106 let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
107 &mut prev_epoch_fake_physical_time,
108 &mut pending_non_checkpoint_barriers,
109 PbBarrierKind::Checkpoint,
110 );
111
112 let added_actors = info.stream_job_fragments.actor_ids();
113 let actor_splits = info
114 .init_split_assignment
115 .values()
116 .flat_map(build_actor_connector_splits)
117 .collect();
118
119 let initial_mutation = Mutation::Add(AddMutation {
120 actor_dispatchers: Default::default(),
122 added_actors,
123 actor_splits,
124 pause: false,
126 subscriptions_to_add: Default::default(),
127 backfill_nodes_to_pause: Default::default(),
128 });
129
130 control_stream_manager.add_partial_graph(database_id, Some(job_id));
131 Self::inject_barrier(
132 database_id,
133 job_id,
134 control_stream_manager,
135 &mut barrier_control,
136 &graph_info,
137 Some(&graph_info),
138 initial_barrier_info,
139 Some(actors_to_create),
140 Some(initial_mutation),
141 )?;
142
143 assert!(pending_non_checkpoint_barriers.is_empty());
144
145 Ok(Self {
146 database_id,
147 definition: info.definition.clone(),
148 job_id,
149 snapshot_backfill_upstream_tables,
150 barrier_control,
151 backfill_epoch,
152 graph_info,
153 status: CreatingStreamingJobStatus::ConsumingSnapshot {
154 prev_epoch_fake_physical_time,
155 pending_upstream_barriers: vec![],
156 version_stats: version_stat.clone(),
157 create_mview_tracker,
158 snapshot_backfill_actors,
159 backfill_epoch,
160 pending_non_checkpoint_barriers,
161 },
162 upstream_lag: GLOBAL_META_METRICS
163 .snapshot_backfill_lag
164 .with_guarded_label_values(&[&format!("{}", job_id)]),
165 })
166 }
167
168 fn resolve_upstream_log_epochs(
169 snapshot_backfill_upstream_tables: &HashSet<TableId>,
170 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
171 committed_epoch: u64,
172 upstream_curr_epoch: u64,
173 ) -> MetaResult<Vec<BarrierInfo>> {
174 let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
176 let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
177 loop {
178 let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
179 if *checkpoint_epoch < committed_epoch {
180 continue;
181 }
182 assert_eq!(*checkpoint_epoch, committed_epoch);
183 break;
184 }
185 let mut ret = vec![];
186 let mut prev_epoch = committed_epoch;
187 let mut pending_non_checkpoint_barriers = vec![];
188 for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
189 for (i, epoch) in non_checkpoint_epochs
190 .iter()
191 .chain([checkpoint_epoch])
192 .enumerate()
193 {
194 assert!(*epoch > prev_epoch);
195 pending_non_checkpoint_barriers.push(prev_epoch);
196 ret.push(BarrierInfo {
197 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
198 curr_epoch: TracedEpoch::new(Epoch(*epoch)),
199 kind: if i == 0 {
200 BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
201 } else {
202 BarrierKind::Barrier
203 },
204 });
205 prev_epoch = *epoch;
206 }
207 }
208 ret.push(BarrierInfo {
209 prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
210 curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
211 kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
212 });
213 Ok(ret)
214 }
215
216 fn recover_consuming_snapshot(
217 job_id: TableId,
218 definition: &String,
219 snapshot_backfill_upstream_tables: &HashSet<TableId>,
220 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
221 backfill_epoch: u64,
222 committed_epoch: u64,
223 upstream_curr_epoch: u64,
224 stream_job_fragments: StreamJobFragments,
225 version_stat: &HummockVersionStats,
226 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
227 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
228 let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
229 let mut pending_non_checkpoint_barriers = vec![];
230 let create_mview_tracker = CreateMviewProgressTracker::recover(
231 [(job_id, (definition.clone(), &stream_job_fragments))],
232 version_stat,
233 );
234 let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
235 &mut prev_epoch_fake_physical_time,
236 &mut pending_non_checkpoint_barriers,
237 PbBarrierKind::Initial,
238 );
239 Ok((
240 CreatingStreamingJobStatus::ConsumingSnapshot {
241 prev_epoch_fake_physical_time,
242 pending_upstream_barriers: Self::resolve_upstream_log_epochs(
243 snapshot_backfill_upstream_tables,
244 upstream_table_log_epochs,
245 backfill_epoch,
246 upstream_curr_epoch,
247 )?,
248 version_stats: version_stat.clone(),
249 create_mview_tracker,
250 snapshot_backfill_actors,
251 backfill_epoch,
252 pending_non_checkpoint_barriers,
253 },
254 barrier_info,
255 ))
256 }
257
258 fn recover_consuming_log_store(
259 snapshot_backfill_upstream_tables: &HashSet<TableId>,
260 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
261 committed_epoch: u64,
262 upstream_curr_epoch: u64,
263 stream_job_fragments: StreamJobFragments,
264 ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
265 let snapshot_backfill_actors = stream_job_fragments.snapshot_backfill_actor_ids();
266 let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
267 snapshot_backfill_upstream_tables,
268 upstream_table_log_epochs,
269 committed_epoch,
270 upstream_curr_epoch,
271 )?;
272 let mut first_barrier = barriers_to_inject.remove(0);
273 assert!(first_barrier.kind.is_checkpoint());
274 first_barrier.kind = BarrierKind::Initial;
275 Ok((
276 CreatingStreamingJobStatus::ConsumingLogStore {
277 log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
278 snapshot_backfill_actors.into_iter(),
279 barriers_to_inject
280 .last()
281 .map(|info| info.prev_epoch() - committed_epoch)
282 .unwrap_or(0),
283 ),
284 barriers_to_inject: Some(barriers_to_inject),
285 },
286 first_barrier,
287 ))
288 }
289
290 #[expect(clippy::too_many_arguments)]
291 pub(crate) fn recover(
292 database_id: DatabaseId,
293 job_id: TableId,
294 definition: String,
295 snapshot_backfill_upstream_tables: HashSet<TableId>,
296 upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
297 backfill_epoch: u64,
298 committed_epoch: u64,
299 upstream_curr_epoch: u64,
300 graph_info: InflightStreamingJobInfo,
301 stream_job_fragments: StreamJobFragments,
302 version_stat: &HummockVersionStats,
303 new_actors: StreamJobActorsToCreate,
304 initial_mutation: Mutation,
305 control_stream_manager: &mut ControlStreamManager,
306 ) -> MetaResult<Self> {
307 debug!(
308 %job_id,
309 definition,
310 "recovered creating job"
311 );
312 let mut barrier_control =
313 CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
314
315 let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
316 Self::recover_consuming_snapshot(
317 job_id,
318 &definition,
319 &snapshot_backfill_upstream_tables,
320 upstream_table_log_epochs,
321 backfill_epoch,
322 committed_epoch,
323 upstream_curr_epoch,
324 stream_job_fragments,
325 version_stat,
326 )?
327 } else {
328 Self::recover_consuming_log_store(
329 &snapshot_backfill_upstream_tables,
330 upstream_table_log_epochs,
331 committed_epoch,
332 upstream_curr_epoch,
333 stream_job_fragments,
334 )?
335 };
336 control_stream_manager.add_partial_graph(database_id, Some(job_id));
337 Self::inject_barrier(
338 database_id,
339 job_id,
340 control_stream_manager,
341 &mut barrier_control,
342 &graph_info,
343 Some(&graph_info),
344 first_barrier_info,
345 Some(new_actors),
346 Some(initial_mutation),
347 )?;
348 Ok(Self {
349 database_id,
350 job_id,
351 definition,
352 snapshot_backfill_upstream_tables,
353 backfill_epoch,
354 graph_info,
355 barrier_control,
356 status,
357 upstream_lag: GLOBAL_META_METRICS
358 .snapshot_backfill_lag
359 .with_guarded_label_values(&[&format!("{}", job_id)]),
360 })
361 }
362
363 pub(crate) fn is_empty(&self) -> bool {
364 self.barrier_control.is_empty()
365 }
366
367 pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
368 self.barrier_control.is_valid_after_worker_err(worker_id)
369 && (!self.status.is_finishing()
370 || InflightFragmentInfo::contains_worker(
371 self.graph_info.fragment_infos(),
372 worker_id,
373 ))
374 }
375
376 pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
377 let progress = match &self.status {
378 CreatingStreamingJobStatus::ConsumingSnapshot {
379 create_mview_tracker,
380 ..
381 } => {
382 if create_mview_tracker.has_pending_finished_jobs() {
383 "Snapshot finished".to_owned()
384 } else {
385 let progress = create_mview_tracker
386 .gen_ddl_progress()
387 .remove(&self.job_id.table_id)
388 .expect("should exist");
389 format!("Snapshot [{}]", progress.progress)
390 }
391 }
392 CreatingStreamingJobStatus::ConsumingLogStore {
393 log_store_progress_tracker,
394 ..
395 } => {
396 format!(
397 "LogStore [{}]",
398 log_store_progress_tracker.gen_ddl_progress()
399 )
400 }
401 CreatingStreamingJobStatus::Finishing(_) => {
402 format!(
403 "Finishing [epoch count: {}]",
404 self.barrier_control.inflight_barrier_count()
405 )
406 }
407 };
408 DdlProgress {
409 id: self.job_id.table_id as u64,
410 statement: self.definition.clone(),
411 progress,
412 }
413 }
414
415 pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
416 if self.status.is_finishing() {
417 None
418 } else {
419 Some(max(
421 self.barrier_control.max_collected_epoch().unwrap_or(0),
422 self.backfill_epoch,
423 ))
424 }
425 }
426
427 fn inject_barrier(
428 database_id: DatabaseId,
429 table_id: TableId,
430 control_stream_manager: &mut ControlStreamManager,
431 barrier_control: &mut CreatingStreamingJobBarrierControl,
432 pre_applied_graph_info: &InflightStreamingJobInfo,
433 applied_graph_info: Option<&InflightStreamingJobInfo>,
434 barrier_info: BarrierInfo,
435 new_actors: Option<StreamJobActorsToCreate>,
436 mutation: Option<Mutation>,
437 ) -> MetaResult<()> {
438 let node_to_collect = control_stream_manager.inject_barrier(
439 database_id,
440 Some(table_id),
441 mutation,
442 &barrier_info,
443 pre_applied_graph_info.fragment_infos(),
444 applied_graph_info
445 .map(|graph_info| graph_info.fragment_infos())
446 .into_iter()
447 .flatten(),
448 new_actors,
449 vec![],
450 vec![],
451 )?;
452 barrier_control.enqueue_epoch(
453 barrier_info.prev_epoch(),
454 node_to_collect,
455 barrier_info.kind.clone(),
456 );
457 Ok(())
458 }
459
460 pub(super) fn on_new_command(
461 &mut self,
462 control_stream_manager: &mut ControlStreamManager,
463 command: Option<&Command>,
464 barrier_info: &BarrierInfo,
465 ) -> MetaResult<()> {
466 let table_id = self.job_id;
467 let start_consume_upstream =
468 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
469 jobs_to_merge.contains_key(&table_id)
470 } else {
471 false
472 };
473 if start_consume_upstream {
474 info!(
475 table_id = self.job_id.table_id,
476 prev_epoch = barrier_info.prev_epoch(),
477 "start consuming upstream"
478 );
479 }
480 let progress_epoch =
481 if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
482 max(max_collected_epoch, self.backfill_epoch)
483 } else {
484 self.backfill_epoch
485 };
486 self.upstream_lag.set(
487 barrier_info
488 .prev_epoch
489 .value()
490 .0
491 .saturating_sub(progress_epoch) as _,
492 );
493 if start_consume_upstream {
494 self.status.start_consume_upstream(barrier_info);
495 Self::inject_barrier(
496 self.database_id,
497 self.job_id,
498 control_stream_manager,
499 &mut self.barrier_control,
500 &self.graph_info,
501 None,
502 barrier_info.clone(),
503 None,
504 None,
505 )?;
506 } else {
507 for barrier_to_inject in self.status.on_new_upstream_epoch(barrier_info) {
508 Self::inject_barrier(
509 self.database_id,
510 self.job_id,
511 control_stream_manager,
512 &mut self.barrier_control,
513 &self.graph_info,
514 Some(&self.graph_info),
515 barrier_to_inject,
516 None,
517 None,
518 )?;
519 }
520 }
521 Ok(())
522 }
523
524 pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
525 self.status.update_progress(&resp.create_mview_progress);
526 self.barrier_control.collect(resp);
527 self.should_merge_to_upstream().is_some()
528 }
529
530 pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
531 if let CreatingStreamingJobStatus::ConsumingLogStore {
532 log_store_progress_tracker,
533 barriers_to_inject,
534 } = &self.status
535 && barriers_to_inject.is_none()
536 && log_store_progress_tracker.is_finished()
537 {
538 Some(&self.graph_info)
539 } else {
540 None
541 }
542 }
543}
544
545pub(super) enum CompleteJobType {
546 First,
548 Normal,
549 Finished,
551}
552
553impl CreatingStreamingJobControl {
554 pub(super) fn start_completing(
555 &mut self,
556 min_upstream_inflight_epoch: Option<u64>,
557 ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
558 let (finished_at_epoch, epoch_end_bound) = match &self.status {
559 CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
560 let epoch_end_bound = min_upstream_inflight_epoch
561 .map(|upstream_epoch| {
562 if upstream_epoch < *finish_at_epoch {
563 Excluded(upstream_epoch)
564 } else {
565 Unbounded
566 }
567 })
568 .unwrap_or(Unbounded);
569 (Some(*finish_at_epoch), epoch_end_bound)
570 }
571 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
572 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
573 None,
574 min_upstream_inflight_epoch
575 .map(Excluded)
576 .unwrap_or(Unbounded),
577 ),
578 };
579 self.barrier_control.start_completing(epoch_end_bound).map(
580 |(epoch, resps, is_first_commit)| {
581 let status = if let Some(finish_at_epoch) = finished_at_epoch {
582 assert!(!is_first_commit);
583 if epoch == finish_at_epoch {
584 self.barrier_control.ack_completed(epoch);
585 assert!(self.barrier_control.is_empty());
586 CompleteJobType::Finished
587 } else {
588 CompleteJobType::Normal
589 }
590 } else if is_first_commit {
591 CompleteJobType::First
592 } else {
593 CompleteJobType::Normal
594 };
595 (epoch, resps, status)
596 },
597 )
598 }
599
600 pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
601 self.barrier_control.ack_completed(completed_epoch);
602 }
603
604 pub(super) fn is_finished(&self) -> bool {
605 self.barrier_control.is_empty() && self.status.is_finishing()
606 }
607
608 pub fn is_consuming(&self) -> bool {
609 match &self.status {
610 CreatingStreamingJobStatus::ConsumingSnapshot { .. }
611 | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
612 CreatingStreamingJobStatus::Finishing(_) => false,
613 }
614 }
615
616 pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
617 self.graph_info.existing_table_ids()
618 }
619}