risingwave_meta/stream/refresh_manager.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_meta_model::table::RefreshState;
18use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
19use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
20use thiserror_ext::AsReport;
21
22use crate::barrier::{BarrierScheduler, Command};
23use crate::manager::MetadataManager;
24use crate::{MetaError, MetaResult};
25
26/// # High level design for refresh table
27///
28/// - Three tables:
29///
30/// - Main table: serves queries.
31/// - Staging table: receives refreshed content during `Refreshing`.
32/// - Progress table: per-VNode progress state for resumable refresh.
33///
34/// - Phased execution:
35///
36/// - Normal → Refreshing → Merging → Cleanup → Normal.
37/// - Refreshing: load and write to staging.
38/// - Merging: chunked sort-merge integrates staging into main; per-VNode progress persists checkpoints.
39/// - Cleanup: purge staging and reset progress.
40///
41/// - Barrier-first responsiveness:
42///
43/// - Executor uses left-priority `select_with_strategy`, always handling upstream messages/barriers before background merge.
44/// - On barriers, the executor persists progress so restarts resume exactly.
45///
46/// - Meta-managed state:
47///
48/// - `refresh_state` on each table enforces no concurrent refresh and enables recovery after failures.
49/// - Startup recovery resets lingering `Refreshing` tables to `Idle` and lets executors resume `Finishing` safely.
50///
51/// ## Progress Table (Conceptual)
52/// Tracks, per VNode:
53/// - last processed position (e.g., last PK),
54/// - completion flag,
55/// - processed row count,
56/// - last checkpoint epoch.
57///
58/// The executor initializes entries on `RefreshStart`, updates them during merge, and loads them at startup to resume from the last checkpoint.
59///
60/// ## Barrier Coordination and Completion
61/// - Compute reports:
62///
63/// - `refresh_finished_table_ids`: indicates a materialized view finished refreshing.
64/// - `truncate_tables`: staging tables to be cleaned up.
/// - Checkpoint control aggregates these across barrier types; completion handlers in meta:
66/// - update `refresh_state` to `Idle`,
67/// - schedule/handle `LoadFinish`,
68/// - drive cleanup work reliably after the storage version commit.
69///
70/// Manager responsible for handling refresh operations on refreshable tables
71pub struct RefreshManager {
72 metadata_manager: MetadataManager,
73 barrier_scheduler: BarrierScheduler,
74}
75
76impl RefreshManager {
77 /// Create a new `RefreshManager` instance
78 pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
79 Self {
80 metadata_manager,
81 barrier_scheduler,
82 }
83 }
84
85 /// Execute a refresh operation for the specified table
86 ///
87 /// This method:
88 /// 1. Validates that the table exists and is refreshable
89 /// 2. Checks current refresh state and ensures no concurrent refresh
90 /// 3. Atomically sets the table state to REFRESHING
91 /// 4. Sends a refresh command through the barrier system
92 /// 5. Returns the result of the refresh operation
93 pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
94 let table_id = TableId::new(request.table_id);
95 let associated_source_id = TableId::new(request.associated_source_id);
96
97 tracing::info!("Starting refresh operation for table {}", table_id);
98
99 // Validate that the table exists and is refreshable
100 self.validate_refreshable_table(table_id, associated_source_id)
101 .await?;
102
103 // Get database_id for the table
104 let database_id = DatabaseId::new(
105 self.metadata_manager
106 .catalog_controller
107 .get_object_database_id(table_id.table_id() as _)
108 .await? as _,
109 );
110
111 // Create refresh command
112 let refresh_command = Command::Refresh {
113 table_id,
114 associated_source_id,
115 };
116
117 // Send refresh command through barrier system
118 match self
119 .barrier_scheduler
120 .run_command(database_id, refresh_command)
121 .await
122 {
123 Ok(_) => {
124 tracing::info!(
125 table_id = %table_id,
126 "Refresh command completed successfully"
127 );
128
129 Ok(RefreshResponse { status: None })
130 }
131 Err(e) => {
132 tracing::error!(
133 error = %e.as_report(),
134 table_id = %table_id,
135 "Failed to execute refresh command"
136 );
137
138 Err(anyhow!(e)
139 .context(format!("Failed to refresh table {}", table_id))
140 .into())
141 }
142 }
143 }
144
145 /// Validate that the specified table exists and supports refresh operations
146 async fn validate_refreshable_table(
147 &self,
148 table_id: TableId,
149 associated_source_id: TableId,
150 ) -> MetaResult<()> {
151 // Check if table exists in catalog
152 let table = self
153 .metadata_manager
154 .catalog_controller
155 .get_table_by_id(table_id.table_id as _)
156 .await?;
157
158 // Check if table is refreshable
159 if !table.refreshable {
160 return Err(MetaError::invalid_parameter(format!(
161 "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
162 table.name
163 )));
164 }
165
166 if table.optional_associated_source_id
167 != Some(OptionalAssociatedSourceId::AssociatedSourceId(
168 associated_source_id.table_id(),
169 ))
170 {
171 return Err(MetaError::invalid_parameter(format!(
172 "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
173 table.name, associated_source_id, table.optional_associated_source_id
174 )));
175 }
176
177 let current_state = self
178 .metadata_manager
179 .catalog_controller
180 .get_table_refresh_state(table_id.table_id as _)
181 .await?;
182 match current_state {
183 Some(RefreshState::Idle) | None => {
184 // the table is not refreshing. issue a refresh
185 }
186 state @ (Some(RefreshState::Finishing) | Some(RefreshState::Refreshing)) => {
187 return Err(MetaError::invalid_parameter(format!(
188 "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
189 table.name,
190 state.unwrap()
191 )));
192 }
193 }
194
195 tracing::debug!(
196 table_id = %table_id,
197 table_name = %table.name,
198 "Table validation passed for refresh operation"
199 );
200
201 Ok(())
202 }
203}