risingwave_meta/barrier/context/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod context_impl;
16mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_pb::common::WorkerNode;
24use risingwave_pb::hummock::HummockVersionStats;
25use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
26use risingwave_rpc_client::StreamingControlHandle;
27
28use crate::MetaResult;
29use crate::barrier::command::CommandContext;
30use crate::barrier::progress::TrackingJob;
31use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
32use crate::barrier::{
33    BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, DatabaseRuntimeInfoSnapshot,
34    RecoveryReason, Scheduled,
35};
36use crate::hummock::{CommitEpochInfo, HummockManagerRef};
37use crate::manager::{MetaSrvEnv, MetadataManager};
38use crate::stream::{ScaleControllerRef, SourceManagerRef};
39
/// Operations the global barrier worker requires from its surrounding environment.
///
/// Implementors must be `Send + Sync + 'static`. Methods declared as
/// `fn ... -> impl Future<..> + Send` promise a `Send` future in their signature;
/// the methods declared with `async fn` state no such bound.
///
/// NOTE(review): the per-method descriptions below are derived from names and
/// signatures only; the implementations are not visible in this file
/// (presumably they live in the `context_impl` / `recovery` submodules) — confirm
/// the exact semantics there.
pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
    /// Takes ownership of `commit_info` and returns a `Send` future that borrows
    /// `self` and resolves to the resulting [`HummockVersionStats`], or an error.
    fn commit_epoch(
        &self,
        commit_info: CommitEpochInfo,
    ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;

    /// Resolves to the next [`Scheduled`] item. The signature is infallible.
    async fn next_scheduled(&self) -> Scheduled;
    /// Synchronous and infallible by signature. `recovery_reason` carries why the
    /// abort happens.
    ///
    /// NOTE(review): `database_id == None` presumably targets every database
    /// rather than a single one — TODO confirm.
    fn abort_and_mark_blocked(
        &self,
        database_id: Option<DatabaseId>,
        recovery_reason: RecoveryReason,
    );
    /// Synchronous and infallible by signature; `options` is consumed.
    fn mark_ready(&self, options: MarkReadyOptions);

    /// Returns a `Send` future that borrows both `self` and `command` for `'a`
    /// and resolves to `Ok(())` or an error.
    ///
    /// NOTE(review): presumably invoked once the barrier of `command` has been
    /// collected — TODO confirm.
    fn post_collect_command<'a>(
        &'a self,
        command: &'a CommandContext,
    ) -> impl Future<Output = MetaResult<()>> + Send + 'a;

    /// Reports the failure described by `err` (passed as an owned `String`).
    /// Infallible by signature.
    ///
    /// NOTE(review): `database_id == None` presumably means the failure is not
    /// scoped to a single database — TODO confirm.
    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);

    /// Takes ownership of `job` and returns a `Send` future that borrows `self`
    /// and resolves to `Ok(())` or an error.
    fn finish_creating_job(
        &self,
        job: TrackingJob,
    ) -> impl Future<Output = MetaResult<()>> + Send + '_;

    /// Resolves to a [`StreamingControlHandle`] for `node`, created with
    /// `init_request`, or an error. Both arguments are only borrowed.
    async fn new_control_stream(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle>;

    /// Resolves to a [`BarrierWorkerRuntimeInfoSnapshot`], or an error.
    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;

    /// Resolves to the [`DatabaseRuntimeInfoSnapshot`] for `database_id`, or an
    /// error.
    ///
    /// NOTE(review): `Ok(None)` presumably means there is nothing to reload for
    /// that database (e.g. it no longer exists) — TODO confirm.
    async fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
}
79
/// Concrete bundle of the components handed to
/// [`GlobalBarrierWorkerContextImpl::new`].
///
/// NOTE(review): this is presumably the production implementor of
/// [`GlobalBarrierWorkerContext`]; the trait impl is not visible in this file
/// (likely in the `context_impl` submodule) — confirm there.
pub(super) struct GlobalBarrierWorkerContextImpl {
    // Barrier scheduling state. Private to this module and its submodules.
    scheduled_barriers: ScheduledBarriers,

    // `Arc`-shared `ArcSwap` cell holding a `BarrierManagerStatus`; `status()`
    // hands out clones of this `Arc`.
    status: Arc<ArcSwap<BarrierManagerStatus>>,

    // Visible to the parent module (`pub(super)`), unlike most other fields.
    pub(super) metadata_manager: MetadataManager,

    hummock_manager: HummockManagerRef,

    source_manager: SourceManagerRef,

    scale_controller: ScaleControllerRef,

    // Meta server environment; also visible to the parent module.
    pub(super) env: MetaSrvEnv,
}
95
96impl GlobalBarrierWorkerContextImpl {
97    pub(super) fn new(
98        scheduled_barriers: ScheduledBarriers,
99        status: Arc<ArcSwap<BarrierManagerStatus>>,
100        metadata_manager: MetadataManager,
101        hummock_manager: HummockManagerRef,
102        source_manager: SourceManagerRef,
103        scale_controller: ScaleControllerRef,
104        env: MetaSrvEnv,
105    ) -> Self {
106        Self {
107            scheduled_barriers,
108            status,
109            metadata_manager,
110            hummock_manager,
111            source_manager,
112            scale_controller,
113            env,
114        }
115    }
116
117    pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
118        self.status.clone()
119    }
120}