risingwave_meta/barrier/checkpoint/creating_job/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod barrier_control;
16mod status;
17
18use std::cmp::max;
19use std::collections::{HashMap, HashSet};
20use std::mem::take;
21use std::ops::Bound::{Excluded, Unbounded};
22
23use barrier_control::CreatingStreamingJobBarrierControl;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::metrics::LabelGuardedIntGauge;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_connector::source::cdc::build_pb_actor_cdc_table_snapshot_splits_with_generation;
28use risingwave_meta_model::{CreateType, WorkerId};
29use risingwave_pb::ddl_service::DdlProgress;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::stream_plan::AddMutation;
32use risingwave_pb::stream_plan::barrier::PbBarrierKind;
33use risingwave_pb::stream_plan::barrier_mutation::Mutation;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use status::CreatingStreamingJobStatus;
36use tracing::{debug, info};
37
38use crate::MetaResult;
39use crate::barrier::backfill_order_control::get_nodes_with_backfill_dependencies;
40use crate::barrier::checkpoint::creating_job::status::CreateMviewLogStoreProgressTracker;
41use crate::barrier::edge_builder::FragmentEdgeBuildResult;
42use crate::barrier::info::{BarrierInfo, InflightStreamingJobInfo};
43use crate::barrier::progress::CreateMviewProgressTracker;
44use crate::barrier::rpc::ControlStreamManager;
45use crate::barrier::{
46    BackfillOrderState, BarrierKind, Command, CreateStreamingJobCommandInfo, TracedEpoch,
47};
48use crate::controller::fragment::InflightFragmentInfo;
49use crate::model::StreamJobActorsToCreate;
50use crate::rpc::metrics::GLOBAL_META_METRICS;
51use crate::stream::build_actor_connector_splits;
52
/// Drives a streaming job that is being created with snapshot backfill in its own partial
/// graph, until it is ready to be merged into the upstream database graph.
#[derive(Debug)]
pub(crate) struct CreatingStreamingJobControl {
    database_id: DatabaseId,
    pub(super) job_id: TableId,
    /// The job's definition text, reported as the statement in DDL progress.
    definition: String,
    /// Reported in DDL progress; always `Background` for a recovered job.
    create_type: CreateType,
    /// Upstream tables the job backfills from. On recovery, the change-log epochs of one of
    /// these tables are used to rebuild the barriers the job still has to replay.
    pub(super) snapshot_backfill_upstream_tables: HashSet<TableId>,
    /// Epoch separating the snapshot phase from the log-store phase. Recovery treats a
    /// committed epoch below it as "still consuming the snapshot", and upstream log replay
    /// starts from it.
    backfill_epoch: u64,

    /// In-flight fragment info of the job's own graph.
    job_info: InflightStreamingJobInfo,

    /// Tracks barriers injected into the job's partial graph until they are completed.
    barrier_control: CreatingStreamingJobBarrierControl,
    /// Current phase: consuming snapshot, consuming log store, or finishing.
    status: CreatingStreamingJobStatus,

