risingwave_meta/barrier/context/
mod.rs1mod context_impl;
16mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_pb::common::WorkerNode;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
26use risingwave_rpc_client::StreamingControlHandle;
27
28use crate::MetaResult;
29use crate::barrier::command::CommandContext;
30use crate::barrier::progress::TrackingJob;
31use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
32use crate::barrier::{
33 BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
34 DatabaseRuntimeInfoSnapshot, RecoveryReason, Scheduled,
35};
36use crate::hummock::{CommitEpochInfo, HummockManagerRef};
37use crate::manager::{MetaSrvEnv, MetadataManager};
38use crate::stream::{ScaleControllerRef, SourceManagerRef};
39
40pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
41 fn commit_epoch(
42 &self,
43 commit_info: CommitEpochInfo,
44 ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
45
46 async fn next_scheduled(&self) -> Scheduled;
47 fn abort_and_mark_blocked(
48 &self,
49 database_id: Option<DatabaseId>,
50 recovery_reason: RecoveryReason,
51 );
52 fn mark_ready(&self, options: MarkReadyOptions);
53
54 fn post_collect_command<'a>(
55 &'a self,
56 command: &'a CommandContext,
57 ) -> impl Future<Output = MetaResult<()>> + Send + 'a;
58
59 async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
60
61 fn finish_creating_job(
62 &self,
63 job: TrackingJob,
64 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
65
66 fn finish_cdc_table_backfill(
67 &self,
68 job_id: TableId,
69 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
70
71 async fn new_control_stream(
72 &self,
73 node: &WorkerNode,
74 init_request: &PbInitRequest,
75 ) -> MetaResult<StreamingControlHandle>;
76
77 async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
78
79 async fn reload_database_runtime_info(
80 &self,
81 database_id: DatabaseId,
82 ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
83
84 fn handle_load_finished_source_ids(
85 &self,
86 load_finished_source_ids: Vec<u32>,
87 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
88
89 fn handle_refresh_finished_table_ids(
90 &self,
91 refresh_finished_table_ids: Vec<u32>,
92 ) -> impl Future<Output = MetaResult<()>> + Send + '_;
93}
94
95pub(super) struct GlobalBarrierWorkerContextImpl {
96 scheduled_barriers: ScheduledBarriers,
97
98 status: Arc<ArcSwap<BarrierManagerStatus>>,
99
100 pub(super) metadata_manager: MetadataManager,
101
102 hummock_manager: HummockManagerRef,
103
104 source_manager: SourceManagerRef,
105
106 scale_controller: ScaleControllerRef,
107
108 pub(super) env: MetaSrvEnv,
109
110 barrier_scheduler: BarrierScheduler,
112}
113
114impl GlobalBarrierWorkerContextImpl {
115 pub(super) fn new(
116 scheduled_barriers: ScheduledBarriers,
117 status: Arc<ArcSwap<BarrierManagerStatus>>,
118 metadata_manager: MetadataManager,
119 hummock_manager: HummockManagerRef,
120 source_manager: SourceManagerRef,
121 scale_controller: ScaleControllerRef,
122 env: MetaSrvEnv,
123 barrier_scheduler: BarrierScheduler,
124 ) -> Self {
125 Self {
126 scheduled_barriers,
127 status,
128 metadata_manager,
129 hummock_manager,
130 source_manager,
131 scale_controller,
132 env,
133 barrier_scheduler,
134 }
135 }
136
137 pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
138 self.status.clone()
139 }
140}