risingwave_meta/stream/
refresh_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
18use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
19use thiserror_ext::AsReport;
20
21use crate::barrier::{BarrierScheduler, Command};
22use crate::manager::MetadataManager;
23use crate::{MetaError, MetaResult};
24
25/// Manager responsible for handling refresh operations on refreshable tables
26pub struct RefreshManager {
27    metadata_manager: MetadataManager,
28    barrier_scheduler: BarrierScheduler,
29}
30
31impl RefreshManager {
32    /// Create a new `RefreshManager` instance
33    pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
34        Self {
35            metadata_manager,
36            barrier_scheduler,
37        }
38    }
39
40    /// Execute a refresh operation for the specified table
41    ///
42    /// This method:
43    /// 1. Validates that the table exists and is refreshable
44    /// 2. Sends a refresh command through the barrier system
45    /// 3. Returns the result of the refresh operation
46    pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
47        let table_id = TableId::new(request.table_id);
48        let associated_source_id = TableId::new(request.associated_source_id);
49
50        tracing::info!("Starting refresh operation for table {}", table_id);
51
52        // Validate that the table exists and is refreshable
53        self.validate_refreshable_table(table_id, associated_source_id)
54            .await?;
55
56        // Get database_id for the table
57        let database_id = DatabaseId::new(
58            self.metadata_manager
59                .catalog_controller
60                .get_object_database_id(table_id.table_id() as _)
61                .await? as _,
62        );
63
64        // Create refresh command
65        let refresh_command = Command::Refresh {
66            table_id,
67            associated_source_id,
68        };
69
70        // Send refresh command through barrier system
71        match self
72            .barrier_scheduler
73            .run_command(database_id, refresh_command)
74            .await
75        {
76            Ok(_) => {
77                tracing::info!(
78                    table_id = %table_id,
79                    "Refresh command completed successfully"
80                );
81
82                Ok(RefreshResponse {
83                    status: None, // Success indicated by None status
84                })
85            }
86            Err(e) => {
87                tracing::error!(
88                    error = %e.as_report(),
89                    table_id = %table_id,
90                    "Failed to execute refresh command"
91                );
92
93                Err(anyhow!(e)
94                    .context(format!("Failed to refresh table {}", table_id))
95                    .into())
96            }
97        }
98    }
99
100    /// Validate that the specified table exists and supports refresh operations
101    async fn validate_refreshable_table(
102        &self,
103        table_id: TableId,
104        associated_source_id: TableId,
105    ) -> MetaResult<()> {
106        // Check if table exists in catalog
107        let table = self
108            .metadata_manager
109            .catalog_controller
110            .get_table_by_id(table_id.table_id as _)
111            .await?;
112
113        // Check if table is refreshable
114        if !table.refreshable {
115            return Err(MetaError::invalid_parameter(format!(
116                "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
117                table.name
118            )));
119        }
120
121        if table.optional_associated_source_id
122            != Some(OptionalAssociatedSourceId::AssociatedSourceId(
123                associated_source_id.table_id(),
124            ))
125        {
126            return Err(MetaError::invalid_parameter(format!(
127                "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
128                table.name, associated_source_id, table.optional_associated_source_id
129            )));
130        }
131
132        tracing::debug!(
133            table_id = %table_id,
134            table_name = %table.name,
135            "Table validation passed for refresh operation"
136        );
137
138        Ok(())
139    }
140}