1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5 #[prost(string, tag = "1")]
6 pub request_id: ::prost::alloc::string::String,
7 #[prost(message, optional, tag = "2")]
8 pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9 #[prost(uint32, tag = "3")]
10 pub database_id: u32,
11 #[prost(uint32, repeated, tag = "4")]
12 pub actor_ids_to_collect: ::prost::alloc::vec::Vec<u32>,
13 #[prost(uint32, repeated, tag = "5")]
14 pub table_ids_to_sync: ::prost::alloc::vec::Vec<u32>,
15 #[prost(uint32, tag = "6")]
16 pub partial_graph_id: u32,
17 #[prost(message, repeated, tag = "9")]
18 pub actors_to_build: ::prost::alloc::vec::Vec<
19 inject_barrier_request::FragmentBuildActorInfo,
20 >,
21 #[prost(message, repeated, tag = "10")]
22 pub subscriptions_to_add: ::prost::alloc::vec::Vec<
23 super::stream_plan::SubscriptionUpstreamInfo,
24 >,
25 #[prost(message, repeated, tag = "11")]
26 pub subscriptions_to_remove: ::prost::alloc::vec::Vec<
27 super::stream_plan::SubscriptionUpstreamInfo,
28 >,
29}
30pub mod inject_barrier_request {
32 #[derive(prost_helpers::AnyPB)]
33 #[derive(Clone, PartialEq, ::prost::Message)]
34 pub struct FragmentBuildActorInfo {
35 #[prost(uint32, tag = "1")]
36 pub fragment_id: u32,
37 #[prost(message, optional, tag = "2")]
38 pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
39 #[prost(message, repeated, tag = "3")]
40 pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
41 }
42 #[derive(prost_helpers::AnyPB)]
43 #[derive(Clone, PartialEq, ::prost::Message)]
44 pub struct BuildActorInfo {
45 #[prost(uint32, tag = "1")]
46 pub actor_id: u32,
47 #[prost(map = "uint32, message", tag = "2")]
48 pub fragment_upstreams: ::std::collections::HashMap<
49 u32,
50 build_actor_info::UpstreamActors,
51 >,
52 #[prost(message, repeated, tag = "3")]
53 pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
54 #[prost(message, optional, tag = "4")]
55 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
56 #[prost(string, tag = "5")]
57 pub mview_definition: ::prost::alloc::string::String,
58 #[prost(message, optional, tag = "6")]
59 pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
60 }
61 pub mod build_actor_info {
63 #[derive(prost_helpers::AnyPB)]
64 #[derive(Clone, PartialEq, ::prost::Message)]
65 pub struct UpstreamActors {
66 #[prost(message, repeated, tag = "1")]
67 pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
68 }
69 }
70}
71#[derive(prost_helpers::AnyPB)]
72#[derive(Clone, PartialEq, ::prost::Message)]
73pub struct BarrierCompleteResponse {
74 #[prost(string, tag = "1")]
75 pub request_id: ::prost::alloc::string::String,
76 #[prost(message, optional, tag = "2")]
77 pub status: ::core::option::Option<super::common::Status>,
78 #[prost(message, repeated, tag = "3")]
79 pub create_mview_progress: ::prost::alloc::vec::Vec<
80 barrier_complete_response::CreateMviewProgress,
81 >,
82 #[prost(message, repeated, tag = "4")]
83 pub synced_sstables: ::prost::alloc::vec::Vec<
84 barrier_complete_response::LocalSstableInfo,
85 >,
86 #[prost(uint32, tag = "5")]
87 pub worker_id: u32,
88 #[prost(map = "uint32, message", tag = "6")]
89 pub table_watermarks: ::std::collections::HashMap<
90 u32,
91 super::hummock::TableWatermarks,
92 >,
93 #[prost(message, repeated, tag = "7")]
94 pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
95 #[prost(uint32, tag = "8")]
96 pub partial_graph_id: u32,
97 #[prost(uint64, tag = "9")]
99 pub epoch: u64,
100 #[prost(uint32, tag = "10")]
101 pub database_id: u32,
102 #[prost(uint32, repeated, tag = "11")]
106 pub load_finished_source_ids: ::prost::alloc::vec::Vec<u32>,
107 #[prost(map = "uint32, message", tag = "12")]
108 pub vector_index_adds: ::std::collections::HashMap<
109 u32,
110 super::hummock::vector_index_delta::VectorIndexAdds,
111 >,
112 #[prost(message, repeated, tag = "13")]
113 pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
114 barrier_complete_response::CdcTableBackfillProgress,
115 >,
116}
117pub mod barrier_complete_response {
119 #[derive(prost_helpers::AnyPB)]
120 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
121 pub struct CreateMviewProgress {
122 #[prost(uint32, tag = "1")]
125 pub backfill_actor_id: u32,
126 #[prost(bool, tag = "2")]
127 pub done: bool,
128 #[prost(uint64, tag = "3")]
130 pub consumed_epoch: u64,
131 #[prost(uint64, tag = "4")]
133 pub consumed_rows: u64,
134 #[prost(uint64, tag = "5")]
135 pub pending_epoch_lag: u64,
136 }
137 #[derive(prost_helpers::AnyPB)]
138 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
139 pub struct CdcTableBackfillProgress {
140 #[prost(uint32, tag = "1")]
141 pub actor_id: u32,
142 #[prost(uint64, tag = "2")]
143 pub epoch: u64,
144 #[prost(bool, tag = "3")]
145 pub done: bool,
146 #[prost(int64, tag = "4")]
147 pub split_id_start_inclusive: i64,
148 #[prost(int64, tag = "5")]
149 pub split_id_end_inclusive: i64,
150 #[prost(uint64, tag = "6")]
151 pub generation: u64,
152 #[prost(uint32, tag = "7")]
153 pub fragment_id: u32,
154 }
155 #[derive(prost_helpers::AnyPB)]
156 #[derive(Clone, PartialEq, ::prost::Message)]
157 pub struct LocalSstableInfo {
158 #[prost(message, optional, tag = "2")]
159 pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
160 #[prost(map = "uint32, message", tag = "3")]
161 pub table_stats_map: ::std::collections::HashMap<
162 u32,
163 super::super::hummock::TableStats,
164 >,
165 #[prost(uint64, tag = "4")]
166 pub created_at: u64,
167 }
168}
169#[derive(prost_helpers::AnyPB)]
170#[derive(Clone, PartialEq, ::prost::Message)]
171pub struct StreamingControlStreamRequest {
172 #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
173 pub request: ::core::option::Option<streaming_control_stream_request::Request>,
174}
175pub mod streaming_control_stream_request {
177 #[derive(prost_helpers::AnyPB)]
178 #[derive(Clone, PartialEq, ::prost::Message)]
179 pub struct InitialPartialGraph {
180 #[prost(uint32, tag = "1")]
181 pub partial_graph_id: u32,
182 #[prost(message, repeated, tag = "2")]
183 pub subscriptions: ::prost::alloc::vec::Vec<
184 super::super::stream_plan::SubscriptionUpstreamInfo,
185 >,
186 }
187 #[derive(prost_helpers::AnyPB)]
188 #[derive(Clone, PartialEq, ::prost::Message)]
189 pub struct DatabaseInitialPartialGraph {
190 #[prost(uint32, tag = "1")]
191 pub database_id: u32,
192 #[prost(message, repeated, tag = "2")]
193 pub graphs: ::prost::alloc::vec::Vec<InitialPartialGraph>,
194 }
195 #[derive(prost_helpers::AnyPB)]
196 #[derive(Clone, PartialEq, ::prost::Message)]
197 pub struct InitRequest {
198 #[prost(message, repeated, tag = "1")]
199 pub databases: ::prost::alloc::vec::Vec<DatabaseInitialPartialGraph>,
200 #[prost(string, tag = "2")]
201 pub term_id: ::prost::alloc::string::String,
202 }
203 #[derive(prost_helpers::AnyPB)]
204 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
205 pub struct CreatePartialGraphRequest {
206 #[prost(uint32, tag = "1")]
207 pub partial_graph_id: u32,
208 #[prost(uint32, tag = "2")]
209 pub database_id: u32,
210 }
211 #[derive(prost_helpers::AnyPB)]
212 #[derive(Clone, PartialEq, ::prost::Message)]
213 pub struct RemovePartialGraphRequest {
214 #[prost(uint32, repeated, tag = "1")]
215 pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
216 #[prost(uint32, tag = "2")]
217 pub database_id: u32,
218 }
219 #[derive(prost_helpers::AnyPB)]
220 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
221 pub struct ResetDatabaseRequest {
222 #[prost(uint32, tag = "1")]
223 pub database_id: u32,
224 #[prost(uint32, tag = "2")]
225 pub reset_request_id: u32,
226 }
227 #[derive(prost_helpers::AnyPB)]
228 #[derive(Clone, PartialEq, ::prost::Oneof)]
229 pub enum Request {
230 #[prost(message, tag = "1")]
231 Init(InitRequest),
232 #[prost(message, tag = "2")]
233 InjectBarrier(super::InjectBarrierRequest),
234 #[prost(message, tag = "3")]
235 RemovePartialGraph(RemovePartialGraphRequest),
236 #[prost(message, tag = "4")]
237 CreatePartialGraph(CreatePartialGraphRequest),
238 #[prost(message, tag = "5")]
239 ResetDatabase(ResetDatabaseRequest),
240 }
241}
242#[derive(prost_helpers::AnyPB)]
243#[derive(Clone, PartialEq, ::prost::Message)]
244pub struct ScoredError {
245 #[prost(string, tag = "1")]
246 pub err_msg: ::prost::alloc::string::String,
247 #[prost(int32, tag = "2")]
248 pub score: i32,
249}
250#[derive(prost_helpers::AnyPB)]
251#[derive(Clone, PartialEq, ::prost::Message)]
252pub struct StreamingControlStreamResponse {
253 #[prost(
254 oneof = "streaming_control_stream_response::Response",
255 tags = "1, 2, 3, 4, 5"
256 )]
257 pub response: ::core::option::Option<streaming_control_stream_response::Response>,
258}
259pub mod streaming_control_stream_response {
261 #[derive(prost_helpers::AnyPB)]
262 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
263 pub struct InitResponse {}
264 #[derive(prost_helpers::AnyPB)]
265 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
266 pub struct ShutdownResponse {}
267 #[derive(prost_helpers::AnyPB)]
268 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
269 pub struct ReportDatabaseFailureResponse {
270 #[prost(uint32, tag = "1")]
271 pub database_id: u32,
272 }
273 #[derive(prost_helpers::AnyPB)]
274 #[derive(Clone, PartialEq, ::prost::Message)]
275 pub struct ResetDatabaseResponse {
276 #[prost(uint32, tag = "1")]
277 pub database_id: u32,
278 #[prost(message, optional, tag = "2")]
279 pub root_err: ::core::option::Option<super::ScoredError>,
280 #[prost(uint32, tag = "3")]
281 pub reset_request_id: u32,
282 }
283 #[derive(prost_helpers::AnyPB)]
284 #[derive(Clone, PartialEq, ::prost::Oneof)]
285 pub enum Response {
286 #[prost(message, tag = "1")]
287 Init(InitResponse),
288 #[prost(message, tag = "2")]
289 CompleteBarrier(super::BarrierCompleteResponse),
290 #[prost(message, tag = "3")]
291 Shutdown(ShutdownResponse),
292 #[prost(message, tag = "4")]
293 ReportDatabaseFailure(ReportDatabaseFailureResponse),
294 #[prost(message, tag = "5")]
295 ResetDatabase(ResetDatabaseResponse),
296 }
297}
298#[derive(prost_helpers::AnyPB)]
299#[derive(Clone, Copy, PartialEq, ::prost::Message)]
300pub struct GetMinUncommittedObjectIdRequest {}
301#[derive(prost_helpers::AnyPB)]
302#[derive(Clone, Copy, PartialEq, ::prost::Message)]
303pub struct GetMinUncommittedObjectIdResponse {
304 #[prost(uint64, tag = "1")]
305 pub min_uncommitted_object_id: u64,
306}
307pub mod stream_service_client {
309 #![allow(
310 unused_variables,
311 dead_code,
312 missing_docs,
313 clippy::wildcard_imports,
314 clippy::let_unit_value,
315 )]
316 use tonic::codegen::*;
317 use tonic::codegen::http::Uri;
318 #[derive(Debug, Clone)]
319 pub struct StreamServiceClient<T> {
320 inner: tonic::client::Grpc<T>,
321 }
322 impl StreamServiceClient<tonic::transport::Channel> {
323 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
325 where
326 D: TryInto<tonic::transport::Endpoint>,
327 D::Error: Into<StdError>,
328 {
329 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
330 Ok(Self::new(conn))
331 }
332 }
333 impl<T> StreamServiceClient<T>
334 where
335 T: tonic::client::GrpcService<tonic::body::BoxBody>,
336 T::Error: Into<StdError>,
337 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
338 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
339 {
340 pub fn new(inner: T) -> Self {
341 let inner = tonic::client::Grpc::new(inner);
342 Self { inner }
343 }
344 pub fn with_origin(inner: T, origin: Uri) -> Self {
345 let inner = tonic::client::Grpc::with_origin(inner, origin);
346 Self { inner }
347 }
348 pub fn with_interceptor<F>(
349 inner: T,
350 interceptor: F,
351 ) -> StreamServiceClient<InterceptedService<T, F>>
352 where
353 F: tonic::service::Interceptor,
354 T::ResponseBody: Default,
355 T: tonic::codegen::Service<
356 http::Request<tonic::body::BoxBody>,
357 Response = http::Response<
358 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
359 >,
360 >,
361 <T as tonic::codegen::Service<
362 http::Request<tonic::body::BoxBody>,
363 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
364 {
365 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
366 }
367 #[must_use]
372 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
373 self.inner = self.inner.send_compressed(encoding);
374 self
375 }
376 #[must_use]
378 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
379 self.inner = self.inner.accept_compressed(encoding);
380 self
381 }
382 #[must_use]
386 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
387 self.inner = self.inner.max_decoding_message_size(limit);
388 self
389 }
390 #[must_use]
394 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
395 self.inner = self.inner.max_encoding_message_size(limit);
396 self
397 }
398 pub async fn streaming_control_stream(
399 &mut self,
400 request: impl tonic::IntoStreamingRequest<
401 Message = super::StreamingControlStreamRequest,
402 >,
403 ) -> std::result::Result<
404 tonic::Response<
405 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
406 >,
407 tonic::Status,
408 > {
409 self.inner
410 .ready()
411 .await
412 .map_err(|e| {
413 tonic::Status::unknown(
414 format!("Service was not ready: {}", e.into()),
415 )
416 })?;
417 let codec = tonic::codec::ProstCodec::default();
418 let path = http::uri::PathAndQuery::from_static(
419 "/stream_service.StreamService/StreamingControlStream",
420 );
421 let mut req = request.into_streaming_request();
422 req.extensions_mut()
423 .insert(
424 GrpcMethod::new(
425 "stream_service.StreamService",
426 "StreamingControlStream",
427 ),
428 );
429 self.inner.streaming(req, path, codec).await
430 }
431 pub async fn get_min_uncommitted_object_id(
432 &mut self,
433 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
434 ) -> std::result::Result<
435 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
436 tonic::Status,
437 > {
438 self.inner
439 .ready()
440 .await
441 .map_err(|e| {
442 tonic::Status::unknown(
443 format!("Service was not ready: {}", e.into()),
444 )
445 })?;
446 let codec = tonic::codec::ProstCodec::default();
447 let path = http::uri::PathAndQuery::from_static(
448 "/stream_service.StreamService/GetMinUncommittedObjectId",
449 );
450 let mut req = request.into_request();
451 req.extensions_mut()
452 .insert(
453 GrpcMethod::new(
454 "stream_service.StreamService",
455 "GetMinUncommittedObjectId",
456 ),
457 );
458 self.inner.unary(req, path, codec).await
459 }
460 }
461}
462pub mod stream_service_server {
464 #![allow(
465 unused_variables,
466 dead_code,
467 missing_docs,
468 clippy::wildcard_imports,
469 clippy::let_unit_value,
470 )]
471 use tonic::codegen::*;
472 #[async_trait]
474 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
475 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
477 Item = std::result::Result<
478 super::StreamingControlStreamResponse,
479 tonic::Status,
480 >,
481 >
482 + std::marker::Send
483 + 'static;
484 async fn streaming_control_stream(
485 &self,
486 request: tonic::Request<
487 tonic::Streaming<super::StreamingControlStreamRequest>,
488 >,
489 ) -> std::result::Result<
490 tonic::Response<Self::StreamingControlStreamStream>,
491 tonic::Status,
492 >;
493 async fn get_min_uncommitted_object_id(
494 &self,
495 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
496 ) -> std::result::Result<
497 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
498 tonic::Status,
499 >;
500 }
501 #[derive(Debug)]
502 pub struct StreamServiceServer<T> {
503 inner: Arc<T>,
504 accept_compression_encodings: EnabledCompressionEncodings,
505 send_compression_encodings: EnabledCompressionEncodings,
506 max_decoding_message_size: Option<usize>,
507 max_encoding_message_size: Option<usize>,
508 }
509 impl<T> StreamServiceServer<T> {
510 pub fn new(inner: T) -> Self {
511 Self::from_arc(Arc::new(inner))
512 }
513 pub fn from_arc(inner: Arc<T>) -> Self {
514 Self {
515 inner,
516 accept_compression_encodings: Default::default(),
517 send_compression_encodings: Default::default(),
518 max_decoding_message_size: None,
519 max_encoding_message_size: None,
520 }
521 }
522 pub fn with_interceptor<F>(
523 inner: T,
524 interceptor: F,
525 ) -> InterceptedService<Self, F>
526 where
527 F: tonic::service::Interceptor,
528 {
529 InterceptedService::new(Self::new(inner), interceptor)
530 }
531 #[must_use]
533 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
534 self.accept_compression_encodings.enable(encoding);
535 self
536 }
537 #[must_use]
539 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
540 self.send_compression_encodings.enable(encoding);
541 self
542 }
543 #[must_use]
547 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
548 self.max_decoding_message_size = Some(limit);
549 self
550 }
551 #[must_use]
555 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
556 self.max_encoding_message_size = Some(limit);
557 self
558 }
559 }
560 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
561 where
562 T: StreamService,
563 B: Body + std::marker::Send + 'static,
564 B::Error: Into<StdError> + std::marker::Send + 'static,
565 {
566 type Response = http::Response<tonic::body::BoxBody>;
567 type Error = std::convert::Infallible;
568 type Future = BoxFuture<Self::Response, Self::Error>;
569 fn poll_ready(
570 &mut self,
571 _cx: &mut Context<'_>,
572 ) -> Poll<std::result::Result<(), Self::Error>> {
573 Poll::Ready(Ok(()))
574 }
575 fn call(&mut self, req: http::Request<B>) -> Self::Future {
576 match req.uri().path() {
577 "/stream_service.StreamService/StreamingControlStream" => {
578 #[allow(non_camel_case_types)]
579 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
580 impl<
581 T: StreamService,
582 > tonic::server::StreamingService<
583 super::StreamingControlStreamRequest,
584 > for StreamingControlStreamSvc<T> {
585 type Response = super::StreamingControlStreamResponse;
586 type ResponseStream = T::StreamingControlStreamStream;
587 type Future = BoxFuture<
588 tonic::Response<Self::ResponseStream>,
589 tonic::Status,
590 >;
591 fn call(
592 &mut self,
593 request: tonic::Request<
594 tonic::Streaming<super::StreamingControlStreamRequest>,
595 >,
596 ) -> Self::Future {
597 let inner = Arc::clone(&self.0);
598 let fut = async move {
599 <T as StreamService>::streaming_control_stream(
600 &inner,
601 request,
602 )
603 .await
604 };
605 Box::pin(fut)
606 }
607 }
608 let accept_compression_encodings = self.accept_compression_encodings;
609 let send_compression_encodings = self.send_compression_encodings;
610 let max_decoding_message_size = self.max_decoding_message_size;
611 let max_encoding_message_size = self.max_encoding_message_size;
612 let inner = self.inner.clone();
613 let fut = async move {
614 let method = StreamingControlStreamSvc(inner);
615 let codec = tonic::codec::ProstCodec::default();
616 let mut grpc = tonic::server::Grpc::new(codec)
617 .apply_compression_config(
618 accept_compression_encodings,
619 send_compression_encodings,
620 )
621 .apply_max_message_size_config(
622 max_decoding_message_size,
623 max_encoding_message_size,
624 );
625 let res = grpc.streaming(method, req).await;
626 Ok(res)
627 };
628 Box::pin(fut)
629 }
630 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
631 #[allow(non_camel_case_types)]
632 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
633 impl<
634 T: StreamService,
635 > tonic::server::UnaryService<
636 super::GetMinUncommittedObjectIdRequest,
637 > for GetMinUncommittedObjectIdSvc<T> {
638 type Response = super::GetMinUncommittedObjectIdResponse;
639 type Future = BoxFuture<
640 tonic::Response<Self::Response>,
641 tonic::Status,
642 >;
643 fn call(
644 &mut self,
645 request: tonic::Request<
646 super::GetMinUncommittedObjectIdRequest,
647 >,
648 ) -> Self::Future {
649 let inner = Arc::clone(&self.0);
650 let fut = async move {
651 <T as StreamService>::get_min_uncommitted_object_id(
652 &inner,
653 request,
654 )
655 .await
656 };
657 Box::pin(fut)
658 }
659 }
660 let accept_compression_encodings = self.accept_compression_encodings;
661 let send_compression_encodings = self.send_compression_encodings;
662 let max_decoding_message_size = self.max_decoding_message_size;
663 let max_encoding_message_size = self.max_encoding_message_size;
664 let inner = self.inner.clone();
665 let fut = async move {
666 let method = GetMinUncommittedObjectIdSvc(inner);
667 let codec = tonic::codec::ProstCodec::default();
668 let mut grpc = tonic::server::Grpc::new(codec)
669 .apply_compression_config(
670 accept_compression_encodings,
671 send_compression_encodings,
672 )
673 .apply_max_message_size_config(
674 max_decoding_message_size,
675 max_encoding_message_size,
676 );
677 let res = grpc.unary(method, req).await;
678 Ok(res)
679 };
680 Box::pin(fut)
681 }
682 _ => {
683 Box::pin(async move {
684 let mut response = http::Response::new(empty_body());
685 let headers = response.headers_mut();
686 headers
687 .insert(
688 tonic::Status::GRPC_STATUS,
689 (tonic::Code::Unimplemented as i32).into(),
690 );
691 headers
692 .insert(
693 http::header::CONTENT_TYPE,
694 tonic::metadata::GRPC_CONTENT_TYPE,
695 );
696 Ok(response)
697 })
698 }
699 }
700 }
701 }
702 impl<T> Clone for StreamServiceServer<T> {
703 fn clone(&self) -> Self {
704 let inner = self.inner.clone();
705 Self {
706 inner,
707 accept_compression_encodings: self.accept_compression_encodings,
708 send_compression_encodings: self.send_compression_encodings,
709 max_decoding_message_size: self.max_decoding_message_size,
710 max_encoding_message_size: self.max_encoding_message_size,
711 }
712 }
713 }
714 pub const SERVICE_NAME: &str = "stream_service.StreamService";
716 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
717 const NAME: &'static str = SERVICE_NAME;
718 }
719}