risingwave_meta/barrier/context/
mod.rs1mod context_impl;
16pub(crate) mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27 PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::command::PostCollectCommand;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
36use crate::barrier::{
37 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot, BatchRefreshInfo,
38 CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
39 RecoveryReason, Scheduled, SnapshotBackfillInfo,
40};
41use crate::hummock::{CommitEpochInfo, HummockManagerRef};
42use crate::manager::sink_coordination::SinkCoordinatorManager;
43use crate::manager::{MetaSrvEnv, MetadataManager};
44use crate::stream::source_manager::SplitAssignment;
45use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
46
47#[derive(Debug)]
48pub(super) struct CreateSnapshotBackfillJobCommandInfo {
49 pub info: CreateStreamingJobCommandInfo,
50 pub snapshot_backfill_info: SnapshotBackfillInfo,
51 pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
52 pub resolved_split_assignment: SplitAssignment,
53 pub refresh_interval_sec: Option<u64>,
55}
56
57impl CreateSnapshotBackfillJobCommandInfo {
58 pub(super) fn into_post_collect(self) -> PostCollectCommand {
59 let job_type = if let Some(refresh_interval_sec) = self.refresh_interval_sec {
60 CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
61 snapshot_backfill_info: self.snapshot_backfill_info,
62 refresh_interval_sec,
63 })
64 } else {
65 CreateStreamingJobType::SnapshotBackfill(self.snapshot_backfill_info)
66 };
67 PostCollectCommand::CreateStreamingJob {
68 info: self.info,
69 job_type,
70 cross_db_snapshot_backfill_info: self.cross_db_snapshot_backfill_info,
71 resolved_split_assignment: self.resolved_split_assignment,
72 }
73 }
74}
75
76pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
77 fn commit_epoch(
78 &self,
79 commit_info: CommitEpochInfo,
80 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
81
82 async fn next_scheduled(&self) -> Scheduled;
83 fn abort_and_mark_blocked(
84 &self,
85 database_id: Option<DatabaseId>,
86 recovery_reason: RecoveryReason,
87 );
88 fn mark_ready(&self, options: MarkReadyOptions);
89
90 fn post_collect_command(
91 &self,
92 command: PostCollectCommand,
93 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
94
95 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
96
97 fn finish_creating_job(
98 &self,
99 job: TrackingJob,
100 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
101
102 fn finish_cdc_table_backfill(
103 &self,
104 job_id: JobId,
105 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
106
107 fn new_control_stream<'a>(
108 &'a self,
109 node: &'a WorkerNode,
110 init_request: &'a PbInitRequest,
111 ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
112
113 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
114
115 async fn reload_database_runtime_info(
116 &self,
117 database_id: DatabaseId,
118 ) -> MetaResult<DatabaseRuntimeInfoSnapshot>;
119
120 fn handle_list_finished_source_ids(
121 &self,
122 list_finished_source_ids: Vec<PbListFinishedSource>,
123 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
124
125 fn handle_load_finished_source_ids(
126 &self,
127 load_finished_source_ids: Vec<PbLoadFinishedSource>,
128 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
129
130 fn handle_refresh_finished_table_ids(
131 &self,
132 refresh_finished_table_job_ids: Vec<JobId>,
133 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
134}
135
136pub(super) struct GlobalBarrierWorkerContextImpl {
137 scheduled_barriers: ScheduledBarriers,
138
139 status: Arc<ArcSwap<BarrierManagerStatus>>,
140
141 pub(super) metadata_manager: MetadataManager,
142
143 hummock_manager: HummockManagerRef,
144
145 source_manager: SourceManagerRef,
146
147 _scale_controller: ScaleControllerRef,
148
149 pub(super) env: MetaSrvEnv,
150
151 barrier_scheduler: BarrierScheduler,
153
154 pub(super) refresh_manager: GlobalRefreshManagerRef,
155
156 sink_manager: SinkCoordinatorManager,
157}
158
159impl GlobalBarrierWorkerContextImpl {
160 pub(super) fn new(
161 scheduled_barriers: ScheduledBarriers,
162 status: Arc<ArcSwap<BarrierManagerStatus>>,
163 metadata_manager: MetadataManager,
164 hummock_manager: HummockManagerRef,
165 source_manager: SourceManagerRef,
166 scale_controller: ScaleControllerRef,
167 env: MetaSrvEnv,
168 barrier_scheduler: BarrierScheduler,
169 refresh_manager: GlobalRefreshManagerRef,
170 sink_manager: SinkCoordinatorManager,
171 ) -> Self {
172 Self {
173 scheduled_barriers,
174 status,
175 metadata_manager,
176 hummock_manager,
177 source_manager,
178 _scale_controller: scale_controller,
179 env,
180 barrier_scheduler,
181 refresh_manager,
182 sink_manager,
183 }
184 }
185
186 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
187 self.status.clone()
188 }
189}