risingwave_meta/stream/
refresh_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_meta_model::table::RefreshState;
18use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
19use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
20use thiserror_ext::AsReport;
21
22use crate::barrier::{BarrierScheduler, Command};
23use crate::manager::MetadataManager;
24use crate::{MetaError, MetaResult};
25
/// Manager responsible for handling refresh operations on refreshable tables.
///
/// # High level design for refresh table
///
/// - Three tables:
///   - Main table: serves queries.
///   - Staging table: receives refreshed content during `Refreshing`.
///   - Progress table: per-VNode progress state for resumable refresh.
///
/// - Phased execution:
///   - Normal → Refreshing → Merging → Cleanup → Normal.
///   - Refreshing: load and write to staging.
///   - Merging: chunked sort-merge integrates staging into main; per-VNode progress persists checkpoints.
///   - Cleanup: purge staging and reset progress.
///
/// - Barrier-first responsiveness:
///   - Executor uses left-priority `select_with_strategy`, always handling upstream messages/barriers before background merge.
///   - On barriers, the executor persists progress so restarts resume exactly.
///
/// - Meta-managed state:
///   - `refresh_state` on each table enforces no concurrent refresh and enables recovery after failures.
///   - Startup recovery resets lingering `Refreshing` tables to `Idle` and lets executors resume `Finishing` safely.
///
/// NOTE(review): the phase names above (Normal / Merging / Cleanup) differ from the
/// `RefreshState` variants this file actually uses (`Idle`, `Refreshing`, `Finishing`);
/// presumably Normal maps to `Idle` and Merging/Cleanup to `Finishing` — confirm.
///
/// ## Progress Table (Conceptual)
/// Tracks, per VNode:
/// - last processed position (e.g., last PK),
/// - completion flag,
/// - processed row count,
/// - last checkpoint epoch.
///
/// The executor initializes entries on `RefreshStart`, updates them during merge, and loads them at startup to resume from the last checkpoint.
///
/// ## Barrier Coordination and Completion
/// - Compute reports:
///   - `refresh_finished_table_ids`: indicates a materialized view finished refreshing.
///   - `truncate_tables`: staging tables to be cleaned up.
/// - Checkpoint control aggregates these across barrier types; completion handlers in meta:
///   - update `refresh_state` to `Idle`,
///   - schedule/handle `LoadFinish`,
///   - drive cleanup work reliably after the storage version commit.
pub struct RefreshManager {
    /// Gives access to the `catalog_controller`, which is used for table lookups and for
    /// reading and writing the per-table refresh state.
    metadata_manager: MetadataManager,
    /// Used to run `Command::Refresh` for the database that owns the table.
    barrier_scheduler: BarrierScheduler,
}
75
76impl RefreshManager {
77    /// Create a new `RefreshManager` instance
78    pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
79        Self {
80            metadata_manager,
81            barrier_scheduler,
82        }
83    }
84
85    /// Execute a refresh operation for the specified table
86    ///
87    /// This method:
88    /// 1. Validates that the table exists and is refreshable
89    /// 2. Checks current refresh state and ensures no concurrent refresh
90    /// 3. Atomically sets the table state to REFRESHING
91    /// 4. Sends a refresh command through the barrier system
92    /// 5. Returns the result of the refresh operation
93    pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
94        let table_id = TableId::new(request.table_id);
95        let associated_source_id = TableId::new(request.associated_source_id);
96
97        // Validate that the table exists and is refreshable
98        self.validate_refreshable_table(table_id, associated_source_id)
99            .await?;
100
101        tracing::info!("Starting refresh operation for table {}", table_id);
102
103        // Get database_id for the table
104        let database_id = DatabaseId::new(
105            self.metadata_manager
106                .catalog_controller
107                .get_object_database_id(table_id.table_id() as _)
108                .await? as _,
109        );
110
111        // Create refresh command
112        let refresh_command = Command::Refresh {
113            table_id,
114            associated_source_id,
115        };
116
117        // Send refresh command through barrier system
118        match self
119            .barrier_scheduler
120            .run_command(database_id, refresh_command)
121            .await
122        {
123            Ok(_) => {
124                tracing::info!(
125                    table_id = %table_id,
126                    "Refresh command completed successfully"
127                );
128
129                Ok(RefreshResponse { status: None })
130            }
131            Err(e) => {
132                tracing::error!(
133                    error = %e.as_report(),
134                    table_id = %table_id,
135                    "Failed to execute refresh command, resetting refresh state to Idle"
136                );
137
138                self.metadata_manager
139                    .catalog_controller
140                    .set_table_refresh_state(table_id.table_id as _, RefreshState::Idle)
141                    .await?;
142
143                Err(anyhow!(e)
144                    .context(format!("Failed to refresh table {}", table_id))
145                    .into())
146            }
147        }
148    }
149
150    /// Validate that the specified table exists and supports refresh operations
151    async fn validate_refreshable_table(
152        &self,
153        table_id: TableId,
154        associated_source_id: TableId,
155    ) -> MetaResult<()> {
156        // Check if table exists in catalog
157        let table = self
158            .metadata_manager
159            .catalog_controller
160            .get_table_by_id(table_id.table_id as _)
161            .await?;
162
163        // Check if table is refreshable
164        if !table.refreshable {
165            return Err(MetaError::invalid_parameter(format!(
166                "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
167                table.name
168            )));
169        }
170
171        if table.optional_associated_source_id
172            != Some(OptionalAssociatedSourceId::AssociatedSourceId(
173                associated_source_id.table_id(),
174            ))
175        {
176            return Err(MetaError::invalid_parameter(format!(
177                "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
178                table.name, associated_source_id, table.optional_associated_source_id
179            )));
180        }
181
182        let current_state = self
183            .metadata_manager
184            .catalog_controller
185            .get_table_refresh_state(table_id.table_id as _)
186            .await?;
187        match current_state {
188            Some(RefreshState::Idle) | None => {
189                // the table is not refreshing. issue a refresh
190            }
191            state @ (Some(RefreshState::Finishing) | Some(RefreshState::Refreshing)) => {
192                return Err(MetaError::invalid_parameter(format!(
193                    "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
194                    table.name,
195                    state.unwrap()
196                )));
197            }
198        }
199
200        tracing::debug!(
201            table_id = %table_id,
202            table_name = %table.name,
203            "Table validation passed for refresh operation"
204        );
205
206        Ok(())
207    }
208}