// risingwave_meta/barrier/context/mod.rs
mod context_impl;
16pub(crate) mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27 PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::command::PostCollectCommand;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
36use crate::barrier::{
37 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
38 CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
39 RecoveryReason, Scheduled, SnapshotBackfillInfo,
40};
41use crate::hummock::{CommitEpochInfo, HummockManagerRef};
42use crate::manager::sink_coordination::SinkCoordinatorManager;
43use crate::manager::{MetaSrvEnv, MetadataManager};
44use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
45
46#[derive(Debug)]
47pub(super) struct CreateSnapshotBackfillJobCommandInfo {
48 pub info: CreateStreamingJobCommandInfo,
49 pub snapshot_backfill_info: SnapshotBackfillInfo,
50 pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
51}
52
53impl CreateSnapshotBackfillJobCommandInfo {
54 pub(super) fn into_post_collect(self) -> PostCollectCommand {
55 PostCollectCommand::CreateStreamingJob {
56 info: self.info,
57 job_type: CreateStreamingJobType::SnapshotBackfill(self.snapshot_backfill_info),
58 cross_db_snapshot_backfill_info: self.cross_db_snapshot_backfill_info,
59 }
60 }
61}
62
63pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
64 fn commit_epoch(
65 &self,
66 commit_info: CommitEpochInfo,
67 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
68
69 async fn next_scheduled(&self) -> Scheduled;
70 fn abort_and_mark_blocked(
71 &self,
72 database_id: Option<DatabaseId>,
73 recovery_reason: RecoveryReason,
74 );
75 fn mark_ready(&self, options: MarkReadyOptions);
76
77 fn post_collect_command(
78 &self,
79 command: PostCollectCommand,
80 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
81
82 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
83
84 fn finish_creating_job(
85 &self,
86 job: TrackingJob,
87 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
88
89 fn finish_cdc_table_backfill(
90 &self,
91 job_id: JobId,
92 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
93
94 fn new_control_stream<'a>(
95 &'a self,
96 node: &'a WorkerNode,
97 init_request: &'a PbInitRequest,
98 ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
99
100 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
101
102 async fn reload_database_runtime_info(
103 &self,
104 database_id: DatabaseId,
105 ) -> MetaResult<DatabaseRuntimeInfoSnapshot>;
106
107 fn handle_list_finished_source_ids(
108 &self,
109 list_finished_source_ids: Vec<PbListFinishedSource>,
110 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
111
112 fn handle_load_finished_source_ids(
113 &self,
114 load_finished_source_ids: Vec<PbLoadFinishedSource>,
115 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
116
117 fn handle_refresh_finished_table_ids(
118 &self,
119 refresh_finished_table_job_ids: Vec<JobId>,
120 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
121}
122
123pub(super) struct GlobalBarrierWorkerContextImpl {
124 scheduled_barriers: ScheduledBarriers,
125
126 status: Arc<ArcSwap<BarrierManagerStatus>>,
127
128 pub(super) metadata_manager: MetadataManager,
129
130 hummock_manager: HummockManagerRef,
131
132 source_manager: SourceManagerRef,
133
134 _scale_controller: ScaleControllerRef,
135
136 pub(super) env: MetaSrvEnv,
137
138 barrier_scheduler: BarrierScheduler,
140
141 pub(super) refresh_manager: GlobalRefreshManagerRef,
142
143 sink_manager: SinkCoordinatorManager,
144}
145
146impl GlobalBarrierWorkerContextImpl {
147 pub(super) fn new(
148 scheduled_barriers: ScheduledBarriers,
149 status: Arc<ArcSwap<BarrierManagerStatus>>,
150 metadata_manager: MetadataManager,
151 hummock_manager: HummockManagerRef,
152 source_manager: SourceManagerRef,
153 scale_controller: ScaleControllerRef,
154 env: MetaSrvEnv,
155 barrier_scheduler: BarrierScheduler,
156 refresh_manager: GlobalRefreshManagerRef,
157 sink_manager: SinkCoordinatorManager,
158 ) -> Self {
159 Self {
160 scheduled_barriers,
161 status,
162 metadata_manager,
163 hummock_manager,
164 source_manager,
165 _scale_controller: scale_controller,
166 env,
167 barrier_scheduler,
168 refresh_manager,
169 sink_manager,
170 }
171 }
172
173 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
174 self.status.clone()
175 }
176}