risedev/task/
compactor_service.rs1use std::env;
16use std::io::Write;
17use std::path::Path;
18use std::process::Command;
19
20use anyhow::Result;
21
22use super::risingwave_cmd;
23use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
24use crate::{CompactorConfig, ExecuteContext, Task, add_meta_node, add_tempo_endpoint};
25
26pub struct CompactorService {
27 config: CompactorConfig,
28}
29
30impl CompactorService {
31 pub fn new(config: CompactorConfig) -> Result<Self> {
32 Ok(Self { config })
33 }
34
35 pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
37 cmd.arg("--listen-addr")
38 .arg(format!("{}:{}", config.listen_address, config.port))
39 .arg("--prometheus-listener-addr")
40 .arg(format!(
41 "{}:{}",
42 config.listen_address, config.exporter_port
43 ))
44 .arg("--advertise-addr")
45 .arg(format!("{}:{}", config.address, config.port));
46 if let Some(compaction_worker_threads_number) =
47 config.compaction_worker_threads_number.as_ref()
48 {
49 cmd.arg("--compaction-worker-threads-number")
50 .arg(format!("{}", compaction_worker_threads_number));
51 }
52
53 let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
54 add_meta_node(provide_meta_node, cmd)?;
55
56 let provide_tempo = config.provide_tempo.as_ref().unwrap();
57 add_tempo_endpoint(provide_tempo, cmd)?;
58
59 Ok(())
60 }
61}
62
63impl Task for CompactorService {
64 fn execute(&mut self, ctx: &mut ExecuteContext<impl Write>) -> Result<()> {
65 ctx.service(self);
66 ctx.pb.set_message("starting...");
67
68 let mut cmd = risingwave_cmd("compactor")?;
69
70 if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
71 cmd.env(
72 "RW_PROFILE_PATH",
73 Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
74 );
75 }
76
77 if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
78 let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compactor";
80 cmd.env("_RJEM_MALLOC_CONF", conf); cmd.env("MALLOC_CONF", conf); }
83
84 Self::apply_command_args(&mut cmd, &self.config)?;
85
86 if !self.config.user_managed {
87 ctx.run_command(ctx.tmux_run(cmd)?)?;
88 ctx.pb.set_message("started");
89 } else {
90 ctx.pb.set_message("user managed");
91 writeln!(
92 &mut ctx.log,
93 "Please use the following parameters to start the compactor:\n{}\n{} {}\n\n",
94 get_program_env_cmd(&cmd),
95 get_program_name(&cmd),
96 get_program_args(&cmd)
97 )?;
98 }
99
100 Ok(())
101 }
102
103 fn id(&self) -> String {
104 self.config.id.clone()
105 }
106}