Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 20:02:54 +00:00

Compare commits: replace-ar... -> v0.1.0-alp... (5 commits)

Commits: ea1896493b, 66bca11401, 7c16a4a17b, 28bd7404ad, 0653301754

Changed file: Cargo.lock (generated)
@@ -6312,6 +6312,7 @@ dependencies = [
  "sha1",
  "snafu",
  "snap",
+ "sql",
  "strum",
  "table",
  "tempdir",
@@ -28,7 +28,7 @@ use arrow::record_batch::RecordBatch;
 use clap::Parser;
 use client::admin::Admin;
 use client::api::v1::column::Values;
-use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
+use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId};
 use client::{Client, Database, Select};
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

@@ -219,126 +219,126 @@ fn build_values(column: &ArrayRef) -> Values {
     }
 }

-fn create_table_expr() -> CreateExpr {
-    CreateExpr {
-        catalog_name: Some(CATALOG_NAME.to_string()),
-        schema_name: Some(SCHEMA_NAME.to_string()),
+fn create_table_expr() -> CreateTableExpr {
+    CreateTableExpr {
+        catalog_name: CATALOG_NAME.to_string(),
+        schema_name: SCHEMA_NAME.to_string(),
         table_name: TABLE_NAME.to_string(),
-        desc: None,
+        desc: "".to_string(),
         column_defs: vec![
             ColumnDef {
                 name: "VendorID".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "tpep_pickup_datetime".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "tpep_dropoff_datetime".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "passenger_count".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "trip_distance".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "RatecodeID".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "store_and_fwd_flag".to_string(),
                 datatype: ColumnDataType::String as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "PULocationID".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "DOLocationID".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "payment_type".to_string(),
                 datatype: ColumnDataType::Int64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "fare_amount".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "extra".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "mta_tax".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "tip_amount".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "tolls_amount".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "improvement_surcharge".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "total_amount".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "congestion_surcharge".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "airport_fee".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
         ],
         time_index: "tpep_pickup_datetime".to_string(),

@@ -346,7 +346,7 @@ fn create_table_expr() -> CreateExpr {
         create_if_not_exists: false,
         table_options: Default::default(),
         region_ids: vec![0],
-        table_id: Some(0),
+        table_id: Some(TableId { id: 0 }),
     }
 }
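Taken together, the hunks above show the call-site pattern this series establishes: optional scalar fields become plain values with an empty sentinel, and the raw `u32` table id becomes an optional `TableId` message. A minimal sketch of the migration, using hypothetical stand-in types rather than the real prost-generated ones:

```rust
// Stand-in types for illustration only; the real definitions are generated
// from the proto files by prost in the `api` crate.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TableId {
    pub id: u32,
}

#[derive(Debug)]
pub struct CreateTableExpr {
    pub catalog_name: String,      // was Option<String>; "" now means "unset"
    pub schema_name: String,       // was Option<String>
    pub desc: String,              // was Option<String>
    pub table_id: Option<TableId>, // was Option<u32>; message fields stay Option<_>
}

fn main() {
    // Before: catalog_name: Some("greptime".to_string()), table_id: Some(0)
    let expr = CreateTableExpr {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        desc: String::new(), // empty string instead of None
        table_id: Some(TableId { id: 0 }),
    };
    assert_eq!(expr.table_id, Some(TableId { id: 0 }));
}
```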
@@ -17,7 +17,7 @@ message AdminResponse {
 message AdminExpr {
   ExprHeader header = 1;
   oneof expr {
-    CreateExpr create = 2;
+    CreateTableExpr create_table = 2;
     AlterExpr alter = 3;
     CreateDatabaseExpr create_database = 4;
     DropTableExpr drop_table = 5;

@@ -31,24 +31,23 @@ message AdminResult {
   }
 }

-// TODO(hl): rename to CreateTableExpr
-message CreateExpr {
-  optional string catalog_name = 1;
-  optional string schema_name = 2;
+message CreateTableExpr {
+  string catalog_name = 1;
+  string schema_name = 2;
   string table_name = 3;
-  optional string desc = 4;
+  string desc = 4;
   repeated ColumnDef column_defs = 5;
   string time_index = 6;
   repeated string primary_keys = 7;
   bool create_if_not_exists = 8;
   map<string, string> table_options = 9;
-  optional uint32 table_id = 10;
+  TableId table_id = 10;
   repeated uint32 region_ids = 11;
 }

 message AlterExpr {
-  optional string catalog_name = 1;
-  optional string schema_name = 2;
+  string catalog_name = 1;
+  string schema_name = 2;
   string table_name = 3;
   oneof kind {
     AddColumns add_columns = 4;

@@ -62,6 +61,11 @@ message DropTableExpr {
   string table_name = 3;
 }

+message CreateDatabaseExpr {
+  //TODO(hl): maybe rename to schema_name?
+  string database_name = 1;
+}
+
 message AddColumns {
   repeated AddColumn add_columns = 1;
 }

@@ -79,7 +83,6 @@ message DropColumn {
   string name = 1;
 }

-message CreateDatabaseExpr {
-  //TODO(hl): maybe rename to schema_name?
-  string database_name = 1;
+message TableId {
+  uint32 id = 1;
 }

@@ -59,7 +59,7 @@ message ColumnDef {
   string name = 1;
   ColumnDataType datatype = 2;
   bool is_nullable = 3;
-  optional bytes default_constraint = 4;
+  bytes default_constraint = 4;
 }

 enum ColumnDataType {
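The proto change is the core of the series. In proto3, dropping `optional` removes explicit field presence: scalars decode to their defaults ("", 0, empty bytes), so "unset" must be encoded by convention. The table id instead becomes a wrapper message, because message-typed fields keep presence (prost maps them to `Option<_>`), keeping "absent" distinguishable from `id == 0`. A sketch of the consumer-side logic this enables, mirroring the datanode's `handle_create` further down (stand-in types):

```rust
// Stand-in types; the generated CreateTableExpr has many more fields.
struct TableId {
    id: u32,
}

struct CreateTableExpr {
    table_id: Option<TableId>, // message field: presence survives proto3
}

// Respect the id chosen by the frontend if present, otherwise allocate one.
fn effective_table_id(expr: &CreateTableExpr, allocate: impl FnOnce() -> u32) -> u32 {
    match &expr.table_id {
        Some(t) => t.id,
        None => allocate(),
    }
}

fn main() {
    let expr = CreateTableExpr { table_id: None };
    assert_eq!(effective_table_id(&expr, || 1024), 1024);
}
```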
@@ -23,12 +23,13 @@ impl ColumnDef {
     pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
         let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;

-        let constraint = match &self.default_constraint {
-            None => None,
-            Some(v) => Some(
-                ColumnDefaultConstraint::try_from(&v[..])
+        let constraint = if self.default_constraint.is_empty() {
+            None
+        } else {
+            Some(
+                ColumnDefaultConstraint::try_from(self.default_constraint.as_slice())
                     .context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
-            ),
+            )
         };

         ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)
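The `default_constraint` field follows the same sentinel convention: an empty byte buffer now means "no default". A toy round-trip of the convention; note the payload here is plain text purely for illustration, whereas the real value is a serialized `ColumnDefaultConstraint`, not UTF-8:

```rust
// Encode: was Option<Vec<u8>>; an empty buffer now stands for "no default".
fn encode_constraint(c: Option<&str>) -> Vec<u8> {
    c.map(|s| s.as_bytes().to_vec()).unwrap_or_default()
}

// Decode: empty bytes map back to None, anything else is parsed.
fn decode_constraint(bytes: &[u8]) -> Option<String> {
    if bytes.is_empty() {
        None
    } else {
        Some(String::from_utf8_lossy(bytes).into_owned())
    }
}

fn main() {
    assert_eq!(decode_constraint(&encode_constraint(None)), None);
    assert_eq!(
        decode_constraint(&encode_constraint(Some("9.9"))),
        Some("9.9".to_string())
    );
}
```

The convention implicitly assumes that no legitimate constraint ever serializes to zero bytes.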
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use api::v1::{ColumnDataType, ColumnDef, CreateExpr};
+use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
 use client::admin::Admin;
 use client::{Client, Database};
 use prost_09::Message;

@@ -33,36 +33,36 @@ fn main() {
 async fn run() {
     let client = Client::with_urls(vec!["127.0.0.1:3001"]);

-    let create_table_expr = CreateExpr {
-        catalog_name: Some("greptime".to_string()),
-        schema_name: Some("public".to_string()),
+    let create_table_expr = CreateTableExpr {
+        catalog_name: "greptime".to_string(),
+        schema_name: "public".to_string(),
         table_name: "test_logical_dist_exec".to_string(),
-        desc: None,
+        desc: "".to_string(),
         column_defs: vec![
             ColumnDef {
                 name: "timestamp".to_string(),
                 datatype: ColumnDataType::TimestampMillisecond as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "key".to_string(),
                 datatype: ColumnDataType::Uint64 as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "value".to_string(),
                 datatype: ColumnDataType::Uint64 as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
         ],
         time_index: "timestamp".to_string(),
         primary_keys: vec!["key".to_string()],
         create_if_not_exists: false,
         table_options: Default::default(),
-        table_id: Some(1024),
+        table_id: Some(TableId { id: 1024 }),
         region_ids: vec![0],
     };
@@ -34,13 +34,13 @@ impl Admin {
         }
     }

-    pub async fn create(&self, expr: CreateExpr) -> Result<AdminResult> {
+    pub async fn create(&self, expr: CreateTableExpr) -> Result<AdminResult> {
         let header = ExprHeader {
             version: PROTOCOL_VERSION,
         };
         let expr = AdminExpr {
             header: Some(header),
-            expr: Some(admin_expr::Expr::Create(expr)),
+            expr: Some(admin_expr::Expr::CreateTable(expr)),
         };
         self.do_request(expr).await
     }
@@ -15,7 +15,7 @@
 use std::sync::Arc;

 use api::v1::alter_expr::Kind;
-use api::v1::{AlterExpr, CreateExpr, DropColumns};
+use api::v1::{AlterExpr, CreateTableExpr, DropColumns};
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
 use snafu::{ensure, OptionExt, ResultExt};

@@ -29,6 +29,16 @@ use crate::error::{

 /// Convert an [`AlterExpr`] to an optional [`AlterTableRequest`]
 pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
+    let catalog_name = if expr.catalog_name.is_empty() {
+        None
+    } else {
+        Some(expr.catalog_name)
+    };
+    let schema_name = if expr.schema_name.is_empty() {
+        None
+    } else {
+        Some(expr.schema_name)
+    };
     match expr.kind {
         Some(Kind::AddColumns(add_columns)) => {
             let add_column_requests = add_columns

@@ -57,8 +67,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
             };

             let request = AlterTableRequest {
-                catalog_name: expr.catalog_name,
-                schema_name: expr.schema_name,
+                catalog_name,
+                schema_name,
                 table_name: expr.table_name,
                 alter_kind,
             };

@@ -70,8 +80,8 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
             };

             let request = AlterTableRequest {
-                catalog_name: expr.catalog_name,
-                schema_name: expr.schema_name,
+                catalog_name,
+                schema_name,
                 table_name: expr.table_name,
                 alter_kind,
             };

@@ -81,7 +91,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest
     }
 }

-pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
+pub fn create_table_schema(expr: &CreateTableExpr) -> Result<SchemaRef> {
     let column_schemas = expr
         .column_defs
         .iter()

@@ -119,7 +129,10 @@ pub fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
     ))
 }

-pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<CreateTableRequest> {
+pub fn create_expr_to_request(
+    table_id: TableId,
+    expr: CreateTableExpr,
+) -> Result<CreateTableRequest> {
     let schema = create_table_schema(&expr)?;
     let primary_key_indices = expr
         .primary_keys

@@ -134,12 +147,19 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
         })
         .collect::<Result<Vec<usize>>>()?;

-    let catalog_name = expr
-        .catalog_name
-        .unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string());
-    let schema_name = expr
-        .schema_name
-        .unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
+    let mut catalog_name = expr.catalog_name;
+    if catalog_name.is_empty() {
+        catalog_name = DEFAULT_CATALOG_NAME.to_string();
+    }
+    let mut schema_name = expr.schema_name;
+    if schema_name.is_empty() {
+        schema_name = DEFAULT_SCHEMA_NAME.to_string();
+    }
+    let desc = if expr.desc.is_empty() {
+        None
+    } else {
+        Some(expr.desc)
+    };

     let region_ids = if expr.region_ids.is_empty() {
         vec![0]

@@ -152,7 +172,7 @@ pub fn create_expr_to_request(table_id: TableId, expr: CreateExpr) -> Result<Cre
         catalog_name,
         schema_name,
         table_name: expr.table_name,
-        desc: expr.desc,
+        desc,
         schema,
         region_numbers: region_ids,
         primary_key_indices,

@@ -171,8 +191,8 @@ mod tests {
     #[test]
     fn test_alter_expr_to_request() {
         let expr = AlterExpr {
-            catalog_name: None,
-            schema_name: None,
+            catalog_name: "".to_string(),
+            schema_name: "".to_string(),
             table_name: "monitor".to_string(),

             kind: Some(Kind::AddColumns(AddColumns {

@@ -181,7 +201,7 @@ mod tests {
                     name: "mem_usage".to_string(),
                     datatype: ColumnDataType::Float64 as i32,
                     is_nullable: false,
-                    default_constraint: None,
+                    default_constraint: vec![],
                 }),
                 is_key: false,
             }],

@@ -208,8 +228,8 @@ mod tests {
     #[test]
     fn test_drop_column_expr() {
         let expr = AlterExpr {
-            catalog_name: Some("test_catalog".to_string()),
-            schema_name: Some("test_schema".to_string()),
+            catalog_name: "test_catalog".to_string(),
+            schema_name: "test_schema".to_string(),
             table_name: "monitor".to_string(),

             kind: Some(Kind::DropColumns(DropColumns {
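The empty-string-to-default fallback for catalog and schema names recurs across this series, written inline each time. A small helper capturing the convention; the helper itself is hypothetical (no such function exists in the diff), but the constant values match the defaults used elsewhere in it:

```rust
const DEFAULT_CATALOG_NAME: &str = "greptime";
const DEFAULT_SCHEMA_NAME: &str = "public";

// "" is the proto3 default for a string field, so treat it as "unset".
fn or_default(name: String, default: &str) -> String {
    if name.is_empty() {
        default.to_string()
    } else {
        name
    }
}

fn main() {
    assert_eq!(or_default(String::new(), DEFAULT_CATALOG_NAME), "greptime");
    assert_eq!(or_default("my_catalog".into(), DEFAULT_SCHEMA_NAME), "my_catalog");
}
```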
@@ -18,7 +18,7 @@ use std::sync::Arc;

 use api::helper::ColumnDataTypeWrapper;
 use api::v1::column::{SemanticType, Values};
-use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr};
+use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr};
 use common_base::BitVec;
 use common_time::timestamp::Timestamp;
 use common_time::{Date, DateTime};

@@ -45,7 +45,7 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD
         name: column_name.to_string(),
         datatype,
         is_nullable: nullable,
-        default_constraint: None,
+        default_constraint: vec![],
     }
 }

@@ -214,7 +214,7 @@ pub fn build_create_expr_from_insertion(
     table_id: Option<TableId>,
     table_name: &str,
     columns: &[Column],
-) -> Result<CreateExpr> {
+) -> Result<CreateTableExpr> {
     let mut new_columns: HashSet<String> = HashSet::default();
     let mut column_defs = Vec::default();
     let mut primary_key_indices = Vec::default();

@@ -263,17 +263,17 @@ pub fn build_create_expr_from_insertion(
         .map(|idx| columns[*idx].column_name.clone())
         .collect::<Vec<_>>();

-    let expr = CreateExpr {
-        catalog_name: Some(catalog_name.to_string()),
-        schema_name: Some(schema_name.to_string()),
+    let expr = CreateTableExpr {
+        catalog_name: catalog_name.to_string(),
+        schema_name: schema_name.to_string(),
         table_name: table_name.to_string(),
-        desc: Some("Created on insertion".to_string()),
+        desc: "Created on insertion".to_string(),
         column_defs,
         time_index: timestamp_field_name,
         primary_keys,
         create_if_not_exists: true,
         table_options: Default::default(),
-        table_id,
+        table_id: table_id.map(|id| api::v1::TableId { id }),
         region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend
     };

@@ -516,9 +516,9 @@ mod tests {
             build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0)
                 .unwrap();

-        assert_eq!(table_id, create_expr.table_id);
+        assert_eq!(table_id, create_expr.table_id.map(|x| x.id));
         assert_eq!(table_name, create_expr.table_name);
-        assert_eq!(Some("Created on insertion".to_string()), create_expr.desc);
+        assert_eq!("Created on insertion".to_string(), create_expr.desc);
         assert_eq!(
             vec![create_expr.column_defs[0].name.clone()],
             create_expr.primary_keys
@@ -188,7 +188,9 @@ impl GrpcQueryHandler for Instance {
 impl GrpcAdminHandler for Instance {
     async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
         let admin_resp = match expr.expr {
-            Some(admin_expr::Expr::Create(create_expr)) => self.handle_create(create_expr).await,
+            Some(admin_expr::Expr::CreateTable(create_expr)) => {
+                self.handle_create(create_expr).await
+            }
             Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
             Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
                 self.execute_create_database(create_database_expr).await
@@ -33,12 +33,11 @@ use crate::metric;
 use crate::sql::SqlRequest;

 impl Instance {
-    pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
-        let stmt = self
-            .query_engine
-            .sql_to_statement(sql)
-            .context(ExecuteSqlSnafu)?;
-
+    pub async fn execute_stmt(
+        &self,
+        stmt: Statement,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output> {
         match stmt {
             Statement::Query(_) => {
                 let logical_plan = self

@@ -153,6 +152,14 @@ impl Instance {
             }
         }
     }
+
+    pub async fn execute_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
+        let stmt = self
+            .query_engine
+            .sql_to_statement(sql)
+            .context(ExecuteSqlSnafu)?;
+        self.execute_stmt(stmt, query_ctx).await
+    }
 }

 // TODO(LFC): Refactor consideration: move this function to some helper mod,

@@ -193,15 +200,33 @@ impl SqlQueryHandler for Instance {
         &self,
         query: &str,
         query_ctx: QueryContextRef,
-    ) -> servers::error::Result<Output> {
+    ) -> Vec<servers::error::Result<Output>> {
         let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
-        self.execute_sql(query, query_ctx)
+        // we assume sql string has only 1 statement in datanode
+        let result = self
+            .execute_sql(query, query_ctx)
             .await
             .map_err(|e| {
                 error!(e; "Instance failed to execute sql");
                 BoxedError::new(e)
             })
-            .context(servers::error::ExecuteQuerySnafu { query })
+            .context(servers::error::ExecuteQuerySnafu { query });
+        vec![result]
+    }
+
+    async fn do_statement_query(
+        &self,
+        stmt: Statement,
+        query_ctx: QueryContextRef,
+    ) -> servers::error::Result<Output> {
+        let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
+        self.execute_stmt(stmt, query_ctx)
+            .await
+            .map_err(|e| {
+                error!(e; "Instance failed to execute sql");
+                BoxedError::new(e)
+            })
+            .context(servers::error::ExecuteStatementSnafu)
     }
 }
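This is half of the second theme in the series: `SqlQueryHandler::do_query` now returns one `Result` per statement, and the datanode, which still assumes a single statement per query string, wraps its lone result in a `Vec`. A trimmed-down, synchronous sketch of the new contract (the real trait is async and takes a `QueryContextRef`):

```rust
type Output = u64; // stand-in for common_query::Output
type QueryResult = Result<Output, String>;

trait SqlQueryHandler {
    // was: fn do_query(&self, query: &str) -> QueryResult;
    fn do_query(&self, query: &str) -> Vec<QueryResult>; // one entry per statement
}

struct DatanodeLike;

impl SqlQueryHandler for DatanodeLike {
    fn do_query(&self, _query: &str) -> Vec<QueryResult> {
        // the datanode still assumes one statement and wraps its result
        vec![Ok(1)]
    }
}

fn main() {
    for (i, result) in DatanodeLike.do_query("SELECT 1").into_iter().enumerate() {
        println!("statement {i}: {result:?}");
    }
}
```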
@@ -15,7 +15,7 @@
 use std::sync::Arc;

 use api::result::AdminResultBuilder;
-use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
+use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr};
 use common_error::prelude::{ErrorExt, StatusCode};
 use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
 use common_query::Output;

@@ -31,15 +31,15 @@ use crate::sql::SqlRequest;

 impl Instance {
     /// Handle gRPC create table requests.
-    pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
+    pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult {
         // Respect CreateExpr's table id and region ids if present, or allocate table id
         // from local table id provider and set region id to 0.
-        let table_id = if let Some(table_id) = expr.table_id {
+        let table_id = if let Some(table_id) = &expr.table_id {
             info!(
                 "Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
-                expr.catalog_name, expr.schema_name, expr.table_name, table_id
+                expr.catalog_name, expr.schema_name, expr.table_name, table_id.id
             );
-            table_id
+            table_id.id
         } else {
             match self.table_id_provider.as_ref() {
                 None => {

@@ -157,7 +157,7 @@ impl Instance {
 mod tests {
     use std::sync::Arc;

-    use api::v1::{ColumnDataType, ColumnDef};
+    use api::v1::{ColumnDataType, ColumnDef, TableId};
     use common_catalog::consts::MIN_USER_TABLE_ID;
     use common_grpc_expr::create_table_schema;
     use datatypes::prelude::ConcreteDataType;

@@ -175,7 +175,7 @@ mod tests {
         assert_eq!(request.catalog_name, "greptime".to_string());
         assert_eq!(request.schema_name, "public".to_string());
         assert_eq!(request.table_name, "my-metrics");
-        assert_eq!(request.desc, Some("blabla".to_string()));
+        assert_eq!(request.desc, Some("blabla little magic fairy".to_string()));
         assert_eq!(request.schema, expected_table_schema());
         assert_eq!(request.primary_key_indices, vec![1, 0]);
         assert!(request.create_if_not_exists);

@@ -214,7 +214,7 @@ mod tests {
             name: "a".to_string(),
             datatype: 1024,
             is_nullable: true,
-            default_constraint: None,
+            default_constraint: vec![],
         };
         let result = column_def.try_as_column_schema();
         assert!(matches!(

@@ -226,7 +226,7 @@ mod tests {
             name: "a".to_string(),
             datatype: ColumnDataType::String as i32,
             is_nullable: true,
-            default_constraint: None,
+            default_constraint: vec![],
         };
         let column_schema = column_def.try_as_column_schema().unwrap();
         assert_eq!(column_schema.name, "a");

@@ -238,7 +238,7 @@ mod tests {
             name: "a".to_string(),
             datatype: ColumnDataType::String as i32,
             is_nullable: true,
-            default_constraint: Some(default_constraint.clone().try_into().unwrap()),
+            default_constraint: default_constraint.clone().try_into().unwrap(),
         };
         let column_schema = column_def.try_as_column_schema().unwrap();
         assert_eq!(column_schema.name, "a");

@@ -250,44 +250,46 @@ mod tests {
         );
     }

-    fn testing_create_expr() -> CreateExpr {
+    fn testing_create_expr() -> CreateTableExpr {
         let column_defs = vec![
             ColumnDef {
                 name: "host".to_string(),
                 datatype: ColumnDataType::String as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "ts".to_string(),
                 datatype: ColumnDataType::TimestampMillisecond as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "cpu".to_string(),
                 datatype: ColumnDataType::Float32 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             ColumnDef {
                 name: "memory".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
         ];
-        CreateExpr {
-            catalog_name: None,
-            schema_name: None,
+        CreateTableExpr {
+            catalog_name: "".to_string(),
+            schema_name: "".to_string(),
             table_name: "my-metrics".to_string(),
-            desc: Some("blabla".to_string()),
+            desc: "blabla little magic fairy".to_string(),
             column_defs,
             time_index: "ts".to_string(),
             primary_keys: vec!["ts".to_string(), "host".to_string()],
             create_if_not_exists: true,
             table_options: Default::default(),
-            table_id: Some(MIN_USER_TABLE_ID),
+            table_id: Some(TableId {
+                id: MIN_USER_TABLE_ID,
+            }),
             region_ids: vec![0],
         }
     }
@@ -387,6 +387,12 @@ pub enum Error {
         source: query::error::Error,
     },

+    #[snafu(display("Failed to execute statement, source: {}", source))]
+    ExecuteStatement {
+        #[snafu(backtrace)]
+        source: query::error::Error,
+    },
+
     #[snafu(display("Failed to do vector computation, source: {}", source))]
     VectorComputation {
         #[snafu(backtrace)]

@@ -536,6 +542,7 @@ impl ErrorExt for Error {
             Error::DeserializeInsertBatch { source, .. } => source.status_code(),
             Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
             Error::ExecuteSql { source, .. } => source.status_code(),
+            Error::ExecuteStatement { source, .. } => source.status_code(),
             Error::InsertBatchToRequest { source, .. } => source.status_code(),
             Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
                 source.status_code()
@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use api::helper::ColumnDataTypeWrapper;
-use api::v1::{Column, ColumnDataType, CreateExpr};
+use api::v1::{Column, ColumnDataType, CreateTableExpr};
 use datatypes::schema::ColumnSchema;
 use snafu::{ensure, ResultExt};
 use sql::ast::{ColumnDef, TableConstraint};

@@ -32,7 +32,7 @@ pub type CreateExprFactoryRef = Arc<dyn CreateExprFactory + Send + Sync>;

 #[async_trait::async_trait]
 pub trait CreateExprFactory {
-    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr>;
+    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr>;

     async fn create_expr_by_columns(
         &self,

@@ -40,7 +40,7 @@ pub trait CreateExprFactory {
         schema_name: &str,
         table_name: &str,
         columns: &[Column],
-    ) -> crate::error::Result<CreateExpr>;
+    ) -> crate::error::Result<CreateTableExpr>;
 }

 #[derive(Debug)]

@@ -48,7 +48,7 @@ pub struct DefaultCreateExprFactory;

 #[async_trait::async_trait]
 impl CreateExprFactory for DefaultCreateExprFactory {
-    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateExpr> {
+    async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result<CreateTableExpr> {
         create_to_expr(None, vec![0], stmt)
     }

@@ -58,7 +58,7 @@ impl CreateExprFactory for DefaultCreateExprFactory {
         schema_name: &str,
         table_name: &str,
         columns: &[Column],
-    ) -> Result<CreateExpr> {
+    ) -> Result<CreateTableExpr> {
         let table_id = None;
         let create_expr = common_grpc_expr::build_create_expr_from_insertion(
             catalog_name,

@@ -78,23 +78,23 @@ fn create_to_expr(
     table_id: Option<u32>,
     region_ids: Vec<u32>,
     create: &CreateTable,
-) -> Result<CreateExpr> {
+) -> Result<CreateTableExpr> {
     let (catalog_name, schema_name, table_name) =
         table_idents_to_full_name(&create.name).context(ParseSqlSnafu)?;

     let time_index = find_time_index(&create.constraints)?;
-    let expr = CreateExpr {
-        catalog_name: Some(catalog_name),
-        schema_name: Some(schema_name),
+    let expr = CreateTableExpr {
+        catalog_name,
+        schema_name,
         table_name,
-        desc: None,
+        desc: "".to_string(),
         column_defs: columns_to_expr(&create.columns, &time_index)?,
         time_index,
         primary_keys: find_primary_keys(&create.constraints)?,
         create_if_not_exists: create.if_not_exists,
         // TODO(LFC): Fill in other table options.
         table_options: HashMap::from([("engine".to_string(), create.engine.clone())]),
-        table_id,
+        table_id: table_id.map(|id| api::v1::TableId { id }),
         region_ids,
     };
     Ok(expr)

@@ -171,12 +171,14 @@ fn columns_to_expr(
             datatype: datatype as i32,
             is_nullable: schema.is_nullable(),
             default_constraint: match schema.default_constraint() {
-                None => None,
-                Some(v) => Some(v.clone().try_into().context(
-                    ConvertColumnDefaultConstraintSnafu {
-                        column_name: &schema.name,
-                    },
-                )?),
+                None => vec![],
+                Some(v) => {
+                    v.clone()
+                        .try_into()
+                        .context(ConvertColumnDefaultConstraintSnafu {
+                            column_name: &schema.name,
+                        })?
+                }
             },
         })
     })
@@ -25,7 +25,7 @@ use api::v1::alter_expr::Kind;
 use api::v1::object_expr::Expr;
 use api::v1::{
     admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
-    CreateExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
+    CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr,
     ObjectResult as GrpcObjectResult,
 };
 use async_trait::async_trait;

@@ -196,7 +196,7 @@ impl Instance {
     /// Handle create expr.
     pub async fn handle_create_table(
         &self,
-        mut expr: CreateExpr,
+        mut expr: CreateTableExpr,
         partitions: Option<Partitions>,
     ) -> Result<Output> {
         if let Some(v) = &self.dist_instance {

@@ -206,7 +206,7 @@ impl Instance {
             header: Some(ExprHeader {
                 version: PROTOCOL_VERSION,
             }),
-            expr: Some(admin_expr::Expr::Create(expr)),
+            expr: Some(admin_expr::Expr::CreateTable(expr)),
         };
         let result = self
             .grpc_admin_handler

@@ -359,8 +359,8 @@ impl Instance {
         );
         let expr = AlterExpr {
             table_name: table_name.to_string(),
-            schema_name: Some(schema_name.to_string()),
-            catalog_name: Some(catalog_name.to_string()),
+            schema_name: schema_name.to_string(),
+            catalog_name: catalog_name.to_string(),
             kind: Some(Kind::AddColumns(add_columns)),
         };

@@ -461,31 +461,18 @@ impl FrontendInstance for Instance {
     }
 }

-fn parse_stmt(sql: &str) -> Result<Statement> {
-    let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {})
-        .context(error::ParseSqlSnafu)?;
-    // TODO(LFC): Support executing multiple SQL queries,
-    // which seems to be a major change to our whole server framework?
-    ensure!(
-        stmt.len() == 1,
-        error::InvalidSqlSnafu {
-            err_msg: "Currently executing multiple SQL queries are not supported."
-        }
-    );
-    Ok(stmt.remove(0))
+fn parse_stmt(sql: &str) -> Result<Vec<Statement>> {
+    ParserContext::create_with_dialect(sql, &GenericDialect {}).context(error::ParseSqlSnafu)
 }

-#[async_trait]
-impl SqlQueryHandler for Instance {
-    async fn do_query(
+impl Instance {
+    async fn query_statement(
         &self,
-        query: &str,
+        stmt: Statement,
         query_ctx: QueryContextRef,
     ) -> server_error::Result<Output> {
-        let stmt = parse_stmt(query)
-            .map_err(BoxedError::new)
-            .context(server_error::ExecuteQuerySnafu { query })?;
-
+        // TODO(sunng87): provide a better form to log or track statement
+        let query = &format!("{:?}", &stmt);
         match stmt {
             Statement::CreateDatabase(_)
             | Statement::ShowDatabases(_)

@@ -494,7 +481,7 @@ impl SqlQueryHandler for Instance {
             | Statement::DescribeTable(_)
             | Statement::Explain(_)
             | Statement::Query(_) => {
-                return self.sql_handler.do_query(query, query_ctx).await;
+                return self.sql_handler.do_statement_query(stmt, query_ctx).await;
             }
             Statement::Insert(insert) => match self.mode {
                 Mode::Standalone => {

@@ -569,6 +556,45 @@
     }
 }

+#[async_trait]
+impl SqlQueryHandler for Instance {
+    async fn do_query(
+        &self,
+        query: &str,
+        query_ctx: QueryContextRef,
+    ) -> Vec<server_error::Result<Output>> {
+        match parse_stmt(query)
+            .map_err(BoxedError::new)
+            .context(server_error::ExecuteQuerySnafu { query })
+        {
+            Ok(stmts) => {
+                let mut results = Vec::with_capacity(stmts.len());
+                for stmt in stmts {
+                    match self.query_statement(stmt, query_ctx.clone()).await {
+                        Ok(output) => results.push(Ok(output)),
+                        Err(e) => {
+                            results.push(Err(e));
+                            break;
+                        }
+                    }
+                }
+                results
+            }
+            Err(e) => {
+                vec![Err(e)]
+            }
+        }
+    }
+
+    async fn do_statement_query(
+        &self,
+        stmt: Statement,
+        query_ctx: QueryContextRef,
+    ) -> server_error::Result<Output> {
+        self.query_statement(stmt, query_ctx).await
+    }
+}
+
 #[async_trait]
 impl ScriptHandler for Instance {
     async fn insert_script(&self, name: &str, script: &str) -> server_error::Result<()> {

@@ -630,7 +656,7 @@ impl GrpcAdminHandler for Instance {
     async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result<AdminResult> {
         // Force the default to be `None` rather than `Some(0)` comes from gRPC decode.
         // Related issue: #480
-        if let Some(api::v1::admin_expr::Expr::Create(create)) = &mut expr.expr {
+        if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr {
             create.table_id = None;
         }
         self.grpc_admin_handler.exec_admin_request(expr).await

@@ -671,6 +697,7 @@ mod tests {
     ) engine=mito with(regions=1);"#;
         let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
             .await
+            .remove(0)
             .unwrap();
         match output {
             Output::AffectedRows(rows) => assert_eq!(rows, 1),

@@ -684,6 +711,7 @@ mod tests {
     "#;
         let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
             .await
+            .remove(0)
            .unwrap();
         match output {
             Output::AffectedRows(rows) => assert_eq!(rows, 3),

@@ -693,6 +721,7 @@ mod tests {
         let sql = "select * from demo";
         let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
             .await
+            .remove(0)
             .unwrap();
         match output {
             Output::RecordBatches(_) => {

@@ -720,6 +749,7 @@ mod tests {
         let sql = "select * from demo where ts>cast(1000000000 as timestamp)"; // use nanoseconds as where condition
         let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
             .await
+            .remove(0)
             .unwrap();
         match output {
             Output::RecordBatches(_) => {

@@ -808,7 +838,7 @@ mod tests {
         let create_expr = create_expr();
         let admin_expr = AdminExpr {
             header: Some(ExprHeader::default()),
-            expr: Some(admin_expr::Expr::Create(create_expr)),
+            expr: Some(admin_expr::Expr::CreateTable(create_expr)),
         };
         let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr)
             .await

@@ -886,48 +916,46 @@ mod tests {
         }
     }

-    fn create_expr() -> CreateExpr {
+    fn create_expr() -> CreateTableExpr {
         let column_defs = vec![
             GrpcColumnDef {
                 name: "host".to_string(),
                 datatype: ColumnDataType::String as i32,
                 is_nullable: false,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             GrpcColumnDef {
                 name: "cpu".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             GrpcColumnDef {
                 name: "memory".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
             GrpcColumnDef {
                 name: "disk_util".to_string(),
                 datatype: ColumnDataType::Float64 as i32,
                 is_nullable: true,
-                default_constraint: Some(
-                    ColumnDefaultConstraint::Value(Value::from(9.9f64))
-                        .try_into()
-                        .unwrap(),
-                ),
+                default_constraint: ColumnDefaultConstraint::Value(Value::from(9.9f64))
+                    .try_into()
+                    .unwrap(),
             },
             GrpcColumnDef {
                 name: "ts".to_string(),
                 datatype: ColumnDataType::TimestampMillisecond as i32,
                 is_nullable: true,
-                default_constraint: None,
+                default_constraint: vec![],
             },
         ];
-        CreateExpr {
-            catalog_name: None,
-            schema_name: None,
+        CreateTableExpr {
+            catalog_name: "".to_string(),
+            schema_name: "".to_string(),
             table_name: "demo".to_string(),
-            desc: None,
+            desc: "".to_string(),
             column_defs,
             time_index: "ts".to_string(),
             primary_keys: vec!["host".to_string()],
||||
@@ -18,8 +18,8 @@ use std::sync::Arc;
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::result::AdminResultBuilder;
|
||||
use api::v1::{
|
||||
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateExpr, ObjectExpr,
|
||||
ObjectResult,
|
||||
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr,
|
||||
ObjectResult, TableId,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
|
||||
@@ -86,7 +86,7 @@ impl DistInstance {
|
||||
|
||||
pub(crate) async fn create_table(
|
||||
&self,
|
||||
create_table: &mut CreateExpr,
|
||||
create_table: &mut CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
) -> Result<Output> {
|
||||
let response = self.create_table_in_meta(create_table, partitions).await?;
|
||||
@@ -112,7 +112,9 @@ impl DistInstance {
|
||||
table_name: create_table.table_name.to_string()
|
||||
}
|
||||
);
|
||||
create_table.table_id = Some(table_route.table.id as u32);
|
||||
create_table.table_id = Some(TableId {
|
||||
id: table_route.table.id as u32,
|
||||
});
|
||||
self.put_table_global_meta(create_table, table_route)
|
||||
.await?;
|
||||
|
||||
@@ -140,14 +142,17 @@ impl DistInstance {
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
let stmt = parse_stmt(sql)?;
|
||||
async fn handle_statement(
|
||||
&self,
|
||||
stmt: Statement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
match stmt {
|
||||
Statement::Query(_) => {
|
||||
let plan = self
|
||||
.query_engine
|
||||
.statement_to_plan(stmt, query_ctx)
|
||||
.context(error::ExecuteSqlSnafu { sql })?;
|
||||
.context(error::ExecuteStatementSnafu {})?;
|
||||
self.query_engine.execute(&plan).await
|
||||
}
|
||||
Statement::CreateDatabase(stmt) => {
|
||||
@@ -171,7 +176,30 @@ impl DistInstance {
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
.context(error::ExecuteSqlSnafu { sql })
|
||||
.context(error::ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
async fn handle_sql(&self, sql: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
|
||||
let stmts = parse_stmt(sql);
|
||||
match stmts {
|
||||
Ok(stmts) => {
|
||||
let mut results = Vec::with_capacity(stmts.len());
|
||||
|
||||
for stmt in stmts {
|
||||
let result = self.handle_statement(stmt, query_ctx.clone()).await;
|
||||
let is_err = result.is_err();
|
||||
|
||||
results.push(result);
|
||||
|
||||
if is_err {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
Err(e) => vec![Err(e)],
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles distributed database creation
|
||||
@@ -194,8 +222,16 @@ impl DistInstance {
|
||||
}
|
||||
|
||||
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<AdminResult> {
|
||||
let catalog_name = expr.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
|
||||
let schema_name = expr.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||
let catalog_name = if expr.catalog_name.is_empty() {
|
||||
DEFAULT_CATALOG_NAME
|
||||
} else {
|
||||
expr.catalog_name.as_str()
|
||||
};
|
||||
let schema_name = if expr.schema_name.is_empty() {
|
||||
DEFAULT_SCHEMA_NAME
|
||||
} else {
|
||||
expr.schema_name.as_str()
|
||||
};
|
||||
let table_name = expr.table_name.as_str();
|
||||
let table = self
|
||||
.catalog_manager
|
||||
@@ -223,20 +259,18 @@ impl DistInstance {
|
||||
|
||||
async fn create_table_in_meta(
|
||||
&self,
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
) -> Result<RouteResponse> {
|
||||
let table_name = TableName::new(
|
||||
create_table
|
||||
.catalog_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_CATALOG_NAME.to_string()),
|
||||
create_table
|
||||
.schema_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
|
||||
create_table.table_name.clone(),
|
||||
);
|
||||
let mut catalog_name = create_table.catalog_name.clone();
|
||||
if catalog_name.is_empty() {
|
||||
catalog_name = DEFAULT_CATALOG_NAME.to_string();
|
||||
}
|
||||
let mut schema_name = create_table.schema_name.clone();
|
||||
if schema_name.is_empty() {
|
||||
schema_name = DEFAULT_SCHEMA_NAME.to_string();
|
||||
}
|
||||
let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone());
|
||||
|
||||
let partitions = parse_partitions(create_table, partitions)?;
|
||||
let request = MetaCreateRequest {
|
||||
@@ -252,7 +286,7 @@ impl DistInstance {
|
||||
// TODO(LFC): Maybe move this to FrontendCatalogManager's "register_table" method?
|
||||
async fn put_table_global_meta(
|
||||
&self,
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
table_route: &TableRoute,
|
||||
) -> Result<()> {
|
||||
let table_name = &table_route.table.table_name;
|
||||
@@ -274,10 +308,12 @@ impl DistInstance {
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
{
|
||||
let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed
|
||||
let existing_bytes = existing.unwrap(); // this unwrap is safe since we compare with empty bytes and failed
|
||||
let existing_value =
|
||||
TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?;
|
||||
if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() {
|
||||
if existing_value.table_info.ident.table_id
|
||||
!= create_table.table_id.as_ref().unwrap().id
|
||||
{
|
||||
error!(
|
||||
"Table with name {} already exists, value in catalog: {:?}",
|
||||
key, existing_bytes
|
||||
@@ -300,11 +336,26 @@ impl SqlQueryHandler for DistInstance {
|
||||
&self,
|
||||
query: &str,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> server_error::Result<Output> {
|
||||
) -> Vec<server_error::Result<Output>> {
|
||||
self.handle_sql(query, query_ctx)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
r.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query })
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn do_statement_query(
|
||||
&self,
|
||||
stmt: Statement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> server_error::Result<Output> {
|
||||
self.handle_statement(stmt, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(server_error::ExecuteQuerySnafu { query })
|
||||
.context(server_error::ExecuteStatementSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,7 +391,7 @@ impl GrpcAdminHandler for DistInstance {
|
||||
}
|
||||
|
||||
fn create_table_global_value(
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
table_route: &TableRoute,
|
||||
) -> Result<TableGlobalValue> {
|
||||
let table_name = &table_route.table.table_name;
|
||||
@@ -419,13 +470,19 @@ fn create_table_global_value(
|
||||
created_on: DateTime::default(),
|
||||
};
|
||||
|
||||
let desc = if create_table.desc.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(create_table.desc.clone())
|
||||
};
|
||||
|
||||
let table_info = RawTableInfo {
|
||||
ident: TableIdent {
|
||||
table_id: table_route.table.id as u32,
|
||||
version: 0,
|
||||
},
|
||||
name: table_name.table_name.clone(),
|
||||
desc: create_table.desc.clone(),
|
||||
desc,
|
||||
catalog_name: table_name.catalog_name.clone(),
|
||||
schema_name: table_name.schema_name.clone(),
|
||||
meta,
|
||||
@@ -440,7 +497,7 @@ fn create_table_global_value(
|
||||
}
|
||||
|
||||
fn parse_partitions(
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
) -> Result<Vec<MetaPartition>> {
|
||||
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
||||
@@ -455,7 +512,7 @@ fn parse_partitions(
|
||||
}
|
||||
|
||||
fn find_partition_entries(
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: &Option<Partitions>,
|
||||
partition_columns: &[String],
|
||||
) -> Result<Vec<Vec<PartitionBound>>> {
|
||||
@@ -505,7 +562,7 @@ fn find_partition_entries(
|
||||
}
|
||||
|
||||
fn find_partition_columns(
|
||||
create_table: &CreateExpr,
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: &Option<Partitions>,
|
||||
) -> Result<Vec<String>> {
|
||||
let columns = if let Some(partitions) = partitions {
|
||||
@@ -539,7 +596,7 @@ mod test {
|
||||
let cases = [
|
||||
(
|
||||
r"
|
||||
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
|
||||
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
|
||||
PARTITION BY RANGE COLUMNS (b) (
|
||||
PARTITION r0 VALUES LESS THAN ('hz'),
|
||||
PARTITION r1 VALUES LESS THAN ('sh'),
|
||||
@@ -585,6 +642,7 @@ ENGINE=mito",
|
||||
let output = dist_instance
|
||||
.handle_sql(sql, QueryContext::arc())
|
||||
.await
|
||||
.remove(0)
|
||||
.unwrap();
|
||||
match output {
|
||||
Output::AffectedRows(rows) => assert_eq!(rows, 1),
|
||||
@@ -595,6 +653,7 @@ ENGINE=mito",
|
||||
let output = dist_instance
|
||||
.handle_sql(sql, QueryContext::arc())
|
||||
.await
|
||||
.remove(0)
|
||||
.unwrap();
|
||||
match output {
|
||||
Output::RecordBatches(r) => {
|
||||
@@ -633,6 +692,7 @@ ENGINE=mito",
|
||||
dist_instance
|
||||
.handle_sql(sql, QueryContext::arc())
|
||||
.await
|
||||
.remove(0)
|
||||
.unwrap();
|
||||
|
||||
let sql = "
|
||||
@@ -651,11 +711,16 @@ ENGINE=mito",
|
||||
dist_instance
|
||||
.handle_sql(sql, QueryContext::arc())
|
||||
.await
|
||||
.remove(0)
|
||||
.unwrap();
|
||||
|
||||
async fn assert_show_tables(instance: SqlQueryHandlerRef) {
|
||||
let sql = "show tables in test_show_tables";
|
||||
let output = instance.do_query(sql, QueryContext::arc()).await.unwrap();
|
||||
let output = instance
|
||||
.do_query(sql, QueryContext::arc())
|
||||
.await
|
||||
.remove(0)
|
||||
.unwrap();
|
||||
match output {
|
||||
Output::RecordBatches(r) => {
|
||||
let expected = r#"+--------------+
|
||||
|
||||
@@ -130,6 +130,7 @@ mod tests {
         Arc::new(QueryContext::new()),
     )
     .await
+    .remove(0)
     .unwrap();
     match output {
         Output::Stream(stream) => {
@@ -27,9 +27,9 @@ use servers::prometheus::{self, Metrics};
 use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
 use servers::Mode;
 use session::context::QueryContext;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};

-use crate::instance::Instance;
+use crate::instance::{parse_stmt, Instance};

 const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

@@ -94,13 +94,26 @@ impl Instance {
             );

             let query_ctx = Arc::new(QueryContext::with_current_schema(db.to_string()));
-            let output = self.sql_handler.do_query(&sql, query_ctx).await;
+
+            let mut stmts = parse_stmt(&sql)
+                .map_err(BoxedError::new)
+                .context(error::ExecuteQuerySnafu { query: &sql })?;
+
+            ensure!(
+                stmts.len() == 1,
+                error::InvalidQuerySnafu {
+                    reason: "The sql has multiple statements".to_string()
+                }
+            );
+            let stmt = stmts.remove(0);
+
+            let output = self.sql_handler.do_statement_query(stmt, query_ctx).await;

             let object_result = to_object_result(output)
                 .await
                 .try_into()
                 .map_err(BoxedError::new)
-                .context(error::ExecuteQuerySnafu { query: sql })?;
+                .context(error::ExecuteQuerySnafu { query: &sql })?;

             results.push((table_name, object_result));
         }
@@ -354,7 +354,7 @@ impl DistTable {
         Ok(partition_rule)
     }

     /// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
     /// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
     pub(crate) async fn alter_by_expr(&self, expr: AlterExpr) -> Result<()> {
         let table_routes = self.table_routes.get_route(&self.table_name).await?;

@@ -735,6 +735,7 @@ mod test {

     #[tokio::test(flavor = "multi_thread")]
     async fn test_dist_table_scan() {
+        common_telemetry::init_default_ut_logging();
         let table = Arc::new(new_dist_table().await);
         // should scan all regions
         // select a, row_id from numbers

@@ -896,8 +897,8 @@ mod test {
     async fn new_dist_table() -> DistTable {
         let column_schemas = vec![
             ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
-            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
-            ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), false),
+            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
+            ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), true),
         ];
         let schema = Arc::new(Schema::new(column_schemas.clone()));
@@ -33,7 +33,7 @@ use common_telemetry::timer;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::ExecutionPlan;
 use session::context::QueryContextRef;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use sql::dialect::GenericDialect;
 use sql::parser::ParserContext;
 use sql::statements::statement::Statement;

@@ -72,8 +72,7 @@ impl QueryEngine for DatafusionQueryEngine {
     fn sql_to_statement(&self, sql: &str) -> Result<Statement> {
         let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
             .context(error::ParseSqlSnafu)?;
-        // TODO(dennis): supports multi statement in one sql?
-        assert!(1 == statement.len());
+        ensure!(1 == statement.len(), error::MultipleStatementsSnafu { sql });
         Ok(statement.remove(0))
     }

@@ -280,6 +279,7 @@ mod tests {
         .sql_to_plan(sql, Arc::new(QueryContext::new()))
         .unwrap();

+    // TODO(sunng87): do not rely on to_string for compare
     assert_eq!(
         format!("{:?}", plan),
         r#"DfPlan(Limit: skip=0, fetch=20

@@ -297,6 +297,7 @@ mod tests {
     let plan = engine
         .sql_to_plan(sql, Arc::new(QueryContext::new()))
         .unwrap();
+
     let output = engine.execute(&plan).await.unwrap();

     match output {
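Swapping `assert!` for snafu's `ensure!` turns a would-be panic on multi-statement input into a typed, recoverable error. A compilable sketch of the pattern, assuming snafu 0.7; the error type here is a local stand-in, not the engine's real `InnerError`:

```rust
use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("The SQL string has multiple statements, sql: {}", sql))]
struct MultipleStatements {
    sql: String,
}

fn single_statement(stmts: Vec<String>, sql: &str) -> Result<String, MultipleStatements> {
    // Before: assert!(1 == stmts.len()) would abort instead of returning an error.
    ensure!(1 == stmts.len(), MultipleStatementsSnafu { sql });
    Ok(stmts.into_iter().next().unwrap())
}

fn main() {
    let err = single_statement(vec!["a".into(), "b".into()], "a; b").unwrap_err();
    println!("{err}");
}
```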
@@ -40,6 +40,9 @@ pub enum InnerError {
         source: sql::error::Error,
     },

+    #[snafu(display("The SQL string has multiple statements, sql: {}", sql))]
+    MultipleStatements { sql: String, backtrace: Backtrace },
+
     #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
     PlanSql {
         sql: String,

@@ -90,6 +93,7 @@ impl ErrorExt for InnerError {
             PlanSql { .. } => StatusCode::PlanQuery,
             ConvertDfRecordBatchStream { source } => source.status_code(),
             ExecutePhysicalPlan { source } => source.status_code(),
+            MultipleStatements { .. } => StatusCode::InvalidArguments,
         }
     }
@@ -47,6 +47,7 @@ session = { path = "../session" }
 sha1 = "0.10"
 snafu = { version = "0.7", features = ["backtraces"] }
 snap = "1"
+sql = { path = "../sql" }
 strum = { version = "0.24", features = ["derive"] }
 table = { path = "../table" }
 tokio = { version = "1.20", features = ["full"] }
@@ -78,6 +78,12 @@ pub enum Error {
         source: BoxedError,
     },

+    #[snafu(display("Failed to execute sql statement, source: {}", source))]
+    ExecuteStatement {
+        #[snafu(backtrace)]
+        source: BoxedError,
+    },
+
     #[snafu(display("Failed to execute insert: {}, source: {}", msg, source))]
     ExecuteInsert {
         msg: String,

@@ -257,6 +263,7 @@ impl ErrorExt for Error {
             InsertScript { source, .. }
             | ExecuteScript { source, .. }
             | ExecuteQuery { source, .. }
+            | ExecuteStatement { source, .. }
             | ExecuteInsert { source, .. }
             | ExecuteAlter { source, .. }
             | PutOpentsdbDataPoint { source, .. } => source.status_code(),
@@ -232,28 +232,52 @@ impl JsonResponse {
}

/// Create a json response from query result
async fn from_output(output: Result<Output>) -> Self {
match output {
Ok(Output::AffectedRows(rows)) => {
Self::with_output(Some(vec![JsonOutput::AffectedRows(rows)]))
}
Ok(Output::Stream(stream)) => match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
},
Err(e) => Self::with_error(format!("Recordbatch error: {}", e), e.status_code()),
},
Ok(Output::RecordBatches(recordbatches)) => {
match HttpRecordsOutput::try_from(recordbatches.take()) {
Ok(rows) => Self::with_output(Some(vec![JsonOutput::Records(rows)])),
Err(err) => Self::with_error(err, StatusCode::Internal),
}
}
Err(e) => {
Self::with_error(format!("Query engine output error: {}", e), e.status_code())
}
}
}

async fn from_output(outputs: Vec<Result<Output>>) -> Self {
// TODO(sunng87): this api response structure cannot represent error
// well. It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
for out in outputs {
match out {
Ok(Output::AffectedRows(rows)) => {
results.push(JsonOutput::AffectedRows(rows));
}
Ok(Output::Stream(stream)) => {
// TODO(sunng87): streaming response
match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_from(rows) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err, StatusCode::Internal);
}
},
Err(e) => {
return Self::with_error(
format!("Recordbatch error: {}", e),
e.status_code(),
);
}
}
}
Ok(Output::RecordBatches(rbs)) => match HttpRecordsOutput::try_from(rbs.take()) {
Ok(rows) => {
results.push(JsonOutput::Records(rows));
}
Err(err) => {
return Self::with_error(err, StatusCode::Internal);
}
},
Err(e) => {
return Self::with_error(
format!("Query engine output error: {}", e),
e.status_code(),
);
}
}
}
Self::with_output(Some(results))
}
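To illustrate the new batching behavior with hypothetical values (Output, JsonResponse and JsonOutput as defined in this patch): a fully successful batch yields one JsonOutput per statement, while the first failure turns the whole response into an error, exactly as the TODO above notes.

    // Hypothetical call site for the rewritten from_output.
    let outputs = vec![Ok(Output::AffectedRows(1)), Ok(Output::AffectedRows(2))];
    let resp = JsonResponse::from_output(outputs).await;
    // resp now carries [JsonOutput::AffectedRows(1), JsonOutput::AffectedRows(2)].
    // Had any element been Err(_), from_output would have returned early via
    // Self::with_error(..), hiding the earlier successful results.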
pub fn code(&self) -> u32 {
@@ -519,7 +543,15 @@ mod test {

#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

@@ -582,7 +614,8 @@ mod test {
let recordbatch = RecordBatch::new(schema.clone(), columns).unwrap();
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();

let json_resp = JsonResponse::from_output(Ok(Output::RecordBatches(recordbatches))).await;
let json_resp =
JsonResponse::from_output(vec![Ok(Output::RecordBatches(recordbatches))]).await;

let json_output = &json_resp.output.unwrap()[0];
if let JsonOutput::Records(r) = json_output {
@@ -45,8 +45,11 @@ pub async fn sql(
let sql_handler = &state.sql_handler;
let start = Instant::now();
let resp = if let Some(sql) = &params.sql {
// TODO(LFC): Sessions in http server.
let query_ctx = Arc::new(QueryContext::new());
if let Some(db) = params.database {
query_ctx.set_current_schema(db.as_ref());
}

JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await
} else {
JsonResponse::with_error(

@@ -78,8 +81,8 @@ pub struct HealthQuery {}
#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub struct HealthResponse {}

/// Handler to export health check
///
/// Currently simply return status "200 OK" (default) with an empty json payload "{}"
#[axum_macros::debug_handler]
pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
@@ -91,7 +91,7 @@ pub async fn run_script(
}

let output = script_handler.execute_script(name.unwrap()).await;
let resp = JsonResponse::from_output(output).await;
let resp = JsonResponse::from_output(vec![output]).await;

Json(resp.with_execution_time(start.elapsed().as_millis()))
} else {
@@ -73,7 +73,7 @@ impl MysqlInstanceShim {
}
}

async fn do_query(&self, query: &str) -> Result<Output> {
async fn do_query(&self, query: &str) -> Vec<Result<Output>> {
debug!("Start executing query: '{}'", query);
let start = Instant::now();

@@ -82,7 +82,7 @@ impl MysqlInstanceShim {
// components, this is quick and dirty, there must be a better way to do it.
let output =
if let Some(output) = crate::mysql::federated::check(query, self.session.context()) {
Ok(output)
vec![Ok(output)]
} else {
self.query_handler
.do_query(query, self.session.context())

@@ -193,14 +193,17 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
let output = self.do_query(query).await;
let outputs = self.do_query(query).await;
let mut writer = MysqlResultWriter::new(writer);
writer.write(query, output).await
for output in outputs {
writer.write(query, output).await?;
}
Ok(())
}

async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> {
let query = format!("USE {}", database.trim());
let output = self.do_query(&query).await.remove(0);
if let Err(e) = output {
w.error(ErrorKind::ER_UNKNOWN_ERROR, e.to_string().as_bytes())
.await
@@ -26,7 +26,7 @@ use pgwire::api::portal::Portal;
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{text_query_response, FieldInfo, Response, Tag, TextDataRowEncoder};
use pgwire::api::{ClientInfo, Type};
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use session::context::QueryContext;

use crate::error::{self, Error, Result};

@@ -61,36 +61,43 @@ impl SimpleQueryHandler for PostgresServerHandler {
C: ClientInfo + Unpin + Send + Sync,
{
let query_ctx = query_context_from_client_info(client);
let output = self
.query_handler
.do_query(query, query_ctx)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

match output {
Output::AffectedRows(rows) => Ok(vec![Response::Execution(Tag::new_for_execution(
"OK",
Some(rows),
))]),
Output::Stream(record_stream) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema)
}
Output::RecordBatches(recordbatches) => {
let schema = recordbatches.schema();
recordbatches_to_query_response(
stream::iter(recordbatches.take().into_iter().map(Ok)),
schema,
)
}
}
let outputs = self.query_handler.do_query(query, query_ctx).await;

let mut results = Vec::with_capacity(outputs.len());

for output in outputs {
let resp = match output {
Ok(Output::AffectedRows(rows)) => {
Response::Execution(Tag::new_for_execution("OK", Some(rows)))
}
Ok(Output::Stream(record_stream)) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema)?
}
Ok(Output::RecordBatches(recordbatches)) => {
let schema = recordbatches.schema();
recordbatches_to_query_response(
stream::iter(recordbatches.take().into_iter().map(Ok)),
schema,
)?
}
Err(e) => Response::Error(Box::new(ErrorInfo::new(
"ERROR".to_string(),
"XX000".to_string(),
e.to_string(),
))),
};
results.push(resp);
}

Ok(results)
}
}

fn recordbatches_to_query_response<S>(
recordbatches_stream: S,
schema: SchemaRef,
) -> PgWireResult<Vec<Response>>
) -> PgWireResult<Response>
where
S: Stream<Item = RecordBatchResult<RecordBatch>> + Send + Unpin + 'static,
{

@@ -121,10 +128,10 @@ where
})
});

Ok(vec![Response::Query(text_query_response(
Ok(Response::Query(text_query_response(
pg_schema,
data_row_stream,
))])
)))
}

fn schema_to_pg(origin: SchemaRef) -> Result<Vec<FieldInfo>> {
@@ -19,6 +19,7 @@ use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;

use crate::error::Result;
use crate::influxdb::InfluxdbRequest;

@@ -45,7 +46,13 @@ pub type ScriptHandlerRef = Arc<dyn ScriptHandler + Send + Sync>;

#[async_trait]
pub trait SqlQueryHandler {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Result<Output>;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>>;

async fn do_statement_query(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Output>;
}

#[async_trait]
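A minimal sketch of how a caller consumes the new multi-statement contract (mirroring the MySQL shim's loop shown earlier; `render` and `report_error` are stand-ins, not real APIs):

    async fn write_all<H: SqlQueryHandler>(handler: &H, sql: &str, ctx: QueryContextRef) {
        // do_query now yields one Result per statement, in execution order,
        // so a per-statement failure no longer aborts the whole batch.
        for output in handler.do_query(sql, ctx).await {
            match output {
                Ok(out) => render(out),       // placeholder for protocol-specific output
                Err(e) => report_error(e),    // placeholder for protocol-specific errors
            }
        }
    }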
@@ -46,7 +46,15 @@ impl InfluxdbLineProtocolHandler for DummyInstance {

#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

@@ -45,7 +45,15 @@ impl OpentsdbProtocolHandler for DummyInstance {

#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

@@ -70,7 +70,15 @@ impl PrometheusProtocolHandler for DummyInstance {

#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Result<Output> {
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}

@@ -54,9 +54,18 @@ impl DummyInstance {

#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Result<Output> {
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let plan = self.query_engine.sql_to_plan(query, query_ctx).unwrap();
Ok(self.query_engine.execute(&plan).await.unwrap())
let output = self.query_engine.execute(&plan).await.unwrap();
vec![Ok(output)]
}

async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,
_query_ctx: QueryContextRef,
) -> Result<Output> {
unimplemented!()
}
}
@@ -245,7 +245,8 @@ pub fn column_def_to_schema(column_def: &ColumnDef, is_time_index: bool) -> Resu
let is_nullable = column_def
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::Null));
.all(|o| !matches!(o.option, ColumnOption::NotNull))
&& !is_time_index;

let name = column_def.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&column_def.data_type)?;

@@ -265,10 +266,10 @@ pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::Colu
let name = col.name.value.clone();
let data_type = sql_data_type_to_concrete_data_type(&col.data_type)?;

let nullable = !col
let is_nullable = col
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::NotNull));
.all(|o| !matches!(o.option, ColumnOption::NotNull));

let default_constraint = parse_column_default_constraint(&name, &data_type, &col.options)?
.map(ColumnDefaultConstraint::try_into) // serialize default constraint to bytes

@@ -281,8 +282,8 @@ pub fn sql_column_def_to_grpc_column_def(col: ColumnDef) -> Result<api::v1::Colu
Ok(api::v1::ColumnDef {
name,
datatype: data_type,
is_nullable: nullable,
default_constraint,
is_nullable,
default_constraint: default_constraint.unwrap_or_default(),
})
}
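In effect, the rewritten predicate treats a column as nullable only when none of its options is NOT NULL and the column is not the time index, where the old `.any(.. ColumnOption::Null)` check required an explicit NULL option to be present. A hedged distillation of just that rule (ColumnOption and ColumnOptionDef from sqlparser; the free function is illustrative, not part of the patch):

    fn is_nullable(options: &[ColumnOptionDef], is_time_index: bool) -> bool {
        // Nullable unless NOT NULL is declared or this is the time index.
        options.iter().all(|o| !matches!(o.option, ColumnOption::NotNull)) && !is_time_index
    }

    // No options and not the time index => nullable, matching the `col` case
    // in the tests below; a NOT NULL option or is_time_index == true flips it.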
@@ -587,7 +588,7 @@ mod tests {
assert_eq!("col", grpc_column_def.name);
assert!(grpc_column_def.is_nullable); // nullable when options are empty
assert_eq!(ColumnDataType::Float64 as i32, grpc_column_def.datatype);
assert_eq!(None, grpc_column_def.default_constraint);
assert!(grpc_column_def.default_constraint.is_empty());

// test not null
let column_def = ColumnDef {

@@ -603,4 +604,51 @@ mod tests {
let grpc_column_def = sql_column_def_to_grpc_column_def(column_def).unwrap();
assert!(!grpc_column_def.is_nullable);
}

#[test]
pub fn test_column_def_to_schema() {
let column_def = ColumnDef {
name: "col".into(),
data_type: SqlDataType::Double,
collation: None,
options: vec![],
};

let column_schema = column_def_to_schema(&column_def, false).unwrap();

assert_eq!("col", column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
column_schema.data_type
);
assert!(column_schema.is_nullable());
assert!(!column_schema.is_time_index());

let column_schema = column_def_to_schema(&column_def, true).unwrap();

assert_eq!("col", column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
column_schema.data_type
);
assert!(!column_schema.is_nullable());
assert!(column_schema.is_time_index());

let column_def = ColumnDef {
name: "col2".into(),
data_type: SqlDataType::String,
collation: None,
options: vec![ColumnOptionDef {
name: None,
option: ColumnOption::NotNull,
}],
};

let column_schema = column_def_to_schema(&column_def, false).unwrap();

assert_eq!("col2", column_schema.name);
assert_eq!(ConcreteDataType::string_datatype(), column_schema.data_type);
assert!(!column_schema.is_nullable());
assert!(!column_schema.is_time_index());
}
}
@@ -56,7 +56,7 @@ impl TryFrom<AlterTable> for AlterExpr {
type Error = crate::error::Error;

fn try_from(value: AlterTable) -> Result<Self, Self::Error> {
let (catalog, schema, table) = table_idents_to_full_name(&value.table_name)?;
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&value.table_name)?;

let kind = match value.alter_operation {
AlterTableOperation::AddConstraint(_) => {

@@ -80,9 +80,9 @@ impl TryFrom<AlterTable> for AlterExpr {
}
};
let expr = AlterExpr {
catalog_name: Some(catalog),
schema_name: Some(schema),
table_name: table,
catalog_name,
schema_name,
table_name,
kind: Some(kind),
};
@@ -21,8 +21,8 @@ futures = "0.3"
futures-util = "0.3"
lazy_static = "1.4"
object-store = { path = "../object-store" }
paste = "1.0"
parquet = { version = "26", features = ["async"] }
paste = "1.0"
planus = "0.2"
prost = "0.11"
regex = "1.5"
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod codec;
mod compat;

use std::any::Any;
@@ -499,280 +500,23 @@ impl<'a> IntoIterator for &'a WriteBatch {
}
}

pub mod codec {
use std::io::Cursor;
use std::sync::Arc;

use datatypes::arrow::ipc::reader::StreamReader;
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;

use crate::codec::{Decoder, Encoder};
use crate::proto::wal::MutationType;
use crate::proto::write_batch::{self, gen_columns, gen_put_data_vector};
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeProtobufSnafu, DecodeVectorSnafu,
EncodeArrowSnafu, EncodeProtobufSnafu, Error as WriteBatchError, FromProtobufSnafu,
MissingColumnSnafu, Mutation, ParseSchemaSnafu, PutData, Result, ToProtobufSnafu,
WriteBatch,
};

// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}

impl WriteBatchArrowEncoder {
pub fn new() -> Self {
Self::default()
}
}

impl Encoder for WriteBatchArrowEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();

let opts = IpcWriteOptions::default();
let mut writer = StreamWriter::try_new_with_options(dst, arrow_schema, opts)
.context(EncodeArrowSnafu)?;

for mutation in item.iter() {
let rb = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;

RecordBatch::try_new(arrow_schema.clone(), arrays)
.context(EncodeArrowSnafu)?
}
};
writer.write(&rb).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
}
}

pub struct WriteBatchArrowDecoder {
mutation_types: Vec<i32>,
}

impl WriteBatchArrowDecoder {
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}

impl Decoder for WriteBatchArrowDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let reader = Cursor::new(src);
let mut reader = StreamReader::try_new(reader, None).context(DecodeArrowSnafu)?;
let arrow_schema = reader.schema();
let mut chunks = Vec::with_capacity(self.mutation_types.len());

for maybe_record_batch in reader.by_ref() {
let record_batch = maybe_record_batch.context(DecodeArrowSnafu)?;
chunks.push(record_batch);
}

// check if exactly finished
ensure!(
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);

ensure!(
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
chunks.len()
)
}
);

let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());

for (mutation_type, record_batch) in self.mutation_types.iter().zip(chunks.into_iter())
{
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
let vector =
Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}

write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpected mutation type: {}", mutation_type),
}
.fail()
}
}
}
Ok(write_batch)
}
}

pub struct WriteBatchProtobufEncoder {}

impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();

let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();

let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};

write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}

pub struct WriteBatchProtobufDecoder {
mutation_types: Vec<i32>,
}

impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}

impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;

let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;

let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;

ensure!(
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);

let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let mut put_data = PutData::with_num_columns(put.columns.len());

let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();

res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;

let mut write_batch = WriteBatch::new(schema);

mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;

Ok(write_batch)
}
}
}

#[cfg(test)]
pub(crate) fn new_test_batch() -> WriteBatch {
use datatypes::type_id::LogicalTypeId;

use crate::test_util::write_batch_util;

write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
],
Some(2),
)
}

#[cfg(test)]
mod tests {
use std::iter;

@@ -785,8 +529,6 @@ mod tests {
};

use super::*;
use crate::codec::{Decoder, Encoder};
use crate::proto;
use crate::test_util::write_batch_util;

#[test]

@@ -824,18 +566,6 @@ mod tests {
assert!(put_data.is_empty());
}

fn new_test_batch() -> WriteBatch {
write_batch_util::new_write_batch(
&[
("k1", LogicalTypeId::UInt64, false),
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
("ts", LogicalTypeId::TimestampMillisecond, false),
("v1", LogicalTypeId::Boolean, true),
],
Some(2),
)
}

#[test]
fn test_write_batch_put() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));

@@ -959,7 +689,7 @@ mod tests {
}

#[test]
pub fn test_align_timestamp() {
fn test_align_timestamp() {
let duration_millis = 20;
let ts = [-21, -20, -19, -1, 0, 5, 15, 19, 20, 21];
let res = ts.map(|t| align_timestamp(t, duration_millis));
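The body of align_timestamp is not part of this diff; one plausible shape consistent with the overflow assertions in the next hunk (flooring to the nearest lower multiple of the duration, with checked arithmetic) would be:

    // Hypothetical reconstruction, not the actual source.
    fn align_timestamp(ts: i64, duration_millis: i64) -> Option<i64> {
        // Floor ts to a multiple of duration_millis; None on overflow.
        ts.checked_div_euclid(duration_millis)?
            .checked_mul(duration_millis)
    }

Under this reading, align_timestamp(-21, 20) == Some(-40), and align_timestamp(i64::MIN, 2) == Some(i64::MIN), matching the overflow test below.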
@@ -967,7 +697,7 @@ mod tests {
}

#[test]
pub fn test_align_timestamp_overflow() {
fn test_align_timestamp_overflow() {
assert_eq!(Some(i64::MIN), align_timestamp(i64::MIN, 1));
assert_eq!(Some(-9223372036854775808), align_timestamp(i64::MIN, 2));
assert_eq!(

@@ -982,7 +712,7 @@ mod tests {
}

#[test]
pub fn test_write_batch_time_range() {
fn test_write_batch_time_range() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![
-21, -20, -1, 0, 1, 20,

@@ -1011,7 +741,7 @@ mod tests {
}

#[test]
pub fn test_write_batch_time_range_const_vector() {
fn test_write_batch_time_range_const_vector() {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3, 4, 5, 6]));
let tsv = Arc::new(ConstantVector::new(
Arc::new(TimestampMillisecondVector::from_vec(vec![20])),

@@ -1039,111 +769,4 @@ mod tests {
ranges.as_slice()
)
}

fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i]));

let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();

batch.put(put_data).unwrap();
}

let types = proto::wal::gen_mutation_types(&batch);

(batch, types)
}

#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();

let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();

let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));

let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();

batch.put(put_data).unwrap();
}

let types = proto::wal::gen_mutation_types(&batch);

(batch, types)
}

#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();

let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();

let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();

let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}
}
399 src/storage/src/write_batch/codec.rs Normal file
@@ -0,0 +1,399 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Cursor;
use std::sync::Arc;

use datatypes::arrow::ipc::reader::StreamReader;
use datatypes::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;

use crate::codec::{Decoder, Encoder};
use crate::proto::wal::MutationType;
use crate::proto::write_batch::{self, gen_columns, gen_put_data_vector};
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeProtobufSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
EncodeProtobufSnafu, Error as WriteBatchError, FromProtobufSnafu, MissingColumnSnafu, Mutation,
ParseSchemaSnafu, PutData, Result, ToProtobufSnafu, WriteBatch,
};

// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}

impl WriteBatchArrowEncoder {
pub fn new() -> Self {
Self::default()
}
}

impl Encoder for WriteBatchArrowEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();

let opts = IpcWriteOptions::default();
let mut writer = StreamWriter::try_new_with_options(dst, arrow_schema, opts)
.context(EncodeArrowSnafu)?;

for mutation in item.iter() {
let rb = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;

RecordBatch::try_new(arrow_schema.clone(), arrays).context(EncodeArrowSnafu)?
}
};
writer.write(&rb).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
}
}

pub struct WriteBatchArrowDecoder {
mutation_types: Vec<i32>,
}

impl WriteBatchArrowDecoder {
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}

impl Decoder for WriteBatchArrowDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let reader = Cursor::new(src);
let mut reader = StreamReader::try_new(reader, None).context(DecodeArrowSnafu)?;
let arrow_schema = reader.schema();
let mut chunks = Vec::with_capacity(self.mutation_types.len());

for maybe_record_batch in reader.by_ref() {
let record_batch = maybe_record_batch.context(DecodeArrowSnafu)?;
chunks.push(record_batch);
}

// check if exactly finished
ensure!(
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);

ensure!(
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
chunks.len()
)
}
);

let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());

for (mutation_type, record_batch) in self.mutation_types.iter().zip(chunks.into_iter()) {
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
let vector = Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}

write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpected mutation type: {}", mutation_type),
}
.fail()
}
}
}
Ok(write_batch)
}
}

pub struct WriteBatchProtobufEncoder {}

impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();

let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();

let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};

write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}

pub struct WriteBatchProtobufDecoder {
mutation_types: Vec<i32>,
}

impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}

impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;

fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;

let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;

let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;

ensure!(
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);

let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let mut put_data = PutData::with_num_columns(put.columns.len());

let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();

res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;

let mut write_batch = WriteBatch::new(schema);

mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;

Ok(write_batch)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datatypes::vectors::{BooleanVector, TimestampMillisecondVector, UInt64Vector};
use store_api::storage::PutOperation;

use super::*;
use crate::{proto, write_batch};

fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![i, i, i]));

let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();

batch.put(put_data).unwrap();
}

let types = proto::wal::gen_mutation_types(&batch);

(batch, types)
}

#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();

let encoder = WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types();

let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = write_batch::new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(TimestampMillisecondVector::from_vec(vec![0, 0, 0]));

let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();

batch.put(put_data).unwrap();
}

let types = proto::wal::gen_mutation_types(&batch);

(batch, types)
}

#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();

let encoder = WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());

let decoder = WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}

#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();

let encoder = WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();

let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);

Ok(())
}
}
@@ -27,6 +27,6 @@ tokio = { version = "1.18", features = ["full"] }

[dev-dependencies]
datafusion-expr = "14.0.0"
parquet = { version = "26", features = ["async"] }
tempdir = "0.3"
tokio-util = { version = "0.7", features = ["compat"] }
parquet = { version = "26", features = ["async"] }
@@ -15,7 +15,7 @@ use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateExpr, InsertExpr, MutateResult,
CreateTableExpr, InsertExpr, MutateResult, TableId,
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
@@ -151,7 +151,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
name: "test_column".to_string(),
datatype: ColumnDataType::Int64.into(),
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
};
let kind = Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {

@@ -161,8 +161,8 @@ pub async fn test_insert_and_select(store_type: StorageType) {
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
catalog_name: None,
schema_name: None,
catalog_name: "".to_string(),
schema_name: "".to_string(),
kind: Some(kind),
};
let result = admin.alter(expr).await.unwrap();
@@ -222,44 +222,46 @@ async fn insert_and_assert(db: &Database) {
}
}

fn testing_create_expr() -> CreateExpr {
fn testing_create_expr() -> CreateTableExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32, // timestamp
is_nullable: true,
default_constraint: None,
default_constraint: vec![],
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
CreateTableExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "demo".to_string(),
desc: Some("blabla".to_string()),
desc: "blabla little magic fairy".to_string(),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
region_ids: vec![0],
}
}
|
||||
|
||||
pub async fn test_sql_api(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) = setup_test_app(store_type, "sql_api").await;
|
||||
let (app, mut guard) = setup_test_app_with_frontend(store_type, "sql_api").await;
|
||||
let client = TestClient::new(app);
|
||||
let res = client.get("/v1/sql").send().await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":1004,"error":"sql parameter is required."}"#
|
||||
assert_eq!(body.code(), 1004);
|
||||
assert_eq!(body.error().unwrap(), "sql parameter is required.");
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
@@ -77,9 +76,6 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"number","data_type":"UInt32"}]},"rows":[[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]}}]}"#
|
||||
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
|
||||
@@ -107,7 +103,6 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"host","data_type":"String"},{"name":"cpu","data_type":"Float64"},{"name":"memory","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[["host",66.6,1024.0,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
@@ -128,8 +123,6 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
@@ -150,8 +143,6 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"c","data_type":"Float64"},{"name":"time","data_type":"Timestamp"}]},"rows":[[66.6,0]]}}]}"#
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
@@ -163,6 +154,44 @@ pub async fn test_sql_api(store_type: StorageType) {
|
||||
})).unwrap()
|
||||
);
|
||||
|
||||
// test multi-statement
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select cpu, ts from demo limit 1;select cpu, ts from demo where ts > 0;")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
assert!(body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let outputs = body.output().unwrap();
|
||||
assert_eq!(outputs.len(), 2);
|
||||
assert_eq!(
|
||||
outputs[0],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"schema":{"column_schemas":[{"name":"cpu","data_type":"Float64"},{"name":"ts","data_type":"TimestampMillisecond"}]},"rows":[[66.6,0]]}
|
||||
})).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
outputs[1],
|
||||
serde_json::from_value::<JsonOutput>(json!({
|
||||
"records":{"rows":[]}
|
||||
}))
|
||||
.unwrap()
|
||||
);
|
||||
|
||||
// test multi-statement with error
|
||||
let res = client
|
||||
.get("/v1/sql?sql=select cpu, ts from demo limit 1;select cpu, ts from demo2 where ts > 0;")
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
assert!(!body.success());
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
assert!(body.error().unwrap().contains("not found"));
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
@@ -206,7 +235,6 @@ def test(n):
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
// body json: r#"{"code":0}"#
|
||||
assert_eq!(body.code(), 0);
|
||||
assert!(body.output().is_none());
|
||||
|
||||
@@ -215,8 +243,6 @@ def test(n):
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
|
||||
|
||||
// body json:
|
||||
// r#"{"code":0,"output":[{"records":{"schema":{"column_schemas":[{"name":"n","data_type":"Float64"}]},"rows":[[1.0],[2.0],[3.0],[4.0],[5.0],[6.0],[7.0],[8.0],[9.0],[10.0]]}}]}"#
|
||||
assert_eq!(body.code(), 0);
|
||||
assert!(body.execution_time_ms().is_some());
|
||||
let output = body.output().unwrap();
|
||||
|
||||