risingwave_frontend_macro/
lib.rs
1use proc_macro::TokenStream;
16use proc_macro2::TokenStream as TokenStream2;
17use quote::{format_ident, quote};
18use syn::parse::{Parse, ParseStream};
19use syn::{Ident, ItemFn, ItemStruct, LitStr, Result, Token, parse_macro_input};
20
21#[proc_macro_attribute]
22pub fn system_catalog(attr: TokenStream, item: TokenStream) -> TokenStream {
23 let mut input = item.clone();
24 let attr = parse_macro_input!(attr as Attr);
25 let item = parse_macro_input!(item as syn::Item);
26
27 match system_catalog_inner(attr, item) {
28 Ok(output) => {
30 input.extend(TokenStream::from(output));
31 input
32 }
33 Err(err) => err.to_compile_error().into(),
34 }
35}
36
37fn system_catalog_inner(attr: Attr, item: syn::Item) -> Result<TokenStream2> {
38 match item {
39 syn::Item::Fn(item_fn) => gen_sys_table(attr, item_fn),
40 syn::Item::Struct(item_struct) => gen_sys_view(attr, item_struct),
41 _ => Err(syn::Error::new_spanned(item, "expect function or struct")),
42 }
43}
44
45struct Attr {
46 kind: Ident,
47 schema_name: String,
48 table_name: String,
49 sql: Option<String>,
50}
51
52impl Parse for Attr {
53 fn parse(input: ParseStream<'_>) -> Result<Self> {
54 let kind = input.parse::<syn::Ident>()?;
55 input.parse::<Token![,]>()?;
56 let name = input.parse::<LitStr>()?;
57 let full_name = name.value();
58 let (schema_name, table_name) = full_name
59 .split_once('.')
60 .ok_or_else(|| syn::Error::new_spanned(name, "expect \"schema.table\""))?;
61 let sql = if input.parse::<Token![,]>().is_ok() {
62 Some(input.parse::<LitStr>()?.value())
63 } else {
64 None
65 };
66 Ok(Attr {
67 kind,
68 schema_name: schema_name.to_string(),
69 table_name: table_name.to_string(),
70 sql,
71 })
72 }
73}
74
75fn strip_outer_type<'a>(ty: &'a syn::Type, type_: &str) -> Option<&'a syn::Type> {
77 let syn::Type::Path(path) = ty else {
78 return None;
79 };
80 let seg = path.path.segments.last()?;
81 if seg.ident != type_ {
82 return None;
83 }
84 let syn::PathArguments::AngleBracketed(args) = &seg.arguments else {
85 return None;
86 };
87 let Some(syn::GenericArgument::Type(ty)) = args.args.first() else {
88 return None;
89 };
90 Some(ty)
91}
92
93fn gen_sys_table(attr: Attr, item_fn: ItemFn) -> Result<TokenStream2> {
94 if attr.kind != "table" {
95 return Err(syn::Error::new_spanned(attr.kind, "expect `table`"));
96 }
97
98 let schema_name = &attr.schema_name;
99 let table_name = &attr.table_name;
100 let gen_fn_name = format_ident!("{}_{}", attr.schema_name, attr.table_name);
101 let user_fn_name = item_fn.sig.ident;
102
103 let return_type_error =
104 || syn::Error::new_spanned(&item_fn.sig.output, "expect `-> Result<Vec<T>>`");
105 let syn::ReturnType::Type(_, ty) = &item_fn.sig.output else {
106 return Err(return_type_error());
107 };
108 let (return_result, ty) = match strip_outer_type(ty, "Result") {
109 Some(ty) => (true, ty),
110 None => (false, ty.as_ref()),
111 };
112 let struct_type = strip_outer_type(ty, "Vec").ok_or_else(return_type_error)?;
113 let _await = item_fn.sig.asyncness.map(|_| quote!(.await));
114 let handle_error = return_result.then(|| quote!(?));
115 let chunk_size = 1024usize;
116
117 Ok(quote! {
118 #[linkme::distributed_slice(crate::catalog::system_catalog::SYS_CATALOGS_SLICE)]
119 #[unsafe(no_mangle)] fn #gen_fn_name() -> crate::catalog::system_catalog::BuiltinCatalog {
121 const _: () = {
122 assert!(#struct_type::PRIMARY_KEY.is_some(), "primary key is required for system table");
123 };
124
125 #[futures_async_stream::try_stream(boxed, ok = risingwave_common::array::DataChunk, error = risingwave_common::error::BoxedError)]
126 async fn function(reader: &crate::catalog::system_catalog::SysCatalogReaderImpl) {
127 let rows = #user_fn_name(reader) #_await #handle_error;
128 let mut builder = #struct_type::data_chunk_builder(#chunk_size);
129 for row in rows {
130 if let Some(chunk) = builder.append_one_row(row.into_owned_row()) {
131 yield chunk;
132 }
133 }
134 if let Some(chunk) = builder.consume_all() {
135 yield chunk;
136 }
137 }
138
139 crate::catalog::system_catalog::BuiltinCatalog::Table(crate::catalog::system_catalog::BuiltinTable {
140 name: #table_name,
141 schema: #schema_name,
142 columns: #struct_type::fields(),
143 pk: #struct_type::PRIMARY_KEY.unwrap(),
144 function,
145 })
146 }
147 })
148}
149
150fn gen_sys_view(attr: Attr, item_struct: ItemStruct) -> Result<TokenStream2> {
151 if attr.kind != "view" {
152 return Err(syn::Error::new_spanned(attr.kind, "expect `view`"));
153 }
154 let schema_name = &attr.schema_name;
155 let table_name = &attr.table_name;
156 let gen_fn_name = format_ident!("{}_{}", attr.schema_name, attr.table_name);
157 let struct_type = &item_struct.ident;
158
159 let sql = if let Some(sql) = attr.sql {
160 quote! { #sql.into() }
161 } else {
162 quote! { crate::catalog::system_catalog::infer_dummy_view_sql(&fields) }
163 };
164
165 Ok(quote! {
166 #[linkme::distributed_slice(crate::catalog::system_catalog::SYS_CATALOGS_SLICE)]
167 #[unsafe(no_mangle)] fn #gen_fn_name() -> crate::catalog::system_catalog::BuiltinCatalog {
169 let fields = #struct_type::fields();
170 crate::catalog::system_catalog::BuiltinCatalog::View(crate::catalog::system_catalog::BuiltinView {
171 name: #table_name,
172 schema: #schema_name,
173 sql: #sql,
174 columns: fields,
175 })
176 }
177 })
178}