risingwave_meta_service/
session_config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use risingwave_meta::controller::session_params::SessionParamsControllerRef;
17use risingwave_pb::meta::session_param_service_server::SessionParamService;
18use risingwave_pb::meta::{
19    GetSessionParamsRequest, GetSessionParamsResponse, SetSessionParamRequest,
20    SetSessionParamResponse,
21};
22use thiserror_ext::AsReport;
23use tonic::{Request, Response, Status};
24
25pub struct SessionParamsServiceImpl {
26    session_params_manager: SessionParamsControllerRef,
27}
28
29impl SessionParamsServiceImpl {
30    pub fn new(session_params_manager: SessionParamsControllerRef) -> Self {
31        Self {
32            session_params_manager,
33        }
34    }
35}
36
37#[async_trait]
38impl SessionParamService for SessionParamsServiceImpl {
39    async fn get_session_params(
40        &self,
41        _request: Request<GetSessionParamsRequest>,
42    ) -> Result<Response<GetSessionParamsResponse>, Status> {
43        let params = self.session_params_manager.get_params().await;
44        let params_str = serde_json::to_string(&params).map_err(|e| {
45            Status::internal(format!("Failed to parse session config: {}", e.as_report()))
46        })?;
47
48        Ok(Response::new(GetSessionParamsResponse {
49            params: params_str,
50        }))
51    }
52
53    async fn set_session_param(
54        &self,
55        request: Request<SetSessionParamRequest>,
56    ) -> Result<Response<SetSessionParamResponse>, Status> {
57        let req = request.into_inner();
58        let req_param = req.get_param();
59
60        let param_value = self
61            .session_params_manager
62            .set_param(req_param, req.value.clone())
63            .await;
64
65        Ok(Response::new(SetSessionParamResponse {
66            param: param_value?,
67        }))
68    }
69}