feat: make blob(binary) type working (#1818)

* feat: test blob type

* feat: make blob type working

* chore: comment

* Update src/sql/src/statements/insert.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* chore: by CR comments

* fix: comment

* Update src/sql/src/statements/insert.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/sql/src/statements/insert.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: test

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
dennis zhuang
2023-06-26 16:49:04 +08:00
committed by GitHub
parent a95f8767a8
commit 034564fd27
11 changed files with 335 additions and 71 deletions

View File

@@ -196,8 +196,11 @@ pub enum Error {
source: sql::error::Error,
},
#[snafu(display("Missing insert body"))]
MissingInsertBody { location: Location },
#[snafu(display("Missing insert body, source: {source}"))]
MissingInsertBody {
source: sql::error::Error,
location: Location,
},
#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
Insert {
@@ -527,7 +530,6 @@ impl ErrorExt for Error {
| ConstraintNotSupported { .. }
| SchemaExists { .. }
| ParseTimestamp { .. }
| MissingInsertBody { .. }
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
@@ -555,6 +557,7 @@ impl ErrorExt for Error {
| Catalog { .. }
| MissingRequiredField { .. }
| IncorrectInternalState { .. }
| MissingInsertBody { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. }
| JoinTask { .. } => StatusCode::Internal,

View File

@@ -41,9 +41,12 @@ impl Instance {
async fn do_execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Insert(insert) => {
let request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx)
.await?;
let request = SqlHandler::insert_to_request(
self.catalog_manager.clone(),
&insert,
query_ctx.clone(),
)
.await?;
self.sql_handler.insert(request).await
}
Statement::CreateDatabase(create_database) => {

View File

@@ -29,8 +29,8 @@ use table::TableRef;
use crate::error::{
CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, InsertSnafu, MissingInsertBodySnafu, ParseSqlSnafu,
ParseSqlValueSnafu, Result, TableNotFoundSnafu,
ColumnValuesNumberMismatchSnafu, InsertSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
TableNotFoundSnafu,
};
use crate::sql::{table_idents_to_full_name, SqlHandler};
@@ -58,12 +58,10 @@ impl SqlHandler {
fn build_request_from_values(
table_ref: TableReference,
table: &TableRef,
stmt: Insert,
stmt: &Insert,
) -> Result<InsertRequest> {
let values = stmt
.values_body()
.context(ParseSqlValueSnafu)?
.context(MissingInsertBodySnafu)?;
let values = stmt.values_body().context(MissingInsertBodySnafu)?;
let columns = stmt.columns();
let schema = table.schema();
let columns_num = if columns.is_empty() {
@@ -125,7 +123,7 @@ impl SqlHandler {
pub async fn insert_to_request(
catalog_manager: CatalogManagerRef,
stmt: Insert,
stmt: &Insert,
query_ctx: QueryContextRef,
) -> Result<InsertRequest> {
let (catalog_name, schema_name, table_name) =

View File

@@ -223,6 +223,11 @@ impl Helper {
ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
.context(crate::error::ArrowComputeSnafu)?;
Arc::new(BinaryVector::try_from_arrow_array(array)?)
}
ArrowDataType::Int8 => Arc::new(Int8Vector::try_from_arrow_array(array)?),
ArrowDataType::Int16 => Arc::new(Int16Vector::try_from_arrow_array(array)?),
ArrowDataType::Int32 => Arc::new(Int32Vector::try_from_arrow_array(array)?),
@@ -234,6 +239,11 @@ impl Helper {
ArrowDataType::Float32 => Arc::new(Float32Vector::try_from_arrow_array(array)?),
ArrowDataType::Float64 => Arc::new(Float64Vector::try_from_arrow_array(array)?),
ArrowDataType::Utf8 => Arc::new(StringVector::try_from_arrow_array(array)?),
// LargeUtf8 is normalized to Utf8 first, so the casted array must be wrapped in a
// StringVector (the same vector type the Utf8 arm uses), not a BinaryVector.
ArrowDataType::LargeUtf8 => {
    let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
        .context(crate::error::ArrowComputeSnafu)?;
    Arc::new(StringVector::try_from_arrow_array(array)?)
}
ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?),
@@ -256,9 +266,6 @@ impl Helper {
| ArrowDataType::Time64(_)
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::Binary
| ArrowDataType::FixedSizeBinary(_)
| ArrowDataType::LargeUtf8
| ArrowDataType::LargeList(_)
| ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::Struct(_)

View File

@@ -407,7 +407,7 @@ impl DistInstance {
.context(TableNotFoundSnafu { table_name: table })?;
let insert_request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx)
SqlHandler::insert_to_request(self.catalog_manager.clone(), &insert, query_ctx)
.await
.context(InvokeDatanodeSnafu)?;

View File

@@ -84,10 +84,18 @@ impl StatementExecutor {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}
// For performance consideration, only "insert with select" is executed by query engine.
// Plain insert ("insert with values") is still executed directly in statement.
Statement::Insert(ref insert) if insert.is_insert_select() => {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
// For performance consideration, only requests whose values can't be extracted are executed by the query engine.
// Plain insert ("insert with literal values") is still executed directly in statement.
Statement::Insert(insert) => {
if insert.can_extract_values() {
self.sql_stmt_executor
.execute_sql(Statement::Insert(insert), query_ctx)
.await
.context(ExecuteStatementSnafu)
} else {
self.plan_exec(QueryStatement::Sql(Statement::Insert(insert)), query_ctx)
.await
}
}
Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
@@ -120,7 +128,6 @@ impl StatementExecutor {
Statement::CreateDatabase(_)
| Statement::CreateTable(_)
| Statement::CreateExternalTable(_)
| Statement::Insert(_)
| Statement::Alter(_)
| Statement::DropTable(_)
| Statement::ShowCreateTable(_) => self
@@ -213,7 +220,7 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
}
/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
/// This function extracts the necessary info including catalog/database name, time range, etc.
/// This function extracts the necessary info including catalog/database name, time range, etc.
fn to_copy_database_request(
arg: CopyDatabaseArgument,
query_ctx: &QueryContextRef,

View File

@@ -352,7 +352,9 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Co
SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()),
SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()),
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Varbinary(_) => Ok(ConcreteDataType::binary_datatype()),
SqlDataType::Blob(_) | SqlDataType::Bytea | SqlDataType::Varbinary(_) => {
Ok(ConcreteDataType::binary_datatype())
}
SqlDataType::Datetime(_) => Ok(ConcreteDataType::datetime_datatype()),
SqlDataType::Timestamp(precision, _) => Ok(precision
.as_ref()

View File

@@ -15,7 +15,7 @@ use sqlparser::ast::{ObjectName, Query, SetExpr, Statement, UnaryOperator, Value
use sqlparser::parser::ParserError;
use crate::ast::{Expr, Value};
use crate::error::{self, Result};
use crate::error::Result;
use crate::statements::query::Query as GtQuery;
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -24,6 +24,15 @@ pub struct Insert {
pub inner: Statement,
}
/// Returns early from the enclosing function with an error built from
/// `ParseSqlValueSnafu`, whose `msg` is the `Debug` rendering of the given expression.
macro_rules! parse_fail {
($expr: expr) => {
return crate::error::ParseSqlValueSnafu {
msg: format!("{:?}", $expr),
}
.fail();
};
}
impl Insert {
pub fn table_name(&self) -> &ObjectName {
match &self.inner {
@@ -39,8 +48,9 @@ impl Insert {
}
}
pub fn values_body(&self) -> Result<Option<Vec<Vec<Value>>>> {
let values = match &self.inner {
/// Extracts the literal insert statement body if possible
pub fn values_body(&self) -> Result<Vec<Vec<Value>>> {
match &self.inner {
Statement::Insert {
source:
box Query {
@@ -48,11 +58,41 @@ impl Insert {
..
},
..
} => Some(sql_exprs_to_values(rows)?),
_ => None,
};
} => sql_exprs_to_values(rows),
_ => unreachable!(),
}
}
Ok(values)
/// Returns `true` when every value expression of the insert statement is a literal
/// that can be extracted directly. The rules are the same as in `values_body()`.
///
/// An expression is considered extractable when it is one of:
/// - a plain value (`Expr::Value`);
/// - an unquoted identifier spelling `default` (compared case-insensitively);
/// - a double-quoted identifier (treated as a string);
/// - a unary `+` or `-` applied directly to a number literal.
///
/// Returns `false` when the statement body is not a `VALUES` list.
pub fn can_extract_values(&self) -> bool {
    match &self.inner {
        Statement::Insert {
            source:
                box Query {
                    body: box SetExpr::Values(Values { rows, .. }),
                    ..
                },
            ..
        } => rows.iter().flatten().all(|expr| match expr {
            Expr::Value(_) => true,
            Expr::Identifier(ident) => match ident.quote_style {
                // Compare without allocating a lowercased copy of the identifier.
                None => ident.value.eq_ignore_ascii_case("default"),
                Some(quote) => quote == '"',
            },
            Expr::UnaryOp { op, expr } => {
                matches!(op, UnaryOperator::Minus | UnaryOperator::Plus)
                    && matches!(&**expr, Expr::Value(Value::Number(_, _)))
            }
            _ => false,
        }),
        _ => false,
    }
}
pub fn query_body(&self) -> Result<Option<GtQuery>> {
@@ -63,19 +103,6 @@ impl Insert {
_ => None,
})
}
pub fn is_insert_select(&self) -> bool {
matches!(
self.inner,
Statement::Insert {
source: box Query {
body: box SetExpr::Select { .. },
..
},
..
}
)
}
}
fn sql_exprs_to_values(exprs: &Vec<Vec<Expr>>) -> Result<Vec<Vec<Value>>> {
@@ -87,9 +114,19 @@ fn sql_exprs_to_values(exprs: &Vec<Vec<Expr>>) -> Result<Vec<Vec<Value>>> {
Expr::Value(v) => v.clone(),
Expr::Identifier(ident) => {
if ident.quote_style.is_none() {
Value::Placeholder(ident.value.clone())
// Special processing for `default` value
if ident.value.to_lowercase() == "default" {
Value::Placeholder(ident.value.clone())
} else {
parse_fail!(expr);
}
} else {
Value::SingleQuotedString(ident.value.clone())
// Identifiers with double quotes, we treat them as strings.
if ident.quote_style == Some('"') {
Value::SingleQuotedString(ident.value.clone())
} else {
parse_fail!(expr);
}
}
}
Expr::UnaryOp { op, expr }
@@ -102,17 +139,11 @@ fn sql_exprs_to_values(exprs: &Vec<Vec<Expr>>) -> Result<Vec<Vec<Value>>> {
_ => unreachable!(),
}
} else {
return error::ParseSqlValueSnafu {
msg: format!("{expr:?}"),
}
.fail();
parse_fail!(expr);
}
}
_ => {
return error::ParseSqlValueSnafu {
msg: format!("{expr:?}"),
}
.fail()
parse_fail!(expr);
}
});
}
@@ -150,7 +181,7 @@ mod tests {
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap().unwrap();
let values = insert.values_body().unwrap();
assert_eq!(values, vec![vec![Value::Number("-1".to_string(), false)]]);
}
_ => unreachable!(),
@@ -163,7 +194,7 @@ mod tests {
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap().unwrap();
let values = insert.values_body().unwrap();
assert_eq!(values, vec![vec![Value::Number("1".to_string(), false)]]);
}
_ => unreachable!(),
@@ -179,7 +210,7 @@ mod tests {
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap().unwrap();
let values = insert.values_body().unwrap();
assert_eq!(values, vec![vec![Value::Placeholder("default".to_owned())]]);
}
_ => unreachable!(),
@@ -195,7 +226,7 @@ mod tests {
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap().unwrap();
let values = insert.values_body().unwrap();
assert_eq!(values, vec![vec![Value::Placeholder("DEFAULT".to_owned())]]);
}
_ => unreachable!(),
@@ -204,14 +235,14 @@ mod tests {
#[test]
fn test_insert_value_with_quoted_string() {
// insert "'default'"
// insert 'default'
let sql = "INSERT INTO my_table VALUES('default')";
let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap().unwrap();
let values = insert.values_body().unwrap();
assert_eq!(
values,
vec![vec![Value::SingleQuotedString("default".to_owned())]]
@@ -219,6 +250,33 @@ mod tests {
}
_ => unreachable!(),
}
// insert "default". Treating double-quoted identifiers as strings.
let sql = "INSERT INTO my_table VALUES(\"default\")";
let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
let values = insert.values_body().unwrap();
assert_eq!(
values,
vec![vec![Value::SingleQuotedString("default".to_owned())]]
);
}
_ => unreachable!(),
}
let sql = "INSERT INTO my_table VALUES(`default`)";
let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {})
.unwrap()
.remove(0);
match stmt {
Statement::Insert(insert) => {
assert!(insert.values_body().is_err());
}
_ => unreachable!(),
}
}
#[test]
@@ -229,7 +287,6 @@ mod tests {
.remove(0);
match stmt {
Statement::Insert(insert) => {
assert!(insert.is_insert_select());
let q = insert.query_body().unwrap().unwrap();
assert!(matches!(
q.inner,

View File

@@ -63,24 +63,29 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.await
.unwrap();
sqlx::query("create table demo(i bigint, ts timestamp time index, d date, dt datetime)")
.execute(&pool)
.await
.unwrap();
sqlx::query(
"create table demo(i bigint, ts timestamp time index, d date, dt datetime, b blob)",
)
.execute(&pool)
.await
.unwrap();
for i in 0..10 {
let dt = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp_opt(60, i).unwrap(), Utc);
let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
sqlx::query("insert into demo values(?, ?, ?, ?)")
let hello = format!("hello{i}");
let bytes = hello.as_bytes();
sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
.bind(i)
.bind(i)
.bind(d)
.bind(dt)
.bind(bytes)
.execute(&pool)
.await
.unwrap();
}
let rows = sqlx::query("select i, d, dt from demo")
let rows = sqlx::query("select i, d, dt, b from demo")
.fetch_all(&pool)
.await
.unwrap();
@@ -90,20 +95,19 @@ pub async fn test_mysql_crud(store_type: StorageType) {
let ret: i64 = row.get(0);
let d: NaiveDate = row.get(1);
let dt: DateTime<Utc> = row.get(2);
let bytes: Vec<u8> = row.get(3);
assert_eq!(ret, i as i64);
let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
assert_eq!(expected_d, d);
let expected_dt = DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp_opt(60, i as u32).unwrap(),
Utc,
);
assert_eq!(
format!("{}", expected_dt.format("%Y-%m-%d %H:%M:%S")),
format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
);
assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
}
let rows = sqlx::query("select i from demo where i=?")

View File

@@ -0,0 +1,133 @@
CREATE TABLE blobs (b BYTEA, t timestamp time index);
Affected Rows: 0
INSERT INTO blobs VALUES('\xaa\xff\xaa'::BYTEA, 1), ('\xAA\xFF\xAA\xAA\xFF\xAA'::BYTEA, 2), ('\xAA\xFF\xAA\xAA\xFF\xAA\xAA\xFF\xAA'::BYTEA, 3);
Affected Rows: 3
SELECT * FROM blobs;
+--------------------------------------------------------------------------+-------------------------+
| b | t |
+--------------------------------------------------------------------------+-------------------------+
| 5c7861615c7866665c786161 | 1970-01-01T00:00:00.001 |
| 5c7841415c7846465c7841415c7841415c7846465c784141 | 1970-01-01T00:00:00.002 |
| 5c7841415c7846465c7841415c7841415c7846465c7841415c7841415c7846465c784141 | 1970-01-01T00:00:00.003 |
+--------------------------------------------------------------------------+-------------------------+
DELETE FROM blobs;
Affected Rows: 3
INSERT INTO blobs VALUES('\xaa\xff\xaa'::BYTEA, 1), ('\xaa\xff\xaa\xaa\xff\xaa'::BYTEA, 2), ('\xaa\xff\xaa\xaa\xff\xaa\xaa\xff\xaa'::BYTEA, 3);
Affected Rows: 3
SELECT * FROM blobs;
+--------------------------------------------------------------------------+-------------------------+
| b | t |
+--------------------------------------------------------------------------+-------------------------+
| 5c7861615c7866665c786161 | 1970-01-01T00:00:00.001 |
| 5c7861615c7866665c7861615c7861615c7866665c786161 | 1970-01-01T00:00:00.002 |
| 5c7861615c7866665c7861615c7861615c7866665c7861615c7861615c7866665c786161 | 1970-01-01T00:00:00.003 |
+--------------------------------------------------------------------------+-------------------------+
DELETE FROM blobs;
Affected Rows: 3
INSERT INTO blobs VALUES('\xaa1199'::BYTEA, 1), ('\xaa1199aa1199'::BYTEA, 2), ('\xaa1199aa1199aa1199'::BYTEA, 3);
Affected Rows: 3
SELECT * FROM blobs;
+------------------------------------------+-------------------------+
| b | t |
+------------------------------------------+-------------------------+
| 5c78616131313939 | 1970-01-01T00:00:00.001 |
| 5c78616131313939616131313939 | 1970-01-01T00:00:00.002 |
| 5c78616131313939616131313939616131313939 | 1970-01-01T00:00:00.003 |
+------------------------------------------+-------------------------+
INSERT INTO blobs VALUES('\xGA\xFF\xAA'::BYTEA, 4);
Affected Rows: 1
INSERT INTO blobs VALUES('\xA'::BYTEA, 4);
Affected Rows: 1
INSERT INTO blobs VALUES('\xAA\xA'::BYTEA, 4);
Affected Rows: 1
INSERT INTO blobs VALUES('blablabla\x'::BYTEA, 4);
Affected Rows: 1
SELECT 'abc <20>'::BYTEA;
+----------------+
| Utf8("abc <20>") |
+----------------+
| 61626320efbfbd |
+----------------+
SELECT ''::BYTEA;
+----------+
| Utf8("") |
+----------+
| |
+----------+
SELECT NULL::BYTEA;
+------+
| NULL |
+------+
| |
+------+
CREATE TABLE blob_empty (b BYTEA, t timestamp time index);
Affected Rows: 0
INSERT INTO blob_empty VALUES(''::BYTEA, 1), (''::BYTEA, 2);
Affected Rows: 2
INSERT INTO blob_empty VALUES(NULL, 3), (NULL::BYTEA, 4);
Affected Rows: 2
SELECT * FROM blob_empty;
+---+-------------------------+
| b | t |
+---+-------------------------+
| | 1970-01-01T00:00:00.001 |
| | 1970-01-01T00:00:00.002 |
| | 1970-01-01T00:00:00.003 |
| | 1970-01-01T00:00:00.004 |
+---+-------------------------+
SELECT '\x7F'::BYTEA;
+--------------+
| Utf8("\x7F") |
+--------------+
| 5c783746 |
+--------------+
drop table blobs;
Affected Rows: 1
drop table blob_empty;
Affected Rows: 1

View File

@@ -0,0 +1,50 @@
CREATE TABLE blobs (b BYTEA, t timestamp time index);
--Insert valid hex strings--
INSERT INTO blobs VALUES('\xaa\xff\xaa'::BYTEA, 1), ('\xAA\xFF\xAA\xAA\xFF\xAA'::BYTEA, 2), ('\xAA\xFF\xAA\xAA\xFF\xAA\xAA\xFF\xAA'::BYTEA, 3);
SELECT * FROM blobs;
--Insert valid hex strings, lower case--
DELETE FROM blobs;
INSERT INTO blobs VALUES('\xaa\xff\xaa'::BYTEA, 1), ('\xaa\xff\xaa\xaa\xff\xaa'::BYTEA, 2), ('\xaa\xff\xaa\xaa\xff\xaa\xaa\xff\xaa'::BYTEA, 3);
SELECT * FROM blobs;
--Insert valid hex strings with number and letters--
DELETE FROM blobs;
INSERT INTO blobs VALUES('\xaa1199'::BYTEA, 1), ('\xaa1199aa1199'::BYTEA, 2), ('\xaa1199aa1199aa1199'::BYTEA, 3);
SELECT * FROM blobs;
--Insert invalid hex strings (invalid hex chars: G, H, I)--
INSERT INTO blobs VALUES('\xGA\xFF\xAA'::BYTEA, 4);
--Insert invalid hex strings (odd # of chars)--
INSERT INTO blobs VALUES('\xA'::BYTEA, 4);
INSERT INTO blobs VALUES('\xAA\xA'::BYTEA, 4);
INSERT INTO blobs VALUES('blablabla\x'::BYTEA, 4);
SELECT 'abc <20>'::BYTEA;
SELECT ''::BYTEA;
SELECT NULL::BYTEA;
CREATE TABLE blob_empty (b BYTEA, t timestamp time index);
INSERT INTO blob_empty VALUES(''::BYTEA, 1), (''::BYTEA, 2);
INSERT INTO blob_empty VALUES(NULL, 3), (NULL::BYTEA, 4);
SELECT * FROM blob_empty;
SELECT '\x7F'::BYTEA;
drop table blobs;
drop table blob_empty;