risingwave_frontend/handler/
reset_source.rs1use risingwave_connector::WithPropertiesExt;
16use risingwave_sqlparser::ast::ObjectName;
17
18use super::alter_source_with_sr::fetch_source_catalog_with_db_schema_id;
19use super::{HandlerArgs, RwPgResponse};
20use crate::error::{ErrorCode, Result};
21
22pub async fn handle_reset_source(
26 handler_args: HandlerArgs,
27 name: ObjectName,
28) -> Result<RwPgResponse> {
29 let session = handler_args.session.clone();
30
31 let source = fetch_source_catalog_with_db_schema_id(&session, &name)?;
33
34 if source.associated_table_id.is_some() {
36 return Err(ErrorCode::NotSupported(
37 "reset CDC table using RESET SOURCE statement".to_owned(),
38 "try to use RESET TABLE instead".to_owned(),
39 )
40 .into());
41 }
42
43 if !source.with_properties.is_cdc_connector() {
45 return Err(ErrorCode::NotSupported(
46 "RESET SOURCE only supports CDC sources".to_owned(),
47 "This operation is only for CDC sources when offset has expired".to_owned(),
48 )
49 .into());
50 }
51
52 let catalog_writer = session.catalog_writer()?;
54 catalog_writer.reset_source(source.id).await?;
55
56 Ok(pgwire::pg_response::PgResponse::empty_result(
57 pgwire::pg_response::StatementType::ALTER_SOURCE,
58 ))
59}