risingwave_frontend/handler/
alter_streaming_enable_unaligned_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_sqlparser::ast::ObjectName;
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::binder::{Binder, Relation};
21use crate::catalog::CatalogError;
22use crate::error::{ErrorCode, Result};
23
24pub async fn handle_alter_streaming_enable_unaligned_join(
25    handler_args: HandlerArgs,
26    name: ObjectName,
27    enable: bool,
28) -> Result<RwPgResponse> {
29    if cfg!(debug_assertions) {
30        let session = handler_args.session.clone();
31        let job_id = {
32            let mut binder = Binder::new_for_system(&session);
33            Binder::validate_cross_db_reference(&session.database(), &name)?;
34            let not_found_err = CatalogError::NotFound("stream job", name.to_string());
35
36            if let Ok(relation) = binder.bind_catalog_relation_by_object_name(name.clone(), true) {
37                match relation {
38                    Relation::Source(s) => {
39                        if s.is_shared() {
40                            s.catalog.id
41                        } else {
42                            bail!(ErrorCode::NotSupported(
43                                "source has no unaligned_join".to_owned(),
44                                "Please only target materialized views or sinks".to_owned(),
45                            ));
46                        }
47                    }
48                    Relation::BaseTable(t) => t.table_catalog.id.table_id,
49                    Relation::SystemTable(_t) => {
50                        bail!(ErrorCode::NotSupported(
51                            "system table has no unaligned_join".to_owned(),
52                            "Please only target materialized views or sinks".to_owned(),
53                        ));
54                    }
55                    Relation::Share(_s) => {
56                        bail!(ErrorCode::NotSupported(
57                            "view has no unaligned_join".to_owned(),
58                            "Please only target materialized views or sinks".to_owned(),
59                        ));
60                    }
61                    _ => {
62                        // Other relation types (Subquery, Join, etc.) are not directly describable.
63                        return Err(not_found_err.into());
64                    }
65                }
66            } else if let Ok(sink) = binder.bind_sink_by_name(name.clone()) {
67                sink.sink_catalog.id.sink_id
68            } else {
69                return Err(not_found_err.into());
70            }
71        };
72        let meta_client = session.env().meta_client();
73        meta_client
74            .set_sync_log_store_aligned(job_id, !enable)
75            .await?;
76        Ok(PgResponse::empty_result(
77            StatementType::ALTER_MATERIALIZED_VIEW,
78        ))
79    } else {
80        bail!("ALTER STREAMING ENABLE UNALIGNED JOIN is only supported in debug mode");
81    }
82}