risingwave_meta/barrier/context/
mod.rs1mod context_impl;
16mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_pb::common::WorkerNode;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
26use risingwave_rpc_client::StreamingControlHandle;
27
28use crate::MetaResult;
29use crate::barrier::command::CommandContext;
30use crate::barrier::progress::TrackingJob;
31use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
32use crate::barrier::{
33    BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
34    DatabaseRuntimeInfoSnapshot, RecoveryReason, Scheduled,
35};
36use crate::hummock::{CommitEpochInfo, HummockManagerRef};
37use crate::manager::{MetaSrvEnv, MetadataManager};
38use crate::stream::{ScaleControllerRef, SourceManagerRef};
39
40pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
41    fn commit_epoch(
42        &self,
43        commit_info: CommitEpochInfo,
44    ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
45
46    async fn next_scheduled(&self) -> Scheduled;
47    fn abort_and_mark_blocked(
48        &self,
49        database_id: Option<DatabaseId>,
50        recovery_reason: RecoveryReason,
51    );
52    fn mark_ready(&self, options: MarkReadyOptions);
53
54    fn post_collect_command<'a>(
55        &'a self,
56        command: &'a CommandContext,
57    ) -> impl Future<Output = MetaResult<()>> + Send + 'a;
58
59    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
60
61    fn finish_creating_job(
62        &self,
63        job: TrackingJob,
64    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
65
66    fn finish_cdc_table_backfill(
67        &self,
68        job_id: TableId,
69    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
70
71    fn new_control_stream<'a>(
72        &'a self,
73        node: &'a WorkerNode,
74        init_request: &'a PbInitRequest,
75    ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
76
77    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
78
79    async fn reload_database_runtime_info(
80        &self,
81        database_id: DatabaseId,
82    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
83
84    fn handle_list_finished_source_ids(
85        &self,
86        list_finished_source_ids: Vec<u32>,
87    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
88
89    fn handle_load_finished_source_ids(
90        &self,
91        load_finished_source_ids: Vec<u32>,
92    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
93
94    fn handle_refresh_finished_table_ids(
95        &self,
96        refresh_finished_table_ids: Vec<u32>,
97    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
98}
99
100pub(super) struct GlobalBarrierWorkerContextImpl {
101    scheduled_barriers: ScheduledBarriers,
102
103    status: Arc<ArcSwap<BarrierManagerStatus>>,
104
105    pub(super) metadata_manager: MetadataManager,
106
107    hummock_manager: HummockManagerRef,
108
109    source_manager: SourceManagerRef,
110
111    _scale_controller: ScaleControllerRef,
112
113    pub(super) env: MetaSrvEnv,
114
115    barrier_scheduler: BarrierScheduler,
117}
118
119impl GlobalBarrierWorkerContextImpl {
120    pub(super) fn new(
121        scheduled_barriers: ScheduledBarriers,
122        status: Arc<ArcSwap<BarrierManagerStatus>>,
123        metadata_manager: MetadataManager,
124        hummock_manager: HummockManagerRef,
125        source_manager: SourceManagerRef,
126        scale_controller: ScaleControllerRef,
127        env: MetaSrvEnv,
128        barrier_scheduler: BarrierScheduler,
129    ) -> Self {
130        Self {
131            scheduled_barriers,
132            status,
133            metadata_manager,
134            hummock_manager,
135            source_manager,
136            _scale_controller: scale_controller,
137            env,
138            barrier_scheduler,
139        }
140    }
141
142    pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
143        self.status.clone()
144    }
145}