1use std::str::Utf8Error;
16use std::sync::Arc;
17
18use common_error::ext::{BoxedError, ErrorExt};
19use common_error::status_code::StatusCode;
20use common_macro::stack_trace_debug;
21use common_procedure::ProcedureId;
22use common_wal::options::WalOptions;
23use serde_json::error::Error as JsonError;
24use snafu::{Location, Snafu};
25use store_api::storage::RegionId;
26use table::metadata::TableId;
27
28use crate::DatanodeId;
29use crate::peer::Peer;
30
31#[derive(Snafu)]
32#[snafu(visibility(pub))]
33#[stack_trace_debug]
34pub enum Error {
35 #[snafu(display("Empty key is not allowed"))]
36 EmptyKey {
37 #[snafu(implicit)]
38 location: Location,
39 },
40
41 #[snafu(display(
42 "Another procedure is operating the region: {} on peer: {}",
43 region_id,
44 peer_id
45 ))]
46 RegionOperatingRace {
47 #[snafu(implicit)]
48 location: Location,
49 peer_id: DatanodeId,
50 region_id: RegionId,
51 },
52
53 #[snafu(display("Failed to connect to Etcd"))]
54 ConnectEtcd {
55 #[snafu(source)]
56 error: etcd_client::Error,
57 #[snafu(implicit)]
58 location: Location,
59 },
60
61 #[snafu(display("Failed to execute via Etcd"))]
62 EtcdFailed {
63 #[snafu(source)]
64 error: etcd_client::Error,
65 #[snafu(implicit)]
66 location: Location,
67 },
68
69 #[snafu(display("Failed to execute {} txn operations via Etcd", max_operations))]
70 EtcdTxnFailed {
71 max_operations: usize,
72 #[snafu(source)]
73 error: etcd_client::Error,
74 #[snafu(implicit)]
75 location: Location,
76 },
77
78 #[snafu(display("Failed to get sequence: {}", err_msg))]
79 NextSequence {
80 err_msg: String,
81 #[snafu(implicit)]
82 location: Location,
83 },
84
85 #[snafu(display("Unexpected sequence value: {}", err_msg))]
86 UnexpectedSequenceValue {
87 err_msg: String,
88 #[snafu(implicit)]
89 location: Location,
90 },
91
92 #[snafu(display("Table info not found: {}", table))]
93 TableInfoNotFound {
94 table: String,
95 #[snafu(implicit)]
96 location: Location,
97 },
98
99 #[snafu(display("Failed to register procedure loader, type name: {}", type_name))]
100 RegisterProcedureLoader {
101 type_name: String,
102 #[snafu(implicit)]
103 location: Location,
104 source: common_procedure::error::Error,
105 },
106
107 #[snafu(display("Failed to register repartition procedure loader"))]
108 RegisterRepartitionProcedureLoader {
109 #[snafu(implicit)]
110 location: Location,
111 source: BoxedError,
112 },
113
114 #[snafu(display("Failed to create repartition procedure"))]
115 CreateRepartitionProcedure {
116 source: BoxedError,
117 #[snafu(implicit)]
118 location: Location,
119 },
120
121 #[snafu(display("Failed to submit procedure"))]
122 SubmitProcedure {
123 #[snafu(implicit)]
124 location: Location,
125 source: common_procedure::Error,
126 },
127
128 #[snafu(display("Failed to query procedure"))]
129 QueryProcedure {
130 #[snafu(implicit)]
131 location: Location,
132 source: common_procedure::Error,
133 },
134
135 #[snafu(display("Procedure not found: {pid}"))]
136 ProcedureNotFound {
137 #[snafu(implicit)]
138 location: Location,
139 pid: String,
140 },
141
142 #[snafu(display("Failed to parse procedure id: {key}"))]
143 ParseProcedureId {
144 #[snafu(implicit)]
145 location: Location,
146 key: String,
147 #[snafu(source)]
148 error: common_procedure::ParseIdError,
149 },
150
151 #[snafu(display("Unsupported operation {}", operation))]
152 Unsupported {
153 operation: String,
154 #[snafu(implicit)]
155 location: Location,
156 },
157
158 #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
159 ProcedureStateReceiver {
160 procedure_id: ProcedureId,
161 #[snafu(implicit)]
162 location: Location,
163 source: common_procedure::Error,
164 },
165
166 #[snafu(display("Procedure state receiver not found: {procedure_id}"))]
167 ProcedureStateReceiverNotFound {
168 procedure_id: ProcedureId,
169 #[snafu(implicit)]
170 location: Location,
171 },
172
173 #[snafu(display("Failed to wait procedure done"))]
174 WaitProcedure {
175 #[snafu(implicit)]
176 location: Location,
177 source: common_procedure::Error,
178 },
179
180 #[snafu(display("Failed to start procedure manager"))]
181 StartProcedureManager {
182 #[snafu(implicit)]
183 location: Location,
184 source: common_procedure::Error,
185 },
186
187 #[snafu(display("Failed to stop procedure manager"))]
188 StopProcedureManager {
189 #[snafu(implicit)]
190 location: Location,
191 source: common_procedure::Error,
192 },
193
194 #[snafu(display(
195 "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
196 ))]
197 ProcedureOutput {
198 procedure_id: String,
199 err_msg: String,
200 #[snafu(implicit)]
201 location: Location,
202 },
203
204 #[snafu(display("Failed to convert RawTableInfo into TableInfo"))]
205 ConvertRawTableInfo {
206 #[snafu(implicit)]
207 location: Location,
208 source: datatypes::Error,
209 },
210
211 #[snafu(display("Primary key '{key}' not found when creating region request"))]
212 PrimaryKeyNotFound {
213 key: String,
214 #[snafu(implicit)]
215 location: Location,
216 },
217
218 #[snafu(display("Failed to build table meta for table: {}", table_name))]
219 BuildTableMeta {
220 table_name: String,
221 #[snafu(source)]
222 error: table::metadata::TableMetaBuilderError,
223 #[snafu(implicit)]
224 location: Location,
225 },
226
227 #[snafu(display("Table occurs error"))]
228 Table {
229 #[snafu(implicit)]
230 location: Location,
231 source: table::error::Error,
232 },
233
234 #[snafu(display("Failed to find table route for table id {}", table_id))]
235 TableRouteNotFound {
236 table_id: TableId,
237 #[snafu(implicit)]
238 location: Location,
239 },
240
241 #[snafu(display("Failed to find table repartition metadata for table id {}", table_id))]
242 TableRepartNotFound {
243 table_id: TableId,
244 #[snafu(implicit)]
245 location: Location,
246 },
247
248 #[snafu(display("Failed to decode protobuf"))]
249 DecodeProto {
250 #[snafu(implicit)]
251 location: Location,
252 #[snafu(source)]
253 error: prost::DecodeError,
254 },
255
256 #[snafu(display("Failed to encode object into json"))]
257 EncodeJson {
258 #[snafu(implicit)]
259 location: Location,
260 #[snafu(source)]
261 error: JsonError,
262 },
263
264 #[snafu(display("Failed to decode object from json"))]
265 DecodeJson {
266 #[snafu(implicit)]
267 location: Location,
268 #[snafu(source)]
269 error: JsonError,
270 },
271
272 #[snafu(display("Failed to serialize to json: {}", input))]
273 SerializeToJson {
274 input: String,
275 #[snafu(source)]
276 error: serde_json::error::Error,
277 #[snafu(implicit)]
278 location: Location,
279 },
280
281 #[snafu(display("Failed to deserialize from json: {}", input))]
282 DeserializeFromJson {
283 input: String,
284 #[snafu(source)]
285 error: serde_json::error::Error,
286 #[snafu(implicit)]
287 location: Location,
288 },
289
290 #[snafu(display("Payload not exist"))]
291 PayloadNotExist {
292 #[snafu(implicit)]
293 location: Location,
294 },
295
296 #[snafu(display("Failed to serde json"))]
297 SerdeJson {
298 #[snafu(source)]
299 error: serde_json::error::Error,
300 #[snafu(implicit)]
301 location: Location,
302 },
303
304 #[snafu(display("Failed to parse value {} into key {}", value, key))]
305 ParseOption {
306 key: String,
307 value: String,
308 #[snafu(implicit)]
309 location: Location,
310 },
311
312 #[snafu(display("Corrupted table route data, err: {}", err_msg))]
313 RouteInfoCorrupted {
314 err_msg: String,
315 #[snafu(implicit)]
316 location: Location,
317 },
318
319 #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))]
320 IllegalServerState {
321 code: i32,
322 err_msg: String,
323 #[snafu(implicit)]
324 location: Location,
325 },
326
327 #[snafu(display("Failed to convert alter table request"))]
328 ConvertAlterTableRequest {
329 source: common_grpc_expr::error::Error,
330 #[snafu(implicit)]
331 location: Location,
332 },
333
334 #[snafu(display("Invalid protobuf message: {err_msg}"))]
335 InvalidProtoMsg {
336 err_msg: String,
337 #[snafu(implicit)]
338 location: Location,
339 },
340
341 #[snafu(display("Unexpected: {err_msg}"))]
342 Unexpected {
343 err_msg: String,
344 #[snafu(implicit)]
345 location: Location,
346 },
347
348 #[snafu(display("Table already exists, table: {}", table_name))]
349 TableAlreadyExists {
350 table_name: String,
351 #[snafu(implicit)]
352 location: Location,
353 },
354
355 #[snafu(display("View already exists, view: {}", view_name))]
356 ViewAlreadyExists {
357 view_name: String,
358 #[snafu(implicit)]
359 location: Location,
360 },
361
362 #[snafu(display("Flow already exists: {}", flow_name))]
363 FlowAlreadyExists {
364 flow_name: String,
365 #[snafu(implicit)]
366 location: Location,
367 },
368
369 #[snafu(display("Schema already exists, catalog:{}, schema: {}", catalog, schema))]
370 SchemaAlreadyExists {
371 catalog: String,
372 schema: String,
373 #[snafu(implicit)]
374 location: Location,
375 },
376
377 #[snafu(display("Failed to convert raw key to str"))]
378 ConvertRawKey {
379 #[snafu(implicit)]
380 location: Location,
381 #[snafu(source)]
382 error: Utf8Error,
383 },
384
385 #[snafu(display("Table not found: '{}'", table_name))]
386 TableNotFound {
387 table_name: String,
388 #[snafu(implicit)]
389 location: Location,
390 },
391
392 #[snafu(display("Region not found: {}", region_id))]
393 RegionNotFound {
394 region_id: RegionId,
395 #[snafu(implicit)]
396 location: Location,
397 },
398
399 #[snafu(display("View not found: '{}'", view_name))]
400 ViewNotFound {
401 view_name: String,
402 #[snafu(implicit)]
403 location: Location,
404 },
405
406 #[snafu(display("Flow not found: '{}'", flow_name))]
407 FlowNotFound {
408 flow_name: String,
409 #[snafu(implicit)]
410 location: Location,
411 },
412
413 #[snafu(display("Flow route not found: '{}'", flow_name))]
414 FlowRouteNotFound {
415 flow_name: String,
416 #[snafu(implicit)]
417 location: Location,
418 },
419
420 #[snafu(display("Schema nod found, schema: {}", table_schema))]
421 SchemaNotFound {
422 table_schema: String,
423 #[snafu(implicit)]
424 location: Location,
425 },
426
427 #[snafu(display("Catalog not found, catalog: {}", catalog))]
428 CatalogNotFound {
429 catalog: String,
430 #[snafu(implicit)]
431 location: Location,
432 },
433
434 #[snafu(display("Invalid metadata, err: {}", err_msg))]
435 InvalidMetadata {
436 err_msg: String,
437 #[snafu(implicit)]
438 location: Location,
439 },
440
441 #[snafu(display("Invalid view info, err: {}", err_msg))]
442 InvalidViewInfo {
443 err_msg: String,
444 #[snafu(implicit)]
445 location: Location,
446 },
447
448 #[snafu(display("Invalid flow request body: {:?}", body))]
449 InvalidFlowRequestBody {
450 body: Box<Option<api::v1::flow::flow_request::Body>>,
451 #[snafu(implicit)]
452 location: Location,
453 },
454
455 #[snafu(display("Failed to get kv cache, err: {}", err_msg))]
456 GetKvCache { err_msg: String },
457
458 #[snafu(display("Get null from cache, key: {}", key))]
459 CacheNotGet {
460 key: String,
461 #[snafu(implicit)]
462 location: Location,
463 },
464
465 #[snafu(display("Etcd txn error: {err_msg}"))]
466 EtcdTxnOpResponse {
467 err_msg: String,
468 #[snafu(implicit)]
469 location: Location,
470 },
471
472 #[snafu(display("External error"))]
473 External {
474 #[snafu(implicit)]
475 location: Location,
476 source: BoxedError,
477 },
478
479 #[snafu(display("The response exceeded size limit"))]
480 ResponseExceededSizeLimit {
481 #[snafu(implicit)]
482 location: Location,
483 source: BoxedError,
484 },
485
486 #[snafu(display("Invalid heartbeat response"))]
487 InvalidHeartbeatResponse {
488 #[snafu(implicit)]
489 location: Location,
490 },
491
492 #[snafu(display("Failed to operate on datanode: {}", peer))]
493 OperateDatanode {
494 #[snafu(implicit)]
495 location: Location,
496 peer: Peer,
497 source: BoxedError,
498 },
499
500 #[snafu(display("Retry later"))]
501 RetryLater {
502 source: BoxedError,
503 clean_poisons: bool,
504 },
505
506 #[snafu(display("Abort procedure"))]
507 AbortProcedure {
508 #[snafu(implicit)]
509 location: Location,
510 source: BoxedError,
511 clean_poisons: bool,
512 },
513
514 #[snafu(display(
515 "Failed to encode a wal options to json string, wal_options: {:?}",
516 wal_options
517 ))]
518 EncodeWalOptions {
519 wal_options: WalOptions,
520 #[snafu(source)]
521 error: serde_json::Error,
522 #[snafu(implicit)]
523 location: Location,
524 },
525
526 #[snafu(display("Invalid number of topics {}", num_topics))]
527 InvalidNumTopics {
528 num_topics: usize,
529 #[snafu(implicit)]
530 location: Location,
531 },
532
533 #[snafu(display(
534 "Failed to build a Kafka client, broker endpoints: {:?}",
535 broker_endpoints
536 ))]
537 BuildKafkaClient {
538 broker_endpoints: Vec<String>,
539 #[snafu(implicit)]
540 location: Location,
541 #[snafu(source)]
542 error: rskafka::client::error::Error,
543 },
544
545 #[snafu(display("Failed to create TLS Config"))]
546 TlsConfig {
547 #[snafu(implicit)]
548 location: Location,
549 source: common_wal::error::Error,
550 },
551
552 #[snafu(display("Failed to build a Kafka controller client"))]
553 BuildKafkaCtrlClient {
554 #[snafu(implicit)]
555 location: Location,
556 #[snafu(source)]
557 error: rskafka::client::error::Error,
558 },
559
560 #[snafu(display(
561 "Failed to get a Kafka partition client, topic: {}, partition: {}",
562 topic,
563 partition
564 ))]
565 KafkaPartitionClient {
566 topic: String,
567 partition: i32,
568 #[snafu(implicit)]
569 location: Location,
570 #[snafu(source)]
571 error: rskafka::client::error::Error,
572 },
573
574 #[snafu(display(
575 "Failed to get offset from Kafka, topic: {}, partition: {}",
576 topic,
577 partition
578 ))]
579 KafkaGetOffset {
580 topic: String,
581 partition: i32,
582 #[snafu(implicit)]
583 location: Location,
584 #[snafu(source)]
585 error: rskafka::client::error::Error,
586 },
587
588 #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
589 ProduceRecord {
590 topic: String,
591 #[snafu(implicit)]
592 location: Location,
593 #[snafu(source)]
594 error: rskafka::client::error::Error,
595 },
596
597 #[snafu(display("Failed to create a Kafka wal topic"))]
598 CreateKafkaWalTopic {
599 #[snafu(implicit)]
600 location: Location,
601 #[snafu(source)]
602 error: rskafka::client::error::Error,
603 },
604
605 #[snafu(display("The topic pool is empty"))]
606 EmptyTopicPool {
607 #[snafu(implicit)]
608 location: Location,
609 },
610
611 #[snafu(display("Unexpected table route type: {}", err_msg))]
612 UnexpectedLogicalRouteTable {
613 #[snafu(implicit)]
614 location: Location,
615 err_msg: String,
616 },
617
618 #[snafu(display("The tasks of {} cannot be empty", name))]
619 EmptyDdlTasks {
620 name: String,
621 #[snafu(implicit)]
622 location: Location,
623 },
624
625 #[snafu(display("Metadata corruption: {}", err_msg))]
626 MetadataCorruption {
627 err_msg: String,
628 #[snafu(implicit)]
629 location: Location,
630 },
631
632 #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))]
633 AlterLogicalTablesInvalidArguments {
634 err_msg: String,
635 #[snafu(implicit)]
636 location: Location,
637 },
638
639 #[snafu(display("Create logical tables invalid arguments: {}", err_msg))]
640 CreateLogicalTablesInvalidArguments {
641 err_msg: String,
642 #[snafu(implicit)]
643 location: Location,
644 },
645
646 #[snafu(display("Invalid node info key: {}", key))]
647 InvalidNodeInfoKey {
648 key: String,
649 #[snafu(implicit)]
650 location: Location,
651 },
652
653 #[snafu(display("Invalid node stat key: {}", key))]
654 InvalidStatKey {
655 key: String,
656 #[snafu(implicit)]
657 location: Location,
658 },
659
660 #[snafu(display("Failed to parse number: {}", err_msg))]
661 ParseNum {
662 err_msg: String,
663 #[snafu(source)]
664 error: std::num::ParseIntError,
665 #[snafu(implicit)]
666 location: Location,
667 },
668
669 #[snafu(display("Invalid role: {}", role))]
670 InvalidRole {
671 role: i32,
672 #[snafu(implicit)]
673 location: Location,
674 },
675
676 #[snafu(display("Invalid set database option, key: {}, value: {}", key, value))]
677 InvalidSetDatabaseOption {
678 key: String,
679 value: String,
680 #[snafu(implicit)]
681 location: Location,
682 },
683
684 #[snafu(display("Invalid unset database option, key: {}", key))]
685 InvalidUnsetDatabaseOption {
686 key: String,
687 #[snafu(implicit)]
688 location: Location,
689 },
690
691 #[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
692 MismatchPrefix {
693 prefix: String,
694 key: String,
695 #[snafu(implicit)]
696 location: Location,
697 },
698
699 #[snafu(display("Failed to move values: {err_msg}"))]
700 MoveValues {
701 err_msg: String,
702 #[snafu(implicit)]
703 location: Location,
704 },
705
706 #[snafu(display("Failed to parse {} from utf8", name))]
707 FromUtf8 {
708 name: String,
709 #[snafu(source)]
710 error: std::string::FromUtf8Error,
711 #[snafu(implicit)]
712 location: Location,
713 },
714
715 #[snafu(display("Value not exists"))]
716 ValueNotExist {
717 #[snafu(implicit)]
718 location: Location,
719 },
720
721 #[snafu(display("Failed to get cache"))]
722 GetCache { source: Arc<Error> },
723
724 #[cfg(feature = "pg_kvbackend")]
725 #[snafu(display("Failed to execute via Postgres, sql: {}", sql))]
726 PostgresExecution {
727 sql: String,
728 #[snafu(source)]
729 error: tokio_postgres::Error,
730 #[snafu(implicit)]
731 location: Location,
732 },
733
734 #[cfg(feature = "pg_kvbackend")]
735 #[snafu(display("Failed to create connection pool for Postgres"))]
736 CreatePostgresPool {
737 #[snafu(source)]
738 error: deadpool_postgres::CreatePoolError,
739 #[snafu(implicit)]
740 location: Location,
741 },
742
743 #[cfg(feature = "pg_kvbackend")]
744 #[snafu(display("Failed to get Postgres connection from pool: {}", reason))]
745 GetPostgresConnection {
746 reason: String,
747 #[snafu(implicit)]
748 location: Location,
749 },
750
751 #[cfg(feature = "pg_kvbackend")]
752 #[snafu(display("Failed to {} Postgres transaction", operation))]
753 PostgresTransaction {
754 #[snafu(source)]
755 error: tokio_postgres::Error,
756 #[snafu(implicit)]
757 location: Location,
758 operation: String,
759 },
760
761 #[cfg(feature = "pg_kvbackend")]
762 #[snafu(display("Failed to setup PostgreSQL TLS configuration: {}", reason))]
763 PostgresTlsConfig {
764 reason: String,
765 #[snafu(implicit)]
766 location: Location,
767 },
768
769 #[snafu(display("Failed to load TLS certificate from path: {}", path))]
770 LoadTlsCertificate {
771 path: String,
772 #[snafu(source)]
773 error: std::io::Error,
774 #[snafu(implicit)]
775 location: Location,
776 },
777
778 #[cfg(feature = "pg_kvbackend")]
779 #[snafu(display("Invalid TLS configuration: {}", reason))]
780 InvalidTlsConfig {
781 reason: String,
782 #[snafu(implicit)]
783 location: Location,
784 },
785
786 #[cfg(feature = "mysql_kvbackend")]
787 #[snafu(display("Failed to execute via MySql, sql: {}", sql))]
788 MySqlExecution {
789 sql: String,
790 #[snafu(source)]
791 error: sqlx::Error,
792 #[snafu(implicit)]
793 location: Location,
794 },
795
796 #[cfg(feature = "mysql_kvbackend")]
797 #[snafu(display("Failed to create connection pool for MySql"))]
798 CreateMySqlPool {
799 #[snafu(source)]
800 error: sqlx::Error,
801 #[snafu(implicit)]
802 location: Location,
803 },
804
805 #[cfg(feature = "mysql_kvbackend")]
806 #[snafu(display("Failed to {} MySql transaction", operation))]
807 MySqlTransaction {
808 #[snafu(source)]
809 error: sqlx::Error,
810 #[snafu(implicit)]
811 location: Location,
812 operation: String,
813 },
814
815 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
816 #[snafu(display("Rds transaction retry failed"))]
817 RdsTransactionRetryFailed {
818 #[snafu(implicit)]
819 location: Location,
820 },
821
822 #[snafu(display(
823 "Datanode table info not found, table id: {}, datanode id: {}",
824 table_id,
825 datanode_id
826 ))]
827 DatanodeTableInfoNotFound {
828 datanode_id: DatanodeId,
829 table_id: TableId,
830 #[snafu(implicit)]
831 location: Location,
832 },
833
834 #[snafu(display("Invalid topic name prefix: {}", prefix))]
835 InvalidTopicNamePrefix {
836 prefix: String,
837 #[snafu(implicit)]
838 location: Location,
839 },
840
841 #[snafu(display("Failed to parse wal options: {}", wal_options))]
842 ParseWalOptions {
843 wal_options: String,
844 #[snafu(implicit)]
845 location: Location,
846 #[snafu(source)]
847 error: serde_json::Error,
848 },
849
850 #[snafu(display("No leader found for table_id: {}", table_id))]
851 NoLeader {
852 table_id: TableId,
853 #[snafu(implicit)]
854 location: Location,
855 },
856
857 #[snafu(display(
858 "Procedure poison key already exists with a different value, key: {}, value: {}",
859 key,
860 value
861 ))]
862 ProcedurePoisonConflict {
863 key: String,
864 value: String,
865 #[snafu(implicit)]
866 location: Location,
867 },
868
869 #[snafu(display("Failed to put poison, table metadata may be corrupted"))]
870 PutPoison {
871 #[snafu(implicit)]
872 location: Location,
873 #[snafu(source)]
874 source: common_procedure::error::Error,
875 },
876
877 #[snafu(display("Failed to parse timezone"))]
878 InvalidTimeZone {
879 #[snafu(implicit)]
880 location: Location,
881 #[snafu(source)]
882 error: common_time::error::Error,
883 },
884 #[snafu(display("Invalid file path: {}", file_path))]
885 InvalidFilePath {
886 #[snafu(implicit)]
887 location: Location,
888 file_path: String,
889 },
890
891 #[snafu(display("Failed to serialize flexbuffers"))]
892 SerializeFlexbuffers {
893 #[snafu(implicit)]
894 location: Location,
895 #[snafu(source)]
896 error: flexbuffers::SerializationError,
897 },
898
899 #[snafu(display("Failed to deserialize flexbuffers"))]
900 DeserializeFlexbuffers {
901 #[snafu(implicit)]
902 location: Location,
903 #[snafu(source)]
904 error: flexbuffers::DeserializationError,
905 },
906
907 #[snafu(display("Failed to read flexbuffers"))]
908 ReadFlexbuffers {
909 #[snafu(implicit)]
910 location: Location,
911 #[snafu(source)]
912 error: flexbuffers::ReaderError,
913 },
914
915 #[snafu(display("Invalid file name: {}", reason))]
916 InvalidFileName {
917 #[snafu(implicit)]
918 location: Location,
919 reason: String,
920 },
921
922 #[snafu(display("Invalid file extension: {}", reason))]
923 InvalidFileExtension {
924 #[snafu(implicit)]
925 location: Location,
926 reason: String,
927 },
928
929 #[snafu(display("Failed to write object, file path: {}", file_path))]
930 WriteObject {
931 #[snafu(implicit)]
932 location: Location,
933 file_path: String,
934 #[snafu(source)]
935 error: object_store::Error,
936 },
937
938 #[snafu(display("Failed to read object, file path: {}", file_path))]
939 ReadObject {
940 #[snafu(implicit)]
941 location: Location,
942 file_path: String,
943 #[snafu(source)]
944 error: object_store::Error,
945 },
946
947 #[snafu(display("Missing column ids"))]
948 MissingColumnIds {
949 #[snafu(implicit)]
950 location: Location,
951 },
952
953 #[snafu(display(
954 "Missing column in column metadata: {}, table: {}, table_id: {}",
955 column_name,
956 table_name,
957 table_id,
958 ))]
959 MissingColumnInColumnMetadata {
960 column_name: String,
961 #[snafu(implicit)]
962 location: Location,
963 table_name: String,
964 table_id: TableId,
965 },
966
967 #[snafu(display(
968 "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
969 column_name,
970 column_id,
971 table_name,
972 table_id,
973 ))]
974 MismatchColumnId {
975 column_name: String,
976 column_id: u32,
977 #[snafu(implicit)]
978 location: Location,
979 table_name: String,
980 table_id: TableId,
981 },
982
983 #[snafu(display("Failed to convert column def, column: {}", column))]
984 ConvertColumnDef {
985 column: String,
986 #[snafu(implicit)]
987 location: Location,
988 source: api::error::Error,
989 },
990
991 #[snafu(display("Failed to convert time ranges"))]
992 ConvertTimeRanges {
993 #[snafu(implicit)]
994 location: Location,
995 source: api::error::Error,
996 },
997
998 #[snafu(display(
999 "Column metadata inconsistencies found in table: {}, table_id: {}",
1000 table_name,
1001 table_id
1002 ))]
1003 ColumnMetadataConflicts {
1004 table_name: String,
1005 table_id: TableId,
1006 },
1007
1008 #[snafu(display(
1009 "Column not found in column metadata, column_name: {}, column_id: {}",
1010 column_name,
1011 column_id
1012 ))]
1013 ColumnNotFound { column_name: String, column_id: u32 },
1014
1015 #[snafu(display(
1016 "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
1017 column_name,
1018 expected_column_id,
1019 actual_column_id
1020 ))]
1021 ColumnIdMismatch {
1022 column_name: String,
1023 expected_column_id: u32,
1024 actual_column_id: u32,
1025 },
1026
1027 #[snafu(display(
1028 "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
1029 expected_column_name,
1030 expected_column_id,
1031 actual_column_name,
1032 actual_column_id,
1033 ))]
1034 TimestampMismatch {
1035 expected_column_name: String,
1036 expected_column_id: u32,
1037 actual_column_name: String,
1038 actual_column_id: u32,
1039 },
1040
1041 #[cfg(feature = "enterprise")]
1042 #[snafu(display("Too large duration"))]
1043 TooLargeDuration {
1044 #[snafu(source)]
1045 error: prost_types::DurationError,
1046 #[snafu(implicit)]
1047 location: Location,
1048 },
1049
1050 #[cfg(feature = "enterprise")]
1051 #[snafu(display("Negative duration"))]
1052 NegativeDuration {
1053 #[snafu(source)]
1054 error: prost_types::DurationError,
1055 #[snafu(implicit)]
1056 location: Location,
1057 },
1058
1059 #[cfg(feature = "enterprise")]
1060 #[snafu(display("Missing interval field"))]
1061 MissingInterval {
1062 #[snafu(implicit)]
1063 location: Location,
1064 },
1065}
1066
1067pub type Result<T> = std::result::Result<T, Error>;
1068
1069impl ErrorExt for Error {
1070 fn status_code(&self) -> StatusCode {
1071 use Error::*;
1072 match self {
1073 IllegalServerState { .. }
1074 | EtcdTxnOpResponse { .. }
1075 | EtcdFailed { .. }
1076 | EtcdTxnFailed { .. }
1077 | ConnectEtcd { .. }
1078 | MoveValues { .. }
1079 | GetCache { .. }
1080 | SerializeToJson { .. }
1081 | DeserializeFromJson { .. } => StatusCode::Internal,
1082
1083 NoLeader { .. } => StatusCode::TableUnavailable,
1084 ValueNotExist { .. }
1085 | ProcedurePoisonConflict { .. }
1086 | ProcedureStateReceiverNotFound { .. }
1087 | MissingColumnIds { .. }
1088 | MissingColumnInColumnMetadata { .. }
1089 | MismatchColumnId { .. }
1090 | ColumnMetadataConflicts { .. }
1091 | ColumnNotFound { .. }
1092 | ColumnIdMismatch { .. }
1093 | TimestampMismatch { .. } => StatusCode::Unexpected,
1094
1095 Unsupported { .. } => StatusCode::Unsupported,
1096 WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
1097
1098 SerdeJson { .. }
1099 | ParseOption { .. }
1100 | RouteInfoCorrupted { .. }
1101 | InvalidProtoMsg { .. }
1102 | InvalidMetadata { .. }
1103 | Unexpected { .. }
1104 | TableInfoNotFound { .. }
1105 | NextSequence { .. }
1106 | UnexpectedSequenceValue { .. }
1107 | InvalidHeartbeatResponse { .. }
1108 | EncodeJson { .. }
1109 | DecodeJson { .. }
1110 | PayloadNotExist { .. }
1111 | ConvertRawKey { .. }
1112 | DecodeProto { .. }
1113 | BuildTableMeta { .. }
1114 | TableRouteNotFound { .. }
1115 | TableRepartNotFound { .. }
1116 | ConvertRawTableInfo { .. }
1117 | RegionOperatingRace { .. }
1118 | EncodeWalOptions { .. }
1119 | BuildKafkaClient { .. }
1120 | BuildKafkaCtrlClient { .. }
1121 | KafkaPartitionClient { .. }
1122 | ProduceRecord { .. }
1123 | CreateKafkaWalTopic { .. }
1124 | EmptyTopicPool { .. }
1125 | UnexpectedLogicalRouteTable { .. }
1126 | ProcedureOutput { .. }
1127 | FromUtf8 { .. }
1128 | MetadataCorruption { .. }
1129 | ParseWalOptions { .. }
1130 | KafkaGetOffset { .. }
1131 | ReadFlexbuffers { .. }
1132 | SerializeFlexbuffers { .. }
1133 | DeserializeFlexbuffers { .. }
1134 | ConvertTimeRanges { .. } => StatusCode::Unexpected,
1135
1136 GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal,
1137
1138 SchemaAlreadyExists { .. } => StatusCode::DatabaseAlreadyExists,
1139
1140 ProcedureNotFound { .. }
1141 | InvalidViewInfo { .. }
1142 | PrimaryKeyNotFound { .. }
1143 | EmptyKey { .. }
1144 | AlterLogicalTablesInvalidArguments { .. }
1145 | CreateLogicalTablesInvalidArguments { .. }
1146 | MismatchPrefix { .. }
1147 | TlsConfig { .. }
1148 | InvalidSetDatabaseOption { .. }
1149 | InvalidUnsetDatabaseOption { .. }
1150 | InvalidTopicNamePrefix { .. }
1151 | InvalidTimeZone { .. }
1152 | InvalidFileExtension { .. }
1153 | InvalidFileName { .. }
1154 | InvalidFlowRequestBody { .. }
1155 | InvalidFilePath { .. } => StatusCode::InvalidArguments,
1156
1157 #[cfg(feature = "enterprise")]
1158 MissingInterval { .. } | NegativeDuration { .. } | TooLargeDuration { .. } => {
1159 StatusCode::InvalidArguments
1160 }
1161
1162 FlowNotFound { .. } => StatusCode::FlowNotFound,
1163 FlowRouteNotFound { .. } => StatusCode::Unexpected,
1164 FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
1165
1166 ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
1167 StatusCode::TableNotFound
1168 }
1169 ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
1170
1171 SubmitProcedure { source, .. }
1172 | QueryProcedure { source, .. }
1173 | WaitProcedure { source, .. }
1174 | StartProcedureManager { source, .. }
1175 | StopProcedureManager { source, .. } => source.status_code(),
1176 RegisterProcedureLoader { source, .. } => source.status_code(),
1177 External { source, .. } => source.status_code(),
1178 ResponseExceededSizeLimit { source, .. } => source.status_code(),
1179 OperateDatanode { source, .. } => source.status_code(),
1180 Table { source, .. } => source.status_code(),
1181 RetryLater { source, .. } => source.status_code(),
1182 AbortProcedure { source, .. } => source.status_code(),
1183 ConvertAlterTableRequest { source, .. } => source.status_code(),
1184 PutPoison { source, .. } => source.status_code(),
1185 ConvertColumnDef { source, .. } => source.status_code(),
1186 ProcedureStateReceiver { source, .. } => source.status_code(),
1187 RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
1188 CreateRepartitionProcedure { source, .. } => source.status_code(),
1189
1190 ParseProcedureId { .. }
1191 | InvalidNumTopics { .. }
1192 | SchemaNotFound { .. }
1193 | CatalogNotFound { .. }
1194 | InvalidNodeInfoKey { .. }
1195 | InvalidStatKey { .. }
1196 | ParseNum { .. }
1197 | InvalidRole { .. }
1198 | EmptyDdlTasks { .. } => StatusCode::InvalidArguments,
1199
1200 LoadTlsCertificate { .. } => StatusCode::Internal,
1201
1202 #[cfg(feature = "pg_kvbackend")]
1203 PostgresExecution { .. }
1204 | CreatePostgresPool { .. }
1205 | GetPostgresConnection { .. }
1206 | PostgresTransaction { .. }
1207 | PostgresTlsConfig { .. }
1208 | InvalidTlsConfig { .. } => StatusCode::Internal,
1209 #[cfg(feature = "mysql_kvbackend")]
1210 MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
1211 StatusCode::Internal
1212 }
1213 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1214 RdsTransactionRetryFailed { .. } => StatusCode::Internal,
1215 DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
1216 }
1217 }
1218
1219 fn as_any(&self) -> &dyn std::any::Any {
1220 self
1221 }
1222}
1223
1224impl Error {
1225 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
1226 pub fn is_serialization_error(&self) -> bool {
1228 match self {
1229 #[cfg(feature = "pg_kvbackend")]
1230 Error::PostgresTransaction { error, .. } => {
1231 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1232 }
1233 #[cfg(feature = "pg_kvbackend")]
1234 Error::PostgresExecution { error, .. } => {
1235 error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
1236 }
1237 #[cfg(feature = "mysql_kvbackend")]
1238 Error::MySqlExecution {
1239 error: sqlx::Error::Database(database_error),
1240 ..
1241 } => {
1242 matches!(
1243 database_error.message(),
1244 "Deadlock found when trying to get lock; try restarting transaction"
1245 | "can't serialize access for this transaction"
1246 )
1247 }
1248 _ => false,
1249 }
1250 }
1251
1252 pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
1254 Error::RetryLater {
1255 source: BoxedError::new(err),
1256 clean_poisons: false,
1257 }
1258 }
1259
1260 pub fn is_retry_later(&self) -> bool {
1262 matches!(self, Error::RetryLater { .. })
1263 }
1264
1265 pub fn need_clean_poisons(&self) -> bool {
1267 matches!(
1268 self,
1269 Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
1270 ) || matches!(
1271 self,
1272 Error::RetryLater { clean_poisons, .. } if *clean_poisons
1273 )
1274 }
1275
1276 pub fn is_exceeded_size_limit(&self) -> bool {
1278 match self {
1279 Error::EtcdFailed {
1280 error: etcd_client::Error::GRpcStatus(status),
1281 ..
1282 } => status.code() == tonic::Code::OutOfRange,
1283 Error::ResponseExceededSizeLimit { .. } => true,
1284 _ => false,
1285 }
1286 }
1287}