risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_meta_model::table::RefreshState;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::model::FragmentDownstreamRelation;
use crate::stream::{SourceChange, SplitState};

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
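    /// Commit the collected epoch to Hummock and return the latest version stats.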
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

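    /// Wait for the next scheduled barrier from the barrier scheduler.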
    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

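    /// Abort buffered schedules and block further scheduling while recovery is in progress.
    /// A `None` `database_id` means the whole cluster is recovering, so the manager status is
    /// switched to `Recovering` as well.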
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules; they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

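    /// Unblock scheduling. When the whole cluster (rather than a single database) becomes
    /// ready, switch the manager status back to `Running`.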
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    #[await_tree::instrument("finish_cdc_table_backfill({job})")]
    async fn finish_cdc_table_backfill(&self, job: TableId) -> MetaResult<()> {
        self.env.cdc_table_backfill_tracker.complete_job(job).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

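    /// Handle the "load finished" notification for refreshable batch sources: for each reported
    /// source id, look up the owning table and its database, then schedule a `LoadFinish` barrier
    /// command. Failures are logged per source and do not abort the remaining ids.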
    async fn handle_load_finished_source_ids(
        &self,
        load_finished_source_ids: Vec<u32>,
    ) -> MetaResult<()> {
        tracing::info!(
            "Handling load finished source IDs: {:?}",
            load_finished_source_ids
        );

        for associated_source_id in load_finished_source_ids {
            let res: MetaResult<()> = try {
                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");

                // The reported id is the source id associated with the refreshable table; wrap it
                // in `TableId` for the typed catalog APIs below.
                let associated_source_id = TableId::new(associated_source_id);
                // Look up the table that owns this associated source.
                let table_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_by_associate_source_id(associated_source_id.table_id() as _)
                    .await
                    .context("Failed to get table id for source")?
                    .id
                    .into();

                // Find the database that the source (and thus its table) belongs to.
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(associated_source_id.table_id() as _)
                    .await
                    .context("Failed to get database id for table")?;

                // Create the LoadFinish command.
                let load_finish_command = Command::LoadFinish {
                    table_id,
                    associated_source_id,
                };

                // Schedule the command through the barrier system without waiting for completion.
                self.barrier_scheduler
                    .run_command_no_wait(
                        DatabaseId::new(database_id as u32),
                        load_finish_command,
                    )
                    .context("Failed to schedule LoadFinish command")?;

                tracing::info!(%table_id, %associated_source_id, "LoadFinish command scheduled successfully");
            };
            if let Err(e) = res {
                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle source load finished");
            }
        }

        Ok(())
    }

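    /// Handle the "refresh finished" notification for refreshable tables: set each reported
    /// table's refresh state back to `Idle` in the catalog. Failures are logged per table and do
    /// not abort the remaining ids.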
    async fn handle_refresh_finished_table_ids(
        &self,
        refresh_finished_table_ids: Vec<u32>,
    ) -> MetaResult<()> {
        tracing::info!(
            "Handling refresh finished table IDs: {:?}",
            refresh_finished_table_ids
        );

        for table_id in refresh_finished_table_ids {
            let res: MetaResult<()> = try {
                tracing::info!(%table_id, "Processing refresh finished for materialized view");

                // Update the table's refresh state back to Idle (refresh complete).
                self.metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id as i32, RefreshState::Idle)
                    .await
                    .context("Failed to set table refresh state to Idle")?;

                tracing::info!(%table_id, "Table refresh completed, state updated to Idle");
            };
            if let Err(e) = res {
                tracing::error!(error = %e.as_report(), %table_id, "Failed to handle refresh finished table");
            }
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
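    /// Replace the currently published barrier manager status.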
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Perform the post-collection work for the given command, after its barriers have been
    /// collected and the new storage version has been committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
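            // These commands require no post-collection bookkeeping.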
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(SplitState {
                split_assignment: assignment,
                discovered_source_splits: source_splits,
            }) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(assignment)
                    .await?;

                barrier_manager_context
                    .metadata_manager
                    .update_source_splits(source_splits)
                    .await?;
            }

            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .complete_dropped_tables(
                        unregistered_state_table_ids
                            .iter()
                            .map(|id| id.table_id as _),
                    )
                    .await;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object id here; it can be a source or a
                // sink. The barrier command should carry more info.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
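            // Creating a streaming job: record the snapshot-backfill epochs for the relevant
            // backfill fragments, then persist the new job fragments and notify the source
            // manager about the newly added source and source-backfill fragments.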
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(_) | CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Do `post_collect_job_fragments` of the original streaming job at the end, so
                // that if any previous step fails, the job won't have been marked as `Creating`,
                // and it will later be cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                let new_sink_downstream =
                    if let CreateStreamingJobType::SinkIntoTable(ctx) = job_type {
                        let new_downstreams = ctx.new_sink_downstream.clone();
                        let new_downstreams = FragmentDownstreamRelation::from([(
                            ctx.sink_fragment_id,
                            vec![new_downstreams],
                        )]);
                        Some(new_downstreams)
                    } else {
                        None
                    };

                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        new_sink_downstream,
                    )
                    .await?;

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

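            // Replacing a streaming job: persist the new job fragments (including any
            // auto-refresh schema sinks), apply the split changes in the source manager, then
            // unregister the state tables dropped by the replacement.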
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        None,
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                sink.actor_status.keys().cloned().collect(),
                                &Default::default(), // upstream_fragment_downstreams are already inserted by the upstream table's job
                                &Default::default(), // no split assignment
                                None,                // no new sink downstream
                            )
                            .await?;
                    }
                }

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
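            // `Refresh` and `LoadFinish` drive the refresh state machine of a refreshable table:
            // `Refreshing` while data is being reloaded, `Finishing` once loading is done, and
            // back to `Idle` via `handle_refresh_finished_table_ids` above.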
            Command::Refresh { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Refreshing)
                    .await?;
            }
            Command::LoadFinish { table_id, .. } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .set_table_refresh_state(table_id.table_id() as i32, RefreshState::Finishing)
                    .await?;
            }
        }

        Ok(())
    }
}