risingwave_compute/rpc/service/
config_service.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use foyer::HybridCache;
18use risingwave_batch::task::BatchManager;
19use risingwave_common::error::tonic::ToTonicStatus;
20use risingwave_hummock_sdk::HummockSstableObjectId;
21use risingwave_pb::compute::config_service_server::ConfigService;
22use risingwave_pb::compute::{
23    ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
24};
25use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
26use risingwave_stream::task::LocalStreamManager;
27use thiserror_ext::AsReport;
28use tonic::{Code, Request, Response, Status};
29
30pub struct ConfigServiceImpl {
31    batch_mgr: Arc<BatchManager>,
32    stream_mgr: LocalStreamManager,
33    meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
34    block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
35}
36
37#[async_trait::async_trait]
38impl ConfigService for ConfigServiceImpl {
39    async fn show_config(
40        &self,
41        _request: Request<ShowConfigRequest>,
42    ) -> Result<Response<ShowConfigResponse>, Status> {
43        let batch_config = serde_json::to_string(self.batch_mgr.config())
44            .map_err(|e| e.to_status(Code::Internal, "compute"))?;
45        // TODO(config): show overridden config for specific job
46        let stream_config = serde_json::to_string(&self.stream_mgr.env.global_config())
47            .map_err(|e| e.to_status(Code::Internal, "compute"))?;
48
49        let show_config_response = ShowConfigResponse {
50            batch_config,
51            stream_config,
52        };
53        Ok(Response::new(show_config_response))
54    }
55
56    async fn resize_cache(
57        &self,
58        request: Request<ResizeCacheRequest>,
59    ) -> Result<Response<ResizeCacheResponse>, Status> {
60        let req = request.into_inner();
61
62        if let Some(meta_cache) = &self.meta_cache
63            && req.meta_cache_capacity > 0
64        {
65            match meta_cache.memory().resize(req.meta_cache_capacity as _) {
66                Ok(_) => tracing::info!(
67                    "resize meta cache capacity to {:?}",
68                    req.meta_cache_capacity
69                ),
70                Err(e) => return Err(Status::internal(e.to_report_string())),
71            }
72        }
73
74        if let Some(block_cache) = &self.block_cache
75            && req.data_cache_capacity > 0
76        {
77            match block_cache.memory().resize(req.data_cache_capacity as _) {
78                Ok(_) => tracing::info!(
79                    "resize data cache capacity to {:?}",
80                    req.data_cache_capacity
81                ),
82                Err(e) => return Err(Status::internal(e.to_report_string())),
83            }
84        }
85
86        Ok(Response::new(ResizeCacheResponse {}))
87    }
88}
89
90impl ConfigServiceImpl {
91    pub fn new(
92        batch_mgr: Arc<BatchManager>,
93        stream_mgr: LocalStreamManager,
94        meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
95        block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
96    ) -> Self {
97        Self {
98            batch_mgr,
99            stream_mgr,
100            meta_cache,
101            block_cache,
102        }
103    }
104}