risingwave_expr/sig/
udf.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! UDF implementation interface.
//!
//! To support a new language or runtime for UDFs, implement the interface in this module.
//!
//! See expr/impl/src/udf for the implementations.

use anyhow::{bail, Context, Result};
use enum_as_inner::EnumAsInner;
use futures::stream::BoxStream;
use risingwave_common::array::arrow::arrow_array_udf::{ArrayRef, BooleanArray, RecordBatch};
use risingwave_common::types::DataType;

/// The global registry of UDF implementations.
///
/// Each entry is a [`UdfImplDescriptor`]. [`find_udf_impl`] iterates over this slice to pick
/// the implementation that handles a given function.
///
/// To register a new UDF implementation:
///
/// ```ignore
/// #[linkme::distributed_slice(UDF_IMPLS)]
/// static MY_UDF_LANGUAGE: UdfImplDescriptor = UdfImplDescriptor {...};
/// ```
#[linkme::distributed_slice]
pub static UDF_IMPLS: [UdfImplDescriptor];

/// Look up the single UDF implementation that accepts the given function attributes.
///
/// Descriptors registered in [`UDF_IMPLS`] are tested, through their `match_fn`, against the
/// `language` / `runtime` / `link` combination.
///
/// # Errors
///
/// Returns an error when no registered descriptor matches, and also when more than one
/// descriptor matches.
pub fn find_udf_impl(
    language: &str,
    runtime: Option<&str>,
    link: Option<&str>,
) -> Result<&'static UdfImplDescriptor> {
    let mut candidates = UDF_IMPLS
        .iter()
        .filter(|candidate| (candidate.match_fn)(language, runtime, link));

    let chosen = candidates.next().context(
        "language not found.\nHINT: UDF feature flag may not be enabled during compilation",
    )?;

    // A second match makes the choice ambiguous, so report it instead of picking one.
    match candidates.next() {
        Some(_) => bail!("multiple UDF implementations found for language: {language}"),
        None => Ok(chosen),
    }
}

/// UDF implementation descriptor.
///
/// Every UDF implementation should provide the 3 functions below: one to decide whether it
/// handles a function, one to create the function, and one to build its runtime.
/// Descriptors are registered in [`UDF_IMPLS`].
pub struct UdfImplDescriptor {
    /// Returns if a function matches the implementation.
    ///
    /// This function is used to determine which implementation to use for a UDF.
    /// It receives the function's `language`, and its `runtime` and `link` if present.
    pub match_fn: fn(language: &str, runtime: Option<&str>, link: Option<&str>) -> bool,

    /// Creates a function from options.
    ///
    /// This function will be called when `create function` statement is executed on the frontend.
    /// It returns a [`CreateFunctionOutput`] on success.
    pub create_fn: fn(opts: CreateFunctionOptions<'_>) -> Result<CreateFunctionOutput>,

    /// Builds UDF runtime from verified options.
    ///
    /// This function will be called before the UDF is executed on the backend.
    /// It returns the runtime as a boxed [`UdfImpl`] trait object.
    pub build_fn: fn(opts: UdfOptions<'_>) -> Result<Box<dyn UdfImpl>>,
}

/// Options for creating a function.
///
/// This information is parsed from the `CREATE FUNCTION` statement.
/// Implementations should verify the options and return a `CreateFunctionOutput` in `create_fn`.
///
/// All fields borrow from the caller for the lifetime `'a`.
pub struct CreateFunctionOptions<'a> {
    /// Kind of the function being created (scalar, table or aggregate).
    pub kind: UdfKind,
    /// Name of the function.
    pub name: &'a str,
    /// Names of the arguments.
    // NOTE(review): presumably parallel to `arg_types` (same length and order) — confirm.
    pub arg_names: &'a [String],
    /// Data types of the arguments.
    pub arg_types: &'a [DataType],
    /// Data type returned by the function.
    pub return_type: &'a DataType,
    /// Optional string payload.
    // NOTE(review): presumably the text of the `AS` clause — confirm against the caller.
    pub as_: Option<&'a str>,
    /// Optional link string.
    // NOTE(review): presumably the value of `USING LINK` — confirm against the caller.
    pub using_link: Option<&'a str>,
    /// Optional binary payload.
    // NOTE(review): presumably the already-decoded bytes of a `USING BASE64` clause — confirm.
    pub using_base64_decoded: Option<&'a [u8]>,
}

/// Output of creating a function.
///
/// This is the success value returned by `create_fn` of a `UdfImplDescriptor`.
pub struct CreateFunctionOutput {
    /// Identifier of the created function.
    // NOTE(review): presumably what is later passed back as `UdfOptions::identifier` — confirm.
    pub identifier: String,
    /// Optional textual body of the function.
    pub body: Option<String>,
    /// Optional binary form of the function.
    // NOTE(review): the name suggests the bytes are compressed; the format is not visible here.
    pub compressed_binary: Option<Vec<u8>>,
}

/// Options for building a UDF runtime.
///
/// This is the argument of `build_fn` of a `UdfImplDescriptor`.
/// All reference fields borrow from the caller for the lifetime `'a`.
pub struct UdfOptions<'a> {
    /// Kind of the function (scalar, table or aggregate).
    pub kind: UdfKind,
    /// Optional textual body of the function.
    pub body: Option<&'a str>,
    /// Optional binary form of the function.
    // NOTE(review): the name suggests the bytes are compressed; the format is not visible here.
    pub compressed_binary: Option<&'a [u8]>,
    /// Optional link string.
    pub link: Option<&'a str>,
    /// Identifier of the function.
    pub identifier: &'a str,
    /// Names of the arguments.
    pub arg_names: &'a [String],
    /// Data type returned by the function.
    pub return_type: &'a DataType,
    /// Retry flag.
    // NOTE(review): presumably makes the runtime always retry calls that fail with a network
    // error — the exact semantics are up to each implementation; confirm.
    pub always_retry_on_network_error: bool,
}

