risingwave_meta/barrier/context/
mod.rs1mod context_impl;
16mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27 PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::command::CommandContext;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
36use crate::barrier::{
37 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
38 DatabaseRuntimeInfoSnapshot, RecoveryReason, Scheduled,
39};
40use crate::hummock::{CommitEpochInfo, HummockManagerRef};
41use crate::manager::sink_coordination::SinkCoordinatorManager;
42use crate::manager::{MetaSrvEnv, MetadataManager};
43use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
44
45pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
46 fn commit_epoch(
47 &self,
48 commit_info: CommitEpochInfo,
49 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
50
51 async fn next_scheduled(&self) -> Scheduled;
52 fn abort_and_mark_blocked(
53 &self,
54 database_id: Option<DatabaseId>,
55 recovery_reason: RecoveryReason,
56 );
57 fn mark_ready(&self, options: MarkReadyOptions);
58
59 fn post_collect_command<'a>(
60 &'a self,
61 command: &'a CommandContext,
62 ) -> impl Future<Output = MetaResult<()>> + Send + 'a;
63
64 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
65
66 fn finish_creating_job(
67 &self,
68 job: TrackingJob,
69 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
70
71 fn finish_cdc_table_backfill(
72 &self,
73 job_id: JobId,
74 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
75
76 fn new_control_stream<'a>(
77 &'a self,
78 node: &'a WorkerNode,
79 init_request: &'a PbInitRequest,
80 ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
81
82 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
83
84 async fn reload_database_runtime_info(
85 &self,
86 database_id: DatabaseId,
87 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
88
89 fn handle_list_finished_source_ids(
90 &self,
91 list_finished_source_ids: Vec<PbListFinishedSource>,
92 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
93
94 fn handle_load_finished_source_ids(
95 &self,
96 load_finished_source_ids: Vec<PbLoadFinishedSource>,
97 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
98
99 fn handle_refresh_finished_table_ids(
100 &self,
101 refresh_finished_table_job_ids: Vec<JobId>,
102 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
103}
104
105pub(super) struct GlobalBarrierWorkerContextImpl {
106 scheduled_barriers: ScheduledBarriers,
107
108 status: Arc<ArcSwap<BarrierManagerStatus>>,
109
110 pub(super) metadata_manager: MetadataManager,
111
112 hummock_manager: HummockManagerRef,
113
114 source_manager: SourceManagerRef,
115
116 _scale_controller: ScaleControllerRef,
117
118 pub(super) env: MetaSrvEnv,
119
120 barrier_scheduler: BarrierScheduler,
122
123 pub(super) refresh_manager: GlobalRefreshManagerRef,
124
125 sink_manager: SinkCoordinatorManager,
126}
127
128impl GlobalBarrierWorkerContextImpl {
129 pub(super) fn new(
130 scheduled_barriers: ScheduledBarriers,
131 status: Arc<ArcSwap<BarrierManagerStatus>>,
132 metadata_manager: MetadataManager,
133 hummock_manager: HummockManagerRef,
134 source_manager: SourceManagerRef,
135 scale_controller: ScaleControllerRef,
136 env: MetaSrvEnv,
137 barrier_scheduler: BarrierScheduler,
138 refresh_manager: GlobalRefreshManagerRef,
139 sink_manager: SinkCoordinatorManager,
140 ) -> Self {
141 Self {
142 scheduled_barriers,
143 status,
144 metadata_manager,
145 hummock_manager,
146 source_manager,
147 _scale_controller: scale_controller,
148 env,
149 barrier_scheduler,
150 refresh_manager,
151 sink_manager,
152 }
153 }
154
155 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
156 self.status.clone()
157 }
158}