// risingwave_expr/sig/udf.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! UDF implementation interface.
//!
//! To support a new language or runtime for UDF, implement the interface in this module.
//!
//! See expr/impl/src/udf for the implementations.
use anyhow::{bail, Context, Result};
use enum_as_inner::EnumAsInner;
use futures::stream::BoxStream;
use risingwave_common::array::arrow::arrow_array_udf::{ArrayRef, BooleanArray, RecordBatch};
use risingwave_common::types::DataType;
/// The global registry of UDF implementations.
///
/// [`find_udf_impl`] scans this slice to select the implementation for a function.
///
/// To register a new UDF implementation, add a descriptor to the slice:
///
/// ```ignore
/// #[linkme::distributed_slice(UDF_IMPLS)]
/// static MY_UDF_LANGUAGE: UdfImplDescriptor = UdfImplDescriptor {...};
/// ```
#[linkme::distributed_slice]
pub static UDF_IMPLS: [UdfImplDescriptor];
/// Looks up the unique UDF implementation registered in [`UDF_IMPLS`] that accepts the
/// given `language`, `runtime` and `link`.
///
/// Every registered descriptor's `match_fn` is consulted with all three arguments.
///
/// # Errors
///
/// Returns an error if no registered implementation matches, or if more than one does.
pub fn find_udf_impl(
    language: &str,
    runtime: Option<&str>,
    link: Option<&str>,
) -> Result<&'static UdfImplDescriptor> {
    let mut candidates = UDF_IMPLS
        .iter()
        .filter(|candidate| (candidate.match_fn)(language, runtime, link));

    let chosen = candidates.next().context(
        "language not found.\nHINT: UDF feature flag may not be enabled during compilation",
    )?;

    // A second match means the registry is ambiguous for this request.
    match candidates.next() {
        Some(_) => bail!("multiple UDF implementations found for language: {language}"),
        None => Ok(chosen),
    }
}
/// Descriptor of a single UDF implementation, as registered in [`UDF_IMPLS`].
///
/// A descriptor bundles the three function pointers every UDF implementation provides:
/// one to decide whether it applies, one to create a function, and one to build its runtime.
pub struct UdfImplDescriptor {
    /// Reports whether this implementation handles the given `language`, `runtime` and `link`.
    ///
    /// Used to determine which implementation serves a UDF.
    pub match_fn: fn(language: &str, runtime: Option<&str>, link: Option<&str>) -> bool,

    /// Creates a function from the given options.
    ///
    /// Called when a `create function` statement is executed on the frontend.
    pub create_fn: fn(opts: CreateFunctionOptions<'_>) -> Result<CreateFunctionOutput>,

    /// Builds the UDF runtime from verified options.
    ///
    /// Called before the UDF is executed on the backend.
    pub build_fn: fn(opts: UdfOptions<'_>) -> Result<Box<dyn UdfImpl>>,
}
/// Options for creating a function.
///
/// This information is parsed from the `CREATE FUNCTION` statement.
/// Implementations should verify the options and return a `CreateFunctionOutput` in `create_fn`.
#[derive(Debug, Clone)]
pub struct CreateFunctionOptions<'a> {
    /// Kind of function being created (scalar, table or aggregate).
    pub kind: UdfKind,
    /// Name of the function.
    pub name: &'a str,
    /// Names of the function arguments.
    pub arg_names: &'a [String],
    /// Data types of the function arguments.
    pub arg_types: &'a [DataType],
    /// Data type returned by the function.
    pub return_type: &'a DataType,
    /// Presumably the content of the `AS` clause, if one was given (confirm against callers).
    pub as_: Option<&'a str>,
    /// Presumably the target of the `USING LINK` clause, if one was given (confirm against callers).
    pub using_link: Option<&'a str>,
    /// Presumably the already-decoded bytes of a `USING BASE64` clause, if one was given
    /// (confirm against callers).
    pub using_base64_decoded: Option<&'a [u8]>,
}
/// Output of creating a function, as returned by an implementation's `create_fn`.
///
/// The field names mirror `identifier`, `body` and `compressed_binary` in `UdfOptions`;
/// presumably these values are stored and later handed back to `build_fn` (confirm against
/// callers).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateFunctionOutput {
    /// Identifier of the created function.
    pub identifier: String,
    /// Textual body of the function, if the implementation produces one.
    pub body: Option<String>,
    /// Compressed binary form of the function, if the implementation produces one.
    pub compressed_binary: Option<Vec<u8>>,
}
/// Options for building a UDF runtime.
///
/// Passed to an implementation's `build_fn`.
#[derive(Debug, Clone)]
pub struct UdfOptions<'a> {
    /// Kind of function to build (scalar, table or aggregate).
    pub kind: UdfKind,
    /// Textual body of the function, if any.
    pub body: Option<&'a str>,
    /// Compressed binary form of the function, if any.
    pub compressed_binary: Option<&'a [u8]>,
    /// Link associated with the function, if any.
    pub link: Option<&'a str>,
    /// Identifier of the function.
    pub identifier: &'a str,
    /// Names of the function arguments.
    pub arg_names: &'a [String],
    /// Data type returned by the function.
    pub return_type: &'a DataType,
    /// Presumably makes the runtime retry whenever a network error occurs
    /// (confirm against implementations).
    pub always_retry_on_network_error: bool,
}
/// The kind of a user-defined function.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub enum UdfKind {
    /// A scalar function.
    Scalar,
    /// A table function.
    Table,
    /// An aggregate function.
    Aggregate,
}
/// Runtime interface of a UDF implementation.
///
/// Scalar and table calls must be provided by every implementor. The aggregate hooks
/// default to returning an error, and the remaining methods have simple defaults.
#[async_trait::async_trait]
pub trait UdfImpl: std::fmt::Debug + Send + Sync {
    /// Calls the scalar function on `input` and returns the resulting batch.
    async fn call(&self, input: &RecordBatch) -> Result<RecordBatch>;

    /// Calls the table function on `input` and returns a stream of result batches.
    async fn call_table_function<'a>(
        &'a self,
        input: &'a RecordBatch,
    ) -> Result<BoxStream<'a, Result<RecordBatch>>>;

    /// For an aggregate function, creates the initial state.
    ///
    /// The default implementation returns an error.
    fn call_agg_create_state(&self) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// For an aggregate function, accumulates into or retracts from the state.
    ///
    /// `_ops` presumably selects accumulate versus retract for each input row
    /// (confirm against implementations).
    ///
    /// The default implementation returns an error.
    fn call_agg_accumulate_or_retract(
        &self,
        _state: &ArrayRef,
        _ops: &BooleanArray,
        _input: &RecordBatch,
    ) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// For an aggregate function, extracts the aggregate result from the state.
    ///
    /// The default implementation returns an error.
    fn call_agg_finish(&self, _state: &ArrayRef) -> Result<ArrayRef> {
        bail!("aggregate function is not supported");
    }

    /// Whether the UDF talks in legacy mode.
    ///
    /// If true, decimal and jsonb types are mapped to Arrow `LargeBinary` and `LargeUtf8` types.
    /// Otherwise, they are mapped to Arrow extension types.
    /// See <https://github.com/risingwavelabs/arrow-udf/tree/main#extension-types>.
    ///
    /// Defaults to `false`.
    fn is_legacy(&self) -> bool {
        false
    }

    /// Returns the memory consumed by the UDF runtime, in bytes.
    ///
    /// Returns 0 if the figure is not available, which is also the default.
    fn memory_usage(&self) -> usize {
        0
    }
}