risedev/task/
compactor_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::env;
16use std::io::Write;
17use std::path::Path;
18use std::process::Command;
19
20use anyhow::Result;
21
22use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
23use crate::{CompactorConfig, ExecuteContext, Task, add_meta_node, add_tempo_endpoint};
24
/// Task wrapper around a single compactor instance described by a [`CompactorConfig`].
///
/// Its [`Task`] implementation in this file builds the `compactor` command from the
/// config and either runs it or logs the command for the user to run.
pub struct CompactorService {
    /// Configuration the service id and command-line arguments are read from.
    config: CompactorConfig,
}
28
29impl CompactorService {
30    pub fn new(config: CompactorConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    /// Apply command args according to config
35    pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
36        cmd.arg("--listen-addr")
37            .arg(format!("{}:{}", config.listen_address, config.port))
38            .arg("--prometheus-listener-addr")
39            .arg(format!(
40                "{}:{}",
41                config.listen_address, config.exporter_port
42            ))
43            .arg("--advertise-addr")
44            .arg(format!("{}:{}", config.address, config.port))
45            .arg("--compactor-mode")
46            .arg(&config.compactor_mode);
47        if let Some(compaction_worker_threads_number) =
48            config.compaction_worker_threads_number.as_ref()
49        {
50            cmd.arg("--compaction-worker-threads-number")
51                .arg(format!("{}", compaction_worker_threads_number));
52        }
53
54        let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
55        add_meta_node(provide_meta_node, cmd)?;
56
57        let provide_tempo = config.provide_tempo.as_ref().unwrap();
58        add_tempo_endpoint(provide_tempo, cmd)?;
59
60        Ok(())
61    }
62}
63
64impl Task for CompactorService {
65    fn execute(&mut self, ctx: &mut ExecuteContext<impl Write>) -> Result<()> {
66        ctx.service(self);
67        ctx.pb.set_message("starting...");
68
69        let mut cmd = ctx.risingwave_cmd("compactor")?;
70
71        if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
72            cmd.env(
73                "RW_PROFILE_PATH",
74                Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
75            );
76        }
77
78        if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
79            // See https://linux.die.net/man/3/jemalloc for the descriptions of profiling options
80            let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compactor";
81            cmd.env("_RJEM_MALLOC_CONF", conf); // prefixed for macos
82            cmd.env("MALLOC_CONF", conf); // unprefixed for linux
83        }
84
85        Self::apply_command_args(&mut cmd, &self.config)?;
86
87        if !self.config.user_managed {
88            ctx.run_command(ctx.tmux_run(cmd)?)?;
89            ctx.pb.set_message("started");
90        } else {
91            ctx.pb.set_message("user managed");
92            writeln!(
93                &mut ctx.log,
94                "Please use the following parameters to start the compactor:\n{}\n{} {}\n\n",
95                get_program_env_cmd(&cmd),
96                get_program_name(&cmd),
97                get_program_args(&cmd)
98            )?;
99        }
100
101        Ok(())
102    }
103
104    fn id(&self) -> String {
105        self.config.id.clone()
106    }
107}