mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 11:50:38 +00:00
feat: Region supports write requests with old schema (#297)
* feat: Adds ColumnDefaultConstraint::create_default_vector ColumnDefaultConstraint::create_default_vector is ported from MitoTable::try_get_column_default_constraint_vector. * refactor: Replace try_get_column_default_constraint_vector by create_default_vector * style: Remove unnecessary map_err in MitoTable::insert * feat: Adds compat_write For column in `dest_schema` but not in `write_batch`, this method would insert a vector with default value to the `write_batch`. If there are columns not in `dest_schema`, an error would be returned. * chore: Add info log to RegionInner::alter * feat(storage): RegionImpl::write support request with old version * feat: Add nullable check when creating default value * feat: Validate nullable and default value * chore: Modify PutOperation comments * chore: Make ColumnDescriptor::is_nullable readonly and validate name * feat: Use CompatWrite trait to replace compat::compat_write method Adds a CompatWrite trait to support padding columns to WriteBatch: - The WriteBatch and PutData implement this trait - Fix the issue that WriteBatch::schema is not updated to the schema after compat - Also validate the created column when adding to PutData The WriteBatch would also pad default value to missing columns in PutData, so the memtable inserter doesn't need to manually check whether the column is nullable and then insert a NullVector. All WriteBatch is ensured to have all columns defined by the schema in its PutData. * feat: Validate constraint by ColumnDefaultConstraint::validate() The ColumnDefaultConstraint::validate() would also ensure the default value has the same data type as the column's. * feat: Use NullVector for null columns * fix: Fix BinaryType returns wrong logical_type_id * fix: Fix tests and revert NullVector for null columns NullVector doesn't support custom logical types, which makes it hard to encode/decode and also causes the arrow/protobuf codec of write batch to fail. 
* fix: create_default_vector use replicate to create vector with default value This would fix the test_codec_with_none_column_protobuf test, as we need to downcast the vector to construct the protobuf values. * test: add tests for column default constraints * test: Add tests for CompatWrite trait impl * test: Test write region with old schema * fix(storage): Fix replay() applies metadata too early The committed sequence of the RegionChange action is the sequence of the last entry that uses the old metadata (schema). During replay, we should apply the new metadata after we see an entry that has a sequence greater than (not equal to) the `RegionChange::committed_sequence`. Also remove duplicate `set_committed_sequence()` call in persist_manifest_version() * chore: Removes some unreachable code Also add more comments to document the code in these files * refactor: Refactor MitoTable::insert Return error if we could not create a default vector for given column, instead of ignoring the error * chore: Fix incorrect comments * chore: Fix typo in error message
This commit is contained in:
@@ -192,7 +192,7 @@ pub enum Error {
|
||||
source: api::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Column default constraint error, source: {}", source))]
|
||||
#[snafu(display("Invalid column default constraint, source: {}", source))]
|
||||
ColumnDefaultConstraint {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
|
||||
@@ -147,17 +147,19 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
|
||||
fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
|
||||
let data_type =
|
||||
ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
|
||||
Ok(ColumnSchema {
|
||||
name: column_def.name.clone(),
|
||||
data_type: data_type.into(),
|
||||
is_nullable: column_def.is_nullable,
|
||||
default_constraint: match &column_def.default_constraint {
|
||||
None => None,
|
||||
Some(v) => Some(
|
||||
ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?,
|
||||
),
|
||||
},
|
||||
})
|
||||
let default_constraint = match &column_def.default_constraint {
|
||||
None => None,
|
||||
Some(v) => {
|
||||
Some(ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?)
|
||||
}
|
||||
};
|
||||
ColumnSchema::new(
|
||||
column_def.name.clone(),
|
||||
data_type.into(),
|
||||
column_def.is_nullable,
|
||||
)
|
||||
.with_default_constraint(default_constraint)
|
||||
.context(ColumnDefaultConstraintSnafu)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -237,7 +239,7 @@ mod tests {
|
||||
let column_schema = create_column_schema(&column_def).unwrap();
|
||||
assert_eq!(column_schema.name, "a");
|
||||
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
|
||||
assert!(column_schema.is_nullable);
|
||||
assert!(column_schema.is_nullable());
|
||||
|
||||
let default_constraint = ColumnDefaultConstraint::Value(Value::from("defaut value"));
|
||||
let column_def = ColumnDef {
|
||||
@@ -249,10 +251,10 @@ mod tests {
|
||||
let column_schema = create_column_schema(&column_def).unwrap();
|
||||
assert_eq!(column_schema.name, "a");
|
||||
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
|
||||
assert!(column_schema.is_nullable);
|
||||
assert!(column_schema.is_nullable());
|
||||
assert_eq!(
|
||||
default_constraint,
|
||||
column_schema.default_constraint.unwrap()
|
||||
*column_schema.default_constraint().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -298,30 +300,10 @@ mod tests {
|
||||
|
||||
fn expected_table_schema() -> SchemaRef {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema {
|
||||
name: "host".to_string(),
|
||||
data_type: ConcreteDataType::string_datatype(),
|
||||
is_nullable: false,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "ts".to_string(),
|
||||
data_type: ConcreteDataType::timestamp_millis_datatype(),
|
||||
is_nullable: false,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "cpu".to_string(),
|
||||
data_type: ConcreteDataType::float32_datatype(),
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
name: "memory".to_string(),
|
||||
data_type: ConcreteDataType::float64_datatype(),
|
||||
is_nullable: true,
|
||||
default_constraint: None,
|
||||
},
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false),
|
||||
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
];
|
||||
Arc::new(
|
||||
SchemaBuilder::try_from(column_schemas)
|
||||
|
||||
@@ -119,8 +119,7 @@ pub fn build_create_table_request(
|
||||
} in columns
|
||||
{
|
||||
if !new_columns.contains(column_name) {
|
||||
let mut column_schema = build_column_schema(column_name, *datatype, true)?;
|
||||
|
||||
let mut is_nullable = true;
|
||||
match *semantic_type {
|
||||
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_schemas.len()),
|
||||
TIMESTAMP_SEMANTIC_TYPE => {
|
||||
@@ -133,11 +132,12 @@ pub fn build_create_table_request(
|
||||
);
|
||||
timestamp_index = column_schemas.len();
|
||||
// Timestamp column must not be null.
|
||||
column_schema.is_nullable = false;
|
||||
is_nullable = false;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let column_schema = build_column_schema(column_name, *datatype, is_nullable)?;
|
||||
column_schemas.push(column_schema);
|
||||
new_columns.insert(column_name.to_string());
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ mod tests {
|
||||
let new_column = &columns[0].column_schema;
|
||||
|
||||
assert_eq!(new_column.name, "tagk_i");
|
||||
assert!(new_column.is_nullable);
|
||||
assert!(new_column.is_nullable());
|
||||
assert_eq!(new_column.data_type, ConcreteDataType::string_datatype());
|
||||
}
|
||||
_ => unreachable!(),
|
||||
|
||||
@@ -115,6 +115,10 @@ impl ConcreteDataType {
|
||||
pub fn from_arrow_type(dt: &ArrowDataType) -> Self {
|
||||
ConcreteDataType::try_from(dt).expect("Unimplemented type")
|
||||
}
|
||||
|
||||
pub fn is_null(&self) -> bool {
|
||||
matches!(self, ConcreteDataType::Null(NullType))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&ArrowDataType> for ConcreteDataType {
|
||||
@@ -322,7 +326,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_from_arrow_timestamp() {
|
||||
fn test_from_arrow_timestamp() {
|
||||
assert_eq!(
|
||||
ConcreteDataType::timestamp_millis_datatype(),
|
||||
ConcreteDataType::from_arrow_time_unit(&arrow::datatypes::TimeUnit::Millisecond)
|
||||
@@ -342,7 +346,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_is_timestamp() {
|
||||
fn test_is_timestamp() {
|
||||
assert!(ConcreteDataType::timestamp_millis_datatype().is_timestamp());
|
||||
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Second).is_timestamp());
|
||||
assert!(ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond).is_timestamp());
|
||||
@@ -353,4 +357,10 @@ mod tests {
|
||||
// to be used a data type for timestamp column
|
||||
assert!(!ConcreteDataType::int64_datatype().is_timestamp());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_null() {
|
||||
assert!(ConcreteDataType::null_datatype().is_null());
|
||||
assert!(!ConcreteDataType::int32_datatype().is_null());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,6 +63,18 @@ pub enum Error {
|
||||
source: arrow::error::ArrowError,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported column default constraint expression: {}", expr))]
|
||||
UnsupportedDefaultExpr { expr: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Default value should not be null for non null column"))]
|
||||
NullDefault { backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Incompatible default value type, reason: {}", reason))]
|
||||
DefaultValueType {
|
||||
reason: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
mod constraint;
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -8,35 +10,8 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::{self, DeserializeSnafu, Error, Result, SerializeSnafu};
|
||||
use crate::value::Value;
|
||||
|
||||
/// Column's default constraint.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ColumnDefaultConstraint {
|
||||
// A function invocation
|
||||
// TODO(dennis): we save the function expression here, maybe use a struct in future.
|
||||
Function(String),
|
||||
// A value
|
||||
Value(Value),
|
||||
}
|
||||
|
||||
impl TryFrom<&[u8]> for ColumnDefaultConstraint {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(bytes: &[u8]) -> Result<Self> {
|
||||
let json = String::from_utf8_lossy(bytes);
|
||||
serde_json::from_str(&json).context(DeserializeSnafu { json })
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<Vec<u8>> for ColumnDefaultConstraint {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_into(self) -> Result<Vec<u8>> {
|
||||
let s = serde_json::to_string(&self).context(SerializeSnafu)?;
|
||||
Ok(s.into_bytes())
|
||||
}
|
||||
}
|
||||
pub use crate::schema::constraint::ColumnDefaultConstraint;
|
||||
use crate::vectors::VectorRef;
|
||||
|
||||
/// Key used to store column name of the timestamp column in metadata.
|
||||
///
|
||||
@@ -54,8 +29,8 @@ const ARROW_FIELD_DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
|
||||
pub struct ColumnSchema {
|
||||
pub name: String,
|
||||
pub data_type: ConcreteDataType,
|
||||
pub is_nullable: bool,
|
||||
pub default_constraint: Option<ColumnDefaultConstraint>,
|
||||
is_nullable: bool,
|
||||
default_constraint: Option<ColumnDefaultConstraint>,
|
||||
}
|
||||
|
||||
impl ColumnSchema {
|
||||
@@ -72,12 +47,45 @@ impl ColumnSchema {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_nullable(&self) -> bool {
|
||||
self.is_nullable
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
|
||||
self.default_constraint.as_ref()
|
||||
}
|
||||
|
||||
pub fn with_default_constraint(
|
||||
mut self,
|
||||
default_constraint: Option<ColumnDefaultConstraint>,
|
||||
) -> Self {
|
||||
) -> Result<Self> {
|
||||
if let Some(constraint) = &default_constraint {
|
||||
constraint.validate(&self.data_type, self.is_nullable)?;
|
||||
}
|
||||
|
||||
self.default_constraint = default_constraint;
|
||||
self
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn create_default_vector(&self, num_rows: usize) -> Result<Option<VectorRef>> {
|
||||
match &self.default_constraint {
|
||||
Some(c) => c
|
||||
.create_default_vector(&self.data_type, self.is_nullable, num_rows)
|
||||
.map(Some),
|
||||
None => {
|
||||
if self.is_nullable {
|
||||
// No default constraint, use null as default value.
|
||||
// TODO(yingwen): Use NullVector once it supports setting logical type.
|
||||
ColumnDefaultConstraint::null_value()
|
||||
.create_default_vector(&self.data_type, self.is_nullable, num_rows)
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -327,7 +335,7 @@ impl TryFrom<&ColumnSchema> for Field {
|
||||
Ok(Field::new(
|
||||
column_schema.name.clone(),
|
||||
column_schema.data_type.as_arrow_type(),
|
||||
column_schema.is_nullable,
|
||||
column_schema.is_nullable(),
|
||||
)
|
||||
.with_metadata(metadata))
|
||||
}
|
||||
@@ -393,6 +401,7 @@ mod tests {
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
|
||||
use super::*;
|
||||
use crate::value::Value;
|
||||
|
||||
#[test]
|
||||
fn test_column_schema() {
|
||||
@@ -409,7 +418,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_column_schema_with_default_constraint() {
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))));
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(99))))
|
||||
.unwrap();
|
||||
let field = Field::try_from(&column_schema).unwrap();
|
||||
assert_eq!("test", field.name);
|
||||
assert_eq!(ArrowDataType::Int32, field.data_type);
|
||||
@@ -426,6 +436,13 @@ mod tests {
|
||||
assert_eq!(column_schema, new_column_schema);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_invalid_default_constraint() {
|
||||
ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_default_constraint_try_into_from() {
|
||||
let default_constraint = ColumnDefaultConstraint::Value(Value::from(42i64));
|
||||
@@ -436,6 +453,29 @@ mod tests {
|
||||
assert_eq!(default_constraint, from_value);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_create_default_null() {
|
||||
// Implicit default null.
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true);
|
||||
let v = column_schema.create_default_vector(5).unwrap().unwrap();
|
||||
assert_eq!(5, v.len());
|
||||
assert!(v.only_null());
|
||||
|
||||
// Explicit default null.
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
.unwrap();
|
||||
let v = column_schema.create_default_vector(5).unwrap().unwrap();
|
||||
assert_eq!(5, v.len());
|
||||
assert!(v.only_null());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_no_default() {
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false);
|
||||
assert!(column_schema.create_default_vector(5).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_empty_schema() {
|
||||
let schema = SchemaBuilder::default().build().unwrap();
|
||||
|
||||
244
src/datatypes/src/schema/constraint.rs
Normal file
244
src/datatypes/src/schema/constraint.rs
Normal file
@@ -0,0 +1,244 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::{util, Timestamp};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::data_type::{ConcreteDataType, DataType};
|
||||
use crate::error::{self, Result};
|
||||
use crate::scalars::ScalarVector;
|
||||
use crate::value::Value;
|
||||
use crate::vectors::{ConstantVector, TimestampVector, VectorRef};
|
||||
|
||||
const CURRENT_TIMESTAMP: &str = "current_timestamp()";
|
||||
|
||||
/// Column's default constraint.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ColumnDefaultConstraint {
|
||||
// A function invocation
|
||||
// TODO(dennis): we save the function expression here, maybe use a struct in future.
|
||||
Function(String),
|
||||
// A value
|
||||
Value(Value),
|
||||
}
|
||||
|
||||
impl TryFrom<&[u8]> for ColumnDefaultConstraint {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(bytes: &[u8]) -> Result<Self> {
|
||||
let json = String::from_utf8_lossy(bytes);
|
||||
serde_json::from_str(&json).context(error::DeserializeSnafu { json })
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<Vec<u8>> for ColumnDefaultConstraint {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_into(self) -> Result<Vec<u8>> {
|
||||
let s = serde_json::to_string(&self).context(error::SerializeSnafu)?;
|
||||
Ok(s.into_bytes())
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnDefaultConstraint {
|
||||
/// Returns a default null constraint.
|
||||
pub fn null_value() -> ColumnDefaultConstraint {
|
||||
ColumnDefaultConstraint::Value(Value::Null)
|
||||
}
|
||||
|
||||
/// Check whether the constraint is valid for columns with given `data_type`
|
||||
/// and `is_nullable` attributes.
|
||||
pub fn validate(&self, data_type: &ConcreteDataType, is_nullable: bool) -> Result<()> {
|
||||
ensure!(is_nullable || !self.maybe_null(), error::NullDefaultSnafu);
|
||||
|
||||
match self {
|
||||
ColumnDefaultConstraint::Function(expr) => {
|
||||
ensure!(
|
||||
expr == CURRENT_TIMESTAMP,
|
||||
error::UnsupportedDefaultExprSnafu { expr }
|
||||
);
|
||||
ensure!(
|
||||
data_type.is_timestamp(),
|
||||
error::DefaultValueTypeSnafu {
|
||||
reason: "return value of the function must has timestamp type",
|
||||
}
|
||||
);
|
||||
}
|
||||
ColumnDefaultConstraint::Value(v) => {
|
||||
if !v.is_null() {
|
||||
// Whether the value could be nullable has been checked before, only need
|
||||
// to check the type compatibility here.
|
||||
ensure!(
|
||||
data_type.logical_type_id() == v.logical_type_id(),
|
||||
error::DefaultValueTypeSnafu {
|
||||
reason: format!(
|
||||
"column has type {:?} but default value has type {:?}",
|
||||
data_type.logical_type_id(),
|
||||
v.logical_type_id()
|
||||
),
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a vector that contains `num_rows` default values for given `data_type`.
|
||||
///
|
||||
/// If `is_nullable` is `false`, then this method returns an error if the created
|
||||
/// default value is null.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `num_rows == 0`.
|
||||
pub fn create_default_vector(
|
||||
&self,
|
||||
data_type: &ConcreteDataType,
|
||||
is_nullable: bool,
|
||||
num_rows: usize,
|
||||
) -> Result<VectorRef> {
|
||||
assert!(num_rows > 0);
|
||||
|
||||
match self {
|
||||
ColumnDefaultConstraint::Function(expr) => {
|
||||
// Functions should also ensure its return value is not null when
|
||||
// is_nullable is false.
|
||||
match &expr[..] {
|
||||
// TODO(dennis): we only support current_timestamp right now,
|
||||
// it's better to use an expression framework in future.
|
||||
CURRENT_TIMESTAMP => {
|
||||
// TODO(yingwen): We should coerce the type to the physical type of
|
||||
// input `data_type`.
|
||||
let vector =
|
||||
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
|
||||
util::current_time_millis(),
|
||||
)]));
|
||||
Ok(Arc::new(ConstantVector::new(vector, num_rows)))
|
||||
}
|
||||
_ => error::UnsupportedDefaultExprSnafu { expr }.fail(),
|
||||
}
|
||||
}
|
||||
ColumnDefaultConstraint::Value(v) => {
|
||||
ensure!(is_nullable || !v.is_null(), error::NullDefaultSnafu);
|
||||
|
||||
// TODO(yingwen):
|
||||
// 1. For null value, we could use NullVector once it supports custom logical type.
|
||||
// 2. For non null value, we could use ConstantVector, but it would cause all code that
|
||||
// attempts to downcast the vector to fail if it doesn't check whether the vector is const
|
||||
// first.
|
||||
let mut mutable_vector = data_type.create_mutable_vector(1);
|
||||
mutable_vector.push_value_ref(v.as_value_ref())?;
|
||||
let base_vector = mutable_vector.to_vector();
|
||||
Ok(base_vector.replicate(&[num_rows]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this constraint might create NULL.
|
||||
fn maybe_null(&self) -> bool {
|
||||
// Once we support more functions, we may return true if given function
|
||||
// could return null.
|
||||
matches!(self, ColumnDefaultConstraint::Value(Value::Null))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::vectors::Int32Vector;
|
||||
|
||||
#[test]
|
||||
fn test_null_default_constraint() {
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
assert!(constraint.maybe_null());
|
||||
let constraint = ColumnDefaultConstraint::Value(Value::Int32(10));
|
||||
assert!(!constraint.maybe_null());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_null_constraint() {
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
constraint.validate(&data_type, false).unwrap_err();
|
||||
constraint.validate(&data_type, true).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_value_constraint() {
|
||||
let constraint = ColumnDefaultConstraint::Value(Value::Int32(10));
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
constraint.validate(&data_type, false).unwrap();
|
||||
constraint.validate(&data_type, true).unwrap();
|
||||
|
||||
constraint
|
||||
.validate(&ConcreteDataType::uint32_datatype(), true)
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_function_constraint() {
|
||||
let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string());
|
||||
constraint
|
||||
.validate(&ConcreteDataType::timestamp_millis_datatype(), false)
|
||||
.unwrap();
|
||||
constraint
|
||||
.validate(&ConcreteDataType::boolean_datatype(), false)
|
||||
.unwrap_err();
|
||||
|
||||
let constraint = ColumnDefaultConstraint::Function("hello()".to_string());
|
||||
constraint
|
||||
.validate(&ConcreteDataType::timestamp_millis_datatype(), false)
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_default_vector_by_null() {
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
constraint
|
||||
.create_default_vector(&data_type, false, 10)
|
||||
.unwrap_err();
|
||||
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let v = constraint
|
||||
.create_default_vector(&data_type, true, 3)
|
||||
.unwrap();
|
||||
assert_eq!(3, v.len());
|
||||
for i in 0..v.len() {
|
||||
assert_eq!(Value::Null, v.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_default_vector_by_value() {
|
||||
let constraint = ColumnDefaultConstraint::Value(Value::Int32(10));
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
let v = constraint
|
||||
.create_default_vector(&data_type, false, 4)
|
||||
.unwrap();
|
||||
let expect: VectorRef = Arc::new(Int32Vector::from_values(vec![10; 4]));
|
||||
assert_eq!(expect, v);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_default_vector_by_func() {
|
||||
let constraint = ColumnDefaultConstraint::Function(CURRENT_TIMESTAMP.to_string());
|
||||
let data_type = ConcreteDataType::timestamp_millis_datatype();
|
||||
let v = constraint
|
||||
.create_default_vector(&data_type, false, 4)
|
||||
.unwrap();
|
||||
assert_eq!(4, v.len());
|
||||
assert!(
|
||||
matches!(v.get(0), Value::Timestamp(_)),
|
||||
"v {:?} is not timestamp",
|
||||
v.get(0)
|
||||
);
|
||||
|
||||
let constraint = ColumnDefaultConstraint::Function("no".to_string());
|
||||
let data_type = ConcreteDataType::timestamp_millis_datatype();
|
||||
constraint
|
||||
.create_default_vector(&data_type, false, 4)
|
||||
.unwrap_err();
|
||||
}
|
||||
}
|
||||
@@ -25,7 +25,7 @@ impl DataType for BinaryType {
|
||||
}
|
||||
|
||||
fn logical_type_id(&self) -> LogicalTypeId {
|
||||
LogicalTypeId::String
|
||||
LogicalTypeId::Binary
|
||||
}
|
||||
|
||||
fn default_value(&self) -> Value {
|
||||
|
||||
@@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::prelude::*;
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::vectors::ListVector;
|
||||
|
||||
pub type OrderedF32 = OrderedFloat<f32>;
|
||||
@@ -69,7 +70,7 @@ impl Value {
|
||||
Value::Binary(_) => ConcreteDataType::binary_datatype(),
|
||||
Value::List(list) => ConcreteDataType::list_datatype(list.datatype().clone()),
|
||||
Value::Date(_) => ConcreteDataType::date_datatype(),
|
||||
Value::DateTime(_) => ConcreteDataType::date_datatype(),
|
||||
Value::DateTime(_) => ConcreteDataType::datetime_datatype(),
|
||||
Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
|
||||
}
|
||||
}
|
||||
@@ -114,6 +115,30 @@ impl Value {
|
||||
Value::Timestamp(v) => ValueRef::Timestamp(*v),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the logical type of the value.
|
||||
pub fn logical_type_id(&self) -> LogicalTypeId {
|
||||
match self {
|
||||
Value::Null => LogicalTypeId::Null,
|
||||
Value::Boolean(_) => LogicalTypeId::Boolean,
|
||||
Value::UInt8(_) => LogicalTypeId::UInt8,
|
||||
Value::UInt16(_) => LogicalTypeId::UInt16,
|
||||
Value::UInt32(_) => LogicalTypeId::UInt32,
|
||||
Value::UInt64(_) => LogicalTypeId::UInt64,
|
||||
Value::Int8(_) => LogicalTypeId::Int8,
|
||||
Value::Int16(_) => LogicalTypeId::Int16,
|
||||
Value::Int32(_) => LogicalTypeId::Int32,
|
||||
Value::Int64(_) => LogicalTypeId::Int64,
|
||||
Value::Float32(_) => LogicalTypeId::Float32,
|
||||
Value::Float64(_) => LogicalTypeId::Float64,
|
||||
Value::String(_) => LogicalTypeId::String,
|
||||
Value::Binary(_) => LogicalTypeId::Binary,
|
||||
Value::List(_) => LogicalTypeId::List,
|
||||
Value::Date(_) => LogicalTypeId::Date,
|
||||
Value::DateTime(_) => LogicalTypeId::DateTime,
|
||||
Value::Timestamp(_) => LogicalTypeId::Timestamp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_ord_for_value_like {
|
||||
@@ -582,68 +607,69 @@ mod tests {
|
||||
assert_eq!(Value::Binary(bytes.clone()), Value::from(bytes));
|
||||
}
|
||||
|
||||
fn check_type_and_value(data_type: &ConcreteDataType, value: &Value) {
|
||||
assert_eq!(*data_type, value.data_type());
|
||||
assert_eq!(data_type.logical_type_id(), value.logical_type_id());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_value_datatype() {
|
||||
assert_eq!(
|
||||
ConcreteDataType::boolean_datatype(),
|
||||
Value::Boolean(true).data_type()
|
||||
check_type_and_value(&ConcreteDataType::boolean_datatype(), &Value::Boolean(true));
|
||||
check_type_and_value(&ConcreteDataType::uint8_datatype(), &Value::UInt8(u8::MIN));
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::uint16_datatype(),
|
||||
&Value::UInt16(u16::MIN),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
Value::UInt8(u8::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::uint16_datatype(),
|
||||
&Value::UInt16(u16::MAX),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
Value::UInt16(u16::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::uint32_datatype(),
|
||||
&Value::UInt32(u32::MIN),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
Value::UInt16(u16::MAX).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::uint64_datatype(),
|
||||
&Value::UInt64(u64::MIN),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
Value::UInt32(u32::MIN).data_type()
|
||||
check_type_and_value(&ConcreteDataType::int8_datatype(), &Value::Int8(i8::MIN));
|
||||
check_type_and_value(&ConcreteDataType::int16_datatype(), &Value::Int16(i16::MIN));
|
||||
check_type_and_value(&ConcreteDataType::int32_datatype(), &Value::Int32(i32::MIN));
|
||||
check_type_and_value(&ConcreteDataType::int64_datatype(), &Value::Int64(i64::MIN));
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::float32_datatype(),
|
||||
&Value::Float32(OrderedFloat(f32::MIN)),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
Value::UInt64(u64::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::float64_datatype(),
|
||||
&Value::Float64(OrderedFloat(f64::MIN)),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::int8_datatype(),
|
||||
Value::Int8(i8::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::string_datatype(),
|
||||
&Value::String(StringBytes::from("hello")),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::int16_datatype(),
|
||||
Value::Int16(i16::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::binary_datatype(),
|
||||
&Value::Binary(Bytes::from(b"world".as_slice())),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::int32_datatype(),
|
||||
Value::Int32(i32::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype()),
|
||||
&Value::List(ListValue::new(
|
||||
Some(Box::new(vec![Value::Int32(10)])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
)),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
Value::Int64(i64::MIN).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::date_datatype(),
|
||||
&Value::Date(Date::new(1)),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::float32_datatype(),
|
||||
Value::Float32(OrderedFloat(f32::MIN)).data_type(),
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::datetime_datatype(),
|
||||
&Value::DateTime(DateTime::new(1)),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::float64_datatype(),
|
||||
Value::Float64(OrderedFloat(f64::MIN)).data_type(),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
Value::String(StringBytes::from("hello")).data_type(),
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::binary_datatype(),
|
||||
Value::Binary(Bytes::from(b"world".as_slice())).data_type()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
ConcreteDataType::timestamp_millis_datatype(),
|
||||
Value::Timestamp(Timestamp::from_millis(1)).data_type()
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::timestamp_millis_datatype(),
|
||||
&Value::Timestamp(Timestamp::from_millis(1)),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@ use crate::vectors::{Vector, VectorRef};
|
||||
/// Vector compute operations.
|
||||
pub trait VectorOp {
|
||||
/// Copies each element according to the `offsets` parameter.
|
||||
/// (`i-th` element should be copied `offsets[i] - offsets[i - 1]` times.)
|
||||
/// - `i-th` element should be copied `offsets[i] - offsets[i - 1]` times
|
||||
/// - `0-th` element would be copied `offsets[0]` times
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if `offsets.len() != self.len()`.
|
||||
|
||||
@@ -216,8 +216,8 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result<Vec<GrpcColumnDef>> {
|
||||
Ok(GrpcColumnDef {
|
||||
name: schema.name.clone(),
|
||||
datatype: datatype as i32,
|
||||
is_nullable: schema.is_nullable,
|
||||
default_constraint: match &schema.default_constraint {
|
||||
is_nullable: schema.is_nullable(),
|
||||
default_constraint: match schema.default_constraint() {
|
||||
None => None,
|
||||
Some(v) => Some(v.clone().try_into().context(
|
||||
ConvertColumnDefaultConstraintSnafu {
|
||||
|
||||
@@ -82,6 +82,13 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Invalid database name: {}", name))]
|
||||
InvalidDatabaseName { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Invalid default constraint, column: {}, source: {}", column, source))]
|
||||
InvalidDefault {
|
||||
column: String,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -96,7 +103,8 @@ impl ErrorExt for Error {
|
||||
| Tokenizer { .. }
|
||||
| InvalidSql { .. }
|
||||
| ParseSqlValue { .. }
|
||||
| SqlTypeNotSupported { .. } => StatusCode::InvalidSyntax,
|
||||
| SqlTypeNotSupported { .. }
|
||||
| InvalidDefault { .. } => StatusCode::InvalidSyntax,
|
||||
|
||||
InvalidDatabaseName { .. } | ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
|
||||
use datatypes::types::DateTimeType;
|
||||
use datatypes::value::Value;
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::ast::{
|
||||
ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, ObjectName,
|
||||
@@ -209,6 +209,8 @@ fn parse_column_default_constraint(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Make column nullable by default, and checks invalid case like
|
||||
// a column is not nullable but has a default value null.
|
||||
/// Create a `ColumnSchema` from `ColumnDef`.
|
||||
pub fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
|
||||
let is_nullable = column_def
|
||||
@@ -221,12 +223,11 @@ pub fn column_def_to_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
|
||||
let default_constraint =
|
||||
parse_column_default_constraint(&name, &data_type, &column_def.options)?;
|
||||
|
||||
Ok(ColumnSchema {
|
||||
name,
|
||||
data_type,
|
||||
is_nullable,
|
||||
default_constraint,
|
||||
})
|
||||
ColumnSchema::new(name, data_type, is_nullable)
|
||||
.with_default_constraint(default_constraint)
|
||||
.context(error::InvalidDefaultSnafu {
|
||||
column: &column_def.name.value,
|
||||
})
|
||||
}
|
||||
|
||||
fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> {
|
||||
@@ -386,7 +387,7 @@ mod tests {
|
||||
&SqlValue::DoubleQuotedString("2022-02-22 00:01:03".to_string()),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(ConcreteDataType::date_datatype(), value.data_type());
|
||||
assert_eq!(ConcreteDataType::datetime_datatype(), value.data_type());
|
||||
if let Value::DateTime(d) = value {
|
||||
assert_eq!("2022-02-22 00:01:03", d.to_string());
|
||||
} else {
|
||||
|
||||
@@ -22,12 +22,6 @@ pub enum Error {
|
||||
source: MetadataError,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid schema of input data, region: {}", region))]
|
||||
InvalidInputSchema {
|
||||
region: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing column {} in write batch", column))]
|
||||
BatchMissingColumn {
|
||||
column: String,
|
||||
@@ -132,7 +126,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid timestamp in write batch, source: {}", source))]
|
||||
InvalidTimestamp { source: crate::write_batch::Error },
|
||||
InvalidTimestamp {
|
||||
#[snafu(backtrace)]
|
||||
source: crate::write_batch::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Task already cancelled"))]
|
||||
Cancelled { backtrace: Backtrace },
|
||||
@@ -269,6 +266,35 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: MetadataError,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to add default value for column {}, source: {}",
|
||||
column,
|
||||
source
|
||||
))]
|
||||
AddDefault {
|
||||
column: String,
|
||||
source: crate::write_batch::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Not allowed to write data with version {} to schema with version {}",
|
||||
data_version,
|
||||
schema_version
|
||||
))]
|
||||
WriteToOldVersion {
|
||||
/// Schema version of data to write.
|
||||
data_version: u32,
|
||||
schema_version: u32,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Column {} not in schema with version {}", column, version))]
|
||||
NotInSchemaToCompat {
|
||||
column: String,
|
||||
version: u32,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -279,12 +305,13 @@ impl ErrorExt for Error {
|
||||
|
||||
match self {
|
||||
InvalidScanIndex { .. }
|
||||
| InvalidInputSchema { .. }
|
||||
| BatchMissingColumn { .. }
|
||||
| BatchMissingTimestamp { .. }
|
||||
| InvalidTimestamp { .. }
|
||||
| InvalidProjection { .. }
|
||||
| BuildBatch { .. } => StatusCode::InvalidArguments,
|
||||
| BuildBatch { .. }
|
||||
| NotInSchemaToCompat { .. }
|
||||
| WriteToOldVersion { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Utf8 { .. }
|
||||
| EncodeJson { .. }
|
||||
@@ -322,6 +349,7 @@ impl ErrorExt for Error {
|
||||
source.status_code()
|
||||
}
|
||||
PushBatch { source, .. } => source.status_code(),
|
||||
AddDefault { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,11 @@ pub struct RawColumnFamiliesMetadata {
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct RegionChange {
|
||||
/// The committed sequence of the region when this change happens. So the
|
||||
/// data with sequence **greater than** this sequence would use the new
|
||||
/// metadata.
|
||||
pub committed_sequence: SequenceNumber,
|
||||
/// The metadata after changed.
|
||||
pub metadata: RawRegionMetadata,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_time::{RangeMillis, TimestampMillis};
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::vectors::{NullVector, TimestampVector, VectorRef};
|
||||
use snafu::{ensure, OptionExt};
|
||||
use datatypes::vectors::{TimestampVector, VectorRef};
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::{ColumnDescriptor, OpType, SequenceNumber};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -60,6 +59,9 @@ impl Inserter {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Only validate the schema in debug mode.
|
||||
validate_input_and_memtable_schemas(batch, memtables);
|
||||
|
||||
// Enough to hold all key or value columns.
|
||||
let total_column_num = batch.schema().num_columns();
|
||||
// Reusable KeyValues buffer.
|
||||
@@ -164,6 +166,18 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtables: &MemtableSet) {
|
||||
let batch_schema = batch.schema();
|
||||
for (_, memtable) in memtables.iter() {
|
||||
let memtable_schema = memtable.schema();
|
||||
let user_schema = memtable_schema.user_schema();
|
||||
debug_assert_eq!(batch_schema.version(), user_schema.version());
|
||||
// Only validate column schemas.
|
||||
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
|
||||
}
|
||||
}
|
||||
|
||||
fn new_range_index_map(time_ranges: &[RangeMillis]) -> RangeIndexMap {
|
||||
time_ranges
|
||||
.iter()
|
||||
@@ -177,18 +191,10 @@ fn clone_put_data_column_to(
|
||||
desc: &ColumnDescriptor,
|
||||
target: &mut Vec<VectorRef>,
|
||||
) -> Result<()> {
|
||||
if let Some(vector) = put_data.column_by_name(&desc.name) {
|
||||
target.push(vector.clone());
|
||||
} else {
|
||||
// The write batch should have been validated before.
|
||||
ensure!(
|
||||
desc.is_nullable,
|
||||
error::BatchMissingColumnSnafu { column: &desc.name }
|
||||
);
|
||||
|
||||
let num_rows = put_data.num_rows();
|
||||
target.push(Arc::new(NullVector::new(num_rows)));
|
||||
}
|
||||
let vector = put_data
|
||||
.column_by_name(&desc.name)
|
||||
.context(error::BatchMissingColumnSnafu { column: &desc.name })?;
|
||||
target.push(vector.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -289,6 +295,8 @@ fn compute_slice_indexes(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::timestamp::Timestamp;
|
||||
use datatypes::prelude::ScalarVectorBuilder;
|
||||
use datatypes::vectors::{Int64Vector, TimestampVector, TimestampVectorBuilder};
|
||||
|
||||
@@ -202,7 +202,7 @@ impl RegionMetadata {
|
||||
// We don't check the case that the column is not nullable but default constraint is null. The
|
||||
// caller should guarantee this.
|
||||
ensure!(
|
||||
add_column.desc.is_nullable || add_column.desc.default_constraint.is_some(),
|
||||
add_column.desc.is_nullable() || add_column.desc.default_constraint().is_some(),
|
||||
AddNonNullColumnSnafu {
|
||||
name: &add_column.desc.name,
|
||||
}
|
||||
|
||||
@@ -98,7 +98,7 @@ impl From<&schema::ColumnSchema> for ColumnSchema {
|
||||
Self {
|
||||
name: cs.name.clone(),
|
||||
data_type: DataType::from(&cs.data_type).into(),
|
||||
is_nullable: cs.is_nullable,
|
||||
is_nullable: cs.is_nullable(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -140,7 +140,12 @@ impl From<&ConcreteDataType> for DataType {
|
||||
ConcreteDataType::Null(_) => DataType::Null,
|
||||
ConcreteDataType::Binary(_) => DataType::Binary,
|
||||
ConcreteDataType::Timestamp(_) => DataType::Timestamp,
|
||||
_ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
|
||||
ConcreteDataType::Date(_)
|
||||
| ConcreteDataType::DateTime(_)
|
||||
| ConcreteDataType::List(_) => {
|
||||
// TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
|
||||
unimplemented!("data type {:?} is not supported", data_type)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -276,7 +281,8 @@ gen_put_data!(string, StringVectorBuilder, v, v.as_str());
|
||||
gen_put_data!(timestamp, TimestampVectorBuilder, v, (*v).into());
|
||||
|
||||
pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
|
||||
match vector.data_type() {
|
||||
let data_type = vector.data_type();
|
||||
match data_type {
|
||||
ConcreteDataType::Boolean(_) => gen_columns_bool(vector),
|
||||
ConcreteDataType::Int8(_) => gen_columns_i8(vector),
|
||||
ConcreteDataType::Int16(_) => gen_columns_i16(vector),
|
||||
@@ -291,8 +297,12 @@ pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
|
||||
ConcreteDataType::Binary(_) => gen_columns_binary(vector),
|
||||
ConcreteDataType::String(_) => gen_columns_string(vector),
|
||||
ConcreteDataType::Timestamp(_) => gen_columns_timestamp(vector),
|
||||
_ => {
|
||||
unimplemented!() // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
|
||||
ConcreteDataType::Null(_)
|
||||
| ConcreteDataType::Date(_)
|
||||
| ConcreteDataType::DateTime(_)
|
||||
| ConcreteDataType::List(_) => {
|
||||
// TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
|
||||
unimplemented!("data type {:?} is not supported", data_type)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -313,6 +323,12 @@ pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Resul
|
||||
ConcreteDataType::Binary(_) => gen_put_data_binary(column),
|
||||
ConcreteDataType::String(_) => gen_put_data_string(column),
|
||||
ConcreteDataType::Timestamp(_) => gen_put_data_timestamp(column),
|
||||
_ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
|
||||
ConcreteDataType::Null(_)
|
||||
| ConcreteDataType::Date(_)
|
||||
| ConcreteDataType::DateTime(_)
|
||||
| ConcreteDataType::List(_) => {
|
||||
// TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
|
||||
unimplemented!("data type {:?} is not supported", data_type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
|
||||
use store_api::storage::{
|
||||
@@ -23,6 +23,7 @@ use crate::manifest::{
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::metadata::{RegionMetaImpl, RegionMetadata};
|
||||
pub use crate::region::writer::{AlterContext, RegionWriter, RegionWriterRef, WriterContext};
|
||||
use crate::schema::compat::CompatWrite;
|
||||
use crate::snapshot::SnapshotImpl;
|
||||
use crate::sst::AccessLayerRef;
|
||||
use crate::version::VersionEdit;
|
||||
@@ -63,7 +64,10 @@ impl<S: LogStore> Region for RegionImpl<S> {
|
||||
self.inner.in_memory_metadata()
|
||||
}
|
||||
|
||||
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
|
||||
async fn write(&self, ctx: &WriteContext, mut request: WriteBatch) -> Result<WriteResponse> {
|
||||
// Compat the schema of the write batch outside of the write lock.
|
||||
self.inner.compat_write_batch(&mut request)?;
|
||||
|
||||
self.inner.write(ctx, request).await
|
||||
}
|
||||
|
||||
@@ -320,6 +324,11 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
async fn wait_flush_done(&self) -> Result<()> {
|
||||
self.inner.writer.wait_flush_done().await
|
||||
}
|
||||
|
||||
/// Write to inner, also the `RegionWriter` directly.
|
||||
async fn write_inner(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
|
||||
self.inner.write(ctx, request).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared data of region.
|
||||
@@ -377,19 +386,17 @@ impl<S: LogStore> RegionInner<S> {
|
||||
SnapshotImpl::new(version, sequence, self.sst_layer.clone())
|
||||
}
|
||||
|
||||
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
|
||||
// FIXME(yingwen): [alter] The schema may be outdated.
|
||||
let metadata = self.in_memory_metadata();
|
||||
fn compat_write_batch(&self, request: &mut WriteBatch) -> Result<()> {
|
||||
let metadata = self.version_control().metadata();
|
||||
let schema = metadata.schema();
|
||||
|
||||
// Only compare column schemas.
|
||||
ensure!(
|
||||
schema.column_schemas() == request.schema().column_schemas(),
|
||||
error::InvalidInputSchemaSnafu {
|
||||
region: &self.shared.name
|
||||
}
|
||||
);
|
||||
// Try to make request schema compatible with region's outside of write lock. Note that
|
||||
// schema might be altered after this step.
|
||||
request.compat_write(schema.user_schema())
|
||||
}
|
||||
|
||||
/// Write to writer directly.
|
||||
async fn write(&self, ctx: &WriteContext, request: WriteBatch) -> Result<WriteResponse> {
|
||||
let writer_ctx = WriterContext {
|
||||
shared: &self.shared,
|
||||
flush_strategy: &self.flush_strategy,
|
||||
@@ -399,12 +406,18 @@ impl<S: LogStore> RegionInner<S> {
|
||||
writer: &self.writer,
|
||||
manifest: &self.manifest,
|
||||
};
|
||||
// Now altering schema is not allowed, so it is safe to validate schema outside of the lock.
|
||||
// The writer would also try to compat the schema of write batch if it finds out the
|
||||
// schema version of request is less than current schema version.
|
||||
self.writer.write(ctx, request, writer_ctx).await
|
||||
}
|
||||
|
||||
async fn alter(&self, request: AlterRequest) -> Result<()> {
|
||||
// TODO(yingwen): [alter] Log the request.
|
||||
logging::info!(
|
||||
"Alter region {}, name: {}, request: {:?}",
|
||||
self.shared.id,
|
||||
self.shared.name,
|
||||
request
|
||||
);
|
||||
|
||||
let alter_ctx = AlterContext {
|
||||
shared: &self.shared,
|
||||
|
||||
@@ -64,6 +64,20 @@ impl<S: LogStore> TesterBase<S> {
|
||||
self.region.write(&self.write_ctx, batch).await.unwrap()
|
||||
}
|
||||
|
||||
/// Put without version specified directly to inner writer.
|
||||
pub async fn put_inner(&self, data: &[(i64, Option<i64>)]) -> WriteResponse {
|
||||
let data: Vec<(Timestamp, Option<i64>)> =
|
||||
data.iter().map(|(l, r)| ((*l).into(), *r)).collect();
|
||||
let mut batch = new_write_batch_for_test(false);
|
||||
let put_data = new_put_data(&data);
|
||||
batch.put(put_data).unwrap();
|
||||
|
||||
self.region
|
||||
.write_inner(&self.write_ctx, batch)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Scan all data.
|
||||
pub async fn full_scan(&self) -> Vec<(i64, Option<i64>)> {
|
||||
logging::info!("Full scan with ctx {:?}", self.read_ctx);
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::sync::Arc;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use datatypes::vectors::Int64Vector;
|
||||
use datatypes::vectors::TimestampVector;
|
||||
use log_store::fs::log::LocalFileLogStore;
|
||||
@@ -18,10 +17,9 @@ use tempdir::TempDir;
|
||||
use crate::region::tests::{self, FileTesterBase};
|
||||
use crate::region::OpenOptions;
|
||||
use crate::region::RegionImpl;
|
||||
use crate::test_util;
|
||||
use crate::test_util::config_util;
|
||||
use crate::test_util::{self, write_batch_util};
|
||||
use crate::write_batch::PutData;
|
||||
use crate::write_batch::WriteBatch;
|
||||
|
||||
const REGION_NAME: &str = "region-alter-0";
|
||||
|
||||
@@ -58,18 +56,6 @@ impl DataRow {
|
||||
}
|
||||
}
|
||||
|
||||
fn new_write_batch_for_test() -> WriteBatch {
|
||||
write_batch_util::new_write_batch(
|
||||
&[
|
||||
("k0", LogicalTypeId::Int64, true),
|
||||
(test_util::TIMESTAMP_NAME, LogicalTypeId::Timestamp, false),
|
||||
("v0", LogicalTypeId::Int64, true),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
],
|
||||
Some(1),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_put_data(data: &[DataRow]) -> PutData {
|
||||
let mut put_data = PutData::with_num_columns(4);
|
||||
|
||||
@@ -122,8 +108,9 @@ impl AlterTester {
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
// Put with schema k0, ts, v0, v1
|
||||
async fn put(&self, data: &[DataRow]) -> WriteResponse {
|
||||
let mut batch = new_write_batch_for_test();
|
||||
let mut batch = self.base().region.write_request();
|
||||
let put_data = new_put_data(data);
|
||||
batch.put(put_data).unwrap();
|
||||
|
||||
@@ -135,10 +122,17 @@ impl AlterTester {
|
||||
}
|
||||
|
||||
/// Put data with initial schema.
|
||||
async fn put_before_alter(&self, data: &[(i64, Option<i64>)]) {
|
||||
async fn put_with_init_schema(&self, data: &[(i64, Option<i64>)]) {
|
||||
// `put` of FileTesterBase always uses the initial schema version.
|
||||
self.base().put(data).await;
|
||||
}
|
||||
|
||||
/// Put data to inner writer with initial schema.
|
||||
async fn put_inner_with_init_schema(&self, data: &[(i64, Option<i64>)]) {
|
||||
// put of FileTesterBase always use initial schema version.
|
||||
self.base().put_inner(data).await;
|
||||
}
|
||||
|
||||
async fn alter(&self, mut req: AlterRequest) {
|
||||
let version = self.version();
|
||||
req.version = version;
|
||||
@@ -200,13 +194,14 @@ fn check_schema_names(schema: &SchemaRef, names: &[&str]) {
|
||||
#[tokio::test]
|
||||
async fn test_alter_region_with_reopen() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let dir = TempDir::new("alter-region").unwrap();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut tester = AlterTester::new(store_dir).await;
|
||||
|
||||
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
|
||||
|
||||
tester.put_before_alter(&data).await;
|
||||
tester.put_with_init_schema(&data).await;
|
||||
assert_eq!(3, tester.full_scan().await.len());
|
||||
|
||||
let schema = tester.schema();
|
||||
@@ -265,7 +260,7 @@ async fn test_alter_region() {
|
||||
|
||||
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
|
||||
|
||||
tester.put_before_alter(&data).await;
|
||||
tester.put_with_init_schema(&data).await;
|
||||
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["timestamp", "v0"]);
|
||||
@@ -295,3 +290,29 @@ async fn test_alter_region() {
|
||||
let schema = tester.schema();
|
||||
check_schema_names(&schema, &["k0", "timestamp", "v2", "v3"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_old_schema_after_alter() {
|
||||
let dir = TempDir::new("put-old").unwrap();
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let tester = AlterTester::new(store_dir).await;
|
||||
|
||||
let data = vec![(1000, Some(100)), (1001, Some(101)), (1002, Some(102))];
|
||||
|
||||
tester.put_with_init_schema(&data).await;
|
||||
|
||||
let req = add_column_req(&[
|
||||
(new_column_desc(4, "k0"), true), // key column k0
|
||||
(new_column_desc(5, "v1"), false), // value column v1
|
||||
]);
|
||||
tester.alter(req).await;
|
||||
|
||||
// Put with old schema.
|
||||
let data = vec![(1003, Some(103)), (1004, Some(104))];
|
||||
tester.put_with_init_schema(&data).await;
|
||||
|
||||
// Put data with old schema directly to the inner writer, to check that the region
|
||||
// writer could compat the schema of write batch.
|
||||
let data = vec![(1003, Some(103)), (1004, Some(104))];
|
||||
tester.put_inner_with_init_schema(&data).await;
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use crate::manifest::action::{
|
||||
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet};
|
||||
use crate::proto::wal::WalHeader;
|
||||
use crate::region::{RecoveredMetadataMap, RegionManifest, SharedDataRef};
|
||||
use crate::schema::compat::CompatWrite;
|
||||
use crate::sst::AccessLayerRef;
|
||||
use crate::version::{VersionControlRef, VersionEdit};
|
||||
use crate::wal::{Payload, Wal};
|
||||
@@ -153,16 +154,25 @@ impl RegionWriter {
|
||||
// Acquire the version lock before altering the metadata.
|
||||
let _lock = self.version_mutex.lock().await;
|
||||
|
||||
let committed_sequence = version_control.committed_sequence();
|
||||
let mut action_list =
|
||||
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
|
||||
metadata: raw,
|
||||
committed_sequence: version_control.committed_sequence(),
|
||||
committed_sequence,
|
||||
}));
|
||||
let new_metadata = Arc::new(new_metadata);
|
||||
|
||||
// Persist the meta action.
|
||||
let prev_version = version_control.current_manifest_version();
|
||||
action_list.set_prev_version(prev_version);
|
||||
|
||||
logging::debug!(
|
||||
"Try to alter schema of region {}, region_id: {}, action_list: {:?}",
|
||||
new_metadata.name(),
|
||||
new_metadata.id(),
|
||||
action_list
|
||||
);
|
||||
|
||||
let manifest_version = alter_ctx.manifest.update(action_list).await?;
|
||||
|
||||
// Now we could switch memtables and apply the new metadata to the version.
|
||||
@@ -181,6 +191,8 @@ impl RegionWriter {
|
||||
version_control: &VersionControlRef,
|
||||
manifest_version: ManifestVersion,
|
||||
) -> Result<()> {
|
||||
// We always bump the committed sequence regardless of whether persisting the manifest version
|
||||
// to the WAL succeeds, to avoid RegionMetaActions accidentally using the same committed sequence.
|
||||
let next_sequence = version_control.committed_sequence() + 1;
|
||||
version_control.set_committed_sequence(next_sequence);
|
||||
|
||||
@@ -188,8 +200,6 @@ impl RegionWriter {
|
||||
wal.write_to_wal(next_sequence, header, Payload::None)
|
||||
.await?;
|
||||
|
||||
version_control.set_committed_sequence(next_sequence);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -261,20 +271,26 @@ impl WriterInner {
|
||||
&mut self,
|
||||
version_mutex: &Mutex<()>,
|
||||
_ctx: &WriteContext,
|
||||
request: WriteBatch,
|
||||
mut request: WriteBatch,
|
||||
writer_ctx: WriterContext<'_, S>,
|
||||
) -> Result<WriteResponse> {
|
||||
let time_ranges = self.preprocess_write(&request, &writer_ctx).await?;
|
||||
|
||||
// TODO(yingwen): Write wal and get sequence.
|
||||
let version_control = writer_ctx.version_control();
|
||||
let version = version_control.current();
|
||||
|
||||
let _lock = version_mutex.lock().await;
|
||||
|
||||
let metadata = version_control.metadata();
|
||||
// We need to check the schema again since it might have been altered. We need
|
||||
// to compat request's schema before writing it into the WAL otherwise some
|
||||
// default constraint like `current_timestamp()` would yield different value
|
||||
// during replay.
|
||||
request.compat_write(metadata.schema().user_schema())?;
|
||||
|
||||
let committed_sequence = version_control.committed_sequence();
|
||||
// Sequence for current write batch.
|
||||
let next_sequence = committed_sequence + 1;
|
||||
|
||||
let version = version_control.current();
|
||||
let wal_header = WalHeader::with_last_manifest_version(version.manifest_version());
|
||||
writer_ctx
|
||||
.wal
|
||||
@@ -318,10 +334,11 @@ impl WriterInner {
|
||||
// should be flushed_sequence + 1.
|
||||
let mut stream = writer_ctx.wal.read_from_wal(flushed_sequence + 1).await?;
|
||||
while let Some((req_sequence, _header, request)) = stream.try_next().await? {
|
||||
while let Some((next_apply_sequence, _)) = next_apply_metadata {
|
||||
if req_sequence >= next_apply_sequence {
|
||||
// It's safe to unwrap here. It's checked above.
|
||||
// Move out metadata to avoid cloning it.
|
||||
while let Some((sequence_before_alter, _)) = next_apply_metadata {
|
||||
// There might be multiple metadata changes to be applied, so a loop is necessary.
|
||||
if req_sequence > sequence_before_alter {
|
||||
// This is the first request that use the new metadata.
|
||||
// It's safe to unwrap here. It's checked above. Move out metadata to avoid cloning it.
|
||||
let (_, (manifest_version, metadata)) = next_apply_metadata.take().unwrap();
|
||||
version_control.freeze_mutable_and_apply_metadata(
|
||||
Arc::new(metadata.try_into().context(
|
||||
@@ -332,13 +349,15 @@ impl WriterInner {
|
||||
manifest_version,
|
||||
);
|
||||
num_recovered_metadata += 1;
|
||||
logging::debug!("Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ",
|
||||
writer_ctx.shared.name,
|
||||
next_apply_sequence,
|
||||
manifest_version);
|
||||
logging::debug!(
|
||||
"Applied metadata to region: {} when replaying WAL: sequence={} manifest={} ",
|
||||
writer_ctx.shared.name,
|
||||
sequence_before_alter,
|
||||
manifest_version
|
||||
);
|
||||
next_apply_metadata = recovered_metadata.pop_first();
|
||||
} else {
|
||||
// Keep the next_apply_metadata until req_sequence >= next_apply_sequence
|
||||
// Keep the next_apply_metadata until req_sequence > sequence_before_alter
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod compat;
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
16
src/storage/src/schema/compat.rs
Normal file
16
src/storage/src/schema/compat.rs
Normal file
@@ -0,0 +1,16 @@
|
||||
//! Utilities for resolving schema compatibility problems.
|
||||
|
||||
use datatypes::schema::SchemaRef;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
/// Adapts data built against one schema so it can be written to a target that uses another schema.
|
||||
pub trait CompatWrite {
|
||||
/// Makes the schema of `self` compatible with `dest_schema`.
|
||||
///
|
||||
/// For each column that is in `dest_schema` but not in `self`, this method inserts a
|
||||
/// vector filled with the column's default value.
|
||||
///
|
||||
/// Returns an error if `self` has columns that are not in `dest_schema`.
|
||||
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()>;
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
mod compat;
|
||||
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::{BTreeSet, HashMap},
|
||||
@@ -7,13 +9,14 @@ use std::{
|
||||
|
||||
use common_error::prelude::*;
|
||||
use common_time::{RangeMillis, TimestampMillis};
|
||||
use datatypes::schema::{ColumnSchema, SchemaRef};
|
||||
use datatypes::vectors::TimestampVector;
|
||||
use datatypes::{
|
||||
arrow::error::ArrowError, data_type::ConcreteDataType, prelude::ScalarVector, prelude::Value,
|
||||
schema::SchemaRef, vectors::VectorRef,
|
||||
vectors::VectorRef,
|
||||
};
|
||||
use prost::{DecodeError, EncodeError};
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::{consts, PutOperation, WriteRequest};
|
||||
|
||||
use crate::proto;
|
||||
@@ -127,6 +130,17 @@ pub enum Error {
|
||||
source: proto::write_batch::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to create default value for column {}, source: {}",
|
||||
name,
|
||||
source
|
||||
))]
|
||||
CreateDefault {
|
||||
name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -155,26 +169,16 @@ pub struct WriteBatch {
|
||||
num_rows: usize,
|
||||
}
|
||||
|
||||
impl WriteBatch {
|
||||
pub fn new(schema: SchemaRef) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
mutations: Vec::new(),
|
||||
num_rows: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WriteRequest for WriteBatch {
|
||||
type Error = Error;
|
||||
type PutOp = PutData;
|
||||
|
||||
fn put(&mut self, data: PutData) -> Result<()> {
|
||||
fn put(&mut self, mut data: PutData) -> Result<()> {
|
||||
if data.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.validate_put(&data)?;
|
||||
self.preprocess_put_data(&mut data)?;
|
||||
|
||||
self.add_num_rows(data.num_rows())?;
|
||||
self.mutations.push(Mutation::Put(data));
|
||||
@@ -254,6 +258,14 @@ fn align_timestamp(ts: i64, duration: i64) -> Option<i64> {
|
||||
|
||||
// WriteBatch pub methods.
|
||||
impl WriteBatch {
|
||||
pub fn new(schema: SchemaRef) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
mutations: Vec::new(),
|
||||
num_rows: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn schema(&self) -> &SchemaRef {
|
||||
&self.schema
|
||||
}
|
||||
@@ -267,6 +279,7 @@ impl WriteBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Enum wrapping the different operations that can be stored in a `WriteBatch`.
|
||||
pub enum Mutation {
|
||||
/// A put operation carrying the `PutData` to write.
Put(PutData),
|
||||
}
|
||||
@@ -286,6 +299,47 @@ impl PutData {
|
||||
columns: HashMap::with_capacity(num_columns),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> {
|
||||
ensure!(
|
||||
!self.columns.contains_key(name),
|
||||
DuplicateColumnSnafu { name }
|
||||
);
|
||||
|
||||
if let Some(col) = self.columns.values().next() {
|
||||
ensure!(
|
||||
col.len() == vector.len(),
|
||||
LenNotEqualsSnafu {
|
||||
name,
|
||||
expect: col.len(),
|
||||
given: vector.len(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
self.columns.insert(name.to_string(), vector);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add columns by its default value.
|
||||
fn add_default_by_name(&mut self, column_schema: &ColumnSchema) -> Result<()> {
|
||||
let num_rows = self.num_rows();
|
||||
|
||||
// If column is not provided, fills it by default value.
|
||||
let vector = column_schema
|
||||
.create_default_vector(num_rows)
|
||||
.context(CreateDefaultSnafu {
|
||||
name: &column_schema.name,
|
||||
})?
|
||||
.context(MissingColumnSnafu {
|
||||
name: &column_schema.name,
|
||||
})?;
|
||||
|
||||
validate_column(column_schema, &vector)?;
|
||||
|
||||
self.add_column_by_name(&column_schema.name, vector)
|
||||
}
|
||||
}
|
||||
|
||||
impl PutOperation for PutData {
|
||||
@@ -296,7 +350,6 @@ impl PutOperation for PutData {
|
||||
}
|
||||
|
||||
fn add_version_column(&mut self, vector: VectorRef) -> Result<()> {
|
||||
// TODO(yingwen): Maybe ensure that version column must be a uint64 vector.
|
||||
self.add_column_by_name(consts::VERSION_COLUMN_NAME, vector)
|
||||
}
|
||||
|
||||
@@ -349,34 +402,42 @@ impl PutData {
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_column(column_schema: &ColumnSchema, col: &VectorRef) -> Result<()> {
|
||||
if !col.data_type().is_null() {
|
||||
// This allow us to use NullVector for columns that only have null value.
|
||||
// TODO(yingwen): Let NullVector supports different logical type so we could
|
||||
// check data type directly.
|
||||
ensure!(
|
||||
col.data_type() == column_schema.data_type,
|
||||
TypeMismatchSnafu {
|
||||
name: &column_schema.name,
|
||||
expect: column_schema.data_type.clone(),
|
||||
given: col.data_type(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
ensure!(
|
||||
column_schema.is_nullable() || col.null_count() == 0,
|
||||
HasNullSnafu {
|
||||
name: &column_schema.name,
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl WriteBatch {
|
||||
fn validate_put(&self, data: &PutData) -> Result<()> {
|
||||
/// Validate [PutData] and fill missing columns by default value.
|
||||
fn preprocess_put_data(&self, data: &mut PutData) -> Result<()> {
|
||||
for column_schema in self.schema.column_schemas() {
|
||||
match data.column_by_name(&column_schema.name) {
|
||||
Some(col) => {
|
||||
ensure!(
|
||||
col.data_type() == column_schema.data_type,
|
||||
TypeMismatchSnafu {
|
||||
name: &column_schema.name,
|
||||
expect: column_schema.data_type.clone(),
|
||||
given: col.data_type(),
|
||||
}
|
||||
);
|
||||
|
||||
ensure!(
|
||||
column_schema.is_nullable || col.null_count() == 0,
|
||||
HasNullSnafu {
|
||||
name: &column_schema.name,
|
||||
}
|
||||
);
|
||||
validate_column(column_schema, col)?;
|
||||
}
|
||||
None => {
|
||||
ensure!(
|
||||
column_schema.is_nullable,
|
||||
MissingColumnSnafu {
|
||||
name: &column_schema.name,
|
||||
}
|
||||
);
|
||||
// If column is not provided, fills it by default value.
|
||||
data.add_default_by_name(column_schema)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -412,30 +473,6 @@ impl<'a> IntoIterator for &'a WriteBatch {
|
||||
}
|
||||
}
|
||||
|
||||
impl PutData {
|
||||
fn add_column_by_name(&mut self, name: &str, vector: VectorRef) -> Result<()> {
|
||||
ensure!(
|
||||
!self.columns.contains_key(name),
|
||||
DuplicateColumnSnafu { name }
|
||||
);
|
||||
|
||||
if let Some(col) = self.columns.values().next() {
|
||||
ensure!(
|
||||
col.len() == vector.len(),
|
||||
LenNotEqualsSnafu {
|
||||
name,
|
||||
expect: col.len(),
|
||||
given: vector.len(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
self.columns.insert(name.to_string(), vector);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub mod codec {
|
||||
|
||||
use std::{io::Cursor, sync::Arc};
|
||||
@@ -1173,8 +1210,7 @@ mod tests {
|
||||
|
||||
let encoder = codec::WriteBatchProtobufEncoder {};
|
||||
let mut dst = vec![];
|
||||
let result = encoder.encode(&batch, &mut dst);
|
||||
assert!(result.is_ok());
|
||||
encoder.encode(&batch, &mut dst).unwrap();
|
||||
|
||||
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
|
||||
let result = decoder.decode(&dst);
|
||||
|
||||
199
src/storage/src/write_batch/compat.rs
Normal file
199
src/storage/src/write_batch/compat.rs
Normal file
@@ -0,0 +1,199 @@
|
||||
use datatypes::schema::{ColumnSchema, SchemaRef};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::schema::compat::CompatWrite;
|
||||
use crate::write_batch::{Mutation, PutData, WriteBatch};
|
||||
|
||||
impl CompatWrite for WriteBatch {
|
||||
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> {
|
||||
let (data_version, schema_version) = (dest_schema.version(), self.schema.version());
|
||||
// Fast path, nothing to do if schema version of the write batch is equal to version
|
||||
// of destination.
|
||||
if data_version == schema_version {
|
||||
debug_assert_eq!(dest_schema.column_schemas(), self.schema.column_schemas());
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
ensure!(
|
||||
data_version > schema_version,
|
||||
error::WriteToOldVersionSnafu {
|
||||
data_version,
|
||||
schema_version,
|
||||
}
|
||||
);
|
||||
|
||||
// For columns not in schema, returns error instead of discarding the column silently.
|
||||
let column_not_in = column_not_in_schema(dest_schema, self.schema.column_schemas());
|
||||
ensure!(
|
||||
column_not_in.is_none(),
|
||||
error::NotInSchemaToCompatSnafu {
|
||||
column: column_not_in.unwrap(),
|
||||
version: data_version,
|
||||
}
|
||||
);
|
||||
|
||||
for m in &mut self.mutations {
|
||||
match m {
|
||||
Mutation::Put(put_data) => {
|
||||
put_data.compat_write(dest_schema)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Change schema to `dest_schema`.
|
||||
self.schema = dest_schema.clone();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl CompatWrite for PutData {
|
||||
fn compat_write(&mut self, dest_schema: &SchemaRef) -> Result<()> {
|
||||
if self.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for column_schema in dest_schema.column_schemas() {
|
||||
if self.column_by_name(&column_schema.name).is_none() {
|
||||
// We need to fill the column by null or its default value.
|
||||
self.add_default_by_name(column_schema)
|
||||
.context(error::AddDefaultSnafu {
|
||||
column: &column_schema.name,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the name of the first column in `column_schemas` that cannot be found
/// in `schema` by name, or `None` when `schema` contains all of them.
fn column_not_in_schema(schema: &SchemaRef, column_schemas: &[ColumnSchema]) -> Option<String> {
    column_schemas
        .iter()
        .find(|col| schema.column_schema_by_name(&col.name).is_none())
        .map(|col| col.name.clone())
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, SchemaBuilder};
    use datatypes::vectors::{Int32Vector, TimestampVector};
    use store_api::storage::{PutOperation, WriteRequest};

    use super::*;
    use crate::error::Error;

    /// Creates a schema builder with the columns `k0` and `ts` (the timestamp index).
    ///
    /// When `v0_constraint` is `Some`, a nullable `v0` column carrying the given
    /// default constraint is appended as well.
    fn new_test_schema_builder(
        v0_constraint: Option<Option<ColumnDefaultConstraint>>,
    ) -> SchemaBuilder {
        let k0 = ColumnSchema::new("k0", ConcreteDataType::int32_datatype(), false);
        let ts = ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false);
        let mut column_schemas = vec![k0, ts];

        if let Some(constraint) = v0_constraint {
            let v0 = ColumnSchema::new("v0", ConcreteDataType::int32_datatype(), true)
                .with_default_constraint(constraint)
                .unwrap();
            column_schemas.push(v0);
        }

        SchemaBuilder::try_from(column_schemas)
            .unwrap()
            .timestamp_index(1)
    }

    /// Builds the test schema with the builder's default version.
    fn new_test_schema(v0_constraint: Option<Option<ColumnDefaultConstraint>>) -> SchemaRef {
        Arc::new(new_test_schema_builder(v0_constraint).build().unwrap())
    }

    /// Creates put data with three rows that only provides the `k0` and `ts` columns.
    fn new_put_data() -> PutData {
        let mut put_data = PutData::new();

        put_data
            .add_key_column("k0", Arc::new(Int32Vector::from_slice(&[1, 2, 3])))
            .unwrap();
        put_data
            .add_key_column("ts", Arc::new(TimestampVector::from_values([11, 12, 13])))
            .unwrap();

        put_data
    }

    #[test]
    fn test_put_data_compat_write() {
        let schema = new_test_schema(Some(Some(ColumnDefaultConstraint::null_value())));
        let mut put_data = new_put_data();

        put_data.compat_write(&schema).unwrap();

        // The missing `v0` column is filled by its (null) default value.
        let v0 = put_data.column_by_name("v0").unwrap();
        assert!(v0.only_null());
    }

    #[test]
    fn test_write_batch_compat_write() {
        let mut batch = WriteBatch::new(new_test_schema(None));
        batch.put(new_put_data()).unwrap();

        let new_builder =
            new_test_schema_builder(Some(Some(ColumnDefaultConstraint::null_value())));
        let schema_new = Arc::new(new_builder.version(1).build().unwrap());

        batch.compat_write(&schema_new).unwrap();

        // The batch now uses the new schema and its put data has the new column.
        assert_eq!(schema_new, *batch.schema());
        let Mutation::Put(put_data) = batch.iter().next().unwrap();
        put_data.column_by_name("v0").unwrap();
    }

    #[test]
    fn test_write_batch_compat_to_old() {
        let schema_old = new_test_schema(None);
        // Bump the version
        let schema_new = Arc::new(new_test_schema_builder(None).version(1).build().unwrap());

        let mut batch = WriteBatch::new(schema_new);
        let err = batch.compat_write(&schema_old).unwrap_err();
        assert!(
            matches!(err, Error::WriteToOldVersion { .. }),
            "err {} is not WriteToOldVersion",
            err
        );
    }

    #[test]
    fn test_write_batch_skip_compat() {
        let schema = new_test_schema(None);
        let mut batch = WriteBatch::new(schema.clone());

        // Same schema version on both sides, so this takes the fast path.
        batch.compat_write(&schema).unwrap();
    }

    #[test]
    fn test_write_batch_compat_columns_not_in_schema() {
        let mut batch = WriteBatch::new(new_test_schema(Some(None)));

        // The destination schema is newer but lacks the `v0` column of the batch.
        let schema_no_column = Arc::new(new_test_schema_builder(None).version(1).build().unwrap());
        let err = batch.compat_write(&schema_no_column).unwrap_err();
        assert!(
            matches!(err, Error::NotInSchemaToCompat { .. }),
            "err {} is not NotInSchemaToCompat",
            err
        );
    }
}
|
||||
@@ -10,10 +10,9 @@ pub type ColumnFamilyId = u32;
|
||||
/// Id of the region.
|
||||
pub type RegionId = u64;
|
||||
|
||||
// TODO(yingwen): Validate default value has same type with column, and name is a valid column name.
|
||||
/// A [ColumnDescriptor] contains information to create a column.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
|
||||
#[builder(pattern = "owned")]
|
||||
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
|
||||
pub struct ColumnDescriptor {
|
||||
pub id: ColumnId,
|
||||
#[builder(setter(into))]
|
||||
@@ -21,15 +20,27 @@ pub struct ColumnDescriptor {
|
||||
pub data_type: ConcreteDataType,
|
||||
/// Is column nullable, default is true.
|
||||
#[builder(default = "true")]
|
||||
pub is_nullable: bool,
|
||||
is_nullable: bool,
|
||||
/// Default constraint of column, default is None, which means no default constraint
|
||||
/// for this column, and user must provide a value for a not-null column.
|
||||
#[builder(default)]
|
||||
pub default_constraint: Option<ColumnDefaultConstraint>,
|
||||
default_constraint: Option<ColumnDefaultConstraint>,
|
||||
#[builder(default, setter(into))]
|
||||
pub comment: String,
|
||||
}
|
||||
|
||||
impl ColumnDescriptor {
|
||||
#[inline]
|
||||
pub fn is_nullable(&self) -> bool {
|
||||
self.is_nullable
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
|
||||
self.default_constraint.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl ColumnDescriptorBuilder {
|
||||
pub fn new<S: Into<String>>(id: ColumnId, name: S, data_type: ConcreteDataType) -> Self {
|
||||
Self {
|
||||
@@ -39,12 +50,35 @@ impl ColumnDescriptorBuilder {
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn validate(&self) -> Result<(), String> {
|
||||
if let Some(name) = &self.name {
|
||||
if name.is_empty() {
|
||||
return Err("name should not be empty".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(Some(constraint)), Some(data_type)) =
|
||||
(&self.default_constraint, &self.data_type)
|
||||
{
|
||||
// The default value of unwrap_or should be same as the default value
|
||||
// defined in the `#[builder(default = "xxx")]` attribute.
|
||||
let is_nullable = self.is_nullable.unwrap_or(true);
|
||||
|
||||
constraint
|
||||
.validate(data_type, is_nullable)
|
||||
.map_err(|e| e.to_string())?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ColumnDescriptor> for ColumnSchema {
|
||||
fn from(desc: &ColumnDescriptor) -> ColumnSchema {
|
||||
ColumnSchema::new(&desc.name, desc.data_type.clone(), desc.is_nullable)
|
||||
.with_default_constraint(desc.default_constraint.clone())
|
||||
.expect("ColumnDescriptor should validate default constraint")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,7 +173,7 @@ mod tests {
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap();
|
||||
assert!(!desc.is_nullable);
|
||||
assert!(!desc.is_nullable());
|
||||
|
||||
let desc = new_column_desc_builder()
|
||||
.default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null)))
|
||||
@@ -147,7 +181,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
ColumnDefaultConstraint::Value(Value::Null),
|
||||
desc.default_constraint.unwrap()
|
||||
*desc.default_constraint().unwrap()
|
||||
);
|
||||
|
||||
let desc = new_column_desc_builder()
|
||||
@@ -164,6 +198,12 @@ mod tests {
|
||||
.build()
|
||||
.unwrap();
|
||||
assert_eq!("A test column", desc.comment);
|
||||
|
||||
new_column_desc_builder()
|
||||
.is_nullable(false)
|
||||
.default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null)))
|
||||
.build()
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
fn new_timestamp_desc() -> ColumnDescriptor {
|
||||
|
||||
@@ -9,10 +9,14 @@ use datatypes::vectors::VectorRef;
|
||||
use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber};
|
||||
|
||||
/// Write request holds a collection of updates to apply to a region.
|
||||
///
|
||||
/// The implementation of the write request should ensure all operations in
|
||||
/// the request follows the same schema restriction.
|
||||
pub trait WriteRequest: Send {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
type PutOp: PutOperation;
|
||||
|
||||
/// Add put operation to the request.
|
||||
fn put(&mut self, put: Self::PutOp) -> Result<(), Self::Error>;
|
||||
|
||||
/// Returns all possible time ranges that contain the timestamp in this batch.
|
||||
@@ -20,8 +24,10 @@ pub trait WriteRequest: Send {
|
||||
/// Each time range is aligned to given `duration`.
|
||||
fn time_ranges(&self, duration: Duration) -> Result<Vec<RangeMillis>, Self::Error>;
|
||||
|
||||
/// Create a new put operation.
|
||||
fn put_op(&self) -> Self::PutOp;
|
||||
|
||||
/// Create a new put operation with capacity reserved for `num_columns`.
|
||||
fn put_op_with_columns(num_columns: usize) -> Self::PutOp;
|
||||
}
|
||||
|
||||
@@ -29,10 +35,13 @@ pub trait WriteRequest: Send {
|
||||
pub trait PutOperation: Send + std::fmt::Debug {
|
||||
type Error: ErrorExt + Send + Sync;
|
||||
|
||||
/// Put data to the key column.
|
||||
fn add_key_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
|
||||
|
||||
/// Put data to the version column.
|
||||
fn add_version_column(&mut self, vector: VectorRef) -> Result<(), Self::Error>;
|
||||
|
||||
/// Put data to the value column.
|
||||
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
|
||||
@@ -143,8 +143,8 @@ fn build_row_key_desc(
|
||||
ts_column_schema.name.clone(),
|
||||
ts_column_schema.data_type.clone(),
|
||||
)
|
||||
.default_constraint(ts_column_schema.default_constraint.clone())
|
||||
.is_nullable(ts_column_schema.is_nullable)
|
||||
.default_constraint(ts_column_schema.default_constraint().cloned())
|
||||
.is_nullable(ts_column_schema.is_nullable())
|
||||
.build()
|
||||
.context(BuildColumnDescriptorSnafu {
|
||||
column_name: &ts_column_schema.name,
|
||||
@@ -169,8 +169,8 @@ fn build_row_key_desc(
|
||||
column_schema.name.clone(),
|
||||
column_schema.data_type.clone(),
|
||||
)
|
||||
.default_constraint(column_schema.default_constraint.clone())
|
||||
.is_nullable(column_schema.is_nullable)
|
||||
.default_constraint(column_schema.default_constraint().cloned())
|
||||
.is_nullable(column_schema.is_nullable())
|
||||
.build()
|
||||
.context(BuildColumnDescriptorSnafu {
|
||||
column_name: &column_schema.name,
|
||||
@@ -212,8 +212,8 @@ fn build_column_family(
|
||||
column_schema.name.clone(),
|
||||
column_schema.data_type.clone(),
|
||||
)
|
||||
.default_constraint(column_schema.default_constraint.clone())
|
||||
.is_nullable(column_schema.is_nullable)
|
||||
.default_constraint(column_schema.default_constraint().cloned())
|
||||
.is_nullable(column_schema.is_nullable())
|
||||
.build()
|
||||
.context(BuildColumnDescriptorSnafu {
|
||||
column_name: &column_schema.name,
|
||||
@@ -444,7 +444,8 @@ mod tests {
|
||||
let column_schemas = vec![
|
||||
ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new("n", ConcreteDataType::int32_datatype(), true)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32)))),
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(42i32))))
|
||||
.unwrap(),
|
||||
ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_datatype(common_time::timestamp::TimeUnit::Millisecond),
|
||||
|
||||
@@ -173,8 +173,11 @@ pub enum Error {
|
||||
column_qualified_name: String,
|
||||
},
|
||||
|
||||
#[snafu(display("Unsupported column default constraint: {}", expr))]
|
||||
UnsupportedDefaultConstraint { expr: String, backtrace: Backtrace },
|
||||
#[snafu(display("Unsupported column default constraint, source: {}", source))]
|
||||
UnsupportedDefaultConstraint {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<Error> for table::error::Error {
|
||||
|
||||
@@ -11,12 +11,8 @@ use common_query::logical_plan::Expr;
|
||||
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use common_telemetry::logging;
|
||||
use common_time::util;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::ScalarVector;
|
||||
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
|
||||
use datatypes::vectors::{ConstantVector, TimestampVector, VectorRef};
|
||||
use datatypes::schema::{ColumnSchema, SchemaBuilder};
|
||||
use datatypes::vectors::VectorRef;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::Stream;
|
||||
use object_store::ObjectStore;
|
||||
@@ -85,39 +81,35 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
// columns_values is not empty, it's safe to unwrap
|
||||
let rows_num = columns_values.values().next().unwrap().len();
|
||||
|
||||
//Add row key and columns
|
||||
// Add row key columns
|
||||
for name in key_columns {
|
||||
let column_schema = schema
|
||||
.column_schema_by_name(name)
|
||||
.expect("column schema not found");
|
||||
|
||||
let vector = columns_values.remove(name).or_else(|| {
|
||||
Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()?
|
||||
});
|
||||
let vector = match columns_values.remove(name) {
|
||||
Some(v) => v,
|
||||
None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?,
|
||||
};
|
||||
|
||||
if let Some(vector) = vector {
|
||||
put_op
|
||||
.add_key_column(name, vector)
|
||||
.map_err(TableError::new)?;
|
||||
} else if !column_schema.is_nullable {
|
||||
return MissingColumnSnafu { name }.fail()?;
|
||||
}
|
||||
put_op
|
||||
.add_key_column(name, vector)
|
||||
.map_err(TableError::new)?;
|
||||
}
|
||||
|
||||
// Add value columns
|
||||
for name in value_columns {
|
||||
let column_schema = schema
|
||||
.column_schema_by_name(name)
|
||||
.expect("column schema not found");
|
||||
|
||||
let vector = columns_values.remove(name).or_else(|| {
|
||||
Self::try_get_column_default_constraint_vector(column_schema, rows_num).ok()?
|
||||
});
|
||||
|
||||
if let Some(v) = vector {
|
||||
put_op.add_value_column(name, v).map_err(TableError::new)?;
|
||||
} else if !column_schema.is_nullable {
|
||||
return MissingColumnSnafu { name }.fail()?;
|
||||
}
|
||||
let vector = match columns_values.remove(name) {
|
||||
Some(v) => v,
|
||||
None => Self::try_get_column_default_constraint_vector(column_schema, rows_num)?,
|
||||
};
|
||||
put_op
|
||||
.add_value_column(name, vector)
|
||||
.map_err(TableError::new)?;
|
||||
}
|
||||
|
||||
ensure!(
|
||||
@@ -132,7 +124,7 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
}
|
||||
);
|
||||
|
||||
logging::debug!(
|
||||
logging::trace!(
|
||||
"Insert into table {} with put_op: {:?}",
|
||||
table_info.name,
|
||||
put_op
|
||||
@@ -220,8 +212,8 @@ impl<R: Region> Table for MitoTable<R> {
|
||||
&new_column.name,
|
||||
new_column.data_type.clone(),
|
||||
)
|
||||
.is_nullable(new_column.is_nullable)
|
||||
.default_constraint(new_column.default_constraint.clone())
|
||||
.is_nullable(new_column.is_nullable())
|
||||
.default_constraint(new_column.default_constraint().cloned())
|
||||
.build()
|
||||
.context(error::BuildColumnDescriptorSnafu {
|
||||
table_name,
|
||||
@@ -468,41 +460,16 @@ impl<R: Region> MitoTable<R> {
|
||||
fn try_get_column_default_constraint_vector(
|
||||
column_schema: &ColumnSchema,
|
||||
rows_num: usize,
|
||||
) -> TableResult<Option<VectorRef>> {
|
||||
) -> TableResult<VectorRef> {
|
||||
// TODO(dennis): when we support altering schema, we should check the schemas difference between table and region
|
||||
if let Some(v) = &column_schema.default_constraint {
|
||||
assert!(rows_num > 0);
|
||||
let vector = column_schema
|
||||
.create_default_vector(rows_num)
|
||||
.context(UnsupportedDefaultConstraintSnafu)?
|
||||
.context(MissingColumnSnafu {
|
||||
name: &column_schema.name,
|
||||
})?;
|
||||
|
||||
match v {
|
||||
ColumnDefaultConstraint::Value(v) => {
|
||||
let mut mutable_vector = column_schema.data_type.create_mutable_vector(1);
|
||||
mutable_vector
|
||||
.push_value_ref(v.as_value_ref())
|
||||
.map_err(TableError::new)?;
|
||||
let vector =
|
||||
Arc::new(ConstantVector::new(mutable_vector.to_vector(), rows_num));
|
||||
Ok(Some(vector))
|
||||
}
|
||||
ColumnDefaultConstraint::Function(expr) => {
|
||||
match &expr[..] {
|
||||
// TODO(dennis): we only supports current_timestamp right now,
|
||||
// it's better to use a expression framework in future.
|
||||
"current_timestamp()" => {
|
||||
let vector =
|
||||
Arc::new(TimestampVector::from_slice(&[Timestamp::from_millis(
|
||||
util::current_time_millis(),
|
||||
)]));
|
||||
Ok(Some(Arc::new(ConstantVector::new(vector, rows_num))))
|
||||
}
|
||||
_ => UnsupportedDefaultConstraintSnafu { expr }
|
||||
.fail()
|
||||
.map_err(TableError::new),
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
Ok(vector)
|
||||
}
|
||||
|
||||
pub async fn open(
|
||||
|
||||
@@ -125,6 +125,7 @@ pub struct MockRegionInner {
|
||||
memtable: Arc<RwLock<MockMemtable>>,
|
||||
}
|
||||
|
||||
/// A columnar memtable, maps column name to data of that column in each row.
|
||||
type MockMemtable = HashMap<String, Vec<Value>>;
|
||||
|
||||
#[async_trait]
|
||||
@@ -189,11 +190,10 @@ impl MockRegionInner {
|
||||
{
|
||||
let mut memtable = self.memtable.write().unwrap();
|
||||
|
||||
// Now drop columns is not supported.
|
||||
let rows = memtable.values().last().unwrap().len();
|
||||
|
||||
// currently dropping columns are not supported, so we only add columns here
|
||||
for column in metadata.user_schema().column_schemas() {
|
||||
let _ = memtable
|
||||
memtable
|
||||
.entry(column.name.clone())
|
||||
.or_insert_with(|| vec![Value::Null; rows]);
|
||||
}
|
||||
@@ -211,8 +211,6 @@ impl MockRegionInner {
|
||||
let column = memtable.get_mut(name).unwrap();
|
||||
if let Some(data) = put.column_by_name(name) {
|
||||
(0..data.len()).for_each(|i| column.push(data.get(i)));
|
||||
} else {
|
||||
column.extend_from_slice(&vec![Value::Null; put.num_rows()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user