1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct InjectBarrierRequest {
5 #[prost(string, tag = "1")]
6 pub request_id: ::prost::alloc::string::String,
7 #[prost(message, optional, tag = "2")]
8 pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
9 #[prost(uint32, tag = "3")]
10 pub database_id: u32,
11 #[prost(uint32, repeated, tag = "4")]
12 pub actor_ids_to_collect: ::prost::alloc::vec::Vec<u32>,
13 #[prost(uint32, repeated, tag = "5")]
14 pub table_ids_to_sync: ::prost::alloc::vec::Vec<u32>,
15 #[prost(uint32, tag = "6")]
16 pub partial_graph_id: u32,
17 #[prost(message, repeated, tag = "9")]
18 pub actors_to_build: ::prost::alloc::vec::Vec<
19 inject_barrier_request::FragmentBuildActorInfo,
20 >,
21 #[prost(message, repeated, tag = "10")]
22 pub subscriptions_to_add: ::prost::alloc::vec::Vec<
23 super::stream_plan::SubscriptionUpstreamInfo,
24 >,
25 #[prost(message, repeated, tag = "11")]
26 pub subscriptions_to_remove: ::prost::alloc::vec::Vec<
27 super::stream_plan::SubscriptionUpstreamInfo,
28 >,
29}
30pub mod inject_barrier_request {
32 #[derive(prost_helpers::AnyPB)]
33 #[derive(Clone, PartialEq, ::prost::Message)]
34 pub struct FragmentBuildActorInfo {
35 #[prost(uint32, tag = "1")]
36 pub fragment_id: u32,
37 #[prost(message, optional, tag = "2")]
38 pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
39 #[prost(message, repeated, tag = "3")]
40 pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
41 }
42 #[derive(prost_helpers::AnyPB)]
43 #[derive(Clone, PartialEq, ::prost::Message)]
44 pub struct BuildActorInfo {
45 #[prost(uint32, tag = "1")]
46 pub actor_id: u32,
47 #[prost(map = "uint32, message", tag = "2")]
48 pub fragment_upstreams: ::std::collections::HashMap<
49 u32,
50 build_actor_info::UpstreamActors,
51 >,
52 #[prost(message, repeated, tag = "3")]
53 pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
54 #[prost(message, optional, tag = "4")]
55 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
56 #[prost(string, tag = "5")]
57 pub mview_definition: ::prost::alloc::string::String,
58 #[prost(message, optional, tag = "6")]
59 pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
60 }
61 pub mod build_actor_info {
63 #[derive(prost_helpers::AnyPB)]
64 #[derive(Clone, PartialEq, ::prost::Message)]
65 pub struct UpstreamActors {
66 #[prost(message, repeated, tag = "1")]
67 pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
68 }
69 }
70}
71#[derive(prost_helpers::AnyPB)]
72#[derive(Clone, PartialEq, ::prost::Message)]
73pub struct BarrierCompleteResponse {
74 #[prost(string, tag = "1")]
75 pub request_id: ::prost::alloc::string::String,
76 #[prost(message, optional, tag = "2")]
77 pub status: ::core::option::Option<super::common::Status>,
78 #[prost(message, repeated, tag = "3")]
79 pub create_mview_progress: ::prost::alloc::vec::Vec<
80 barrier_complete_response::CreateMviewProgress,
81 >,
82 #[prost(message, repeated, tag = "4")]
83 pub synced_sstables: ::prost::alloc::vec::Vec<
84 barrier_complete_response::LocalSstableInfo,
85 >,
86 #[prost(uint32, tag = "5")]
87 pub worker_id: u32,
88 #[prost(map = "uint32, message", tag = "6")]
89 pub table_watermarks: ::std::collections::HashMap<
90 u32,
91 super::hummock::TableWatermarks,
92 >,
93 #[prost(message, repeated, tag = "7")]
94 pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
95 #[prost(uint32, tag = "8")]
96 pub partial_graph_id: u32,
97 #[prost(uint64, tag = "9")]
99 pub epoch: u64,
100 #[prost(uint32, tag = "10")]
101 pub database_id: u32,
102 #[prost(uint32, repeated, tag = "11")]
106 pub load_finished_source_ids: ::prost::alloc::vec::Vec<u32>,
107 #[prost(map = "uint32, message", tag = "12")]
108 pub vector_index_adds: ::std::collections::HashMap<
109 u32,
110 super::hummock::vector_index_delta::VectorIndexAdds,
111 >,
112 #[prost(message, repeated, tag = "13")]
113 pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
114 barrier_complete_response::CdcTableBackfillProgress,
115 >,
116 #[prost(uint32, repeated, tag = "14")]
120 pub truncate_tables: ::prost::alloc::vec::Vec<u32>,
121 #[prost(uint32, repeated, tag = "15")]
125 pub refresh_finished_tables: ::prost::alloc::vec::Vec<u32>,
126}
127pub mod barrier_complete_response {
129 #[derive(prost_helpers::AnyPB)]
130 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
131 pub struct CreateMviewProgress {
132 #[prost(uint32, tag = "1")]
135 pub backfill_actor_id: u32,
136 #[prost(bool, tag = "2")]
137 pub done: bool,
138 #[prost(uint64, tag = "3")]
140 pub consumed_epoch: u64,
141 #[prost(uint64, tag = "4")]
143 pub consumed_rows: u64,
144 #[prost(uint64, tag = "5")]
145 pub pending_epoch_lag: u64,
146 }
147 #[derive(prost_helpers::AnyPB)]
148 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
149 pub struct CdcTableBackfillProgress {
150 #[prost(uint32, tag = "1")]
151 pub actor_id: u32,
152 #[prost(uint64, tag = "2")]
153 pub epoch: u64,
154 #[prost(bool, tag = "3")]
155 pub done: bool,
156 #[prost(int64, tag = "4")]
157 pub split_id_start_inclusive: i64,
158 #[prost(int64, tag = "5")]
159 pub split_id_end_inclusive: i64,
160 #[prost(uint64, tag = "6")]
161 pub generation: u64,
162 #[prost(uint32, tag = "7")]
163 pub fragment_id: u32,
164 }
165 #[derive(prost_helpers::AnyPB)]
166 #[derive(Clone, PartialEq, ::prost::Message)]
167 pub struct LocalSstableInfo {
168 #[prost(message, optional, tag = "2")]
169 pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
170 #[prost(map = "uint32, message", tag = "3")]
171 pub table_stats_map: ::std::collections::HashMap<
172 u32,
173 super::super::hummock::TableStats,
174 >,
175 #[prost(uint64, tag = "4")]
176 pub created_at: u64,
177 }
178}
179#[derive(prost_helpers::AnyPB)]
180#[derive(Clone, PartialEq, ::prost::Message)]
181pub struct StreamingControlStreamRequest {
182 #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
183 pub request: ::core::option::Option<streaming_control_stream_request::Request>,
184}
185pub mod streaming_control_stream_request {
187 #[derive(prost_helpers::AnyPB)]
188 #[derive(Clone, PartialEq, ::prost::Message)]
189 pub struct InitialPartialGraph {
190 #[prost(uint32, tag = "1")]
191 pub partial_graph_id: u32,
192 #[prost(message, repeated, tag = "2")]
193 pub subscriptions: ::prost::alloc::vec::Vec<
194 super::super::stream_plan::SubscriptionUpstreamInfo,
195 >,
196 }
197 #[derive(prost_helpers::AnyPB)]
198 #[derive(Clone, PartialEq, ::prost::Message)]
199 pub struct DatabaseInitialPartialGraph {
200 #[prost(uint32, tag = "1")]
201 pub database_id: u32,
202 #[prost(message, repeated, tag = "2")]
203 pub graphs: ::prost::alloc::vec::Vec<InitialPartialGraph>,
204 }
205 #[derive(prost_helpers::AnyPB)]
206 #[derive(Clone, PartialEq, ::prost::Message)]
207 pub struct InitRequest {
208 #[prost(message, repeated, tag = "1")]
209 pub databases: ::prost::alloc::vec::Vec<DatabaseInitialPartialGraph>,
210 #[prost(string, tag = "2")]
211 pub term_id: ::prost::alloc::string::String,
212 }
213 #[derive(prost_helpers::AnyPB)]
214 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
215 pub struct CreatePartialGraphRequest {
216 #[prost(uint32, tag = "1")]
217 pub partial_graph_id: u32,
218 #[prost(uint32, tag = "2")]
219 pub database_id: u32,
220 }
221 #[derive(prost_helpers::AnyPB)]
222 #[derive(Clone, PartialEq, ::prost::Message)]
223 pub struct RemovePartialGraphRequest {
224 #[prost(uint32, repeated, tag = "1")]
225 pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
226 #[prost(uint32, tag = "2")]
227 pub database_id: u32,
228 }
229 #[derive(prost_helpers::AnyPB)]
230 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
231 pub struct ResetDatabaseRequest {
232 #[prost(uint32, tag = "1")]
233 pub database_id: u32,
234 #[prost(uint32, tag = "2")]
235 pub reset_request_id: u32,
236 }
237 #[derive(prost_helpers::AnyPB)]
238 #[derive(Clone, PartialEq, ::prost::Oneof)]
239 pub enum Request {
240 #[prost(message, tag = "1")]
241 Init(InitRequest),
242 #[prost(message, tag = "2")]
243 InjectBarrier(super::InjectBarrierRequest),
244 #[prost(message, tag = "3")]
245 RemovePartialGraph(RemovePartialGraphRequest),
246 #[prost(message, tag = "4")]
247 CreatePartialGraph(CreatePartialGraphRequest),
248 #[prost(message, tag = "5")]
249 ResetDatabase(ResetDatabaseRequest),
250 }
251}
252#[derive(prost_helpers::AnyPB)]
253#[derive(Clone, PartialEq, ::prost::Message)]
254pub struct ScoredError {
255 #[prost(string, tag = "1")]
256 pub err_msg: ::prost::alloc::string::String,
257 #[prost(int32, tag = "2")]
258 pub score: i32,
259}
260#[derive(prost_helpers::AnyPB)]
261#[derive(Clone, PartialEq, ::prost::Message)]
262pub struct StreamingControlStreamResponse {
263 #[prost(
264 oneof = "streaming_control_stream_response::Response",
265 tags = "1, 2, 3, 4, 5"
266 )]
267 pub response: ::core::option::Option<streaming_control_stream_response::Response>,
268}
269pub mod streaming_control_stream_response {
271 #[derive(prost_helpers::AnyPB)]
272 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
273 pub struct InitResponse {}
274 #[derive(prost_helpers::AnyPB)]
275 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
276 pub struct ShutdownResponse {}
277 #[derive(prost_helpers::AnyPB)]
278 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
279 pub struct ReportDatabaseFailureResponse {
280 #[prost(uint32, tag = "1")]
281 pub database_id: u32,
282 }
283 #[derive(prost_helpers::AnyPB)]
284 #[derive(Clone, PartialEq, ::prost::Message)]
285 pub struct ResetDatabaseResponse {
286 #[prost(uint32, tag = "1")]
287 pub database_id: u32,
288 #[prost(message, optional, tag = "2")]
289 pub root_err: ::core::option::Option<super::ScoredError>,
290 #[prost(uint32, tag = "3")]
291 pub reset_request_id: u32,
292 }
293 #[derive(prost_helpers::AnyPB)]
294 #[derive(Clone, PartialEq, ::prost::Oneof)]
295 pub enum Response {
296 #[prost(message, tag = "1")]
297 Init(InitResponse),
298 #[prost(message, tag = "2")]
299 CompleteBarrier(super::BarrierCompleteResponse),
300 #[prost(message, tag = "3")]
301 Shutdown(ShutdownResponse),
302 #[prost(message, tag = "4")]
303 ReportDatabaseFailure(ReportDatabaseFailureResponse),
304 #[prost(message, tag = "5")]
305 ResetDatabase(ResetDatabaseResponse),
306 }
307}
308#[derive(prost_helpers::AnyPB)]
309#[derive(Clone, Copy, PartialEq, ::prost::Message)]
310pub struct GetMinUncommittedObjectIdRequest {}
311#[derive(prost_helpers::AnyPB)]
312#[derive(Clone, Copy, PartialEq, ::prost::Message)]
313pub struct GetMinUncommittedObjectIdResponse {
314 #[prost(uint64, tag = "1")]
315 pub min_uncommitted_object_id: u64,
316}
317pub mod stream_service_client {
319 #![allow(
320 unused_variables,
321 dead_code,
322 missing_docs,
323 clippy::wildcard_imports,
324 clippy::let_unit_value,
325 )]
326 use tonic::codegen::*;
327 use tonic::codegen::http::Uri;
328 #[derive(Debug, Clone)]
329 pub struct StreamServiceClient<T> {
330 inner: tonic::client::Grpc<T>,
331 }
332 impl StreamServiceClient<tonic::transport::Channel> {
333 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
335 where
336 D: TryInto<tonic::transport::Endpoint>,
337 D::Error: Into<StdError>,
338 {
339 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
340 Ok(Self::new(conn))
341 }
342 }
343 impl<T> StreamServiceClient<T>
344 where
345 T: tonic::client::GrpcService<tonic::body::BoxBody>,
346 T::Error: Into<StdError>,
347 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
348 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
349 {
350 pub fn new(inner: T) -> Self {
351 let inner = tonic::client::Grpc::new(inner);
352 Self { inner }
353 }
354 pub fn with_origin(inner: T, origin: Uri) -> Self {
355 let inner = tonic::client::Grpc::with_origin(inner, origin);
356 Self { inner }
357 }
358 pub fn with_interceptor<F>(
359 inner: T,
360 interceptor: F,
361 ) -> StreamServiceClient<InterceptedService<T, F>>
362 where
363 F: tonic::service::Interceptor,
364 T::ResponseBody: Default,
365 T: tonic::codegen::Service<
366 http::Request<tonic::body::BoxBody>,
367 Response = http::Response<
368 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
369 >,
370 >,
371 <T as tonic::codegen::Service<
372 http::Request<tonic::body::BoxBody>,
373 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
374 {
375 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
376 }
377 #[must_use]
382 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
383 self.inner = self.inner.send_compressed(encoding);
384 self
385 }
386 #[must_use]
388 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
389 self.inner = self.inner.accept_compressed(encoding);
390 self
391 }
392 #[must_use]
396 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
397 self.inner = self.inner.max_decoding_message_size(limit);
398 self
399 }
400 #[must_use]
404 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
405 self.inner = self.inner.max_encoding_message_size(limit);
406 self
407 }
408 pub async fn streaming_control_stream(
409 &mut self,
410 request: impl tonic::IntoStreamingRequest<
411 Message = super::StreamingControlStreamRequest,
412 >,
413 ) -> std::result::Result<
414 tonic::Response<
415 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
416 >,
417 tonic::Status,
418 > {
419 self.inner
420 .ready()
421 .await
422 .map_err(|e| {
423 tonic::Status::unknown(
424 format!("Service was not ready: {}", e.into()),
425 )
426 })?;
427 let codec = tonic::codec::ProstCodec::default();
428 let path = http::uri::PathAndQuery::from_static(
429 "/stream_service.StreamService/StreamingControlStream",
430 );
431 let mut req = request.into_streaming_request();
432 req.extensions_mut()
433 .insert(
434 GrpcMethod::new(
435 "stream_service.StreamService",
436 "StreamingControlStream",
437 ),
438 );
439 self.inner.streaming(req, path, codec).await
440 }
441 pub async fn get_min_uncommitted_object_id(
442 &mut self,
443 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
444 ) -> std::result::Result<
445 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
446 tonic::Status,
447 > {
448 self.inner
449 .ready()
450 .await
451 .map_err(|e| {
452 tonic::Status::unknown(
453 format!("Service was not ready: {}", e.into()),
454 )
455 })?;
456 let codec = tonic::codec::ProstCodec::default();
457 let path = http::uri::PathAndQuery::from_static(
458 "/stream_service.StreamService/GetMinUncommittedObjectId",
459 );
460 let mut req = request.into_request();
461 req.extensions_mut()
462 .insert(
463 GrpcMethod::new(
464 "stream_service.StreamService",
465 "GetMinUncommittedObjectId",
466 ),
467 );
468 self.inner.unary(req, path, codec).await
469 }
470 }
471}
472pub mod stream_service_server {
474 #![allow(
475 unused_variables,
476 dead_code,
477 missing_docs,
478 clippy::wildcard_imports,
479 clippy::let_unit_value,
480 )]
481 use tonic::codegen::*;
482 #[async_trait]
484 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
485 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
487 Item = std::result::Result<
488 super::StreamingControlStreamResponse,
489 tonic::Status,
490 >,
491 >
492 + std::marker::Send
493 + 'static;
494 async fn streaming_control_stream(
495 &self,
496 request: tonic::Request<
497 tonic::Streaming<super::StreamingControlStreamRequest>,
498 >,
499 ) -> std::result::Result<
500 tonic::Response<Self::StreamingControlStreamStream>,
501 tonic::Status,
502 >;
503 async fn get_min_uncommitted_object_id(
504 &self,
505 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
506 ) -> std::result::Result<
507 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
508 tonic::Status,
509 >;
510 }
511 #[derive(Debug)]
512 pub struct StreamServiceServer<T> {
513 inner: Arc<T>,
514 accept_compression_encodings: EnabledCompressionEncodings,
515 send_compression_encodings: EnabledCompressionEncodings,
516 max_decoding_message_size: Option<usize>,
517 max_encoding_message_size: Option<usize>,
518 }
519 impl<T> StreamServiceServer<T> {
520 pub fn new(inner: T) -> Self {
521 Self::from_arc(Arc::new(inner))
522 }
523 pub fn from_arc(inner: Arc<T>) -> Self {
524 Self {
525 inner,
526 accept_compression_encodings: Default::default(),
527 send_compression_encodings: Default::default(),
528 max_decoding_message_size: None,
529 max_encoding_message_size: None,
530 }
531 }
532 pub fn with_interceptor<F>(
533 inner: T,
534 interceptor: F,
535 ) -> InterceptedService<Self, F>
536 where
537 F: tonic::service::Interceptor,
538 {
539 InterceptedService::new(Self::new(inner), interceptor)
540 }
541 #[must_use]
543 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
544 self.accept_compression_encodings.enable(encoding);
545 self
546 }
547 #[must_use]
549 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
550 self.send_compression_encodings.enable(encoding);
551 self
552 }
553 #[must_use]
557 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
558 self.max_decoding_message_size = Some(limit);
559 self
560 }
561 #[must_use]
565 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
566 self.max_encoding_message_size = Some(limit);
567 self
568 }
569 }
570 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
571 where
572 T: StreamService,
573 B: Body + std::marker::Send + 'static,
574 B::Error: Into<StdError> + std::marker::Send + 'static,
575 {
576 type Response = http::Response<tonic::body::BoxBody>;
577 type Error = std::convert::Infallible;
578 type Future = BoxFuture<Self::Response, Self::Error>;
579 fn poll_ready(
580 &mut self,
581 _cx: &mut Context<'_>,
582 ) -> Poll<std::result::Result<(), Self::Error>> {
583 Poll::Ready(Ok(()))
584 }
585 fn call(&mut self, req: http::Request<B>) -> Self::Future {
586 match req.uri().path() {
587 "/stream_service.StreamService/StreamingControlStream" => {
588 #[allow(non_camel_case_types)]
589 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
590 impl<
591 T: StreamService,
592 > tonic::server::StreamingService<
593 super::StreamingControlStreamRequest,
594 > for StreamingControlStreamSvc<T> {
595 type Response = super::StreamingControlStreamResponse;
596 type ResponseStream = T::StreamingControlStreamStream;
597 type Future = BoxFuture<
598 tonic::Response<Self::ResponseStream>,
599 tonic::Status,
600 >;
601 fn call(
602 &mut self,
603 request: tonic::Request<
604 tonic::Streaming<super::StreamingControlStreamRequest>,
605 >,
606 ) -> Self::Future {
607 let inner = Arc::clone(&self.0);
608 let fut = async move {
609 <T as StreamService>::streaming_control_stream(
610 &inner,
611 request,
612 )
613 .await
614 };
615 Box::pin(fut)
616 }
617 }
618 let accept_compression_encodings = self.accept_compression_encodings;
619 let send_compression_encodings = self.send_compression_encodings;
620 let max_decoding_message_size = self.max_decoding_message_size;
621 let max_encoding_message_size = self.max_encoding_message_size;
622 let inner = self.inner.clone();
623 let fut = async move {
624 let method = StreamingControlStreamSvc(inner);
625 let codec = tonic::codec::ProstCodec::default();
626 let mut grpc = tonic::server::Grpc::new(codec)
627 .apply_compression_config(
628 accept_compression_encodings,
629 send_compression_encodings,
630 )
631 .apply_max_message_size_config(
632 max_decoding_message_size,
633 max_encoding_message_size,
634 );
635 let res = grpc.streaming(method, req).await;
636 Ok(res)
637 };
638 Box::pin(fut)
639 }
640 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
641 #[allow(non_camel_case_types)]
642 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
643 impl<
644 T: StreamService,
645 > tonic::server::UnaryService<
646 super::GetMinUncommittedObjectIdRequest,
647 > for GetMinUncommittedObjectIdSvc<T> {
648 type Response = super::GetMinUncommittedObjectIdResponse;
649 type Future = BoxFuture<
650 tonic::Response<Self::Response>,
651 tonic::Status,
652 >;
653 fn call(
654 &mut self,
655 request: tonic::Request<
656 super::GetMinUncommittedObjectIdRequest,
657 >,
658 ) -> Self::Future {
659 let inner = Arc::clone(&self.0);
660 let fut = async move {
661 <T as StreamService>::get_min_uncommitted_object_id(
662 &inner,
663 request,
664 )
665 .await
666 };
667 Box::pin(fut)
668 }
669 }
670 let accept_compression_encodings = self.accept_compression_encodings;
671 let send_compression_encodings = self.send_compression_encodings;
672 let max_decoding_message_size = self.max_decoding_message_size;
673 let max_encoding_message_size = self.max_encoding_message_size;
674 let inner = self.inner.clone();
675 let fut = async move {
676 let method = GetMinUncommittedObjectIdSvc(inner);
677 let codec = tonic::codec::ProstCodec::default();
678 let mut grpc = tonic::server::Grpc::new(codec)
679 .apply_compression_config(
680 accept_compression_encodings,
681 send_compression_encodings,
682 )
683 .apply_max_message_size_config(
684 max_decoding_message_size,
685 max_encoding_message_size,
686 );
687 let res = grpc.unary(method, req).await;
688 Ok(res)
689 };
690 Box::pin(fut)
691 }
692 _ => {
693 Box::pin(async move {
694 let mut response = http::Response::new(empty_body());
695 let headers = response.headers_mut();
696 headers
697 .insert(
698 tonic::Status::GRPC_STATUS,
699 (tonic::Code::Unimplemented as i32).into(),
700 );
701 headers
702 .insert(
703 http::header::CONTENT_TYPE,
704 tonic::metadata::GRPC_CONTENT_TYPE,
705 );
706 Ok(response)
707 })
708 }
709 }
710 }
711 }
712 impl<T> Clone for StreamServiceServer<T> {
713 fn clone(&self) -> Self {
714 let inner = self.inner.clone();
715 Self {
716 inner,
717 accept_compression_encodings: self.accept_compression_encodings,
718 send_compression_encodings: self.send_compression_encodings,
719 max_decoding_message_size: self.max_decoding_message_size,
720 max_encoding_message_size: self.max_encoding_message_size,
721 }
722 }
723 }
724 pub const SERVICE_NAME: &str = "stream_service.StreamService";
726 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
727 const NAME: &'static str = SERVICE_NAME;
728 }
729}