risingwave_meta_service/
session_config.rs1use async_trait::async_trait;
16use risingwave_meta::controller::session_params::SessionParamsControllerRef;
17use risingwave_pb::meta::session_param_service_server::SessionParamService;
18use risingwave_pb::meta::{
19    GetSessionParamsRequest, GetSessionParamsResponse, SetSessionParamRequest,
20    SetSessionParamResponse,
21};
22use thiserror_ext::AsReport;
23use tonic::{Request, Response, Status};
24
/// Service type that implements the `SessionParamService` RPC trait in this file.
///
/// Both RPC handlers delegate to the wrapped session-params controller.
pub struct SessionParamsServiceImpl {
    /// Controller handle used to read (`get_params`) and update (`set_param`)
    /// session parameters.
    session_params_manager: SessionParamsControllerRef,
}
28
29impl SessionParamsServiceImpl {
30    pub fn new(session_params_manager: SessionParamsControllerRef) -> Self {
31        Self {
32            session_params_manager,
33        }
34    }
35}
36
37#[async_trait]
38impl SessionParamService for SessionParamsServiceImpl {
39    async fn get_session_params(
40        &self,
41        _request: Request<GetSessionParamsRequest>,
42    ) -> Result<Response<GetSessionParamsResponse>, Status> {
43        let params = self.session_params_manager.get_params().await;
44        let params_str = serde_json::to_string(¶ms).map_err(|e| {
45            Status::internal(format!("Failed to parse session config: {}", e.as_report()))
46        })?;
47
48        Ok(Response::new(GetSessionParamsResponse {
49            params: params_str,
50        }))
51    }
52
53    async fn set_session_param(
54        &self,
55        request: Request<SetSessionParamRequest>,
56    ) -> Result<Response<SetSessionParamResponse>, Status> {
57        let req = request.into_inner();
58        let req_param = req.get_param();
59
60        let param_value = self
61            .session_params_manager
62            .set_param(req_param, req.value.clone())
63            .await;
64
65        Ok(Response::new(SetSessionParamResponse {
66            param: param_value?,
67        }))
68    }
69}