risingwave_meta/barrier/context/context_impl.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::catalog::DatabaseId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::command::CommandContext;
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::MarkReadyOptions;
use crate::barrier::{
    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
    CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
    Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::stream::SourceChange;

impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
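    /// Commits the collected epoch to Hummock and returns the latest version
    /// stats. Note that the commit and the stats read are two separate calls,
    /// so the returned stats may also reflect commits that landed in between.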
    async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
        self.hummock_manager.commit_epoch(commit_info).await?;
        Ok(self.hummock_manager.get_version_stats().await)
    }

    async fn next_scheduled(&self) -> Scheduled {
        self.scheduled_barriers.next_scheduled().await
    }

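    /// Aborts buffered schedules and blocks new ones while recovery is in
    /// progress. A `database_id` of `None` means a global recovery, in which
    /// case the barrier manager status is switched to `Recovering` as well.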
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    ) {
        if database_id.is_none() {
            self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
        }

        // Mark blocked and abort buffered schedules, as they may already be dirty.
        self.scheduled_barriers
            .abort_and_mark_blocked(database_id, "cluster is under recovery");
    }

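    /// Unblocks scheduling after recovery. The `Running` status is restored
    /// only for a global `mark_ready`; a per-database recovery does not change
    /// the global barrier manager status.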
    fn mark_ready(&self, options: MarkReadyOptions) {
        let is_global = matches!(&options, MarkReadyOptions::Global { .. });
        self.scheduled_barriers.mark_ready(options);
        if is_global {
            self.set_status(BarrierManagerStatus::Running);
        }
    }

    async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
        command.post_collect(self).await
    }

    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
        self.metadata_manager
            .notify_finish_failed(database_id, err)
            .await
    }

    async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
        job.finish(&self.metadata_manager).await
    }

    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        self.new_control_stream_impl(node, init_request).await
    }

    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.reload_runtime_info_impl().await
    }

    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.reload_database_runtime_info_impl(database_id).await
    }
}

impl GlobalBarrierWorkerContextImpl {
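    /// Publishes a new barrier manager status. Judging from the
    /// `store(Arc::new(..))` call, `status` is an `ArcSwap`-style cell, so
    /// readers observe status updates without taking a lock.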
    fn set_status(&self, new_status: BarrierManagerStatus) {
        self.status.store(Arc::new(new_status));
    }
}

impl CommandContext {
    /// Performs post-collection work for the given command, after its barriers
    /// have been collected and the new storage version has been committed.
    pub async fn post_collect(
        &self,
        barrier_manager_context: &GlobalBarrierWorkerContextImpl,
    ) -> MetaResult<()> {
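        // Barriers may be injected without an attached command (e.g. plain
        // periodic checkpoint barriers); those need no post-collection handling.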
        let Some(command) = &self.command else {
            return Ok(());
        };
        match command {
            Command::Flush => {}

            Command::Throttle(_) => {}

            Command::Pause => {}

            Command::Resume => {}

            Command::SourceChangeSplit(split_assignment) => {
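                // Persist the new actor-to-split assignment in the catalog
                // first, then let the source manager apply the change to its
                // in-memory state.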
                barrier_manager_context
                    .metadata_manager
                    .update_actor_splits_by_split_assignment(split_assignment)
                    .await?;
                barrier_manager_context
                    .source_manager
                    .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
                    .await;
            }

            Command::DropStreamingJobs {
                unregistered_state_table_ids,
                ..
            } => {
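                // Unregister the state tables of the dropped jobs from Hummock
                // so that their data can be reclaimed.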
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                    .await?;
            }
            Command::CreateStreamingJob {
                info,
                job_type,
                cross_db_snapshot_backfill_info,
            } => {
                let mut is_sink_into_table = false;
                match job_type {
                    CreateStreamingJobType::SinkIntoTable(
                        replace_plan @ ReplaceStreamJobPlan {
                            old_fragments,
                            new_fragments,
                            upstream_fragment_downstreams,
                            init_split_assignment,
                            ..
                        },
                    ) => {
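                        // Creating a sink into an existing table also replaces
                        // the table's fragments, so the replacement is
                        // finalized in both the catalog and the source manager.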
                        is_sink_into_table = true;
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .post_collect_job_fragments(
                                new_fragments.stream_job_id.table_id as _,
                                new_fragments.actor_ids(),
                                upstream_fragment_downstreams,
                                init_split_assignment,
                            )
                            .await?;
                        barrier_manager_context
                            .source_manager
                            .handle_replace_job(
                                old_fragments,
                                new_fragments.stream_source_fragments(),
                                init_split_assignment.clone(),
                                replace_plan,
                            )
                            .await;
                    }
                    CreateStreamingJobType::Normal => {
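                        // For a normal job, only cross-database snapshot
                        // backfill fragments need their backfill epoch filled
                        // in, so no snapshot backfill info is passed.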
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                None,
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
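                        // For a snapshot backfill job, both same-database and
                        // cross-database snapshot backfill fragments get their
                        // backfill epoch filled in.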
                        barrier_manager_context
                            .metadata_manager
                            .catalog_controller
                            .fill_snapshot_backfill_epoch(
                                info.stream_job_fragments.fragments.iter().filter_map(
                                    |(fragment_id, fragment)| {
                                        if (fragment.fragment_type_mask
                                            & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32
                                                | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
                                            != 0
                                        {
                                            Some(*fragment_id as _)
                                        } else {
                                            None
                                        }
                                    },
                                ),
                                Some(snapshot_backfill_info),
                                cross_db_snapshot_backfill_info,
                            )
                            .await?
                    }
                }

                // Run `post_collect_job_fragments` for the original streaming job last, so that if
                // any previous step fails, the job is never marked as `Creating`, and it will later
                // be cleaned up by the recovery triggered by the returned error.
                let CreateStreamingJobCommandInfo {
                    stream_job_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    streaming_job,
                    ..
                } = info;
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments_inner(
                        stream_job_fragments.stream_job_id().table_id as _,
                        stream_job_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                        streaming_job.is_materialized_view(),
                    )
                    .await?;

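                // A sink-into-table job has already registered its source
                // fragments through the replace path above, so skip it here.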
                if !is_sink_into_table {
                    barrier_manager_context
                        .source_manager
                        .apply_source_change(SourceChange::CreateJob {
                            added_source_fragments: stream_job_fragments.stream_source_fragments(),
                            added_backfill_fragments: stream_job_fragments
                                .source_backfill_fragments()?,
                            split_assignment: init_split_assignment.clone(),
                        })
                        .await;
                }
            }
            Command::RescheduleFragment {
                reschedules,
                post_updates,
                ..
            } => {
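                // Finalize the applied reschedule (actor and dispatcher
                // updates) in the catalog through the scale controller.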
                barrier_manager_context
                    .scale_controller
                    .post_apply_reschedule(reschedules, post_updates)
                    .await?;
            }

            Command::ReplaceStreamJob(
                replace_plan @ ReplaceStreamJobPlan {
                    old_fragments,
                    new_fragments,
                    upstream_fragment_downstreams,
                    init_split_assignment,
                    to_drop_state_table_ids,
                    ..
                },
            ) => {
                // Update actors and actor_dispatchers for the new table fragments.
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .post_collect_job_fragments(
                        new_fragments.stream_job_id.table_id as _,
                        new_fragments.actor_ids(),
                        upstream_fragment_downstreams,
                        init_split_assignment,
                    )
                    .await?;

                // Apply the split changes in the source manager.
                barrier_manager_context
                    .source_manager
                    .handle_replace_job(
                        old_fragments,
                        new_fragments.stream_source_fragments(),
                        init_split_assignment.clone(),
                        replace_plan,
                    )
                    .await;
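                // Unregister the state tables dropped by the replacement so
                // that Hummock can reclaim their data.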
                barrier_manager_context
                    .hummock_manager
                    .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
                    .await?;
            }

            Command::CreateSubscription {
                subscription_id, ..
            } => {
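                // The barrier has been collected, so the subscription catalog
                // can now be marked as created.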
                barrier_manager_context
                    .metadata_manager
                    .catalog_controller
                    .finish_create_subscription_catalog(*subscription_id)
                    .await?
            }
            Command::DropSubscription { .. } => {}
            Command::MergeSnapshotBackfillStreamingJobs(_) => {}
        }

        Ok(())
    }
}