pub struct StreamingDeveloperConfig {Show 40 fields
    pub enable_executor_row_count: bool,
    pub connector_message_buffer_size: usize,
    pub unsafe_extreme_cache_size: usize,
    pub topn_cache_min_capacity: usize,
    pub chunk_size: usize,
    pub exchange_initial_permits: usize,
    pub exchange_batched_permits: usize,
    pub exchange_concurrent_barriers: usize,
    pub exchange_concurrent_dispatchers: usize,
    pub dml_channel_initial_permits: usize,
    pub hash_agg_max_dirty_groups_heap_size: usize,
    pub memory_controller_threshold_aggressive: f64,
    pub memory_controller_threshold_graceful: f64,
    pub memory_controller_threshold_stable: f64,
    pub memory_controller_eviction_factor_aggressive: f64,
    pub memory_controller_eviction_factor_graceful: f64,
    pub memory_controller_eviction_factor_stable: f64,
    pub memory_controller_update_interval_ms: usize,
    pub memory_controller_sequence_tls_step: u64,
    pub memory_controller_sequence_tls_lag: u64,
    pub enable_arrangement_backfill: bool,
    pub high_join_amplification_threshold: usize,
    pub enable_actor_tokio_metrics: bool,
    pub(super) exchange_connection_pool_size: Option<u16>,
    pub enable_auto_schema_change: bool,
    pub enable_shared_source: bool,
    pub switch_jdbc_pg_to_native: bool,
    pub max_barrier_batch_size: u32,
    pub hash_join_entry_state_max_rows: usize,
    pub now_progress_ratio: Option<f32>,
    pub enable_explain_analyze_stats: bool,
    pub compute_client_config: RpcClientConfig,
    pub iceberg_list_interval_sec: u64,
    pub iceberg_fetch_batch_size: u64,
    pub iceberg_sink_positional_delete_cache_size: usize,
    pub iceberg_sink_write_parquet_max_row_group_rows: usize,
    pub default_enable_mem_preload_state_table: bool,
    pub mem_preload_state_table_ids_whitelist: Vec<u32>,
    pub mem_preload_state_table_ids_blacklist: Vec<u32>,
    pub aggressive_noop_update_elimination: bool,
}Expand description
The subsections [streaming.developer].
It is put at StreamingConfig::developer.
Fields§
§enable_executor_row_count: boolSet to true to enable per-executor row count metrics. This will produce a lot of timeseries
and might affect the prometheus performance. If you only need actor input and output
rows data, see stream_actor_in_record_cnt and stream_actor_out_record_cnt instead.
connector_message_buffer_size: usizeThe capacity of the chunks in the channel that connects between ConnectorSource and
SourceExecutor.
unsafe_extreme_cache_size: usizeLimit number of the cached entries in an extreme aggregation call.
topn_cache_min_capacity: usizeMinimum cache size for TopN cache per group key.
chunk_size: usizeThe maximum size of the chunk produced by executor at a time.
exchange_initial_permits: usizeThe initial permits that a channel holds, i.e., the maximum row count can be buffered in the channel.
exchange_batched_permits: usizeThe permits that are batched to add back, for reducing the backward AddPermits messages
in remote exchange.
exchange_concurrent_barriers: usizeThe maximum number of concurrent barriers in an exchange channel.
exchange_concurrent_dispatchers: usizeThe concurrency for dispatching messages to different downstream jobs.
- 1means no concurrency, i.e., dispatch messages to downstream jobs one by one.
- 0means unlimited concurrency.
dml_channel_initial_permits: usizeThe initial permits for a dml channel, i.e., the maximum row count can be buffered in the channel.
hash_agg_max_dirty_groups_heap_size: usizeThe max heap size of dirty groups of HashAggExecutor.
memory_controller_threshold_aggressive: f64§memory_controller_threshold_graceful: f64§memory_controller_threshold_stable: f64§memory_controller_eviction_factor_aggressive: f64§memory_controller_eviction_factor_graceful: f64§memory_controller_eviction_factor_stable: f64§memory_controller_update_interval_ms: usize§memory_controller_sequence_tls_step: u64§memory_controller_sequence_tls_lag: u64§enable_arrangement_backfill: boolEnable arrangement backfill
If false, the arrangement backfill will be disabled,
even if session variable set.
If true, it’s decided by session variable streaming_use_arrangement_backfill (default true)
high_join_amplification_threshold: usizeIf number of hash join matches exceeds this threshold number, it will be logged.
enable_actor_tokio_metrics: boolActor tokio metrics is enabled if enable_actor_tokio_metrics is set or metrics level >= Debug.
exchange_connection_pool_size: Option<u16>The number of the connections for streaming remote exchange between two nodes.
If not specified, the value of server.connection_pool_size will be used.
enable_auto_schema_change: boolA flag to allow disabling the auto schema change handling
Enable shared source
If false, the shared source will be disabled,
even if session variable set.
If true, it’s decided by session variable streaming_use_shared_source (default true)
switch_jdbc_pg_to_native: boolWhen true, all jdbc sinks with connector=‘jdbc’ and jdbc.url=“jdbc:postgresql://…” will be switched from jdbc postgresql sinks to rust native (connector=‘postgres’) sinks.
max_barrier_batch_size: u32The maximum number of consecutive barriers allowed in a message when sent between actors.
hash_join_entry_state_max_rows: usizeConfigure the system-wide cache row cardinality of hash join. For example, if this is set to 1000, it means we can have at most 1000 rows in cache.
now_progress_ratio: Option<f32>§enable_explain_analyze_stats: boolEnable / Disable profiling stats used by EXPLAIN ANALYZE
compute_client_config: RpcClientConfig§iceberg_list_interval_sec: u64IcebergListExecutor: The interval in seconds for Iceberg source to list new files.
iceberg_fetch_batch_size: u64IcebergFetchExecutor: The number of files the executor will fetch concurrently in a batch.
iceberg_sink_positional_delete_cache_size: usizeIcebergSink: The size of the cache for positional delete in the sink.
iceberg_sink_write_parquet_max_row_group_rows: usizeIcebergSink: The maximum number of rows in a row group when writing Parquet files.
default_enable_mem_preload_state_table: boolWhether by default enable preloading all rows in memory for state table. If true, all capable state tables will preload its state to memory
mem_preload_state_table_ids_whitelist: Vec<u32>The list of state table ids to enable preloading all rows in memory for state table.
Only takes effect when default_enable_mem_preload_state_table is false.
mem_preload_state_table_ids_blacklist: Vec<u32>The list of state table ids to disable preloading all rows in memory for state table.
Only takes effect when default_enable_mem_preload_state_table is true.
aggressive_noop_update_elimination: boolEliminate unnecessary updates aggressively, even if it impacts performance. Enable this only if it’s confirmed that no-op updates are causing significant streaming amplification.
Implementations§
Trait Implementations§
Source§impl Clone for StreamingDeveloperConfig
 
