risingwave_frontend/expr/function_impl/
rw_fragment_vnodes.rs1use std::sync::Arc;
16
17use risingwave_common::types::JsonbVal;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use serde_json::json;
20
21use super::context::META_CLIENT;
22use crate::catalog::FragmentId;
23use crate::meta_client::FrontendMetaClient;
24
25#[function("rw_fragment_vnodes(int4) -> jsonb", volatile)]
26async fn rw_fragment_vnodes(fragment_id: i32) -> Result<JsonbVal> {
27 rw_fragment_vnodes_impl_captured((fragment_id as u32).into()).await
28}
29
30#[capture_context(META_CLIENT)]
31async fn rw_fragment_vnodes_impl(
32 meta_client: &Arc<dyn FrontendMetaClient>,
33 fragment_id: FragmentId,
34) -> Result<JsonbVal> {
35 let actors = meta_client
36 .get_fragment_vnodes(fragment_id)
37 .await
38 .map_err(|e| ExprError::Internal(e.into()))?;
39
40 let result: serde_json::Map<String, serde_json::Value> = actors
41 .into_iter()
42 .map(|(actor_id, vnode_indices)| (actor_id.to_string(), json!(vnode_indices)))
43 .collect();
44
45 Ok(json!(result).into())
46}