risingwave_frontend/handler/
refresh.rs1use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_pb::meta::RefreshRequest;
18use risingwave_sqlparser::ast::ObjectName;
19use thiserror_ext::AsReport;
20
21use crate::catalog::table_catalog::TableType;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::get_table_catalog_by_table_name;
24use crate::handler::{HandlerArgs, RwPgResponse};
25
26pub async fn handle_refresh(
33 handler_args: HandlerArgs,
34 table_name: ObjectName,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37
38 let (table_catalog, schema_name) =
40 get_table_catalog_by_table_name(session.as_ref(), &table_name)?;
41
42 if !table_catalog.refreshable {
44 return Err(ErrorCode::InvalidInputSyntax(format!(
45 "Table '{}.{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
46 schema_name, table_name
47 )).into());
48 }
49
50 match table_catalog.table_type() {
52 TableType::Table => {
53 }
55 t @ (TableType::MaterializedView
56 | TableType::Index
57 | TableType::VectorIndex
58 | TableType::Internal) => {
59 return Err(ErrorCode::InvalidInputSyntax(format!(
60 "REFRESH is only supported for tables, got {:?}.",
61 t
62 ))
63 .into());
64 }
65 }
66
67 let table_id = table_catalog.id();
68
69 let refresh_request = RefreshRequest {
71 table_id,
72 associated_source_id: table_catalog
73 .associated_source_id()
74 .context("Table is not associated with a refreshable source")?,
75 };
76
77 let meta_client = session.env().meta_client();
79 match meta_client.refresh(refresh_request).await {
80 Ok(_) => {
81 tracing::info!(
83 table_id = %table_id,
84 table_name = %table_name,
85 "Manual refresh initiated"
86 );
87
88 Ok(PgResponse::builder(StatementType::REFRESH_TABLE)
90 .notice(format!(
91 "REFRESH initiated for table '{}.{}'",
92 schema_name, table_name
93 ))
94 .into())
95 }
96 Err(e) => {
97 tracing::error!(
98 error = %e.as_report(),
99 table_id = %table_id,
100 table_name = %table_name,
101 "Failed to initiate refresh"
102 );
103
104 Err(ErrorCode::InternalError(format!(
105 "Failed to refresh table '{}.{}': {}",
106 schema_name,
107 table_name,
108 e.as_report()
109 ))
110 .into())
111 }
112 }
113}