impl Clone for StreamingDeveloperConfig
Source§fn clone(&self) -> StreamingDeveloperConfig
 
fn clone(&self) -> StreamingDeveloperConfig
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
 
fn clone_from(&mut self, source: &Self)
source. Read moreSource§impl Debug for StreamingDeveloperConfig
 
impl Debug for StreamingDeveloperConfig
Source§impl Default for StreamingDeveloperConfig
 
impl Default for StreamingDeveloperConfig
Source§impl<'de> Deserialize<'de> for StreamingDeveloperConfig
 
impl<'de> Deserialize<'de> for StreamingDeveloperConfig
Source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
    __D: Deserializer<'de>,
 
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
    __D: Deserializer<'de>,
Auto Trait Implementations§
impl Freeze for StreamingDeveloperConfig
impl RefUnwindSafe for StreamingDeveloperConfig
impl Send for StreamingDeveloperConfig
impl Sync for StreamingDeveloperConfig
impl Unpin for StreamingDeveloperConfig
impl UnwindSafe for StreamingDeveloperConfig
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
    T: ?Sized,
 
impl<T> BorrowMut<T> for Twhere
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
 
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
    T: Clone,
 
impl<T> CloneToUninit for Twhere
    T: Clone,
§impl<T> Code for Twhere
    T: Serialize + DeserializeOwned,
 
impl<T> Code for Twhere
    T: Serialize + DeserializeOwned,
§impl<T> Conv for T
 
impl<T> Conv for T
§impl<T> FmtForward for T
 
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
    Self: Binary,
 
fn fmt_binary(self) -> FmtBinary<Self>where
    Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
    Self: Display,
 
fn fmt_display(self) -> FmtDisplay<Self>where
    Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
    Self: LowerExp,
 
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
    Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
    Self: LowerHex,
 
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
    Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
    Self: Octal,
 
fn fmt_octal(self) -> FmtOctal<Self>where
    Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
    Self: Pointer,
 
fn fmt_pointer(self) -> FmtPointer<Self>where
    Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
    Self: UpperExp,
 
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
    Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
    Self: UpperHex,
 
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
    Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
    &'a Self: for<'a> IntoIterator,
 
fn fmt_list(self) -> FmtList<Self>where
    &'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
 
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
 
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
 
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
 
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
 
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
 
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
 
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
 
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
 
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
 
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
 
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
 
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
 
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
 
fn into_request(self) -> Request<T>
T in a tonic::Request§impl<T> IntoResult<T> for T
 
impl<T> IntoResult<T> for T
type Err = Infallible
fn into_result(self) -> Result<T, <T as IntoResult<T>>::Err>
Source§impl<M> MetricVecRelabelExt for M
 
impl<M> MetricVecRelabelExt for M
Source§fn relabel(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
 
fn relabel( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level.Source§fn relabel_n(
    self,
    metric_level: MetricLevel,
    relabel_threshold: MetricLevel,
    relabel_num: usize,
) -> RelabeledMetricVec<M>
 
fn relabel_n( self, metric_level: MetricLevel, relabel_threshold: MetricLevel, relabel_num: usize, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n.Source§fn relabel_debug_1(
    self,
    relabel_threshold: MetricLevel,
) -> RelabeledMetricVec<M>
 
fn relabel_debug_1( self, relabel_threshold: MetricLevel, ) -> RelabeledMetricVec<M>
RelabeledMetricVec::with_metric_level_relabel_n with metric_level set to
MetricLevel::Debug and relabel_num set to 1.§impl<T> Pipe for Twhere
    T: ?Sized,
 
impl<T> Pipe for Twhere
    T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
    Self: Sized,
 
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
    Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
    R: 'a,
 
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
    R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
    R: 'a,
 
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
    R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
 
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
    &'a mut self,
    func: impl FnOnce(&'a mut B) -> R,
) -> R
 
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
 
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
 
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
 
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
 
impl<T> Pointable for T
§impl<T> Scope for T
 
impl<T> Scope for T
§impl<T> Tap for T
 
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
 
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
 
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
 
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
 
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
 
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
 
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
 
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
 
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
 
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
 
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
 
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
 
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
 
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.