risingwave_frontend/handler/
reset_source.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::WithPropertiesExt;
16use risingwave_sqlparser::ast::ObjectName;
17
18use super::alter_source_with_sr::fetch_source_catalog_with_db_schema_id;
19use super::{HandlerArgs, RwPgResponse};
20use crate::error::{ErrorCode, Result};
21
22/// Handle `RESET SOURCE source_name` command
23/// This command is used to reset CDC source offset to the latest position
24/// when the original offset has expired in the upstream binlog/oplog.
25pub async fn handle_reset_source(
26    handler_args: HandlerArgs,
27    name: ObjectName,
28) -> Result<RwPgResponse> {
29    let session = handler_args.session.clone();
30
31    // Fetch source catalog and check privileges
32    let source = fetch_source_catalog_with_db_schema_id(&session, &name)?;
33
34    // Check if the source has an associated table (CDC table)
35    if source.associated_table_id.is_some() {
36        return Err(ErrorCode::NotSupported(
37            "reset CDC table using RESET SOURCE statement".to_owned(),
38            "try to use RESET TABLE instead".to_owned(),
39        )
40        .into());
41    }
42
43    // Only CDC sources (MySQL/MongoDB) support RESET operation
44    if !source.with_properties.is_cdc_connector() {
45        return Err(ErrorCode::NotSupported(
46            "RESET SOURCE only supports CDC sources".to_owned(),
47            "This operation is only for CDC sources when offset has expired".to_owned(),
48        )
49        .into());
50    }
51
52    // Call meta service to reset the source
53    let catalog_writer = session.catalog_writer()?;
54    catalog_writer.reset_source(source.id).await?;
55
56    Ok(pgwire::pg_response::PgResponse::empty_result(
57        pgwire::pg_response::StatementType::ALTER_SOURCE,
58    ))
59}