risingwave_frontend/handler/
refresh.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_pb::meta::RefreshRequest;
18use risingwave_sqlparser::ast::ObjectName;
19use thiserror_ext::AsReport;
20
21use crate::catalog::table_catalog::TableType;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::get_table_catalog_by_table_name;
24use crate::handler::{HandlerArgs, RwPgResponse};
25
26/// Handle REFRESH statement
27///
28/// This function processes the REFRESH statement by:
29/// 1. Validating the table exists and is refreshable
30/// 2. Sending a refresh command to the meta service
31/// 3. Returning appropriate response to the client
32pub async fn handle_refresh(
33    handler_args: HandlerArgs,
34    table_name: ObjectName,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37
38    // Get table catalog to validate table exists
39    let (table_catalog, schema_name) =
40        get_table_catalog_by_table_name(session.as_ref(), &table_name)?;
41
42    // Check if table supports refresh operations
43    if !table_catalog.refreshable {
44        return Err(ErrorCode::InvalidInputSyntax(format!(
45            "Table '{}.{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
46            schema_name, table_name
47        )).into());
48    }
49
50    // Only allow refresh on tables, not views or materialized views
51    match table_catalog.table_type() {
52        TableType::Table => {
53            // This is valid
54        }
55        t @ (TableType::MaterializedView | TableType::Index | TableType::Internal) => {
56            return Err(ErrorCode::InvalidInputSyntax(format!(
57                "REFRESH is only supported for tables, got {:?}.",
58                t
59            ))
60            .into());
61        }
62    }
63
64    let table_id = table_catalog.id();
65
66    // Create refresh request
67    let refresh_request = RefreshRequest {
68        table_id: table_id.table_id(),
69        associated_source_id: table_catalog
70            .associated_source_id()
71            .context("Table is not associated with a refreshable source")?
72            .table_id(),
73    };
74
75    // Send refresh command to meta service via stream manager
76    let meta_client = session.env().meta_client();
77    match meta_client.refresh(refresh_request).await {
78        Ok(_) => {
79            // Refresh command sent successfully
80            tracing::info!(
81                table_id = %table_id,
82                table_name = %table_name,
83                "Manual refresh initiated"
84            );
85
86            // Return success response
87            Ok(PgResponse::builder(StatementType::REFRESH_TABLE)
88                .notice(format!(
89                    "REFRESH initiated for table '{}.{}'",
90                    schema_name, table_name
91                ))
92                .into())
93        }
94        Err(e) => {
95            tracing::error!(
96                error = %e.as_report(),
97                table_id = %table_id,
98                table_name = %table_name,
99                "Failed to initiate refresh"
100            );
101
102            Err(ErrorCode::InternalError(format!(
103                "Failed to refresh table '{}.{}': {}",
104                schema_name,
105                table_name,
106                e.as_report()
107            ))
108            .into())
109        }
110    }
111}