risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use anyhow::Context;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::stream::SourceChange;

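// The production barrier worker context, backed by the meta node's hummock manager,
// metadata manager, source manager, and barrier scheduler.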
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark the schedule queue as blocked and abort any buffered schedules, since
        // they may already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is recovering");
    }

    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }

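    // For each refreshable batch source that has finished loading, schedule a
    // `LoadFinish` barrier command. A failure for one source is logged and does not
    // abort handling of the remaining ones.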
    async fn handle_load_finished_source_ids(
        &self,
        load_finished_source_ids: Vec<u32>,
    ) -> MetaResult<()> {
        tracing::info!(
            "Handling load finished source IDs: {:?}",
            load_finished_source_ids
        );

        for associated_source_id in load_finished_source_ids {
            let res: MetaResult<()> = try {
                tracing::info!(%associated_source_id, "Scheduling LoadFinish command for refreshable batch source");

                // For refreshable batch sources, the reported id is the id of the
                // associated table.
                let table_id = TableId::new(associated_source_id);
                let associated_source_id = table_id;

                // Find the database that this table belongs to.
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(table_id.table_id() as _)
                    .await
                    .context("Failed to get database id for table")?;

                // Create the LoadFinish command.
                let load_finish_command = Command::LoadFinish {
                    table_id,
                    associated_source_id,
                };

                // Schedule the command through the barrier system without waiting for
                // it to complete.
                self.barrier_scheduler
                    .run_command_no_wait(
                        DatabaseId::new(database_id as u32),
                        load_finish_command,
                    )
                    .context("Failed to schedule LoadFinish command")?;

                tracing::info!(%table_id, %associated_source_id, "LoadFinish command scheduled successfully");
            };
            if let Err(e) = res {
                tracing::error!(error = %e.as_report(), %associated_source_id, "Failed to handle load-finished source");
            }
        }

        Ok(())
    }
}

impl GlobalBarrierWorkerContextImpl {
    /// Replaces the currently stored barrier manager status.
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Performs the post-collection work for the given command, after its barriers
    /// have been collected and the new storage version has been committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

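            // Persist the updated actor splits, then let the source manager apply the
            // split change.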
            Command::SourceChangeSplit(split_assignment) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

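            // Unregister the state tables of the dropped jobs from hummock.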
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
            }
            Command::ConnectorPropsChange(obj_id_map_props) => {
                // TODO: we don't know the type of the object id here; it can be a source
                // or a sink. The barrier command should carry more information.
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::UpdateSourceProps {
                        source_id_map_new_props: obj_id_map_props.clone(),
                    })
                    .await;
            }
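            // A newly created streaming job: record snapshot backfill epochs, persist
            // the new fragments, and register them with the source manager.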
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                let mut replace_plan = None;
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(plan) => {
                        replace_plan = Some(plan);
                    }
                    CreateStreamingJobType::Normal => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains(
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if fragment.fragment_type_mask.contains_any([
                                            FragmentTypeFlag::SnapshotBackfillStreamScan,
                                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan,
                                        ]) {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so
                // that if anything above fails, the job is never marked as `Creating` and
                // will later be cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    ..
                } = info;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        replace_plan,
                    )
                    .await?;

                if let Some(plan) = replace_plan {
                    barrier_manager_context
                        .source_manager
                        .handle_replace_job(
                            &plan.old_fragments,
                            plan.new_fragments.stream_source_fragments(),
                            init_split_assignment.clone(),
                            plan,
                        )
                        .await;
                }

                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments(),
                    split_assignment: init_split_assignment.clone(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

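            // Replacing a streaming job: persist the new fragments (including any
            // auto-refresh-schema sinks), move source splits over to the new fragments,
            // and unregister the state tables dropped by the replacement.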
            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    auto_refresh_schema_sinks,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        None,
                    )
                    .await?;

                if let Some(sinks) = auto_refresh_schema_sinks {
                    for sink in sinks {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                sink.tmp_sink_id,
                                sink.actor_status.keys().cloned().collect(),
                                // `upstream_fragment_downstreams` was already inserted
                                // along with the upstream table's job.
                                &Default::default(),
                                // No split assignment.
                                &Default::default(),
                                // No replace plan.
                                None,
                            )
                            .await?;
                    }
                }

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::StartFragmentBackfill { .. } => {}
            Command::Refresh { .. } => {}
            Command::LoadFinish { .. } => {}
        }

        Ok(())
    }
}