risedev/task/
compute_node_service.rs1use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::Result;
20
21use super::{ExecuteContext, Task, risingwave_cmd};
22use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
23use crate::{ComputeNodeConfig, add_meta_node, add_tempo_endpoint};
24
/// Task that builds the `compute-node` command from a [`ComputeNodeConfig`] and either
/// runs it or logs the parameters needed to start it manually.
pub struct ComputeNodeService {
    /// Configuration of the compute node handled by this task.
    config: ComputeNodeConfig,
}
28
29impl ComputeNodeService {
30 pub fn new(config: ComputeNodeConfig) -> Result<Self> {
31 Ok(Self { config })
32 }
33
34 pub fn apply_command_args(cmd: &mut Command, config: &ComputeNodeConfig) -> Result<()> {
36 cmd.arg("--listen-addr")
37 .arg(format!("{}:{}", config.listen_address, config.port))
38 .arg("--prometheus-listener-addr")
39 .arg(format!(
40 "{}:{}",
41 config.listen_address, config.exporter_port
42 ))
43 .arg("--advertise-addr")
44 .arg(format!("{}:{}", config.address, config.port))
45 .arg("--async-stack-trace")
46 .arg(&config.async_stack_trace)
47 .arg("--parallelism")
48 .arg(config.parallelism.to_string())
49 .arg("--total-memory-bytes")
50 .arg(config.total_memory_bytes.to_string())
51 .arg("--role")
52 .arg(&config.role)
53 .arg("--resource-group")
54 .arg(&config.resource_group);
55
56 let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
57 add_meta_node(provide_meta_node, cmd)?;
58
59 let provide_tempo = config.provide_tempo.as_ref().unwrap();
60 add_tempo_endpoint(provide_tempo, cmd)?;
61
62 Ok(())
63 }
64}
65
66impl Task for ComputeNodeService {
67 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
68 ctx.service(self);
69 ctx.pb.set_message("starting...");
70
71 let mut cmd = risingwave_cmd("compute-node")?;
72
73 cmd.env(
74 "TOKIO_CONSOLE_BIND",
75 format!("127.0.0.1:{}", self.config.port + 1000),
76 );
77
78 if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
79 cmd.env(
80 "RW_PROFILE_PATH",
81 Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
82 );
83 }
84
85 if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
86 let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compute-node";
88 cmd.env("_RJEM_MALLOC_CONF", conf); cmd.env("MALLOC_CONF", conf); }
91
92 Self::apply_command_args(&mut cmd, &self.config)?;
93 if self.config.enable_tiered_cache {
94 let prefix_data = env::var("PREFIX_DATA")?;
95 cmd.arg("--data-file-cache-dir").arg(
96 PathBuf::from(&prefix_data)
97 .join("foyer")
98 .join(self.config.port.to_string())
99 .join("data"),
100 );
101 cmd.arg("--meta-file-cache-dir").arg(
102 PathBuf::from(&prefix_data)
103 .join("foyer")
104 .join(self.config.port.to_string())
105 .join("meta"),
106 );
107 }
108
109 if !self.config.user_managed {
110 ctx.run_command(ctx.tmux_run(cmd)?)?;
111 ctx.pb.set_message("started");
112 } else {
113 ctx.pb.set_message("user managed");
114 writeln!(
115 &mut ctx.log,
116 "Please use the following parameters to start the compute node:\n{}\n{} {}\n\n",
117 get_program_env_cmd(&cmd),
118 get_program_name(&cmd),
119 get_program_args(&cmd)
120 )?;
121 }
122
123 Ok(())
124 }
125
126 fn id(&self) -> String {
127 self.config.id.clone()
128 }
129}