risingwave_meta/barrier/context/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod context_impl;
16pub(crate) mod recovery;
17
18use std::future::Future;
19use std::sync::Arc;
20
21use arc_swap::ArcSwap;
22use risingwave_common::catalog::DatabaseId;
23use risingwave_common::id::JobId;
24use risingwave_pb::common::WorkerNode;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::stream_service::barrier_complete_response::{
27    PbListFinishedSource, PbLoadFinishedSource,
28};
29use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
30use risingwave_rpc_client::StreamingControlHandle;
31
32use crate::MetaResult;
33use crate::barrier::command::PostCollectCommand;
34use crate::barrier::progress::TrackingJob;
35use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
36use crate::barrier::{
37    BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot,
38    CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
39    RecoveryReason, Scheduled, SnapshotBackfillInfo,
40};
41use crate::hummock::{CommitEpochInfo, HummockManagerRef};
42use crate::manager::sink_coordination::SinkCoordinatorManager;
43use crate::manager::{MetaSrvEnv, MetadataManager};
44use crate::stream::source_manager::SplitAssignment;
45use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};
46
47#[derive(Debug)]
48pub(super) struct CreateSnapshotBackfillJobCommandInfo {
49    pub info: CreateStreamingJobCommandInfo,
50    pub snapshot_backfill_info: SnapshotBackfillInfo,
51    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
52    pub resolved_split_assignment: SplitAssignment,
53}
54
55impl CreateSnapshotBackfillJobCommandInfo {
56    pub(super) fn into_post_collect(self) -> PostCollectCommand {
57        PostCollectCommand::CreateStreamingJob {
58            info: self.info,
59            job_type: CreateStreamingJobType::SnapshotBackfill(self.snapshot_backfill_info),
60            cross_db_snapshot_backfill_info: self.cross_db_snapshot_backfill_info,
61            resolved_split_assignment: self.resolved_split_assignment,
62        }
63    }
64}
65
66pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
67    fn commit_epoch(
68        &self,
69        commit_info: CommitEpochInfo,
70    ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
71
72    async fn next_scheduled(&self) -> Scheduled;
73    fn abort_and_mark_blocked(
74        &self,
75        database_id: Option<DatabaseId>,
76        recovery_reason: RecoveryReason,
77    );
78    fn mark_ready(&self, options: MarkReadyOptions);
79
80    fn post_collect_command(
81        &self,
82        command: PostCollectCommand,
83    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
84
85    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
86
87    fn finish_creating_job(
88        &self,
89        job: TrackingJob,
90    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
91
92    fn finish_cdc_table_backfill(
93        &self,
94        job_id: JobId,
95    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
96
97    fn new_control_stream<'a>(
98        &'a self,
99        node: &'a WorkerNode,
100        init_request: &'a PbInitRequest,
101    ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
102
103    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
104
105    async fn reload_database_runtime_info(
106        &self,
107        database_id: DatabaseId,
108    ) -> MetaResult<DatabaseRuntimeInfoSnapshot>;
109
110    fn handle_list_finished_source_ids(
111        &self,
112        list_finished_source_ids: Vec<PbListFinishedSource>,
113    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
114
115    fn handle_load_finished_source_ids(
116        &self,
117        load_finished_source_ids: Vec<PbLoadFinishedSource>,
118    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
119
120    fn handle_refresh_finished_table_ids(
121        &self,
122        refresh_finished_table_job_ids: Vec<JobId>,
123    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
124}
125
126pub(super) struct GlobalBarrierWorkerContextImpl {
127    scheduled_barriers: ScheduledBarriers,
128
129    status: Arc<ArcSwap<BarrierManagerStatus>>,
130
131    pub(super) metadata_manager: MetadataManager,
132
133    hummock_manager: HummockManagerRef,
134
135    source_manager: SourceManagerRef,
136
137    _scale_controller: ScaleControllerRef,
138
139    pub(super) env: MetaSrvEnv,
140
141    /// Barrier scheduler for scheduling load finish commands
142    barrier_scheduler: BarrierScheduler,
143
144    pub(super) refresh_manager: GlobalRefreshManagerRef,
145
146    sink_manager: SinkCoordinatorManager,
147}
148
149impl GlobalBarrierWorkerContextImpl {
150    pub(super) fn new(
151        scheduled_barriers: ScheduledBarriers,
152        status: Arc<ArcSwap<BarrierManagerStatus>>,
153        metadata_manager: MetadataManager,
154        hummock_manager: HummockManagerRef,
155        source_manager: SourceManagerRef,
156        scale_controller: ScaleControllerRef,
157        env: MetaSrvEnv,
158        barrier_scheduler: BarrierScheduler,
159        refresh_manager: GlobalRefreshManagerRef,
160        sink_manager: SinkCoordinatorManager,
161    ) -> Self {
162        Self {
163            scheduled_barriers,
164            status,
165            metadata_manager,
166            hummock_manager,
167            source_manager,
168            _scale_controller: scale_controller,
169            env,
170            barrier_scheduler,
171            refresh_manager,
172            sink_manager,
173        }
174    }
175
176    pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
177        self.status.clone()
178    }
179}