risingwave_stream/from_proto/
eowc_gap_fill.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::gap_fill::FillStrategy;
20use risingwave_expr::expr::build_non_strict_from_prost;
21use risingwave_pb::stream_plan::EowcGapFillNode;
22use risingwave_storage::StateStore;
23
24use super::ExecutorBuilder;
25use crate::common::table::state_table::StateTableBuilder;
26use crate::error::StreamResult;
27use crate::executor::Executor;
28use crate::executor::eowc::{EowcGapFillExecutor, EowcGapFillExecutorArgs};
29use crate::task::ExecutorParams;
30
31pub struct EowcGapFillExecutorBuilder;
32
33impl ExecutorBuilder for EowcGapFillExecutorBuilder {
34 type Node = EowcGapFillNode;
35
36 async fn new_boxed_executor(
37 params: ExecutorParams,
38 node: &EowcGapFillNode,
39 store: impl StateStore,
40 ) -> StreamResult<Executor> {
41 let [input]: [_; 1] = params.input.try_into().unwrap();
42
43 let time_column_index = node.get_time_column_index() as usize;
44
45 let interval_expr_node = node.get_interval()?;
47 let gap_interval =
48 build_non_strict_from_prost(interval_expr_node, params.eval_error_report.clone())?;
49
50 let fill_columns: Vec<usize> = node
51 .get_fill_columns()
52 .iter()
53 .map(|&x| x as usize)
54 .collect();
55
56 let fill_strategies: Vec<FillStrategy> = node
57 .get_fill_strategies()
58 .iter()
59 .map(|s| match s.as_str() {
60 "locf" => Ok(FillStrategy::Locf),
61 "interpolate" => Ok(FillStrategy::Interpolate),
62 "null" => Ok(FillStrategy::Null),
63 _ => anyhow::bail!("unknown fill strategy: {}", s),
64 })
65 .collect::<anyhow::Result<_>>()?;
66
67 let fill_columns_with_strategies: HashMap<usize, FillStrategy> =
68 fill_columns.into_iter().zip_eq(fill_strategies).collect();
69
70 let vnodes = params.vnode_bitmap.map(Arc::new);
71
72 let buffer_table = StateTableBuilder::new(
73 node.get_buffer_table().as_ref().unwrap(),
74 store.clone(),
75 vnodes.clone(),
76 )
77 .forbid_preload_all_rows()
78 .build()
79 .await;
80
81 let prev_row_table =
82 StateTableBuilder::new(node.get_prev_row_table().as_ref().unwrap(), store, vnodes)
83 .forbid_preload_all_rows()
84 .build()
85 .await;
86
87 let exec = EowcGapFillExecutor::new(EowcGapFillExecutorArgs {
88 actor_ctx: params.actor_context,
89 input,
90 schema: params.info.schema.clone(),
91 buffer_table,
92 prev_row_table,
93 chunk_size: params.env.config().developer.chunk_size,
94 time_column_index,
95 fill_columns: fill_columns_with_strategies,
96 gap_interval,
97 });
98
99 Ok((params.info, exec).into())
100 }
101}