/// Protobuf message `InjectBarrierRequest` (prost-generated binding).
///
/// It is carried as the `InjectBarrier` variant (tag 2) of
/// `streaming_control_stream_request::Request`.
///
/// Tags 7 and 8 are not used by this struct; the field list jumps from 6 to 9.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InjectBarrierRequest {
    /// Request identifier (string, tag 1).
    /// NOTE(review): presumably matched against `BarrierCompleteResponse::request_id` — confirm.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// The `stream_plan::Barrier` payload (optional message, tag 2).
    #[prost(message, optional, tag = "2")]
    pub barrier: ::core::option::Option<super::stream_plan::Barrier>,
    /// Database id; `uint32` on the wire (tag 3), exposed as `crate::id::DatabaseId`.
    #[prost(uint32, tag = "3", wrapper = "crate::id::DatabaseId")]
    pub database_id: crate::id::DatabaseId,
    /// Actor ids; repeated `uint32` on the wire (tag 4), exposed as `crate::id::ActorId`.
    /// NOTE(review): presumably the actors whose barrier must be collected — confirm.
    #[prost(uint32, repeated, tag = "4", wrapper = "crate::id::ActorId")]
    pub actor_ids_to_collect: ::prost::alloc::vec::Vec<crate::id::ActorId>,
    /// Table ids; repeated `uint32` on the wire (tag 5), exposed as `crate::id::TableId`.
    /// NOTE(review): presumably the tables to sync for this barrier — confirm.
    #[prost(uint32, repeated, tag = "5", wrapper = "crate::id::TableId")]
    pub table_ids_to_sync: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Raw partial graph id (`uint32`, tag 6); no id wrapper type is applied.
    #[prost(uint32, tag = "6")]
    pub partial_graph_id: u32,
    /// Per-fragment actor build descriptions (repeated message, tag 9).
    #[prost(message, repeated, tag = "9")]
    pub actors_to_build: ::prost::alloc::vec::Vec<
        inject_barrier_request::FragmentBuildActorInfo,
    >,
}
/// Message types nested inside `InjectBarrierRequest`.
pub mod inject_barrier_request {
    /// Build description for one fragment: its id, an optional stream node, and the
    /// list of actors to build.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct FragmentBuildActorInfo {
        /// Fragment id; `uint32` on the wire (tag 1), exposed as `crate::id::FragmentId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
        /// The `stream_plan::StreamNode` of this fragment (optional message, tag 2).
        #[prost(message, optional, tag = "2")]
        pub node: ::core::option::Option<super::super::stream_plan::StreamNode>,
        /// One `BuildActorInfo` per actor (repeated message, tag 3).
        #[prost(message, repeated, tag = "3")]
        pub actors: ::prost::alloc::vec::Vec<BuildActorInfo>,
    }
    /// Build description for a single actor.
    ///
    /// Field declaration order differs from tag order: `config_override` (tag 9) is
    /// declared before `initial_subscriber_ids` (tag 7). Tag 8 is not used here.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct BuildActorInfo {
        /// Actor id; `uint32` on the wire (tag 1), exposed as `crate::id::ActorId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Map from fragment id (`uint32` key, exposed as `crate::id::FragmentId`) to the
        /// `UpstreamActors` for that fragment (tag 2).
        #[prost(map = "uint32, message", tag = "2", wrapper = "crate::id::FragmentId")]
        pub fragment_upstreams: ::std::collections::HashMap<
            crate::id::FragmentId,
            build_actor_info::UpstreamActors,
        >,
        /// `stream_plan::Dispatcher` entries of the actor (repeated message, tag 3).
        #[prost(message, repeated, tag = "3")]
        pub dispatchers: ::prost::alloc::vec::Vec<super::super::stream_plan::Dispatcher>,
        /// Optional `common::Buffer` (tag 4).
        /// NOTE(review): presumably the serialized vnode bitmap owned by the actor — confirm.
        #[prost(message, optional, tag = "4")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Materialized view definition text (string, tag 5).
        #[prost(string, tag = "5")]
        pub mview_definition: ::prost::alloc::string::String,
        /// Optional `plan_common::ExprContext` (tag 6).
        #[prost(message, optional, tag = "6")]
        pub expr_context: ::core::option::Option<super::super::plan_common::ExprContext>,
        /// Config override text (string, tag 9); its format is not visible from this file.
        #[prost(string, tag = "9")]
        pub config_override: ::prost::alloc::string::String,
        /// Raw subscriber ids (repeated `uint32`, tag 7); no id wrapper type is applied.
        #[prost(uint32, repeated, tag = "7")]
        pub initial_subscriber_ids: ::prost::alloc::vec::Vec<u32>,
    }
    /// Message types nested inside `BuildActorInfo`.
    pub mod build_actor_info {
        /// A list of `common::ActorInfo`, used as the value type of
        /// `BuildActorInfo::fragment_upstreams`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct UpstreamActors {
            /// The upstream actors (repeated message, tag 1).
            #[prost(message, repeated, tag = "1")]
            pub actors: ::prost::alloc::vec::Vec<super::super::super::common::ActorInfo>,
        }
    }
}
/// Protobuf message `BarrierCompleteResponse` (prost-generated binding).
///
/// It is carried as the `CompleteBarrier` variant (tag 2) of
/// `streaming_control_stream_response::Response`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BarrierCompleteResponse {
    /// Request identifier (string, tag 1).
    /// NOTE(review): presumably mirrors `InjectBarrierRequest::request_id` — confirm.
    #[prost(string, tag = "1")]
    pub request_id: ::prost::alloc::string::String,
    /// Optional `common::Status` (tag 2).
    #[prost(message, optional, tag = "2")]
    pub status: ::core::option::Option<super::common::Status>,
    /// `CreateMviewProgress` entries (repeated message, tag 3).
    #[prost(message, repeated, tag = "3")]
    pub create_mview_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CreateMviewProgress,
    >,
    /// `LocalSstableInfo` entries (repeated message, tag 4).
    #[prost(message, repeated, tag = "4")]
    pub synced_sstables: ::prost::alloc::vec::Vec<
        barrier_complete_response::LocalSstableInfo,
    >,
    /// Worker id; `uint32` on the wire (tag 5), exposed as `crate::id::WorkerId`.
    #[prost(uint32, tag = "5", wrapper = "crate::id::WorkerId")]
    pub worker_id: crate::id::WorkerId,
    /// Map from table id (`uint32` key, exposed as `crate::id::TableId`) to
    /// `hummock::TableWatermarks` (tag 6).
    #[prost(map = "uint32, message", tag = "6", wrapper = "crate::id::TableId")]
    pub table_watermarks: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::TableWatermarks,
    >,
    /// `hummock::SstableInfo` entries (repeated message, tag 7).
    /// NOTE(review): presumably the SSTs holding old values — confirm against the .proto.
    #[prost(message, repeated, tag = "7")]
    pub old_value_sstables: ::prost::alloc::vec::Vec<super::hummock::SstableInfo>,
    /// Raw partial graph id (`uint32`, tag 8); no id wrapper type is applied.
    #[prost(uint32, tag = "8")]
    pub partial_graph_id: u32,
    /// Epoch value (`uint64`, tag 9).
    #[prost(uint64, tag = "9")]
    pub epoch: u64,
    /// Database id; `uint32` on the wire (tag 10), exposed as `crate::id::DatabaseId`.
    #[prost(uint32, tag = "10", wrapper = "crate::id::DatabaseId")]
    pub database_id: crate::id::DatabaseId,
    /// `LoadFinishedSource` reports (repeated message, tag 11).
    #[prost(message, repeated, tag = "11")]
    pub load_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::LoadFinishedSource,
    >,
    /// Map from table id (`uint32` key, exposed as `crate::id::TableId`) to
    /// `hummock::vector_index_delta::VectorIndexAdds` (tag 12).
    #[prost(map = "uint32, message", tag = "12", wrapper = "crate::id::TableId")]
    pub vector_index_adds: ::std::collections::HashMap<
        crate::id::TableId,
        super::hummock::vector_index_delta::VectorIndexAdds,
    >,
    /// `CdcTableBackfillProgress` entries (repeated message, tag 13).
    #[prost(message, repeated, tag = "13")]
    pub cdc_table_backfill_progress: ::prost::alloc::vec::Vec<
        barrier_complete_response::CdcTableBackfillProgress,
    >,
    /// Table ids; repeated `uint32` on the wire (tag 14), exposed as `crate::id::TableId`.
    /// NOTE(review): presumably tables to truncate — confirm against the .proto.
    #[prost(uint32, repeated, tag = "14", wrapper = "crate::id::TableId")]
    pub truncate_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// Table ids; repeated `uint32` on the wire (tag 15), exposed as `crate::id::TableId`.
    /// NOTE(review): presumably tables whose refresh has finished — confirm.
    #[prost(uint32, repeated, tag = "15", wrapper = "crate::id::TableId")]
    pub refresh_finished_tables: ::prost::alloc::vec::Vec<crate::id::TableId>,
    /// `ListFinishedSource` reports (repeated message, tag 16).
    #[prost(message, repeated, tag = "16")]
    pub list_finished_sources: ::prost::alloc::vec::Vec<
        barrier_complete_response::ListFinishedSource,
    >,
}
/// Message types nested inside `BarrierCompleteResponse`.
pub mod barrier_complete_response {
    /// Progress record for one backfill actor; all fields are plain scalars, so the
    /// type is `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreateMviewProgress {
        /// Actor id; `uint32` on the wire (tag 1), exposed as `crate::id::ActorId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub backfill_actor_id: crate::id::ActorId,
        /// Completion flag (bool, tag 2).
        #[prost(bool, tag = "2")]
        pub done: bool,
        /// Consumed epoch (`uint64`, tag 3).
        #[prost(uint64, tag = "3")]
        pub consumed_epoch: u64,
        /// Consumed row count (`uint64`, tag 4).
        #[prost(uint64, tag = "4")]
        pub consumed_rows: u64,
        /// Pending epoch lag (`uint64`, tag 5); the unit is not visible from this file.
        #[prost(uint64, tag = "5")]
        pub pending_epoch_lag: u64,
        /// Buffered row count (`uint64`, tag 6).
        #[prost(uint64, tag = "6")]
        pub buffered_rows: u64,
        /// Fragment id; `uint32` on the wire (tag 7), exposed as `crate::id::FragmentId`.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// Progress record for a CDC table backfill actor; all fields are plain scalars,
    /// so the type is `Copy`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CdcTableBackfillProgress {
        /// Actor id; `uint32` on the wire (tag 1), exposed as `crate::id::ActorId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub actor_id: crate::id::ActorId,
        /// Epoch value (`uint64`, tag 2).
        #[prost(uint64, tag = "2")]
        pub epoch: u64,
        /// Completion flag (bool, tag 3).
        #[prost(bool, tag = "3")]
        pub done: bool,
        /// First split id of the reported range (`int64`, tag 4); inclusive per the field name.
        #[prost(int64, tag = "4")]
        pub split_id_start_inclusive: i64,
        /// Last split id of the reported range (`int64`, tag 5); inclusive per the field name.
        #[prost(int64, tag = "5")]
        pub split_id_end_inclusive: i64,
        /// Generation counter (`uint64`, tag 6); its semantics are not visible from this file.
        #[prost(uint64, tag = "6")]
        pub generation: u64,
        /// Fragment id; `uint32` on the wire (tag 7), exposed as `crate::id::FragmentId`.
        #[prost(uint32, tag = "7", wrapper = "crate::id::FragmentId")]
        pub fragment_id: crate::id::FragmentId,
    }
    /// An SST together with per-table statistics. Tag 1 is not used by this struct.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct LocalSstableInfo {
        /// The `hummock::SstableInfo` (optional message, tag 2).
        #[prost(message, optional, tag = "2")]
        pub sst: ::core::option::Option<super::super::hummock::SstableInfo>,
        /// Map from table id (`uint32` key, exposed as `crate::id::TableId`) to
        /// `hummock::TableStats` (tag 3).
        #[prost(map = "uint32, message", tag = "3", wrapper = "crate::id::TableId")]
        pub table_stats_map: ::std::collections::HashMap<
            crate::id::TableId,
            super::super::hummock::TableStats,
        >,
        /// Creation timestamp (`uint64`, tag 4); the unit is not visible from this file.
        #[prost(uint64, tag = "4")]
        pub created_at: u64,
    }
    /// Report identifying a reporting actor, a table, and an associated source.
    /// It has the same field layout as `ListFinishedSource`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct LoadFinishedSource {
        /// Reporting actor id; `uint32` on the wire (tag 1), exposed as `crate::id::ActorId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Table id; `uint32` on the wire (tag 2), exposed as `crate::id::TableId`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Source id; `uint32` on the wire (tag 3), exposed as `crate::id::SourceId`.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
    /// Report identifying a reporting actor, a table, and an associated source.
    /// It has the same field layout as `LoadFinishedSource`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ListFinishedSource {
        /// Reporting actor id; `uint32` on the wire (tag 1), exposed as `crate::id::ActorId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::ActorId")]
        pub reporter_actor_id: crate::id::ActorId,
        /// Table id; `uint32` on the wire (tag 2), exposed as `crate::id::TableId`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::TableId")]
        pub table_id: crate::id::TableId,
        /// Source id; `uint32` on the wire (tag 3), exposed as `crate::id::SourceId`.
        #[prost(uint32, tag = "3", wrapper = "crate::id::SourceId")]
        pub associated_source_id: crate::id::SourceId,
    }
}
/// Envelope message for the request side of the `StreamingControlStream` RPC.
///
/// It holds exactly one `oneof` field; `None` means no variant was set on the wire.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamRequest {
    /// The concrete request; one of the variants with tags 1 through 5.
    #[prost(oneof = "streaming_control_stream_request::Request", tags = "1, 2, 3, 4, 5")]
    pub request: ::core::option::Option<streaming_control_stream_request::Request>,
}
/// Message types and the `oneof` enum nested inside `StreamingControlStreamRequest`.
pub mod streaming_control_stream_request {
    /// Payload of the `Init` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct InitRequest {
        /// Term identifier (string, tag 1); its format is not visible from this file.
        #[prost(string, tag = "1")]
        pub term_id: ::prost::alloc::string::String,
    }
    /// Payload of the `CreatePartialGraph` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CreatePartialGraphRequest {
        /// Raw partial graph id (`uint32`, tag 1); no id wrapper type is applied.
        #[prost(uint32, tag = "1")]
        pub partial_graph_id: u32,
        /// Database id; `uint32` on the wire (tag 2), exposed as `crate::id::DatabaseId`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `RemovePartialGraph` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct RemovePartialGraphRequest {
        /// Raw partial graph ids (repeated `uint32`, tag 1).
        #[prost(uint32, repeated, tag = "1")]
        pub partial_graph_ids: ::prost::alloc::vec::Vec<u32>,
        /// Database id; `uint32` on the wire (tag 2), exposed as `crate::id::DatabaseId`.
        #[prost(uint32, tag = "2", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `ResetDatabase` request variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ResetDatabaseRequest {
        /// Database id; `uint32` on the wire (tag 1), exposed as `crate::id::DatabaseId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
        /// Reset request id (`uint32`, tag 2).
        /// NOTE(review): presumably echoed in `ResetDatabaseResponse::reset_request_id` — confirm.
        #[prost(uint32, tag = "2")]
        pub reset_request_id: u32,
    }
    /// The `oneof` body of `StreamingControlStreamRequest`.
    ///
    /// Variant order follows tag order, which is why `RemovePartialGraph` (tag 3)
    /// precedes `CreatePartialGraph` (tag 4).
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// Tag 1: `InitRequest`.
        #[prost(message, tag = "1")]
        Init(InitRequest),
        /// Tag 2: the top-level `InjectBarrierRequest`.
        #[prost(message, tag = "2")]
        InjectBarrier(super::InjectBarrierRequest),
        /// Tag 3: `RemovePartialGraphRequest`.
        #[prost(message, tag = "3")]
        RemovePartialGraph(RemovePartialGraphRequest),
        /// Tag 4: `CreatePartialGraphRequest`.
        #[prost(message, tag = "4")]
        CreatePartialGraph(CreatePartialGraphRequest),
        /// Tag 5: `ResetDatabaseRequest`.
        #[prost(message, tag = "5")]
        ResetDatabase(ResetDatabaseRequest),
    }
}
/// An error message paired with an integer score.
///
/// Used in this file as the type of `ResetDatabaseResponse::root_err`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ScoredError {
    /// Error message text (string, tag 1).
    #[prost(string, tag = "1")]
    pub err_msg: ::prost::alloc::string::String,
    /// Score (`int32`, tag 2); how scores are ranked is not visible from this file.
    #[prost(int32, tag = "2")]
    pub score: i32,
}
/// Envelope message for the response side of the `StreamingControlStream` RPC.
///
/// It holds exactly one `oneof` field; `None` means no variant was set on the wire.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamingControlStreamResponse {
    /// The concrete response; one of the variants with tags 1 through 5.
    #[prost(
        oneof = "streaming_control_stream_response::Response",
        tags = "1, 2, 3, 4, 5"
    )]
    pub response: ::core::option::Option<streaming_control_stream_response::Response>,
}
/// Message types and the `oneof` enum nested inside `StreamingControlStreamResponse`.
pub mod streaming_control_stream_response {
    /// Payload of the `Init` response variant; it has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct InitResponse {}
    /// Payload of the `Shutdown` response variant; it has no fields.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ShutdownResponse {}
    /// Payload of the `ReportDatabaseFailure` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ReportDatabaseFailureResponse {
        /// Database id; `uint32` on the wire (tag 1), exposed as `crate::id::DatabaseId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
    }
    /// Payload of the `ResetDatabase` response variant.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct ResetDatabaseResponse {
        /// Database id; `uint32` on the wire (tag 1), exposed as `crate::id::DatabaseId`.
        #[prost(uint32, tag = "1", wrapper = "crate::id::DatabaseId")]
        pub database_id: crate::id::DatabaseId,
        /// Optional `ScoredError` (tag 2).
        /// NOTE(review): presumably the root cause that triggered the reset — confirm.
        #[prost(message, optional, tag = "2")]
        pub root_err: ::core::option::Option<super::ScoredError>,
        /// Reset request id (`uint32`, tag 3).
        /// NOTE(review): presumably mirrors `ResetDatabaseRequest::reset_request_id` — confirm.
        #[prost(uint32, tag = "3")]
        pub reset_request_id: u32,
    }
    /// The `oneof` body of `StreamingControlStreamResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// Tag 1: `InitResponse`.
        #[prost(message, tag = "1")]
        Init(InitResponse),
        /// Tag 2: the top-level `BarrierCompleteResponse`.
        #[prost(message, tag = "2")]
        CompleteBarrier(super::BarrierCompleteResponse),
        /// Tag 3: `ShutdownResponse`.
        #[prost(message, tag = "3")]
        Shutdown(ShutdownResponse),
        /// Tag 4: `ReportDatabaseFailureResponse`.
        #[prost(message, tag = "4")]
        ReportDatabaseFailure(ReportDatabaseFailureResponse),
        /// Tag 5: `ResetDatabaseResponse`.
        #[prost(message, tag = "5")]
        ResetDatabase(ResetDatabaseResponse),
    }
}
/// Request message of the `GetMinUncommittedObjectId` RPC; it has no fields.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdRequest {}
/// Response message of the `GetMinUncommittedObjectId` RPC.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetMinUncommittedObjectIdResponse {
    /// The minimum uncommitted object id (`uint64`, tag 1).
    #[prost(uint64, tag = "1")]
    pub min_uncommitted_object_id: u64,
}
332pub mod stream_service_client {
334 #![allow(
335 unused_variables,
336 dead_code,
337 missing_docs,
338 clippy::wildcard_imports,
339 clippy::let_unit_value,
340 )]
341 use tonic::codegen::*;
342 use tonic::codegen::http::Uri;
343 #[derive(Debug, Clone)]
344 pub struct StreamServiceClient<T> {
345 inner: tonic::client::Grpc<T>,
346 }
347 impl StreamServiceClient<tonic::transport::Channel> {
348 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
350 where
351 D: TryInto<tonic::transport::Endpoint>,
352 D::Error: Into<StdError>,
353 {
354 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
355 Ok(Self::new(conn))
356 }
357 }
358 impl<T> StreamServiceClient<T>
359 where
360 T: tonic::client::GrpcService<tonic::body::BoxBody>,
361 T::Error: Into<StdError>,
362 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
363 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
364 {
365 pub fn new(inner: T) -> Self {
366 let inner = tonic::client::Grpc::new(inner);
367 Self { inner }
368 }
369 pub fn with_origin(inner: T, origin: Uri) -> Self {
370 let inner = tonic::client::Grpc::with_origin(inner, origin);
371 Self { inner }
372 }
373 pub fn with_interceptor<F>(
374 inner: T,
375 interceptor: F,
376 ) -> StreamServiceClient<InterceptedService<T, F>>
377 where
378 F: tonic::service::Interceptor,
379 T::ResponseBody: Default,
380 T: tonic::codegen::Service<
381 http::Request<tonic::body::BoxBody>,
382 Response = http::Response<
383 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
384 >,
385 >,
386 <T as tonic::codegen::Service<
387 http::Request<tonic::body::BoxBody>,
388 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
389 {
390 StreamServiceClient::new(InterceptedService::new(inner, interceptor))
391 }
392 #[must_use]
397 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
398 self.inner = self.inner.send_compressed(encoding);
399 self
400 }
401 #[must_use]
403 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
404 self.inner = self.inner.accept_compressed(encoding);
405 self
406 }
407 #[must_use]
411 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
412 self.inner = self.inner.max_decoding_message_size(limit);
413 self
414 }
415 #[must_use]
419 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
420 self.inner = self.inner.max_encoding_message_size(limit);
421 self
422 }
423 pub async fn streaming_control_stream(
424 &mut self,
425 request: impl tonic::IntoStreamingRequest<
426 Message = super::StreamingControlStreamRequest,
427 >,
428 ) -> std::result::Result<
429 tonic::Response<
430 tonic::codec::Streaming<super::StreamingControlStreamResponse>,
431 >,
432 tonic::Status,
433 > {
434 self.inner
435 .ready()
436 .await
437 .map_err(|e| {
438 tonic::Status::unknown(
439 format!("Service was not ready: {}", e.into()),
440 )
441 })?;
442 let codec = tonic::codec::ProstCodec::default();
443 let path = http::uri::PathAndQuery::from_static(
444 "/stream_service.StreamService/StreamingControlStream",
445 );
446 let mut req = request.into_streaming_request();
447 req.extensions_mut()
448 .insert(
449 GrpcMethod::new(
450 "stream_service.StreamService",
451 "StreamingControlStream",
452 ),
453 );
454 self.inner.streaming(req, path, codec).await
455 }
456 pub async fn get_min_uncommitted_object_id(
457 &mut self,
458 request: impl tonic::IntoRequest<super::GetMinUncommittedObjectIdRequest>,
459 ) -> std::result::Result<
460 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
461 tonic::Status,
462 > {
463 self.inner
464 .ready()
465 .await
466 .map_err(|e| {
467 tonic::Status::unknown(
468 format!("Service was not ready: {}", e.into()),
469 )
470 })?;
471 let codec = tonic::codec::ProstCodec::default();
472 let path = http::uri::PathAndQuery::from_static(
473 "/stream_service.StreamService/GetMinUncommittedObjectId",
474 );
475 let mut req = request.into_request();
476 req.extensions_mut()
477 .insert(
478 GrpcMethod::new(
479 "stream_service.StreamService",
480 "GetMinUncommittedObjectId",
481 ),
482 );
483 self.inner.unary(req, path, codec).await
484 }
485 }
486}
487pub mod stream_service_server {
489 #![allow(
490 unused_variables,
491 dead_code,
492 missing_docs,
493 clippy::wildcard_imports,
494 clippy::let_unit_value,
495 )]
496 use tonic::codegen::*;
497 #[async_trait]
499 pub trait StreamService: std::marker::Send + std::marker::Sync + 'static {
500 type StreamingControlStreamStream: tonic::codegen::tokio_stream::Stream<
502 Item = std::result::Result<
503 super::StreamingControlStreamResponse,
504 tonic::Status,
505 >,
506 >
507 + std::marker::Send
508 + 'static;
509 async fn streaming_control_stream(
510 &self,
511 request: tonic::Request<
512 tonic::Streaming<super::StreamingControlStreamRequest>,
513 >,
514 ) -> std::result::Result<
515 tonic::Response<Self::StreamingControlStreamStream>,
516 tonic::Status,
517 >;
518 async fn get_min_uncommitted_object_id(
519 &self,
520 request: tonic::Request<super::GetMinUncommittedObjectIdRequest>,
521 ) -> std::result::Result<
522 tonic::Response<super::GetMinUncommittedObjectIdResponse>,
523 tonic::Status,
524 >;
525 }
526 #[derive(Debug)]
527 pub struct StreamServiceServer<T> {
528 inner: Arc<T>,
529 accept_compression_encodings: EnabledCompressionEncodings,
530 send_compression_encodings: EnabledCompressionEncodings,
531 max_decoding_message_size: Option<usize>,
532 max_encoding_message_size: Option<usize>,
533 }
534 impl<T> StreamServiceServer<T> {
535 pub fn new(inner: T) -> Self {
536 Self::from_arc(Arc::new(inner))
537 }
538 pub fn from_arc(inner: Arc<T>) -> Self {
539 Self {
540 inner,
541 accept_compression_encodings: Default::default(),
542 send_compression_encodings: Default::default(),
543 max_decoding_message_size: None,
544 max_encoding_message_size: None,
545 }
546 }
547 pub fn with_interceptor<F>(
548 inner: T,
549 interceptor: F,
550 ) -> InterceptedService<Self, F>
551 where
552 F: tonic::service::Interceptor,
553 {
554 InterceptedService::new(Self::new(inner), interceptor)
555 }
556 #[must_use]
558 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
559 self.accept_compression_encodings.enable(encoding);
560 self
561 }
562 #[must_use]
564 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
565 self.send_compression_encodings.enable(encoding);
566 self
567 }
568 #[must_use]
572 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
573 self.max_decoding_message_size = Some(limit);
574 self
575 }
576 #[must_use]
580 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
581 self.max_encoding_message_size = Some(limit);
582 self
583 }
584 }
585 impl<T, B> tonic::codegen::Service<http::Request<B>> for StreamServiceServer<T>
586 where
587 T: StreamService,
588 B: Body + std::marker::Send + 'static,
589 B::Error: Into<StdError> + std::marker::Send + 'static,
590 {
591 type Response = http::Response<tonic::body::BoxBody>;
592 type Error = std::convert::Infallible;
593 type Future = BoxFuture<Self::Response, Self::Error>;
594 fn poll_ready(
595 &mut self,
596 _cx: &mut Context<'_>,
597 ) -> Poll<std::result::Result<(), Self::Error>> {
598 Poll::Ready(Ok(()))
599 }
600 fn call(&mut self, req: http::Request<B>) -> Self::Future {
601 match req.uri().path() {
602 "/stream_service.StreamService/StreamingControlStream" => {
603 #[allow(non_camel_case_types)]
604 struct StreamingControlStreamSvc<T: StreamService>(pub Arc<T>);
605 impl<
606 T: StreamService,
607 > tonic::server::StreamingService<
608 super::StreamingControlStreamRequest,
609 > for StreamingControlStreamSvc<T> {
610 type Response = super::StreamingControlStreamResponse;
611 type ResponseStream = T::StreamingControlStreamStream;
612 type Future = BoxFuture<
613 tonic::Response<Self::ResponseStream>,
614 tonic::Status,
615 >;
616 fn call(
617 &mut self,
618 request: tonic::Request<
619 tonic::Streaming<super::StreamingControlStreamRequest>,
620 >,
621 ) -> Self::Future {
622 let inner = Arc::clone(&self.0);
623 let fut = async move {
624 <T as StreamService>::streaming_control_stream(
625 &inner,
626 request,
627 )
628 .await
629 };
630 Box::pin(fut)
631 }
632 }
633 let accept_compression_encodings = self.accept_compression_encodings;
634 let send_compression_encodings = self.send_compression_encodings;
635 let max_decoding_message_size = self.max_decoding_message_size;
636 let max_encoding_message_size = self.max_encoding_message_size;
637 let inner = self.inner.clone();
638 let fut = async move {
639 let method = StreamingControlStreamSvc(inner);
640 let codec = tonic::codec::ProstCodec::default();
641 let mut grpc = tonic::server::Grpc::new(codec)
642 .apply_compression_config(
643 accept_compression_encodings,
644 send_compression_encodings,
645 )
646 .apply_max_message_size_config(
647 max_decoding_message_size,
648 max_encoding_message_size,
649 );
650 let res = grpc.streaming(method, req).await;
651 Ok(res)
652 };
653 Box::pin(fut)
654 }
655 "/stream_service.StreamService/GetMinUncommittedObjectId" => {
656 #[allow(non_camel_case_types)]
657 struct GetMinUncommittedObjectIdSvc<T: StreamService>(pub Arc<T>);
658 impl<
659 T: StreamService,
660 > tonic::server::UnaryService<
661 super::GetMinUncommittedObjectIdRequest,
662 > for GetMinUncommittedObjectIdSvc<T> {
663 type Response = super::GetMinUncommittedObjectIdResponse;
664 type Future = BoxFuture<
665 tonic::Response<Self::Response>,
666 tonic::Status,
667 >;
668 fn call(
669 &mut self,
670 request: tonic::Request<
671 super::GetMinUncommittedObjectIdRequest,
672 >,
673 ) -> Self::Future {
674 let inner = Arc::clone(&self.0);
675 let fut = async move {
676 <T as StreamService>::get_min_uncommitted_object_id(
677 &inner,
678 request,
679 )
680 .await
681 };
682 Box::pin(fut)
683 }
684 }
685 let accept_compression_encodings = self.accept_compression_encodings;
686 let send_compression_encodings = self.send_compression_encodings;
687 let max_decoding_message_size = self.max_decoding_message_size;
688 let max_encoding_message_size = self.max_encoding_message_size;
689 let inner = self.inner.clone();
690 let fut = async move {
691 let method = GetMinUncommittedObjectIdSvc(inner);
692 let codec = tonic::codec::ProstCodec::default();
693 let mut grpc = tonic::server::Grpc::new(codec)
694 .apply_compression_config(
695 accept_compression_encodings,
696 send_compression_encodings,
697 )
698 .apply_max_message_size_config(
699 max_decoding_message_size,
700 max_encoding_message_size,
701 );
702 let res = grpc.unary(method, req).await;
703 Ok(res)
704 };
705 Box::pin(fut)
706 }
707 _ => {
708 Box::pin(async move {
709 let mut response = http::Response::new(empty_body());
710 let headers = response.headers_mut();
711 headers
712 .insert(
713 tonic::Status::GRPC_STATUS,
714 (tonic::Code::Unimplemented as i32).into(),
715 );
716 headers
717 .insert(
718 http::header::CONTENT_TYPE,
719 tonic::metadata::GRPC_CONTENT_TYPE,
720 );
721 Ok(response)
722 })
723 }
724 }
725 }
726 }
727 impl<T> Clone for StreamServiceServer<T> {
728 fn clone(&self) -> Self {
729 let inner = self.inner.clone();
730 Self {
731 inner,
732 accept_compression_encodings: self.accept_compression_encodings,
733 send_compression_encodings: self.send_compression_encodings,
734 max_decoding_message_size: self.max_decoding_message_size,
735 max_encoding_message_size: self.max_encoding_message_size,
736 }
737 }
738 }
739 pub const SERVICE_NAME: &str = "stream_service.StreamService";
741 impl<T> tonic::server::NamedService for StreamServiceServer<T> {
742 const NAME: &'static str = SERVICE_NAME;
743 }
744}