risedev/task/
compactor_service.rs1use std::env;
16use std::io::Write;
17use std::path::Path;
18use std::process::Command;
19
20use anyhow::Result;
21
22use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
23use crate::{CompactorConfig, ExecuteContext, Task, add_meta_node, add_tempo_endpoint};
24
25pub struct CompactorService {
26 config: CompactorConfig,
27}
28
29impl CompactorService {
30 pub fn new(config: CompactorConfig) -> Result<Self> {
31 Ok(Self { config })
32 }
33
34 pub fn apply_command_args(cmd: &mut Command, config: &CompactorConfig) -> Result<()> {
36 cmd.arg("--listen-addr")
37 .arg(format!("{}:{}", config.listen_address, config.port))
38 .arg("--prometheus-listener-addr")
39 .arg(format!(
40 "{}:{}",
41 config.listen_address, config.exporter_port
42 ))
43 .arg("--advertise-addr")
44 .arg(format!("{}:{}", config.address, config.port))
45 .arg("--compactor-mode")
46 .arg(&config.compactor_mode);
47 if let Some(compaction_worker_threads_number) =
48 config.compaction_worker_threads_number.as_ref()
49 {
50 cmd.arg("--compaction-worker-threads-number")
51 .arg(format!("{}", compaction_worker_threads_number));
52 }
53
54 let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
55 add_meta_node(provide_meta_node, cmd)?;
56
57 let provide_tempo = config.provide_tempo.as_ref().unwrap();
58 add_tempo_endpoint(provide_tempo, cmd)?;
59
60 Ok(())
61 }
62}
63
64impl Task for CompactorService {
65 fn execute(&mut self, ctx: &mut ExecuteContext<impl Write>) -> Result<()> {
66 ctx.service(self);
67 ctx.pb.set_message("starting...");
68
69 let mut cmd = ctx.risingwave_cmd("compactor")?;
70
71 if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
72 cmd.env(
73 "RW_PROFILE_PATH",
74 Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
75 );
76 }
77
78 if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
79 let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compactor";
81 cmd.env("_RJEM_MALLOC_CONF", conf); cmd.env("MALLOC_CONF", conf); }
84
85 Self::apply_command_args(&mut cmd, &self.config)?;
86
87 if !self.config.user_managed {
88 ctx.run_command(ctx.tmux_run(cmd)?)?;
89 ctx.pb.set_message("started");
90 } else {
91 ctx.pb.set_message("user managed");
92 writeln!(
93 &mut ctx.log,
94 "Please use the following parameters to start the compactor:\n{}\n{} {}\n\n",
95 get_program_env_cmd(&cmd),
96 get_program_name(&cmd),
97 get_program_args(&cmd)
98 )?;
99 }
100
101 Ok(())
102 }
103
104 fn id(&self) -> String {
105 self.config.id.clone()
106 }
107}