// risingwave_meta/barrier/context/mod.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod context_impl;
16pub(crate) mod recovery;
17
use std::future::Future;
use std::sync::Arc;

use arc_swap::ArcSwap;
use risingwave_common::catalog::DatabaseId;
use risingwave_common::id::JobId;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::barrier_complete_response::{
    PbListFinishedSource, PbLoadFinishedSource,
};
use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
use risingwave_rpc_client::StreamingControlHandle;

use crate::MetaResult;
use crate::barrier::checkpoint::independent_job::BatchRefreshJobTriggerContext;
use crate::barrier::command::PostCollectCommand;
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::{MarkReadyOptions, ScheduledBarriers};
use crate::barrier::{
    BarrierManagerStatus, BarrierScheduler, BarrierWorkerRuntimeInfoSnapshot, BatchRefreshInfo,
    CreateStreamingJobCommandInfo, CreateStreamingJobType, DatabaseRuntimeInfoSnapshot,
    RecoveryReason, Scheduled, SnapshotBackfillInfo,
};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::stream::source_manager::SplitAssignment;
use crate::stream::{GlobalRefreshManagerRef, ScaleControllerRef, SourceManagerRef};

48#[derive(Debug)]
49pub(super) struct CreateSnapshotBackfillJobCommandInfo {
50    pub info: CreateStreamingJobCommandInfo,
51    pub snapshot_backfill_info: SnapshotBackfillInfo,
52    pub cross_db_snapshot_backfill_info: SnapshotBackfillInfo,
53    pub resolved_split_assignment: SplitAssignment,
54    /// If set, this is a batch refresh job rather than a regular snapshot backfill.
55    pub refresh_interval_sec: Option<u64>,
56}
57
58impl CreateSnapshotBackfillJobCommandInfo {
59    pub(super) fn into_post_collect(self) -> PostCollectCommand {
60        let job_type = if let Some(refresh_interval_sec) = self.refresh_interval_sec {
61            CreateStreamingJobType::BatchRefresh(BatchRefreshInfo {
62                snapshot_backfill_info: self.snapshot_backfill_info,
63                refresh_interval_sec,
64            })
65        } else {
66            CreateStreamingJobType::SnapshotBackfill(self.snapshot_backfill_info)
67        };
68        PostCollectCommand::CreateStreamingJob {
69            info: self.info,
70            job_type,
71            cross_db_snapshot_backfill_info: self.cross_db_snapshot_backfill_info,
72            resolved_split_assignment: self.resolved_split_assignment,
73        }
74    }
75}
76
77pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
78    fn commit_epoch(
79        &self,
80        commit_info: CommitEpochInfo,
81    ) -> impl Future<Output = MetaResult<HummockVersionStats>> + Send + '_;
82
83    async fn next_scheduled(&self) -> Scheduled;
84    fn abort_and_mark_blocked(
85        &self,
86        database_id: Option<DatabaseId>,
87        recovery_reason: RecoveryReason,
88    );
89    fn mark_ready(&self, options: MarkReadyOptions);
90
91    fn post_collect_command(
92        &self,
93        command: PostCollectCommand,
94    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
95
96    async fn notify_creating_job_failed(&self, database_id: Option<DatabaseId>, err: String);
97
98    fn finish_creating_job(
99        &self,
100        job: TrackingJob,
101    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
102
103    fn finish_cdc_table_backfill(
104        &self,
105        job_id: JobId,
106    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
107
108    fn new_control_stream<'a>(
109        &'a self,
110        node: &'a WorkerNode,
111        init_request: &'a PbInitRequest,
112    ) -> impl Future<Output = MetaResult<StreamingControlHandle>> + Send + 'a;
113
114    async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;
115
116    async fn reload_database_runtime_info(
117        &self,
118        database_id: DatabaseId,
119    ) -> MetaResult<DatabaseRuntimeInfoSnapshot>;
120
121    fn handle_list_finished_source_ids(
122        &self,
123        list_finished_source_ids: Vec<PbListFinishedSource>,
124    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
125
126    fn handle_load_finished_source_ids(
127        &self,
128        load_finished_source_ids: Vec<PbLoadFinishedSource>,
129    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
130
131    fn handle_refresh_finished_table_ids(
132        &self,
133        refresh_finished_table_job_ids: Vec<JobId>,
134    ) -> impl Future<Output = MetaResult<()>> + Send + '_;
135
136    /// Load the trigger context for a batch refresh job: fragment metadata, job model,
137    /// upstream log epochs, and target upstream epoch — all bundled in one struct.
138    fn load_batch_refresh_trigger_context(
139        &self,
140        job_id: JobId,
141        database_id: DatabaseId,
142        last_committed_epoch: u64,
143    ) -> impl Future<Output = MetaResult<BatchRefreshJobTriggerContext>> + Send + '_;
144}
145
146pub(super) struct GlobalBarrierWorkerContextImpl {
147    scheduled_barriers: ScheduledBarriers,
148
149    status: Arc<ArcSwap<BarrierManagerStatus>>,
150
151    pub(super) metadata_manager: MetadataManager,
152
153    hummock_manager: HummockManagerRef,
154
155    source_manager: SourceManagerRef,
156
157    _scale_controller: ScaleControllerRef,
158
159    pub(super) env: MetaSrvEnv,
160
161    /// Barrier scheduler for scheduling load finish commands
162    barrier_scheduler: BarrierScheduler,
163
164    pub(super) refresh_manager: GlobalRefreshManagerRef,
165
166    sink_manager: SinkCoordinatorManager,
167}
168
169impl GlobalBarrierWorkerContextImpl {
170    pub(super) fn new(
171        scheduled_barriers: ScheduledBarriers,
172        status: Arc<ArcSwap<BarrierManagerStatus>>,
173        metadata_manager: MetadataManager,
174        hummock_manager: HummockManagerRef,
175        source_manager: SourceManagerRef,
176        scale_controller: ScaleControllerRef,
177        env: MetaSrvEnv,
178        barrier_scheduler: BarrierScheduler,
179        refresh_manager: GlobalRefreshManagerRef,
180        sink_manager: SinkCoordinatorManager,
181    ) -> Self {
182        Self {
183            scheduled_barriers,
184            status,
185            metadata_manager,
186            hummock_manager,
187            source_manager,
188            _scale_controller: scale_controller,
189            env,
190            barrier_scheduler,
191            refresh_manager,
192            sink_manager,
193        }
194    }
195
196    pub(super) fn status(&self) -> Arc<ArcSwap<BarrierManagerStatus>> {
197        self.status.clone()
198    }
199}