// risingwave_meta/barrier/context/mod.rs
mod context_impl;
16pub(crate) mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27 PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::checkpoint::independent_job::BatchRefreshJobTriggerContext;
34use crate::barrier::command::PostCollectCommand;
35use crate::barrier::progress::TrackingJob;
36use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
37use crate::barrier::{
38 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot, BatchRefreshInfo,
39 CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
40 RecoveryReason, Scheduled, SnapshotBackfillInfo,
41};
42use crate::hummock::{CommitEpochInfo, HummockManagerRef};
43use crate::manager::sink_coordination::SinkCoordinatorManager;
44use crate::manager::{MetaSrvEnv, MetadataManager};
45use crate::stream::source_manager::SplitAssignment;
46use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
47
48#[derive(Debug)]
49pub(super) struct CreateSnapshotBackfillJobCommandInfo {
50 pub info: CreateStreamingJobCommandInfo,
51 pub snapshot_backfill_info: SnapshotBackfillInfo,
52 pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
53 pub resolved_split_assignment: SplitAssignment,
54 pub refresh_interval_sec: Option<u64>,
56}
57
58impl CreateSnapshotBackfillJobCommandInfo {
59 pub(super) fn into_post_collect(self) -> PostCollectCommand {
60 let job_type = if let Some(refresh_interval_sec) = self.refresh_interval_sec {
61 CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
62 snapshot_backfill_info: self.snapshot_backfill_info,
63 refresh_interval_sec,
64 })
65 } else {
66 CreateStreamingJobType::SnapshotBackfill(self.snapshot_backfill_info)
67 };
68 PostCollectCommand::CreateStreamingJob {
69 info: self.info,
70 job_type,
71 cross_db_snapshot_backfill_info: self.cross_db_snapshot_backfill_info,
72 resolved_split_assignment: self.resolved_split_assignment,
73 }
74 }
75}
76
77pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
78 fn commit_epoch(
79 &self,
80 commit_info: CommitEpochInfo,
81 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
82
83 async fn next_scheduled(&self) -> Scheduled;
84 fn abort_and_mark_blocked(
85 &self,
86 database_id: Option<DatabaseId>,
87 recovery_reason: RecoveryReason,
88 );
89 fn mark_ready(&self, options: MarkReadyOptions);
90
91 fn post_collect_command(
92 &self,
93 command: PostCollectCommand,
94 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
95
96 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
97
98 fn finish_creating_job(
99 &self,
100 job: TrackingJob,
101 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
102
103 fn finish_cdc_table_backfill(
104 &self,
105 job_id: JobId,
106 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
107
108 fn new_control_stream<'a>(
109 &'a self,
110 node: &'a WorkerNode,
111 init_request: &'a PbInitRequest,
112 ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
113
114 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
115
116 async fn reload_database_runtime_info(
117 &self,
118 database_id: DatabaseId,
119 ) -> MetaResult<DatabaseRuntimeInfoSnapshot>;
120
121 fn handle_list_finished_source_ids(
122 &self,
123 list_finished_source_ids: Vec<PbListFinishedSource>,
124 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
125
126 fn handle_load_finished_source_ids(
127 &self,
128 load_finished_source_ids: Vec<PbLoadFinishedSource>,
129 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
130
131 fn handle_refresh_finished_table_ids(
132 &self,
133 refresh_finished_table_job_ids: Vec<JobId>,
134 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
135
136 fn load_batch_refresh_trigger_context(
139 &self,
140 job_id: JobId,
141 database_id: DatabaseId,
142 last_committed_epoch: u64,
143 ) -> impl Future<Output = MetaResult<BatchRefreshJobTriggerContext>> + Send + '_;
144}
145
146pub(super) struct GlobalBarrierWorkerContextImpl {
147 scheduled_barriers: ScheduledBarriers,
148
149 status: Arc<ArcSwap<BarrierManagerStatus>>,
150
151 pub(super) metadata_manager: MetadataManager,
152
153 hummock_manager: HummockManagerRef,
154
155 source_manager: SourceManagerRef,
156
157 _scale_controller: ScaleControllerRef,
158
159 pub(super) env: MetaSrvEnv,
160
161 barrier_scheduler: BarrierScheduler,
163
164 pub(super) refresh_manager: GlobalRefreshManagerRef,
165
166 sink_manager: SinkCoordinatorManager,
167}
168
169impl GlobalBarrierWorkerContextImpl {
170 pub(super) fn new(
171 scheduled_barriers: ScheduledBarriers,
172 status: Arc<ArcSwap<BarrierManagerStatus>>,
173 metadata_manager: MetadataManager,
174 hummock_manager: HummockManagerRef,
175 source_manager: SourceManagerRef,
176 scale_controller: ScaleControllerRef,
177 env: MetaSrvEnv,
178 barrier_scheduler: BarrierScheduler,
179 refresh_manager: GlobalRefreshManagerRef,
180 sink_manager: SinkCoordinatorManager,
181 ) -> Self {
182 Self {
183 scheduled_barriers,
184 status,
185 metadata_manager,
186 hummock_manager,
187 source_manager,
188 _scale_controller: scale_controller,
189 env,
190 barrier_scheduler,
191 refresh_manager,
192 sink_manager,
193 }
194 }
195
196 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
197 self.status.clone()
198 }
199}