/// Protobuf message `TableSchema`.
///
/// Holds a repeated list of `plan_common::ColumnDesc` messages (tag 1) and a
/// repeated list of `uint32` values (tag 2).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableSchema {
    /// Column descriptors, encoded as repeated messages under tag 1.
    #[prost(message, repeated, tag = "1")]
    pub columns: ::prost::alloc::vec::Vec<super::plan_common::ColumnDesc>,
    /// Repeated `uint32` values under tag 2.
    /// NOTE(review): presumably positions into `columns` that form the primary
    /// key — confirm against the `.proto` definition.
    #[prost(uint32, repeated, tag = "2")]
    pub pk_indices: ::prost::alloc::vec::Vec<u32>,
}
/// Protobuf message `ValidationError`, used as the optional `error` field of
/// `ValidateSinkResponse` and `ValidateSourceResponse`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidationError {
    /// Error text, encoded as a string under tag 1.
    #[prost(string, tag = "1")]
    pub error_message: ::prost::alloc::string::String,
}
/// Protobuf message `SinkParam`.
///
/// Embedded by `StartSink`, `ValidateSinkRequest`, `StartCoordinator` and
/// `StartCoordinationRequest` elsewhere in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkParam {
    /// Encoded as `uint32` under tag 1 and exposed through the
    /// `crate::id::SinkId` wrapper type.
    #[prost(uint32, tag = "1", wrapper = "crate::id::SinkId")]
    pub sink_id: crate::id::SinkId,
    /// String-to-string map under tag 2, held in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "2")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional `TableSchema` message under tag 3.
    #[prost(message, optional, tag = "3")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Raw `i32` value of the `catalog::SinkType` enumeration (tag 4).
    #[prost(enumeration = "super::catalog::SinkType", tag = "4")]
    pub sink_type: i32,
    /// String field under tag 5.
    #[prost(string, tag = "5")]
    pub db_name: ::prost::alloc::string::String,
    /// String field under tag 6.
    #[prost(string, tag = "6")]
    pub sink_from_name: ::prost::alloc::string::String,
    /// Optional `catalog::SinkFormatDesc` message under tag 7.
    #[prost(message, optional, tag = "7")]
    pub format_desc: ::core::option::Option<super::catalog::SinkFormatDesc>,
    /// String field under tag 8.
    #[prost(string, tag = "8")]
    pub sink_name: ::prost::alloc::string::String,
    /// Boolean flag under tag 9.
    /// NOTE(review): its exact semantics are not visible in this file — see
    /// the `.proto` definition.
    #[prost(bool, tag = "9")]
    pub raw_ignore_delete: bool,
}
/// Protobuf message `SinkWriterStreamRequest`: a wrapper around a single
/// oneof whose variants use tags 1, 3 and 4.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamRequest {
    /// The selected request variant, or `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_request::Request", tags = "1, 3, 4")]
    pub request: ::core::option::Option<sink_writer_stream_request::Request>,
}
/// Nested message and oneof types used by `SinkWriterStreamRequest`.
pub mod sink_writer_stream_request {
    /// Payload of `Request::Start`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartSink {
        /// Optional `SinkParam` message under tag 1.
        #[prost(message, optional, tag = "1")]
        pub sink_param: ::core::option::Option<super::SinkParam>,
        /// Optional `TableSchema` message under tag 3 (tag 2 is not used by
        /// this struct).
        #[prost(message, optional, tag = "3")]
        pub payload_schema: ::core::option::Option<super::TableSchema>,
    }
    /// Payload of `Request::WriteBatch`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct WriteBatch {
        /// `uint64` under tag 3.
        #[prost(uint64, tag = "3")]
        pub batch_id: u64,
        /// `uint64` under tag 4.
        #[prost(uint64, tag = "4")]
        pub epoch: u64,
        /// Oneof payload; its variants use tags 2 and 5.
        #[prost(oneof = "write_batch::Payload", tags = "2, 5")]
        pub payload: ::core::option::Option<write_batch::Payload>,
    }
    /// Nested types used by `WriteBatch`.
    pub mod write_batch {
        /// Message wrapping a raw byte buffer.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct StreamChunkPayload {
            /// Bytes field under tag 1, held as `Vec<u8>`.
            #[prost(bytes = "vec", tag = "1")]
            pub binary_data: ::prost::alloc::vec::Vec<u8>,
        }
        /// Oneof for `WriteBatch::payload`.
        #[derive(prost_helpers::AnyPB)]
        #[derive(Clone, PartialEq, ::prost::Oneof)]
        pub enum Payload {
            /// An embedded `StreamChunkPayload` message (tag 2).
            #[prost(message, tag = "2")]
            StreamChunkPayload(StreamChunkPayload),
            /// An `int64` value (tag 5).
            /// NOTE(review): presumably a pointer/handle to an in-memory
            /// chunk — confirm against the `.proto` definition.
            #[prost(int64, tag = "5")]
            StreamChunkRefPointer(i64),
        }
    }
    /// Payload of `Request::Barrier`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct Barrier {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Boolean under tag 2.
        #[prost(bool, tag = "2")]
        pub is_checkpoint: bool,
    }
    /// Oneof for `SinkWriterStreamRequest::request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// `StartSink` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartSink),
        /// `WriteBatch` message (tag 3).
        #[prost(message, tag = "3")]
        WriteBatch(WriteBatch),
        /// `Barrier` message (tag 4).
        #[prost(message, tag = "4")]
        Barrier(Barrier),
    }
}
/// Protobuf message `SinkWriterStreamResponse`: a wrapper around a single
/// oneof whose variants use tags 1, 2 and 3.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkWriterStreamResponse {
    /// The selected response variant, or `None` when no variant is set.
    #[prost(oneof = "sink_writer_stream_response::Response", tags = "1, 2, 3")]
    pub response: ::core::option::Option<sink_writer_stream_response::Response>,
}
/// Nested message and oneof types used by `SinkWriterStreamResponse`.
pub mod sink_writer_stream_response {
    /// Empty message; payload of `Response::Start`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartResponse {}
    /// Payload of `Response::Commit`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional `SinkMetadata` message under tag 2.
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
    }
    /// Payload of `Response::Batch`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct BatchWrittenResponse {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// `uint64` under tag 2.
        #[prost(uint64, tag = "2")]
        pub batch_id: u64,
    }
    /// Oneof for `SinkWriterStreamResponse::response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// `StartResponse` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
        /// `BatchWrittenResponse` message (tag 3).
        #[prost(message, tag = "3")]
        Batch(BatchWrittenResponse),
    }
}
/// Protobuf message `ValidateSinkRequest`; the request type of the
/// `ValidateSink` unary RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkRequest {
    /// Optional `SinkParam` message under tag 1.
    #[prost(message, optional, tag = "1")]
    pub sink_param: ::core::option::Option<SinkParam>,
}
/// Protobuf message `ValidateSinkResponse`; the response type of the
/// `ValidateSink` unary RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSinkResponse {
    /// Optional `ValidationError` message under tag 1.
    /// NOTE(review): presumably `None` means validation succeeded — confirm
    /// against the `.proto` definition.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `SinkMetadata`: a wrapper around a oneof that currently
