risingwave_meta/barrier/context/
context_impl.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::DatabaseId;
18use risingwave_pb::common::WorkerNode;
19use risingwave_pb::hummock::HummockVersionStats;
20use risingwave_pb::stream_plan::PbFragmentTypeFlag;
21use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
22use risingwave_rpc_client::StreamingControlHandle;
23
24use crate::MetaResult;
25use crate::barrier::command::CommandContext;
26use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
27use crate::barrier::progress::TrackingJob;
28use crate::barrier::schedule::MarkReadyOptions;
29use crate::barrier::{
30 BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
31 CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
32 Scheduled,
33};
34use crate::hummock::CommitEpochInfo;
35use crate::stream::SourceChange;
36
37impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
38 #[await_tree::instrument]
39 async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
40 self.hummock_manager.commit_epoch(commit_info).await?;
41 Ok(self.hummock_manager.get_version_stats().await)
42 }
43
44 #[await_tree::instrument("next_scheduled_barrier")]
45 async fn next_scheduled(&self) -> Scheduled {
46 self.scheduled_barriers.next_scheduled().await
47 }
48
49 fn abort_and_mark_blocked(
50 &self,
51 database_id: Option<DatabaseId>,
52 recovery_reason: RecoveryReason,
53 ) {
54 if database_id.is_none() {
55 self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
56 }
57
58 self.scheduled_barriers
60 .abort_and_mark_blocked(database_id, "cluster is under recovering");
61 }
62
63 fn mark_ready(&self, options: MarkReadyOptions) {
64 let is_global = matches!(&options, MarkReadyOptions::Global { .. });
65 self.scheduled_barriers.mark_ready(options);
66 if is_global {
67 self.set_status(BarrierManagerStatus::Running);
68 }
69 }
70
71 #[await_tree::instrument("post_collect_command({command})")]
72 async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
73 command.post_collect(self).await
74 }
75
76 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
77 self.metadata_manager
78 .notify_finish_failed(database_id, err)
79 .await
80 }
81
82 #[await_tree::instrument("finish_creating_job({job})")]
83 async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
84 job.finish(&self.metadata_manager).await
85 }
86
87 #[await_tree::instrument("new_control_stream({})", node.id)]
88 async fn new_control_stream(
89 &self,
90 node: &WorkerNode,
91 init_request: &PbInitRequest,
92 ) -> MetaResult<StreamingControlHandle> {
93 self.new_control_stream_impl(node, init_request).await
94 }
95
96 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
97 self.reload_runtime_info_impl().await
98 }
99
100 async fn reload_database_runtime_info(
101 &self,
102 database_id: DatabaseId,
103 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
104 self.reload_database_runtime_info_impl(database_id).await
105 }
106}
107
108impl GlobalBarrierWorkerContextImpl {
109 fn set_status(&self, new_status: BarrierManagerStatus) {
110 self.status.store(Arc::new(new_status));
111 }
112}
113
114impl CommandContext {
115 pub async fn post_collect(
118 &self,
119 barrier_manager_context: &GlobalBarrierWorkerContextImpl,
120 ) -> MetaResult<()> {
121 let Some(command) = &self.command else {
122 return Ok(());
123 };
124 match command {
125 Command::Flush => {}
126
127 Command::Throttle(_) => {}
128
129 Command::Pause => {}
130
131 Command::Resume => {}
132
133 Command::SourceChangeSplit(split_assignment) => {
134 barrier_manager_context
135 .metadata_manager
136 .update_actor_splits_by_split_assignment(split_assignment)
137 .await?;
138 barrier_manager_context
139 .source_manager
140 .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
141 .await;
142 }
143
144 Command::DropStreamingJobs {
145 unregistered_state_table_ids,
146 ..
147 } => {
148 barrier_manager_context
149 .hummock_manager
150 .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
151 .await?;
152 }
153 Command::CreateStreamingJob {
154 info,
155 job_type,
156 cross_db_snapshot_backfill_info,
157 } => {
158 match job_type {
159 CreateStreamingJobType::SinkIntoTable(
160 replace_plan @ ReplaceStreamJobPlan {
161 old_fragments,
162 new_fragments,
163 upstream_fragment_downstreams,
164 init_split_assignment,
165 ..
166 },
167 ) => {
168 barrier_manager_context
169 .metadata_manager
170 .catalog_controller
171 .post_collect_job_fragments(
172 new_fragments.stream_job_id.table_id as _,
173 new_fragments.actor_ids(),
174 upstream_fragment_downstreams,
175 init_split_assignment,
176 )
177 .await?;
178 barrier_manager_context
179 .source_manager
180 .handle_replace_job(
181 old_fragments,
182 new_fragments.stream_source_fragments(),
183 init_split_assignment.clone(),
184 replace_plan,
185 )
186 .await;
187 }
188 CreateStreamingJobType::Normal => {
189 barrier_manager_context
190 .metadata_manager
191 .catalog_controller
192 .fill_snapshot_backfill_epoch(
193 info.stream_job_fragments.fragments.iter().filter_map(
194 |(fragment_id, fragment)| {
195 if (fragment.fragment_type_mask
196 & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
197 != 0
198 {
199 Some(*fragment_id as _)
200 } else {
201 None
202 }
203 },
204 ),
205 None,
206 cross_db_snapshot_backfill_info,
207 )
208 .await?
209 }
210 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
211 barrier_manager_context
212 .metadata_manager
213 .catalog_controller
214 .fill_snapshot_backfill_epoch(
215 info.stream_job_fragments.fragments.iter().filter_map(
216 |(fragment_id, fragment)| {
217 if (fragment.fragment_type_mask
218 & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32 | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
219 != 0
220 {
221 Some(*fragment_id as _)
222 } else {
223 None
224 }
225 },
226 ),
227 Some(snapshot_backfill_info),
228 cross_db_snapshot_backfill_info,
229 )
230 .await?
231 }
232 }
233
234 let CreateStreamingJobCommandInfo {
237 stream_job_fragments,
238 upstream_fragment_downstreams,
239 init_split_assignment,
240 streaming_job,
241 ..
242 } = info;
243 barrier_manager_context
244 .metadata_manager
245 .catalog_controller
246 .post_collect_job_fragments_inner(
247 stream_job_fragments.stream_job_id().table_id as _,
248 stream_job_fragments.actor_ids(),
249 upstream_fragment_downstreams,
250 init_split_assignment,
251 streaming_job.is_materialized_view(),
252 )
253 .await?;
254
255 let source_change = SourceChange::CreateJob {
256 added_source_fragments: stream_job_fragments.stream_source_fragments(),
257 added_backfill_fragments: stream_job_fragments.source_backfill_fragments()?,
258 split_assignment: init_split_assignment.clone(),
259 };
260
261 barrier_manager_context
262 .source_manager
263 .apply_source_change(source_change)
264 .await;
265 }
266 Command::RescheduleFragment {
267 reschedules,
268 post_updates,
269 ..
270 } => {
271 barrier_manager_context
272 .scale_controller
273 .post_apply_reschedule(reschedules, post_updates)
274 .await?;
275 }
276
277 Command::ReplaceStreamJob(
278 replace_plan @ ReplaceStreamJobPlan {
279 old_fragments,
280 new_fragments,
281 upstream_fragment_downstreams,
282 init_split_assignment,
283 to_drop_state_table_ids,
284 ..
285 },
286 ) => {
287 barrier_manager_context
289 .metadata_manager
290 .catalog_controller
291 .post_collect_job_fragments(
292 new_fragments.stream_job_id.table_id as _,
293 new_fragments.actor_ids(),
294 upstream_fragment_downstreams,
295 init_split_assignment,
296 )
297 .await?;
298
299 barrier_manager_context
301 .source_manager
302 .handle_replace_job(
303 old_fragments,
304 new_fragments.stream_source_fragments(),
305 init_split_assignment.clone(),
306 replace_plan,
307 )
308 .await;
309 barrier_manager_context
310 .hummock_manager
311 .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
312 .await?;
313 }
314
315 Command::CreateSubscription {
316 subscription_id, ..
317 } => {
318 barrier_manager_context
319 .metadata_manager
320 .catalog_controller
321 .finish_create_subscription_catalog(*subscription_id)
322 .await?
323 }
324 Command::DropSubscription { .. } => {}
325 Command::MergeSnapshotBackfillStreamingJobs(_) => {}
326 Command::ConnectorPropsChange(_) => {}
327 Command::StartFragmentBackfill { .. } => {}
328 }
329
330 Ok(())
331 }
332}