risingwave_frontend/expr/function_impl/
rw_fragment_vnodes.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::types::JsonbVal;
18use risingwave_expr::{ExprError, Result, capture_context, function};
19use serde_json::json;
20
21use super::context::META_CLIENT;
22use crate::catalog::FragmentId;
23use crate::meta_client::FrontendMetaClient;
24
25#[function("rw_fragment_vnodes(int4) -> jsonb", volatile)]
26async fn rw_fragment_vnodes(fragment_id: i32) -> Result<JsonbVal> {
27    rw_fragment_vnodes_impl_captured((fragment_id as u32).into()).await
28}
29
30#[capture_context(META_CLIENT)]
31async fn rw_fragment_vnodes_impl(
32    meta_client: &Arc<dyn FrontendMetaClient>,
33    fragment_id: FragmentId,
34) -> Result<JsonbVal> {
35    let actors = meta_client
36        .get_fragment_vnodes(fragment_id)
37        .await
38        .map_err(|e| ExprError::Internal(e.into()))?;
39
40    let result: serde_json::Map<String, serde_json::Value> = actors
41        .into_iter()
42        .map(|(actor_id, vnode_indices)| (actor_id.to_string(), json!(vnode_indices)))
43        .collect();
44
45    Ok(json!(result).into())
46}