risingwave_meta/barrier/context/
context_impl.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::DatabaseId;
18use risingwave_pb::common::WorkerNode;
19use risingwave_pb::hummock::HummockVersionStats;
20use risingwave_pb::stream_plan::PbFragmentTypeFlag;
21use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
22use risingwave_rpc_client::StreamingControlHandle;
23
24use crate::MetaResult;
25use crate::barrier::command::CommandContext;
26use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
27use crate::barrier::progress::TrackingJob;
28use crate::barrier::schedule::MarkReadyOptions;
29use crate::barrier::{
30 BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
31 CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
32 Scheduled,
33};
34use crate::hummock::CommitEpochInfo;
35use crate::stream::SourceChange;
36
37impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
38 async fn commit_epoch(&self, commit_info: CommitEpochInfo) -> MetaResult<HummockVersionStats> {
39 self.hummock_manager.commit_epoch(commit_info).await?;
40 Ok(self.hummock_manager.get_version_stats().await)
41 }
42
43 async fn next_scheduled(&self) -> Scheduled {
44 self.scheduled_barriers.next_scheduled().await
45 }
46
47 fn abort_and_mark_blocked(
48 &self,
49 database_id: Option<DatabaseId>,
50 recovery_reason: RecoveryReason,
51 ) {
52 if database_id.is_none() {
53 self.set_status(BarrierManagerStatus::Recovering(recovery_reason));
54 }
55
56 self.scheduled_barriers
58 .abort_and_mark_blocked(database_id, "cluster is under recovering");
59 }
60
61 fn mark_ready(&self, options: MarkReadyOptions) {
62 let is_global = matches!(&options, MarkReadyOptions::Global { .. });
63 self.scheduled_barriers.mark_ready(options);
64 if is_global {
65 self.set_status(BarrierManagerStatus::Running);
66 }
67 }
68
69 async fn post_collect_command<'a>(&'a self, command: &'a CommandContext) -> MetaResult<()> {
70 command.post_collect(self).await
71 }
72
73 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String) {
74 self.metadata_manager
75 .notify_finish_failed(database_id, err)
76 .await
77 }
78
79 async fn finish_creating_job(&self, job: TrackingJob) -> MetaResult<()> {
80 job.finish(&self.metadata_manager).await
81 }
82
83 async fn new_control_stream(
84 &self,
85 node: &WorkerNode,
86 init_request: &PbInitRequest,
87 ) -> MetaResult<StreamingControlHandle> {
88 self.new_control_stream_impl(node, init_request).await
89 }
90
91 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
92 self.reload_runtime_info_impl().await
93 }
94
95 async fn reload_database_runtime_info(
96 &self,
97 database_id: DatabaseId,
98 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
99 self.reload_database_runtime_info_impl(database_id).await
100 }
101}
102
103impl GlobalBarrierWorkerContextImpl {
104 fn set_status(&self, new_status: BarrierManagerStatus) {
105 self.status.store(Arc::new(new_status));
106 }
107}
108
109impl CommandContext {
110 pub async fn post_collect(
113 &self,
114 barrier_manager_context: &GlobalBarrierWorkerContextImpl,
115 ) -> MetaResult<()> {
116 let Some(command) = &self.command else {
117 return Ok(());
118 };
119 match command {
120 Command::Flush => {}
121
122 Command::Throttle(_) => {}
123
124 Command::Pause => {}
125
126 Command::Resume => {}
127
128 Command::SourceChangeSplit(split_assignment) => {
129 barrier_manager_context
130 .metadata_manager
131 .update_actor_splits_by_split_assignment(split_assignment)
132 .await?;
133 barrier_manager_context
134 .source_manager
135 .apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
136 .await;
137 }
138
139 Command::DropStreamingJobs {
140 unregistered_state_table_ids,
141 ..
142 } => {
143 barrier_manager_context
144 .hummock_manager
145 .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
146 .await?;
147 }
148 Command::CreateStreamingJob {
149 info,
150 job_type,
151 cross_db_snapshot_backfill_info,
152 } => {
153 let mut is_sink_into_table = false;
154 match job_type {
155 CreateStreamingJobType::SinkIntoTable(
156 replace_plan @ ReplaceStreamJobPlan {
157 old_fragments,
158 new_fragments,
159 upstream_fragment_downstreams,
160 init_split_assignment,
161 ..
162 },
163 ) => {
164 is_sink_into_table = true;
165 barrier_manager_context
166 .metadata_manager
167 .catalog_controller
168 .post_collect_job_fragments(
169 new_fragments.stream_job_id.table_id as _,
170 new_fragments.actor_ids(),
171 upstream_fragment_downstreams,
172 init_split_assignment,
173 )
174 .await?;
175 barrier_manager_context
176 .source_manager
177 .handle_replace_job(
178 old_fragments,
179 new_fragments.stream_source_fragments(),
180 init_split_assignment.clone(),
181 replace_plan,
182 )
183 .await;
184 }
185 CreateStreamingJobType::Normal => {
186 barrier_manager_context
187 .metadata_manager
188 .catalog_controller
189 .fill_snapshot_backfill_epoch(
190 info.stream_job_fragments.fragments.iter().filter_map(
191 |(fragment_id, fragment)| {
192 if (fragment.fragment_type_mask
193 & PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32)
194 != 0
195 {
196 Some(*fragment_id as _)
197 } else {
198 None
199 }
200 },
201 ),
202 None,
203 cross_db_snapshot_backfill_info,
204 )
205 .await?
206 }
207 CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
208 barrier_manager_context
209 .metadata_manager
210 .catalog_controller
211 .fill_snapshot_backfill_epoch(
212 info.stream_job_fragments.fragments.iter().filter_map(
213 |(fragment_id, fragment)| {
214 if (fragment.fragment_type_mask
215 & (PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32 | PbFragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
216 != 0
217 {
218 Some(*fragment_id as _)
219 } else {
220 None
221 }
222 },
223 ),
224 Some(snapshot_backfill_info),
225 cross_db_snapshot_backfill_info,
226 )
227 .await?
228 }
229 }
230
231 let CreateStreamingJobCommandInfo {
234 stream_job_fragments,
235 upstream_fragment_downstreams,
236 init_split_assignment,
237 streaming_job,
238 ..
239 } = info;
240 barrier_manager_context
241 .metadata_manager
242 .catalog_controller
243 .post_collect_job_fragments_inner(
244 stream_job_fragments.stream_job_id().table_id as _,
245 stream_job_fragments.actor_ids(),
246 upstream_fragment_downstreams,
247 init_split_assignment,
248 streaming_job.is_materialized_view(),
249 )
250 .await?;
251
252 if !is_sink_into_table {
253 barrier_manager_context
254 .source_manager
255 .apply_source_change(SourceChange::CreateJob {
256 added_source_fragments: stream_job_fragments.stream_source_fragments(),
257 added_backfill_fragments: stream_job_fragments
258 .source_backfill_fragments()?,
259 split_assignment: init_split_assignment.clone(),
260 })
261 .await;
262 }
263 }
264 Command::RescheduleFragment {
265 reschedules,
266 post_updates,
267 ..
268 } => {
269 barrier_manager_context
270 .scale_controller
271 .post_apply_reschedule(reschedules, post_updates)
272 .await?;
273 }
274
275 Command::ReplaceStreamJob(
276 replace_plan @ ReplaceStreamJobPlan {
277 old_fragments,
278 new_fragments,
279 upstream_fragment_downstreams,
280 init_split_assignment,
281 to_drop_state_table_ids,
282 ..
283 },
284 ) => {
285 barrier_manager_context
287 .metadata_manager
288 .catalog_controller
289 .post_collect_job_fragments(
290 new_fragments.stream_job_id.table_id as _,
291 new_fragments.actor_ids(),
292 upstream_fragment_downstreams,
293 init_split_assignment,
294 )
295 .await?;
296
297 barrier_manager_context
299 .source_manager
300 .handle_replace_job(
301 old_fragments,
302 new_fragments.stream_source_fragments(),
303 init_split_assignment.clone(),
304 replace_plan,
305 )
306 .await;
307 barrier_manager_context
308 .hummock_manager
309 .unregister_table_ids(to_drop_state_table_ids.iter().cloned())
310 .await?;
311 }
312
313 Command::CreateSubscription {
314 subscription_id, ..
315 } => {
316 barrier_manager_context
317 .metadata_manager
318 .catalog_controller
319 .finish_create_subscription_catalog(*subscription_id)
320 .await?
321 }
322 Command::DropSubscription { .. } => {}
323 Command::MergeSnapshotBackfillStreamingJobs(_) => {}
324 }
325
326 Ok(())
327 }
328}