1pub mod columnar_value;
16pub mod error;
17pub mod logical_plan;
18pub mod prelude;
19pub mod request;
20pub mod stream;
21#[cfg(any(test, feature = "testing"))]
22pub mod test_util;
23
24use std::fmt::{Debug, Display, Formatter};
25use std::sync::Arc;
26
27use api::greptime_proto::v1::AddColumnLocation as Location;
28use api::greptime_proto::v1::add_column_location::LocationType;
29use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
30use datafusion::physical_plan::ExecutionPlan;
31use serde::{Deserialize, Serialize};
32use sqlparser_derive::{Visit, VisitMut};
33
34#[derive(Debug)]
36pub struct Output {
37 pub data: OutputData,
38 pub meta: OutputMeta,
39}
40
41pub enum OutputData {
44 AffectedRows(OutputRows),
45 RecordBatches(RecordBatches),
46 Stream(SendableRecordBatchStream),
47}
48
49impl OutputData {
50 pub async fn pretty_print(self) -> String {
52 match self {
53 OutputData::AffectedRows(x) => {
54 format!("Affected Rows: {x}")
55 }
56 OutputData::RecordBatches(x) => x.pretty_print().unwrap_or_else(|e| e.to_string()),
57 OutputData::Stream(x) => common_recordbatch::util::collect_batches(x)
58 .await
59 .and_then(|x| x.pretty_print())
60 .unwrap_or_else(|e| e.to_string()),
61 }
62 }
63}
64
65#[derive(Debug, Default)]
67pub struct OutputMeta {
68 pub plan: Option<Arc<dyn ExecutionPlan>>,
70 pub cost: OutputCost,
71}
72
73impl Output {
74 pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
75 Self {
76 data: OutputData::AffectedRows(affected_rows),
77 meta: Default::default(),
78 }
79 }
80
81 pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
82 Self {
83 data: OutputData::RecordBatches(recordbatches),
84 meta: Default::default(),
85 }
86 }
87
88 pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
89 Self {
90 data: OutputData::Stream(stream),
91 meta: Default::default(),
92 }
93 }
94
95 pub fn new(data: OutputData, meta: OutputMeta) -> Self {
96 Self { data, meta }
97 }
98
99 pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
100 match self.data {
101 OutputData::AffectedRows(rows) => (rows, self.meta.cost),
102 _ => (0, self.meta.cost),
103 }
104 }
105}
106
107impl Debug for OutputData {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 match self {
110 OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
111 OutputData::RecordBatches(recordbatches) => {
112 write!(f, "OutputData::RecordBatches({recordbatches:?})")
113 }
114 OutputData::Stream(s) => {
115 write!(f, "OutputData::Stream(<{}>)", s.name())
116 }
117 }
118 }
119}
120
121impl OutputMeta {
122 pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
123 Self { plan, cost }
124 }
125
126 pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
127 Self {
128 plan: Some(plan),
129 cost: 0,
130 }
131 }
132
133 pub fn new_with_cost(cost: usize) -> Self {
134 Self { plan: None, cost }
135 }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
139pub enum AddColumnLocation {
140 First,
141 After { column_name: String },
142}
143
144impl Display for AddColumnLocation {
145 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146 match self {
147 AddColumnLocation::First => write!(f, r#"FIRST"#),
148 AddColumnLocation::After { column_name } => {
149 write!(f, r#"AFTER {column_name}"#)
150 }
151 }
152 }
153}
154
155impl From<&AddColumnLocation> for Location {
156 fn from(value: &AddColumnLocation) -> Self {
157 match value {
158 AddColumnLocation::First => Location {
159 location_type: LocationType::First.into(),
160 after_column_name: String::default(),
161 },
162 AddColumnLocation::After { column_name } => Location {
163 location_type: LocationType::After.into(),
164 after_column_name: column_name.clone(),
165 },
166 }
167 }
168}
169
/// Row count, as carried by [`OutputData::AffectedRows`].
pub type OutputRows = usize;
/// Cost value, as stored in [`OutputMeta::cost`]; its unit is not defined here.
pub type OutputCost = usize;