risingwave_frontend/handler/
alter_system.rsuse pgwire::pg_response::StatementType;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_sqlparser::ast::{Ident, SetVariableValue};
use super::variable::set_var_to_param_str;
use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
const NOTICE_BARRIER_INTERVAL_MS: u32 = 300000;
const NOTICE_CHECKPOINT_FREQUENCY: u64 = 60;
pub async fn handle_alter_system(
handler_args: HandlerArgs,
param: Ident,
value: SetVariableValue,
) -> Result<RwPgResponse> {
let value = set_var_to_param_str(&value);
let param_name = param.to_string();
let meta_client = handler_args.session.env().meta_client();
let mut builder = RwPgResponse::builder(StatementType::ALTER_SYSTEM);
if SessionConfig::contains_param(¶m_name) {
if SessionConfig::check_no_alter_sys(¶m_name).unwrap() {
return Err(ErrorCode::InternalError(format!(
"session param {} cannot be altered system wide",
param_name
))
.into());
}
meta_client.set_session_param(param_name, value).await?;
} else {
let params = meta_client.set_system_param(param_name, value).await?;
if let Some(params) = params {
if params.barrier_interval_ms() >= NOTICE_BARRIER_INTERVAL_MS {
builder = builder.notice(
format!("Barrier interval is set to {} ms >= {} ms. This can hurt freshness and potentially cause OOM.",
params.barrier_interval_ms(), NOTICE_BARRIER_INTERVAL_MS));
}
if params.checkpoint_frequency() >= NOTICE_CHECKPOINT_FREQUENCY {
builder = builder.notice(
format!("Checkpoint frequency is set to {} >= {}. This can hurt freshness and potentially cause OOM.",
params.checkpoint_frequency(), NOTICE_CHECKPOINT_FREQUENCY));
}
}
}
Ok(builder.into())
}