risingwave_compute/rpc/service/
config_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::sync::Arc;
15
16use foyer::HybridCache;
17use risingwave_batch::task::BatchManager;
18use risingwave_common::error::tonic::ToTonicStatus;
19use risingwave_hummock_sdk::HummockSstableObjectId;
20use risingwave_pb::compute::config_service_server::ConfigService;
21use risingwave_pb::compute::{
22    ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
23};
24use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
25use risingwave_stream::task::LocalStreamManager;
26use thiserror_ext::AsReport;
27use tonic::{Code, Request, Response, Status};
28
29pub struct ConfigServiceImpl {
30    batch_mgr: Arc<BatchManager>,
31    stream_mgr: LocalStreamManager,
32    meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
33    block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
34}
35
36#[async_trait::async_trait]
37impl ConfigService for ConfigServiceImpl {
38    async fn show_config(
39        &self,
40        _request: Request<ShowConfigRequest>,
41    ) -> Result<Response<ShowConfigResponse>, Status> {
42        let batch_config = serde_json::to_string(self.batch_mgr.config())
43            .map_err(|e| e.to_status(Code::Internal, "compute"))?;
44        let stream_config = serde_json::to_string(&self.stream_mgr.env.config())
45            .map_err(|e| e.to_status(Code::Internal, "compute"))?;
46
47        let show_config_response = ShowConfigResponse {
48            batch_config,
49            stream_config,
50        };
51        Ok(Response::new(show_config_response))
52    }
53
54    async fn resize_cache(
55        &self,
56        request: Request<ResizeCacheRequest>,
57    ) -> Result<Response<ResizeCacheResponse>, Status> {
58        let req = request.into_inner();
59
60        if let Some(meta_cache) = &self.meta_cache
61            && req.meta_cache_capacity > 0
62        {
63            match meta_cache.memory().resize(req.meta_cache_capacity as _) {
64                Ok(_) => tracing::info!(
65                    "resize meta cache capacity to {:?}",
66                    req.meta_cache_capacity
67                ),
68                Err(e) => return Err(Status::internal(e.to_report_string())),
69            }
70        }
71
72        if let Some(block_cache) = &self.block_cache
73            && req.data_cache_capacity > 0
74        {
75            match block_cache.memory().resize(req.data_cache_capacity as _) {
76                Ok(_) => tracing::info!(
77                    "resize data cache capacity to {:?}",
78                    req.data_cache_capacity
79                ),
80                Err(e) => return Err(Status::internal(e.to_report_string())),
81            }
82        }
83
84        Ok(Response::new(ResizeCacheResponse {}))
85    }
86}
87
88impl ConfigServiceImpl {
89    pub fn new(
90        batch_mgr: Arc<BatchManager>,
91        stream_mgr: LocalStreamManager,
92        meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
93        block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
94    ) -> Self {
95        Self {
96            batch_mgr,
97            stream_mgr,
98            meta_cache,
99            block_cache,
100        }
101    }
102}