risingwave_meta/barrier/context/
mod.rs1mod context_impl;
16mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27 PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::command::CommandContext;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
36use crate::barrier::{
37 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
38 DatabaseRuntimeInfoSnapshot, RecoveryReason, Scheduled,
39};
40use crate::hummock::{CommitEpochInfo, HummockManagerRef};
41use crate::manager::{MetaSrvEnv, MetadataManager};
42use crate::stream::{ScaleControllerRef, SourceManagerRef};
43
44pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
45 fn commit_epoch(
46 &self,
47 commit_info: CommitEpochInfo,
48 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
49
50 async fn next_scheduled(&self) -> Scheduled;
51 fn abort_and_mark_blocked(
52 &self,
53 database_id: Option<DatabaseId>,
54 recovery_reason: RecoveryReason,
55 );
56 fn mark_ready(&self, options: MarkReadyOptions);
57
58 fn post_collect_command<'a>(
59 &'a self,
60 command: &'a CommandContext,
61 ) -> impl Future<Output = MetaResult<()>> + Send + 'a;
62
63 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
64
65 fn finish_creating_job(
66 &self,
67 job: TrackingJob,
68 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
69
70 fn finish_cdc_table_backfill(
71 &self,
72 job_id: JobId,
73 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
74
75 fn new_control_stream<'a>(
76 &'a self,
77 node: &'a WorkerNode,
78 init_request: &'a PbInitRequest,
79 ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
80
81 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
82
83 async fn reload_database_runtime_info(
84 &self,
85 database_id: DatabaseId,
86 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
87
88 fn handle_list_finished_source_ids(
89 &self,
90 list_finished_source_ids: Vec<PbListFinishedSource>,
91 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
92
93 fn handle_load_finished_source_ids(
94 &self,
95 load_finished_source_ids: Vec<PbLoadFinishedSource>,
96 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
97
98 fn handle_refresh_finished_table_ids(
99 &self,
100 refresh_finished_table_job_ids: Vec<JobId>,
101 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
102}
103
104pub(super) struct GlobalBarrierWorkerContextImpl {
105 scheduled_barriers: ScheduledBarriers,
106
107 status: Arc<ArcSwap<BarrierManagerStatus>>,
108
109 pub(super) metadata_manager: MetadataManager,
110
111 hummock_manager: HummockManagerRef,
112
113 source_manager: SourceManagerRef,
114
115 _scale_controller: ScaleControllerRef,
116
117 pub(super) env: MetaSrvEnv,
118
119 barrier_scheduler: BarrierScheduler,
121}
122
123impl GlobalBarrierWorkerContextImpl {
124 pub(super) fn new(
125 scheduled_barriers: ScheduledBarriers,
126 status: Arc<ArcSwap<BarrierManagerStatus>>,
127 metadata_manager: MetadataManager,
128 hummock_manager: HummockManagerRef,
129 source_manager: SourceManagerRef,
130 scale_controller: ScaleControllerRef,
131 env: MetaSrvEnv,
132 barrier_scheduler: BarrierScheduler,
133 ) -> Self {
134 Self {
135 scheduled_barriers,
136 status,
137 metadata_manager,
138 hummock_manager,
139 source_manager,
140 _scale_controller: scale_controller,
141 env,
142 barrier_scheduler,
143 }
144 }
145
146 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
147 self.status.clone()
148 }
149}