/// has a single variant (tag 1).
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkMetadata {
    /// The selected metadata variant, or `None` when no variant is set.
    #[prost(oneof = "sink_metadata::Metadata", tags = "1")]
    pub metadata: ::core::option::Option<sink_metadata::Metadata>,
}
/// Nested message and oneof types used by `SinkMetadata`.
pub mod sink_metadata {
    /// Message wrapping a raw byte buffer.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct SerializedMetadata {
        /// Bytes field under tag 1, held as `Vec<u8>`.
        #[prost(bytes = "vec", tag = "1")]
        pub metadata: ::prost::alloc::vec::Vec<u8>,
    }
    /// Oneof for `SinkMetadata::metadata`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Metadata {
        /// `SerializedMetadata` message (tag 1).
        #[prost(message, tag = "1")]
        Serialized(SerializedMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamRequest`: a wrapper around a single
/// oneof whose variants use tags 1 and 2.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamRequest {
    /// The selected request variant, or `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_request::Request", tags = "1, 2")]
    pub request: ::core::option::Option<sink_coordinator_stream_request::Request>,
}
/// Nested message and oneof types used by `SinkCoordinatorStreamRequest`.
pub mod sink_coordinator_stream_request {
    /// Payload of `Request::Start`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinator {
        /// Optional `SinkParam` message under tag 1.
        #[prost(message, optional, tag = "1")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Payload of `Request::Commit`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitMetadata {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Repeated `SinkMetadata` messages under tag 2.
        #[prost(message, repeated, tag = "2")]
        pub metadata: ::prost::alloc::vec::Vec<super::SinkMetadata>,
    }
    /// Oneof for `SinkCoordinatorStreamRequest::request`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Request {
        /// `StartCoordinator` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartCoordinator),
        /// `CommitMetadata` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitMetadata),
    }
}
/// Protobuf message `SinkCoordinatorStreamResponse`: a wrapper around a single
/// oneof whose variants use tags 1 and 2. The type is `Copy`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct SinkCoordinatorStreamResponse {
    /// The selected response variant, or `None` when no variant is set.
    #[prost(oneof = "sink_coordinator_stream_response::Response", tags = "1, 2")]
    pub response: ::core::option::Option<sink_coordinator_stream_response::Response>,
}
/// Nested message and oneof types used by `SinkCoordinatorStreamResponse`.
pub mod sink_coordinator_stream_response {
    /// Empty message; payload of `Response::Start`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartResponse {}
    /// Payload of `Response::Commit`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof for `SinkCoordinatorStreamResponse::response`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
    pub enum Response {
        /// `StartResponse` message (tag 1).
        #[prost(message, tag = "1")]
        Start(StartResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        Commit(CommitResponse),
    }
}
/// Protobuf message `CdcMessage`; the element type of
/// `GetEventStreamResponse::events`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CdcMessage {
    /// String field under tag 1.
    #[prost(string, tag = "1")]
    pub payload: ::prost::alloc::string::String,
    /// String field under tag 2.
    #[prost(string, tag = "2")]
    pub partition: ::prost::alloc::string::String,
    /// String field under tag 3.
    #[prost(string, tag = "3")]
    pub offset: ::prost::alloc::string::String,
    /// String field under tag 4.
    #[prost(string, tag = "4")]
    pub full_table_name: ::prost::alloc::string::String,
    /// `int64` under tag 5.
    /// NOTE(review): the name suggests a source timestamp in milliseconds —
    /// confirm against the `.proto` definition.
    #[prost(int64, tag = "5")]
    pub source_ts_ms: i64,
    /// Raw `i32` value of the `cdc_message::CdcMessageType` enumeration (tag 6).
    #[prost(enumeration = "cdc_message::CdcMessageType", tag = "6")]
    pub msg_type: i32,
    /// String field under tag 7.
    #[prost(string, tag = "7")]
    pub key: ::prost::alloc::string::String,
}
259pub mod cdc_message {
    /// Protobuf enumeration referenced by `CdcMessage::msg_type`.
    ///
    /// Represented as `i32`; the discriminants below are the wire values.
    #[derive(prost_helpers::AnyPB)]
    #[derive(
        Clone,
        Copy,
        Debug,
        PartialEq,
        Eq,
        Hash,
        PartialOrd,
        Ord,
        ::prost::Enumeration
    )]
    #[repr(i32)]
    pub enum CdcMessageType {
        /// Wire value 0.
        Unspecified = 0,
        /// Wire value 1.
        Heartbeat = 1,
        /// Wire value 2.
        Data = 2,
        /// Wire value 3.
        TransactionMeta = 3,
        /// Wire value 4.
        SchemaChange = 4,
    }
