Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-05 21:02:58 +00:00)
feat: Add region schema for storage engine (#171)
* refactor: Merge RowKeyMetadata into ColumnsMetadata

RowKeyMetadata and ColumnsMetadata are almost always used together, so there is no need to keep them in two structs; they are now combined into a single ColumnsMetadata struct.

chore: Make some fields of metadata private

feat: Replace schema in RegionMetadata by RegionSchema

The internal schema of a region should have knowledge of all internal columns that are reserved and used by the storage engine, such as sequence and value type. So we introduce `RegionSchema`, which holds a `SchemaRef` that only contains the columns the user can see.

feat: Value derives Serialize and supports converting into json value

feat: Add version to schema

The schema version has an initial value of 0 and is bumped each time the schema is altered.

feat: Add internal columns to region metadata

Introduce the concepts of reserved columns and internal columns. Reserved columns are columns whose names and ids are reserved by the storage engine and cannot be used by the user; they usually have special usage. Reserved columns, except the version column, are also called internal columns (though the version column could also be regarded as a special kind of internal column) and are not visible to the user, e.g. our internal sequence and value_type columns.

The RegionMetadataBuilder always pushes the internal columns used by the engine into the columns in metadata. Internal columns are all stored behind the user columns in the columns vector. To avoid column id collisions, ids reserved for columns have the most significant bit set to 1, and the RegionMetadataBuilder checks the uniqueness of column ids.

chore: Rebase develop and fix compile error

feat: Add internal schema to region schema

feat: Add SchemaBuilder to build Schema

feat: Store row key end in region schema metadata

Also move the arrow schema construction to the region::schema mod.

feat: Add SstSchema

refactor: Replace MemtableSchema by RegionSchema

Now when writing sst files, we can use the arrow schema from our sst schema, which contains the internal columns.

feat: Use SstSchema to read parquet

Adds user_column_end to metadata. When reading a parquet file, convert the arrow schema into SstSchema, then use row_key_end and user_column_end to locate the row key parts, value parts and internal columns, instead of using the timestamp index, which may yield an incorrect index if the timestamp is not at the end of the row key. Move the Batch to arrow Chunk conversion into SstSchema, so the SST mod does not need to care about the order of key, value and internal columns.

test: Add test for Value to serde_json::Value

feat: Add RawRegionMetadata to persist RegionMetadata

test: Add test to RegionSchema

fix: Fix clippy

To fix the clippy::enum_clike_unportable_variant lint, define the column id offset in ReservedColumnType and compute the final column id in ReservedColumnId's const method.

refactor: Move batch/chunk conversion to SstSchema

The parquet ChunkStream now holds the SstSchema and uses its method to convert a Chunk into a Batch.

chore: Address CR comment

Also add a test for pushing internal columns to RegionMetadataBuilder.

chore: Address CR comment

chore: Use bitwise or to compute column id

* chore: Address CR comment
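Most of the diff below is a mechanical migration from `Schema::with_timestamp_index` to the new `SchemaBuilder`. The following is a rough usage sketch pieced together from the call sites and tests in this commit; treat it as an illustration of the builder surface shown in the diff, not authoritative API documentation.

```rust
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};

fn main() {
    // Plain user columns; the builder validates the timestamp index in
    // build() and records both the index and the schema version in the
    // arrow schema metadata ("greptime:timestamp_index", "greptime:version").
    let column_schemas = vec![
        ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
        ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
    ];

    let schema = SchemaBuilder::from(column_schemas)
        .timestamp_index(1)       // checked in build(), not here
        .version(123)             // defaults to Schema::INITIAL_VERSION (0)
        .add_metadata("k1", "v1") // extra key/value stored in the metadata
        .build()
        .unwrap();

    assert_eq!(1, schema.timestamp_index().unwrap());
    assert_eq!(123, schema.version());
    assert_eq!("v1", schema.metadata().get("k1").unwrap());
}
```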
@@ -81,12 +81,6 @@ pub enum Error {
source: common_recordbatch::error::Error,
},

#[snafu(display("Failed to build table schema for system catalog table"))]
SystemCatalogSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},

#[snafu(display(
"Failed to insert table creation record to system catalog, source: {}",
source
@@ -116,7 +110,7 @@ impl ErrorExt for Error {
| Error::OpenTable { .. }
| Error::ReadSystemCatalog { .. }
| Error::InsertTableRecord { .. } => StatusCode::StorageUnavailable,
Error::RegisterTable { .. } | Error::SystemCatalogSchema { .. } => StatusCode::Internal,
Error::RegisterTable { .. } => StatusCode::Internal,
Error::TableExists { .. } => StatusCode::TableAlreadyExists,
}
}
@@ -7,7 +7,7 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use common_time::util;
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::{BinaryVector, Int64Vector, UInt8Vector};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -22,7 +22,7 @@ use crate::consts::{
};
use crate::error::{
CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu,
OpenSystemCatalogSnafu, Result, SystemCatalogSchemaSnafu, ValueDeserializeSnafu,
OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu,
};

pub const ENTRY_TYPE_INDEX: usize = 0;
@@ -68,7 +68,7 @@ impl SystemCatalogTable {
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
};
let schema = Arc::new(build_system_catalog_schema()?);
let schema = Arc::new(build_system_catalog_schema());
let ctx = EngineContext::default();

if let Some(table) = engine
@@ -107,12 +107,14 @@ impl SystemCatalogTable {
}

/// Build system catalog table schema.
/// A system catalog table consists of 4 columns, namely
/// A system catalog table consists of 6 columns, namely
/// - entry_type: type of entry in current row, can be any variant of [EntryType].
/// - key: a binary encoded key of entry, differs according to different entry type.
/// - timestamp: currently not used.
/// - value: JSON-encoded value of entry's metadata.
fn build_system_catalog_schema() -> Result<Schema> {
/// - gmt_created: create time of this metadata.
/// - gmt_modified: last updated time of this metadata.
fn build_system_catalog_schema() -> Schema {
let cols = vec![
ColumnSchema::new(
"entry_type".to_string(),
@@ -146,7 +148,11 @@ fn build_system_catalog_schema() -> Result<Schema> {
),
];

Schema::with_timestamp_index(cols, TIMESTAMP_INDEX).context(SystemCatalogSchemaSnafu)
// The schema of this table must be valid.
SchemaBuilder::from(cols)
.timestamp_index(2)
.build()
.unwrap()
}

pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
@@ -193,7 +193,7 @@ mod tests {
use common_recordbatch::SendableRecordBatchStream;
use datatypes::{
data_type::ConcreteDataType,
schema::{ColumnSchema, Schema, SchemaRef},
schema::{ColumnSchema, SchemaBuilder, SchemaRef},
value::Value,
};
use table::error::Result as TableResult;
@@ -280,7 +280,12 @@ mod tests {
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];

Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap())
Arc::new(
SchemaBuilder::from(column_schemas)
.timestamp_index(3)
.build()
.unwrap(),
)
}
async fn scan(
&self,

@@ -64,7 +64,7 @@ mod tests {
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use log_store::fs::noop::NoopLogStore;
use object_store::{backend::fs::Backend, ObjectStore};
@@ -96,7 +96,12 @@ mod tests {
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];

Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap())
Arc::new(
SchemaBuilder::from(column_schemas)
.timestamp_index(3)
.build()
.unwrap(),
)
}
async fn scan(
&self,
@@ -4,7 +4,7 @@ use std::sync::Arc;
use catalog::RegisterTableRequest;
use common_telemetry::tracing::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, DataType as SqlDataType, ObjectName, TableConstraint};
@@ -134,7 +134,10 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
.collect::<Result<Vec<_>>>()?;

let schema = Arc::new(
Schema::with_timestamp_index(columns_schemas, ts_index).context(CreateSchemaSnafu)?,
SchemaBuilder::from(columns_schemas)
.timestamp_index(ts_index)
.build()
.context(CreateSchemaSnafu)?,
);

let request = CreateTableRequest {

@@ -40,7 +40,7 @@ async fn test_sql_api() {
let body = res.text().await;
assert_eq!(
body,
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"#
r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{"greptime:version":"0"}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"#
);
}
@@ -5,7 +5,7 @@ use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::Instance;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use snafu::ResultExt;
use table::engine::EngineContext;
use table::engine::TableEngineRef;
@@ -62,7 +62,9 @@ pub async fn create_test_table(instance: &Instance) -> Result<()> {
table_name: table_name.to_string(),
desc: Some(" a test table".to_string()),
schema: Arc::new(
Schema::with_timestamp_index(column_schemas, 3)
SchemaBuilder::from(column_schemas)
.timestamp_index(3)
.build()
.expect("ts is expected to be timestamp column"),
),
create_if_not_exists: true,

@@ -42,6 +42,17 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to parse version in schema meta, value: {}, source: {}",
value,
source
))]
ParseSchemaVersion {
value: String,
source: std::num::ParseIntError,
backtrace: Backtrace,
},

#[snafu(display("Invalid timestamp index: {}", index))]
InvalidTimestampIndex { index: usize, backtrace: Backtrace },
}

@@ -15,3 +15,4 @@ pub mod value;
pub mod vectors;

pub use arrow;
pub use error::{Error, Result};
@@ -1,7 +1,8 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Field, Metadata, Schema as ArrowSchema};
pub use arrow::datatypes::Metadata;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};

@@ -9,9 +10,7 @@ use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, Result};

const TIMESTAMP_INDEX_KEY: &str = "greptime:timestamp_index";

// TODO(yingwen): consider assign a version to schema so compare schema can be
// done by compare version.
const VERSION_KEY: &str = "greptime:version";

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnSchema {
@@ -34,6 +33,7 @@ impl ColumnSchema {
|
||||
}
|
||||
}
|
||||
|
||||
/// A common schema, should be immutable.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct Schema {
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
@@ -44,43 +44,27 @@ pub struct Schema {
|
||||
/// Timestamp key column is the column holds the timestamp and forms part of
|
||||
/// the primary key. None means there is no timestamp key column.
|
||||
timestamp_index: Option<usize>,
|
||||
/// Version of the schema.
|
||||
///
|
||||
/// Initial value is zero. The version should bump after altering schema.
|
||||
version: u32,
|
||||
}
|
||||
|
||||
impl Schema {
|
||||
/// Initial version of the schema.
|
||||
pub const INITIAL_VERSION: u32 = 0;
|
||||
|
||||
pub fn new(column_schemas: Vec<ColumnSchema>) -> Schema {
|
||||
let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas);
|
||||
|
||||
Schema {
|
||||
column_schemas,
|
||||
name_to_index,
|
||||
arrow_schema: Arc::new(arrow_schema),
|
||||
timestamp_index: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_timestamp_index(
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
timestamp_index: usize,
|
||||
) -> Result<Schema> {
|
||||
let (arrow_schema, name_to_index) = collect_column_schemas(&column_schemas);
|
||||
let mut metadata = BTreeMap::new();
|
||||
metadata.insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string());
|
||||
let arrow_schema = Arc::new(arrow_schema.with_metadata(metadata));
|
||||
|
||||
validate_timestamp_index(&column_schemas, timestamp_index)?;
|
||||
|
||||
Ok(Schema {
|
||||
column_schemas,
|
||||
name_to_index,
|
||||
arrow_schema,
|
||||
timestamp_index: Some(timestamp_index),
|
||||
})
|
||||
// Builder won't fail
|
||||
SchemaBuilder::from(column_schemas).build().unwrap()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
|
||||
&self.arrow_schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn column_schemas(&self) -> &[ColumnSchema] {
|
||||
&self.column_schemas
|
||||
}
|
||||
@@ -106,11 +90,89 @@ impl Schema {
|
||||
pub fn timestamp_column(&self) -> Option<&ColumnSchema> {
|
||||
self.timestamp_index.map(|idx| &self.column_schemas[idx])
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn version(&self) -> u32 {
|
||||
self.version
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn metadata(&self) -> &Metadata {
|
||||
&self.arrow_schema.metadata
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_column_schemas(
|
||||
column_schemas: &[ColumnSchema],
|
||||
) -> (ArrowSchema, HashMap<String, usize>) {
|
||||
#[derive(Default)]
|
||||
pub struct SchemaBuilder {
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
name_to_index: HashMap<String, usize>,
|
||||
fields: Vec<Field>,
|
||||
timestamp_index: Option<usize>,
|
||||
version: u32,
|
||||
metadata: Metadata,
|
||||
}
|
||||
|
||||
impl From<Vec<ColumnSchema>> for SchemaBuilder {
|
||||
fn from(column_schemas: Vec<ColumnSchema>) -> SchemaBuilder {
|
||||
SchemaBuilder::from_columns(column_schemas)
|
||||
}
|
||||
}
|
||||
|
||||
impl SchemaBuilder {
|
||||
pub fn from_columns(column_schemas: Vec<ColumnSchema>) -> Self {
|
||||
let (fields, name_to_index) = collect_fields(&column_schemas);
|
||||
|
||||
Self {
|
||||
column_schemas,
|
||||
name_to_index,
|
||||
fields,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Set timestamp index.
|
||||
///
|
||||
/// The validation of timestamp column is done in `build()`.
|
||||
pub fn timestamp_index(mut self, timestamp_index: usize) -> Self {
|
||||
self.timestamp_index = Some(timestamp_index);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn version(mut self, version: u32) -> Self {
|
||||
self.version = version;
|
||||
self
|
||||
}
|
||||
|
||||
/// Add key value pair to metadata.
|
||||
///
|
||||
/// Old metadata with same key would be overwritten.
|
||||
pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(mut self) -> Result<Schema> {
|
||||
if let Some(timestamp_index) = self.timestamp_index {
|
||||
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
|
||||
self.metadata
|
||||
.insert(TIMESTAMP_INDEX_KEY.to_string(), timestamp_index.to_string());
|
||||
}
|
||||
self.metadata
|
||||
.insert(VERSION_KEY.to_string(), self.version.to_string());
|
||||
|
||||
let arrow_schema = ArrowSchema::from(self.fields).with_metadata(self.metadata);
|
||||
|
||||
Ok(Schema {
|
||||
column_schemas: self.column_schemas,
|
||||
name_to_index: self.name_to_index,
|
||||
arrow_schema: Arc::new(arrow_schema),
|
||||
timestamp_index: self.timestamp_index,
|
||||
version: self.version,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_fields(column_schemas: &[ColumnSchema]) -> (Vec<Field>, HashMap<String, usize>) {
|
||||
let mut fields = Vec::with_capacity(column_schemas.len());
|
||||
let mut name_to_index = HashMap::with_capacity(column_schemas.len());
|
||||
for (index, column_schema) in column_schemas.iter().enumerate() {
|
||||
@@ -119,7 +181,7 @@ fn collect_column_schemas(
|
||||
name_to_index.insert(column_schema.name.clone(), index);
|
||||
}
|
||||
|
||||
(ArrowSchema::from(fields), name_to_index)
|
||||
(fields, name_to_index)
|
||||
}
|
||||
|
||||
fn validate_timestamp_index(column_schemas: &[ColumnSchema], timestamp_index: usize) -> Result<()> {
|
||||
@@ -183,16 +245,28 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
|
||||
if let Some(index) = timestamp_index {
|
||||
validate_timestamp_index(&column_schemas, index)?;
|
||||
}
|
||||
let version = try_parse_version(&arrow_schema.metadata, VERSION_KEY)?;
|
||||
|
||||
Ok(Self {
|
||||
column_schemas,
|
||||
name_to_index,
|
||||
arrow_schema,
|
||||
timestamp_index,
|
||||
version,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ArrowSchema> for Schema {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(arrow_schema: ArrowSchema) -> Result<Schema> {
|
||||
let arrow_schema = Arc::new(arrow_schema);
|
||||
|
||||
Schema::try_from(arrow_schema)
|
||||
}
|
||||
}
|
||||
|
||||
fn try_parse_index(metadata: &Metadata, key: &str) -> Result<Option<usize>> {
|
||||
if let Some(value) = metadata.get(key) {
|
||||
let index = value
|
||||
@@ -205,6 +279,18 @@ fn try_parse_index(metadata: &Metadata, key: &str) -> Result<Option<usize>> {
|
||||
}
|
||||
}
|
||||
|
||||
fn try_parse_version(metadata: &Metadata, key: &str) -> Result<u32> {
|
||||
if let Some(value) = metadata.get(key) {
|
||||
let version = value
|
||||
.parse()
|
||||
.context(error::ParseSchemaVersionSnafu { value })?;
|
||||
|
||||
Ok(version)
|
||||
} else {
|
||||
Ok(Schema::INITIAL_VERSION)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
@@ -223,6 +309,14 @@ mod tests {
|
||||
assert_eq!(column_schema, new_column_schema);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_empty_schema() {
|
||||
let schema = SchemaBuilder::default().build().unwrap();
|
||||
assert_eq!(0, schema.num_columns());
|
||||
|
||||
assert!(SchemaBuilder::default().timestamp_index(0).build().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_schema_no_timestamp() {
|
||||
let column_schemas = vec![
|
||||
@@ -234,6 +328,7 @@ mod tests {
|
||||
assert_eq!(2, schema.num_columns());
|
||||
assert!(schema.timestamp_index().is_none());
|
||||
assert!(schema.timestamp_column().is_none());
|
||||
assert_eq!(Schema::INITIAL_VERSION, schema.version());
|
||||
|
||||
for column_schema in &column_schemas {
|
||||
let found = schema.column_schema_by_name(&column_schema.name).unwrap();
|
||||
@@ -241,15 +336,25 @@ mod tests {
|
||||
}
|
||||
assert!(schema.column_schema_by_name("col3").is_none());
|
||||
|
||||
let fields: Vec<_> = column_schemas.iter().map(Field::from).collect();
|
||||
let arrow_schema = Arc::new(ArrowSchema::from(fields));
|
||||
|
||||
let new_schema = Schema::try_from(arrow_schema.clone()).unwrap();
|
||||
let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
|
||||
|
||||
assert_eq!(schema, new_schema);
|
||||
assert_eq!(column_schemas, schema.column_schemas());
|
||||
assert_eq!(arrow_schema, *schema.arrow_schema());
|
||||
assert_eq!(arrow_schema, *new_schema.arrow_schema());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_metadata() {
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
"col1",
|
||||
ConcreteDataType::int32_datatype(),
|
||||
false,
|
||||
)];
|
||||
let schema = SchemaBuilder::from(column_schemas)
|
||||
.add_metadata("k1", "v1")
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!("v1", schema.metadata().get("k1").unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -258,10 +363,15 @@ mod tests {
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
|
||||
];
|
||||
let schema = Schema::with_timestamp_index(column_schemas.clone(), 1).unwrap();
|
||||
let schema = SchemaBuilder::from(column_schemas.clone())
|
||||
.timestamp_index(1)
|
||||
.version(123)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, schema.timestamp_index().unwrap());
|
||||
assert_eq!(&column_schemas[1], schema.timestamp_column().unwrap());
|
||||
assert_eq!(123, schema.version());
|
||||
|
||||
let new_schema = Schema::try_from(schema.arrow_schema().clone()).unwrap();
|
||||
assert_eq!(1, schema.timestamp_index().unwrap());
|
||||
@@ -274,8 +384,17 @@ mod tests {
|
||||
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
|
||||
ColumnSchema::new("col2", ConcreteDataType::float64_datatype(), false),
|
||||
];
|
||||
assert!(Schema::with_timestamp_index(column_schemas.clone(), 0).is_err());
|
||||
assert!(Schema::with_timestamp_index(column_schemas.clone(), 1).is_err());
|
||||
assert!(Schema::with_timestamp_index(column_schemas, 2).is_err());
|
||||
assert!(SchemaBuilder::from(column_schemas.clone())
|
||||
.timestamp_index(0)
|
||||
.build()
|
||||
.is_err());
|
||||
assert!(SchemaBuilder::from(column_schemas.clone())
|
||||
.timestamp_index(1)
|
||||
.build()
|
||||
.is_err());
|
||||
assert!(SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(1)
|
||||
.build()
|
||||
.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::cmp::Ordering;
|
||||
use common_base::bytes::{Bytes, StringBytes};
|
||||
use datafusion_common::ScalarValue;
|
||||
pub use ordered_float::OrderedFloat;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::prelude::*;
|
||||
|
||||
@@ -15,7 +15,7 @@ pub type OrderedF64 = OrderedFloat<f64>;
|
||||
/// Although compare Value with different data type is allowed, it is recommended to only
|
||||
/// compare Value with same data type. Comparing Value with different data type may not
|
||||
/// behaves as what you expect.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
pub enum Value {
|
||||
Null,
|
||||
|
||||
@@ -132,30 +132,31 @@ impl From<&[u8]> for Value {
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Value {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match self {
|
||||
Value::Null => serde_json::Value::Null.serialize(serializer),
|
||||
Value::Boolean(v) => v.serialize(serializer),
|
||||
Value::UInt8(v) => v.serialize(serializer),
|
||||
Value::UInt16(v) => v.serialize(serializer),
|
||||
Value::UInt32(v) => v.serialize(serializer),
|
||||
Value::UInt64(v) => v.serialize(serializer),
|
||||
Value::Int8(v) => v.serialize(serializer),
|
||||
Value::Int16(v) => v.serialize(serializer),
|
||||
Value::Int32(v) => v.serialize(serializer),
|
||||
Value::Int64(v) => v.serialize(serializer),
|
||||
Value::Float32(v) => v.serialize(serializer),
|
||||
Value::Float64(v) => v.serialize(serializer),
|
||||
Value::String(bytes) => bytes.serialize(serializer),
|
||||
Value::Binary(bytes) => bytes.serialize(serializer),
|
||||
Value::Date(v) => v.serialize(serializer),
|
||||
Value::DateTime(v) => v.serialize(serializer),
|
||||
Value::List(_) => unimplemented!(),
|
||||
}
|
||||
impl TryFrom<Value> for serde_json::Value {
|
||||
type Error = serde_json::Error;
|
||||
|
||||
fn try_from(value: Value) -> serde_json::Result<serde_json::Value> {
|
||||
let json_value = match value {
|
||||
Value::Null => serde_json::Value::Null,
|
||||
Value::Boolean(v) => serde_json::Value::Bool(v),
|
||||
Value::UInt8(v) => serde_json::Value::from(v),
|
||||
Value::UInt16(v) => serde_json::Value::from(v),
|
||||
Value::UInt32(v) => serde_json::Value::from(v),
|
||||
Value::UInt64(v) => serde_json::Value::from(v),
|
||||
Value::Int8(v) => serde_json::Value::from(v),
|
||||
Value::Int16(v) => serde_json::Value::from(v),
|
||||
Value::Int32(v) => serde_json::Value::from(v),
|
||||
Value::Int64(v) => serde_json::Value::from(v),
|
||||
Value::Float32(v) => serde_json::Value::from(v.0),
|
||||
Value::Float64(v) => serde_json::Value::from(v.0),
|
||||
Value::String(bytes) => serde_json::Value::String(bytes.as_utf8().to_string()),
|
||||
Value::Binary(bytes) => serde_json::to_value(bytes)?,
|
||||
Value::Date(v) => serde_json::Value::Number(v.into()),
|
||||
Value::DateTime(v) => serde_json::Value::Number(v.into()),
|
||||
Value::List(v) => serde_json::to_value(v)?,
|
||||
};
|
||||
|
||||
Ok(json_value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -437,4 +438,80 @@ mod tests {
|
||||
|
||||
assert_eq!(ScalarValue::Boolean(None), Value::Null.into());
|
||||
}
|
||||
|
||||
fn to_json(value: Value) -> serde_json::Value {
|
||||
value.try_into().unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_json_value() {
|
||||
assert_eq!(serde_json::Value::Null, to_json(Value::Null));
|
||||
assert_eq!(serde_json::Value::Bool(true), to_json(Value::Boolean(true)));
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(20u8.into()),
|
||||
to_json(Value::UInt8(20))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(20i8.into()),
|
||||
to_json(Value::Int8(20))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(2000u16.into()),
|
||||
to_json(Value::UInt16(2000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(2000i16.into()),
|
||||
to_json(Value::Int16(2000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(3000u32.into()),
|
||||
to_json(Value::UInt32(3000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(3000i32.into()),
|
||||
to_json(Value::Int32(3000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(4000u64.into()),
|
||||
to_json(Value::UInt64(4000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(4000i64.into()),
|
||||
to_json(Value::Int64(4000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::from(125.0f32),
|
||||
to_json(Value::Float32(125.0.into()))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::from(125.0f64),
|
||||
to_json(Value::Float64(125.0.into()))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::String(String::from("hello")),
|
||||
to_json(Value::String(StringBytes::from("hello")))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::from(b"world".as_slice()),
|
||||
to_json(Value::Binary(Bytes::from(b"world".as_slice())))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(5000i32.into()),
|
||||
to_json(Value::Date(5000))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::Value::Number(5000i64.into()),
|
||||
to_json(Value::DateTime(5000))
|
||||
);
|
||||
|
||||
let json_value: serde_json::Value =
|
||||
serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap();
|
||||
assert_eq!(
|
||||
json_value,
|
||||
to_json(Value::List(ListValue {
|
||||
items: Some(Box::new(vec![Value::Int32(123)])),
|
||||
datatype: ConcreteDataType::int32_datatype(),
|
||||
}))
|
||||
);
|
||||
}
|
||||
}
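The hunk above replaces the hand-written `Serialize` impl for `Value` with a derived `Serialize` plus a `TryFrom<Value> for serde_json::Value` conversion. Below is a minimal usage sketch based on the new tests; the `datatypes::value` import path is an assumption.

```rust
use datatypes::value::Value;

fn main() -> serde_json::Result<()> {
    // Scalars convert to the corresponding JSON values via the new TryFrom impl.
    let json: serde_json::Value = Value::Int32(123).try_into()?;
    assert_eq!(serde_json::Value::Number(123.into()), json);

    // Null maps to JSON null; Binary and List values go through
    // serde_json::to_value, which is why the conversion is fallible.
    let json: serde_json::Value = Value::Null.try_into()?;
    assert!(json.is_null());
    Ok(())
}
```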
|
||||
|
||||
@@ -114,7 +114,7 @@ impl Serializable for ConstantVector {
|
||||
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
|
||||
std::iter::repeat(self.try_get(0)?)
|
||||
.take(self.len())
|
||||
.map(serde_json::to_value)
|
||||
.map(serde_json::Value::try_from)
|
||||
.collect::<serde_json::Result<_>>()
|
||||
.context(SerializeSnafu)
|
||||
}
|
||||
|
||||
@@ -4,21 +4,23 @@ pub mod schema_util;
|
||||
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use storage::{
|
||||
memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef, MemtableSchema},
|
||||
memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef},
|
||||
metadata::RegionMetadata,
|
||||
schema::RegionSchemaRef,
|
||||
};
|
||||
|
||||
use crate::memtable::util::regiondesc_util::RegionDescBuilder;
|
||||
|
||||
pub const TIMESTAMP_NAME: &str = "timestamp";
|
||||
|
||||
pub fn schema_for_test() -> MemtableSchema {
|
||||
pub fn schema_for_test() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("bench")
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_value_column(("v2", LogicalTypeId::String, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
MemtableSchema::new(metadata.columns_row_key)
|
||||
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
pub fn new_memtable() -> MemtableRef {
|
||||
|
||||
@@ -175,14 +175,10 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Parquet file schema is not valid: {}", msg))]
|
||||
SequenceColumnNotFound { msg: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Parquet file schema is not valid, msg: {}, source: {}", msg, source))]
|
||||
#[snafu(display("Parquet file schema is invalid, source: {}", source))]
|
||||
InvalidParquetSchema {
|
||||
msg: String,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
source: crate::schema::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Region is under {} state, cannot proceed operation", state))]
|
||||
@@ -221,6 +217,20 @@ pub enum Error {
|
||||
given: SequenceNumber,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert sst schema, file: {}, source: {}", file, source))]
|
||||
ConvertSstSchema {
|
||||
file: String,
|
||||
#[snafu(backtrace)]
|
||||
source: crate::schema::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid raw region metadata, region: {}, source: {}", region, source))]
|
||||
InvalidRawRegion {
|
||||
region: String,
|
||||
#[snafu(backtrace)]
|
||||
source: MetadataError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -245,10 +255,11 @@ impl ErrorExt for Error {
|
||||
| DecodeMetaActionList { .. }
|
||||
| Readline { .. }
|
||||
| InvalidParquetSchema { .. }
|
||||
| SequenceColumnNotFound { .. }
|
||||
| WalDataCorrupted { .. }
|
||||
| VersionNotFound { .. }
|
||||
| SequenceNotMonotonic { .. } => StatusCode::Unexpected,
|
||||
| SequenceNotMonotonic { .. }
|
||||
| ConvertSstSchema { .. }
|
||||
| InvalidRawRegion { .. } => StatusCode::Unexpected,
|
||||
|
||||
FlushIo { .. }
|
||||
| WriteParquet { .. }
|
||||
|
||||
@@ -199,7 +199,7 @@ impl<S: LogStore> FlushJob<S> {
|
||||
|
||||
async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result<ManifestVersion> {
|
||||
let edit = RegionEdit {
|
||||
region_version: self.shared.version_control.metadata().version,
|
||||
region_version: self.shared.version_control.metadata().version(),
|
||||
flushed_sequence: self.flush_sequence,
|
||||
files_to_add: file_metas.to_vec(),
|
||||
files_to_remove: Vec::default(),
|
||||
|
||||
@@ -14,6 +14,7 @@ pub mod metadata;
|
||||
mod proto;
|
||||
mod read;
|
||||
mod region;
|
||||
pub mod schema;
|
||||
mod snapshot;
|
||||
mod sst;
|
||||
mod sync;
|
||||
|
||||
@@ -14,12 +14,38 @@ use crate::error::{
|
||||
ReadlineSnafu, Result,
|
||||
};
|
||||
use crate::manifest::helper;
|
||||
use crate::metadata::{RegionMetadataRef, VersionNumber};
|
||||
use crate::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber};
|
||||
use crate::sst::FileMeta;
|
||||
|
||||
/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata).
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct RawRegionMetadata {
|
||||
pub id: RegionId,
|
||||
pub name: String,
|
||||
pub columns: RawColumnsMetadata,
|
||||
pub column_families: RawColumnFamiliesMetadata,
|
||||
pub version: VersionNumber,
|
||||
}
|
||||
|
||||
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct RawColumnsMetadata {
|
||||
pub columns: Vec<ColumnMetadata>,
|
||||
pub row_key_end: usize,
|
||||
pub timestamp_key_index: usize,
|
||||
pub enable_version_column: bool,
|
||||
pub user_column_end: usize,
|
||||
}
|
||||
|
||||
/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata).
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct RawColumnFamiliesMetadata {
|
||||
pub column_families: Vec<ColumnFamilyMetadata>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct RegionChange {
|
||||
pub metadata: RegionMetadataRef,
|
||||
pub metadata: RawRegionMetadata,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
@@ -35,12 +61,6 @@ pub struct RegionEdit {
|
||||
pub files_to_remove: Vec<FileMeta>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct RegionMetaActionList {
|
||||
pub actions: Vec<RegionMetaAction>,
|
||||
pub prev_version: ManifestVersion,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub enum RegionMetaAction {
|
||||
Protocol(ProtocolAction),
|
||||
@@ -49,6 +69,12 @@ pub enum RegionMetaAction {
|
||||
Edit(RegionEdit),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct RegionMetaActionList {
|
||||
pub actions: Vec<RegionMetaAction>,
|
||||
pub prev_version: ManifestVersion,
|
||||
}
|
||||
|
||||
impl RegionMetaActionList {
|
||||
pub fn with_action(action: RegionMetaAction) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -14,6 +14,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::manifest::test_utils::*;
|
||||
use crate::metadata::RegionMetadata;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_region_manifest() {
|
||||
@@ -43,7 +44,7 @@ mod tests {
|
||||
manifest
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: region_meta.clone(),
|
||||
metadata: (&*region_meta).into(),
|
||||
},
|
||||
)))
|
||||
.await
|
||||
@@ -58,7 +59,10 @@ mod tests {
|
||||
|
||||
match action {
|
||||
RegionMetaAction::Change(c) => {
|
||||
assert_eq!(c.metadata, region_meta);
|
||||
assert_eq!(
|
||||
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
|
||||
*region_meta
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
@@ -79,7 +83,10 @@ mod tests {
|
||||
let action = &action_list.actions[0];
|
||||
match action {
|
||||
RegionMetaAction::Change(c) => {
|
||||
assert_eq!(c.metadata, region_meta);
|
||||
assert_eq!(
|
||||
RegionMetadata::try_from(c.metadata.clone()).unwrap(),
|
||||
*region_meta
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
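The manifest no longer stores the full `RegionMetadataRef`; it persists a serde-friendly `RawRegionMetadata` and rebuilds the metadata on recovery, as the updated test above exercises. A short sketch of that round trip follows; crate paths and error handling are simplified assumptions.

```rust
use storage::manifest::action::RawRegionMetadata;
use storage::metadata::RegionMetadata;

/// Convert metadata to its raw persisted form and back, mirroring what the
/// manifest does when writing a RegionChange and replaying it on recovery.
fn round_trip(metadata: &RegionMetadata) -> RegionMetadata {
    let raw: RawRegionMetadata = metadata.into(); // From<&RegionMetadata>
    // ... in the real code `raw` is serialized into the manifest here ...
    RegionMetadata::try_from(raw).expect("persisted metadata should be valid")
}
```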
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
mod btree;
|
||||
mod inserter;
|
||||
mod schema;
|
||||
#[cfg(test)]
|
||||
pub mod tests;
|
||||
mod version;
|
||||
@@ -13,9 +12,9 @@ use store_api::storage::{consts, SequenceNumber, ValueType};
|
||||
use crate::error::Result;
|
||||
use crate::memtable::btree::BTreeMemtable;
|
||||
pub use crate::memtable::inserter::Inserter;
|
||||
pub use crate::memtable::schema::MemtableSchema;
|
||||
pub use crate::memtable::version::{MemtableSet, MemtableVersion};
|
||||
use crate::read::Batch;
|
||||
use crate::schema::RegionSchemaRef;
|
||||
|
||||
/// Unique id for memtables under same region.
|
||||
pub type MemtableId = u32;
|
||||
@@ -24,7 +23,7 @@ pub type MemtableId = u32;
|
||||
pub trait Memtable: Send + Sync + std::fmt::Debug {
|
||||
fn id(&self) -> MemtableId;
|
||||
|
||||
fn schema(&self) -> &MemtableSchema;
|
||||
fn schema(&self) -> RegionSchemaRef;
|
||||
|
||||
/// Write key/values to the memtable.
|
||||
///
|
||||
@@ -83,7 +82,7 @@ pub enum RowOrdering {
|
||||
/// as an async trait.
|
||||
pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
|
||||
/// Returns the schema of this iterator.
|
||||
fn schema(&self) -> &MemtableSchema;
|
||||
fn schema(&self) -> RegionSchemaRef;
|
||||
|
||||
/// Returns the ordering of the output rows from this iterator.
|
||||
fn ordering(&self) -> RowOrdering;
|
||||
@@ -92,7 +91,7 @@ pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
|
||||
pub type BoxedBatchIterator = Box<dyn BatchIterator>;
|
||||
|
||||
pub trait MemtableBuilder: Send + Sync + std::fmt::Debug {
|
||||
fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef;
|
||||
fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef;
|
||||
}
|
||||
|
||||
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
|
||||
@@ -136,7 +135,7 @@ impl KeyValues {
|
||||
pub struct DefaultMemtableBuilder;
|
||||
|
||||
impl MemtableBuilder for DefaultMemtableBuilder {
|
||||
fn build(&self, id: MemtableId, schema: MemtableSchema) -> MemtableRef {
|
||||
fn build(&self, id: MemtableId, schema: RegionSchemaRef) -> MemtableRef {
|
||||
Arc::new(BTreeMemtable::new(id, schema))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,10 +15,10 @@ use store_api::storage::{SequenceNumber, ValueType};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::{
|
||||
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId,
|
||||
MemtableSchema, RowOrdering,
|
||||
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, RowOrdering,
|
||||
};
|
||||
use crate::read::Batch;
|
||||
use crate::schema::RegionSchemaRef;
|
||||
|
||||
type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
|
||||
|
||||
@@ -28,13 +28,13 @@ type RwLockMap = RwLock<BTreeMap<InnerKey, RowValue>>;
|
||||
#[derive(Debug)]
|
||||
pub struct BTreeMemtable {
|
||||
id: MemtableId,
|
||||
schema: MemtableSchema,
|
||||
schema: RegionSchemaRef,
|
||||
map: Arc<RwLockMap>,
|
||||
estimated_bytes: AtomicUsize,
|
||||
}
|
||||
|
||||
impl BTreeMemtable {
|
||||
pub fn new(id: MemtableId, schema: MemtableSchema) -> BTreeMemtable {
|
||||
pub fn new(id: MemtableId, schema: RegionSchemaRef) -> BTreeMemtable {
|
||||
BTreeMemtable {
|
||||
id,
|
||||
schema,
|
||||
@@ -49,8 +49,8 @@ impl Memtable for BTreeMemtable {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn schema(&self) -> &MemtableSchema {
|
||||
&self.schema
|
||||
fn schema(&self) -> RegionSchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn write(&self, kvs: &KeyValues) -> Result<()> {
|
||||
@@ -81,14 +81,14 @@ impl Memtable for BTreeMemtable {
|
||||
|
||||
struct BTreeIterator {
|
||||
ctx: IterContext,
|
||||
schema: MemtableSchema,
|
||||
schema: RegionSchemaRef,
|
||||
map: Arc<RwLockMap>,
|
||||
last_key: Option<InnerKey>,
|
||||
}
|
||||
|
||||
impl BatchIterator for BTreeIterator {
|
||||
fn schema(&self) -> &MemtableSchema {
|
||||
&self.schema
|
||||
fn schema(&self) -> RegionSchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn ordering(&self) -> RowOrdering {
|
||||
@@ -105,7 +105,7 @@ impl Iterator for BTreeIterator {
|
||||
}
|
||||
|
||||
impl BTreeIterator {
|
||||
fn new(ctx: IterContext, schema: MemtableSchema, map: Arc<RwLockMap>) -> BTreeIterator {
|
||||
fn new(ctx: IterContext, schema: RegionSchemaRef, map: Arc<RwLockMap>) -> BTreeIterator {
|
||||
BTreeIterator {
|
||||
ctx,
|
||||
schema,
|
||||
|
||||
@@ -293,10 +293,9 @@ mod tests {
|
||||
use store_api::storage::{PutOperation, WriteRequest};
|
||||
|
||||
use super::*;
|
||||
use crate::memtable::{
|
||||
DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId, MemtableSchema,
|
||||
};
|
||||
use crate::memtable::{DefaultMemtableBuilder, IterContext, MemtableBuilder, MemtableId};
|
||||
use crate::metadata::RegionMetadata;
|
||||
use crate::schema::RegionSchemaRef;
|
||||
use crate::test_util::descriptor_util::RegionDescBuilder;
|
||||
use crate::test_util::write_batch_util;
|
||||
|
||||
@@ -558,7 +557,7 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn new_memtable_schema() -> MemtableSchema {
|
||||
fn new_region_schema() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.timestamp(("ts", LogicalTypeId::Int64, false))
|
||||
.push_value_column(("value", LogicalTypeId::Int64, true))
|
||||
@@ -566,7 +565,7 @@ mod tests {
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
MemtableSchema::new(metadata.columns_row_key)
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
fn put_batch(batch: &mut WriteBatch, data: &[(i64, Option<i64>)]) {
|
||||
@@ -579,7 +578,7 @@ mod tests {
|
||||
batch.put(put_data).unwrap();
|
||||
}
|
||||
|
||||
fn new_memtable_set(time_ranges: &[RangeMillis], schema: &MemtableSchema) -> MemtableSet {
|
||||
fn new_memtable_set(time_ranges: &[RangeMillis], schema: &RegionSchemaRef) -> MemtableSet {
|
||||
let mut set = MemtableSet::new();
|
||||
for (id, range) in time_ranges.iter().enumerate() {
|
||||
let mem = DefaultMemtableBuilder {}.build(id as MemtableId, schema.clone());
|
||||
@@ -619,7 +618,7 @@ mod tests {
|
||||
let sequence = 11111;
|
||||
let bucket_duration = 100;
|
||||
let time_ranges = new_time_ranges(&[0], bucket_duration);
|
||||
let memtable_schema = new_memtable_schema();
|
||||
let memtable_schema = new_region_schema();
|
||||
let memtables = new_memtable_set(&time_ranges, &memtable_schema);
|
||||
let mut inserter = Inserter::new(
|
||||
sequence,
|
||||
@@ -657,7 +656,7 @@ mod tests {
|
||||
let sequence = 11111;
|
||||
let bucket_duration = 100;
|
||||
let time_ranges = new_time_ranges(&[0, 100, 200], bucket_duration);
|
||||
let memtable_schema = new_memtable_schema();
|
||||
let memtable_schema = new_region_schema();
|
||||
let memtables = new_memtable_set(&time_ranges, &memtable_schema);
|
||||
let mut inserter = Inserter::new(
|
||||
sequence,
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
use crate::metadata::{ColumnMetadata, ColumnsRowKeyMetadataRef};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct MemtableSchema {
|
||||
columns_row_key: ColumnsRowKeyMetadataRef,
|
||||
}
|
||||
|
||||
impl MemtableSchema {
|
||||
pub fn new(columns_row_key: ColumnsRowKeyMetadataRef) -> MemtableSchema {
|
||||
MemtableSchema { columns_row_key }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns_row_key.iter_row_key_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns_row_key.iter_value_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_row_key_columns(&self) -> usize {
|
||||
self.columns_row_key.num_row_key_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_value_columns(&self) -> usize {
|
||||
self.columns_row_key.num_value_columns()
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ use datatypes::vectors::{Int64VectorBuilder, UInt64VectorBuilder};
|
||||
|
||||
use super::*;
|
||||
use crate::metadata::RegionMetadata;
|
||||
use crate::schema::RegionSchemaRef;
|
||||
use crate::test_util::descriptor_util::RegionDescBuilder;
|
||||
|
||||
// For simplicity, all memtables in test share same memtable id.
|
||||
@@ -12,15 +13,15 @@ const MEMTABLE_ID: MemtableId = 1;
|
||||
// Schema for testing memtable:
|
||||
// - key: Int64(timestamp), UInt64(version),
|
||||
// - value: UInt64
|
||||
pub fn schema_for_test() -> MemtableSchema {
|
||||
// Just build a region desc and use its columns_row_key metadata.
|
||||
pub fn schema_for_test() -> RegionSchemaRef {
|
||||
// Just build a region desc and use its columns metadata.
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.enable_version_column(true)
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
MemtableSchema::new(metadata.columns_row_key)
|
||||
metadata.schema().clone()
|
||||
}
|
||||
|
||||
fn kvs_for_test_with_index(
|
||||
@@ -128,10 +129,8 @@ fn check_iter_content(
|
||||
assert_eq!(keys.len(), index);
|
||||
}
|
||||
|
||||
// TODO(yingwen): Check size of the returned batch.
|
||||
|
||||
struct MemtableTester {
|
||||
schema: MemtableSchema,
|
||||
schema: RegionSchemaRef,
|
||||
builders: Vec<MemtableBuilderRef>,
|
||||
}
|
||||
|
||||
@@ -172,7 +171,7 @@ impl MemtableTester {
|
||||
}
|
||||
|
||||
struct TestContext {
|
||||
schema: MemtableSchema,
|
||||
schema: RegionSchemaRef,
|
||||
memtable: MemtableRef,
|
||||
}
|
||||
|
||||
@@ -216,7 +215,7 @@ fn write_iter_memtable_case(ctx: &TestContext) {
|
||||
..Default::default()
|
||||
};
|
||||
let mut iter = ctx.memtable.iter(iter_ctx).unwrap();
|
||||
assert_eq!(ctx.schema, *iter.schema());
|
||||
assert_eq!(ctx.schema, iter.schema());
|
||||
assert_eq!(RowOrdering::Key, iter.ordering());
|
||||
|
||||
check_iter_content(
|
||||
|
||||
@@ -4,30 +4,42 @@ use std::sync::Arc;
|
||||
use common_error::prelude::*;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, OptionExt};
|
||||
use store_api::storage::{
|
||||
consts, ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId,
|
||||
ColumnId, ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema,
|
||||
SchemaRef,
|
||||
consts::{self, ReservedColumnId},
|
||||
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyId, ColumnId,
|
||||
ColumnSchema, RegionDescriptor, RegionId, RegionMeta, RowKeyDescriptor, Schema, SchemaRef,
|
||||
};
|
||||
|
||||
use crate::manifest::action::{RawColumnFamiliesMetadata, RawColumnsMetadata, RawRegionMetadata};
|
||||
use crate::schema::{RegionSchema, RegionSchemaRef};
|
||||
|
||||
/// Error for handling metadata.
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Column name already exists, name: {}", name))]
|
||||
#[snafu(display("Column name {} already exists", name))]
|
||||
ColNameExists { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Column family name already exists, name: {}", name))]
|
||||
#[snafu(display("Column family name {} already exists", name))]
|
||||
CfNameExists { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Column family id already exists, id: {}", id))]
|
||||
#[snafu(display("Column family id {} already exists", id))]
|
||||
CfIdExists { id: ColumnId, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Column id {} already exists", id))]
|
||||
ColIdExists { id: ColumnId, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Failed to build schema, source: {}", source))]
|
||||
InvalidSchema {
|
||||
source: datatypes::error::Error,
|
||||
backtrace: Backtrace,
|
||||
#[snafu(backtrace)]
|
||||
source: crate::schema::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Column name {} is reserved by the system", name))]
|
||||
ReservedColumn { name: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Missing timestamp key column"))]
|
||||
MissingTimestamp { backtrace: Backtrace },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -48,34 +60,27 @@ impl RegionMetaImpl {
|
||||
|
||||
impl RegionMeta for RegionMetaImpl {
|
||||
fn schema(&self) -> &SchemaRef {
|
||||
&self.metadata.schema
|
||||
self.metadata.user_schema()
|
||||
}
|
||||
}
|
||||
|
||||
pub type VersionNumber = u32;
|
||||
|
||||
// TODO(yingwen): Make some fields of metadata private.
|
||||
// TODO(yingwen): We may need to hold a list of history schema.
|
||||
|
||||
/// In memory metadata of region.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct RegionMetadata {
|
||||
// The following fields are immutable.
|
||||
id: RegionId,
|
||||
name: String,
|
||||
|
||||
// The following fields are mutable.
|
||||
/// Schema of the region.
|
||||
///
|
||||
/// Holding a [SchemaRef] to allow converting into `SchemaRef`/`arrow::SchemaRef`
|
||||
/// conveniently. The fields order in `SchemaRef` **must** be consistent with
|
||||
/// columns order in [ColumnsMetadata] to ensure the projection index of a field
|
||||
/// is correct.
|
||||
pub schema: SchemaRef,
|
||||
pub columns_row_key: ColumnsRowKeyMetadataRef,
|
||||
pub column_families: ColumnFamiliesMetadata,
|
||||
/// Version of the metadata. Version is set to zero initially and bumped once the
|
||||
/// metadata have been altered.
|
||||
pub version: VersionNumber,
|
||||
/// Latest schema of the region.
|
||||
schema: RegionSchemaRef,
|
||||
pub columns: ColumnsMetadataRef,
|
||||
column_families: ColumnFamiliesMetadata,
|
||||
version: VersionNumber,
|
||||
}
|
||||
|
||||
impl RegionMetadata {
|
||||
@@ -88,66 +93,171 @@ impl RegionMetadata {
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn schema(&self) -> &RegionSchemaRef {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn user_schema(&self) -> &SchemaRef {
|
||||
self.schema.user_schema()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn version(&self) -> u32 {
|
||||
self.schema.version()
|
||||
}
|
||||
}
|
||||
|
||||
pub type RegionMetadataRef = Arc<RegionMetadata>;
|
||||
|
||||
impl From<&RegionMetadata> for RawRegionMetadata {
|
||||
fn from(data: &RegionMetadata) -> RawRegionMetadata {
|
||||
RawRegionMetadata {
|
||||
id: data.id,
|
||||
name: data.name.clone(),
|
||||
columns: (&*data.columns).into(),
|
||||
column_families: (&data.column_families).into(),
|
||||
version: data.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<RawRegionMetadata> for RegionMetadata {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(raw: RawRegionMetadata) -> Result<RegionMetadata> {
|
||||
let columns = Arc::new(ColumnsMetadata::from(raw.columns));
|
||||
let schema =
|
||||
Arc::new(RegionSchema::new(columns.clone(), raw.version).context(InvalidSchemaSnafu)?);
|
||||
|
||||
Ok(RegionMetadata {
|
||||
id: raw.id,
|
||||
name: raw.name,
|
||||
schema,
|
||||
columns,
|
||||
column_families: raw.column_families.into(),
|
||||
version: raw.version,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ColumnMetadata {
|
||||
pub cf_id: ColumnFamilyId,
|
||||
pub desc: ColumnDescriptor,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct ColumnsMetadata {
|
||||
/// All columns, in `(key columns, timestamp, [version,] value columns)` order.
|
||||
/// All columns.
|
||||
///
|
||||
/// Columns order should be consistent with fields order in [SchemaRef].
|
||||
pub columns: Vec<ColumnMetadata>,
|
||||
/// Columns are organized in the following order:
|
||||
/// ```text
|
||||
/// key columns, timestamp, [version,] value columns, internal columns
|
||||
/// ```
|
||||
///
|
||||
/// The key columns, timestamp and version forms the row key.
|
||||
columns: Vec<ColumnMetadata>,
|
||||
/// Maps column name to index of columns, used to fast lookup column by name.
|
||||
pub name_to_col_index: HashMap<String, usize>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
|
||||
pub struct RowKeyMetadata {
|
||||
name_to_col_index: HashMap<String, usize>,
|
||||
/// Exclusive end index of row key columns.
|
||||
row_key_end: usize,
|
||||
/// Index of timestamp key column.
|
||||
pub timestamp_key_index: usize,
|
||||
timestamp_key_index: usize,
|
||||
/// If version column is enabled, then the last column of key columns is a
|
||||
/// version column.
|
||||
pub enable_version_column: bool,
|
||||
enable_version_column: bool,
|
||||
/// Exclusive end index of user columns.
|
||||
///
|
||||
/// Columns in `[user_column_end..)` are internal columns.
|
||||
user_column_end: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct ColumnsRowKeyMetadata {
|
||||
columns: ColumnsMetadata,
|
||||
row_key: RowKeyMetadata,
|
||||
}
|
||||
|
||||
impl ColumnsRowKeyMetadata {
|
||||
impl ColumnsMetadata {
|
||||
/// Returns an iterator to all row key columns.
|
||||
///
|
||||
/// Row key columns includes all key columns, the timestamp column and the
|
||||
/// optional version column.
|
||||
pub fn iter_row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.columns.iter().take(self.row_key.row_key_end)
|
||||
self.columns.iter().take(self.row_key_end)
|
||||
}
|
||||
|
||||
/// Returns an iterator to all value columns (internal columns are excluded).
|
||||
pub fn iter_value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.columns.iter().skip(self.row_key.row_key_end)
|
||||
self.columns[self.row_key_end..self.user_column_end].iter()
|
||||
}
|
||||
|
||||
pub fn iter_user_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter().take(self.user_column_end)
|
||||
}
|
||||
|
||||
pub fn iter_all_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_row_key_columns(&self) -> usize {
|
||||
self.row_key.row_key_end
|
||||
self.row_key_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_value_columns(&self) -> usize {
|
||||
self.columns.columns.len() - self.row_key.row_key_end
|
||||
self.user_column_end - self.row_key_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn timestamp_key_index(&self) -> usize {
|
||||
self.timestamp_key_index
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn row_key_end(&self) -> usize {
|
||||
self.row_key_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn user_column_end(&self) -> usize {
|
||||
self.user_column_end
|
||||
}
|
||||
}
|
||||
|
||||
pub type ColumnsRowKeyMetadataRef = Arc<ColumnsRowKeyMetadata>;
|
||||
pub type ColumnsMetadataRef = Arc<ColumnsMetadata>;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
impl From<&ColumnsMetadata> for RawColumnsMetadata {
|
||||
fn from(data: &ColumnsMetadata) -> RawColumnsMetadata {
|
||||
RawColumnsMetadata {
|
||||
columns: data.columns.clone(),
|
||||
row_key_end: data.row_key_end,
|
||||
timestamp_key_index: data.timestamp_key_index,
|
||||
enable_version_column: data.enable_version_column,
|
||||
user_column_end: data.user_column_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RawColumnsMetadata> for ColumnsMetadata {
|
||||
fn from(raw: RawColumnsMetadata) -> ColumnsMetadata {
|
||||
let name_to_col_index = raw
|
||||
.columns
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, col)| (col.desc.name.clone(), i))
|
||||
.collect();
|
||||
|
||||
ColumnsMetadata {
|
||||
columns: raw.columns,
|
||||
name_to_col_index,
|
||||
row_key_end: raw.row_key_end,
|
||||
timestamp_key_index: raw.timestamp_key_index,
|
||||
enable_version_column: raw.enable_version_column,
|
||||
user_column_end: raw.user_column_end,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct ColumnFamiliesMetadata {
|
||||
/// Map column family id to column family metadata.
|
||||
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
|
||||
@@ -159,6 +269,24 @@ impl ColumnFamiliesMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ColumnFamiliesMetadata> for RawColumnFamiliesMetadata {
|
||||
fn from(data: &ColumnFamiliesMetadata) -> RawColumnFamiliesMetadata {
|
||||
let column_families = data.id_to_cfs.values().cloned().collect();
|
||||
RawColumnFamiliesMetadata { column_families }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RawColumnFamiliesMetadata> for ColumnFamiliesMetadata {
|
||||
fn from(raw: RawColumnFamiliesMetadata) -> ColumnFamiliesMetadata {
|
||||
let id_to_cfs = raw
|
||||
.column_families
|
||||
.into_iter()
|
||||
.map(|cf| (cf.cf_id, cf))
|
||||
.collect();
|
||||
ColumnFamiliesMetadata { id_to_cfs }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct ColumnFamilyMetadata {
|
||||
/// Column family name.
|
||||
@@ -189,6 +317,7 @@ impl TryFrom<RegionDescriptor> for RegionMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Add a builder to build ColumnMetadata and refactor this builder.
|
||||
#[derive(Default)]
|
||||
struct RegionMetadataBuilder {
|
||||
id: RegionId,
|
||||
@@ -196,8 +325,13 @@ struct RegionMetadataBuilder {
|
||||
columns: Vec<ColumnMetadata>,
|
||||
column_schemas: Vec<ColumnSchema>,
|
||||
name_to_col_index: HashMap<String, usize>,
|
||||
/// Column id set, used to validate column id uniqueness.
|
||||
column_ids: HashSet<ColumnId>,
|
||||
|
||||
row_key: RowKeyMetadata,
|
||||
// Row key metadata:
|
||||
row_key_end: usize,
|
||||
timestamp_key_index: Option<usize>,
|
||||
enable_version_column: bool,
|
||||
|
||||
id_to_cfs: HashMap<ColumnFamilyId, ColumnFamilyMetadata>,
|
||||
cf_names: HashSet<String>,
|
||||
@@ -224,7 +358,7 @@ impl RegionMetadataBuilder {
|
||||
}
|
||||
|
||||
// TODO(yingwen): Validate this is a timestamp column.
|
||||
let timestamp_key_index = self.columns.len();
|
||||
self.timestamp_key_index = Some(self.columns.len());
|
||||
self.push_row_key_column(key.timestamp)?;
|
||||
|
||||
if key.enable_version_column {
|
||||
@@ -233,13 +367,8 @@ impl RegionMetadataBuilder {
|
||||
self.push_row_key_column(version_col)?;
|
||||
}
|
||||
|
||||
let row_key_end = self.columns.len();
|
||||
|
||||
self.row_key = RowKeyMetadata {
|
||||
row_key_end,
|
||||
timestamp_key_index,
|
||||
enable_version_column: key.enable_version_column,
|
||||
};
|
||||
self.row_key_end = self.columns.len();
|
||||
self.enable_version_column = key.enable_version_column;
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
@@ -274,29 +403,33 @@ impl RegionMetadataBuilder {
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn build(self) -> Result<RegionMetadata> {
|
||||
let schema = if self.column_schemas.is_empty() {
|
||||
Arc::new(Schema::new(self.column_schemas))
|
||||
} else {
|
||||
Arc::new(
|
||||
Schema::with_timestamp_index(self.column_schemas, self.row_key.timestamp_key_index)
|
||||
.context(InvalidSchemaSnafu)?,
|
||||
)
|
||||
};
|
||||
let columns = ColumnsMetadata {
|
||||
fn build(mut self) -> Result<RegionMetadata> {
|
||||
let timestamp_key_index = self.timestamp_key_index.context(MissingTimestampSnafu)?;
|
||||
|
||||
let user_column_end = self.columns.len();
|
||||
// Setup internal columns.
|
||||
for internal_desc in internal_column_descs() {
|
||||
self.push_new_column(consts::DEFAULT_CF_ID, internal_desc)?;
|
||||
}
|
||||
|
||||
let columns = Arc::new(ColumnsMetadata {
|
||||
columns: self.columns,
|
||||
name_to_col_index: self.name_to_col_index,
|
||||
};
|
||||
let columns_row_key = Arc::new(ColumnsRowKeyMetadata {
|
||||
columns,
|
||||
row_key: self.row_key,
|
||||
row_key_end: self.row_key_end,
|
||||
timestamp_key_index,
|
||||
enable_version_column: self.enable_version_column,
|
||||
user_column_end,
|
||||
});
|
||||
let schema = Arc::new(
|
||||
RegionSchema::new(columns.clone(), Schema::INITIAL_VERSION)
|
||||
.context(InvalidSchemaSnafu)?,
|
||||
);
|
||||
|
||||
Ok(RegionMetadata {
|
||||
id: self.id,
|
||||
name: self.name,
|
||||
schema,
|
||||
columns_row_key,
|
||||
columns,
|
||||
column_families: ColumnFamiliesMetadata {
|
||||
id_to_cfs: self.id_to_cfs,
|
||||
},
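Concretely, for the layout used by the tests later in this diff (key column `k1`, the timestamp column, and value column `v1`), the rebuilt `build()` yields the column order `[k1, timestamp, v1, __sequence, __value_type]`, i.e. `row_key_end = 2` and `user_column_end = 3`, with the internal columns always appended after all user columns.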
|
||||
@@ -311,22 +444,35 @@ impl RegionMetadataBuilder {
|
||||
}
|
||||
|
||||
fn push_value_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> {
|
||||
ensure!(
|
||||
!is_internal_value_column(&desc.name),
|
||||
ReservedColumnSnafu { name: &desc.name }
|
||||
);
|
||||
|
||||
self.push_new_column(cf_id, desc)
|
||||
}
|
||||
|
||||
fn push_new_column(&mut self, cf_id: ColumnFamilyId, desc: ColumnDescriptor) -> Result<()> {
|
||||
ensure!(
|
||||
!self.name_to_col_index.contains_key(&desc.name),
|
||||
ColNameExistsSnafu { name: &desc.name }
|
||||
);
|
||||
ensure!(
|
||||
!self.column_ids.contains(&desc.id),
|
||||
ColIdExistsSnafu { id: desc.id }
|
||||
);
|
||||
|
||||
let column_schema = ColumnSchema::from(&desc);
|
||||
|
||||
let column_name = desc.name.clone();
|
||||
let column_id = desc.id;
|
||||
let meta = ColumnMetadata { cf_id, desc };
|
||||
|
||||
// TODO(yingwen): Store cf_id to metadata in field.
|
||||
|
||||
let column_index = self.columns.len();
|
||||
self.columns.push(meta);
|
||||
self.column_schemas.push(column_schema);
|
||||
self.name_to_col_index.insert(column_name, column_index);
|
||||
self.column_ids.insert(column_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -334,7 +480,7 @@ impl RegionMetadataBuilder {
|
||||
|
||||
fn version_column_desc() -> ColumnDescriptor {
|
||||
ColumnDescriptorBuilder::new(
|
||||
consts::VERSION_COLUMN_ID,
|
||||
ReservedColumnId::version(),
|
||||
consts::VERSION_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
)
|
||||
@@ -343,6 +489,36 @@ fn version_column_desc() -> ColumnDescriptor {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn internal_column_descs() -> [ColumnDescriptor; 2] {
|
||||
[
|
||||
ColumnDescriptorBuilder::new(
|
||||
ReservedColumnId::sequence(),
|
||||
consts::SEQUENCE_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
ColumnDescriptorBuilder::new(
|
||||
ReservedColumnId::value_type(),
|
||||
consts::VALUE_TYPE_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
]
|
||||
}
|
||||
|
||||
/// Returns true if this is an internal column for value column.
|
||||
#[inline]
|
||||
fn is_internal_value_column(column_name: &str) -> bool {
|
||||
matches!(
|
||||
column_name,
|
||||
consts::SEQUENCE_COLUMN_NAME | consts::VALUE_TYPE_COLUMN_NAME
|
||||
)
|
||||
}
|
||||
|
||||
// TODO(yingwen): Add tests for using invalid row_key/cf to build metadata.
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
@@ -378,33 +554,95 @@ mod tests {
|
||||
|
||||
let metadata = RegionMetadata::try_from(desc).unwrap();
|
||||
assert_eq!(region_name, metadata.name);
|
||||
assert_eq!(expect_schema, metadata.schema);
|
||||
assert_eq!(2, metadata.columns_row_key.num_row_key_columns());
|
||||
assert_eq!(1, metadata.columns_row_key.num_value_columns());
|
||||
assert_eq!(expect_schema, *metadata.user_schema());
|
||||
assert_eq!(2, metadata.columns.num_row_key_columns());
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_empty_region_metadata() {
|
||||
let metadata = RegionMetadataBuilder::default().build().unwrap();
|
||||
assert!(metadata.schema.column_schemas().is_empty());
|
||||
let err = RegionMetadataBuilder::default().build().err().unwrap();
|
||||
assert!(matches!(err, Error::MissingTimestamp { .. }));
|
||||
}
|
||||
|
||||
assert!(metadata.columns_row_key.columns.columns.is_empty());
|
||||
assert_eq!(0, metadata.columns_row_key.num_row_key_columns());
|
||||
assert!(metadata
|
||||
.columns_row_key
|
||||
.iter_row_key_columns()
|
||||
.next()
|
||||
.is_none());
|
||||
assert_eq!(0, metadata.columns_row_key.num_value_columns());
|
||||
assert!(metadata
|
||||
.columns_row_key
|
||||
.iter_value_columns()
|
||||
.next()
|
||||
.is_none());
|
||||
#[test]
|
||||
fn test_build_metadata_duplicate_name() {
|
||||
let cf = ColumnFamilyDescriptorBuilder::default()
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(5, "v1", ConcreteDataType::int64_datatype())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
let err = RegionMetadataBuilder::new()
|
||||
.add_column_family(cf)
|
||||
.err()
|
||||
.unwrap();
|
||||
assert!(matches!(err, Error::ColNameExists { .. }));
|
||||
}
|
||||
|
||||
assert!(metadata.column_families.id_to_cfs.is_empty());
|
||||
#[test]
|
||||
fn test_build_metadata_internal_name() {
|
||||
let names = [consts::SEQUENCE_COLUMN_NAME, consts::VALUE_TYPE_COLUMN_NAME];
|
||||
for name in names {
|
||||
let cf = ColumnFamilyDescriptorBuilder::default()
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(5, name, ConcreteDataType::int64_datatype())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
let err = RegionMetadataBuilder::new()
|
||||
.add_column_family(cf)
|
||||
.err()
|
||||
.unwrap();
|
||||
assert!(matches!(err, Error::ReservedColumn { .. }));
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(0, metadata.version);
|
||||
#[test]
|
||||
fn test_build_metadata_duplicate_id() {
|
||||
let cf = ColumnFamilyDescriptorBuilder::default()
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(4, "v1", ConcreteDataType::int64_datatype())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(4, "v2", ConcreteDataType::int64_datatype())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
let err = RegionMetadataBuilder::new()
|
||||
.add_column_family(cf)
|
||||
.err()
|
||||
.unwrap();
|
||||
assert!(matches!(err, Error::ColIdExists { .. }));
|
||||
|
||||
let timestamp = ColumnDescriptorBuilder::new(2, "ts", ConcreteDataType::int64_datatype())
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap();
|
||||
let row_key = RowKeyDescriptorBuilder::new(timestamp)
|
||||
.push_column(
|
||||
ColumnDescriptorBuilder::new(2, "k1", ConcreteDataType::int64_datatype())
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
let err = RegionMetadataBuilder::new().row_key(row_key).err().unwrap();
|
||||
assert!(matches!(err, Error::ColIdExists { .. }));
|
||||
}
|
||||
|
||||
fn new_metadata(enable_version_column: bool) -> RegionMetadata {
|
||||
@@ -454,30 +692,30 @@ mod tests {
|
||||
Some(1),
|
||||
);
|
||||
|
||||
assert_eq!(expect_schema, metadata.schema);
|
||||
assert_eq!(expect_schema, *metadata.user_schema());
|
||||
|
||||
// 3 columns
|
||||
assert_eq!(3, metadata.columns_row_key.columns.columns.len());
|
||||
// 3 user columns and 2 internal columns
|
||||
assert_eq!(5, metadata.columns.columns.len());
|
||||
// 2 row key columns
|
||||
assert_eq!(2, metadata.columns_row_key.num_row_key_columns());
|
||||
assert_eq!(2, metadata.columns.num_row_key_columns());
|
||||
let row_key_names: Vec<_> = metadata
|
||||
.columns_row_key
|
||||
.columns
|
||||
.iter_row_key_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
assert_eq!(["k1", "ts"], &row_key_names[..]);
|
||||
// 1 value column
|
||||
assert_eq!(1, metadata.columns_row_key.num_value_columns());
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
let value_names: Vec<_> = metadata
|
||||
.columns_row_key
|
||||
.columns
|
||||
.iter_value_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
assert_eq!(["v1"], &value_names[..]);
|
||||
// Check timestamp index.
|
||||
assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index);
|
||||
assert_eq!(1, metadata.columns.timestamp_key_index);
|
||||
// Check version column.
|
||||
assert!(!metadata.columns_row_key.row_key.enable_version_column);
|
||||
assert!(!metadata.columns.enable_version_column);
|
||||
|
||||
assert!(metadata
|
||||
.column_families
|
||||
@@ -502,14 +740,14 @@ mod tests {
|
||||
Some(1),
|
||||
);
|
||||
|
||||
assert_eq!(expect_schema, metadata.schema);
|
||||
assert_eq!(expect_schema, *metadata.user_schema());
|
||||
|
||||
// 4 columns
|
||||
assert_eq!(4, metadata.columns_row_key.columns.columns.len());
|
||||
// 4 user columns and 2 internal columns.
|
||||
assert_eq!(6, metadata.columns.columns.len());
|
||||
// 3 row key columns
|
||||
assert_eq!(3, metadata.columns_row_key.num_row_key_columns());
|
||||
assert_eq!(3, metadata.columns.num_row_key_columns());
|
||||
let row_key_names: Vec<_> = metadata
|
||||
.columns_row_key
|
||||
.columns
|
||||
.iter_row_key_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
@@ -518,16 +756,25 @@ mod tests {
|
||||
&row_key_names[..]
|
||||
);
|
||||
// 1 value column
|
||||
assert_eq!(1, metadata.columns_row_key.num_value_columns());
|
||||
assert_eq!(1, metadata.columns.num_value_columns());
|
||||
let value_names: Vec<_> = metadata
|
||||
.columns_row_key
|
||||
.columns
|
||||
.iter_value_columns()
|
||||
.map(|column| &column.desc.name)
|
||||
.collect();
|
||||
assert_eq!(["v1"], &value_names[..]);
|
||||
// Check timestamp index.
|
||||
assert_eq!(1, metadata.columns_row_key.row_key.timestamp_key_index);
|
||||
assert_eq!(1, metadata.columns.timestamp_key_index);
|
||||
// Check version column.
|
||||
assert!(metadata.columns_row_key.row_key.enable_version_column);
|
||||
assert!(metadata.columns.enable_version_column);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_between_raw() {
|
||||
let metadata = new_metadata(true);
|
||||
let raw = RawRegionMetadata::from(&metadata);
|
||||
|
||||
let converted = RegionMetadata::try_from(raw).unwrap();
|
||||
assert_eq!(metadata, converted);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,11 +36,10 @@ pub enum Error {
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid timestamp index: {}", index))]
|
||||
InvalidTimestampIndex {
|
||||
index: usize,
|
||||
#[snafu(display("Failed to convert schema, source: {}", source))]
|
||||
ConvertSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -81,10 +80,10 @@ impl TryFrom<Schema> for schema::SchemaRef {
|
||||
|
||||
let schema: schema::SchemaRef = match schema.timestamp_index {
|
||||
Some(index) => Arc::new(
|
||||
schema::Schema::with_timestamp_index(column_schemas, index.value as usize)
|
||||
.context(InvalidTimestampIndexSnafu {
|
||||
index: index.value as usize,
|
||||
})?,
|
||||
schema::SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(index.value as usize)
|
||||
.build()
|
||||
.context(ConvertSchemaSnafu)?,
|
||||
),
|
||||
None => Arc::new(schema::Schema::new(column_schemas)),
|
||||
};
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::logging;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use snafu::ensure;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::manifest::{
|
||||
self, action::ProtocolAction, Manifest, ManifestVersion, MetaActionIterator,
|
||||
@@ -103,7 +103,7 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
.update(RegionMetaActionList::new(vec![
|
||||
RegionMetaAction::Protocol(ProtocolAction::new()),
|
||||
RegionMetaAction::Change(RegionChange {
|
||||
metadata: metadata.clone(),
|
||||
metadata: (&*metadata).into(),
|
||||
}),
|
||||
]))
|
||||
.await?;
|
||||
@@ -206,8 +206,13 @@ impl<S: LogStore> RegionImpl<S> {
|
||||
for action in action_list.actions {
|
||||
match (action, version) {
|
||||
(RegionMetaAction::Change(c), None) => {
|
||||
let region = c.metadata.name.clone();
|
||||
let region_metadata = c
|
||||
.metadata
|
||||
.try_into()
|
||||
.context(error::InvalidRawRegionSnafu { region })?;
|
||||
version = Some(Version::with_manifest_version(
|
||||
c.metadata,
|
||||
Arc::new(region_metadata),
|
||||
last_manifest_version,
|
||||
));
|
||||
for (manifest_version, action) in actions.drain(..) {
|
||||
|
||||
@@ -197,7 +197,7 @@ async fn test_recover_region_manifets() {
|
||||
manifest
|
||||
.update(RegionMetaActionList::with_action(RegionMetaAction::Change(
|
||||
RegionChange {
|
||||
metadata: region_meta.clone(),
|
||||
metadata: (&*region_meta).into(),
|
||||
},
|
||||
)))
|
||||
.await
|
||||
|
||||
@@ -309,7 +309,7 @@ impl WriterInner {
|
||||
&& memtables_to_add.get_by_range(range).is_none()
|
||||
{
|
||||
// Memtable for this range is missing, need to create a new memtable.
|
||||
let memtable_schema = current_version.memtable_schema();
|
||||
let memtable_schema = current_version.schema().clone();
|
||||
let id = self.alloc_memtable_id();
|
||||
let memtable = self.memtable_builder.build(id, memtable_schema);
|
||||
memtables_to_add.insert(*range, memtable);
|
||||
|
||||
src/storage/src/schema.rs (new file, 450 lines)
@@ -0,0 +1,450 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::prelude::*;
|
||||
use datatypes::arrow::array::Array;
|
||||
use datatypes::arrow::chunk::Chunk;
|
||||
use datatypes::arrow::datatypes::Schema as ArrowSchema;
|
||||
use datatypes::prelude::Vector;
|
||||
use datatypes::schema::Metadata;
|
||||
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use store_api::storage::{consts, ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
|
||||
use crate::metadata::{ColumnMetadata, ColumnsMetadata, ColumnsMetadataRef};
|
||||
use crate::read::Batch;
|
||||
|
||||
const ROW_KEY_END_KEY: &str = "greptime:storage:row_key_end";
|
||||
const USER_COLUMN_END_KEY: &str = "greptime:storage:user_column_end";
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to build schema, source: {}", source))]
|
||||
BuildSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert from arrow schema, source: {}", source))]
|
||||
ConvertArrowSchema {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid internal column index in arrow schema"))]
|
||||
InvalidIndex { backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Missing metadata {} in arrow schema", key))]
|
||||
MissingMeta { key: String, backtrace: Backtrace },
|
||||
|
||||
#[snafu(display("Missing column {} in arrow schema", column))]
|
||||
MissingColumn {
|
||||
column: String,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to parse index in schema meta, value: {}, source: {}",
|
||||
value,
|
||||
source
|
||||
))]
|
||||
ParseIndex {
|
||||
value: String,
|
||||
source: std::num::ParseIntError,
|
||||
backtrace: Backtrace,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Failed to convert arrow chunk to batch, name: {}, source: {}",
|
||||
name,
|
||||
source
|
||||
))]
|
||||
ConvertChunk {
|
||||
name: String,
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
/// Schema of region.
///
/// The `RegionSchema` has knowledge of the reserved and internal columns.
/// Reserved columns are columns whose names and ids are reserved by the storage
/// engine, and could not be used by the user. Reserved columns usually have
/// special usage. Reserved columns except the version column are also
/// called internal columns (though the version column could also be seen as a
/// special kind of internal column); they are not visible to the user, such as our
/// internal sequence and value_type columns.
///
/// The user schema only contains the columns that the user can see, i.e. the
/// schema the user created.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct RegionSchema {
|
||||
/// Schema that only contains columns that the user defined, excluding internal columns
/// that are reserved and used by the storage engine.
///
/// Holding a [SchemaRef] allows converting into `SchemaRef`/`arrow::SchemaRef`
/// conveniently. The field order in `SchemaRef` **must** be consistent with the
/// column order in [ColumnsMetadata] to ensure the projection index of a field
/// is correct.
|
||||
user_schema: SchemaRef,
|
||||
/// SST schema contains all columns of the region, including all internal columns.
|
||||
sst_schema: SstSchema,
|
||||
/// Metadata of columns.
|
||||
columns: ColumnsMetadataRef,
|
||||
}
|
||||
|
||||
impl RegionSchema {
|
||||
pub fn new(columns: ColumnsMetadataRef, version: u32) -> Result<RegionSchema> {
|
||||
let user_schema = Arc::new(build_user_schema(&columns, version)?);
|
||||
let sst_schema = SstSchema::new(&columns, version)?;
|
||||
|
||||
debug_assert_eq!(user_schema.version(), sst_schema.version());
|
||||
debug_assert_eq!(version, user_schema.version());
|
||||
|
||||
Ok(RegionSchema {
|
||||
user_schema,
|
||||
sst_schema,
|
||||
columns,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the schema of the region, excluding internal columns that are used by
/// the storage engine.
|
||||
#[inline]
|
||||
pub fn user_schema(&self) -> &SchemaRef {
|
||||
&self.user_schema
|
||||
}
|
||||
|
||||
/// Returns the schema for sst, which contains all columns used by the region.
|
||||
#[inline]
|
||||
pub fn sst_schema(&self) -> &SstSchema {
|
||||
&self.sst_schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn row_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter_row_key_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn value_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
|
||||
self.columns.iter_value_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_row_key_columns(&self) -> usize {
|
||||
self.columns.num_row_key_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_value_columns(&self) -> usize {
|
||||
self.columns.num_value_columns()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn version(&self) -> u32 {
|
||||
self.user_schema.version()
|
||||
}
|
||||
}
|
||||
|
||||
pub type RegionSchemaRef = Arc<RegionSchema>;
|
||||
|
||||
/// Schema of SST.
|
||||
///
|
||||
/// Only contains a reference to schema and some indices, so it should be cheap to clone.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct SstSchema {
|
||||
schema: SchemaRef,
|
||||
row_key_end: usize,
|
||||
user_column_end: usize,
|
||||
}
|
||||
|
||||
impl SstSchema {
|
||||
fn new(columns: &ColumnsMetadata, version: u32) -> Result<SstSchema> {
|
||||
let column_schemas: Vec<_> = columns
|
||||
.iter_all_columns()
|
||||
.map(|col| ColumnSchema::from(&col.desc))
|
||||
.collect();
|
||||
|
||||
let schema = SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(columns.timestamp_key_index())
|
||||
.version(version)
|
||||
.add_metadata(ROW_KEY_END_KEY, columns.row_key_end().to_string())
|
||||
.add_metadata(USER_COLUMN_END_KEY, columns.user_column_end().to_string())
|
||||
.build()
|
||||
.context(BuildSchemaSnafu)?;
|
||||
|
||||
let user_column_end = columns.user_column_end();
|
||||
assert_eq!(
|
||||
consts::SEQUENCE_COLUMN_NAME,
|
||||
schema.column_schemas()[user_column_end].name
|
||||
);
|
||||
assert_eq!(
|
||||
consts::VALUE_TYPE_COLUMN_NAME,
|
||||
schema.column_schemas()[user_column_end + 1].name
|
||||
);
|
||||
|
||||
Ok(SstSchema {
|
||||
schema: Arc::new(schema),
|
||||
row_key_end: columns.row_key_end(),
|
||||
user_column_end,
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn version(&self) -> u32 {
|
||||
self.schema.version()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn schema(&self) -> &SchemaRef {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
|
||||
self.schema.arrow_schema()
|
||||
}
|
||||
|
||||
pub fn batch_to_arrow_chunk(&self, batch: &Batch) -> Chunk<Arc<dyn Array>> {
|
||||
assert_eq!(
|
||||
self.schema.num_columns(),
|
||||
// key columns + value columns + sequence + value_type
|
||||
batch.keys.len() + batch.values.len() + 2
|
||||
);
|
||||
|
||||
Chunk::new(
|
||||
batch
|
||||
.keys
|
||||
.iter()
|
||||
.map(|v| v.to_arrow_array())
|
||||
.chain(batch.values.iter().map(|v| v.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.sequences.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.value_types.to_arrow_array()))
|
||||
.collect(),
|
||||
)
|
||||
}
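For the 2-key, 1-value region exercised in the tests below, this layout puts the row key columns at chunk indices 0 and 1, the value column at index 2, `__sequence` at index 3 and `__value_type` at index 4, which is exactly what `row_key_indices`, `value_indices`, `sequence_index` and `value_type_index` report.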
|
||||
|
||||
pub fn arrow_chunk_to_batch(&self, chunk: &Chunk<Arc<dyn Array>>) -> Result<Batch> {
|
||||
let keys = self
|
||||
.row_key_indices()
|
||||
.map(|i| {
|
||||
Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu {
|
||||
name: self.column_name(i),
|
||||
})
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
let sequences = UInt64Vector::try_from_arrow_array(&chunk[self.sequence_index()].clone())
|
||||
.context(ConvertChunkSnafu {
|
||||
name: consts::SEQUENCE_COLUMN_NAME,
|
||||
})?;
|
||||
let value_types = UInt8Vector::try_from_arrow_array(
|
||||
&chunk[self.value_type_index()].clone(),
|
||||
)
|
||||
.context(ConvertChunkSnafu {
|
||||
name: consts::VALUE_TYPE_COLUMN_NAME,
|
||||
})?;
|
||||
let values = self
|
||||
.value_indices()
|
||||
.map(|i| {
|
||||
Helper::try_into_vector(&chunk[i].clone()).context(ConvertChunkSnafu {
|
||||
name: self.column_name(i),
|
||||
})
|
||||
})
|
||||
.collect::<Result<_>>()?;
|
||||
|
||||
Ok(Batch {
|
||||
keys,
|
||||
sequences,
|
||||
value_types,
|
||||
values,
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn sequence_index(&self) -> usize {
|
||||
self.user_column_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn value_type_index(&self) -> usize {
|
||||
self.user_column_end + 1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn row_key_indices(&self) -> impl Iterator<Item = usize> {
|
||||
0..self.row_key_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn value_indices(&self) -> impl Iterator<Item = usize> {
|
||||
self.row_key_end..self.user_column_end
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn column_name(&self, idx: usize) -> &str {
|
||||
&self.schema.column_schemas()[idx].name
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ArrowSchema> for SstSchema {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(arrow_schema: ArrowSchema) -> Result<SstSchema> {
|
||||
let schema = Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?;
|
||||
// Recover other metadata from schema.
|
||||
let row_key_end = parse_index_from_metadata(schema.metadata(), ROW_KEY_END_KEY)?;
|
||||
let user_column_end = parse_index_from_metadata(schema.metadata(), USER_COLUMN_END_KEY)?;
|
||||
|
||||
// There should be sequence and value type columns.
|
||||
ensure!(
|
||||
consts::SEQUENCE_COLUMN_NAME == schema.column_schemas()[user_column_end].name,
|
||||
InvalidIndexSnafu
|
||||
);
|
||||
ensure!(
|
||||
consts::VALUE_TYPE_COLUMN_NAME == schema.column_schemas()[user_column_end + 1].name,
|
||||
InvalidIndexSnafu
|
||||
);
|
||||
|
||||
Ok(SstSchema {
|
||||
schema: Arc::new(schema),
|
||||
row_key_end,
|
||||
user_column_end,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_index_from_metadata(metadata: &Metadata, key: &str) -> Result<usize> {
|
||||
let value = metadata.get(key).context(MissingMetaSnafu { key })?;
|
||||
value.parse().context(ParseIndexSnafu { value })
|
||||
}
|
||||
|
||||
// For now the user schema doesn't have extra metadata like the sst schema does.
|
||||
fn build_user_schema(columns: &ColumnsMetadata, version: u32) -> Result<Schema> {
|
||||
let column_schemas: Vec<_> = columns
|
||||
.iter_user_columns()
|
||||
.map(|col| ColumnSchema::from(&col.desc))
|
||||
.collect();
|
||||
|
||||
SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(columns.timestamp_key_index())
|
||||
.version(version)
|
||||
.build()
|
||||
.context(BuildSchemaSnafu)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use datatypes::vectors::{Int64Vector, UInt64Vector, UInt8Vector};
|
||||
|
||||
use super::*;
|
||||
use crate::metadata::RegionMetadata;
|
||||
use crate::test_util::{descriptor_util::RegionDescBuilder, schema_util};
|
||||
|
||||
fn new_batch() -> Batch {
|
||||
let k1 = Int64Vector::from_slice(&[1, 2, 3]);
|
||||
let timestamp = Int64Vector::from_slice(&[4, 5, 6]);
|
||||
let v1 = Int64Vector::from_slice(&[7, 8, 9]);
|
||||
|
||||
Batch {
|
||||
keys: vec![Arc::new(k1), Arc::new(timestamp)],
|
||||
values: vec![Arc::new(v1)],
|
||||
sequences: UInt64Vector::from_slice(&[100, 100, 100]),
|
||||
value_types: UInt8Vector::from_slice(&[0, 0, 0]),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_chunk_batch(chunk: &Chunk<Arc<dyn Array>>, batch: &Batch) {
|
||||
assert_eq!(5, chunk.columns().len());
|
||||
assert_eq!(3, chunk.len());
|
||||
|
||||
for i in 0..2 {
|
||||
assert_eq!(chunk[i], batch.keys[i].to_arrow_array());
|
||||
}
|
||||
assert_eq!(chunk[2], batch.values[0].to_arrow_array());
|
||||
assert_eq!(chunk[3], batch.sequences.to_arrow_array());
|
||||
assert_eq!(chunk[4], batch.value_types.to_arrow_array());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_schema() {
|
||||
let desc = RegionDescBuilder::new("test")
|
||||
.push_key_column(("k1", LogicalTypeId::Int64, false))
|
||||
.push_value_column(("v1", LogicalTypeId::Int64, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
|
||||
let columns = metadata.columns;
|
||||
let region_schema = RegionSchema::new(columns.clone(), 0).unwrap();
|
||||
|
||||
let expect_schema = schema_util::new_schema(
|
||||
&[
|
||||
("k1", LogicalTypeId::Int64, false),
|
||||
("timestamp", LogicalTypeId::Int64, false),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
],
|
||||
Some(1),
|
||||
);
|
||||
|
||||
assert_eq!(expect_schema, **region_schema.user_schema());
|
||||
|
||||
let mut row_keys = region_schema.row_key_columns();
|
||||
assert_eq!("k1", row_keys.next().unwrap().desc.name);
|
||||
assert_eq!("timestamp", row_keys.next().unwrap().desc.name);
|
||||
assert_eq!(None, row_keys.next());
|
||||
assert_eq!(2, region_schema.num_row_key_columns());
|
||||
|
||||
let mut values = region_schema.value_columns();
|
||||
assert_eq!("v1", values.next().unwrap().desc.name);
|
||||
assert_eq!(None, values.next());
|
||||
assert_eq!(1, region_schema.num_value_columns());
|
||||
|
||||
assert_eq!(0, region_schema.version());
|
||||
{
|
||||
let region_schema = RegionSchema::new(columns, 1234).unwrap();
|
||||
assert_eq!(1234, region_schema.version());
|
||||
assert_eq!(1234, region_schema.sst_schema().version());
|
||||
}
|
||||
|
||||
let sst_schema = region_schema.sst_schema();
|
||||
let sst_arrow_schema = sst_schema.arrow_schema();
|
||||
let converted_sst_schema = SstSchema::try_from((**sst_arrow_schema).clone()).unwrap();
|
||||
|
||||
assert_eq!(*sst_schema, converted_sst_schema);
|
||||
|
||||
let expect_schema = schema_util::new_schema(
|
||||
&[
|
||||
("k1", LogicalTypeId::Int64, false),
|
||||
("timestamp", LogicalTypeId::Int64, false),
|
||||
("v1", LogicalTypeId::Int64, true),
|
||||
(consts::SEQUENCE_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
(consts::VALUE_TYPE_COLUMN_NAME, LogicalTypeId::UInt8, false),
|
||||
],
|
||||
Some(1),
|
||||
);
|
||||
assert_eq!(
|
||||
expect_schema.column_schemas(),
|
||||
sst_schema.schema().column_schemas()
|
||||
);
|
||||
assert_eq!(3, sst_schema.sequence_index());
|
||||
assert_eq!(4, sst_schema.value_type_index());
|
||||
let row_key_indices: Vec<_> = sst_schema.row_key_indices().collect();
|
||||
assert_eq!([0, 1], &row_key_indices[..]);
|
||||
let value_indices: Vec<_> = sst_schema.value_indices().collect();
|
||||
assert_eq!([2], &value_indices[..]);
|
||||
|
||||
// Test batch and chunk conversion.
|
||||
let batch = new_batch();
|
||||
// Convert batch to chunk.
|
||||
let chunk = sst_schema.batch_to_arrow_chunk(&batch);
|
||||
check_chunk_batch(&chunk, &batch);
|
||||
|
||||
// Convert chunk to batch.
|
||||
let converted_batch = sst_schema.arrow_chunk_to_batch(&chunk).unwrap();
|
||||
check_chunk_batch(&chunk, &converted_batch);
|
||||
}
|
||||
}
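Taken together, the new module gives each region two views over the same column metadata: the user-facing `Schema` and an SST-facing schema that appends `__sequence` and `__value_type`. A minimal usage sketch, assuming the crate-internal `RegionDescBuilder` test helper used in the tests above (not a public API), might look like:

use datatypes::type_id::LogicalTypeId;

use crate::metadata::RegionMetadata;
use crate::schema::RegionSchema;
use crate::test_util::descriptor_util::RegionDescBuilder;

fn region_schema_usage_sketch() {
    // Describe a region with one extra key column and one value column; the
    // builder adds the timestamp column itself.
    let desc = RegionDescBuilder::new("demo")
        .push_key_column(("k1", LogicalTypeId::Int64, false))
        .push_value_column(("v1", LogicalTypeId::Int64, true))
        .build();
    let metadata: RegionMetadata = desc.try_into().unwrap();

    // 0 is the initial schema version; it bumps each time the schema is altered.
    let region_schema = RegionSchema::new(metadata.columns.clone(), 0).unwrap();

    // The user schema only exposes k1, timestamp and v1 ...
    assert_eq!(3, region_schema.user_schema().num_columns());
    // ... while the SST schema also carries __sequence and __value_type.
    assert_eq!(5, region_schema.sst_schema().schema().num_columns());
}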
|
||||
@@ -26,7 +26,7 @@ impl Snapshot for SnapshotImpl {
|
||||
type Reader = ChunkReaderImpl;
|
||||
|
||||
fn schema(&self) -> &SchemaRef {
|
||||
self.version.schema()
|
||||
self.version.user_schema()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
@@ -41,7 +41,7 @@ impl Snapshot for SnapshotImpl {
|
||||
let immutables = memtable_version.immutable_memtables();
|
||||
|
||||
let mut builder =
|
||||
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
|
||||
ChunkReaderBuilder::new(self.version.user_schema().clone(), self.sst_layer.clone())
|
||||
.reserve_num_memtables(memtable_version.num_memtables())
|
||||
.iter_ctx(IterContext {
|
||||
batch_size: ctx.batch_size,
|
||||
|
||||
@@ -15,22 +15,15 @@ use datatypes::arrow::io::parquet::read::{
|
||||
use datatypes::arrow::io::parquet::write::{
|
||||
Compression, Encoding, FileSink, Version, WriteOptions,
|
||||
};
|
||||
use datatypes::prelude::{ConcreteDataType, Vector};
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::vectors::{Helper, UInt64Vector, UInt8Vector};
|
||||
use futures_util::sink::SinkExt;
|
||||
use futures_util::{Stream, TryStreamExt};
|
||||
use object_store::{ObjectStore, SeekableReader};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::consts;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{
|
||||
FlushIoSnafu, InvalidParquetSchemaSnafu, ReadParquetIoSnafu, ReadParquetSnafu, Result,
|
||||
SequenceColumnNotFoundSnafu, WriteParquetSnafu,
|
||||
};
|
||||
use crate::memtable::{BoxedBatchIterator, MemtableSchema};
|
||||
use crate::metadata::ColumnMetadata;
|
||||
use crate::error::{self, Result};
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::schema::SstSchema;
|
||||
use crate::sst;
|
||||
|
||||
/// Parquet sst writer.
|
||||
@@ -61,20 +54,23 @@ impl<'a> ParquetWriter<'a> {
|
||||
/// A chunk of records yielded from each iteration with a size given
|
||||
/// in config will be written to a single row group.
|
||||
async fn write_rows(self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
|
||||
let schema = memtable_schema_to_arrow_schema(self.iter.schema());
|
||||
let region_schema = self.iter.schema();
|
||||
let sst_schema = region_schema.sst_schema();
|
||||
let schema = sst_schema.arrow_schema();
|
||||
let object = self.object_store.object(self.file_path);
|
||||
|
||||
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
|
||||
// but in s3/azblob backend the Content-Length field of HTTP request is set
|
||||
// to this value.
|
||||
let writer = object.writer(0).await.context(FlushIoSnafu)?;
|
||||
let writer = object.writer(0).await.context(error::FlushIoSnafu)?;
|
||||
|
||||
// now all physical types use plain encoding, maybe let caller to choose encoding for each type.
|
||||
let encodings = get_encoding_for_schema(&schema, |_| Encoding::Plain);
|
||||
let encodings = get_encoding_for_schema(&*schema, |_| Encoding::Plain);
|
||||
|
||||
let mut sink = FileSink::try_new(
|
||||
writer,
|
||||
schema,
|
||||
// The file sink needs the `Schema` instead of a reference.
|
||||
(**schema).clone(),
|
||||
encodings,
|
||||
WriteOptions {
|
||||
write_statistics: true,
|
||||
@@ -82,22 +78,13 @@ impl<'a> ParquetWriter<'a> {
|
||||
version: Version::V2,
|
||||
},
|
||||
)
|
||||
.context(WriteParquetSnafu)?;
|
||||
.context(error::WriteParquetSnafu)?;
|
||||
|
||||
for batch in self.iter {
|
||||
let batch = batch?;
|
||||
sink.send(Chunk::new(
|
||||
batch
|
||||
.keys
|
||||
.iter()
|
||||
.map(|v| v.to_arrow_array())
|
||||
.chain(std::iter::once(batch.sequences.to_arrow_array()))
|
||||
.chain(std::iter::once(batch.value_types.to_arrow_array()))
|
||||
.chain(batch.values.iter().map(|v| v.to_arrow_array()))
|
||||
.collect(),
|
||||
))
|
||||
.await
|
||||
.context(WriteParquetSnafu)?;
|
||||
sink.send(sst_schema.batch_to_arrow_chunk(&batch))
|
||||
.await
|
||||
.context(error::WriteParquetSnafu)?;
|
||||
}
|
||||
|
||||
if let Some(meta) = extra_meta {
|
||||
@@ -105,42 +92,13 @@ impl<'a> ParquetWriter<'a> {
|
||||
sink.metadata.insert(k, Some(v));
|
||||
}
|
||||
}
|
||||
sink.close().await.context(WriteParquetSnafu)?;
|
||||
sink.close().await.context(error::WriteParquetSnafu)?;
|
||||
// FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162),
|
||||
// upgrading to the latest arrow2 can fix this, but datafusion is still using an old arrow2 version.
|
||||
sink.flush().await.context(WriteParquetSnafu)
|
||||
sink.flush().await.context(error::WriteParquetSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
/// Assembles arrow schema from memtable schema info.
|
||||
/// TODO(hl): implement `From<Schema>` for `MemtableSchema`/`Physical schema`
|
||||
fn memtable_schema_to_arrow_schema(schema: &MemtableSchema) -> Schema {
|
||||
let col_meta_to_field: fn(&ColumnMetadata) -> Field = |col_meta| {
|
||||
Field::from(&ColumnSchema::new(
|
||||
col_meta.desc.name.clone(),
|
||||
col_meta.desc.data_type.clone(),
|
||||
col_meta.desc.is_nullable,
|
||||
))
|
||||
};
|
||||
|
||||
let fields = schema
|
||||
.row_key_columns()
|
||||
.map(col_meta_to_field)
|
||||
.chain(std::iter::once(Field::from(&ColumnSchema::new(
|
||||
consts::SEQUENCE_COLUMN_NAME,
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
))))
|
||||
.chain(std::iter::once(Field::from(&ColumnSchema::new(
|
||||
consts::VALUE_TYPE_COLUMN_NAME,
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
false,
|
||||
))))
|
||||
.chain(schema.value_columns().map(col_meta_to_field))
|
||||
.collect::<Vec<_>>();
|
||||
Schema::from(fields)
|
||||
}
|
||||
|
||||
fn get_encoding_for_schema<F: Fn(&DataType) -> Encoding + Clone>(
|
||||
schema: &Schema,
|
||||
map: F,
|
||||
@@ -213,9 +171,11 @@ impl<'a> ParquetReader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): Projection is not supported now, since field index would change after projection.
|
||||
// To support projection, we may need to implement some helper methods in schema.
|
||||
pub async fn chunk_stream(
|
||||
&self,
|
||||
projection: Option<FieldProjection>,
|
||||
_projection: Option<FieldProjection>,
|
||||
chunk_size: usize,
|
||||
) -> Result<ChunkStream> {
|
||||
let file_path = self.file_path.to_string();
|
||||
@@ -229,17 +189,17 @@ impl<'a> ParquetReader<'a> {
|
||||
let file_path = self.file_path.to_string();
|
||||
let mut reader = reader_factory()
|
||||
.await
|
||||
.context(ReadParquetIoSnafu { file: &file_path })?;
|
||||
.context(error::ReadParquetIoSnafu { file: &file_path })?;
|
||||
let metadata = read_metadata_async(&mut reader)
|
||||
.await
|
||||
.context(ReadParquetSnafu { file: &file_path })?;
|
||||
let schema = infer_schema(&metadata).context(ReadParquetSnafu { file: &file_path })?;
|
||||
.context(error::ReadParquetSnafu { file: &file_path })?;
|
||||
let arrow_schema =
|
||||
infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;
|
||||
// Just read all fields.
|
||||
let projected_fields = arrow_schema.fields.clone();
|
||||
|
||||
let projected_fields = projection.map_or_else(|| schema.fields.clone(), |p| p(&schema));
|
||||
let projected_schema = Schema {
|
||||
fields: projected_fields.clone(),
|
||||
metadata: schema.metadata,
|
||||
};
|
||||
let sst_schema = SstSchema::try_from(arrow_schema)
|
||||
.context(error::ConvertSstSchemaSnafu { file: &file_path })?;
|
||||
|
||||
let chunk_stream = try_stream!({
|
||||
for rg in metadata.row_groups {
|
||||
@@ -250,36 +210,31 @@ impl<'a> ParquetReader<'a> {
|
||||
Some(chunk_size),
|
||||
)
|
||||
.await
|
||||
.context(ReadParquetSnafu { file: &file_path })?;
|
||||
.context(error::ReadParquetSnafu { file: &file_path })?;
|
||||
|
||||
let chunks = RowGroupDeserializer::new(column_chunks, rg.num_rows() as usize, None);
|
||||
for maybe_chunk in chunks {
|
||||
let columns_in_chunk =
|
||||
maybe_chunk.context(ReadParquetSnafu { file: &file_path })?;
|
||||
maybe_chunk.context(error::ReadParquetSnafu { file: &file_path })?;
|
||||
yield columns_in_chunk;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
ChunkStream::new(projected_schema, Box::pin(chunk_stream))
|
||||
ChunkStream::new(sst_schema, Box::pin(chunk_stream))
|
||||
}
|
||||
}
|
||||
|
||||
type ChunkConverter = Box<dyn Fn(Chunk<Arc<dyn Array>>) -> Result<Batch> + Send + Sync>;
|
||||
|
||||
pub type SendableChunkStream = Pin<Box<dyn Stream<Item = Result<Chunk<Arc<dyn Array>>>> + Send>>;
|
||||
|
||||
pub struct ChunkStream {
|
||||
schema: SstSchema,
|
||||
stream: SendableChunkStream,
|
||||
converter: ChunkConverter,
|
||||
}
|
||||
|
||||
impl ChunkStream {
|
||||
pub fn new(schema: Schema, stream: SendableChunkStream) -> Result<Self> {
|
||||
Ok(Self {
|
||||
converter: batch_converter_factory(schema)?,
|
||||
stream,
|
||||
})
|
||||
pub fn new(schema: SstSchema, stream: SendableChunkStream) -> Result<Self> {
|
||||
Ok(Self { schema, stream })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,63 +244,15 @@ impl BatchReader for ChunkStream {
|
||||
self.stream
|
||||
.try_next()
|
||||
.await?
|
||||
.map(&self.converter)
|
||||
.map(|chunk| {
|
||||
self.schema
|
||||
.arrow_chunk_to_batch(&chunk)
|
||||
.context(error::InvalidParquetSchemaSnafu)
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(hl): Currently rowkey/values/reserved columns are identified by their position in field:
|
||||
// all fields before `__sequence` column are rowkeys and fields after `__value_type` are values.
|
||||
// But it would be better to persist rowkey/value columns' positions to Parquet metadata and
|
||||
// parse `MemtableSchema` from metadata while building BatchReader.
|
||||
fn batch_converter_factory(schema: Schema) -> Result<ChunkConverter> {
|
||||
let ts_idx = schema
|
||||
.fields
|
||||
.iter()
|
||||
.position(|f| f.name == consts::SEQUENCE_COLUMN_NAME)
|
||||
.with_context(|| SequenceColumnNotFoundSnafu {
|
||||
msg: format!("Schema: {:?}", schema),
|
||||
})?;
|
||||
|
||||
let field_len = schema.fields.len();
|
||||
|
||||
macro_rules! handle_err {
|
||||
($stmt: expr, $schema: ident) => {
|
||||
$stmt.with_context(|_| InvalidParquetSchemaSnafu {
|
||||
msg: format!("Schema type error: {:?}", $schema),
|
||||
})?
|
||||
};
|
||||
}
|
||||
|
||||
let converter = move |c: Chunk<Arc<dyn Array>>| {
|
||||
Ok(Batch {
|
||||
sequences: handle_err!(
|
||||
UInt64Vector::try_from_arrow_array(&c.arrays()[ts_idx].clone()),
|
||||
schema
|
||||
),
|
||||
value_types: handle_err!(
|
||||
UInt8Vector::try_from_arrow_array(&c.arrays()[ts_idx + 1].clone()),
|
||||
schema
|
||||
),
|
||||
keys: handle_err!(
|
||||
(0..ts_idx)
|
||||
.into_iter()
|
||||
.map(|i| Helper::try_into_vector(&c.arrays()[i].clone()))
|
||||
.collect::<std::result::Result<_, _>>(),
|
||||
schema
|
||||
),
|
||||
values: handle_err!(
|
||||
(ts_idx + 2..field_len)
|
||||
.into_iter()
|
||||
.map(|i| Helper::try_into_vector(&c.arrays()[i].clone()))
|
||||
.collect::<std::result::Result<_, _>>(),
|
||||
schema
|
||||
),
|
||||
})
|
||||
};
|
||||
Ok(Box::new(converter))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
@@ -398,10 +305,11 @@ mod tests {
|
||||
let reader = std::fs::File::open(dir.path().join(sst_file_name)).unwrap();
|
||||
let mut file_reader = FileReader::try_new(reader, None, Some(128), None, None).unwrap();
|
||||
|
||||
// chunk schema: timestamp, __version, __sequence, __value_type, v1
|
||||
// chunk schema: timestamp, __version, v1, __sequence, __value_type
|
||||
let chunk = file_reader.next().unwrap().unwrap();
|
||||
assert_eq!(5, chunk.arrays().len());
|
||||
|
||||
// timestamp
|
||||
assert_eq!(
|
||||
Arc::new(Int64Array::from_slice(&[
|
||||
1000, 1000, 1001, 2002, 2003, 2003
|
||||
@@ -409,23 +317,27 @@ mod tests {
|
||||
chunk.arrays()[0]
|
||||
);
|
||||
|
||||
// version
|
||||
assert_eq!(
|
||||
Arc::new(UInt64Array::from_slice(&[1, 2, 1, 1, 1, 5])) as Arc<dyn Array>,
|
||||
chunk.arrays()[1]
|
||||
);
|
||||
|
||||
// v1
|
||||
assert_eq!(
|
||||
Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc<dyn Array>,
|
||||
Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc<dyn Array>,
|
||||
chunk.arrays()[2]
|
||||
);
|
||||
|
||||
// sequence
|
||||
assert_eq!(
|
||||
Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc<dyn Array>,
|
||||
Arc::new(UInt64Array::from_slice(&[10, 10, 10, 10, 10, 10])) as Arc<dyn Array>,
|
||||
chunk.arrays()[3]
|
||||
);
|
||||
|
||||
// value type
|
||||
assert_eq!(
|
||||
Arc::new(UInt64Array::from_slice(&[1, 2, 3, 7, 8, 9])) as Arc<dyn Array>,
|
||||
Arc::new(UInt8Array::from_slice(&[0, 0, 0, 0, 0, 0])) as Arc<dyn Array>,
|
||||
chunk.arrays()[4]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
|
||||
/// Column definition: (name, datatype, is_nullable)
|
||||
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
|
||||
@@ -16,7 +16,10 @@ pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) ->
|
||||
.collect();
|
||||
|
||||
if let Some(index) = timestamp_index {
|
||||
Schema::with_timestamp_index(column_schemas, index).unwrap()
|
||||
SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(index)
|
||||
.build()
|
||||
.unwrap()
|
||||
} else {
|
||||
Schema::new(column_schemas)
|
||||
}
|
||||
|
||||
@@ -14,8 +14,9 @@ use std::time::Duration;
|
||||
use store_api::manifest::ManifestVersion;
|
||||
use store_api::storage::{SchemaRef, SequenceNumber};
|
||||
|
||||
use crate::memtable::{MemtableId, MemtableSchema, MemtableSet, MemtableVersion};
|
||||
use crate::memtable::{MemtableId, MemtableSet, MemtableVersion};
|
||||
use crate::metadata::RegionMetadataRef;
|
||||
use crate::schema::RegionSchemaRef;
|
||||
use crate::sst::LevelMetas;
|
||||
use crate::sst::{FileHandle, FileMeta};
|
||||
use crate::sync::CowCell;
|
||||
@@ -170,8 +171,13 @@ impl Version {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn schema(&self) -> &SchemaRef {
|
||||
&self.metadata.schema
|
||||
pub fn schema(&self) -> &RegionSchemaRef {
|
||||
self.metadata.schema()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn user_schema(&self) -> &SchemaRef {
|
||||
self.metadata.user_schema()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -199,11 +205,6 @@ impl Version {
|
||||
DEFAULT_BUCKET_DURATION
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn memtable_schema(&self) -> MemtableSchema {
|
||||
MemtableSchema::new(self.metadata.columns_row_key.clone())
|
||||
}
|
||||
|
||||
pub fn apply_edit(&mut self, edit: VersionEdit) {
|
||||
let flushed_sequence = edit.flushed_sequence.unwrap_or(self.flushed_sequence);
|
||||
if self.flushed_sequence < flushed_sequence {
|
||||
|
||||
@@ -12,7 +12,7 @@ mod snapshot;
|
||||
mod types;
|
||||
|
||||
pub use datatypes::data_type::ConcreteDataType;
|
||||
pub use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
pub use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
|
||||
pub use self::chunk::{Chunk, ChunkReader};
|
||||
pub use self::descriptors::*;
|
||||
|
||||
@@ -2,11 +2,11 @@
|
||||
|
||||
use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
|
||||
|
||||
// ---------- Ids reserved for internal column families ------------------------
|
||||
// ---------- Reserved column family ids ---------------------------------------
|
||||
|
||||
/// Column family Id for row key columns.
|
||||
///
|
||||
/// This is virtual column family, actually row key columns are not
|
||||
/// This is a virtual column family, actually row key columns are not
|
||||
/// stored in any column family.
|
||||
pub const KEY_CF_ID: ColumnFamilyId = 0;
|
||||
/// Id for default column family.
|
||||
@@ -14,11 +14,45 @@ pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
// ---------- Ids reserved for internal columns --------------------------------
|
||||
// ---------- Reserved column ids ----------------------------------------------
|
||||
|
||||
// TODO(yingwen): Reserve one bit for internal columns.
|
||||
/// Column id for version column.
|
||||
pub const VERSION_COLUMN_ID: ColumnId = 1;
|
||||
// The reserved column ids are too large to be defined as enum values (denied by the
// `clippy::enum_clike_unportable_variant` lint), so we define this enum as an offset
// in ReservedColumnId and compute the final column id from it.
|
||||
enum ReservedColumnType {
|
||||
Version = 0,
|
||||
Sequence,
|
||||
ValueType,
|
||||
}
|
||||
|
||||
/// Column id reserved by the engine.
|
||||
///
/// All reserved column ids have their MSB (Most Significant Bit) set to 1.
///
/// Reserved columns include the version column and other internal columns.
|
||||
pub struct ReservedColumnId;
|
||||
|
||||
impl ReservedColumnId {
|
||||
// Set MSB to 1.
|
||||
const BASE: ColumnId = 1 << (ColumnId::BITS - 1);
|
||||
|
||||
/// Column id for the version column.
/// The version column is a special reserved column that is enabled by the user and
/// is visible to the user.
|
||||
pub const fn version() -> ColumnId {
|
||||
Self::BASE | ReservedColumnType::Version as ColumnId
|
||||
}
|
||||
|
||||
/// Id for `__sequence` column.
|
||||
pub const fn sequence() -> ColumnId {
|
||||
Self::BASE | ReservedColumnType::Sequence as ColumnId
|
||||
}
|
||||
|
||||
/// Id for `__value_type` column.
|
||||
pub const fn value_type() -> ColumnId {
|
||||
Self::BASE | ReservedColumnType::ValueType as ColumnId
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
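Because `BASE` sets the most significant bit of a `ColumnId` (a `u32`, judging from the `0x8000_0000` values asserted in the test below), every reserved id lives in the upper half of the id space and can never collide with an ordinary user-assigned column id. A small sketch of the kind of check this enables (the `is_reserved` helper is hypothetical, not part of this commit):

// Hypothetical helper: a column id is reserved iff its most significant bit is set.
fn is_reserved(id: u32) -> bool {
    id & (1u32 << 31) != 0
}

fn reserved_id_sketch() {
    assert!(is_reserved(0x8000_0000)); // ReservedColumnId::version()
    assert!(is_reserved(0x8000_0001)); // ReservedColumnId::sequence()
    assert!(is_reserved(0x8000_0002)); // ReservedColumnId::value_type()
    assert!(!is_reserved(42)); // ordinary user column id
}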
|
||||
|
||||
@@ -36,6 +70,7 @@ pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
|
||||
/// Name of the time index constraint.
|
||||
pub const TIME_INDEX_NAME: &str = "__time_index";
|
||||
|
||||
// TODO(yingwen): `__op_type` might be proper than `__value_type`.
|
||||
/// Name for reserved column: value_type
|
||||
pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type";
|
||||
|
||||
@@ -46,3 +81,15 @@ pub const VALUE_TYPE_COLUMN_NAME: &str = "__value_type";
|
||||
pub const READ_BATCH_SIZE: usize = 256;
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_reserved_id() {
|
||||
assert_eq!(0x80000000, ReservedColumnId::version());
|
||||
assert_eq!(0x80000001, ReservedColumnId::sequence());
|
||||
assert_eq!(0x80000002, ReservedColumnId::value_type());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,8 +3,7 @@ mod mock_engine;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
use log_store::fs::noop::NoopLogStore;
|
||||
use object_store::{backend::fs::Backend, ObjectStore};
|
||||
use storage::config::EngineConfig as StorageEngineConfig;
|
||||
@@ -32,7 +31,10 @@ pub fn schema_for_test() -> Schema {
|
||||
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
|
||||
];
|
||||
|
||||
Schema::with_timestamp_index(column_schemas, 0).expect("ts must be timestamp column")
|
||||
SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(0)
|
||||
.build()
|
||||
.expect("ts must be timestamp column")
|
||||
}
|
||||
|
||||
pub type MockMitoEngine = MitoEngine<MockEngine>;
|
||||
|
||||
@@ -43,7 +43,7 @@ impl Snapshot for MockSnapshot {
|
||||
type Reader = MockChunkReader;
|
||||
|
||||
fn schema(&self) -> &SchemaRef {
|
||||
&self.metadata.schema
|
||||
self.metadata.user_schema()
|
||||
}
|
||||
|
||||
async fn scan(
|
||||
@@ -52,7 +52,7 @@ impl Snapshot for MockSnapshot {
|
||||
_request: ScanRequest,
|
||||
) -> Result<ScanResponse<MockChunkReader>> {
|
||||
let reader = MockChunkReader {
|
||||
schema: self.metadata.schema.clone(),
|
||||
schema: self.metadata.user_schema().clone(),
|
||||
};
|
||||
|
||||
Ok(ScanResponse { reader })
|
||||
|
||||