risingwave_frontend/handler/
refresh.rs1use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_pb::meta::RefreshRequest;
18use risingwave_sqlparser::ast::ObjectName;
19use thiserror_ext::AsReport;
20
21use crate::catalog::table_catalog::TableType;
22use crate::error::{ErrorCode, Result};
23use crate::handler::util::get_table_catalog_by_table_name;
24use crate::handler::{HandlerArgs, RwPgResponse};
25
26pub async fn handle_refresh(
33 handler_args: HandlerArgs,
34 table_name: ObjectName,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37
38 let (table_catalog, schema_name) =
40 get_table_catalog_by_table_name(session.as_ref(), &table_name)?;
41
42 if !table_catalog.refreshable {
44 return Err(ErrorCode::InvalidInputSyntax(format!(
45 "Table '{}.{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
46 schema_name, table_name
47 )).into());
48 }
49
50 match table_catalog.table_type() {
52 TableType::Table => {
53 }
55 t @ (TableType::MaterializedView | TableType::Index | TableType::Internal) => {
56 return Err(ErrorCode::InvalidInputSyntax(format!(
57 "REFRESH is only supported for tables, got {:?}.",
58 t
59 ))
60 .into());
61 }
62 }
63
64 let table_id = table_catalog.id();
65
66 let refresh_request = RefreshRequest {
68 table_id: table_id.table_id(),
69 associated_source_id: table_catalog
70 .associated_source_id()
71 .context("Table is not associated with a refreshable source")?
72 .table_id(),
73 };
74
75 let meta_client = session.env().meta_client();
77 match meta_client.refresh(refresh_request).await {
78 Ok(_) => {
79 tracing::info!(
81 table_id = %table_id,
82 table_name = %table_name,
83 "Manual refresh initiated"
84 );
85
86 Ok(PgResponse::builder(StatementType::REFRESH_TABLE)
88 .notice(format!(
89 "REFRESH initiated for table '{}.{}'",
90 schema_name, table_name
91 ))
92 .into())
93 }
94 Err(e) => {
95 tracing::error!(
96 error = %e.as_report(),
97 table_id = %table_id,
98 table_name = %table_name,
99 "Failed to initiate refresh"
100 );
101
102 Err(ErrorCode::InternalError(format!(
103 "Failed to refresh table '{}.{}': {}",
104 schema_name,
105 table_name,
106 e.as_report()
107 ))
108 .into())
109 }
110 }
111}