281 impl CdcMessageType {
282 pub fn as_str_name(&self) -> &'static str {
287 match self {
288 Self::Unspecified => "UNSPECIFIED",
289 Self::Heartbeat => "HEARTBEAT",
290 Self::Data => "DATA",
291 Self::TransactionMeta => "TRANSACTION_META",
292 Self::SchemaChange => "SCHEMA_CHANGE",
293 }
294 }
295 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
297 match value {
298 "UNSPECIFIED" => Some(Self::Unspecified),
299 "HEARTBEAT" => Some(Self::Heartbeat),
300 "DATA" => Some(Self::Data),
301 "TRANSACTION_META" => Some(Self::TransactionMeta),
302 "SCHEMA_CHANGE" => Some(Self::SchemaChange),
303 _ => None,
304 }
305 }
306 }
307}
/// Protobuf message `GetEventStreamRequest`; the request type of the
/// `GetEventStream` server-streaming RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamRequest {
    /// `uint64` under tag 1.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of the `SourceType` enumeration (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// String field under tag 3.
    #[prost(string, tag = "3")]
    pub start_offset: ::prost::alloc::string::String,
    /// String-to-string map under tag 4, held in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "4")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Boolean under tag 5.
    #[prost(bool, tag = "5")]
    pub snapshot_done: bool,
    /// Boolean under tag 6.
    #[prost(bool, tag = "6")]
    pub is_source_job: bool,
}
/// Protobuf message `GetEventStreamResponse`; the item type streamed back by
/// the `GetEventStream` RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetEventStreamResponse {
    /// `uint64` under tag 1.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Repeated `CdcMessage` messages under tag 2.
    #[prost(message, repeated, tag = "2")]
    pub events: ::prost::alloc::vec::Vec<CdcMessage>,
    /// Optional `ControlInfo` message under tag 3.
    #[prost(message, optional, tag = "3")]
    pub control: ::core::option::Option<get_event_stream_response::ControlInfo>,
}
/// Nested message types used by `GetEventStreamResponse`.
pub mod get_event_stream_response {
    /// Message carried in `GetEventStreamResponse::control`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct ControlInfo {
        /// Boolean under tag 1.
        #[prost(bool, tag = "1")]
        pub handshake_ok: bool,
    }
}
/// Protobuf message `ValidateSourceRequest`; the request type of the
/// `ValidateSource` unary RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceRequest {
    /// `uint64` under tag 1.
    #[prost(uint64, tag = "1")]
    pub source_id: u64,
    /// Raw `i32` value of the `SourceType` enumeration (tag 2).
    #[prost(enumeration = "SourceType", tag = "2")]
    pub source_type: i32,
    /// String-to-string map under tag 3, held in a `BTreeMap`.
    #[prost(btree_map = "string, string", tag = "3")]
    pub properties: ::prost::alloc::collections::BTreeMap<
        ::prost::alloc::string::String,
        ::prost::alloc::string::String,
    >,
    /// Optional `TableSchema` message under tag 4.
    #[prost(message, optional, tag = "4")]
    pub table_schema: ::core::option::Option<TableSchema>,
    /// Boolean under tag 5.
    #[prost(bool, tag = "5")]
    pub is_source_job: bool,
    /// Boolean under tag 6.
    #[prost(bool, tag = "6")]
    pub is_backfill_table: bool,
}
/// Protobuf message `ValidateSourceResponse`; the response type of the
/// `ValidateSource` unary RPC in this file.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ValidateSourceResponse {
    /// Optional `ValidationError` message under tag 1.
    /// NOTE(review): presumably `None` means validation succeeded — confirm
    /// against the `.proto` definition.
    #[prost(message, optional, tag = "1")]
    pub error: ::core::option::Option<ValidationError>,
}
/// Protobuf message `CoordinateRequest`; the request item type of the
/// `Coordinate` streaming RPC in this file. Wraps a single oneof whose
/// variants use tags 1 through 5.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoordinateRequest {
    /// The selected message variant, or `None` when no variant is set.
    #[prost(oneof = "coordinate_request::Msg", tags = "1, 2, 3, 4, 5")]
    pub msg: ::core::option::Option<coordinate_request::Msg>,
}
/// Nested message and oneof types used by `CoordinateRequest`.
pub mod coordinate_request {
    /// Payload of `Msg::StartRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct StartCoordinationRequest {
        /// Optional `common::Buffer` message under tag 1.
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
        /// Optional `SinkParam` message under tag 2.
        #[prost(message, optional, tag = "2")]
        pub param: ::core::option::Option<super::SinkParam>,
    }
    /// Payload of `Msg::CommitRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct CommitRequest {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
        /// Optional `SinkMetadata` message under tag 2.
        #[prost(message, optional, tag = "2")]
        pub metadata: ::core::option::Option<super::SinkMetadata>,
        /// Optional `stream_plan::SinkSchemaChange` message under tag 3.
        #[prost(message, optional, tag = "3")]
        pub schema_change: ::core::option::Option<
            super::super::stream_plan::SinkSchemaChange,
        >,
    }
    /// Payload of `Msg::UpdateVnodeRequest`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct UpdateVnodeBitmapRequest {
        /// Optional `common::Buffer` message under tag 1.
        #[prost(message, optional, tag = "1")]
        pub vnode_bitmap: ::core::option::Option<super::super::common::Buffer>,
    }
    /// Oneof for `CoordinateRequest::msg`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// `StartCoordinationRequest` message (tag 1).
        #[prost(message, tag = "1")]
        StartRequest(StartCoordinationRequest),
        /// `CommitRequest` message (tag 2).
        #[prost(message, tag = "2")]
        CommitRequest(CommitRequest),
        /// `UpdateVnodeBitmapRequest` message (tag 3).
        #[prost(message, tag = "3")]
        UpdateVnodeRequest(UpdateVnodeBitmapRequest),
        /// Boolean value (tag 4).
        #[prost(bool, tag = "4")]
        Stop(bool),
        /// `uint64` value (tag 5).
        #[prost(uint64, tag = "5")]
        AlignInitialEpochRequest(u64),
    }
}
/// Protobuf message `CoordinateResponse`; the response item type of the
/// `Coordinate` streaming RPC in this file. Wraps a single oneof whose
/// variants use tags 1 through 4. The type is `Copy`.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct CoordinateResponse {
    /// The selected message variant, or `None` when no variant is set.
    #[prost(oneof = "coordinate_response::Msg", tags = "1, 2, 3, 4")]
    pub msg: ::core::option::Option<coordinate_response::Msg>,
}
/// Nested message and oneof types used by `CoordinateResponse`.
pub mod coordinate_response {
    /// Payload of `Msg::StartResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct StartCoordinationResponse {
        /// Optional `uint64` under tag 1; `None` when the field is absent.
        #[prost(uint64, optional, tag = "1")]
        pub log_store_rewind_start_epoch: ::core::option::Option<u64>,
    }
    /// Payload of `Msg::CommitResponse`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Message)]
    pub struct CommitResponse {
        /// `uint64` under tag 1.
        #[prost(uint64, tag = "1")]
        pub epoch: u64,
    }
    /// Oneof for `CoordinateResponse::msg`.
    #[derive(prost_helpers::AnyPB)]
    #[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
    pub enum Msg {
        /// `StartCoordinationResponse` message (tag 1).
        #[prost(message, tag = "1")]
        StartResponse(StartCoordinationResponse),
        /// `CommitResponse` message (tag 2).
        #[prost(message, tag = "2")]
        CommitResponse(CommitResponse),
        /// Boolean value (tag 3).
        #[prost(bool, tag = "3")]
        Stopped(bool),
        /// `uint64` value (tag 4).
        #[prost(uint64, tag = "4")]
        AlignInitialEpochResponse(u64),
    }
}
/// Protobuf enumeration referenced by the `source_type` fields of
/// `GetEventStreamRequest` and `ValidateSourceRequest`.
///
/// Represented as `i32`; the discriminants below are the wire values.
#[derive(prost_helpers::AnyPB)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SourceType {
    /// Wire value 0.
    Unspecified = 0,
    /// Wire value 1.
    Mysql = 1,
    /// Wire value 2.
    Postgres = 2,
    /// Wire value 3.
    Citus = 3,
    /// Wire value 4.
    Mongodb = 4,
    /// Wire value 5.
    SqlServer = 5,
}
467impl SourceType {
468 pub fn as_str_name(&self) -> &'static str {
473 match self {
474 Self::Unspecified => "UNSPECIFIED",
475 Self::Mysql => "MYSQL",
476 Self::Postgres => "POSTGRES",
477 Self::Citus => "CITUS",
478 Self::Mongodb => "MONGODB",
479 Self::SqlServer => "SQL_SERVER",
480 }
481 }
482 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
484 match value {
485 "UNSPECIFIED" => Some(Self::Unspecified),
486 "MYSQL" => Some(Self::Mysql),
487 "POSTGRES" => Some(Self::Postgres),
488 "CITUS" => Some(Self::Citus),
489 "MONGODB" => Some(Self::Mongodb),
490 "SQL_SERVER" => Some(Self::SqlServer),
491 _ => None,
492 }
493 }
494}
495pub mod connector_service_client {
497 #![allow(
498 unused_variables,
499 dead_code,
500 missing_docs,
501 clippy::wildcard_imports,
502 clippy::let_unit_value,
503 )]
504 use tonic::codegen::*;
505 use tonic::codegen::http::Uri;
    /// gRPC client for the `connector_service.ConnectorService` service.
    ///
    /// Thin wrapper around a `tonic::client::Grpc<T>`.
    #[derive(Debug, Clone)]
    pub struct ConnectorServiceClient<T> {
        /// Underlying tonic gRPC client that performs the calls.
        inner: tonic::client::Grpc<T>,
    }
