risedev/task/
compute_node_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::Result;
20
21use super::{ExecuteContext, Task, risingwave_cmd};
22use crate::util::{get_program_args, get_program_env_cmd, get_program_name};
23use crate::{ComputeNodeConfig, add_meta_node, add_tempo_endpoint};
24
25pub struct ComputeNodeService {
26    config: ComputeNodeConfig,
27}
28
29impl ComputeNodeService {
30    pub fn new(config: ComputeNodeConfig) -> Result<Self> {
31        Ok(Self { config })
32    }
33
34    /// Apply command args according to config
35    pub fn apply_command_args(cmd: &mut Command, config: &ComputeNodeConfig) -> Result<()> {
36        cmd.arg("--listen-addr")
37            .arg(format!("{}:{}", config.listen_address, config.port))
38            .arg("--prometheus-listener-addr")
39            .arg(format!(
40                "{}:{}",
41                config.listen_address, config.exporter_port
42            ))
43            .arg("--advertise-addr")
44            .arg(format!("{}:{}", config.address, config.port))
45            .arg("--async-stack-trace")
46            .arg(&config.async_stack_trace)
47            .arg("--parallelism")
48            .arg(config.parallelism.to_string())
49            .arg("--total-memory-bytes")
50            .arg(config.total_memory_bytes.to_string())
51            .arg("--role")
52            .arg(&config.role)
53            .arg("--resource-group")
54            .arg(&config.resource_group);
55
56        let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
57        add_meta_node(provide_meta_node, cmd)?;
58
59        let provide_tempo = config.provide_tempo.as_ref().unwrap();
60        add_tempo_endpoint(provide_tempo, cmd)?;
61
62        Ok(())
63    }
64}
65
66impl Task for ComputeNodeService {
67    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
68        ctx.service(self);
69        ctx.pb.set_message("starting...");
70
71        let mut cmd = risingwave_cmd("compute-node")?;
72
73        cmd.env(
74            "TOKIO_CONSOLE_BIND",
75            format!("127.0.0.1:{}", self.config.port + 1000),
76        );
77
78        if crate::util::is_env_set("RISEDEV_ENABLE_PROFILE") {
79            cmd.env(
80                "RW_PROFILE_PATH",
81                Path::new(&env::var("PREFIX_LOG")?).join(format!("profile-{}", self.id())),
82            );
83        }
84
85        if crate::util::is_env_set("RISEDEV_ENABLE_HEAP_PROFILE") {
86            // See https://linux.die.net/man/3/jemalloc for the descriptions of profiling options
87            let conf = "prof:true,lg_prof_interval:34,lg_prof_sample:19,prof_prefix:compute-node";
88            cmd.env("_RJEM_MALLOC_CONF", conf); // prefixed for macos
89            cmd.env("MALLOC_CONF", conf); // unprefixed for linux
90        }
91
92        Self::apply_command_args(&mut cmd, &self.config)?;
93        if self.config.enable_tiered_cache {
94            let prefix_data = env::var("PREFIX_DATA")?;
95            cmd.arg("--data-file-cache-dir").arg(
96                PathBuf::from(&prefix_data)
97                    .join("foyer")
98                    .join(self.config.port.to_string())
99                    .join("data"),
100            );
101            cmd.arg("--meta-file-cache-dir").arg(
102                PathBuf::from(&prefix_data)
103                    .join("foyer")
104                    .join(self.config.port.to_string())
105                    .join("meta"),
106            );
107        }
108
109        if !self.config.user_managed {
110            ctx.run_command(ctx.tmux_run(cmd)?)?;
111            ctx.pb.set_message("started");
112        } else {
113            ctx.pb.set_message("user managed");
114            writeln!(
115                &mut ctx.log,
116                "Please use the following parameters to start the compute node:\n{}\n{} {}\n\n",
117                get_program_env_cmd(&cmd),
118                get_program_name(&cmd),
119                get_program_args(&cmd)
120            )?;
121        }
122
123        Ok(())
124    }
125
126    fn id(&self) -> String {
127        self.config.id.clone()
128    }
129}