// risingwave_meta/stream/refresh_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_meta_model::table::RefreshState;
18use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
19use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
20use thiserror_ext::AsReport;
21
22use crate::barrier::{BarrierScheduler, Command};
23use crate::manager::MetadataManager;
24use crate::{MetaError, MetaResult};
25
26/// # High level design for refresh table
27///
28/// - Three tables:
29///
30/// - Main table: serves queries.
31/// - Staging table: receives refreshed content during `Refreshing`.
32/// - Progress table: per-VNode progress state for resumable refresh.
33///
34/// - Phased execution:
35///
36/// - Normal → Refreshing → Merging → Cleanup → Normal.
37/// - Refreshing: load and write to staging.
38/// - Merging: chunked sort-merge integrates staging into main; per-VNode progress persists checkpoints.
39/// - Cleanup: purge staging and reset progress.
40///
41/// - Barrier-first responsiveness:
42///
43/// - Executor uses left-priority `select_with_strategy`, always handling upstream messages/barriers before background merge.
44/// - On barriers, the executor persists progress so restarts resume exactly.
45///
46/// - Meta-managed state:
47///
48/// - `refresh_state` on each table enforces no concurrent refresh and enables recovery after failures.
49/// - Startup recovery resets lingering `Refreshing` tables to `Idle` and lets executors resume `Finishing` safely.
50///
51/// ## Progress Table (Conceptual)
52/// Tracks, per VNode:
53/// - last processed position (e.g., last PK),
54/// - completion flag,
55/// - processed row count,
56/// - last checkpoint epoch.
57///
58/// The executor initializes entries on `RefreshStart`, updates them during merge, and loads them at startup to resume from the last checkpoint.
59///
60/// ## Barrier Coordination and Completion
61/// - Compute reports:
62///
63/// - `refresh_finished_table_ids`: indicates a materialized view finished refreshing.
64/// - `truncate_tables`: staging tables to be cleaned up.
65// - Checkpoint control aggregates these across barrier types; completion handlers in meta:
66/// - update `refresh_state` to `Idle`,
67/// - schedule/handle `LoadFinish`,
68/// - drive cleanup work reliably after the storage version commit.
69///
70/// Manager responsible for handling refresh operations on refreshable tables
71pub struct RefreshManager {
72    metadata_manager: MetadataManager,
73    barrier_scheduler: BarrierScheduler,
74}
75
76impl RefreshManager {
77    /// Create a new `RefreshManager` instance
78    pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
79        Self {
80            metadata_manager,
81            barrier_scheduler,
82        }
83    }
84
85    /// Execute a refresh operation for the specified table
86    ///
87    /// This method:
88    /// 1. Validates that the table exists and is refreshable
89    /// 2. Checks current refresh state and ensures no concurrent refresh
90    /// 3. Atomically sets the table state to REFRESHING
91    /// 4. Sends a refresh command through the barrier system
92    /// 5. Returns the result of the refresh operation
93    pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
94        let table_id = TableId::new(request.table_id);
95        let associated_source_id = TableId::new(request.associated_source_id);
96
97        tracing::info!("Starting refresh operation for table {}", table_id);
98
99        // Validate that the table exists and is refreshable
100        self.validate_refreshable_table(table_id, associated_source_id)
101            .await?;
102
103        // Get database_id for the table
104        let database_id = DatabaseId::new(
105            self.metadata_manager
106                .catalog_controller
107                .get_object_database_id(table_id.table_id() as _)
108                .await? as _,
109        );
110
111        // Create refresh command
112        let refresh_command = Command::Refresh {
113            table_id,
114            associated_source_id,
115        };
116
117        // Send refresh command through barrier system
118        match self
119            .barrier_scheduler
120            .run_command(database_id, refresh_command)
121            .await
122        {
123            Ok(_) => {
124                tracing::info!(
125                    table_id = %table_id,
126                    "Refresh command completed successfully"
127                );
128
129                Ok(RefreshResponse { status: None })
130            }
131            Err(e) => {
132                tracing::error!(
133                    error = %e.as_report(),
134                    table_id = %table_id,
135                    "Failed to execute refresh command"
136                );
137
138                Err(anyhow!(e)
139                    .context(format!("Failed to refresh table {}", table_id))
140                    .into())
141            }
142        }
143    }
144
145    /// Validate that the specified table exists and supports refresh operations
146    async fn validate_refreshable_table(
147        &self,
148        table_id: TableId,
149        associated_source_id: TableId,
150    ) -> MetaResult<()> {
151        // Check if table exists in catalog
152        let table = self
153            .metadata_manager
154            .catalog_controller
155            .get_table_by_id(table_id.table_id as _)
156            .await?;
157
158        // Check if table is refreshable
159        if !table.refreshable {
160            return Err(MetaError::invalid_parameter(format!(
161                "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
162                table.name
163            )));
164        }
165
166        if table.optional_associated_source_id
167            != Some(OptionalAssociatedSourceId::AssociatedSourceId(
168                associated_source_id.table_id(),
169            ))
170        {
171            return Err(MetaError::invalid_parameter(format!(
172                "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
173                table.name, associated_source_id, table.optional_associated_source_id
174            )));
175        }
176
177        let current_state = self
178            .metadata_manager
179            .catalog_controller
180            .get_table_refresh_state(table_id.table_id as _)
181            .await?;
182        match current_state {
183            Some(RefreshState::Idle) | None => {
184                // the table is not refreshing. issue a refresh
185            }
186            state @ (Some(RefreshState::Finishing) | Some(RefreshState::Refreshing)) => {
187                return Err(MetaError::invalid_parameter(format!(
188                    "Table '{}' is currently in state {:?}. Cannot start a new refresh operation.",
189                    table.name,
190                    state.unwrap()
191                )));
192            }
193        }
194
195        tracing::debug!(
196            table_id = %table_id,
197            table_name = %table.name,
198            "Table validation passed for refresh operation"
199        );
200
201        Ok(())
202    }
203}