risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_meta_model::table::RefreshState;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::{ActorId, SourceId};
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, SplitState};
use crate::{MetaError, MetaResult};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
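    /// Commit the collected epoch to Hummock and return the latest version statistics.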
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

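    /// Abort buffered barrier schedules and block new ones while recovery is in progress.
    /// A `None` database id means the whole cluster is recovering.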
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules; they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

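    /// Unblock barrier scheduling after recovery. Only a global (cluster-wide) recovery
    /// transitions the barrier manager status back to `Running`.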
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager, &self.source_manager)
            .await
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: JobId) -> MetaResult<()> {
        self.env.cdc_table_backfill_tracker.complete_job(job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

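    /// Reload the full barrier-worker runtime info snapshot covering all databases.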
    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

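    /// Handle reports from actors that have finished the *list* stage of a refreshable table's
    /// refresh. Reports are grouped per `(table, associated source)` and fed into the refresh
    /// progress tracker; once the tracker considers the list stage complete, a `ListFinish`
    /// barrier command is scheduled for the table's database.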
    async fn handle_list_finished_source_ids(
        &self,
        list_finished: Vec<PbListFinishedSource>,
    ) -> MetaResult<()> {
        let mut list_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for list_finished in list_finished {
            let table_id = list_finished.table_id;
            let associated_source_id = list_finished.associated_source_id;
            list_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(list_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in list_finished_info {
            let allow_yield = {
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let single_task_tracker =
                    lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
                        MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
                    })?;
                single_task_tracker.report_list_finished(actors.iter().copied());
                let allow_yield = single_task_tracker.is_list_finished()?;

                Ok::<_, MetaError>(allow_yield)
            }?;

            if !allow_yield {
                continue;
            }

            // Find the database that the table's associated source belongs to.
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create the ListFinish command.
            let list_finish_command = Command::ListFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting.
            self.barrier_scheduler
                .run_command_no_wait(database_id, list_finish_command)
                .context("Failed to schedule ListFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "ListFinish command scheduled successfully"
            );
        }
        Ok(())
    }

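    /// Handle reports from actors that have finished the *load* stage of a refreshable table's
    /// refresh. Mirrors `handle_list_finished_source_ids`: progress is tracked per
    /// `(table, associated source)`, and a `LoadFinish` barrier command is scheduled once the
    /// tracker considers the load stage complete.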
    async fn handle_load_finished_source_ids(
        &self,
        load_finished: Vec<PbLoadFinishedSource>,
    ) -> MetaResult<()> {
        let mut load_finished_info: HashMap<(TableId, SourceId), HashSet<ActorId>> = HashMap::new();

        for load_finished in load_finished {
            let table_id = load_finished.table_id;
            let associated_source_id = load_finished.associated_source_id;
            load_finished_info
                .entry((table_id, associated_source_id))
                .or_default()
                .insert(load_finished.reporter_actor_id);
        }

        for ((table_id, associated_source_id), actors) in load_finished_info {
            let allow_yield = {
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let single_task_tracker =
                    lock_handle.inner.get_mut(&table_id).ok_or_else(|| {
                        MetaError::from(anyhow!("Table tracker not found for table {}", table_id))
                    })?;
                single_task_tracker.report_load_finished(actors.iter().copied());
                let allow_yield = single_task_tracker.is_load_finished()?;

                Ok::<_, MetaError>(allow_yield)
            }?;

            if !allow_yield {
                continue;
            }

            // Find the database that the table's associated source belongs to.
            let database_id = self
                .metadata_manager
                .catalog_controller
                .get_object_database_id(associated_source_id)
                .await
                .context("Failed to get database id for table")?;

            // Create the LoadFinish command.
            let load_finish_command = Command::LoadFinish {
                table_id,
                associated_source_id,
            };

            // Schedule the command through the barrier system without waiting.
            self.barrier_scheduler
                .run_command_no_wait(database_id, load_finish_command)
                .context("Failed to schedule LoadFinish command")?;

            tracing::info!(
                %table_id,
                %associated_source_id,
                "LoadFinish command scheduled successfully"
            );
        }

        Ok(())
    }

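    /// Finalize refresh for the given jobs: drop their entries from the refresh progress tracker
    /// and set the tables' refresh state back to `Idle`.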
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_job_ids: Vec<JobId>,
    ) -> MetaResult<()> {
        for job_id in refresh_finished_table_job_ids {
            {
                let table_id = &job_id.as_mv_table_id();
                let mut lock_handle = REFRESH_TABLE_PROGRESS_TRACKER.lock();
                let remove_res = lock_handle.inner.remove(table_id);
                debug_assert!(remove_res.is_some());

                // Also remove the table_id from `table_id_by_database_id`, whichever database it
                // is registered under.
                lock_handle
                    .table_id_by_database_id
                    .values_mut()
                    .for_each(|table_ids| {
                        table_ids.remove(table_id);
                    });
            }

            // Update the table's refresh state back to Idle (refresh complete).
            self.metadata_manager
                .catalog_controller
                .set_table_refresh_state(job_id.as_mv_table_id(), RefreshState::Idle)
                .await
                .context("Failed to set table refresh state to Idle")?;

            tracing::info!(%job_id, "Table refresh completed, state updated to Idle");
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
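    /// Update the globally visible barrier manager status.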
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Perform the post-collection work for the given command, after its barriers have been
    /// collected and the new storage version has been committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

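            // Persist the new split assignment produced by the source split change.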
            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                ..
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(assignment)
                    .await?;
            }

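            // Unregister dropped state tables from Hummock and finalize their removal from the
            // catalog.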
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(unregistered_state_table_ids.iter().copied())
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object id here; it can be a source or a
                // sink. The barrier command should carry more info.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props
                            .iter()
                            .map(|(source_id, props)| (source_id.as_source_id(), props.clone()))
                            .collect(),
                    })
                    .await;
            }
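            // For a newly created streaming job: record snapshot backfill epochs, persist the
            // fragment relations and split assignment, and register the new source fragments.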
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that if
                // any previous step fails, the job is never marked as `Creating` and will later be
                // cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id(),
                        upstream_fragment_downstreams,
                        new_sink_downstream,
                        Some(&info.init_split_assignment),
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
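            // Persist the split assignment resulting from the reschedule.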
            Command::RescheduleFragment { reschedules, .. } => {
                let fragment_splits = reschedules
                    .iter()
                    .map(|(fragment_id, reschedule)| {
                        (*fragment_id, reschedule.actor_splits.clone())
                    })
                    .collect();

                barrier_manager_context
                    .metadata_manager
                    .update_fragment_splits(&fragment_splits)
                    .await?;
            }

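            // For a replaced streaming job: persist the new fragments, apply the split changes in
            // the source manager, and unregister the state tables dropped by the replacement.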
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    init_split_assignment,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id,
                        upstream_fragment_downstreams,
                        None,
                        Some(init_split_assignment),
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id.as_job_id(),
                                // upstream_fragment_downstreams was already inserted with the
                                // upstream table's job
                                &Default::default(),
                                None, // no replace plan
                                None, // no init split assignment
                            )
                            .await?;
                    }
                }

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
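            // Refresh lifecycle: `Refresh` moves the table to `Refreshing`, `ListFinish` keeps it
            // there, and `LoadFinish` moves it to `Finishing`. The final transition back to `Idle`
            // happens in `handle_refresh_finished_table_ids` above.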
            Command::Refresh { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(*table_id, RefreshState::Refreshing)
                    .await?;
            }
            Command::ListFinish { .. } => {
                // List stage completed; the table remains in the Refreshing state.
                // The next stage is load completion.
            }
            Command::LoadFinish { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(*table_id, RefreshState::Finishing)
                    .await?;
            }
        }

        Ok(())
    }
}