risingwave_frontend/handler/
refresh.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_pb::meta::RefreshRequest;
18use risingwave_sqlparser::ast::ObjectName;
19use thiserror_ext::AsReport;
20
21use crate::catalog::table_catalog::TableType;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::get_table_catalog_by_table_name;
24use crate::handler::{HandlerArgs, RwPgResponse};
25
26/// Handle REFRESH statement
27///
28/// This function processes the REFRESH statement by:
29/// 1. Validating the table exists and is refreshable
30/// 2. Sending a refresh command to the meta service
31/// 3. Returning appropriate response to the client
32pub async fn handle_refresh(
33    handler_args: HandlerArgs,
34    table_name: ObjectName,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37
38    // Get table catalog to validate table exists
39    let (table_catalog, schema_name) =
40        get_table_catalog_by_table_name(session.as_ref(), &table_name)?;
41
42    // Check if table supports refresh operations
43    if !table_catalog.refreshable {
44        return Err(ErrorCode::InvalidInputSyntax(format!(
45            "Table '{}.{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
46            schema_name, table_name
47        )).into());
48    }
49
50    // Only allow refresh on tables, not views or materialized views
51    match table_catalog.table_type() {
52        TableType::Table => {
53            // This is valid
54        }
55        t @ (TableType::MaterializedView
56        | TableType::Index
57        | TableType::VectorIndex
58        | TableType::Internal) => {
59            return Err(ErrorCode::InvalidInputSyntax(format!(
60                "REFRESH is only supported for tables, got {:?}.",
61                t
62            ))
63            .into());
64        }
65    }
66
67    let table_id = table_catalog.id();
68
69    // Create refresh request
70    let refresh_request = RefreshRequest {
71        table_id: table_id.table_id(),
72        associated_source_id: table_catalog
73            .associated_source_id()
74            .context("Table is not associated with a refreshable source")?
75            .table_id(),
76    };
77
78    // Send refresh command to meta service via stream manager
79    let meta_client = session.env().meta_client();
80    match meta_client.refresh(refresh_request).await {
81        Ok(_) => {
82            // Refresh command sent successfully
83            tracing::info!(
84                table_id = %table_id,
85                table_name = %table_name,
86                "Manual refresh initiated"
87            );
88
89            // Return success response
90            Ok(PgResponse::builder(StatementType::REFRESH_TABLE)
91                .notice(format!(
92                    "REFRESH initiated for table '{}.{}'",
93                    schema_name, table_name
94                ))
95                .into())
96        }
97        Err(e) => {
98            tracing::error!(
99                error = %e.as_report(),
100                table_id = %table_id,
101                table_name = %table_name,
102                "Failed to initiate refresh"
103            );
104
105            Err(ErrorCode::InternalError(format!(
106                "Failed to refresh table '{}.{}': {}",
107                schema_name,
108                table_name,
109                e.as_report()
110            ))
111            .into())
112        }
113    }
114}