risingwave_compute/rpc/service/
config_service.rs1use std::sync::Arc;
15
16use foyer::HybridCache;
17use risingwave_batch::task::BatchManager;
18use risingwave_common::error::tonic::ToTonicStatus;
19use risingwave_hummock_sdk::HummockSstableObjectId;
20use risingwave_pb::compute::config_service_server::ConfigService;
21use risingwave_pb::compute::{
22 ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
23};
24use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
25use risingwave_stream::task::LocalStreamManager;
26use thiserror_ext::AsReport;
27use tonic::{Code, Request, Response, Status};
28
29pub struct ConfigServiceImpl {
30 batch_mgr: Arc<BatchManager>,
31 stream_mgr: LocalStreamManager,
32 meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
33 block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
34}
35
36#[async_trait::async_trait]
37impl ConfigService for ConfigServiceImpl {
38 async fn show_config(
39 &self,
40 _request: Request<ShowConfigRequest>,
41 ) -> Result<Response<ShowConfigResponse>, Status> {
42 let batch_config = serde_json::to_string(self.batch_mgr.config())
43 .map_err(|e| e.to_status(Code::Internal, "compute"))?;
44 let stream_config = serde_json::to_string(&self.stream_mgr.env.global_config())
46 .map_err(|e| e.to_status(Code::Internal, "compute"))?;
47
48 let show_config_response = ShowConfigResponse {
49 batch_config,
50 stream_config,
51 };
52 Ok(Response::new(show_config_response))
53 }
54
55 async fn resize_cache(
56 &self,
57 request: Request<ResizeCacheRequest>,
58 ) -> Result<Response<ResizeCacheResponse>, Status> {
59 let req = request.into_inner();
60
61 if let Some(meta_cache) = &self.meta_cache
62 && req.meta_cache_capacity > 0
63 {
64 match meta_cache.memory().resize(req.meta_cache_capacity as _) {
65 Ok(_) => tracing::info!(
66 "resize meta cache capacity to {:?}",
67 req.meta_cache_capacity
68 ),
69 Err(e) => return Err(Status::internal(e.to_report_string())),
70 }
71 }
72
73 if let Some(block_cache) = &self.block_cache
74 && req.data_cache_capacity > 0
75 {
76 match block_cache.memory().resize(req.data_cache_capacity as _) {
77 Ok(_) => tracing::info!(
78 "resize data cache capacity to {:?}",
79 req.data_cache_capacity
80 ),
81 Err(e) => return Err(Status::internal(e.to_report_string())),
82 }
83 }
84
85 Ok(Response::new(ResizeCacheResponse {}))
86 }
87}
88
89impl ConfigServiceImpl {
90 pub fn new(
91 batch_mgr: Arc<BatchManager>,
92 stream_mgr: LocalStreamManager,
93 meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
94 block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
95 ) -> Self {
96 Self {
97 batch_mgr,
98 stream_mgr,
99 meta_cache,
100 block_cache,
101 }
102 }
103}