510 impl ConnectorServiceClient<tonic::transport::Channel> {
511 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
513 where
514 D: TryInto<tonic::transport::Endpoint>,
515 D::Error: Into<StdError>,
516 {
517 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
518 Ok(Self::new(conn))
519 }
520 }
521 impl<T> ConnectorServiceClient<T>
522 where
523 T: tonic::client::GrpcService<tonic::body::BoxBody>,
524 T::Error: Into<StdError>,
525 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
526 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
527 {
528 pub fn new(inner: T) -> Self {
529 let inner = tonic::client::Grpc::new(inner);
530 Self { inner }
531 }
532 pub fn with_origin(inner: T, origin: Uri) -> Self {
533 let inner = tonic::client::Grpc::with_origin(inner, origin);
534 Self { inner }
535 }
536 pub fn with_interceptor<F>(
537 inner: T,
538 interceptor: F,
539 ) -> ConnectorServiceClient<InterceptedService<T, F>>
540 where
541 F: tonic::service::Interceptor,
542 T::ResponseBody: Default,
543 T: tonic::codegen::Service<
544 http::Request<tonic::body::BoxBody>,
545 Response = http::Response<
546 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
547 >,
548 >,
549 <T as tonic::codegen::Service<
550 http::Request<tonic::body::BoxBody>,
551 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
552 {
553 ConnectorServiceClient::new(InterceptedService::new(inner, interceptor))
554 }
555 #[must_use]
560 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
561 self.inner = self.inner.send_compressed(encoding);
562 self
563 }
564 #[must_use]
566 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
567 self.inner = self.inner.accept_compressed(encoding);
568 self
569 }
570 #[must_use]
574 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
575 self.inner = self.inner.max_decoding_message_size(limit);
576 self
577 }
578 #[must_use]
582 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
583 self.inner = self.inner.max_encoding_message_size(limit);
584 self
585 }
586 pub async fn sink_writer_stream(
587 &mut self,
588 request: impl tonic::IntoStreamingRequest<
589 Message = super::SinkWriterStreamRequest,
590 >,
591 ) -> std::result::Result<
592 tonic::Response<tonic::codec::Streaming<super::SinkWriterStreamResponse>>,
593 tonic::Status,
594 > {
595 self.inner
596 .ready()
597 .await
598 .map_err(|e| {
599 tonic::Status::unknown(
600 format!("Service was not ready: {}", e.into()),
601 )
602 })?;
603 let codec = tonic::codec::ProstCodec::default();
604 let path = http::uri::PathAndQuery::from_static(
605 "/connector_service.ConnectorService/SinkWriterStream",
606 );
607 let mut req = request.into_streaming_request();
608 req.extensions_mut()
609 .insert(
610 GrpcMethod::new(
611 "connector_service.ConnectorService",
612 "SinkWriterStream",
613 ),
614 );
615 self.inner.streaming(req, path, codec).await
616 }
617 pub async fn sink_coordinator_stream(
618 &mut self,
619 request: impl tonic::IntoStreamingRequest<
620 Message = super::SinkCoordinatorStreamRequest,
621 >,
622 ) -> std::result::Result<
623 tonic::Response<
624 tonic::codec::Streaming<super::SinkCoordinatorStreamResponse>,
625 >,
626 tonic::Status,
627 > {
628 self.inner
629 .ready()
630 .await
631 .map_err(|e| {
632 tonic::Status::unknown(
633 format!("Service was not ready: {}", e.into()),
634 )
635 })?;
636 let codec = tonic::codec::ProstCodec::default();
637 let path = http::uri::PathAndQuery::from_static(
638 "/connector_service.ConnectorService/SinkCoordinatorStream",
639 );
640 let mut req = request.into_streaming_request();
641 req.extensions_mut()
642 .insert(
643 GrpcMethod::new(
644 "connector_service.ConnectorService",
645 "SinkCoordinatorStream",
646 ),
647 );
648 self.inner.streaming(req, path, codec).await
649 }
650 pub async fn validate_sink(
651 &mut self,
652 request: impl tonic::IntoRequest<super::ValidateSinkRequest>,
653 ) -> std::result::Result<
654 tonic::Response<super::ValidateSinkResponse>,
655 tonic::Status,
656 > {
657 self.inner
658 .ready()
659 .await
660 .map_err(|e| {
661 tonic::Status::unknown(
662 format!("Service was not ready: {}", e.into()),
663 )
664 })?;
665 let codec = tonic::codec::ProstCodec::default();
666 let path = http::uri::PathAndQuery::from_static(
667 "/connector_service.ConnectorService/ValidateSink",
668 );
669 let mut req = request.into_request();
670 req.extensions_mut()
671 .insert(
672 GrpcMethod::new("connector_service.ConnectorService", "ValidateSink"),
673 );
674 self.inner.unary(req, path, codec).await
675 }
676 pub async fn get_event_stream(
677 &mut self,
678 request: impl tonic::IntoRequest<super::GetEventStreamRequest>,
679 ) -> std::result::Result<
680 tonic::Response<tonic::codec::Streaming<super::GetEventStreamResponse>>,
681 tonic::Status,
682 > {
683 self.inner
684 .ready()
685 .await
686 .map_err(|e| {
687 tonic::Status::unknown(
688 format!("Service was not ready: {}", e.into()),
689 )
690 })?;
691 let codec = tonic::codec::ProstCodec::default();
692 let path = http::uri::PathAndQuery::from_static(
693 "/connector_service.ConnectorService/GetEventStream",
694 );
695 let mut req = request.into_request();
696 req.extensions_mut()
697 .insert(
698 GrpcMethod::new(
699 "connector_service.ConnectorService",
700 "GetEventStream",
701 ),
702 );
703 self.inner.server_streaming(req, path, codec).await
704 }
705 pub async fn validate_source(
706 &mut self,
707 request: impl tonic::IntoRequest<super::ValidateSourceRequest>,
708 ) -> std::result::Result<
709 tonic::Response<super::ValidateSourceResponse>,
710 tonic::Status,
711 > {
712 self.inner
713 .ready()
714 .await
715 .map_err(|e| {
716 tonic::Status::unknown(
717 format!("Service was not ready: {}", e.into()),
718 )
719 })?;
720 let codec = tonic::codec::ProstCodec::default();
721 let path = http::uri::PathAndQuery::from_static(
722 "/connector_service.ConnectorService/ValidateSource",
723 );
724 let mut req = request.into_request();
725 req.extensions_mut()
726 .insert(
727 GrpcMethod::new(
728 "connector_service.ConnectorService",
729 "ValidateSource",
730 ),
731 );
732 self.inner.unary(req, path, codec).await
733 }
734 }
735}
736pub mod sink_coordination_service_client {
738 #![allow(
739 unused_variables,
740 dead_code,
741 missing_docs,
742 clippy::wildcard_imports,
743 clippy::let_unit_value,
744 )]
745 use tonic::codegen::*;
746 use tonic::codegen::http::Uri;
    /// gRPC client for the `connector_service.SinkCoordinationService` service.
    ///
    /// Thin wrapper around a `tonic::client::Grpc<T>`.
    #[derive(Debug, Clone)]
    pub struct SinkCoordinationServiceClient<T> {
        /// Underlying tonic gRPC client that performs the calls.
        inner: tonic::client::Grpc<T>,
    }
