risingwave_meta/barrier/context/
context_impl.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::DatabaseId;
18use risingwave_pb::common::WorkerNode;
19use risingwave_pb::hummock::HummockVersionStats;
20use risingwave_pb::stream_plan::PbFragmentTypeFlag;
21use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
22use risingwave_rpc_client::StreamingControlHandle;
23
24use crate::MetaResult;
25use crate::barrier::command::CommandContext;
26use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
27use crate::barrier::progress::TrackingJob;
28use crate::barrier::schedule::MarkReadyOptions;
29use crate::barrier::{
30    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
31    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
32    Scheduled,
33};
34use crate::hummock::CommitEpochInfo;
35use crate::stream::SourceChange;
36
37impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
38    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
39        self.hummock_manager.commit_epoch(commit_info).await?;
40        Ok(self.hummock_manager.get_version_stats().await)
41    }
42
43    async fn next_scheduled(&self) -> Scheduled {
44        self.scheduled_barriers.next_scheduled().await
45    }
46
47    fn abort_and_mark_blocked(
48        &self,
49        database_id: Option<DatabaseId>,
50        recovery_reason: RecoveryReason,
51    ) {
52        if database_id.is_none() {
53            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
54        }
55
56        // Mark blocked and abort buffered schedules, they might be dirty already.
57        self.scheduled_barriers
58            .abort_and_mark_blocked(database_id, "cluster is under recovering");
59    }
60
61    fn mark_ready(&self, options: MarkReadyOptions) {
62        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
63        self.scheduled_barriers.mark_ready(options);
64        if is_global {
65            self.set_status(BarrierManagerStatus::Running);
66        }
67    }
68
69    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
70        command.post_collect(self).await
71    }
72
73    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
74        self.metadata_manager
75            .notify_finish_failed(database_id, err)
76            .await
77    }
78
79    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
80        job.finish(&self.metadata_manager).await
81    }
82
83    async fn new_control_stream(
84        &self,
85        node: &WorkerNode,
86        init_request: &PbInitRequest,
87    ) -> MetaResult<StreamingControlHandle> {
88        self.new_control_stream_impl(node, init_request).await
89    }
90
91    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
92        self.reload_runtime_info_impl().await
93    }
94
95    async fn reload_database_runtime_info(
96        &self,
97        database_id: DatabaseId,
98    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
99        self.reload_database_runtime_info_impl(database_id).await
100    }
101}
102
103impl GlobalBarrierWorkerContextImpl {
104    fn set_status(&self, new_status: BarrierManagerStatus) {
105        self.status.store(Arc::new(new_status));
106    }
107}
108
109impl CommandContext {
110    /// Do some stuffs after barriers are collected and the new storage version is committed, for
111    /// the given command.
112    pub async fn post_collect(
113        &self,
114        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
115    ) -> MetaResult<()> {
116        let Some(command) = &self.command else {
117            return Ok(());
118        };
119        match command {
120            Command::Flush => {}
121
122            Command::Throttle(_) => {}
123
124            Command::Pause => {}
125
126            Command::Resume => {}
127
128            Command::SourceChangeSplit(split_assignment) => {
129                barrier_manager_context
130                    .metadata_manager
131                    .update_actor_splits_by_split_assignment(split_assignment)
132                    .await?;
133                barrier_manager_context
134                    .source_manager
135                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
136                    .await;
137            }
138
139            Command::DropStreamingJobs {
140                unregistered_state_table_ids,
141                ..
142            } => {
143                barrier_manager_context
144                    .hummock_manager
145                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
146                    .await?;
147            }
148            Command::CreateStreamingJob {
149                info,
150                job_type,
151                cross_db_snapshot_backfill_info,
152            } => {
153                match job_type {
154                    CreateStreamingJobType::SinkIntoTable(
155                        replace_plan @ ReplaceStreamJobPlan {
156                            old_fragments,
157                            new_fragments,
158                            upstream_fragment_downstreams,
159                            init_split_assignment,
160                            ..
161                        },
162                    ) => {
163                        barrier_manager_context
164                            .metadata_manager
165                            .catalog_controller
166                            .post_collect_job_fragments(
167                                new_fragments.stream_job_id.table_id as _,
168                                new_fragments.actor_ids(),
169                                upstream_fragment_downstreams,
170                                init_split_assignment,
171                            )
172                            .await?;
173                        barrier_manager_context
174                            .source_manager
175                            .handle_replace_job(
176                                old_fragments,
177                                new_fragments.stream_source_fragments(),
178                                init_split_assignment.clone(),
179                                replace_plan,
180                            )
181                            .await;
182                    }
183                    CreateStreamingJobType::Normal => {
184                        barrier_manager_context
185                            .metadata_manager
186                            .catalog_controller
187                            .fill_snapshot_backfill_epoch(
188                                info.stream_job_fragments.fragments.iter().filter_map(
189                                    |(fragment_id, fragment)| {
190                                        if (fragment.fragment_type_mask
191                                            & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
192                                            != 0
193                                        {
194                                            Some(*fragment_id as _)
195                                        } else {
196                                            None
197                                        }
198                                    },
199                                ),
200                                None,
201                                cross_db_snapshot_backfill_info,
202                            )
203                            .await?
204                    }
205                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
206                        barrier_manager_context
207                            .metadata_manager
208                            .catalog_controller
209                            .fill_snapshot_backfill_epoch(
210                                info.stream_job_fragments.fragments.iter().filter_map(
211                                    |(fragment_id, fragment)| {
212                                        if (fragment.fragment_type_mask
213                                            & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32 | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
214                                            != 0
215                                        {
216                                            Some(*fragment_id as _)
217                                        } else {
218                                            None
219                                        }
220                                    },
221                                ),
222                                Some(snapshot_backfill_info),
223                                cross_db_snapshot_backfill_info,
224                            )
225                            .await?
226                    }
227                }
228
229                // Do `post_collect_job_fragments` of the original streaming job in the end, so that in any previous failure,
230                // we won't mark the job as `Creating`, and then the job will be later clean by the recovery triggered by the returned error.
231                let CreateStreamingJobCommandInfo {
232                    stream_job_fragments,
233                    upstream_fragment_downstreams,
234                    init_split_assignment,
235                    streaming_job,
236                    ..
237                } = info;
238                barrier_manager_context
239                    .metadata_manager
240                    .catalog_controller
241                    .post_collect_job_fragments_inner(
242                        stream_job_fragments.stream_job_id().table_id as _,
243                        stream_job_fragments.actor_ids(),
244                        upstream_fragment_downstreams,
245                        init_split_assignment,
246                        streaming_job.is_materialized_view(),
247                    )
248                    .await?;
249
250                let source_change = SourceChange::CreateJob {
251                    added_source_fragments: stream_job_fragments.stream_source_fragments(),
252                    added_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
253                    split_assignment: init_split_assignment.clone(),
254                };
255
256                barrier_manager_context
257                    .source_manager
258                    .apply_source_change(source_change)
259                    .await;
260            }
261            Command::RescheduleFragment {
262                reschedules,
263                post_updates,
264                ..
265            } => {
266                barrier_manager_context
267                    .scale_controller
268                    .post_apply_reschedule(reschedules, post_updates)
269                    .await?;
270            }
271
272            Command::ReplaceStreamJob(
273                replace_plan @ ReplaceStreamJobPlan {
274                    old_fragments,
275                    new_fragments,
276                    upstream_fragment_downstreams,
277                    init_split_assignment,
278                    to_drop_state_table_ids,
279                    ..
280                },
281            ) => {
282                // Update actors and actor_dispatchers for new table fragments.
283                barrier_manager_context
284                    .metadata_manager
285                    .catalog_controller
286                    .post_collect_job_fragments(
287                        new_fragments.stream_job_id.table_id as _,
288                        new_fragments.actor_ids(),
289                        upstream_fragment_downstreams,
290                        init_split_assignment,
291                    )
292                    .await?;
293
294                // Apply the split changes in source manager.
295                barrier_manager_context
296                    .source_manager
297                    .handle_replace_job(
298                        old_fragments,
299                        new_fragments.stream_source_fragments(),
300                        init_split_assignment.clone(),
301                        replace_plan,
302                    )
303                    .await;
304                barrier_manager_context
305                    .hummock_manager
306                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
307                    .await?;
308            }
309
310            Command::CreateSubscription {
311                subscription_id, ..
312            } => {
313                barrier_manager_context
314                    .metadata_manager
315                    .catalog_controller
316                    .finish_create_subscription_catalog(*subscription_id)
317                    .await?
318            }
319            Command::DropSubscription { .. } => {}
320            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
321            Command::ConnectorPropsChange(_) => {}
322            Command::StartFragmentBackfill { .. } => {}
323        }
324
325        Ok(())
326    }
327}