risingwave_meta/stream/refresh_manager.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_meta_model::table::RefreshState;
18use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
19use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
20use thiserror_ext::AsReport;
21
22use crate::barrier::{BarrierScheduler, Command};
23use crate::manager::MetadataManager;
24use crate::{MetaError, MetaResult};
25
/// Manager responsible for handling refresh operations on refreshable tables.
///
/// # High level design for refresh table
///
/// - Three tables:
///   - Main table: serves queries.
///   - Staging table: receives refreshed content during `Refreshing`.
///   - Progress table: per-VNode progress state for resumable refresh.
///
/// - Phased execution:
///   - Normal → Refreshing → Merging → Cleanup → Normal.
///   - Refreshing: load and write to staging.
///   - Merging: chunked sort-merge integrates staging into main; per-VNode progress persists
///     checkpoints.
///   - Cleanup: purge staging and reset progress.
///
/// - Barrier-first responsiveness:
///   - Executor uses left-priority `select_with_strategy`, always handling upstream
///     messages/barriers before background merge.
///   - On barriers, the executor persists progress so restarts resume exactly.
///
/// - Meta-managed state:
///   - `refresh_state` on each table enforces no concurrent refresh and enables recovery after
///     failures.
///   - Startup recovery resets lingering `Refreshing` tables to `Idle` and lets executors resume
///     `Finishing` safely.
///
/// NOTE(review): the phase names above (Normal / Merging / Cleanup) are conceptual. The
/// `RefreshState` values this file actually matches on are `Idle`, `Refreshing` and `Finishing`;
/// the exact mapping between the two vocabularies should be confirmed against the executor.
///
/// ## Progress Table (Conceptual)
/// Tracks, per VNode:
/// - last processed position (e.g., last PK),
/// - completion flag,
/// - processed row count,
/// - last checkpoint epoch.
///
/// The executor initializes entries on `RefreshStart`, updates them during merge, and loads them
/// at startup to resume from the last checkpoint.
///
/// ## Barrier Coordination and Completion
/// - Compute reports:
///   - `refresh_finished_table_ids`: indicates a materialized view finished refreshing.
///   - `truncate_tables`: staging tables to be cleaned up.
/// - Checkpoint control aggregates these across barrier types; completion handlers in meta:
///   - update `refresh_state` to `Idle`,
///   - schedule/handle `LoadFinish`,
///   - drive cleanup work reliably after the storage version commit.
pub struct RefreshManager {
    // Gives access to `catalog_controller`, which this manager uses for table lookups and for
    // reading/writing the per-table refresh state.
    metadata_manager: MetadataManager,
    // Used to submit `Command::Refresh` and wait for it to complete.
    barrier_scheduler: BarrierScheduler,
}
75
76impl RefreshManager {
77 /// Create a new `RefreshManager` instance
78 pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
79 Self {
80 metadata_manager,
81 barrier_scheduler,
82 }
83 }
84
85 /// Execute a refresh operation for the specified table
86 ///
87 /// This method:
88 /// 1. Validates that the table exists and is refreshable
89 /// 2. Checks current refresh state and ensures no concurrent refresh
90 /// 3. Atomically sets the table state to REFRESHING
91 /// 4. Sends a refresh command through the barrier system
92 /// 5. Returns the result of the refresh operation
93 pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
94 let table_id = TableId::new(request.table_id);
95 let associated_source_id = TableId::new(request.associated_source_id);
96
97 // Validate that the table exists and is refreshable
98 self.validate_refreshable_table(table_id, associated_source_id)
99 .await?;
100
101 tracing::info!("Starting refresh operation for table {}", table_id);
102
103 // Get database_id for the table
104 let database_id = DatabaseId::new(
105 self.metadata_manager
106 .catalog_controller
107 .get_object_database_id(table_id.table_id() as _)
108 .await? as _,
109 );
110
111 // Create refresh command
112 let refresh_command = Command::Refresh {
113 table_id,
114 associated_source_id,
115 };
116
117 // Send refresh command through barrier system
118 match self
119 .barrier_scheduler
120 .run_command(database_id, refresh_command)
121 .await
122 {
123 Ok(_) => {
124 tracing::info!(
125 table_id = %table_id,
126 "Refresh command completed successfully"
127 );
128
129 Ok(RefreshResponse { status: None })
130 }
131 Err(e) => {
132 tracing::error!(
133 error = %e.as_report(),
134 table_id = %table_id,
135 "Failed to execute refresh command, resetting refresh state to Idle"
136 );
137
138 self.metadata_manager
139 .catalog_controller
140 .set_table_refresh_state(table_id.table_id as _, RefreshState::Idle)
141 .await?;
142
143 Err(anyhow!(e)
144 .context(format!("Failed to refresh table {}", table_id))
145 .into())
146 }
147 }
148 }
149
150 /// Validate that the specified table exists and supports refresh operations
151 async fn validate_refreshable_table(
152 &self,
153 table_id: TableId,
154 associated_source_id: TableId,
155 ) -> MetaResult<()> {
156 // Check if table exists in catalog
157 let table = self
158 .metadata_manager
159 .catalog_controller
160 .get_table_by_id(table_id.table_id as _)
161 .await?;
162
163 // Check if table is refreshable
164 if !table.refreshable {
165 return Err(MetaError::invalid_parameter(format!(
166 "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
167 table.name
168 )));
169 }
170
171 if table.optional_associated_source_id
172 != Some(OptionalAssociatedSourceId::AssociatedSourceId(
173 associated_source_id.table_id(),
174 ))
175 {
176 return Err(MetaError::invalid_parameter(format!(
177 "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
178 table.name, associated_source_id, table.optional_associated_source_id
179 )));
180 }
181
182 let current_state = self
183 .metadata_manager
184 .catalog_controller
185 .get_table_refresh_state(table_id.table_id as _)
186 .await?;
187 match current_state {
188 Some(RefreshState::Idle) | None => {
189 // the table is not refreshing. issue a refresh
190 }
191 state @ (Some(RefreshState::Finishing) | Some(RefreshState::Refreshing)) => {
192 return Err(MetaError::invalid_parameter(format!(
193 "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
194 table.name,
195 state.unwrap()
196 )));
197 }
198 }
199
200 tracing::debug!(
201 table_id = %table_id,
202 table_name = %table.name,
203 "Table validation passed for refresh operation"
204 );
205
206 Ok(())
207 }
208}