// risingwave_pb/stream_service.rs

// This file is @generated by prost-build.
2#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5    #[prost(string, tag = "1")]
6    pub request_id: ::prost::alloc::string::String,
7    #[prost(message, optional, tag = "2")]
8    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
10    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
11    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
12    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
13    #[prost(uint64, tag = "6", wrapper = "crate::id::PartialGraphId")]
14    pub partial_graph_id: crate::id::PartialGraphId,
15    #[prost(message, repeated, tag = "9")]
16    pub actors_to_build: ::prost::alloc::vec::Vec<
17        inject_barrier_request::FragmentBuildActorInfo,
18    >,
19}
20/// Nested message and enum types in `InjectBarrierRequest`.
21pub mod inject_barrier_request {
22    #[derive(prost_helpers::AnyPB)]
23    #[derive(Clone, PartialEq, ::prost::Message)]
24    pub struct FragmentBuildActorInfo {
25        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
26        pub fragment_id: crate::id::FragmentId,
27        #[prost(message, optional, tag = "2")]
28        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
29        #[prost(message, repeated, tag = "3")]
30        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
31    }
32    #[derive(prost_helpers::AnyPB)]
33    #[derive(Clone, PartialEq, ::prost::Message)]
34    pub struct BuildActorInfo {
35        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
36        pub actor_id: crate::id::ActorId,
37        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
38        pub fragment_upstreams: ::std::collections::HashMap<
39            crate::id::FragmentId,
40            build_actor_info::UpstreamActors,
41        >,
42        #[prost(message, repeated, tag = "3")]
43        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
44        #[prost(message, optional, tag = "4")]
45        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
46        #[prost(string, tag = "5")]
47        pub mview_definition: ::prost::alloc::string::String,
48        #[prost(message, optional, tag = "6")]
49        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
50        #[prost(string, tag = "9")]
51        pub config_override: ::prost::alloc::string::String,
52        #[prost(uint32, repeated, tag = "7", wrapper = "crate::id::SubscriberId")]
53        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<crate::id::SubscriberId>,
54    }
55    /// Nested message and enum types in `BuildActorInfo`.
56    pub mod build_actor_info {
57        #[derive(prost_helpers::AnyPB)]
58        #[derive(Clone, PartialEq, ::prost::Message)]
59        pub struct UpstreamActors {
60            #[prost(message, repeated, tag = "1")]
61            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
62        }
63    }
64}
65#[derive(prost_helpers::AnyPB)]
66#[derive(Clone, PartialEq, ::prost::Message)]
67pub struct BarrierCompleteResponse {
68    #[prost(string, tag = "1")]
69    pub request_id: ::prost::alloc::string::String,
70    #[prost(message, optional, tag = "2")]
71    pub status: ::core::option::Option<super::common::Status>,
72    #[prost(message, repeated, tag = "3")]
73    pub create_mview_progress: ::prost::alloc::vec::Vec<
74        barrier_complete_response::CreateMviewProgress,
75    >,
76    #[prost(message, repeated, tag = "4")]
77    pub synced_sstables: ::prost::alloc::vec::Vec<
78        barrier_complete_response::LocalSstableInfo,
79    >,
80    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
81    pub worker_id: crate::id::WorkerId,
82    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
83    pub table_watermarks: ::std::collections::HashMap<
84        crate::id::TableId,
85        super::hummock::TableWatermarks,
86    >,
87    #[prost(message, repeated, tag = "7")]
88    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
89    #[prost(uint64, tag = "8", wrapper = "crate::id::PartialGraphId")]
90    pub partial_graph_id: crate::id::PartialGraphId,
91    /// prev_epoch of barrier
92    #[prost(uint64, tag = "9")]
93    pub epoch: u64,
94    #[prost(message, repeated, tag = "11")]
95    pub load_finished_sources: ::prost::alloc::vec::Vec<
96        barrier_complete_response::LoadFinishedSource,
97    >,
98    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
99    pub vector_index_adds: ::std::collections::HashMap<
100        crate::id::TableId,
101        super::hummock::vector_index_delta::VectorIndexAdds,
102    >,
103    #[prost(message, repeated, tag = "13")]
104    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
105        barrier_complete_response::CdcTableBackfillProgress,
106    >,
107    /// Used for truncating tables in storage layer.
108    /// MaterializeExecutor reports the tables to truncate, and then
109    /// meta will apply truncate operation in next commit epoch.
110    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
111    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
112    /// Used for reporting materialized view refresh completion.
113    /// MaterializeExecutor reports when refresh has finished, and then
114    /// meta will update the table's refresh state to Finished.
115    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
116    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
117    #[prost(message, repeated, tag = "16")]
118    pub list_finished_sources: ::prost::alloc::vec::Vec<
119        barrier_complete_response::ListFinishedSource,
120    >,
121}
122/// Nested message and enum types in `BarrierCompleteResponse`.
123pub mod barrier_complete_response {
124    #[derive(prost_helpers::AnyPB)]
125    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
126    pub struct CreateMviewProgress {
127        /// Note: ideally we should use `executor_id`, but `actor_id` is ok-ish.
128        /// See <<https://github.com/risingwavelabs/risingwave/issues/6236>.>
129        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
130        pub backfill_actor_id: crate::id::ActorId,
131        #[prost(bool, tag = "2")]
132        pub done: bool,
133        /// MV backfill snapshot read epoch (0 for Done / Source backfill)
134        #[prost(uint64, tag = "3")]
135        pub consumed_epoch: u64,
136        /// MV backfill snapshot read rows / Source backfilled rows
137        #[prost(uint64, tag = "4")]
138        pub consumed_rows: u64,
139        #[prost(uint64, tag = "5")]
140        pub pending_epoch_lag: u64,
141        /// Buffered rows that are yet to be consumed (used by locality backfill to report precise progress)
142        #[prost(uint64, tag = "6")]
143        pub buffered_rows: u64,
144        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
145        pub fragment_id: crate::id::FragmentId,
146    }
147    #[derive(prost_helpers::AnyPB)]
148    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
149    pub struct CdcTableBackfillProgress {
150        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
151        pub actor_id: crate::id::ActorId,
152        #[prost(uint64, tag = "2")]
153        pub epoch: u64,
154        #[prost(bool, tag = "3")]
155        pub done: bool,
156        #[prost(int64, tag = "4")]
157        pub split_id_start_inclusive: i64,
158        #[prost(int64, tag = "5")]
159        pub split_id_end_inclusive: i64,
160        #[prost(uint64, tag = "6")]
161        pub generation: u64,
162        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
163        pub fragment_id: crate::id::FragmentId,
164    }
165    #[derive(prost_helpers::AnyPB)]
166    #[derive(Clone, PartialEq, ::prost::Message)]
167    pub struct LocalSstableInfo {
168        #[prost(message, optional, tag = "2")]
169        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
170        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
171        pub table_stats_map: ::std::collections::HashMap<
172            crate::id::TableId,
173            super::super::hummock::TableStats,
174        >,
175        #[prost(uint64, tag = "4")]
176        pub created_at: u64,
177    }
178    /// Used for refreshable batch source.
179    /// SourceExecutor reports the source load is finished, and then
180    /// meta will issue a LoadFinish barrier to notify MaterializeExecutor to start diff calculation.
181    #[derive(prost_helpers::AnyPB)]
182    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
183    pub struct LoadFinishedSource {
184        /// The actor that reported the completion event.
185        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
186        pub reporter_actor_id: crate::id::ActorId,
187        /// The table ID for the refreshable batch source.
188        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
189        pub table_id: crate::id::TableId,
190        /// The source identifier associated with the finished load.
191        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
192        pub associated_source_id: crate::id::SourceId,
193    }
194    /// SourceExecutor reports the source list is finished, and then
195    /// meta will issue a ListFinish barrier to notify the source to start loading.
196    #[derive(prost_helpers::AnyPB)]
197    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
198    pub struct ListFinishedSource {
199        /// The actor that reported the completion event.
200        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
201        pub reporter_actor_id: crate::id::ActorId,
202        /// The table ID for the refreshable batch source.
203        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
204        pub table_id: crate::id::TableId,
205        /// The source identifier associated with the completed listing.
206        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
207        pub associated_source_id: crate::id::SourceId,
208    }
209}
210#[derive(prost_helpers::AnyPB)]
211#[derive(Clone, PartialEq, ::prost::Message)]
212pub struct StreamingControlStreamRequest {
213    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
214    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
215}
216/// Nested message and enum types in `StreamingControlStreamRequest`.
217pub mod streaming_control_stream_request {
218    #[derive(prost_helpers::AnyPB)]
219    #[derive(Clone, PartialEq, ::prost::Message)]
220    pub struct InitRequest {
221        #[prost(string, tag = "1")]
222        pub term_id: ::prost::alloc::string::String,
223    }
224    #[derive(prost_helpers::AnyPB)]
225    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
226    pub struct CreatePartialGraphRequest {
227        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
228        pub partial_graph_id: crate::id::PartialGraphId,
229    }
230    #[derive(prost_helpers::AnyPB)]
231    #[derive(Clone, PartialEq, ::prost::Message)]
232    pub struct RemovePartialGraphRequest {
233        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
234        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
235    }
236    #[derive(prost_helpers::AnyPB)]
237    #[derive(Clone, PartialEq, ::prost::Message)]
238    pub struct ResetPartialGraphsRequest {
239        #[prost(uint64, repeated, tag = "1", wrapper = "crate::id::PartialGraphId")]
240        pub partial_graph_ids: ::prost::alloc::vec::Vec<crate::id::PartialGraphId>,
241    }
242    #[derive(prost_helpers::AnyPB)]
243    #[derive(Clone, PartialEq, ::prost::Oneof)]
244    pub enum Request {
245        #[prost(message, tag = "1")]
246        Init(InitRequest),
247        #[prost(message, tag = "2")]
248        InjectBarrier(super::InjectBarrierRequest),
249        #[prost(message, tag = "3")]
250        RemovePartialGraph(RemovePartialGraphRequest),
251        #[prost(message, tag = "4")]
252        CreatePartialGraph(CreatePartialGraphRequest),
253        #[prost(message, tag = "5")]
254        ResetPartialGraphs(ResetPartialGraphsRequest),
255    }
256}
257#[derive(prost_helpers::AnyPB)]
258#[derive(Clone, PartialEq, ::prost::Message)]
259pub struct ScoredError {
260    #[prost(string, tag = "1")]
261    pub err_msg: ::prost::alloc::string::String,
262    #[prost(int32, tag = "2")]
263    pub score: i32,
264}
265#[derive(prost_helpers::AnyPB)]
266#[derive(Clone, PartialEq, ::prost::Message)]
267pub struct StreamingControlStreamResponse {
268    #[prost(
269        oneof = "streaming_control_stream_response::Response",
270        tags = "1, 2, 3, 4, 5"
271    )]
272    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
273}
274/// Nested message and enum types in `StreamingControlStreamResponse`.
275pub mod streaming_control_stream_response {
276    #[derive(prost_helpers::AnyPB)]
277    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
278    pub struct InitResponse {}
279    #[derive(prost_helpers::AnyPB)]
280    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
281    pub struct ShutdownResponse {}
282    #[derive(prost_helpers::AnyPB)]
283    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
284    pub struct ReportPartialGraphFailureResponse {
285        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
286        pub partial_graph_id: crate::id::PartialGraphId,
287    }
288    #[derive(prost_helpers::AnyPB)]
289    #[derive(Clone, PartialEq, ::prost::Message)]
290    pub struct ResetPartialGraphResponse {
291        #[prost(uint64, tag = "1", wrapper = "crate::id::PartialGraphId")]
292        pub partial_graph_id: crate::id::PartialGraphId,
293        #[prost(message, optional, tag = "2")]
294        pub root_err: ::core::option::Option<super::ScoredError>,
295    }
296    #[derive(prost_helpers::AnyPB)]
297    #[derive(Clone, PartialEq, ::prost::Oneof)]
298    pub enum Response {
299        #[prost(message, tag = "1")]
300        Init(InitResponse),
301        #[prost(message, tag = "2")]
302        CompleteBarrier(super::BarrierCompleteResponse),
303        #[prost(message, tag = "3")]
304        Shutdown(ShutdownResponse),
305        #[prost(message, tag = "4")]
306        ReportPartialGraphFailure(ReportPartialGraphFailureResponse),
307        #[prost(message, tag = "5")]
308        ResetPartialGraph(ResetPartialGraphResponse),
309    }
310}
311#[derive(prost_helpers::AnyPB)]
312#[derive(Clone, Copy, PartialEq, ::prost::Message)]
313pub struct GetMinUncommittedObjectIdRequest {}
314#[derive(prost_helpers::AnyPB)]
315#[derive(Clone, Copy, PartialEq, ::prost::Message)]
316pub struct GetMinUncommittedObjectIdResponse {
317    #[prost(uint64, tag = "1")]
318    pub min_uncommitted_object_id: u64,
319}
320/// Generated client implementations.
321pub mod stream_service_client {
322    #![allow(
323        unused_variables,
324        dead_code,
325        missing_docs,
326        clippy::wildcard_imports,
327        clippy::let_unit_value,
328    )]
329    use tonic::codegen::*;
330    use tonic::codegen::http::Uri;
331    #[derive(Debug, Clone)]
332    pub struct StreamServiceClient<T> {
333        inner: tonic::client::Grpc<T>,
334    }
335    impl StreamServiceClient<tonic::transport::Channel> {
336        /// Attempt to create a new client by connecting to a given endpoint.
337        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
338        where
339            D: TryInto<tonic::transport::Endpoint>,
340            D::Error: Into<StdError>,
341        {
342            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
343            Ok(Self::new(conn))
344        }
345    }
346    impl<T> StreamServiceClient<T>
347    where
348        T: tonic::client::GrpcService<tonic::body::BoxBody>,
349        T::Error: Into<StdError>,
350        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
351        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
352    {
353        pub fn new(inner: T) -> Self {
354            let inner = tonic::client::Grpc::new(inner);
355            Self { inner }
356        }
357        pub fn with_origin(inner: T, origin: Uri) -> Self {
358            let inner = tonic::client::Grpc::with_origin(inner, origin);
359            Self { inner }
360        }
361        pub fn with_interceptor<F>(
362            inner: T,
363            interceptor: F,
364        ) -> StreamServiceClient<InterceptedService<T, F>>
365        where
366            F: tonic::service::Interceptor,
367            T::ResponseBody: Default,
368            T: tonic::codegen::Service<
369                http::Request<tonic::body::BoxBody>,
370                Response = http::Response<
371                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
372                >,
373            >,
374            <T as tonic::codegen::Service<
375                http::Request<tonic::body::BoxBody>,
376            >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
377        {
378            StreamServiceClient::new(InterceptedService::new(inner, interceptor))
379        }
380        /// Compress requests with the given encoding.
381        ///
382        /// This requires the server to support it otherwise it might respond with an
383        /// error.
384        #[must_use]
385        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
386            self.inner = self.inner.send_compressed(encoding);
387            self
388        }
389        /// Enable decompressing responses.
390        #[must_use]
391        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
392            self.inner = self.inner.accept_compressed(encoding);
393            self
394        }
395        /// Limits the maximum size of a decoded message.
396        ///
397        /// Default: `4MB`
398        #[must_use]
399        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
400            self.inner = self.inner.max_decoding_message_size(limit);
401            self
402        }
403        /// Limits the maximum size of an encoded message.
404        ///
405        /// Default: `usize::MAX`
406        #[must_use]
407        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
408            self.inner = self.inner.max_encoding_message_size(limit);
409            self
410        }
411        pub async fn streaming_control_stream(
412            &mut self,
413            request: impl tonic::IntoStreamingRequest<
414                Message = super::StreamingControlStreamRequest,
415            >,
416        ) -> std::result::Result<
417            tonic::Response<
418                tonic::codec::Streaming<super::StreamingControlStreamResponse>,
419            >,
420            tonic::Status,
421        > {
422            self.inner
423                .ready()
424                .await
425                .map_err(|e| {
426                    tonic::Status::unknown(
427                        format!("Service was not ready: {}", e.into()),
428                    )
429                })?;
430            let codec = tonic::codec::ProstCodec::default();
431            let path = http::uri::PathAndQuery::from_static(
432                "/stream_service.StreamService/StreamingControlStream",
433            );
434            let mut req = request.into_streaming_request();
435            req.extensions_mut()
436                .insert(
437                    GrpcMethod::new(
438                        "stream_service.StreamService",
439                        "StreamingControlStream",
440                    ),
441                );
442            self.inner.streaming(req, path, codec).await
443        }
444        pub async fn get_min_uncommitted_object_id(
445            &mut self,
446            request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
447        ) -> std::result::Result<
448            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
449            tonic::Status,
450        > {
451            self.inner
452                .ready()
453                .await
454                .map_err(|e| {
455                    tonic::Status::unknown(
456                        format!("Service was not ready: {}", e.into()),
457                    )
458                })?;
459            let codec = tonic::codec::ProstCodec::default();
460            let path = http::uri::PathAndQuery::from_static(
461                "/stream_service.StreamService/GetMinUncommittedObjectId",
462            );
463            let mut req = request.into_request();
464            req.extensions_mut()
465                .insert(
466                    GrpcMethod::new(
467                        "stream_service.StreamService",
468                        "GetMinUncommittedObjectId",
469                    ),
470                );
471            self.inner.unary(req, path, codec).await
472        }
473    }
474}
475/// Generated server implementations.
476pub mod stream_service_server {
477    #![allow(
478        unused_variables,
479        dead_code,
480        missing_docs,
481        clippy::wildcard_imports,
482        clippy::let_unit_value,
483    )]
484    use tonic::codegen::*;
485    /// Generated trait containing gRPC methods that should be implemented for use with StreamServiceServer.
486    #[async_trait]
487    pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
488        /// Server streaming response type for the StreamingControlStream method.
489        type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
490                Item = std::result::Result<
491                    super::StreamingControlStreamResponse,
492                    tonic::Status,
493                >,
494            >
495            + std::marker::Send
496            + 'static;
497        async fn streaming_control_stream(
498            &self,
499            request: tonic::Request<
500                tonic::Streaming<super::StreamingControlStreamRequest>,
501            >,
502        ) -> std::result::Result<
503            tonic::Response<Self::StreamingControlStreamStream>,
504            tonic::Status,
505        >;
506        async fn get_min_uncommitted_object_id(
507            &self,
508            request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
509        ) -> std::result::Result<
510            tonic::Response<super::GetMinUncommittedObjectIdResponse>,
511            tonic::Status,
512        >;
513    }
514    #[derive(Debug)]
515    pub struct StreamServiceServer<T> {
516        inner: Arc<T>,
517        accept_compression_encodings: EnabledCompressionEncodings,
518        send_compression_encodings: EnabledCompressionEncodings,
519        max_decoding_message_size: Option<usize>,
520        max_encoding_message_size: Option<usize>,
521    }
522    impl<T> StreamServiceServer<T> {
523        pub fn new(inner: T) -> Self {
524            Self::from_arc(Arc::new(inner))
525        }
526        pub fn from_arc(inner: Arc<T>) -> Self {
527            Self {
528                inner,
529                accept_compression_encodings: Default::default(),
530                send_compression_encodings: Default::default(),
531                max_decoding_message_size: None,
532                max_encoding_message_size: None,
533            }
534        }
535        pub fn with_interceptor<F>(
536            inner: T,
537            interceptor: F,
538        ) -> InterceptedService<Self, F>
539        where
540            F: tonic::service::Interceptor,
541        {
542            InterceptedService::new(Self::new(inner), interceptor)
543        }
544        /// Enable decompressing requests with the given encoding.
545        #[must_use]
546        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
547            self.accept_compression_encodings.enable(encoding);
548            self
549        }
550        /// Compress responses with the given encoding, if the client supports it.
551        #[must_use]
552        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
553            self.send_compression_encodings.enable(encoding);
554            self
555        }
556        /// Limits the maximum size of a decoded message.
557        ///
558        /// Default: `4MB`
559        #[must_use]
560        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
561            self.max_decoding_message_size = Some(limit);
562            self
563        }
564        /// Limits the maximum size of an encoded message.
565        ///
566        /// Default: `usize::MAX`
567        #[must_use]
568        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
569            self.max_encoding_message_size = Some(limit);
570            self
571        }
572    }
573    impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
574    where
575        T: StreamService,
576        B: Body + std::marker::Send + 'static,
577        B::Error: Into<StdError> + std::marker::Send + 'static,
578    {
579        type Response = http::Response<tonic::body::BoxBody>;
580        type Error = std::convert::Infallible;
581        type Future = BoxFuture<Self::Response, Self::Error>;
582        fn poll_ready(
583            &mut self,
584            _cx: &mut Context<'_>,
585        ) -> Poll<std::result::Result<(), Self::Error>> {
586            Poll::Ready(Ok(()))
587        }
588        fn call(&mut self, req: http::Request<B>) -> Self::Future {
589            match req.uri().path() {
590                "/stream_service.StreamService/StreamingControlStream" => {
591                    #[allow(non_camel_case_types)]
592                    struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
593                    impl<
594                        T: StreamService,
595                    > tonic::server::StreamingService<
596                        super::StreamingControlStreamRequest,
597                    > for StreamingControlStreamSvc<T> {
598                        type Response = super::StreamingControlStreamResponse;
599                        type ResponseStream = T::StreamingControlStreamStream;
600                        type Future = BoxFuture<
601                            tonic::Response<Self::ResponseStream>,
602                            tonic::Status,
603                        >;
604                        fn call(
605                            &mut self,
606                            request: tonic::Request<
607                                tonic::Streaming<super::StreamingControlStreamRequest>,
608                            >,
609                        ) -> Self::Future {
610                            let inner = Arc::clone(&self.0);
611                            let fut = async move {
612                                <T as StreamService>::streaming_control_stream(
613                                        &inner,
614                                        request,
615                                    )
616                                    .await
617                            };
618                            Box::pin(fut)
619                        }
620                    }
621                    let accept_compression_encodings = self.accept_compression_encodings;
622                    let send_compression_encodings = self.send_compression_encodings;
623                    let max_decoding_message_size = self.max_decoding_message_size;
624                    let max_encoding_message_size = self.max_encoding_message_size;
625                    let inner = self.inner.clone();
626                    let fut = async move {
627                        let method = StreamingControlStreamSvc(inner);
628                        let codec = tonic::codec::ProstCodec::default();
629                        let mut grpc = tonic::server::Grpc::new(codec)
630                            .apply_compression_config(
631                                accept_compression_encodings,
632                                send_compression_encodings,
633                            )
634                            .apply_max_message_size_config(
635                                max_decoding_message_size,
636                                max_encoding_message_size,
637                            );
638                        let res = grpc.streaming(method, req).await;
639                        Ok(res)
640                    };
641                    Box::pin(fut)
642                }
643                "/stream_service.StreamService/GetMinUncommittedObjectId" => {
644                    #[allow(non_camel_case_types)]
645                    struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
646                    impl<
647                        T: StreamService,
648                    > tonic::server::UnaryService<
649                        super::GetMinUncommittedObjectIdRequest,
650                    > for GetMinUncommittedObjectIdSvc<T> {
651                        type Response = super::GetMinUncommittedObjectIdResponse;
652                        type Future = BoxFuture<
653                            tonic::Response<Self::Response>,
654                            tonic::Status,
655                        >;
656                        fn call(
657                            &mut self,
658                            request: tonic::Request<
659                                super::GetMinUncommittedObjectIdRequest,
660                            >,
661                        ) -> Self::Future {
662                            let inner = Arc::clone(&self.0);
663                            let fut = async move {
664                                <T as StreamService>::get_min_uncommitted_object_id(
665                                        &inner,
666                                        request,
667                                    )
668                                    .await
669                            };
670                            Box::pin(fut)
671                        }
672                    }
673                    let accept_compression_encodings = self.accept_compression_encodings;
674                    let send_compression_encodings = self.send_compression_encodings;
675                    let max_decoding_message_size = self.max_decoding_message_size;
676                    let max_encoding_message_size = self.max_encoding_message_size;
677                    let inner = self.inner.clone();
678                    let fut = async move {
679                        let method = GetMinUncommittedObjectIdSvc(inner);
680                        let codec = tonic::codec::ProstCodec::default();
681                        let mut grpc = tonic::server::Grpc::new(codec)
682                            .apply_compression_config(
683                                accept_compression_encodings,
684                                send_compression_encodings,
685                            )
686                            .apply_max_message_size_config(
687                                max_decoding_message_size,
688                                max_encoding_message_size,
689                            );
690                        let res = grpc.unary(method, req).await;
691                        Ok(res)
692                    };
693                    Box::pin(fut)
694                }
695                _ => {
696                    Box::pin(async move {
697                        let mut response = http::Response::new(empty_body());
698                        let headers = response.headers_mut();
699                        headers
700                            .insert(
701                                tonic::Status::GRPC_STATUS,
702                                (tonic::Code::Unimplemented as i32).into(),
703                            );
704                        headers
705                            .insert(
706                                http::header::CONTENT_TYPE,
707                                tonic::metadata::GRPC_CONTENT_TYPE,
708                            );
709                        Ok(response)
710                    })
711                }
712            }
713        }
714    }
715    impl<T> Clone for StreamServiceServer<T> {
716        fn clone(&self) -> Self {
717            let inner = self.inner.clone();
718            Self {
719                inner,
720                accept_compression_encodings: self.accept_compression_encodings,
721                send_compression_encodings: self.send_compression_encodings,
722                max_decoding_message_size: self.max_decoding_message_size,
723                max_encoding_message_size: self.max_encoding_message_size,
724            }
725        }
726    }
727    /// Generated gRPC service name
728    pub const SERVICE_NAME: &str = "stream_service.StreamService";
729    impl<T> tonic::server::NamedService for StreamServiceServer<T> {
730        const NAME: &'static str = SERVICE_NAME;
731    }
732}