risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::ActorId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SourceId;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::cdc_progress::CdcTableBackfillTracker;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::manager::LocalNotification;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
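        // A `None` database id means the whole cluster is recovering rather than a single
        // database, so reflect that in the global barrier manager status.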
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark the scheduler as blocked and abort buffered schedules, as they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        let job_id = job.job_id();
        job.finish(&self.metadata_manager, &self.source_manager)
            .await?;
        self.env
            .notification_manager()
            .notify_local_subscribers(LocalNotification::StreamingJobBackfillFinished(job_id));
        Ok(())
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        CdcTableBackfillTracker::mark_complete_job(&self.env.meta_store().conn, job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

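    /// Aggregates per-actor "list finished" reports by `(table_id, associated_source_id)` and
    /// forwards them to the refresh manager. When the refresh manager signals that the list stage
    /// may yield, a `ListFinish` barrier command is scheduled for the table's database.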
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_list_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create ListFinish command
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

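    /// Same as `handle_list_finished_source_ids`, but for the load stage: when the refresh manager
    /// signals that the load stage may yield, a `LoadFinish` barrier command is scheduled for the
    /// table's database.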
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = self
                .refresh_manager
                .mark_load_stage_finished(table_id, &actors)?;

            if !allow_yield {
                continue;
            }

            // Find the database ID for this table
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create LoadFinish command
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

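    /// Marks the refresh of each finished job's materialized table as complete in the refresh
    /// manager.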
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            let table_id = job_id.as_mv_table_id();

            self.refresh_manager.mark_refresh_complete(table_id).await?;
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Perform follow-up work for the given command after its barriers have been collected and
    /// the new storage version has been committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

            Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                ..
            } => {
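                // Drop any in-flight refresh progress for the removed jobs, then release their
                // state tables from Hummock and finish dropping them in the catalog.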
                for job_id in streaming_job_ids {
                    barrier_manager_context
                        .refresh_manager
                        .remove_progress_tracker(job_id.as_mv_table_id(), "drop_streaming_jobs");
                }

                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object id here; it can be a source or a sink. The barrier command should carry more info.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(source_id, props)| (source_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
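                // Record snapshot backfill epochs in the catalog. Normal and sink-into-table jobs
                // only need them for cross-database snapshot backfill fragments, while snapshot
                // backfill jobs also record them for their own snapshot backfill fragments.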
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that if any
                // of the steps above fail, the job is not yet marked as `Creating` and will later be
                // cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
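                // For sink-into-table jobs, collect the new downstream relation from the sink
                // fragment so it can be persisted together with the job fragments below.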
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

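                // Register the newly created source and source backfill fragments with the source
                // manager.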
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment { reschedules, .. } => {
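                // Persist the post-reschedule split assignment of each rescheduled fragment.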
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                &Default::default(), // upstream_fragment_downstreams is already inserted in the job of upstream table
                                None, // no new sink downstream
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::ListFinish { .. } | Command::LoadFinish { .. } | Command::Refresh { .. } => {}
        }

        Ok(())
    }
}