feat: introduce vector type (#4964)

* feat: introduce vector type

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: fix prepared stmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: add grpc test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: parse vector value

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: column to row

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: sqlness

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: merge issue

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: add check for bytes size

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update tests/cases/standalone/common/types/vector/vector.sql

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: update proto

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: simplify cargo

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Zhenchi
2024-11-12 16:28:44 +08:00
committed by GitHub
parent 84aa5b7b22
commit d616bd92ef
32 changed files with 1109 additions and 120 deletions

View File

@@ -562,6 +562,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Convert SQL value error"))]
ConvertSqlValue {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -674,6 +680,8 @@ impl ErrorExt for Error {
ConvertScalarValue { source, .. } => source.status_code(),
ToJson { .. } => StatusCode::Internal,
ConvertSqlValue { source, .. } => source.status_code(),
}
}

View File

@@ -21,6 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use datatypes::types::vector_type_value_to_string;
use futures::StreamExt;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
@@ -29,7 +30,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;
use tokio::io::AsyncWrite;
use crate::error::{self, Error, Result};
use crate::error::{self, ConvertSqlValueSnafu, Error, Result};
use crate::metrics::*;
/// Try to write multiple output to the writer if possible.
@@ -168,7 +169,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
&mut row_writer,
&record_batch,
query_context.clone(),
&column_def,
&query_result.schema,
)
.await?
}
@@ -192,10 +193,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
row_writer: &mut RowWriter<'_, W>,
recordbatch: &RecordBatch,
query_context: QueryContextRef,
column_def: &[Column],
schema: &SchemaRef,
) -> Result<()> {
for row in recordbatch.rows() {
for (value, column) in row.into_iter().zip(column_def.iter()) {
for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
match value {
Value::Null => row_writer.write_col(None::<u8>)?,
Value::Boolean(v) => row_writer.write_col(v as i8)?,
@@ -210,10 +211,15 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Float32(v) => row_writer.write_col(v.0)?,
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => match column.coltype {
ColumnType::MYSQL_TYPE_JSON => {
Value::Binary(v) => match column.data_type {
ConcreteDataType::Json(_) => {
row_writer.write_col(jsonb::to_string(&v))?;
}
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(&v, d.dim)
.context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
}
_ => {
row_writer.write_col(v.deref())?;
}
@@ -295,6 +301,7 @@ pub(crate) fn create_mysql_column(
ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_STRING),
_ => error::UnsupportedDataTypeSnafu {
data_type,
reason: "not implemented",

View File

@@ -27,7 +27,7 @@ use datafusion_expr::LogicalPlan;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::{IntervalType, TimestampType};
use datatypes::types::{vector_type_value_to_string, IntervalType, TimestampType};
use datatypes::value::ListValue;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
@@ -364,6 +364,24 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Vector(d) => {
let array = value_list
.items()
.iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Binary(v) => {
let s = vector_type_value_to_string(v, d.dim)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Ok(Some(s))
}
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected vector",),
}))),
})
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!(
"cannot write array type {:?} in postgres protocol: unimplemented",
@@ -395,6 +413,11 @@ pub(super) fn encode_value(
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => match datatype {
ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)),
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(v, d.dim)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
builder.encode_field(&s)
}
_ => {
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
match *bytea_output {
@@ -499,6 +522,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
&ConcreteDataType::Duration(_)
| &ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
data_type: origin,
reason: "not implemented",
@@ -512,6 +536,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
}
.fail()
}
&ConcreteDataType::Vector(_) => Ok(Type::FLOAT4_ARRAY),
}
}