risingwave_meta_service/
session_config.rs1use async_trait::async_trait;
16use risingwave_meta::controller::session_params::SessionParamsControllerRef;
17use risingwave_pb::meta::session_param_service_server::SessionParamService;
18use risingwave_pb::meta::{
19 GetSessionParamsRequest, GetSessionParamsResponse, SetSessionParamRequest,
20 SetSessionParamResponse,
21};
22use thiserror_ext::AsReport;
23use tonic::{Request, Response, Status};
24
25pub struct SessionParamsServiceImpl {
26 session_params_manager: SessionParamsControllerRef,
27}
28
29impl SessionParamsServiceImpl {
30 pub fn new(session_params_manager: SessionParamsControllerRef) -> Self {
31 Self {
32 session_params_manager,
33 }
34 }
35}
36
37#[async_trait]
38impl SessionParamService for SessionParamsServiceImpl {
39 async fn get_session_params(
40 &self,
41 _request: Request<GetSessionParamsRequest>,
42 ) -> Result<Response<GetSessionParamsResponse>, Status> {
43 let params = self.session_params_manager.get_params().await;
44 let params_str = serde_json::to_string(¶ms).map_err(|e| {
45 Status::internal(format!("Failed to parse session config: {}", e.as_report()))
46 })?;
47
48 Ok(Response::new(GetSessionParamsResponse {
49 params: params_str,
50 }))
51 }
52
53 async fn set_session_param(
54 &self,
55 request: Request<SetSessionParamRequest>,
56 ) -> Result<Response<SetSessionParamResponse>, Status> {
57 let req = request.into_inner();
58 let req_param = req.get_param();
59
60 let param_value = self
61 .session_params_manager
62 .set_param(req_param, req.value.clone())
63 .await;
64
65 Ok(Response::new(SetSessionParamResponse {
66 param: param_value?,
67 }))
68 }
69}