risedev/task/
compactor_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::io::Write;
17use std::path::Path;
18use std::process::Command;
19
20use anyhow::Result;
21
22use super::risingwave_cmd;
23use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
24use crate::{CompactorConfig, ExecuteContext, Task, add_meta_node, add_tempo_endpoint};
25
26pub struct CompactorService {
27    config: CompactorConfig,
28}
29
30impl CompactorService {
31    pub fn new(config: CompactorConfig) -> Result<Self> {
32        Ok(Self { config })
33    }
34
35    /// Apply command args according to config
36    pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
37        cmd.arg("--listen-addr")
38            .arg(format!("{}:{}", config.listen_address, config.port))
39            .arg("--prometheus-listener-addr")
40            .arg(format!(
41                "{}:{}",
42                config.listen_address, config.exporter_port
43            ))
44            .arg("--advertise-addr")
45            .arg(format!("{}:{}", config.address, config.port));
46        if let Some(compaction_worker_threads_number) =
47            config.compaction_worker_threads_number.as_ref()
48        {
49            cmd.arg("--compaction-worker-threads-number")
50                .arg(format!("{}", compaction_worker_threads_number));
51        }
52
53        let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
54        add_meta_node(provide_meta_node, cmd)?;
55
56        let provide_tempo = config.provide_tempo.as_ref().unwrap();
57        add_tempo_endpoint(provide_tempo, cmd)?;
58
59        Ok(())
60    }
61}
62
63impl Task for CompactorService {
64    fn execute(&mut self, ctx: &mut ExecuteContext<impl Write>) -> Result<()> {
65        ctx.service(self);
66        ctx.pb.set_message("starting...");
67
68        let mut cmd = risingwave_cmd("compactor")?;
69
70        if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
71            cmd.env(
72                "RW_PROFILE_PATH",
73                Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
74            );
75        }
76
77        if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
78            // See https://linux.die.net/man/3/jemalloc for the descriptions of profiling options
79            let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compactor";
80            cmd.env("_RJEM_MALLOC_CONF", conf); // prefixed for macos
81            cmd.env("MALLOC_CONF", conf); // unprefixed for linux
82        }
83
84        Self::apply_command_args(&mut cmd, &self.config)?;
85
86        if !self.config.user_managed {
87            ctx.run_command(ctx.tmux_run(cmd)?)?;
88            ctx.pb.set_message("started");
89        } else {
90            ctx.pb.set_message("user managed");
91            writeln!(
92                &mut ctx.log,
93                "Please use the following parameters to start the compactor:\n{}\n{} {}\n\n",
94                get_program_env_cmd(&cmd),
95                get_program_name(&cmd),
96                get_program_args(&cmd)
97            )?;
98        }
99
100        Ok(())
101    }
102
103    fn id(&self) -> String {
104        self.config.id.clone()
105    }
106}