/// The kind of a user-defined function.
///
/// Carried in both [`CreateFunctionOptions`] and [`UdfOptions`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub enum UdfKind {
    /// A scalar function.
    Scalar,
    /// A table function.
    Table,
    /// An aggregate function.
    Aggregate,
}

/// UDF implementation.
///
/// This is the runtime object returned by `build_fn` of a `UdfImplDescriptor`.
/// Implementors must provide [`call`](UdfImpl::call) and
/// [`call_table_function`](UdfImpl::call_table_function); every other method has a default.
#[async_trait::async_trait]
pub trait UdfImpl: std::fmt::Debug + Send + Sync {
    /// Call the scalar function.
    ///
    /// Takes the `input` record batch and returns the output record batch.
    async fn call(&self, input: &RecordBatch) -> Result<RecordBatch>;

    /// Call the table function.
    ///
    /// On success, returns a stream of record batch results. The stream may borrow from both
    /// `self` and `input` for the lifetime `'a`.
    async fn call_table_function<'a>(
        &'a self,
        input: &'a RecordBatch,
    ) -> Result<BoxStream<'a, Result<RecordBatch>>>;

    /// For aggregate function, create the initial state.
    ///
    /// The default implementation returns an error saying that aggregate functions are not
    /// supported.
    fn call_agg_create_state(&self) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// For aggregate function, accumulate or retract the state.
    ///
    /// Takes the current state, a boolean array of operations and the input batch, and
    /// returns a state array.
    // NOTE(review): the meaning of each boolean in `_ops` (accumulate vs. retract) is not
    // visible in this file — confirm against the implementations.
    ///
    /// The default implementation returns an error saying that aggregate functions are not
    /// supported.
    fn call_agg_accumulate_or_retract(
        &self,
        _state: &ArrayRef,
        _ops: &BooleanArray,
        _input: &RecordBatch,
    ) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// For aggregate function, get aggregate result from the state.
    ///
    /// The default implementation returns an error saying that aggregate functions are not
    /// supported.
    fn call_agg_finish(&self, _state: &ArrayRef) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// Whether the UDF talks in legacy mode.
    ///
    /// If true, decimal and jsonb types are mapped to Arrow `LargeBinary` and `LargeUtf8` types.
    /// Otherwise, they are mapped to Arrow extension types.
    /// See <https://github.com/risingwavelabs/arrow-udf/tree/main#extension-types>.
    ///
    /// The default implementation returns `false`.
    fn is_legacy(&self) -> bool {
        false
    }

    /// Return the memory size consumed by UDF runtime in bytes.
    ///
    /// If not available, return 0. The default implementation returns 0.
    fn memory_usage(&self) -> usize {
        0
    }
}