risingwave_compute/rpc/service/
config_service.rs1use std::sync::Arc;
16
17use foyer::HybridCache;
18use risingwave_batch::task::BatchManager;
19use risingwave_common::error::tonic::ToTonicStatus;
20use risingwave_hummock_sdk::HummockSstableObjectId;
21use risingwave_pb::compute::config_service_server::ConfigService;
22use risingwave_pb::compute::{
23 ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
24};
25use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
26use risingwave_stream::task::LocalStreamManager;
27use thiserror_ext::AsReport;
28use tonic::{Code, Request, Response, Status};
29
30pub struct ConfigServiceImpl {
31 batch_mgr: Arc<BatchManager>,
32 stream_mgr: LocalStreamManager,
33 meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
34 block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
35}
36
37#[async_trait::async_trait]
38impl ConfigService for ConfigServiceImpl {
39 async fn show_config(
40 &self,
41 _request: Request<ShowConfigRequest>,
42 ) -> Result<Response<ShowConfigResponse>, Status> {
43 let batch_config = serde_json::to_string(self.batch_mgr.config())
44 .map_err(|e| e.to_status(Code::Internal, "compute"))?;
45 let stream_config = serde_json::to_string(&self.stream_mgr.env.global_config())
47 .map_err(|e| e.to_status(Code::Internal, "compute"))?;
48
49 let show_config_response = ShowConfigResponse {
50 batch_config,
51 stream_config,
52 };
53 Ok(Response::new(show_config_response))
54 }
55
56 async fn resize_cache(
57 &self,
58 request: Request<ResizeCacheRequest>,
59 ) -> Result<Response<ResizeCacheResponse>, Status> {
60 let req = request.into_inner();
61
62 if let Some(meta_cache) = &self.meta_cache
63 && req.meta_cache_capacity > 0
64 {
65 match meta_cache.memory().resize(req.meta_cache_capacity as _) {
66 Ok(_) => tracing::info!(
67 "resize meta cache capacity to {:?}",
68 req.meta_cache_capacity
69 ),
70 Err(e) => return Err(Status::internal(e.to_report_string())),
71 }
72 }
73
74 if let Some(block_cache) = &self.block_cache
75 && req.data_cache_capacity > 0
76 {
77 match block_cache.memory().resize(req.data_cache_capacity as _) {
78 Ok(_) => tracing::info!(
79 "resize data cache capacity to {:?}",
80 req.data_cache_capacity
81 ),
82 Err(e) => return Err(Status::internal(e.to_report_string())),
83 }
84 }
85
86 Ok(Response::new(ResizeCacheResponse {}))
87 }
88}
89
90impl ConfigServiceImpl {
91 pub fn new(
92 batch_mgr: Arc<BatchManager>,
93 stream_mgr: LocalStreamManager,
94 meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
95 block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
96 ) -> Self {
97 Self {
98 batch_mgr,
99 stream_mgr,
100 meta_cache,
101 block_cache,
102 }
103 }
104}