751 impl SinkCoordinationServiceClient<tonic::transport::Channel> {
752 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
754 where
755 D: TryInto<tonic::transport::Endpoint>,
756 D::Error: Into<StdError>,
757 {
758 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
759 Ok(Self::new(conn))
760 }
761 }
762 impl<T> SinkCoordinationServiceClient<T>
763 where
764 T: tonic::client::GrpcService<tonic::body::BoxBody>,
765 T::Error: Into<StdError>,
766 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
767 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
768 {
769 pub fn new(inner: T) -> Self {
770 let inner = tonic::client::Grpc::new(inner);
771 Self { inner }
772 }
773 pub fn with_origin(inner: T, origin: Uri) -> Self {
774 let inner = tonic::client::Grpc::with_origin(inner, origin);
775 Self { inner }
776 }
777 pub fn with_interceptor<F>(
778 inner: T,
779 interceptor: F,
780 ) -> SinkCoordinationServiceClient<InterceptedService<T, F>>
781 where
782 F: tonic::service::Interceptor,
783 T::ResponseBody: Default,
784 T: tonic::codegen::Service<
785 http::Request<tonic::body::BoxBody>,
786 Response = http::Response<
787 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
788 >,
789 >,
790 <T as tonic::codegen::Service<
791 http::Request<tonic::body::BoxBody>,
792 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
793 {
794 SinkCoordinationServiceClient::new(
795 InterceptedService::new(inner, interceptor),
796 )
797 }
798 #[must_use]
803 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
804 self.inner = self.inner.send_compressed(encoding);
805 self
806 }
807 #[must_use]
809 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
810 self.inner = self.inner.accept_compressed(encoding);
811 self
812 }
813 #[must_use]
817 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
818 self.inner = self.inner.max_decoding_message_size(limit);
819 self
820 }
821 #[must_use]
825 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
826 self.inner = self.inner.max_encoding_message_size(limit);
827 self
828 }
829 pub async fn coordinate(
830 &mut self,
831 request: impl tonic::IntoStreamingRequest<Message = super::CoordinateRequest>,
832 ) -> std::result::Result<
833 tonic::Response<tonic::codec::Streaming<super::CoordinateResponse>>,
834 tonic::Status,
835 > {
836 self.inner
837 .ready()
838 .await
839 .map_err(|e| {
840 tonic::Status::unknown(
841 format!("Service was not ready: {}", e.into()),
842 )
843 })?;
844 let codec = tonic::codec::ProstCodec::default();
845 let path = http::uri::PathAndQuery::from_static(
846 "/connector_service.SinkCoordinationService/Coordinate",
847 );
848 let mut req = request.into_streaming_request();
849 req.extensions_mut()
850 .insert(
851 GrpcMethod::new(
852 "connector_service.SinkCoordinationService",
853 "Coordinate",
854 ),
855 );
856 self.inner.streaming(req, path, codec).await
857 }
858 }
859}
860pub mod connector_service_server {
862 #![allow(
863 unused_variables,
864 dead_code,
865 missing_docs,
866 clippy::wildcard_imports,
867 clippy::let_unit_value,
868 )]
869 use tonic::codegen::*;
    /// Server-side handler trait for the `ConnectorService` gRPC service.
    ///
    /// Implementors supply one async method per RPC, plus an associated
    /// stream type for each RPC that returns a stream of responses.
    #[async_trait]
    pub trait ConnectorService: std::marker::Send + std::marker::Sync + 'static {
        /// Response stream type returned by `sink_writer_stream`.
        type SinkWriterStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkWriterStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `SinkWriterStream` RPC: takes a stream of requests and
        /// returns a stream of responses.
        async fn sink_writer_stream(
            &self,
            request: tonic::Request<tonic::Streaming<super::SinkWriterStreamRequest>>,
        ) -> std::result::Result<
            tonic::Response<Self::SinkWriterStreamStream>,
            tonic::Status,
        >;
        /// Response stream type returned by `sink_coordinator_stream`.
        type SinkCoordinatorStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<
                    super::SinkCoordinatorStreamResponse,
                    tonic::Status,
                >,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `SinkCoordinatorStream` RPC: takes a stream of requests
        /// and returns a stream of responses.
        async fn sink_coordinator_stream(
            &self,
            request: tonic::Request<
                tonic::Streaming<super::SinkCoordinatorStreamRequest>,
            >,
        ) -> std::result::Result<
            tonic::Response<Self::SinkCoordinatorStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `ValidateSink` RPC.
        async fn validate_sink(
            &self,
            request: tonic::Request<super::ValidateSinkRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSinkResponse>,
            tonic::Status,
        >;
        /// Response stream type returned by `get_event_stream`.
        type GetEventStreamStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::GetEventStreamResponse, tonic::Status>,
            >
            + std::marker::Send
            + 'static;
        /// Handles the `GetEventStream` RPC: takes a single request and
        /// returns a stream of responses.
        async fn get_event_stream(
            &self,
            request: tonic::Request<super::GetEventStreamRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::GetEventStreamStream>,
            tonic::Status,
        >;
        /// Handles the unary `ValidateSource` RPC.
        async fn validate_source(
            &self,
            request: tonic::Request<super::ValidateSourceRequest>,
        ) -> std::result::Result<
            tonic::Response<super::ValidateSourceResponse>,
            tonic::Status,
        >;
    }
    /// Server wrapper that holds a handler behind an `Arc` together with its
    /// compression and message-size settings.
    #[derive(Debug)]
    pub struct ConnectorServiceServer<T> {
        /// The shared handler.
        inner: Arc<T>,
        /// Encodings enabled through `accept_compressed`.
        accept_compression_encodings: EnabledCompressionEncodings,
        /// Encodings enabled through `send_compressed`.
        send_compression_encodings: EnabledCompressionEncodings,
        /// Limit set through `max_decoding_message_size`; `None` until set.
        max_decoding_message_size: Option<usize>,
        /// Limit set through `max_encoding_message_size`; `None` until set.
        max_encoding_message_size: Option<usize>,
    }
943 impl<T> ConnectorServiceServer<T> {
944 pub fn new(inner: T) -> Self {
945 Self::from_arc(Arc::new(inner))
946 }
947 pub fn from_arc(inner: Arc<T>) -> Self {
948 Self {
949 inner,
950 accept_compression_encodings: Default::default(),
951 send_compression_encodings: Default::default(),
952 max_decoding_message_size: None,
953 max_encoding_message_size: None,
954 }
955 }
956 pub fn with_interceptor<F>(
957 inner: T,
958 interceptor: F,
959 ) -> InterceptedService<Self, F>
960 where
961 F: tonic::service::Interceptor,
962 {
963 InterceptedService::new(Self::new(inner), interceptor)
964 }
965 #[must_use]
967 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
968 self.accept_compression_encodings.enable(encoding);
969 self
970 }
971 #[must_use]
973 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
974 self.send_compression_encodings.enable(encoding);
975 self
976 }
977 #[must_use]
981 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
982 self.max_decoding_message_size = Some(limit);
983 self
984 }
985 #[must_use]
989 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
990 self.max_encoding_message_size = Some(limit);
991 self
992 }
993 }
994 impl<T, B> tonic::codegen::Service<http::Request<B>> for ConnectorServiceServer<T>
995 where
996 T: ConnectorService,
997 B: Body + std::marker::Send + 'static,
998 B::Error: Into<StdError> + std::marker::Send + 'static,
999 {
1000 type Response = http::Response<tonic::body::BoxBody>;
1001 type Error = std::convert::Infallible;
1002 type Future = BoxFuture<Self::Response, Self::Error>;
1003 fn poll_ready(
1004 &mut self,
1005 _cx: &mut Context<'_>,
1006 ) -> Poll<std::result::Result<(), Self::Error>> {
1007 Poll::Ready(Ok(()))
1008 }
1009 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1010 match req.uri().path() {
1011 "/connector_service.ConnectorService/SinkWriterStream" => {
1012 #[allow(non_camel_case_types)]
1013 struct SinkWriterStreamSvc<T: ConnectorService>(pub Arc<T>);
1014 impl<
1015 T: ConnectorService,
1016 > tonic::server::StreamingService<super::SinkWriterStreamRequest>
1017 for SinkWriterStreamSvc<T> {
1018 type Response = super::SinkWriterStreamResponse;
1019 type ResponseStream = T::SinkWriterStreamStream;
1020 type Future = BoxFuture<
1021 tonic::Response<Self::ResponseStream>,
1022 tonic::Status,
1023 >;
1024 fn call(
1025 &mut self,
1026 request: tonic::Request<
1027 tonic::Streaming<super::SinkWriterStreamRequest>,
1028 >,
1029 ) -> Self::Future {
1030 let inner = Arc::clone(&self.0);
1031 let fut = async move {
1032 <T as ConnectorService>::sink_writer_stream(&inner, request)
1033 .await
1034 };
1035 Box::pin(fut)
1036 }
1037 }
1038 let accept_compression_encodings = self.accept_compression_encodings;
1039 let send_compression_encodings = self.send_compression_encodings;
1040 let max_decoding_message_size = self.max_decoding_message_size;
1041 let max_encoding_message_size = self.max_encoding_message_size;
1042 let inner = self.inner.clone();
1043 let fut = async move {
1044 let method = SinkWriterStreamSvc(inner);
1045 let codec = tonic::codec::ProstCodec::default();
1046 let mut grpc = tonic::server::Grpc::new(codec)
1047 .apply_compression_config(
1048 accept_compression_encodings,
1049 send_compression_encodings,
1050 )
1051 .apply_max_message_size_config(
1052 max_decoding_message_size,
1053 max_encoding_message_size,
1054 );
1055 let res = grpc.streaming(method, req).await;
1056 Ok(res)
1057 };
1058 Box::pin(fut)
1059 }
1060 "/connector_service.ConnectorService/SinkCoordinatorStream" => {
1061 #[allow(non_camel_case_types)]
1062 struct SinkCoordinatorStreamSvc<T: ConnectorService>(pub Arc<T>);
1063 impl<
1064 T: ConnectorService,
1065 > tonic::server::StreamingService<
1066 super::SinkCoordinatorStreamRequest,
1067 > for SinkCoordinatorStreamSvc<T> {
1068 type Response = super::SinkCoordinatorStreamResponse;
1069 type ResponseStream = T::SinkCoordinatorStreamStream;
1070 type Future = BoxFuture<
1071 tonic::Response<Self::ResponseStream>,
1072 tonic::Status,
1073 >;
1074 fn call(
1075 &mut self,
1076 request: tonic::Request<
1077 tonic::Streaming<super::SinkCoordinatorStreamRequest>,
1078 >,
1079 ) -> Self::Future {
1080 let inner = Arc::clone(&self.0);
1081 let fut = async move {
1082 <T as ConnectorService>::sink_coordinator_stream(
1083 &inner,
1084 request,
1085 )
1086 .await
1087 };
1088 Box::pin(fut)
1089 }
1090 }
1091 let accept_compression_encodings = self.accept_compression_encodings;
1092 let send_compression_encodings = self.send_compression_encodings;
1093 let max_decoding_message_size = self.max_decoding_message_size;
1094 let max_encoding_message_size = self.max_encoding_message_size;
1095 let inner = self.inner.clone();
1096 let fut = async move {
1097 let method = SinkCoordinatorStreamSvc(inner);
1098 let codec = tonic::codec::ProstCodec::default();
1099 let mut grpc = tonic::server::Grpc::new(codec)
1100 .apply_compression_config(
1101 accept_compression_encodings,
1102 send_compression_encodings,
1103 )
1104 .apply_max_message_size_config(
1105 max_decoding_message_size,
1106 max_encoding_message_size,
1107 );
1108 let res = grpc.streaming(method, req).await;
1109 Ok(res)
1110 };
1111 Box::pin(fut)
1112 }
1113 "/connector_service.ConnectorService/ValidateSink" => {
1114 #[allow(non_camel_case_types)]
1115 struct ValidateSinkSvc<T: ConnectorService>(pub Arc<T>);
1116 impl<
1117 T: ConnectorService,
1118 > tonic::server::UnaryService<super::ValidateSinkRequest>
1119 for ValidateSinkSvc<T> {
1120 type Response = super::ValidateSinkResponse;
1121 type Future = BoxFuture<
1122 tonic::Response<Self::Response>,
1123 tonic::Status,
1124 >;
1125 fn call(
1126 &mut self,
1127 request: tonic::Request<super::ValidateSinkRequest>,
1128 ) -> Self::Future {
1129 let inner = Arc::clone(&self.0);
1130 let fut = async move {
1131 <T as ConnectorService>::validate_sink(&inner, request)
1132 .await
1133 };
1134 Box::pin(fut)
1135 }
1136 }
1137 let accept_compression_encodings = self.accept_compression_encodings;
1138 let send_compression_encodings = self.send_compression_encodings;
1139 let max_decoding_message_size = self.max_decoding_message_size;
1140 let max_encoding_message_size = self.max_encoding_message_size;
1141 let inner = self.inner.clone();
1142 let fut = async move {
1143 let method = ValidateSinkSvc(inner);
1144 let codec = tonic::codec::ProstCodec::default();
1145 let mut grpc = tonic::server::Grpc::new(codec)
1146 .apply_compression_config(
1147 accept_compression_encodings,
1148 send_compression_encodings,
1149 )
1150 .apply_max_message_size_config(
1151 max_decoding_message_size,
1152 max_encoding_message_size,
1153 );
1154 let res = grpc.unary(method, req).await;
1155 Ok(res)
1156 };
1157 Box::pin(fut)
1158 }
1159 "/connector_service.ConnectorService/GetEventStream" => {
1160 #[allow(non_camel_case_types)]
1161 struct GetEventStreamSvc<T: ConnectorService>(pub Arc<T>);
1162 impl<
1163 T: ConnectorService,
1164 > tonic::server::ServerStreamingService<super::GetEventStreamRequest>
1165 for GetEventStreamSvc<T> {
1166 type Response = super::GetEventStreamResponse;
1167 type ResponseStream = T::GetEventStreamStream;
1168 type Future = BoxFuture<
1169 tonic::Response<Self::ResponseStream>,
1170 tonic::Status,
1171 >;
1172 fn call(
1173 &mut self,
1174 request: tonic::Request<super::GetEventStreamRequest>,
1175 ) -> Self::Future {
1176 let inner = Arc::clone(&self.0);
1177 let fut = async move {
1178 <T as ConnectorService>::get_event_stream(&inner, request)
1179 .await
1180 };
1181 Box::pin(fut)
1182 }
1183 }
1184 let accept_compression_encodings = self.accept_compression_encodings;
1185 let send_compression_encodings = self.send_compression_encodings;
1186 let max_decoding_message_size = self.max_decoding_message_size;
1187 let max_encoding_message_size = self.max_encoding_message_size;
1188 let inner = self.inner.clone();
1189 let fut = async move {
1190 let method = GetEventStreamSvc(inner);
1191 let codec = tonic::codec::ProstCodec::default();
1192 let mut grpc = tonic::server::Grpc::new(codec)
1193 .apply_compression_config(
1194 accept_compression_encodings,
1195 send_compression_encodings,
1196 )
1197 .apply_max_message_size_config(
1198 max_decoding_message_size,
1199 max_encoding_message_size,
1200 );
1201 let res = grpc.server_streaming(method, req).await;
1202 Ok(res)
1203 };
1204 Box::pin(fut)
1205 }
1206 "/connector_service.ConnectorService/ValidateSource" => {
1207 #[allow(non_camel_case_types)]
1208 struct ValidateSourceSvc<T: ConnectorService>(pub Arc<T>);
1209 impl<
1210 T: ConnectorService,
1211 > tonic::server::UnaryService<super::ValidateSourceRequest>
1212 for ValidateSourceSvc<T> {
1213 type Response = super::ValidateSourceResponse;
1214 type Future = BoxFuture<
1215 tonic::Response<Self::Response>,
1216 tonic::Status,
1217 >;
1218 fn call(
1219 &mut self,
1220 request: tonic::Request<super::ValidateSourceRequest>,
1221 ) -> Self::Future {
1222 let inner = Arc::clone(&self.0);
1223 let fut = async move {
1224 <T as ConnectorService>::validate_source(&inner, request)
1225 .await
1226 };
1227 Box::pin(fut)
1228 }
1229 }
1230 let accept_compression_encodings = self.accept_compression_encodings;
1231 let send_compression_encodings = self.send_compression_encodings;
1232 let max_decoding_message_size = self.max_decoding_message_size;
1233 let max_encoding_message_size = self.max_encoding_message_size;
1234 let inner = self.inner.clone();
1235 let fut = async move {
1236 let method = ValidateSourceSvc(inner);
1237 let codec = tonic::codec::ProstCodec::default();
1238 let mut grpc = tonic::server::Grpc::new(codec)
1239 .apply_compression_config(
1240 accept_compression_encodings,
1241 send_compression_encodings,
1242 )
1243 .apply_max_message_size_config(
1244 max_decoding_message_size,
1245 max_encoding_message_size,
1246 );
1247 let res = grpc.unary(method, req).await;
1248 Ok(res)
1249 };
1250 Box::pin(fut)
1251 }
1252 _ => {
1253 Box::pin(async move {
1254 let mut response = http::Response::new(empty_body());
1255 let headers = response.headers_mut();
1256 headers
1257 .insert(
1258 tonic::Status::GRPC_STATUS,
1259 (tonic::Code::Unimplemented as i32).into(),
1260 );
1261 headers
1262 .insert(
1263 http::header::CONTENT_TYPE,
1264 tonic::metadata::GRPC_CONTENT_TYPE,
1265 );
1266 Ok(response)
1267 })
1268 }
1269 }
1270 }
1271 }
1272 impl<T> Clone for ConnectorServiceServer<T> {
1273 fn clone(&self) -> Self {
1274 let inner = self.inner.clone();
1275 Self {
1276 inner,
1277 accept_compression_encodings: self.accept_compression_encodings,
1278 send_compression_encodings: self.send_compression_encodings,
1279 max_decoding_message_size: self.max_decoding_message_size,
1280 max_encoding_message_size: self.max_encoding_message_size,
1281 }
1282 }
1283 }
/// Fully qualified gRPC service name. It is the first path segment of every
/// method route matched by the server in this module, e.g.
/// `/connector_service.ConnectorService/ValidateSource`.
pub const SERVICE_NAME: &str = "connector_service.ConnectorService";
1286 impl<T> tonic::server::NamedService for ConnectorServiceServer<T> {
1287 const NAME: &'static str = SERVICE_NAME;
1288 }
1289}
1290pub mod sink_coordination_service_server {
1292 #![allow(
1293 unused_variables,
1294 dead_code,
1295 missing_docs,
1296 clippy::wildcard_imports,
1297 clippy::let_unit_value,
1298 )]
1299 use tonic::codegen::*;
1300 #[async_trait]
1302 pub trait SinkCoordinationService: std::marker::Send + std::marker::Sync + 'static {
1303 type CoordinateStream: tonic::codegen::tokio_stream::Stream<
1305 Item = std::result::Result<super::CoordinateResponse, tonic::Status>,
1306 >
1307 + std::marker::Send
1308 + 'static;
1309 async fn coordinate(
1310 &self,
1311 request: tonic::Request<tonic::Streaming<super::CoordinateRequest>>,
1312 ) -> std::result::Result<tonic::Response<Self::CoordinateStream>, tonic::Status>;
1313 }
1314 #[derive(Debug)]
1315 pub struct SinkCoordinationServiceServer<T> {
1316 inner: Arc<T>,
1317 accept_compression_encodings: EnabledCompressionEncodings,
1318 send_compression_encodings: EnabledCompressionEncodings,
1319 max_decoding_message_size: Option<usize>,
1320 max_encoding_message_size: Option<usize>,
1321 }
1322 impl<T> SinkCoordinationServiceServer<T> {
1323 pub fn new(inner: T) -> Self {
1324 Self::from_arc(Arc::new(inner))
1325 }
1326 pub fn from_arc(inner: Arc<T>) -> Self {
1327 Self {
1328 inner,
1329 accept_compression_encodings: Default::default(),
1330 send_compression_encodings: Default::default(),
1331 max_decoding_message_size: None,
1332 max_encoding_message_size: None,
1333 }
1334 }
1335 pub fn with_interceptor<F>(
1336 inner: T,
1337 interceptor: F,
1338 ) -> InterceptedService<Self, F>
1339 where
1340 F: tonic::service::Interceptor,
1341 {
1342 InterceptedService::new(Self::new(inner), interceptor)
1343 }
1344 #[must_use]
1346 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
1347 self.accept_compression_encodings.enable(encoding);
1348 self
1349 }
1350 #[must_use]
1352 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
1353 self.send_compression_encodings.enable(encoding);
1354 self
1355 }
1356 #[must_use]
1360 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
1361 self.max_decoding_message_size = Some(limit);
1362 self
1363 }
1364 #[must_use]
1368 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
1369 self.max_encoding_message_size = Some(limit);
1370 self
1371 }
1372 }
1373 impl<T, B> tonic::codegen::Service<http::Request<B>>
1374 for SinkCoordinationServiceServer<T>
1375 where
1376 T: SinkCoordinationService,
1377 B: Body + std::marker::Send + 'static,
1378 B::Error: Into<StdError> + std::marker::Send + 'static,
1379 {
1380 type Response = http::Response<tonic::body::BoxBody>;
1381 type Error = std::convert::Infallible;
1382 type Future = BoxFuture<Self::Response, Self::Error>;
1383 fn poll_ready(
1384 &mut self,
1385 _cx: &mut Context<'_>,
1386 ) -> Poll<std::result::Result<(), Self::Error>> {
1387 Poll::Ready(Ok(()))
1388 }
1389 fn call(&mut self, req: http::Request<B>) -> Self::Future {
1390 match req.uri().path() {
1391 "/connector_service.SinkCoordinationService/Coordinate" => {
1392 #[allow(non_camel_case_types)]
1393 struct CoordinateSvc<T: SinkCoordinationService>(pub Arc<T>);
1394 impl<
1395 T: SinkCoordinationService,
1396 > tonic::server::StreamingService<super::CoordinateRequest>
1397 for CoordinateSvc<T> {
1398 type Response = super::CoordinateResponse;
1399 type ResponseStream = T::CoordinateStream;
1400 type Future = BoxFuture<
1401 tonic::Response<Self::ResponseStream>,
1402 tonic::Status,
1403 >;
1404 fn call(
1405 &mut self,
1406 request: tonic::Request<
1407 tonic::Streaming<super::CoordinateRequest>,
1408 >,
1409 ) -> Self::Future {
1410 let inner = Arc::clone(&self.0);
1411 let fut = async move {
1412 <T as SinkCoordinationService>::coordinate(&inner, request)
1413 .await
1414 };
1415 Box::pin(fut)
1416 }
1417 }
1418 let accept_compression_encodings = self.accept_compression_encodings;
1419 let send_compression_encodings = self.send_compression_encodings;
1420 let max_decoding_message_size = self.max_decoding_message_size;
1421 let max_encoding_message_size = self.max_encoding_message_size;
1422 let inner = self.inner.clone();
1423 let fut = async move {
1424 let method = CoordinateSvc(inner);
1425 let codec = tonic::codec::ProstCodec::default();
1426 let mut grpc = tonic::server::Grpc::new(codec)
1427 .apply_compression_config(
1428 accept_compression_encodings,
1429 send_compression_encodings,
1430 )
1431 .apply_max_message_size_config(
1432 max_decoding_message_size,
1433 max_encoding_message_size,
1434 );
1435 let res = grpc.streaming(method, req).await;
1436 Ok(res)
1437 };
1438 Box::pin(fut)
1439 }
1440 _ => {
1441 Box::pin(async move {
1442 let mut response = http::Response::new(empty_body());
1443 let headers = response.headers_mut();
1444 headers
1445 .insert(
1446 tonic::Status::GRPC_STATUS,
1447 (tonic::Code::Unimplemented as i32).into(),
1448 );
1449 headers
1450 .insert(
1451 http::header::CONTENT_TYPE,
1452 tonic::metadata::GRPC_CONTENT_TYPE,
1453 );
1454 Ok(response)
1455 })
1456 }
1457 }
1458 }
1459 }
1460 impl<T> Clone for SinkCoordinationServiceServer<T> {
1461 fn clone(&self) -> Self {
1462 let inner = self.inner.clone();
1463 Self {
1464 inner,
1465 accept_compression_encodings: self.accept_compression_encodings,
1466 send_compression_encodings: self.send_compression_encodings,
1467 max_decoding_message_size: self.max_decoding_message_size,
1468 max_encoding_message_size: self.max_encoding_message_size,
1469 }
1470 }
1471 }
/// Fully qualified gRPC service name. It is the first path segment of the
/// method route matched by the server in this module:
/// `/connector_service.SinkCoordinationService/Coordinate`.
pub const SERVICE_NAME: &str = "connector_service.SinkCoordinationService";
1474 impl<T> tonic::server::NamedService for SinkCoordinationServiceServer<T> {
1475 const NAME: &'static str = SERVICE_NAME;
1476 }
1477}