risingwave_frontend/handler/
alter_streaming_enable_unaligned_join.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_sqlparser::ast::ObjectName;
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::binder::{Binder, Relation};
21use crate::catalog::CatalogError;
22use crate::error::{ErrorCode, Result};
23
24pub async fn handle_alter_streaming_enable_unaligned_join(
25 handler_args: HandlerArgs,
26 name: ObjectName,
27 enable: bool,
28) -> Result<RwPgResponse> {
29 if cfg!(debug_assertions) {
30 let session = handler_args.session.clone();
31 let job_id = {
32 let mut binder = Binder::new_for_system(&session);
33 Binder::validate_cross_db_reference(&session.database(), &name)?;
34 let not_found_err = CatalogError::NotFound("stream job", name.to_string());
35
36 if let Ok(relation) = binder.bind_catalog_relation_by_object_name(name.clone(), true) {
37 match relation {
38 Relation::Source(s) => {
39 if s.is_shared() {
40 s.catalog.id
41 } else {
42 bail!(ErrorCode::NotSupported(
43 "source has no unaligned_join".to_owned(),
44 "Please only target materialized views or sinks".to_owned(),
45 ));
46 }
47 }
48 Relation::BaseTable(t) => t.table_catalog.id.table_id,
49 Relation::SystemTable(_t) => {
50 bail!(ErrorCode::NotSupported(
51 "system table has no unaligned_join".to_owned(),
52 "Please only target materialized views or sinks".to_owned(),
53 ));
54 }
55 Relation::Share(_s) => {
56 bail!(ErrorCode::NotSupported(
57 "view has no unaligned_join".to_owned(),
58 "Please only target materialized views or sinks".to_owned(),
59 ));
60 }
61 _ => {
62 return Err(not_found_err.into());
64 }
65 }
66 } else if let Ok(sink) = binder.bind_sink_by_name(name.clone()) {
67 sink.sink_catalog.id.sink_id
68 } else {
69 return Err(not_found_err.into());
70 }
71 };
72 let meta_client = session.env().meta_client();
73 meta_client
74 .set_sync_log_store_aligned(job_id, !enable)
75 .await?;
76 Ok(PgResponse::empty_result(
77 StatementType::ALTER_MATERIALIZED_VIEW,
78 ))
79 } else {
80 bail!("ALTER STREAMING ENABLE UNALIGNED JOIN is only supported in debug mode");
81 }
82}