risingwave_pb/
stream_service.rs

1// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5    #[prost(string, tag = "1")]
6    pub request_id: ::prost::alloc::string::String,
7    #[prost(message, optional, tag = "2")]
8    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9    #[prost(uint32, tag = "3", wrapper = "crate::id::DatabaseId")]
10    pub database_id: crate::id::DatabaseId,
11    #[prost(uint32, repeated, tag = "4")]
12    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<u32>,
13    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
14    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
15    #[prost(uint32, tag = "6")]
16    pub partial_graph_id: u32,
17    #[prost(message, repeated, tag = "9")]
18    pub actors_to_build: ::prost::alloc::vec::Vec<
19        inject_barrier_request::FragmentBuildActorInfo,
20    >,
21}
22/// Nested message and enum types in `InjectBarrierRequest`.
23pub mod inject_barrier_request {
24    #[derive(prost_helpers::AnyPB)]
25    #[derive(Clone, PartialEq, ::prost::Message)]
26    pub struct FragmentBuildActorInfo {
27        #[prost(uint32, tag = "1")]
28        pub fragment_id: u32,
29        #[prost(message, optional, tag = "2")]
30        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
31        #[prost(message, repeated, tag = "3")]
32        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
33    }
34    #[derive(prost_helpers::AnyPB)]
35    #[derive(Clone, PartialEq, ::prost::Message)]
36    pub struct BuildActorInfo {
37        #[prost(uint32, tag = "1")]
38        pub actor_id: u32,
39        #[prost(map = "uint32, message", tag = "2")]
40        pub fragment_upstreams: ::std::collections::HashMap<
41            u32,
42            build_actor_info::UpstreamActors,
43        >,
44        #[prost(message, repeated, tag = "3")]
45        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
46        #[prost(message, optional, tag = "4")]
47        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
48        #[prost(string, tag = "5")]
49        pub mview_definition: ::prost::alloc::string::String,
50        #[prost(message, optional, tag = "6")]
51        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
52        #[prost(uint32, repeated, tag = "7")]
53        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<u32>,
54    }
55    /// Nested message and enum types in `BuildActorInfo`.
56    pub mod build_actor_info {
57        #[derive(prost_helpers::AnyPB)]
58        #[derive(Clone, PartialEq, ::prost::Message)]
59        pub struct UpstreamActors {
60            #[prost(message, repeated, tag = "1")]
61            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
62        }
63    }
64}
65#[derive(prost_helpers::AnyPB)]
66#[derive(Clone, PartialEq, ::prost::Message)]
67pub struct BarrierCompleteResponse {
68    #[prost(string, tag = "1")]
69    pub request_id: ::prost::alloc::string::String,
70    #[prost(message, optional, tag = "2")]
71    pub status: ::core::option::Option<super::common::Status>,
72    #[prost(message, repeated, tag = "3")]
73    pub create_mview_progress: ::prost::alloc::vec::Vec<
74        barrier_complete_response::CreateMviewProgress,
75    >,
76    #[prost(message, repeated, tag = "4")]
77    pub synced_sstables: ::prost::alloc::vec::Vec<
78        barrier_complete_response::LocalSstableInfo,
79    >,
80    #[prost(uint32, tag = "5")]
81    pub worker_id: u32,
82    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
83    pub table_watermarks: ::std::collections::HashMap<
84        crate::id::TableId,
85        super::hummock::TableWatermarks,
86    >,
87    #[prost(message, repeated, tag = "7")]
88    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
89    #[prost(uint32, tag = "8")]
90    pub partial_graph_id: u32,
91    /// prev_epoch of barrier
92    #[prost(uint64, tag = "9")]
93    pub epoch: u64,
94    #[prost(uint32, tag = "10", wrapper = "crate::id::DatabaseId")]
95    pub database_id: crate::id::DatabaseId,
96    #[prost(message, repeated, tag = "11")]
97    pub load_finished_sources: ::prost::alloc::vec::Vec<
98        barrier_complete_response::LoadFinishedSource,
99    >,
100    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
101    pub vector_index_adds: ::std::collections::HashMap<
102        crate::id::TableId,
103        super::hummock::vector_index_delta::VectorIndexAdds,
104    >,
105    #[prost(message, repeated, tag = "13")]
106    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
107        barrier_complete_response::CdcTableBackfillProgress,
108    >,
109    /// Used for truncating tables in storage layer.
110    /// MaterializeExecutor reports the tables to truncate, and then
111    /// meta will apply truncate operation in next commit epoch.
112    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
113    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
114    /// Used for reporting materialized view refresh completion.
115    /// MaterializeExecutor reports when refresh has finished, and then
116    /// meta will update the table's refresh state to Finished.
117    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
118    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
119    #[prost(message, repeated, tag = "16")]
120    pub list_finished_sources: ::prost::alloc::vec::Vec<
121        barrier_complete_response::ListFinishedSource,
122    >,
123}
124/// Nested message and enum types in `BarrierCompleteResponse`.
125pub mod barrier_complete_response {
126    #[derive(prost_helpers::AnyPB)]
127    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
128    pub struct CreateMviewProgress {
129        /// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
130        /// See <<https://github.com/risingwavelabs/risingwave/issues/6236>.>
131        #[prost(uint32, tag = "1")]
132        pub backfill_actor_id: u32,
133        #[prost(bool, tag = "2")]
134        pub done: bool,
135        /// MV backfill snapshot read epoch (0 for Done / Source backfill)
136        #[prost(uint64, tag = "3")]
137        pub consumed_epoch: u64,
138        /// MV backfill snapshot read rows / Source backfilled rows
139        #[prost(uint64, tag = "4")]
140        pub consumed_rows: u64,
141        #[prost(uint64, tag = "5")]
142        pub pending_epoch_lag: u64,
143        /// Buffered rows that are yet to be consumed (used by locality backfill to report precise progress)
144        #[prost(uint64, tag = "6")]
145        pub buffered_rows: u64,
146    }
147    #[derive(prost_helpers::AnyPB)]
148    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
149    pub struct CdcTableBackfillProgress {
150        #[prost(uint32, tag = "1")]
151        pub actor_id: u32,
152        #[prost(uint64, tag = "2")]
153        pub epoch: u64,
154        #[prost(bool, tag = "3")]
155        pub done: bool,
156        #[prost(int64, tag = "4")]
157        pub split_id_start_inclusive: i64,
158        #[prost(int64, tag = "5")]
159        pub split_id_end_inclusive: i64,
160        #[prost(uint64, tag = "6")]
161        pub generation: u64,
162        #[prost(uint32, tag = "7")]
163        pub fragment_id: u32,
164    }
165    #[derive(prost_helpers::AnyPB)]
166    #[derive(Clone, PartialEq, ::prost::Message)]
167    pub struct LocalSstableInfo {
168        #[prost(message, optional, tag = "2")]
169        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
170        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
171        pub table_stats_map: ::std::collections::HashMap<
172            crate::id::TableId,
173            super::super::hummock::TableStats,
174        >,
175        #[prost(uint64, tag = "4")]
176        pub created_at: u64,
177    }
178    /// Used for refreshable batch source.
179    /// SourceExecutor reports the source load is finished, and then
180    /// meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
181    #[derive(prost_helpers::AnyPB)]
182    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
183    pub struct LoadFinishedSource {
184        /// The actor that reported the completion event.
185        #[prost(uint32, tag = "1")]
186        pub reporter_actor_id: u32,
187        /// The table ID for the refreshable batch source.
188        #[prost(uint32, tag = "2")]
189        pub table_id: u32,
190        /// The source identifier associated with the finished load.
191        #[prost(uint32, tag = "3")]
192        pub associated_source_id: u32,
193    }
194    /// SourceExecutor reports the source list is finished, and then
195    /// meta will issue a ListFinish barrier to notify the source to start loading.
196    #[derive(prost_helpers::AnyPB)]
197    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
198    pub struct ListFinishedSource {
199        /// The actor that reported the completion event.
200        #[prost(uint32, tag = "1")]
201        pub reporter_actor_id: u32,
202        /// The table ID for the refreshable batch source.
203        #[prost(uint32, tag = "2")]
204        pub table_id: u32,
205        /// The source identifier associated with the completed listing.
206        #[prost(uint32, tag = "3")]
207        pub associated_source_id: u32,
208    }
209}
210#[derive(prost_helpers::AnyPB)]
211#[derive(Clone, PartialEq, ::prost::Message)]
212pub struct StreamingControlStreamRequest {
213    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
214    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
215}
216/// Nested message and enum types in `StreamingControlStreamRequest`.
217pub mod streaming_control_stream_request {
218    #[derive(prost_helpers::AnyPB)]
219    #[derive(Clone, PartialEq, ::prost::Message)]
220    pub struct InitRequest {
221        #[prost(string, tag = "1")]
222        pub term_id: ::prost::alloc::string::String,
223    }
224    #[derive(prost_helpers::AnyPB)]
225    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
226    pub struct CreatePartialGraphRequest {
227        #[prost(uint32, tag = "1")]
228        pub partial_graph_id: u32,
229        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
230        pub database_id: crate::id::DatabaseId,
231    }
232    #[derive(prost_helpers::AnyPB)]
233    #[derive(Clone, PartialEq, ::prost::Message)]
234    pub struct RemovePartialGraphRequest {
235        #[prost(uint32, repeated, tag = "1")]
236        pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
237        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
238        pub database_id: crate::id::DatabaseId,
239    }
240    #[derive(prost_helpers::AnyPB)]
241    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
242    pub struct ResetDatabaseRequest {
243        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
244        pub database_id: crate::id::DatabaseId,
245        #[prost(uint32, tag = "2")]
246        pub reset_request_id: u32,
247    }
248    #[derive(prost_helpers::AnyPB)]
249    #[derive(Clone, PartialEq, ::prost::Oneof)]
250    pub enum Request {
251        #[prost(message, tag = "1")]
252        Init(InitRequest),
253        #[prost(message, tag = "2")]
254        InjectBarrier(super::InjectBarrierRequest),
255        #[prost(message, tag = "3")]
256        RemovePartialGraph(RemovePartialGraphRequest),
257        #[prost(message, tag = "4")]
258        CreatePartialGraph(CreatePartialGraphRequest),
259        #[prost(message, tag = "5")]
260        ResetDatabase(ResetDatabaseRequest),
261    }
262}
263#[derive(prost_helpers::AnyPB)]
264#[derive(Clone, PartialEq, ::prost::Message)]
265pub struct ScoredError {
266    #[prost(string, tag = "1")]
267    pub err_msg: ::prost::alloc::string::String,
268    #[prost(int32, tag = "2")]
269    pub score: i32,
270}
271#[derive(prost_helpers::AnyPB)]
272#[derive(Clone, PartialEq, ::prost::Message)]
273pub struct StreamingControlStreamResponse {
274    #[prost(
275        oneof = "streaming_control_stream_response::Response",
276        tags = "1, 2, 3, 4, 5"
277    )]
278    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
279}
280/// Nested message and enum types in `StreamingControlStreamResponse`.
281pub mod streaming_control_stream_response {
282    #[derive(prost_helpers::AnyPB)]
283    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
284    pub struct InitResponse {}
285    #[derive(prost_helpers::AnyPB)]
286    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
287    pub struct ShutdownResponse {}
288    #[derive(prost_helpers::AnyPB)]
289    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
290    pub struct ReportDatabaseFailureResponse {
291        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
292        pub database_id: crate::id::DatabaseId,
293    }
294    #[derive(prost_helpers::AnyPB)]
295    #[derive(Clone, PartialEq, ::prost::Message)]
296    pub struct ResetDatabaseResponse {
297        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
298        pub database_id: crate::id::DatabaseId,
299        #[prost(message, optional, tag = "2")]
300        pub root_err: ::core::option::Option<super::ScoredError>,
301        #[prost(uint32, tag = "3")]
302        pub reset_request_id: u32,
303    }
304    #[derive(prost_helpers::AnyPB)]
305    #[derive(Clone, PartialEq, ::prost::Oneof)]
306    pub enum Response {
307        #[prost(message, tag = "1")]
308        Init(InitResponse),
309        #[prost(message, tag = "2")]
310        CompleteBarrier(super::BarrierCompleteResponse),
311        #[prost(message, tag = "3")]
312        Shutdown(ShutdownResponse),
313        #[prost(message, tag = "4")]
314        ReportDatabaseFailure(ReportDatabaseFailureResponse),
315        #[prost(message, tag = "5")]
316        ResetDatabase(ResetDatabaseResponse),
317    }
318}
319#[derive(prost_helpers::AnyPB)]
320#[derive(Clone, Copy, PartialEq, ::prost::Message)]
321pub struct GetMinUncommittedObjectIdRequest {}
322#[derive(prost_helpers::AnyPB)]
323#[derive(Clone, Copy, PartialEq, ::prost::Message)]
324pub struct GetMinUncommittedObjectIdResponse {
325    #[prost(uint64, tag = "1")]
326    pub min_uncommitted_object_id: u64,
327}
328/// Generated client implementations.
329pub mod stream_service_client {
330    #![allow(
331        unused_variables,
332        dead_code,
333        missing_docs,
334        clippy::wildcard_imports,
335        clippy::let_unit_value,
336    )]
337    use tonic::codegen::*;
338    use tonic::codegen::http::Uri;
339    #[derive(Debug, Clone)]
340    pub struct StreamServiceClient<T> {
341        inner: tonic::client::Grpc<T>,
342    }
343    impl StreamServiceClient<tonic::transport::Channel> {
344        /// Attempt to create a new client by connecting to a given endpoint.
345        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
346        where
347            D: TryInto<tonic::transport::Endpoint>,
348            D::Error: Into<StdError>,
349        {
350            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
351            Ok(Self::new(conn))
352        }
353    }
354    impl<T> StreamServiceClient<T>
355    where
356        T: tonic::client::GrpcService<tonic::body::BoxBody>,
357        T::Error: Into<StdError>,
358        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
359        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
360    {
361        pub fn new(inner: T) -> Self {
362            let inner = tonic::client::Grpc::new(inner);
363            Self { inner }
364        }
365        pub fn with_origin(inner: T, origin: Uri) -> Self {
366            let inner = tonic::client::Grpc::with_origin(inner, origin);
367            Self { inner }
368        }
369        pub fn with_interceptor<F>(
370            inner: T,
371            interceptor: F,
372        ) -> StreamServiceClient<InterceptedService<T, F>>
373        where
374            F: tonic::service::Interceptor,
375            T::ResponseBody: Default,
376            T: tonic::codegen::Service<
377                http::Request<tonic::body::BoxBody>,
378                Response = http::Response<
379                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
380                >,
381            >,
382            <T as tonic::codegen::Service<
383                http::Request<tonic::body::BoxBody>,
384            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
385        {
386            StreamServiceClient::new(InterceptedService::new(inner, interceptor))
387        }
388        /// Compress requests with the given encoding.
389        ///
390        /// This requires the server to support it otherwise it might respond with an
391        /// error.
392        #[must_use]
393        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
394            self.inner = self.inner.send_compressed(encoding);
395            self
396        }
397        /// Enable decompressing responses.
398        #[must_use]
399        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
400            self.inner = self.inner.accept_compressed(encoding);
401            self
402        }
403        /// Limits the maximum size of a decoded message.
404        ///
405        /// Default: `4MB`
406        #[must_use]
407        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
408            self.inner = self.inner.max_decoding_message_size(limit);
409            self
410        }
411        /// Limits the maximum size of an encoded message.
412        ///
413        /// Default: `usize::MAX`
414        #[must_use]
415        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
416            self.inner = self.inner.max_encoding_message_size(limit);
417            self
418        }
419        pub async fn streaming_control_stream(
420            &mut self,
421            request: impl tonic::IntoStreamingRequest<
422                Message = super::StreamingControlStreamRequest,
423            >,
424        ) -> std::result::Result<
425            tonic::Response<
426                tonic::codec::Streaming<super::StreamingControlStreamResponse>,
427            >,
428            tonic::Status,
429        > {
430            self.inner
431                .ready()
432                .await
433                .map_err(|e| {
434                    tonic::Status::unknown(
435                        format!("Service was not ready: {}", e.into()),
436                    )
437                })?;
438            let codec = tonic::codec::ProstCodec::default();
439            let path = http::uri::PathAndQuery::from_static(
440                "/stream_service.StreamService/StreamingControlStream",
441            );
442            let mut req = request.into_streaming_request();
443            req.extensions_mut()
444                .insert(
445                    GrpcMethod::new(
446                        "stream_service.StreamService",
447                        "StreamingControlStream",
448                    ),
449                );
450            self.inner.streaming(req, path, codec).await
451        }
452        pub async fn get_min_uncommitted_object_id(
453            &mut self,
454            request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
455        ) -> std::result::Result<
456            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
457            tonic::Status,
458        > {
459            self.inner
460                .ready()
461                .await
462                .map_err(|e| {
463                    tonic::Status::unknown(
464                        format!("Service was not ready: {}", e.into()),
465                    )
466                })?;
467            let codec = tonic::codec::ProstCodec::default();
468            let path = http::uri::PathAndQuery::from_static(
469                "/stream_service.StreamService/GetMinUncommittedObjectId",
470            );
471            let mut req = request.into_request();
472            req.extensions_mut()
473                .insert(
474                    GrpcMethod::new(
475                        "stream_service.StreamService",
476                        "GetMinUncommittedObjectId",
477                    ),
478                );
479            self.inner.unary(req, path, codec).await
480        }
481    }
482}
483/// Generated server implementations.
484pub mod stream_service_server {
485    #![allow(
486        unused_variables,
487        dead_code,
488        missing_docs,
489        clippy::wildcard_imports,
490        clippy::let_unit_value,
491    )]
492    use tonic::codegen::*;
493    /// Generated trait containing gRPC methods that should be implemented for use with StreamServiceServer.
494    #[async_trait]
495    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
496        /// Server streaming response type for the StreamingControlStream method.
497        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
498                Item = std::result::Result<
499                    super::StreamingControlStreamResponse,
500                    tonic::Status,
501                >,
502            >
503            + std::marker::Send
504            + 'static;
505        async fn streaming_control_stream(
506            &self,
507            request: tonic::Request<
508                tonic::Streaming<super::StreamingControlStreamRequest>,
509            >,
510        ) -> std::result::Result<
511            tonic::Response<Self::StreamingControlStreamStream>,
512            tonic::Status,
513        >;
514        async fn get_min_uncommitted_object_id(
515            &self,
516            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
517        ) -> std::result::Result<
518            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
519            tonic::Status,
520        >;
521    }
522    #[derive(Debug)]
523    pub struct StreamServiceServer<T> {
524        inner: Arc<T>,
525        accept_compression_encodings: EnabledCompressionEncodings,
526        send_compression_encodings: EnabledCompressionEncodings,
527        max_decoding_message_size: Option<usize>,
528        max_encoding_message_size: Option<usize>,
529    }
530    impl<T> StreamServiceServer<T> {
531        pub fn new(inner: T) -> Self {
532            Self::from_arc(Arc::new(inner))
533        }
534        pub fn from_arc(inner: Arc<T>) -> Self {
535            Self {
536                inner,
537                accept_compression_encodings: Default::default(),
538                send_compression_encodings: Default::default(),
539                max_decoding_message_size: None,
540                max_encoding_message_size: None,
541            }
542        }
543        pub fn with_interceptor<F>(
544            inner: T,
545            interceptor: F,
546        ) -> InterceptedService<Self, F>
547        where
548            F: tonic::service::Interceptor,
549        {
550            InterceptedService::new(Self::new(inner), interceptor)
551        }
552        /// Enable decompressing requests with the given encoding.
553        #[must_use]
554        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
555            self.accept_compression_encodings.enable(encoding);
556            self
557        }
558        /// Compress responses with the given encoding, if the client supports it.
559        #[must_use]
560        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
561            self.send_compression_encodings.enable(encoding);
562            self
563        }
564        /// Limits the maximum size of a decoded message.
565        ///
566        /// Default: `4MB`
567        #[must_use]
568        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
569            self.max_decoding_message_size = Some(limit);
570            self
571        }
572        /// Limits the maximum size of an encoded message.
573        ///
574        /// Default: `usize::MAX`
575        #[must_use]
576        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
577            self.max_encoding_message_size = Some(limit);
578            self
579        }
580    }
581    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
582    where
583        T: StreamService,
584        B: Body + std::marker::Send + 'static,
585        B::Error: Into<StdError> + std::marker::Send + 'static,
586    {
587        type Response = http::Response<tonic::body::BoxBody>;
588        type Error = std::convert::Infallible;
589        type Future = BoxFuture<Self::Response, Self::Error>;
590        fn poll_ready(
591            &mut self,
592            _cx: &mut Context<'_>,
593        ) -> Poll<std::result::Result<(), Self::Error>> {
594            Poll::Ready(Ok(()))
595        }
596        fn call(&mut self, req: http::Request<B>) -> Self::Future {
597            match req.uri().path() {
598                "/stream_service.StreamService/StreamingControlStream" => {
599                    #[allow(non_camel_case_types)]
600                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
601                    impl<
602                        T: StreamService,
603                    > tonic::server::StreamingService<
604                        super::StreamingControlStreamRequest,
605                    > for StreamingControlStreamSvc<T> {
606                        type Response = super::StreamingControlStreamResponse;
607                        type ResponseStream = T::StreamingControlStreamStream;
608                        type Future = BoxFuture<
609                            tonic::Response<Self::ResponseStream>,
610                            tonic::Status,
611                        >;
612                        fn call(
613                            &mut self,
614                            request: tonic::Request<
615                                tonic::Streaming<super::StreamingControlStreamRequest>,
616                            >,
617                        ) -> Self::Future {
618                            let inner = Arc::clone(&self.0);
619                            let fut = async move {
620                                <T as StreamService>::streaming_control_stream(
621                                        &inner,
622                                        request,
623                                    )
624                                    .await
625                            };
626                            Box::pin(fut)
627                        }
628                    }
629                    let accept_compression_encodings = self.accept_compression_encodings;
630                    let send_compression_encodings = self.send_compression_encodings;
631                    let max_decoding_message_size = self.max_decoding_message_size;
632                    let max_encoding_message_size = self.max_encoding_message_size;
633                    let inner = self.inner.clone();
634                    let fut = async move {
635                        let method = StreamingControlStreamSvc(inner);
636                        let codec = tonic::codec::ProstCodec::default();
637                        let mut grpc = tonic::server::Grpc::new(codec)
638                            .apply_compression_config(
639                                accept_compression_encodings,
640                                send_compression_encodings,
641                            )
642                            .apply_max_message_size_config(
643                                max_decoding_message_size,
644                                max_encoding_message_size,
645                            );
646                        let res = grpc.streaming(method, req).await;
647                        Ok(res)
648                    };
649                    Box::pin(fut)
650                }
651                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
652                    #[allow(non_camel_case_types)]
653                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
654                    impl<
655                        T: StreamService,
656                    > tonic::server::UnaryService<
657                        super::GetMinUncommittedObjectIdRequest,
658                    > for GetMinUncommittedObjectIdSvc<T> {
659                        type Response = super::GetMinUncommittedObjectIdResponse;
660                        type Future = BoxFuture<
661                            tonic::Response<Self::Response>,
662                            tonic::Status,
663                        >;
664                        fn call(
665                            &mut self,
666                            request: tonic::Request<
667                                super::GetMinUncommittedObjectIdRequest,
668                            >,
669                        ) -> Self::Future {
670                            let inner = Arc::clone(&self.0);
671                            let fut = async move {
672                                <T as StreamService>::get_min_uncommitted_object_id(
673                                        &inner,
674                                        request,
675                                    )
676                                    .await
677                            };
678                            Box::pin(fut)
679                        }
680                    }
681                    let accept_compression_encodings = self.accept_compression_encodings;
682                    let send_compression_encodings = self.send_compression_encodings;
683                    let max_decoding_message_size = self.max_decoding_message_size;
684                    let max_encoding_message_size = self.max_encoding_message_size;
685                    let inner = self.inner.clone();
686                    let fut = async move {
687                        let method = GetMinUncommittedObjectIdSvc(inner);
688                        let codec = tonic::codec::ProstCodec::default();
689                        let mut grpc = tonic::server::Grpc::new(codec)
690                            .apply_compression_config(
691                                accept_compression_encodings,
692                                send_compression_encodings,
693                            )
694                            .apply_max_message_size_config(
695                                max_decoding_message_size,
696                                max_encoding_message_size,
697                            );
698                        let res = grpc.unary(method, req).await;
699                        Ok(res)
700                    };
701                    Box::pin(fut)
702                }
703                _ => {
704                    Box::pin(async move {
705                        let mut response = http::Response::new(empty_body());
706                        let headers = response.headers_mut();
707                        headers
708                            .insert(
709                                tonic::Status::GRPC_STATUS,
710                                (tonic::Code::Unimplemented as i32).into(),
711                            );
712                        headers
713                            .insert(
714                                http::header::CONTENT_TYPE,
715                                tonic::metadata::GRPC_CONTENT_TYPE,
716                            );
717                        Ok(response)
718                    })
719                }
720            }
721        }
722    }
723    impl<T> Clone for StreamServiceServer<T> {
724        fn clone(&self) -> Self {
725            let inner = self.inner.clone();
726            Self {
727                inner,
728                accept_compression_encodings: self.accept_compression_encodings,
729                send_compression_encodings: self.send_compression_encodings,
730                max_decoding_message_size: self.max_decoding_message_size,
731                max_encoding_message_size: self.max_encoding_message_size,
732            }
733        }
734    }
735    /// Generated gRPC service name
736    pub const SERVICE_NAME: &str = "stream_service.StreamService";
737    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
738        const NAME: &'static str = SERVICE_NAME;
739    }
740}