risingwave_ctl/common/
meta_service.rs1use std::env;
16use std::sync::Arc;
17
18use anyhow::{Result, bail};
19use risingwave_common::config::MetaConfig;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_pb::common::WorkerType;
22use risingwave_pb::common::worker_node::Property;
23use risingwave_rpc_client::MetaClient;
24
/// Options needed to connect to the meta service.
///
/// `Debug` and `Clone` are derived so the options can be logged and handed to
/// several consumers without rebuilding them from the environment.
#[derive(Debug, Clone)]
pub struct MetaServiceOpts {
    /// Meta node address, stored as the raw (unparsed) string.
    pub meta_addr: String,
}
28
29impl MetaServiceOpts {
30 pub fn from_env() -> Result<Self> {
36 let meta_addr = match env::var("RW_META_ADDR") {
37 Ok(url) => {
38 tracing::info!("using meta addr from `RW_META_ADDR`: {}", url);
39 url
40 }
41 Err(_) => {
42 const MESSAGE: &str = "env variable `RW_META_ADDR` not found.
43
44For `./risedev d` use cases, please do the following:
45* use `./risedev ctl` to use risectl.
46
47For production use cases,
48* please set `RW_META_ADDR` to the address of the meta node.
49
50Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'.";
51 bail!(MESSAGE);
52 }
53 };
54 Ok(Self { meta_addr })
55 }
56
57 pub async fn create_meta_client(&self) -> Result<MetaClient> {
59 let (client, _) = MetaClient::register_new(
60 self.meta_addr.parse()?,
61 WorkerType::RiseCtl,
62 &get_new_ctl_identity(),
63 Property::default(),
64 Arc::new(MetaConfig::default()),
65 )
66 .await;
67 let worker_id = client.worker_id();
68 tracing::info!("registered as RiseCtl worker, worker_id = {}", worker_id);
69 Ok(client)
70 }
71}
72
73fn get_new_ctl_identity() -> HostAddr {
74 HostAddr {
75 host: format!("risectl-{}", uuid::Uuid::new_v4()),
76 port: 0,
77 }
78}