risingwave_pb/
stream_service.rs

1// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5    #[prost(string, tag = "1")]
6    pub request_id: ::prost::alloc::string::String,
7    #[prost(message, optional, tag = "2")]
8    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9    #[prost(uint32, tag = "3", wrapper = "crate::id::DatabaseId")]
10    pub database_id: crate::id::DatabaseId,
11    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
12    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
13    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
14    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
15    #[prost(uint32, tag = "6")]
16    pub partial_graph_id: u32,
17    #[prost(message, repeated, tag = "9")]
18    pub actors_to_build: ::prost::alloc::vec::Vec<
19        inject_barrier_request::FragmentBuildActorInfo,
20    >,
21}
22/// Nested message and enum types in `InjectBarrierRequest`.
23pub mod inject_barrier_request {
24    #[derive(prost_helpers::AnyPB)]
25    #[derive(Clone, PartialEq, ::prost::Message)]
26    pub struct FragmentBuildActorInfo {
27        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
28        pub fragment_id: crate::id::FragmentId,
29        #[prost(message, optional, tag = "2")]
30        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
31        #[prost(message, repeated, tag = "3")]
32        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
33    }
34    #[derive(prost_helpers::AnyPB)]
35    #[derive(Clone, PartialEq, ::prost::Message)]
36    pub struct BuildActorInfo {
37        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
38        pub actor_id: crate::id::ActorId,
39        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
40        pub fragment_upstreams: ::std::collections::HashMap<
41            crate::id::FragmentId,
42            build_actor_info::UpstreamActors,
43        >,
44        #[prost(message, repeated, tag = "3")]
45        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
46        #[prost(message, optional, tag = "4")]
47        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
48        #[prost(string, tag = "5")]
49        pub mview_definition: ::prost::alloc::string::String,
50        #[prost(message, optional, tag = "6")]
51        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
52        #[prost(string, tag = "9")]
53        pub config_override: ::prost::alloc::string::String,
54        #[prost(uint32, repeated, tag = "7")]
55        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<u32>,
56    }
57    /// Nested message and enum types in `BuildActorInfo`.
58    pub mod build_actor_info {
59        #[derive(prost_helpers::AnyPB)]
60        #[derive(Clone, PartialEq, ::prost::Message)]
61        pub struct UpstreamActors {
62            #[prost(message, repeated, tag = "1")]
63            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
64        }
65    }
66}
67#[derive(prost_helpers::AnyPB)]
68#[derive(Clone, PartialEq, ::prost::Message)]
69pub struct BarrierCompleteResponse {
70    #[prost(string, tag = "1")]
71    pub request_id: ::prost::alloc::string::String,
72    #[prost(message, optional, tag = "2")]
73    pub status: ::core::option::Option<super::common::Status>,
74    #[prost(message, repeated, tag = "3")]
75    pub create_mview_progress: ::prost::alloc::vec::Vec<
76        barrier_complete_response::CreateMviewProgress,
77    >,
78    #[prost(message, repeated, tag = "4")]
79    pub synced_sstables: ::prost::alloc::vec::Vec<
80        barrier_complete_response::LocalSstableInfo,
81    >,
82    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
83    pub worker_id: crate::id::WorkerId,
84    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
85    pub table_watermarks: ::std::collections::HashMap<
86        crate::id::TableId,
87        super::hummock::TableWatermarks,
88    >,
89    #[prost(message, repeated, tag = "7")]
90    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
91    #[prost(uint32, tag = "8")]
92    pub partial_graph_id: u32,
93    /// prev_epoch of barrier
94    #[prost(uint64, tag = "9")]
95    pub epoch: u64,
96    #[prost(uint32, tag = "10", wrapper = "crate::id::DatabaseId")]
97    pub database_id: crate::id::DatabaseId,
98    #[prost(message, repeated, tag = "11")]
99    pub load_finished_sources: ::prost::alloc::vec::Vec<
100        barrier_complete_response::LoadFinishedSource,
101    >,
102    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
103    pub vector_index_adds: ::std::collections::HashMap<
104        crate::id::TableId,
105        super::hummock::vector_index_delta::VectorIndexAdds,
106    >,
107    #[prost(message, repeated, tag = "13")]
108    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
109        barrier_complete_response::CdcTableBackfillProgress,
110    >,
111    /// Used for truncating tables in storage layer.
112    /// MaterializeExecutor reports the tables to truncate, and then
113    /// meta will apply truncate operation in next commit epoch.
114    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
115    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
116    /// Used for reporting materialized view refresh completion.
117    /// MaterializeExecutor reports when refresh has finished, and then
118    /// meta will update the table's refresh state to Finished.
119    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
120    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
121    #[prost(message, repeated, tag = "16")]
122    pub list_finished_sources: ::prost::alloc::vec::Vec<
123        barrier_complete_response::ListFinishedSource,
124    >,
125}
126/// Nested message and enum types in `BarrierCompleteResponse`.
127pub mod barrier_complete_response {
128    #[derive(prost_helpers::AnyPB)]
129    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
130    pub struct CreateMviewProgress {
131        /// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
132        /// See <<https://github.com/risingwavelabs/risingwave/issues/6236>.>
133        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
134        pub backfill_actor_id: crate::id::ActorId,
135        #[prost(bool, tag = "2")]
136        pub done: bool,
137        /// MV backfill snapshot read epoch (0 for Done / Source backfill)
138        #[prost(uint64, tag = "3")]
139        pub consumed_epoch: u64,
140        /// MV backfill snapshot read rows / Source backfilled rows
141        #[prost(uint64, tag = "4")]
142        pub consumed_rows: u64,
143        #[prost(uint64, tag = "5")]
144        pub pending_epoch_lag: u64,
145        /// Buffered rows that are yet to be consumed (used by locality backfill to report precise progress)
146        #[prost(uint64, tag = "6")]
147        pub buffered_rows: u64,
148        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
149        pub fragment_id: crate::id::FragmentId,
150    }
151    #[derive(prost_helpers::AnyPB)]
152    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
153    pub struct CdcTableBackfillProgress {
154        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
155        pub actor_id: crate::id::ActorId,
156        #[prost(uint64, tag = "2")]
157        pub epoch: u64,
158        #[prost(bool, tag = "3")]
159        pub done: bool,
160        #[prost(int64, tag = "4")]
161        pub split_id_start_inclusive: i64,
162        #[prost(int64, tag = "5")]
163        pub split_id_end_inclusive: i64,
164        #[prost(uint64, tag = "6")]
165        pub generation: u64,
166        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
167        pub fragment_id: crate::id::FragmentId,
168    }
169    #[derive(prost_helpers::AnyPB)]
170    #[derive(Clone, PartialEq, ::prost::Message)]
171    pub struct LocalSstableInfo {
172        #[prost(message, optional, tag = "2")]
173        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
174        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
175        pub table_stats_map: ::std::collections::HashMap<
176            crate::id::TableId,
177            super::super::hummock::TableStats,
178        >,
179        #[prost(uint64, tag = "4")]
180        pub created_at: u64,
181    }
182    /// Used for refreshable batch source.
183    /// SourceExecutor reports the source load is finished, and then
184    /// meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
185    #[derive(prost_helpers::AnyPB)]
186    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
187    pub struct LoadFinishedSource {
188        /// The actor that reported the completion event.
189        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
190        pub reporter_actor_id: crate::id::ActorId,
191        /// The table ID for the refreshable batch source.
192        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
193        pub table_id: crate::id::TableId,
194        /// The source identifier associated with the finished load.
195        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
196        pub associated_source_id: crate::id::SourceId,
197    }
198    /// SourceExecutor reports the source list is finished, and then
199    /// meta will issue a ListFinish barrier to notify the source to start loading.
200    #[derive(prost_helpers::AnyPB)]
201    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
202    pub struct ListFinishedSource {
203        /// The actor that reported the completion event.
204        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
205        pub reporter_actor_id: crate::id::ActorId,
206        /// The table ID for the refreshable batch source.
207        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
208        pub table_id: crate::id::TableId,
209        /// The source identifier associated with the completed listing.
210        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
211        pub associated_source_id: crate::id::SourceId,
212    }
213}
214#[derive(prost_helpers::AnyPB)]
215#[derive(Clone, PartialEq, ::prost::Message)]
216pub struct StreamingControlStreamRequest {
217    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
218    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
219}
220/// Nested message and enum types in `StreamingControlStreamRequest`.
221pub mod streaming_control_stream_request {
222    #[derive(prost_helpers::AnyPB)]
223    #[derive(Clone, PartialEq, ::prost::Message)]
224    pub struct InitRequest {
225        #[prost(string, tag = "1")]
226        pub term_id: ::prost::alloc::string::String,
227    }
228    #[derive(prost_helpers::AnyPB)]
229    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
230    pub struct CreatePartialGraphRequest {
231        #[prost(uint32, tag = "1")]
232        pub partial_graph_id: u32,
233        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
234        pub database_id: crate::id::DatabaseId,
235    }
236    #[derive(prost_helpers::AnyPB)]
237    #[derive(Clone, PartialEq, ::prost::Message)]
238    pub struct RemovePartialGraphRequest {
239        #[prost(uint32, repeated, tag = "1")]
240        pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
241        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
242        pub database_id: crate::id::DatabaseId,
243    }
244    #[derive(prost_helpers::AnyPB)]
245    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
246    pub struct ResetDatabaseRequest {
247        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
248        pub database_id: crate::id::DatabaseId,
249        #[prost(uint32, tag = "2")]
250        pub reset_request_id: u32,
251    }
252    #[derive(prost_helpers::AnyPB)]
253    #[derive(Clone, PartialEq, ::prost::Oneof)]
254    pub enum Request {
255        #[prost(message, tag = "1")]
256        Init(InitRequest),
257        #[prost(message, tag = "2")]
258        InjectBarrier(super::InjectBarrierRequest),
259        #[prost(message, tag = "3")]
260        RemovePartialGraph(RemovePartialGraphRequest),
261        #[prost(message, tag = "4")]
262        CreatePartialGraph(CreatePartialGraphRequest),
263        #[prost(message, tag = "5")]
264        ResetDatabase(ResetDatabaseRequest),
265    }
266}
267#[derive(prost_helpers::AnyPB)]
268#[derive(Clone, PartialEq, ::prost::Message)]
269pub struct ScoredError {
270    #[prost(string, tag = "1")]
271    pub err_msg: ::prost::alloc::string::String,
272    #[prost(int32, tag = "2")]
273    pub score: i32,
274}
275#[derive(prost_helpers::AnyPB)]
276#[derive(Clone, PartialEq, ::prost::Message)]
277pub struct StreamingControlStreamResponse {
278    #[prost(
279        oneof = "streaming_control_stream_response::Response",
280        tags = "1, 2, 3, 4, 5"
281    )]
282    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
283}
284/// Nested message and enum types in `StreamingControlStreamResponse`.
285pub mod streaming_control_stream_response {
286    #[derive(prost_helpers::AnyPB)]
287    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
288    pub struct InitResponse {}
289    #[derive(prost_helpers::AnyPB)]
290    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
291    pub struct ShutdownResponse {}
292    #[derive(prost_helpers::AnyPB)]
293    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
294    pub struct ReportDatabaseFailureResponse {
295        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
296        pub database_id: crate::id::DatabaseId,
297    }
298    #[derive(prost_helpers::AnyPB)]
299    #[derive(Clone, PartialEq, ::prost::Message)]
300    pub struct ResetDatabaseResponse {
301        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
302        pub database_id: crate::id::DatabaseId,
303        #[prost(message, optional, tag = "2")]
304        pub root_err: ::core::option::Option<super::ScoredError>,
305        #[prost(uint32, tag = "3")]
306        pub reset_request_id: u32,
307    }
308    #[derive(prost_helpers::AnyPB)]
309    #[derive(Clone, PartialEq, ::prost::Oneof)]
310    pub enum Response {
311        #[prost(message, tag = "1")]
312        Init(InitResponse),
313        #[prost(message, tag = "2")]
314        CompleteBarrier(super::BarrierCompleteResponse),
315        #[prost(message, tag = "3")]
316        Shutdown(ShutdownResponse),
317        #[prost(message, tag = "4")]
318        ReportDatabaseFailure(ReportDatabaseFailureResponse),
319        #[prost(message, tag = "5")]
320        ResetDatabase(ResetDatabaseResponse),
321    }
322}
323#[derive(prost_helpers::AnyPB)]
324#[derive(Clone, Copy, PartialEq, ::prost::Message)]
325pub struct GetMinUncommittedObjectIdRequest {}
326#[derive(prost_helpers::AnyPB)]
327#[derive(Clone, Copy, PartialEq, ::prost::Message)]
328pub struct GetMinUncommittedObjectIdResponse {
329    #[prost(uint64, tag = "1")]
330    pub min_uncommitted_object_id: u64,
331}
332/// Generated client implementations.
333pub mod stream_service_client {
334    #![allow(
335        unused_variables,
336        dead_code,
337        missing_docs,
338        clippy::wildcard_imports,
339        clippy::let_unit_value,
340    )]
341    use tonic::codegen::*;
342    use tonic::codegen::http::Uri;
343    #[derive(Debug, Clone)]
344    pub struct StreamServiceClient<T> {
345        inner: tonic::client::Grpc<T>,
346    }
347    impl StreamServiceClient<tonic::transport::Channel> {
348        /// Attempt to create a new client by connecting to a given endpoint.
349        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
350        where
351            D: TryInto<tonic::transport::Endpoint>,
352            D::Error: Into<StdError>,
353        {
354            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
355            Ok(Self::new(conn))
356        }
357    }
358    impl<T> StreamServiceClient<T>
359    where
360        T: tonic::client::GrpcService<tonic::body::BoxBody>,
361        T::Error: Into<StdError>,
362        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
363        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
364    {
365        pub fn new(inner: T) -> Self {
366            let inner = tonic::client::Grpc::new(inner);
367            Self { inner }
368        }
369        pub fn with_origin(inner: T, origin: Uri) -> Self {
370            let inner = tonic::client::Grpc::with_origin(inner, origin);
371            Self { inner }
372        }
373        pub fn with_interceptor<F>(
374            inner: T,
375            interceptor: F,
376        ) -> StreamServiceClient<InterceptedService<T, F>>
377        where
378            F: tonic::service::Interceptor,
379            T::ResponseBody: Default,
380            T: tonic::codegen::Service<
381                http::Request<tonic::body::BoxBody>,
382                Response = http::Response<
383                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
384                >,
385            >,
386            <T as tonic::codegen::Service<
387                http::Request<tonic::body::BoxBody>,
388            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
389        {
390            StreamServiceClient::new(InterceptedService::new(inner, interceptor))
391        }
392        /// Compress requests with the given encoding.
393        ///
394        /// This requires the server to support it otherwise it might respond with an
395        /// error.
396        #[must_use]
397        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
398            self.inner = self.inner.send_compressed(encoding);
399            self
400        }
401        /// Enable decompressing responses.
402        #[must_use]
403        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
404            self.inner = self.inner.accept_compressed(encoding);
405            self
406        }
407        /// Limits the maximum size of a decoded message.
408        ///
409        /// Default: `4MB`
410        #[must_use]
411        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
412            self.inner = self.inner.max_decoding_message_size(limit);
413            self
414        }
415        /// Limits the maximum size of an encoded message.
416        ///
417        /// Default: `usize::MAX`
418        #[must_use]
419        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
420            self.inner = self.inner.max_encoding_message_size(limit);
421            self
422        }
423        pub async fn streaming_control_stream(
424            &mut self,
425            request: impl tonic::IntoStreamingRequest<
426                Message = super::StreamingControlStreamRequest,
427            >,
428        ) -> std::result::Result<
429            tonic::Response<
430                tonic::codec::Streaming<super::StreamingControlStreamResponse>,
431            >,
432            tonic::Status,
433        > {
434            self.inner
435                .ready()
436                .await
437                .map_err(|e| {
438                    tonic::Status::unknown(
439                        format!("Service was not ready: {}", e.into()),
440                    )
441                })?;
442            let codec = tonic::codec::ProstCodec::default();
443            let path = http::uri::PathAndQuery::from_static(
444                "/stream_service.StreamService/StreamingControlStream",
445            );
446            let mut req = request.into_streaming_request();
447            req.extensions_mut()
448                .insert(
449                    GrpcMethod::new(
450                        "stream_service.StreamService",
451                        "StreamingControlStream",
452                    ),
453                );
454            self.inner.streaming(req, path, codec).await
455        }
456        pub async fn get_min_uncommitted_object_id(
457            &mut self,
458            request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
459        ) -> std::result::Result<
460            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
461            tonic::Status,
462        > {
463            self.inner
464                .ready()
465                .await
466                .map_err(|e| {
467                    tonic::Status::unknown(
468                        format!("Service was not ready: {}", e.into()),
469                    )
470                })?;
471            let codec = tonic::codec::ProstCodec::default();
472            let path = http::uri::PathAndQuery::from_static(
473                "/stream_service.StreamService/GetMinUncommittedObjectId",
474            );
475            let mut req = request.into_request();
476            req.extensions_mut()
477                .insert(
478                    GrpcMethod::new(
479                        "stream_service.StreamService",
480                        "GetMinUncommittedObjectId",
481                    ),
482                );
483            self.inner.unary(req, path, codec).await
484        }
485    }
486}
487/// Generated server implementations.
488pub mod stream_service_server {
489    #![allow(
490        unused_variables,
491        dead_code,
492        missing_docs,
493        clippy::wildcard_imports,
494        clippy::let_unit_value,
495    )]
496    use tonic::codegen::*;
497    /// Generated trait containing gRPC methods that should be implemented for use with StreamServiceServer.
498    #[async_trait]
499    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
500        /// Server streaming response type for the StreamingControlStream method.
501        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
502                Item = std::result::Result<
503                    super::StreamingControlStreamResponse,
504                    tonic::Status,
505                >,
506            >
507            + std::marker::Send
508            + 'static;
509        async fn streaming_control_stream(
510            &self,
511            request: tonic::Request<
512                tonic::Streaming<super::StreamingControlStreamRequest>,
513            >,
514        ) -> std::result::Result<
515            tonic::Response<Self::StreamingControlStreamStream>,
516            tonic::Status,
517        >;
518        async fn get_min_uncommitted_object_id(
519            &self,
520            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
521        ) -> std::result::Result<
522            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
523            tonic::Status,
524        >;
525    }
526    #[derive(Debug)]
527    pub struct StreamServiceServer<T> {
528        inner: Arc<T>,
529        accept_compression_encodings: EnabledCompressionEncodings,
530        send_compression_encodings: EnabledCompressionEncodings,
531        max_decoding_message_size: Option<usize>,
532        max_encoding_message_size: Option<usize>,
533    }
534    impl<T> StreamServiceServer<T> {
535        pub fn new(inner: T) -> Self {
536            Self::from_arc(Arc::new(inner))
537        }
538        pub fn from_arc(inner: Arc<T>) -> Self {
539            Self {
540                inner,
541                accept_compression_encodings: Default::default(),
542                send_compression_encodings: Default::default(),
543                max_decoding_message_size: None,
544                max_encoding_message_size: None,
545            }
546        }
547        pub fn with_interceptor<F>(
548            inner: T,
549            interceptor: F,
550        ) -> InterceptedService<Self, F>
551        where
552            F: tonic::service::Interceptor,
553        {
554            InterceptedService::new(Self::new(inner), interceptor)
555        }
556        /// Enable decompressing requests with the given encoding.
557        #[must_use]
558        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
559            self.accept_compression_encodings.enable(encoding);
560            self
561        }
562        /// Compress responses with the given encoding, if the client supports it.
563        #[must_use]
564        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
565            self.send_compression_encodings.enable(encoding);
566            self
567        }
568        /// Limits the maximum size of a decoded message.
569        ///
570        /// Default: `4MB`
571        #[must_use]
572        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
573            self.max_decoding_message_size = Some(limit);
574            self
575        }
576        /// Limits the maximum size of an encoded message.
577        ///
578        /// Default: `usize::MAX`
579        #[must_use]
580        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
581            self.max_encoding_message_size = Some(limit);
582            self
583        }
584    }
585    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
586    where
587        T: StreamService,
588        B: Body + std::marker::Send + 'static,
589        B::Error: Into<StdError> + std::marker::Send + 'static,
590    {
591        type Response = http::Response<tonic::body::BoxBody>;
592        type Error = std::convert::Infallible;
593        type Future = BoxFuture<Self::Response, Self::Error>;
594        fn poll_ready(
595            &mut self,
596            _cx: &mut Context<'_>,
597        ) -> Poll<std::result::Result<(), Self::Error>> {
598            Poll::Ready(Ok(()))
599        }
600        fn call(&mut self, req: http::Request<B>) -> Self::Future {
601            match req.uri().path() {
602                "/stream_service.StreamService/StreamingControlStream" => {
603                    #[allow(non_camel_case_types)]
604                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
605                    impl<
606                        T: StreamService,
607                    > tonic::server::StreamingService<
608                        super::StreamingControlStreamRequest,
609                    > for StreamingControlStreamSvc<T> {
610                        type Response = super::StreamingControlStreamResponse;
611                        type ResponseStream = T::StreamingControlStreamStream;
612                        type Future = BoxFuture<
613                            tonic::Response<Self::ResponseStream>,
614                            tonic::Status,
615                        >;
616                        fn call(
617                            &mut self,
618                            request: tonic::Request<
619                                tonic::Streaming<super::StreamingControlStreamRequest>,
620                            >,
621                        ) -> Self::Future {
622                            let inner = Arc::clone(&self.0);
623                            let fut = async move {
624                                <T as StreamService>::streaming_control_stream(
625                                        &inner,
626                                        request,
627                                    )
628                                    .await
629                            };
630                            Box::pin(fut)
631                        }
632                    }
633                    let accept_compression_encodings = self.accept_compression_encodings;
634                    let send_compression_encodings = self.send_compression_encodings;
635                    let max_decoding_message_size = self.max_decoding_message_size;
636                    let max_encoding_message_size = self.max_encoding_message_size;
637                    let inner = self.inner.clone();
638                    let fut = async move {
639                        let method = StreamingControlStreamSvc(inner);
640                        let codec = tonic::codec::ProstCodec::default();
641                        let mut grpc = tonic::server::Grpc::new(codec)
642                            .apply_compression_config(
643                                accept_compression_encodings,
644                                send_compression_encodings,
645                            )
646                            .apply_max_message_size_config(
647                                max_decoding_message_size,
648                                max_encoding_message_size,
649                            );
650                        let res = grpc.streaming(method, req).await;
651                        Ok(res)
652                    };
653                    Box::pin(fut)
654                }
655                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
656                    #[allow(non_camel_case_types)]
657                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
658                    impl<
659                        T: StreamService,
660                    > tonic::server::UnaryService<
661                        super::GetMinUncommittedObjectIdRequest,
662                    > for GetMinUncommittedObjectIdSvc<T> {
663                        type Response = super::GetMinUncommittedObjectIdResponse;
664                        type Future = BoxFuture<
665                            tonic::Response<Self::Response>,
666                            tonic::Status,
667                        >;
668                        fn call(
669                            &mut self,
670                            request: tonic::Request<
671                                super::GetMinUncommittedObjectIdRequest,
672                            >,
673                        ) -> Self::Future {
674                            let inner = Arc::clone(&self.0);
675                            let fut = async move {
676                                <T as StreamService>::get_min_uncommitted_object_id(
677                                        &inner,
678                                        request,
679                                    )
680                                    .await
681                            };
682                            Box::pin(fut)
683                        }
684                    }
685                    let accept_compression_encodings = self.accept_compression_encodings;
686                    let send_compression_encodings = self.send_compression_encodings;
687                    let max_decoding_message_size = self.max_decoding_message_size;
688                    let max_encoding_message_size = self.max_encoding_message_size;
689                    let inner = self.inner.clone();
690                    let fut = async move {
691                        let method = GetMinUncommittedObjectIdSvc(inner);
692                        let codec = tonic::codec::ProstCodec::default();
693                        let mut grpc = tonic::server::Grpc::new(codec)
694                            .apply_compression_config(
695                                accept_compression_encodings,
696                                send_compression_encodings,
697                            )
698                            .apply_max_message_size_config(
699                                max_decoding_message_size,
700                                max_encoding_message_size,
701                            );
702                        let res = grpc.unary(method, req).await;
703                        Ok(res)
704                    };
705                    Box::pin(fut)
706                }
707                _ => {
708                    Box::pin(async move {
709                        let mut response = http::Response::new(empty_body());
710                        let headers = response.headers_mut();
711                        headers
712                            .insert(
713                                tonic::Status::GRPC_STATUS,
714                                (tonic::Code::Unimplemented as i32).into(),
715                            );
716                        headers
717                            .insert(
718                                http::header::CONTENT_TYPE,
719                                tonic::metadata::GRPC_CONTENT_TYPE,
720                            );
721                        Ok(response)
722                    })
723                }
724            }
725        }
726    }
727    impl<T> Clone for StreamServiceServer<T> {
728        fn clone(&self) -> Self {
729            let inner = self.inner.clone();
730            Self {
731                inner,
732                accept_compression_encodings: self.accept_compression_encodings,
733                send_compression_encodings: self.send_compression_encodings,
734                max_decoding_message_size: self.max_decoding_message_size,
735                max_encoding_message_size: self.max_encoding_message_size,
736            }
737        }
738    }
739    /// Generated gRPC service name
740    pub const SERVICE_NAME: &str = "stream_service.StreamService";
741    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
742        const NAME: &'static str = SERVICE_NAME;
743    }
744}