risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::stream::SourceChange;

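// `GlobalBarrierWorkerContextImpl` connects the barrier worker to the rest of
// the meta node: epoch commits are delegated to the hummock manager, barrier
// scheduling to the schedule queue, and post-collection bookkeeping to the
// metadata, source, and scale components.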
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
    #[await_tree::instrument]
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

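    /// Wait for the next scheduled barrier (e.g. triggered by a command or a
    /// periodic tick).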
    #[await_tree::instrument("next_scheduled_barrier")]
    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

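    /// Abort buffered schedules and block new ones while recovery runs.
    /// A `database_id` of `None` denotes a global recovery and is reflected in
    /// the barrier manager status; a per-database recovery only blocks that
    /// database's schedules.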
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules, as they might already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovering");
    }

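    /// Unblock scheduling once recovery completes. Only a global mark-ready
    /// moves the barrier manager status back to `Running`; per-database
    /// readiness leaves the global status untouched.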
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    #[await_tree::instrument("post_collect_command({command})")]
    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    #[await_tree::instrument("finish_creating_job({job})")]
    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    #[await_tree::instrument("new_control_stream({})", node.id)]
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }
}

impl GlobalBarrierWorkerContextImpl {
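    /// Atomically publish the new status. The `status` field is presumably an
    /// `ArcSwap` (or a similar lock-free cell), so readers always observe a
    /// consistent `Arc<BarrierManagerStatus>` without locking.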
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Perform command-specific metadata updates and cleanup after barriers are
    /// collected and the new storage version is committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
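            // These commands only influence in-flight barrier behavior and
            // need no post-collect bookkeeping.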
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

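            // Persist the new actor-to-split assignment in metadata first,
            // then have the source manager apply the same change.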
            Command::SourceChangeSplit(split_assignment) => {
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

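            // Unregister the dropped jobs' state tables from hummock so the
            // storage layer can eventually reclaim their data.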
            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                match job_type {
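                    // Creating a sink into a table also replaces the target
                    // table's fragments: record the new fragments in the
                    // catalog, then let the source manager handle the
                    // fragment replacement.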
                    CreateStreamingJobType::SinkIntoTable(
                        replace_plan @ ReplaceStreamJobPlan {
                            old_fragments,
                            new_fragments,
                            upstream_fragment_downstreams,
                            init_split_assignment,
                            ..
                        },
                    ) => {
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                new_fragments.stream_job_id.table_id as _,
                                new_fragments.actor_ids(),
                                upstream_fragment_downstreams,
                                init_split_assignment,
                            )
                            .await?;
                        barrier_manager_context
                            .source_manager
                            .handle_replace_job(
                                old_fragments,
                                new_fragments.stream_source_fragments(),
                                init_split_assignment.clone(),
                                replace_plan,
                            )
                            .await;
                    }
                    CreateStreamingJobType::Normal => {
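                        // For a plain create, only fragments carrying the
                        // cross-database snapshot backfill flag need a backfill
                        // epoch recorded (bit test on the fragment type mask).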
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan
                                                as u32)
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
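                        // Snapshot backfill jobs record epochs for both the
                        // same-database and the cross-database snapshot
                        // backfill scan fragments.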
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32
                                                | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan
                                                    as u32))
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that
                // if any previous step fails we never mark the job as `Creating`; the job will
                // then be cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    ..
                } = info;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments_inner(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        streaming_job.is_materialized_view(),
                    )
                    .await?;

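                // Register the job's source and source-backfill fragments,
                // together with their initial split assignment, with the
                // source manager.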
                let source_change = SourceChange::CreateJob {
                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
                    split_assignment: init_split_assignment.clone(),
                };

                barrier_manager_context
                    .source_manager
                    .apply_source_change(source_change)
                    .await;
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for the new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                    )
                    .await?;

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
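            // The remaining commands need no extra work after collection.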
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
            Command::ConnectorPropsChange(_) => {}
            Command::StartFragmentBackfill { .. } => {}
        }

        Ok(())
    }
}