    /// Gauge labelled with the job id. It is set to the upstream `prev_epoch` minus the
    /// job's progress epoch (raw epoch values).
    upstream_lag: LabelGuardedIntGauge,
}
69
70impl CreatingStreamingJobControl {
71    pub(super) fn new(
72        info: &CreateStreamingJobCommandInfo,
73        snapshot_backfill_upstream_tables: HashSet<TableId>,
74        backfill_epoch: u64,
75        version_stat: &HummockVersionStats,
76        control_stream_manager: &mut ControlStreamManager,
77        edges: &mut FragmentEdgeBuildResult,
78    ) -> MetaResult<Self> {
79        let job_id = info.stream_job_fragments.stream_job_id();
80        let database_id = DatabaseId::new(info.streaming_job.database_id());
81        debug!(
82            %job_id,
83            definition = info.definition,
84            "new creating job"
85        );
86        let job_info = InflightStreamingJobInfo {
87            job_id,
88            fragment_infos: info
89                .stream_job_fragments
90                .new_fragment_info(&info.init_split_assignment)
91                .collect(),
92            subscribers: Default::default(), // no subscriber for newly create job
93        };
94        let snapshot_backfill_actors = job_info.snapshot_backfill_actor_ids().collect();
95        let backfill_nodes_to_pause =
96            get_nodes_with_backfill_dependencies(&info.fragment_backfill_ordering)
97                .into_iter()
98                .collect();
99        let backfill_order_state = BackfillOrderState::new(
100            info.fragment_backfill_ordering.clone(),
101            &info.stream_job_fragments,
102            info.locality_fragment_state_table_mapping.clone(),
103        );
104        let create_mview_tracker = CreateMviewProgressTracker::recover(
105            [(
106                job_id,
107                (info.definition.clone(), &job_info, backfill_order_state),
108            )],
109            version_stat,
110        );
111
112        let actors_to_create =
113            edges.collect_actors_to_create(info.stream_job_fragments.actors_to_create().map(
114                |(fragment_id, node, actors)| {
115                    (
116                        fragment_id,
117                        node,
118                        actors,
119                        [], // no subscribers for newly creating job
120                    )
121                },
122            ));
123
124        let mut barrier_control =
125            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, false);
126
127        let mut prev_epoch_fake_physical_time = 0;
128        let mut pending_non_checkpoint_barriers = vec![];
129
130        let initial_barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
131            &mut prev_epoch_fake_physical_time,
132            &mut pending_non_checkpoint_barriers,
133            PbBarrierKind::Checkpoint,
134        );
135
136        let added_actors = info.stream_job_fragments.actor_ids();
137        let actor_splits = info
138            .init_split_assignment
139            .values()
140            .flat_map(build_actor_connector_splits)
141            .collect();
142
143        let initial_mutation = Mutation::Add(AddMutation {
144            // for mutation of snapshot backfill job, we won't include changes to dispatchers of upstream actors.
145            actor_dispatchers: Default::default(),
146            added_actors,
147            actor_splits,
148            // we assume that when handling snapshot backfill, the cluster must not be paused
149            pause: false,
150            subscriptions_to_add: Default::default(),
151            backfill_nodes_to_pause,
152            actor_cdc_table_snapshot_splits:
153                build_pb_actor_cdc_table_snapshot_splits_with_generation(
154                    info.cdc_table_snapshot_split_assignment.clone(),
155                )
156                .into(),
157            new_upstream_sinks: Default::default(),
158        });
159
160        control_stream_manager.add_partial_graph(database_id, Some(job_id));
161        Self::inject_barrier(
162            database_id,
163            job_id,
164            control_stream_manager,
165            &mut barrier_control,
166            &job_info,
167            Some(&job_info),
168            initial_barrier_info,
169            Some(actors_to_create),
170            Some(initial_mutation),
171        )?;
172
173        assert!(pending_non_checkpoint_barriers.is_empty());
174
175        Ok(Self {
176            database_id,
177            definition: info.definition.clone(),
178            create_type: info.create_type.into(),
179            job_id,
180            snapshot_backfill_upstream_tables,
181            barrier_control,
182            backfill_epoch,
183            job_info,
184            status: CreatingStreamingJobStatus::ConsumingSnapshot {
185                prev_epoch_fake_physical_time,
186                pending_upstream_barriers: vec![],
187                version_stats: version_stat.clone(),
188                create_mview_tracker,
189                snapshot_backfill_actors,
190                backfill_epoch,
191                pending_non_checkpoint_barriers,
192            },
193            upstream_lag: GLOBAL_META_METRICS
194                .snapshot_backfill_lag
195                .with_guarded_label_values(&[&format!("{}", job_id)]),
196        })
197    }
198
199    fn resolve_upstream_log_epochs(
200        snapshot_backfill_upstream_tables: &HashSet<TableId>,
201        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
202        committed_epoch: u64,
203        upstream_curr_epoch: u64,
204    ) -> MetaResult<Vec<BarrierInfo>> {
205        // TODO: add sanity check
206        let table_id = snapshot_backfill_upstream_tables.iter().next().unwrap();
207        let mut epochs_iter = upstream_table_log_epochs[table_id].iter();
208        loop {
209            let (_, checkpoint_epoch) = epochs_iter.next().expect("not reach committed epoch yet");
210            if *checkpoint_epoch < committed_epoch {
211                continue;
212            }
213            assert_eq!(*checkpoint_epoch, committed_epoch);
214            break;
215        }
216        let mut ret = vec![];
217        let mut prev_epoch = committed_epoch;
218        let mut pending_non_checkpoint_barriers = vec![];
219        for (non_checkpoint_epochs, checkpoint_epoch) in epochs_iter {
220            for (i, epoch) in non_checkpoint_epochs
221                .iter()
222                .chain([checkpoint_epoch])
223                .enumerate()
224            {
225                assert!(*epoch > prev_epoch);
226                pending_non_checkpoint_barriers.push(prev_epoch);
227                ret.push(BarrierInfo {
228                    prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
229                    curr_epoch: TracedEpoch::new(Epoch(*epoch)),
230                    kind: if i == 0 {
231                        BarrierKind::Checkpoint(take(&mut pending_non_checkpoint_barriers))
232                    } else {
233                        BarrierKind::Barrier
234                    },
235                });
236                prev_epoch = *epoch;
237            }
238        }
239        ret.push(BarrierInfo {
240            prev_epoch: TracedEpoch::new(Epoch(prev_epoch)),
241            curr_epoch: TracedEpoch::new(Epoch(upstream_curr_epoch)),
242            kind: BarrierKind::Checkpoint(pending_non_checkpoint_barriers),
243        });
244        Ok(ret)
245    }
246
247    fn recover_consuming_snapshot(
248        job_id: TableId,
249        definition: &String,
250        snapshot_backfill_upstream_tables: &HashSet<TableId>,
251        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
252        backfill_epoch: u64,
253        committed_epoch: u64,
254        upstream_curr_epoch: u64,
255        job_info: &InflightStreamingJobInfo,
256        version_stat: &HummockVersionStats,
257    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
258        let mut prev_epoch_fake_physical_time = Epoch(committed_epoch).physical_time();
259        let mut pending_non_checkpoint_barriers = vec![];
260        let create_mview_tracker = CreateMviewProgressTracker::recover(
261            [(job_id, (definition.clone(), job_info, Default::default()))],
262            version_stat,
263        );
264        let barrier_info = CreatingStreamingJobStatus::new_fake_barrier(
265            &mut prev_epoch_fake_physical_time,
266            &mut pending_non_checkpoint_barriers,
267            PbBarrierKind::Initial,
268        );
269        Ok((
270            CreatingStreamingJobStatus::ConsumingSnapshot {
271                prev_epoch_fake_physical_time,
272                pending_upstream_barriers: Self::resolve_upstream_log_epochs(
273                    snapshot_backfill_upstream_tables,
274                    upstream_table_log_epochs,
275                    backfill_epoch,
276                    upstream_curr_epoch,
277                )?,
278                version_stats: version_stat.clone(),
279                create_mview_tracker,
280                snapshot_backfill_actors: job_info.snapshot_backfill_actor_ids().collect(),
281                backfill_epoch,
282                pending_non_checkpoint_barriers,
283            },
284            barrier_info,
285        ))
286    }
287
288    fn recover_consuming_log_store(
289        snapshot_backfill_upstream_tables: &HashSet<TableId>,
290        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
291        committed_epoch: u64,
292        upstream_curr_epoch: u64,
293        job_info: &InflightStreamingJobInfo,
294    ) -> MetaResult<(CreatingStreamingJobStatus, BarrierInfo)> {
295        let mut barriers_to_inject = Self::resolve_upstream_log_epochs(
296            snapshot_backfill_upstream_tables,
297            upstream_table_log_epochs,
298            committed_epoch,
299            upstream_curr_epoch,
300        )?;
301        let mut first_barrier = barriers_to_inject.remove(0);
302        assert!(first_barrier.kind.is_checkpoint());
303        first_barrier.kind = BarrierKind::Initial;
304        Ok((
305            CreatingStreamingJobStatus::ConsumingLogStore {
306                log_store_progress_tracker: CreateMviewLogStoreProgressTracker::new(
307                    job_info.snapshot_backfill_actor_ids(),
308                    barriers_to_inject
309                        .last()
310                        .map(|info| info.prev_epoch() - committed_epoch)
311                        .unwrap_or(0),
312                ),
313                barriers_to_inject: Some(barriers_to_inject),
314            },
315            first_barrier,
316        ))
317    }
318
319    #[expect(clippy::too_many_arguments)]
320    pub(crate) fn recover(
321        database_id: DatabaseId,
322        job_id: TableId,
323        definition: String,
324        snapshot_backfill_upstream_tables: HashSet<TableId>,
325        upstream_table_log_epochs: &HashMap<TableId, Vec<(Vec<u64>, u64)>>,
326        backfill_epoch: u64,
327        committed_epoch: u64,
328        upstream_curr_epoch: u64,
329        graph_info: InflightStreamingJobInfo,
330        version_stat: &HummockVersionStats,
331        new_actors: StreamJobActorsToCreate,
332        initial_mutation: Mutation,
333        control_stream_manager: &mut ControlStreamManager,
334    ) -> MetaResult<Self> {
335        debug!(
336            %job_id,
337            definition,
338            "recovered creating job"
339        );
340        let mut barrier_control =
341            CreatingStreamingJobBarrierControl::new(job_id, backfill_epoch, true);
342
343        let (status, first_barrier_info) = if committed_epoch < backfill_epoch {
344            Self::recover_consuming_snapshot(
345                job_id,
346                &definition,
347                &snapshot_backfill_upstream_tables,
348                upstream_table_log_epochs,
349                backfill_epoch,
350                committed_epoch,
351                upstream_curr_epoch,
352                &graph_info,
353                version_stat,
354            )?
355        } else {
356            Self::recover_consuming_log_store(
357                &snapshot_backfill_upstream_tables,
358                upstream_table_log_epochs,
359                committed_epoch,
360                upstream_curr_epoch,
361                &graph_info,
362            )?
363        };
364        control_stream_manager.add_partial_graph(database_id, Some(job_id));
365        Self::inject_barrier(
366            database_id,
367            job_id,
368            control_stream_manager,
369            &mut barrier_control,
370            &graph_info,
371            Some(&graph_info),
372            first_barrier_info,
373            Some(new_actors),
374            Some(initial_mutation),
375        )?;
376        Ok(Self {
377            database_id,
378            job_id,
379            definition,
380            create_type: CreateType::Background,
381            snapshot_backfill_upstream_tables,
382            backfill_epoch,
383            job_info: graph_info,
384            barrier_control,
385            status,
386            upstream_lag: GLOBAL_META_METRICS
387                .snapshot_backfill_lag
388                .with_guarded_label_values(&[&format!("{}", job_id)]),
389        })
390    }
391
392    pub(crate) fn is_empty(&self) -> bool {
393        self.barrier_control.is_empty()
394    }
395
396    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
397        self.barrier_control.is_valid_after_worker_err(worker_id)
398            && (!self.status.is_finishing()
399                || InflightFragmentInfo::contains_worker(self.job_info.fragment_infos(), worker_id))
400    }
401
402    pub(crate) fn gen_ddl_progress(&self) -> DdlProgress {
403        let progress = match &self.status {
404            CreatingStreamingJobStatus::ConsumingSnapshot {
405                create_mview_tracker,
406                ..
407            } => {
408                if create_mview_tracker.has_pending_finished_jobs() {
409                    "Snapshot finished".to_owned()
410                } else {
411                    let progress = create_mview_tracker
412                        .gen_ddl_progress()
413                        .remove(&self.job_id.table_id)
414                        .expect("should exist");
415                    format!("Snapshot [{}]", progress.progress)
416                }
417            }
418            CreatingStreamingJobStatus::ConsumingLogStore {
419                log_store_progress_tracker,
420                ..
421            } => {
422                format!(
423                    "LogStore [{}]",
424                    log_store_progress_tracker.gen_ddl_progress()
425                )
426            }
427            CreatingStreamingJobStatus::Finishing(_) => {
428                format!(
429                    "Finishing [epoch count: {}]",
430                    self.barrier_control.inflight_barrier_count()
431                )
432            }
433        };
434        DdlProgress {
435            id: self.job_id.table_id as u64,
436            statement: self.definition.clone(),
437            create_type: self.create_type.as_str().to_owned(),
438            progress,
439        }
440    }
441
442    pub(super) fn pinned_upstream_log_epoch(&self) -> Option<u64> {
443        if self.status.is_finishing() {
444            None
445        } else {
446            // TODO: when supporting recoverable snapshot backfill, we should use the max epoch that has committed
447            Some(max(
448                self.barrier_control.max_collected_epoch().unwrap_or(0),
449                self.backfill_epoch,
450            ))
451        }
452    }
453
454    fn inject_barrier(
455        database_id: DatabaseId,
456        table_id: TableId,
457        control_stream_manager: &mut ControlStreamManager,
458        barrier_control: &mut CreatingStreamingJobBarrierControl,
459        pre_applied_graph_info: &InflightStreamingJobInfo,
460        applied_graph_info: Option<&InflightStreamingJobInfo>,
461        barrier_info: BarrierInfo,
462        new_actors: Option<StreamJobActorsToCreate>,
463        mutation: Option<Mutation>,
464    ) -> MetaResult<()> {
465        let node_to_collect = control_stream_manager.inject_barrier(
466            database_id,
467            Some(table_id),
468            mutation,
469            &barrier_info,
470            pre_applied_graph_info.fragment_infos(),
471            applied_graph_info
472                .map(|graph_info| graph_info.fragment_infos())
473                .into_iter()
474                .flatten(),
475            new_actors,
476        )?;
477        barrier_control.enqueue_epoch(
478            barrier_info.prev_epoch(),
479            node_to_collect,
480            barrier_info.kind.clone(),
481        );
482        Ok(())
483    }
484
485    pub(super) fn on_new_command(
486        &mut self,
487        control_stream_manager: &mut ControlStreamManager,
488        command: Option<&Command>,
489        barrier_info: &BarrierInfo,
490    ) -> MetaResult<()> {
491        let table_id = self.job_id;
492        let start_consume_upstream =
493            if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
494                jobs_to_merge.contains_key(&table_id)
495            } else {
496                false
497            };
498        if start_consume_upstream {
499            info!(
500                table_id = self.job_id.table_id,
501                prev_epoch = barrier_info.prev_epoch(),
502                "start consuming upstream"
503            );
504        }
505        let progress_epoch =
506            if let Some(max_collected_epoch) = self.barrier_control.max_collected_epoch() {
507                max(max_collected_epoch, self.backfill_epoch)
508            } else {
509                self.backfill_epoch
510            };
511        self.upstream_lag.set(
512            barrier_info
513                .prev_epoch
514                .value()
515                .0
516                .saturating_sub(progress_epoch) as _,
517        );
518        if start_consume_upstream {
519            self.status.start_consume_upstream(barrier_info);
520            Self::inject_barrier(
521                self.database_id,
522                self.job_id,
523                control_stream_manager,
524                &mut self.barrier_control,
525                &self.job_info,
526                None,
527                barrier_info.clone(),
528                None,
529                None,
530            )?;
531        } else {
532            for (barrier_to_inject, mutation) in self.status.on_new_upstream_epoch(barrier_info) {
533                Self::inject_barrier(
534                    self.database_id,
535                    self.job_id,
536                    control_stream_manager,
537                    &mut self.barrier_control,
538                    &self.job_info,
539                    Some(&self.job_info),
540                    barrier_to_inject,
541                    None,
542                    mutation,
543                )?;
544            }
545        }
546        Ok(())
547    }
548
549    pub(crate) fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
550        self.status.update_progress(&resp.create_mview_progress);
551        self.barrier_control.collect(resp);
552        self.should_merge_to_upstream().is_some()
553    }
554
555    pub(super) fn should_merge_to_upstream(&self) -> Option<&InflightStreamingJobInfo> {
556        if let CreatingStreamingJobStatus::ConsumingLogStore {
557            log_store_progress_tracker,
558            barriers_to_inject,
559        } = &self.status
560            && barriers_to_inject.is_none()
561            && log_store_progress_tracker.is_finished()
562        {
563            Some(&self.job_info)
564        } else {
565            None
566        }
567    }
568}
569
/// How an epoch returned by `CreatingStreamingJobControl::start_completing` relates to the
/// lifecycle of the creating job.
pub(super) enum CompleteJobType {
    /// The first epoch committed for the job. This is only reported while the job is not
    /// finishing.
    First,
    /// Any other completed epoch that is neither the first nor the final one.
    Normal,
    /// The job's finishing epoch: the last barrier to complete. After it, the job's
    /// barrier control is empty.
    Finished,
}
577
578impl CreatingStreamingJobControl {
579    pub(super) fn start_completing(
580        &mut self,
581        min_upstream_inflight_epoch: Option<u64>,
582    ) -> Option<(u64, Vec<BarrierCompleteResponse>, CompleteJobType)> {
583        let (finished_at_epoch, epoch_end_bound) = match &self.status {
584            CreatingStreamingJobStatus::Finishing(finish_at_epoch) => {
585                let epoch_end_bound = min_upstream_inflight_epoch
586                    .map(|upstream_epoch| {
587                        if upstream_epoch < *finish_at_epoch {
588                            Excluded(upstream_epoch)
589                        } else {
590                            Unbounded
591                        }
592                    })
593                    .unwrap_or(Unbounded);
594                (Some(*finish_at_epoch), epoch_end_bound)
595            }
596            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
597            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => (
598                None,
599                min_upstream_inflight_epoch
600                    .map(Excluded)
601                    .unwrap_or(Unbounded),
602            ),
603        };
604        self.barrier_control.start_completing(epoch_end_bound).map(
605            |(epoch, resps, is_first_commit)| {
606                let status = if let Some(finish_at_epoch) = finished_at_epoch {
607                    assert!(!is_first_commit);
608                    if epoch == finish_at_epoch {
609                        self.barrier_control.ack_completed(epoch);
610                        assert!(self.barrier_control.is_empty());
611                        CompleteJobType::Finished
612                    } else {
613                        CompleteJobType::Normal
614                    }
615                } else if is_first_commit {
616                    CompleteJobType::First
617                } else {
618                    CompleteJobType::Normal
619                };
620                (epoch, resps, status)
621            },
622        )
623    }
624
625    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
626        self.barrier_control.ack_completed(completed_epoch);
627    }
628
629    pub(super) fn is_finished(&self) -> bool {
630        self.barrier_control.is_empty() && self.status.is_finishing()
631    }
632
633    pub fn is_consuming(&self) -> bool {
634        match &self.status {
635            CreatingStreamingJobStatus::ConsumingSnapshot { .. }
636            | CreatingStreamingJobStatus::ConsumingLogStore { .. } => true,
637            CreatingStreamingJobStatus::Finishing(_) => false,
638        }
639    }
640
641    pub fn state_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
642        self.job_info.existing_table_ids()
643    }
644
645    pub fn graph_info(&self) -> &InflightStreamingJobInfo {
646        &self.job_info
647    }
648}