1#[derive(prost_helpers::AnyPB)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct TableSchema {
5 #[prost(message, repeated, tag = "1")]
6 pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
7 #[prost(uint32, repeated, tag = "2")]
8 pub pk_indices: ::prost::alloc::vec::Vec<u32>,
9}
10#[derive(prost_helpers::AnyPB)]
11#[derive(Clone, PartialEq, ::prost::Message)]
12pub struct ValidationError {
13 #[prost(string, tag = "1")]
14 pub error_message: ::prost::alloc::string::String,
15}
16#[derive(prost_helpers::AnyPB)]
17#[derive(Clone, PartialEq, ::prost::Message)]
18pub struct SinkParam {
19 #[prost(uint32, tag = "1")]
20 pub sink_id: u32,
21 #[prost(btree_map = "string, string", tag = "2")]
22 pub properties: ::prost::alloc::collections::BTreeMap<
23 ::prost::alloc::string::String,
24 ::prost::alloc::string::String,
25 >,
26 #[prost(message, optional, tag = "3")]
27 pub table_schema: ::core::option::Option<TableSchema>,
28 #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
30 pub sink_type: i32,
31 #[prost(string, tag = "5")]
32 pub db_name: ::prost::alloc::string::String,
33 #[prost(string, tag = "6")]
34 pub sink_from_name: ::prost::alloc::string::String,
35 #[prost(message, optional, tag = "7")]
36 pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
37 #[prost(string, tag = "8")]
38 pub sink_name: ::prost::alloc::string::String,
39}
40#[derive(prost_helpers::AnyPB)]
41#[derive(Clone, PartialEq, ::prost::Message)]
42pub struct SinkWriterStreamRequest {
43 #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
44 pub request: ::core::option::Option<sink_writer_stream_request::Request>,
45}
46pub mod sink_writer_stream_request {
48 #[derive(prost_helpers::AnyPB)]
49 #[derive(Clone, PartialEq, ::prost::Message)]
50 pub struct StartSink {
51 #[prost(message, optional, tag = "1")]
52 pub sink_param: ::core::option::Option<super::SinkParam>,
53 #[prost(message, optional, tag = "3")]
54 pub payload_schema: ::core::option::Option<super::TableSchema>,
55 }
56 #[derive(prost_helpers::AnyPB)]
57 #[derive(Clone, PartialEq, ::prost::Message)]
58 pub struct WriteBatch {
59 #[prost(uint64, tag = "3")]
60 pub batch_id: u64,
61 #[prost(uint64, tag = "4")]
62 pub epoch: u64,
63 #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
64 pub payload: ::core::option::Option<write_batch::Payload>,
65 }
66 pub mod write_batch {
68 #[derive(prost_helpers::AnyPB)]
69 #[derive(Clone, PartialEq, ::prost::Message)]
70 pub struct StreamChunkPayload {
71 #[prost(bytes = "vec", tag = "1")]
72 pub binary_data: ::prost::alloc::vec::Vec<u8>,
73 }
74 #[derive(prost_helpers::AnyPB)]
75 #[derive(Clone, PartialEq, ::prost::Oneof)]
76 pub enum Payload {
77 #[prost(message, tag = "2")]
78 StreamChunkPayload(StreamChunkPayload),
79 #[prost(int64, tag = "5")]
83 StreamChunkRefPointer(i64),
84 }
85 }
86 #[derive(prost_helpers::AnyPB)]
87 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
88 pub struct Barrier {
89 #[prost(uint64, tag = "1")]
90 pub epoch: u64,
91 #[prost(bool, tag = "2")]
92 pub is_checkpoint: bool,
93 }
94 #[derive(prost_helpers::AnyPB)]
95 #[derive(Clone, PartialEq, ::prost::Oneof)]
96 pub enum Request {
97 #[prost(message, tag = "1")]
98 Start(StartSink),
99 #[prost(message, tag = "3")]
100 WriteBatch(WriteBatch),
101 #[prost(message, tag = "4")]
102 Barrier(Barrier),
103 }
104}
105#[derive(prost_helpers::AnyPB)]
106#[derive(Clone, PartialEq, ::prost::Message)]
107pub struct SinkWriterStreamResponse {
108 #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
109 pub response: ::core::option::Option<sink_writer_stream_response::Response>,
110}
111pub mod sink_writer_stream_response {
113 #[derive(prost_helpers::AnyPB)]
114 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
115 pub struct StartResponse {}
116 #[derive(prost_helpers::AnyPB)]
117 #[derive(Clone, PartialEq, ::prost::Message)]
118 pub struct CommitResponse {
119 #[prost(uint64, tag = "1")]
120 pub epoch: u64,
121 #[prost(message, optional, tag = "2")]
122 pub metadata: ::core::option::Option<super::SinkMetadata>,
123 }
124 #[derive(prost_helpers::AnyPB)]
125 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
126 pub struct BatchWrittenResponse {
127 #[prost(uint64, tag = "1")]
128 pub epoch: u64,
129 #[prost(uint64, tag = "2")]
130 pub batch_id: u64,
131 }
132 #[derive(prost_helpers::AnyPB)]
133 #[derive(Clone, PartialEq, ::prost::Oneof)]
134 pub enum Response {
135 #[prost(message, tag = "1")]
136 Start(StartResponse),
137 #[prost(message, tag = "2")]
138 Commit(CommitResponse),
139 #[prost(message, tag = "3")]
140 Batch(BatchWrittenResponse),
141 }
142}
143#[derive(prost_helpers::AnyPB)]
144#[derive(Clone, PartialEq, ::prost::Message)]
145pub struct ValidateSinkRequest {
146 #[prost(message, optional, tag = "1")]
147 pub sink_param: ::core::option::Option<SinkParam>,
148}
149#[derive(prost_helpers::AnyPB)]
150#[derive(Clone, PartialEq, ::prost::Message)]
151pub struct ValidateSinkResponse {
152 #[prost(message, optional, tag = "1")]
154 pub error: ::core::option::Option<ValidationError>,
155}
156#[derive(prost_helpers::AnyPB)]
157#[derive(Clone, PartialEq, ::prost::Message)]
158pub struct SinkMetadata {
159 #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
160 pub metadata: ::core::option::Option<sink_metadata::Metadata>,
161}
162pub mod sink_metadata {
164 #[derive(prost_helpers::AnyPB)]
165 #[derive(Clone, PartialEq, ::prost::Message)]
166 pub struct SerializedMetadata {
167 #[prost(bytes = "vec", tag = "1")]
168 pub metadata: ::prost::alloc::vec::Vec<u8>,
169 }
170 #[derive(prost_helpers::AnyPB)]
171 #[derive(Clone, PartialEq, ::prost::Oneof)]
172 pub enum Metadata {
173 #[prost(message, tag = "1")]
174 Serialized(SerializedMetadata),
175 }
176}
177#[derive(prost_helpers::AnyPB)]
178#[derive(Clone, PartialEq, ::prost::Message)]
179pub struct SinkCoordinatorStreamRequest {
180 #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
181 pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
182}
183pub mod sink_coordinator_stream_request {
185 #[derive(prost_helpers::AnyPB)]
186 #[derive(Clone, PartialEq, ::prost::Message)]
187 pub struct StartCoordinator {
188 #[prost(message, optional, tag = "1")]
189 pub param: ::core::option::Option<super::SinkParam>,
190 }
191 #[derive(prost_helpers::AnyPB)]
192 #[derive(Clone, PartialEq, ::prost::Message)]
193 pub struct CommitMetadata {
194 #[prost(uint64, tag = "1")]
195 pub epoch: u64,
196 #[prost(message, repeated, tag = "2")]
197 pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
198 }
199 #[derive(prost_helpers::AnyPB)]
200 #[derive(Clone, PartialEq, ::prost::Oneof)]
201 pub enum Request {
202 #[prost(message, tag = "1")]
203 Start(StartCoordinator),
204 #[prost(message, tag = "2")]
205 Commit(CommitMetadata),
206 }
207}
208#[derive(prost_helpers::AnyPB)]
209#[derive(Clone, Copy, PartialEq, ::prost::Message)]
210pub struct SinkCoordinatorStreamResponse {
211 #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
212 pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
213}
214pub mod sink_coordinator_stream_response {
216 #[derive(prost_helpers::AnyPB)]
217 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
218 pub struct StartResponse {}
219 #[derive(prost_helpers::AnyPB)]
220 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
221 pub struct CommitResponse {
222 #[prost(uint64, tag = "1")]
223 pub epoch: u64,
224 }
225 #[derive(prost_helpers::AnyPB)]
226 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
227 pub enum Response {
228 #[prost(message, tag = "1")]
229 Start(StartResponse),
230 #[prost(message, tag = "2")]
231 Commit(CommitResponse),
232 }
233}
234#[derive(prost_helpers::AnyPB)]
235#[derive(Clone, PartialEq, ::prost::Message)]
236pub struct CdcMessage {
237 #[prost(string, tag = "1")]
239 pub payload: ::prost::alloc::string::String,
240 #[prost(string, tag = "2")]
241 pub partition: ::prost::alloc::string::String,
242 #[prost(string, tag = "3")]
243 pub offset: ::prost::alloc::string::String,
244 #[prost(string, tag = "4")]
245 pub full_table_name: ::prost::alloc::string::String,
246 #[prost(int64, tag = "5")]
247 pub source_ts_ms: i64,
248 #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
249 pub msg_type: i32,
250 #[prost(string, tag = "7")]
252 pub key: ::prost::alloc::string::String,
253}
254pub mod cdc_message {
256 #[derive(prost_helpers::AnyPB)]
257 #[derive(
258 Clone,
259 Copy,
260 Debug,
261 PartialEq,
262 Eq,
263 Hash,
264 PartialOrd,
265 Ord,
266 ::prost::Enumeration
267 )]
268 #[repr(i32)]
269 pub enum CdcMessageType {
270 Unspecified = 0,
271 Heartbeat = 1,
272 Data = 2,
273 TransactionMeta = 3,
274 SchemaChange = 4,
275 }
276 impl CdcMessageType {
277 pub fn as_str_name(&self) -> &'static str {
282 match self {
283 Self::Unspecified => "UNSPECIFIED",
284 Self::Heartbeat => "HEARTBEAT",
285 Self::Data => "DATA",
286 Self::TransactionMeta => "TRANSACTION_META",
287 Self::SchemaChange => "SCHEMA_CHANGE",
288 }
289 }
290 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
292 match value {
293 "UNSPECIFIED" => Some(Self::Unspecified),
294 "HEARTBEAT" => Some(Self::Heartbeat),
295 "DATA" => Some(Self::Data),
296 "TRANSACTION_META" => Some(Self::TransactionMeta),
297 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
298 _ => None,
299 }
300 }
301 }
302}
303#[derive(prost_helpers::AnyPB)]
304#[derive(Clone, PartialEq, ::prost::Message)]
305pub struct GetEventStreamRequest {
306 #[prost(uint64, tag = "1")]
307 pub source_id: u64,
308 #[prost(enumeration = "SourceType", tag = "2")]
309 pub source_type: i32,
310 #[prost(string, tag = "3")]
311 pub start_offset: ::prost::alloc::string::String,
312 #[prost(btree_map = "string, string", tag = "4")]
313 pub properties: ::prost::alloc::collections::BTreeMap<
314 ::prost::alloc::string::String,
315 ::prost::alloc::string::String,
316 >,
317 #[prost(bool, tag = "5")]
318 pub snapshot_done: bool,
319 #[prost(bool, tag = "6")]
320 pub is_source_job: bool,
321}
322#[derive(prost_helpers::AnyPB)]
323#[derive(Clone, PartialEq, ::prost::Message)]
324pub struct GetEventStreamResponse {
325 #[prost(uint64, tag = "1")]
326 pub source_id: u64,
327 #[prost(message, repeated, tag = "2")]
328 pub events: ::prost::alloc::vec::Vec<CdcMessage>,
329 #[prost(message, optional, tag = "3")]
330 pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
331}
332pub mod get_event_stream_response {
334 #[derive(prost_helpers::AnyPB)]
335 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
336 pub struct ControlInfo {
337 #[prost(bool, tag = "1")]
338 pub handshake_ok: bool,
339 }
340}
341#[derive(prost_helpers::AnyPB)]
342#[derive(Clone, PartialEq, ::prost::Message)]
343pub struct ValidateSourceRequest {
344 #[prost(uint64, tag = "1")]
345 pub source_id: u64,
346 #[prost(enumeration = "SourceType", tag = "2")]
347 pub source_type: i32,
348 #[prost(btree_map = "string, string", tag = "3")]
349 pub properties: ::prost::alloc::collections::BTreeMap<
350 ::prost::alloc::string::String,
351 ::prost::alloc::string::String,
352 >,
353 #[prost(message, optional, tag = "4")]
354 pub table_schema: ::core::option::Option<TableSchema>,
355 #[prost(bool, tag = "5")]
356 pub is_source_job: bool,
357 #[prost(bool, tag = "6")]
358 pub is_backfill_table: bool,
359}
360#[derive(prost_helpers::AnyPB)]
361#[derive(Clone, PartialEq, ::prost::Message)]
362pub struct ValidateSourceResponse {
363 #[prost(message, optional, tag = "1")]
365 pub error: ::core::option::Option<ValidationError>,
366}
367#[derive(prost_helpers::AnyPB)]
368#[derive(Clone, PartialEq, ::prost::Message)]
369pub struct CoordinateRequest {
370 #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
371 pub msg: ::core::option::Option<coordinate_request::Msg>,
372}
373pub mod coordinate_request {
375 #[derive(prost_helpers::AnyPB)]
378 #[derive(Clone, PartialEq, ::prost::Message)]
379 pub struct StartCoordinationRequest {
380 #[prost(message, optional, tag = "1")]
381 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
382 #[prost(message, optional, tag = "2")]
383 pub param: ::core::option::Option<super::SinkParam>,
384 }
385 #[derive(prost_helpers::AnyPB)]
386 #[derive(Clone, PartialEq, ::prost::Message)]
387 pub struct CommitRequest {
388 #[prost(uint64, tag = "1")]
389 pub epoch: u64,
390 #[prost(message, optional, tag = "2")]
391 pub metadata: ::core::option::Option<super::SinkMetadata>,
392 #[prost(message, optional, tag = "3")]
393 pub add_columns: ::core::option::Option<
394 super::super::stream_plan::SinkAddColumns,
395 >,
396 }
397 #[derive(prost_helpers::AnyPB)]
398 #[derive(Clone, PartialEq, ::prost::Message)]
399 pub struct UpdateVnodeBitmapRequest {
400 #[prost(message, optional, tag = "1")]
401 pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
402 }
403 #[derive(prost_helpers::AnyPB)]
404 #[derive(Clone, PartialEq, ::prost::Oneof)]
405 pub enum Msg {
406 #[prost(message, tag = "1")]
407 StartRequest(StartCoordinationRequest),
408 #[prost(message, tag = "2")]
409 CommitRequest(CommitRequest),
410 #[prost(message, tag = "3")]
411 UpdateVnodeRequest(UpdateVnodeBitmapRequest),
412 #[prost(bool, tag = "4")]
413 Stop(bool),
414 #[prost(uint64, tag = "5")]
415 AlignInitialEpochRequest(u64),
416 }
417}
418#[derive(prost_helpers::AnyPB)]
419#[derive(Clone, Copy, PartialEq, ::prost::Message)]
420pub struct CoordinateResponse {
421 #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
422 pub msg: ::core::option::Option<coordinate_response::Msg>,
423}
424pub mod coordinate_response {
426 #[derive(prost_helpers::AnyPB)]
427 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
428 pub struct StartCoordinationResponse {
429 #[prost(uint64, optional, tag = "1")]
430 pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
431 }
432 #[derive(prost_helpers::AnyPB)]
433 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
434 pub struct CommitResponse {
435 #[prost(uint64, tag = "1")]
436 pub epoch: u64,
437 }
438 #[derive(prost_helpers::AnyPB)]
439 #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
440 pub enum Msg {
441 #[prost(message, tag = "1")]
442 StartResponse(StartCoordinationResponse),
443 #[prost(message, tag = "2")]
444 CommitResponse(CommitResponse),
445 #[prost(bool, tag = "3")]
446 Stopped(bool),
447 #[prost(uint64, tag = "4")]
448 AlignInitialEpochResponse(u64),
449 }
450}
451#[derive(prost_helpers::AnyPB)]
452#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
453#[repr(i32)]
454pub enum SourceType {
455 Unspecified = 0,
456 Mysql = 1,
457 Postgres = 2,
458 Citus = 3,
459 Mongodb = 4,
460 SqlServer = 5,
461}
462impl SourceType {
463 pub fn as_str_name(&self) -> &'static str {
468 match self {
469 Self::Unspecified => "UNSPECIFIED",
470 Self::Mysql => "MYSQL",
471 Self::Postgres => "POSTGRES",
472 Self::Citus => "CITUS",
473 Self::Mongodb => "MONGODB",
474 Self::SqlServer => "SQL_SERVER",
475 }
476 }
477 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
479 match value {
480 "UNSPECIFIED" => Some(Self::Unspecified),
481 "MYSQL" => Some(Self::Mysql),
482 "POSTGRES" => Some(Self::Postgres),
483 "CITUS" => Some(Self::Citus),
484 "MONGODB" => Some(Self::Mongodb),
485 "SQL_SERVER" => Some(Self::SqlServer),
486 _ => None,
487 }
488 }
489}
490pub mod connector_service_client {
492 #![allow(
493 unused_variables,
494 dead_code,
495 missing_docs,
496 clippy::wildcard_imports,
497 clippy::let_unit_value,
498 )]
499 use tonic::codegen::*;
500 use tonic::codegen::http::Uri;
501 #[derive(Debug, Clone)]
502 pub struct ConnectorServiceClient<T> {
503 inner: tonic::client::Grpc<T>,
504 }
505 impl ConnectorServiceClient<tonic::transport::Channel> {
506 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
508 where
509 D: TryInto<tonic::transport::Endpoint>,
510 D::Error: Into<StdError>,
511 {
512 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
513 Ok(Self::new(conn))
514 }
515 }
516 impl<T> ConnectorServiceClient<T>
517 where
518 T: tonic::client::GrpcService<tonic::body::BoxBody>,
519 T::Error: Into<StdError>,
520 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
521 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
522 {
523 pub fn new(inner: T) -> Self {
524 let inner = tonic::client::Grpc::new(inner);
525 Self { inner }
526 }
527 pub fn with_origin(inner: T, origin: Uri) -> Self {
528 let inner = tonic::client::Grpc::with_origin(inner, origin);
529 Self { inner }
530 }
531 pub fn with_interceptor<F>(
532 inner: T,
533 interceptor: F,
534 ) -> ConnectorServiceClient<InterceptedService<T, F>>
535 where
536 F: tonic::service::Interceptor,
537 T::ResponseBody: Default,
538 T: tonic::codegen::Service<
539 http::Request<tonic::body::BoxBody>,
540 Response = http::Response<
541 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
542 >,
543 >,
544 <T as tonic::codegen::Service<
545 http::Request<tonic::body::BoxBody>,
546 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
547 {
548 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
549 }
550 #[must_use]
555 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
556 self.inner = self.inner.send_compressed(encoding);
557 self
558 }
559 #[must_use]
561 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
562 self.inner = self.inner.accept_compressed(encoding);
563 self
564 }
565 #[must_use]
569 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
570 self.inner = self.inner.max_decoding_message_size(limit);
571 self
572 }
573 #[must_use]
577 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
578 self.inner = self.inner.max_encoding_message_size(limit);
579 self
580 }
581 pub async fn sink_writer_stream(
582 &mut self,
583 request: impl tonic::IntoStreamingRequest<
584 Message = super::SinkWriterStreamRequest,
585 >,
586 ) -> std::result::Result<
587 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
588 tonic::Status,
589 > {
590 self.inner
591 .ready()
592 .await
593 .map_err(|e| {
594 tonic::Status::unknown(
595 format!("Service was not ready: {}", e.into()),
596 )
597 })?;
598 let codec = tonic::codec::ProstCodec::default();
599 let path = http::uri::PathAndQuery::from_static(
600 "/connector_service.ConnectorService/SinkWriterStream",
601 );
602 let mut req = request.into_streaming_request();
603 req.extensions_mut()
604 .insert(
605 GrpcMethod::new(
606 "connector_service.ConnectorService",
607 "SinkWriterStream",
608 ),
609 );
610 self.inner.streaming(req, path, codec).await
611 }
612 pub async fn sink_coordinator_stream(
613 &mut self,
614 request: impl tonic::IntoStreamingRequest<
615 Message = super::SinkCoordinatorStreamRequest,
616 >,
617 ) -> std::result::Result<
618 tonic::Response<
619 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
620 >,
621 tonic::Status,
622 > {
623 self.inner
624 .ready()
625 .await
626 .map_err(|e| {
627 tonic::Status::unknown(
628 format!("Service was not ready: {}", e.into()),
629 )
630 })?;
631 let codec = tonic::codec::ProstCodec::default();
632 let path = http::uri::PathAndQuery::from_static(
633 "/connector_service.ConnectorService/SinkCoordinatorStream",
634 );
635 let mut req = request.into_streaming_request();
636 req.extensions_mut()
637 .insert(
638 GrpcMethod::new(
639 "connector_service.ConnectorService",
640 "SinkCoordinatorStream",
641 ),
642 );
643 self.inner.streaming(req, path, codec).await
644 }
645 pub async fn validate_sink(
646 &mut self,
647 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
648 ) -> std::result::Result<
649 tonic::Response<super::ValidateSinkResponse>,
650 tonic::Status,
651 > {
652 self.inner
653 .ready()
654 .await
655 .map_err(|e| {
656 tonic::Status::unknown(
657 format!("Service was not ready: {}", e.into()),
658 )
659 })?;
660 let codec = tonic::codec::ProstCodec::default();
661 let path = http::uri::PathAndQuery::from_static(
662 "/connector_service.ConnectorService/ValidateSink",
663 );
664 let mut req = request.into_request();
665 req.extensions_mut()
666 .insert(
667 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
668 );
669 self.inner.unary(req, path, codec).await
670 }
671 pub async fn get_event_stream(
672 &mut self,
673 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
674 ) -> std::result::Result<
675 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
676 tonic::Status,
677 > {
678 self.inner
679 .ready()
680 .await
681 .map_err(|e| {
682 tonic::Status::unknown(
683 format!("Service was not ready: {}", e.into()),
684 )
685 })?;
686 let codec = tonic::codec::ProstCodec::default();
687 let path = http::uri::PathAndQuery::from_static(
688 "/connector_service.ConnectorService/GetEventStream",
689 );
690 let mut req = request.into_request();
691 req.extensions_mut()
692 .insert(
693 GrpcMethod::new(
694 "connector_service.ConnectorService",
695 "GetEventStream",
696 ),
697 );
698 self.inner.server_streaming(req, path, codec).await
699 }
700 pub async fn validate_source(
701 &mut self,
702 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
703 ) -> std::result::Result<
704 tonic::Response<super::ValidateSourceResponse>,
705 tonic::Status,
706 > {
707 self.inner
708 .ready()
709 .await
710 .map_err(|e| {
711 tonic::Status::unknown(
712 format!("Service was not ready: {}", e.into()),
713 )
714 })?;
715 let codec = tonic::codec::ProstCodec::default();
716 let path = http::uri::PathAndQuery::from_static(
717 "/connector_service.ConnectorService/ValidateSource",
718 );
719 let mut req = request.into_request();
720 req.extensions_mut()
721 .insert(
722 GrpcMethod::new(
723 "connector_service.ConnectorService",
724 "ValidateSource",
725 ),
726 );
727 self.inner.unary(req, path, codec).await
728 }
729 }
730}
731pub mod sink_coordination_service_client {
733 #![allow(
734 unused_variables,
735 dead_code,
736 missing_docs,
737 clippy::wildcard_imports,
738 clippy::let_unit_value,
739 )]
740 use tonic::codegen::*;
741 use tonic::codegen::http::Uri;
742 #[derive(Debug, Clone)]
743 pub struct SinkCoordinationServiceClient<T> {
744 inner: tonic::client::Grpc<T>,
745 }
746 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
747 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
749 where
750 D: TryInto<tonic::transport::Endpoint>,
751 D::Error: Into<StdError>,
752 {
753 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
754 Ok(Self::new(conn))
755 }
756 }
757 impl<T> SinkCoordinationServiceClient<T>
758 where
759 T: tonic::client::GrpcService<tonic::body::BoxBody>,
760 T::Error: Into<StdError>,
761 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
762 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
763 {
764 pub fn new(inner: T) -> Self {
765 let inner = tonic::client::Grpc::new(inner);
766 Self { inner }
767 }
768 pub fn with_origin(inner: T, origin: Uri) -> Self {
769 let inner = tonic::client::Grpc::with_origin(inner, origin);
770 Self { inner }
771 }
772 pub fn with_interceptor<F>(
773 inner: T,
774 interceptor: F,
775 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
776 where
777 F: tonic::service::Interceptor,
778 T::ResponseBody: Default,
779 T: tonic::codegen::Service<
780 http::Request<tonic::body::BoxBody>,
781 Response = http::Response<
782 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
783 >,
784 >,
785 <T as tonic::codegen::Service<
786 http::Request<tonic::body::BoxBody>,
787 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
788 {
789 SinkCoordinationServiceClient::new(
790 InterceptedService::new(inner, interceptor),
791 )
792 }
793 #[must_use]
798 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
799 self.inner = self.inner.send_compressed(encoding);
800 self
801 }
802 #[must_use]
804 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
805 self.inner = self.inner.accept_compressed(encoding);
806 self
807 }
808 #[must_use]
812 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
813 self.inner = self.inner.max_decoding_message_size(limit);
814 self
815 }
816 #[must_use]
820 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
821 self.inner = self.inner.max_encoding_message_size(limit);
822 self
823 }
824 pub async fn coordinate(
825 &mut self,
826 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
827 ) -> std::result::Result<
828 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
829 tonic::Status,
830 > {
831 self.inner
832 .ready()
833 .await
834 .map_err(|e| {
835 tonic::Status::unknown(
836 format!("Service was not ready: {}", e.into()),
837 )
838 })?;
839 let codec = tonic::codec::ProstCodec::default();
840 let path = http::uri::PathAndQuery::from_static(
841 "/connector_service.SinkCoordinationService/Coordinate",
842 );
843 let mut req = request.into_streaming_request();
844 req.extensions_mut()
845 .insert(
846 GrpcMethod::new(
847 "connector_service.SinkCoordinationService",
848 "Coordinate",
849 ),
850 );
851 self.inner.streaming(req, path, codec).await
852 }
853 }
854}
855pub mod connector_service_server {
857 #![allow(
858 unused_variables,
859 dead_code,
860 missing_docs,
861 clippy::wildcard_imports,
862 clippy::let_unit_value,
863 )]
864 use tonic::codegen::*;
865 #[async_trait]
867 pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
868 type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
870 Item = std::result::Result<
871 super::SinkWriterStreamResponse,
872 tonic::Status,
873 >,
874 >
875 + std::marker::Send
876 + 'static;
877 async fn sink_writer_stream(
878 &self,
879 request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
880 ) -> std::result::Result<
881 tonic::Response<Self::SinkWriterStreamStream>,
882 tonic::Status,
883 >;
884 type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
886 Item = std::result::Result<
887 super::SinkCoordinatorStreamResponse,
888 tonic::Status,
889 >,
890 >
891 + std::marker::Send
892 + 'static;
893 async fn sink_coordinator_stream(
894 &self,
895 request: tonic::Request<
896 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
897 >,
898 ) -> std::result::Result<
899 tonic::Response<Self::SinkCoordinatorStreamStream>,
900 tonic::Status,
901 >;
902 async fn validate_sink(
903 &self,
904 request: tonic::Request<super::ValidateSinkRequest>,
905 ) -> std::result::Result<
906 tonic::Response<super::ValidateSinkResponse>,
907 tonic::Status,
908 >;
909 type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
911 Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
912 >
913 + std::marker::Send
914 + 'static;
915 async fn get_event_stream(
916 &self,
917 request: tonic::Request<super::GetEventStreamRequest>,
918 ) -> std::result::Result<
919 tonic::Response<Self::GetEventStreamStream>,
920 tonic::Status,
921 >;
922 async fn validate_source(
923 &self,
924 request: tonic::Request<super::ValidateSourceRequest>,
925 ) -> std::result::Result<
926 tonic::Response<super::ValidateSourceResponse>,
927 tonic::Status,
928 >;
929 }
930 #[derive(Debug)]
931 pub struct ConnectorServiceServer<T> {
932 inner: Arc<T>,
933 accept_compression_encodings: EnabledCompressionEncodings,
934 send_compression_encodings: EnabledCompressionEncodings,
935 max_decoding_message_size: Option<usize>,
936 max_encoding_message_size: Option<usize>,
937 }
938 impl<T> ConnectorServiceServer<T> {
939 pub fn new(inner: T) -> Self {
940 Self::from_arc(Arc::new(inner))
941 }
942 pub fn from_arc(inner: Arc<T>) -> Self {
943 Self {
944 inner,
945 accept_compression_encodings: Default::default(),
946 send_compression_encodings: Default::default(),
947 max_decoding_message_size: None,
948 max_encoding_message_size: None,
949 }
950 }
951 pub fn with_interceptor<F>(
952 inner: T,
953 interceptor: F,
954 ) -> InterceptedService<Self, F>
955 where
956 F: tonic::service::Interceptor,
957 {
958 InterceptedService::new(Self::new(inner), interceptor)
959 }
960 #[must_use]
962 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
963 self.accept_compression_encodings.enable(encoding);
964 self
965 }
966 #[must_use]
968 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
969 self.send_compression_encodings.enable(encoding);
970 self
971 }
972 #[must_use]
976 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
977 self.max_decoding_message_size = Some(limit);
978 self
979 }
980 #[must_use]
984 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
985 self.max_encoding_message_size = Some(limit);
986 self
987 }
988 }
989 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
990 where
991 T: ConnectorService,
992 B: Body + std::marker::Send + 'static,
993 B::Error: Into<StdError> + std::marker::Send + 'static,
994 {
995 type Response = http::Response<tonic::body::BoxBody>;
996 type Error = std::convert::Infallible;
997 type Future = BoxFuture<Self::Response, Self::Error>;
998 fn poll_ready(
999 &mut self,
1000 _cx: &mut Context<'_>,
1001 ) -> Poll<std::result::Result<(), Self::Error>> {
1002 Poll::Ready(Ok(()))
1003 }
1004 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1005 match req.uri().path() {
1006 "/connector_service.ConnectorService/SinkWriterStream" => {
1007 #[allow(non_camel_case_types)]
1008 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1009 impl<
1010 T: ConnectorService,
1011 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1012 for SinkWriterStreamSvc<T> {
1013 type Response = super::SinkWriterStreamResponse;
1014 type ResponseStream = T::SinkWriterStreamStream;
1015 type Future = BoxFuture<
1016 tonic::Response<Self::ResponseStream>,
1017 tonic::Status,
1018 >;
1019 fn call(
1020 &mut self,
1021 request: tonic::Request<
1022 tonic::Streaming<super::SinkWriterStreamRequest>,
1023 >,
1024 ) -> Self::Future {
1025 let inner = Arc::clone(&self.0);
1026 let fut = async move {
1027 <T as ConnectorService>::sink_writer_stream(&inner, request)
1028 .await
1029 };
1030 Box::pin(fut)
1031 }
1032 }
1033 let accept_compression_encodings = self.accept_compression_encodings;
1034 let send_compression_encodings = self.send_compression_encodings;
1035 let max_decoding_message_size = self.max_decoding_message_size;
1036 let max_encoding_message_size = self.max_encoding_message_size;
1037 let inner = self.inner.clone();
1038 let fut = async move {
1039 let method = SinkWriterStreamSvc(inner);
1040 let codec = tonic::codec::ProstCodec::default();
1041 let mut grpc = tonic::server::Grpc::new(codec)
1042 .apply_compression_config(
1043 accept_compression_encodings,
1044 send_compression_encodings,
1045 )
1046 .apply_max_message_size_config(
1047 max_decoding_message_size,
1048 max_encoding_message_size,
1049 );
1050 let res = grpc.streaming(method, req).await;
1051 Ok(res)
1052 };
1053 Box::pin(fut)
1054 }
1055 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1056 #[allow(non_camel_case_types)]
1057 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1058 impl<
1059 T: ConnectorService,
1060 > tonic::server::StreamingService<
1061 super::SinkCoordinatorStreamRequest,
1062 > for SinkCoordinatorStreamSvc<T> {
1063 type Response = super::SinkCoordinatorStreamResponse;
1064 type ResponseStream = T::SinkCoordinatorStreamStream;
1065 type Future = BoxFuture<
1066 tonic::Response<Self::ResponseStream>,
1067 tonic::Status,
1068 >;
1069 fn call(
1070 &mut self,
1071 request: tonic::Request<
1072 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1073 >,
1074 ) -> Self::Future {
1075 let inner = Arc::clone(&self.0);
1076 let fut = async move {
1077 <T as ConnectorService>::sink_coordinator_stream(
1078 &inner,
1079 request,
1080 )
1081 .await
1082 };
1083 Box::pin(fut)
1084 }
1085 }
1086 let accept_compression_encodings = self.accept_compression_encodings;
1087 let send_compression_encodings = self.send_compression_encodings;
1088 let max_decoding_message_size = self.max_decoding_message_size;
1089 let max_encoding_message_size = self.max_encoding_message_size;
1090 let inner = self.inner.clone();
1091 let fut = async move {
1092 let method = SinkCoordinatorStreamSvc(inner);
1093 let codec = tonic::codec::ProstCodec::default();
1094 let mut grpc = tonic::server::Grpc::new(codec)
1095 .apply_compression_config(
1096 accept_compression_encodings,
1097 send_compression_encodings,
1098 )
1099 .apply_max_message_size_config(
1100 max_decoding_message_size,
1101 max_encoding_message_size,
1102 );
1103 let res = grpc.streaming(method, req).await;
1104 Ok(res)
1105 };
1106 Box::pin(fut)
1107 }
1108 "/connector_service.ConnectorService/ValidateSink" => {
1109 #[allow(non_camel_case_types)]
1110 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1111 impl<
1112 T: ConnectorService,
1113 > tonic::server::UnaryService<super::ValidateSinkRequest>
1114 for ValidateSinkSvc<T> {
1115 type Response = super::ValidateSinkResponse;
1116 type Future = BoxFuture<
1117 tonic::Response<Self::Response>,
1118 tonic::Status,
1119 >;
1120 fn call(
1121 &mut self,
1122 request: tonic::Request<super::ValidateSinkRequest>,
1123 ) -> Self::Future {
1124 let inner = Arc::clone(&self.0);
1125 let fut = async move {
1126 <T as ConnectorService>::validate_sink(&inner, request)
1127 .await
1128 };
1129 Box::pin(fut)
1130 }
1131 }
1132 let accept_compression_encodings = self.accept_compression_encodings;
1133 let send_compression_encodings = self.send_compression_encodings;
1134 let max_decoding_message_size = self.max_decoding_message_size;
1135 let max_encoding_message_size = self.max_encoding_message_size;
1136 let inner = self.inner.clone();
1137 let fut = async move {
1138 let method = ValidateSinkSvc(inner);
1139 let codec = tonic::codec::ProstCodec::default();
1140 let mut grpc = tonic::server::Grpc::new(codec)
1141 .apply_compression_config(
1142 accept_compression_encodings,
1143 send_compression_encodings,
1144 )
1145 .apply_max_message_size_config(
1146 max_decoding_message_size,
1147 max_encoding_message_size,
1148 );
1149 let res = grpc.unary(method, req).await;
1150 Ok(res)
1151 };
1152 Box::pin(fut)
1153 }
1154 "/connector_service.ConnectorService/GetEventStream" => {
1155 #[allow(non_camel_case_types)]
1156 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1157 impl<
1158 T: ConnectorService,
1159 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1160 for GetEventStreamSvc<T> {
1161 type Response = super::GetEventStreamResponse;
1162 type ResponseStream = T::GetEventStreamStream;
1163 type Future = BoxFuture<
1164 tonic::Response<Self::ResponseStream>,
1165 tonic::Status,
1166 >;
1167 fn call(
1168 &mut self,
1169 request: tonic::Request<super::GetEventStreamRequest>,
1170 ) -> Self::Future {
1171 let inner = Arc::clone(&self.0);
1172 let fut = async move {
1173 <T as ConnectorService>::get_event_stream(&inner, request)
1174 .await
1175 };
1176 Box::pin(fut)
1177 }
1178 }
1179 let accept_compression_encodings = self.accept_compression_encodings;
1180 let send_compression_encodings = self.send_compression_encodings;
1181 let max_decoding_message_size = self.max_decoding_message_size;
1182 let max_encoding_message_size = self.max_encoding_message_size;
1183 let inner = self.inner.clone();
1184 let fut = async move {
1185 let method = GetEventStreamSvc(inner);
1186 let codec = tonic::codec::ProstCodec::default();
1187 let mut grpc = tonic::server::Grpc::new(codec)
1188 .apply_compression_config(
1189 accept_compression_encodings,
1190 send_compression_encodings,
1191 )
1192 .apply_max_message_size_config(
1193 max_decoding_message_size,
1194 max_encoding_message_size,
1195 );
1196 let res = grpc.server_streaming(method, req).await;
1197 Ok(res)
1198 };
1199 Box::pin(fut)
1200 }
1201 "/connector_service.ConnectorService/ValidateSource" => {
1202 #[allow(non_camel_case_types)]
1203 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1204 impl<
1205 T: ConnectorService,
1206 > tonic::server::UnaryService<super::ValidateSourceRequest>
1207 for ValidateSourceSvc<T> {
1208 type Response = super::ValidateSourceResponse;
1209 type Future = BoxFuture<
1210 tonic::Response<Self::Response>,
1211 tonic::Status,
1212 >;
1213 fn call(
1214 &mut self,
1215 request: tonic::Request<super::ValidateSourceRequest>,
1216 ) -> Self::Future {
1217 let inner = Arc::clone(&self.0);
1218 let fut = async move {
1219 <T as ConnectorService>::validate_source(&inner, request)
1220 .await
1221 };
1222 Box::pin(fut)
1223 }
1224 }
1225 let accept_compression_encodings = self.accept_compression_encodings;
1226 let send_compression_encodings = self.send_compression_encodings;
1227 let max_decoding_message_size = self.max_decoding_message_size;
1228 let max_encoding_message_size = self.max_encoding_message_size;
1229 let inner = self.inner.clone();
1230 let fut = async move {
1231 let method = ValidateSourceSvc(inner);
1232 let codec = tonic::codec::ProstCodec::default();
1233 let mut grpc = tonic::server::Grpc::new(codec)
1234 .apply_compression_config(
1235 accept_compression_encodings,
1236 send_compression_encodings,
1237 )
1238 .apply_max_message_size_config(
1239 max_decoding_message_size,
1240 max_encoding_message_size,
1241 );
1242 let res = grpc.unary(method, req).await;
1243 Ok(res)
1244 };
1245 Box::pin(fut)
1246 }
1247 _ => {
1248 Box::pin(async move {
1249 let mut response = http::Response::new(empty_body());
1250 let headers = response.headers_mut();
1251 headers
1252 .insert(
1253 tonic::Status::GRPC_STATUS,
1254 (tonic::Code::Unimplemented as i32).into(),
1255 );
1256 headers
1257 .insert(
1258 http::header::CONTENT_TYPE,
1259 tonic::metadata::GRPC_CONTENT_TYPE,
1260 );
1261 Ok(response)
1262 })
1263 }
1264 }
1265 }
1266 }
1267 impl<T> Clone for ConnectorServiceServer<T> {
1268 fn clone(&self) -> Self {
1269 let inner = self.inner.clone();
1270 Self {
1271 inner,
1272 accept_compression_encodings: self.accept_compression_encodings,
1273 send_compression_encodings: self.send_compression_encodings,
1274 max_decoding_message_size: self.max_decoding_message_size,
1275 max_encoding_message_size: self.max_encoding_message_size,
1276 }
1277 }
1278 }
1279 pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1281 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1282 const NAME: &'static str = SERVICE_NAME;
1283 }
1284}
1285pub mod sink_coordination_service_server {
1287 #![allow(
1288 unused_variables,
1289 dead_code,
1290 missing_docs,
1291 clippy::wildcard_imports,
1292 clippy::let_unit_value,
1293 )]
1294 use tonic::codegen::*;
1295 #[async_trait]
1297 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1298 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1300 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1301 >
1302 + std::marker::Send
1303 + 'static;
1304 async fn coordinate(
1305 &self,
1306 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1307 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1308 }
1309 #[derive(Debug)]
1310 pub struct SinkCoordinationServiceServer<T> {
1311 inner: Arc<T>,
1312 accept_compression_encodings: EnabledCompressionEncodings,
1313 send_compression_encodings: EnabledCompressionEncodings,
1314 max_decoding_message_size: Option<usize>,
1315 max_encoding_message_size: Option<usize>,
1316 }
1317 impl<T> SinkCoordinationServiceServer<T> {
1318 pub fn new(inner: T) -> Self {
1319 Self::from_arc(Arc::new(inner))
1320 }
1321 pub fn from_arc(inner: Arc<T>) -> Self {
1322 Self {
1323 inner,
1324 accept_compression_encodings: Default::default(),
1325 send_compression_encodings: Default::default(),
1326 max_decoding_message_size: None,
1327 max_encoding_message_size: None,
1328 }
1329 }
1330 pub fn with_interceptor<F>(
1331 inner: T,
1332 interceptor: F,
1333 ) -> InterceptedService<Self, F>
1334 where
1335 F: tonic::service::Interceptor,
1336 {
1337 InterceptedService::new(Self::new(inner), interceptor)
1338 }
1339 #[must_use]
1341 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1342 self.accept_compression_encodings.enable(encoding);
1343 self
1344 }
1345 #[must_use]
1347 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1348 self.send_compression_encodings.enable(encoding);
1349 self
1350 }
1351 #[must_use]
1355 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1356 self.max_decoding_message_size = Some(limit);
1357 self
1358 }
1359 #[must_use]
1363 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1364 self.max_encoding_message_size = Some(limit);
1365 self
1366 }
1367 }
1368 impl<T, B> tonic::codegen::Service<http::Request<B>>
1369 for SinkCoordinationServiceServer<T>
1370 where
1371 T: SinkCoordinationService,
1372 B: Body + std::marker::Send + 'static,
1373 B::Error: Into<StdError> + std::marker::Send + 'static,
1374 {
1375 type Response = http::Response<tonic::body::BoxBody>;
1376 type Error = std::convert::Infallible;
1377 type Future = BoxFuture<Self::Response, Self::Error>;
1378 fn poll_ready(
1379 &mut self,
1380 _cx: &mut Context<'_>,
1381 ) -> Poll<std::result::Result<(), Self::Error>> {
1382 Poll::Ready(Ok(()))
1383 }
1384 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1385 match req.uri().path() {
1386 "/connector_service.SinkCoordinationService/Coordinate" => {
1387 #[allow(non_camel_case_types)]
1388 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1389 impl<
1390 T: SinkCoordinationService,
1391 > tonic::server::StreamingService<super::CoordinateRequest>
1392 for CoordinateSvc<T> {
1393 type Response = super::CoordinateResponse;
1394 type ResponseStream = T::CoordinateStream;
1395 type Future = BoxFuture<
1396 tonic::Response<Self::ResponseStream>,
1397 tonic::Status,
1398 >;
1399 fn call(
1400 &mut self,
1401 request: tonic::Request<
1402 tonic::Streaming<super::CoordinateRequest>,
1403 >,
1404 ) -> Self::Future {
1405 let inner = Arc::clone(&self.0);
1406 let fut = async move {
1407 <T as SinkCoordinationService>::coordinate(&inner, request)
1408 .await
1409 };
1410 Box::pin(fut)
1411 }
1412 }
1413 let accept_compression_encodings = self.accept_compression_encodings;
1414 let send_compression_encodings = self.send_compression_encodings;
1415 let max_decoding_message_size = self.max_decoding_message_size;
1416 let max_encoding_message_size = self.max_encoding_message_size;
1417 let inner = self.inner.clone();
1418 let fut = async move {
1419 let method = CoordinateSvc(inner);
1420 let codec = tonic::codec::ProstCodec::default();
1421 let mut grpc = tonic::server::Grpc::new(codec)
1422 .apply_compression_config(
1423 accept_compression_encodings,
1424 send_compression_encodings,
1425 )
1426 .apply_max_message_size_config(
1427 max_decoding_message_size,
1428 max_encoding_message_size,
1429 );
1430 let res = grpc.streaming(method, req).await;
1431 Ok(res)
1432 };
1433 Box::pin(fut)
1434 }
1435 _ => {
1436 Box::pin(async move {
1437 let mut response = http::Response::new(empty_body());
1438 let headers = response.headers_mut();
1439 headers
1440 .insert(
1441 tonic::Status::GRPC_STATUS,
1442 (tonic::Code::Unimplemented as i32).into(),
1443 );
1444 headers
1445 .insert(
1446 http::header::CONTENT_TYPE,
1447 tonic::metadata::GRPC_CONTENT_TYPE,
1448 );
1449 Ok(response)
1450 })
1451 }
1452 }
1453 }
1454 }
1455 impl<T> Clone for SinkCoordinationServiceServer<T> {
1456 fn clone(&self) -> Self {
1457 let inner = self.inner.clone();
1458 Self {
1459 inner,
1460 accept_compression_encodings: self.accept_compression_encodings,
1461 send_compression_encodings: self.send_compression_encodings,
1462 max_decoding_message_size: self.max_decoding_message_size,
1463 max_encoding_message_size: self.max_encoding_message_size,
1464 }
1465 }
1466 }
1467 pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1469 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1470 const NAME: &'static str = SERVICE_NAME;
1471 }
1472}