common_query/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod columnar_value;
16pub mod error;
17pub mod logical_plan;
18pub mod prelude;
19pub mod request;
20pub mod stream;
21#[cfg(any(test, feature = "testing"))]
22pub mod test_util;
23
24use std::fmt::{Debug, Display, Formatter};
25use std::sync::Arc;
26
27use api::greptime_proto::v1::AddColumnLocation as Location;
28use api::greptime_proto::v1::add_column_location::LocationType;
29use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
30use datafusion::physical_plan::ExecutionPlan;
31use serde::{Deserialize, Serialize};
32use sqlparser_derive::{Visit, VisitMut};
33
34/// new Output struct with output data(previously Output) and output meta
35#[derive(Debug)]
36pub struct Output {
37    pub data: OutputData,
38    pub meta: OutputMeta,
39}
40
41/// Original Output struct
42/// carrying result data to response/client/user interface
43pub enum OutputData {
44    AffectedRows(OutputRows),
45    RecordBatches(RecordBatches),
46    Stream(SendableRecordBatchStream),
47}
48
49impl OutputData {
50    /// Consume the data to pretty printed string.
51    pub async fn pretty_print(self) -> String {
52        match self {
53            OutputData::AffectedRows(x) => {
54                format!("Affected Rows: {x}")
55            }
56            OutputData::RecordBatches(x) => x.pretty_print().unwrap_or_else(|e| e.to_string()),
57            OutputData::Stream(x) => common_recordbatch::util::collect_batches(x)
58                .await
59                .and_then(|x| x.pretty_print())
60                .unwrap_or_else(|e| e.to_string()),
61        }
62    }
63}
64
65/// OutputMeta stores meta information produced/generated during the execution
66#[derive(Debug, Default)]
67pub struct OutputMeta {
68    /// May exist for query output. One can retrieve execution metrics from this plan.
69    pub plan: Option<Arc<dyn ExecutionPlan>>,
70    pub cost: OutputCost,
71}
72
73impl Output {
74    pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
75        Self {
76            data: OutputData::AffectedRows(affected_rows),
77            meta: Default::default(),
78        }
79    }
80
81    pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
82        Self {
83            data: OutputData::RecordBatches(recordbatches),
84            meta: Default::default(),
85        }
86    }
87
88    pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
89        Self {
90            data: OutputData::Stream(stream),
91            meta: Default::default(),
92        }
93    }
94
95    pub fn new(data: OutputData, meta: OutputMeta) -> Self {
96        Self { data, meta }
97    }
98
99    pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
100        match self.data {
101            OutputData::AffectedRows(rows) => (rows, self.meta.cost),
102            _ => (0, self.meta.cost),
103        }
104    }
105}
106
107impl Debug for OutputData {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        match self {
110            OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
111            OutputData::RecordBatches(recordbatches) => {
112                write!(f, "OutputData::RecordBatches({recordbatches:?})")
113            }
114            OutputData::Stream(s) => {
115                write!(f, "OutputData::Stream(<{}>)", s.name())
116            }
117        }
118    }
119}
120
121impl OutputMeta {
122    pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
123        Self { plan, cost }
124    }
125
126    pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
127        Self {
128            plan: Some(plan),
129            cost: 0,
130        }
131    }
132
133    pub fn new_with_cost(cost: usize) -> Self {
134        Self { plan: None, cost }
135    }
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
139pub enum AddColumnLocation {
140    First,
141    After { column_name: String },
142}
143
144impl Display for AddColumnLocation {
145    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
146        match self {
147            AddColumnLocation::First => write!(f, r#"FIRST"#),
148            AddColumnLocation::After { column_name } => {
149                write!(f, r#"AFTER {column_name}"#)
150            }
151        }
152    }
153}
154
155impl From<&AddColumnLocation> for Location {
156    fn from(value: &AddColumnLocation) -> Self {
157        match value {
158            AddColumnLocation::First => Location {
159                location_type: LocationType::First.into(),
160                after_column_name: String::default(),
161            },
162            AddColumnLocation::After { column_name } => Location {
163                location_type: LocationType::After.into(),
164                after_column_name: column_name.clone(),
165            },
166        }
167    }
168}
169
/// Row count type, as carried by `OutputData::AffectedRows`.
pub type OutputRows = usize;
/// Cost type, as stored in `OutputMeta::cost`.
pub type OutputCost = usize;