feat: schema level opts (#2283)

* chore: update proto

* chore: add try from for schema name value

* chore: merge schema opts to table opts while creating table

* chore: use table ttl opts first

* chore: add unit test

* chore: update proto version
This commit is contained in:
shuiyisong
2023-08-30 16:11:08 +08:00
committed by GitHub
parent 8e9f2ffce4
commit 5df4d44761
16 changed files with 126 additions and 25 deletions

3
Cargo.lock generated
View File

@@ -1837,6 +1837,7 @@ dependencies = [
"datatypes",
"etcd-client",
"futures",
"humantime-serde",
"hyper",
"lazy_static",
"prost",
@@ -4154,7 +4155,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc#ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4a277f27caa035a801d5b9c020a0449777736614#4a277f27caa035a801d5b9c020a0449777736614"
dependencies = [
"prost",
"serde",

View File

@@ -77,7 +77,8 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec2d346e09a2f6db3b1d0aaf010e89ed8a69eccc" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4a277f27caa035a801d5b9c020a0449777736614" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"

View File

@@ -205,7 +205,7 @@ impl MigrateTableMetadata {
async fn migrate_schema_key(&self, key: &v1SchemaKey) -> Result<()> {
let new_key = SchemaNameKey::new(&key.catalog_name, &key.schema_name);
let schema_name_value = SchemaNameValue;
let schema_name_value = SchemaNameValue::default();
info!("Creating '{new_key}'");

View File

@@ -15,6 +15,7 @@ common-telemetry = { workspace = true }
common-time = { workspace = true }
etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
prost.workspace = true
regex.workspace = true

View File

@@ -54,6 +54,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse value {} into key {}", value, key))]
ParseOption {
key: String,
value: String,
location: Location,
},
#[snafu(display("Corrupted table route data, err: {}", err_msg))]
RouteInfoCorrupted { err_msg: String, location: Location },
@@ -151,6 +158,7 @@ impl ErrorExt for Error {
IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal,
SerdeJson { .. }
| ParseOption { .. }
| RouteInfoCorrupted { .. }
| InvalidProtoMsg { .. }
| InvalidTableMetadata { .. }

View File

@@ -12,22 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use futures::StreamExt;
use humantime_serde::re::humantime;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Error, InvalidTableMetadataSnafu, Result};
use crate::error::{self, Error, InvalidTableMetadataSnafu, ParseOptionSnafu, Result};
use crate::key::{TableMetaKey, SCHEMA_NAME_KEY_PATTERN, SCHEMA_NAME_KEY_PREFIX};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::{PutRequest, RangeRequest};
use crate::rpc::KeyValue;
const OPT_KEY_TTL: &str = "ttl";
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SchemaNameKey<'a> {
pub catalog: &'a str,
@@ -43,8 +48,33 @@ impl<'a> Default for SchemaNameKey<'a> {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaNameValue;
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaNameValue {
#[serde(default)]
#[serde(with = "humantime_serde")]
pub ttl: Option<Duration>,
}
impl TryFrom<&HashMap<String, String>> for SchemaNameValue {
type Error = Error;
fn try_from(value: &HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
let ttl = value
.get(OPT_KEY_TTL)
.map(|ttl_str| {
ttl_str.parse::<humantime::Duration>().map_err(|_| {
ParseOptionSnafu {
key: OPT_KEY_TTL,
value: ttl_str.clone(),
}
.build()
})
})
.transpose()?
.map(|ttl| ttl.into());
Ok(Self { ttl })
}
}
impl<'a> SchemaNameKey<'a> {
pub fn new(catalog: &'a str, schema: &'a str) -> Self {
@@ -108,11 +138,15 @@ impl SchemaManager {
}
/// Creates `SchemaNameKey`.
pub async fn create(&self, schema: SchemaNameKey<'_>) -> Result<()> {
pub async fn create(
&self,
schema: SchemaNameKey<'_>,
value: Option<SchemaNameValue>,
) -> Result<()> {
let raw_key = schema.as_raw_key();
let req = PutRequest::new()
.with_key(raw_key)
.with_value(SchemaNameValue.try_as_raw_value()?);
.with_value(value.unwrap_or_default().try_as_raw_value()?);
self.kv_backend.put(req).await?;
@@ -125,6 +159,14 @@ impl SchemaManager {
Ok(self.kv_backend.get(&raw_key).await?.is_some())
}
pub async fn get(&self, schema: SchemaNameKey<'_>) -> Result<Option<SchemaNameValue>> {
let raw_key = schema.as_raw_key();
let value = self.kv_backend.get(&raw_key).await?;
value
.map(|v| SchemaNameValue::try_from_raw_value(v.value.as_ref()))
.transpose()
}
/// Returns a schema stream, it lists all schemas belong to the target `catalog`.
pub async fn schema_names(&self, catalog: &str) -> BoxStream<'static, Result<String>> {
let start_key = SchemaNameKey::range_start_key(catalog);
@@ -143,25 +185,35 @@ impl SchemaManager {
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
#[test]
fn test_serialization() {
let key = SchemaNameKey::new("my-catalog", "my-schema");
assert_eq!(key.to_string(), "__schema_name/my-catalog/my-schema");
let parsed: SchemaNameKey<'_> = "__schema_name/my-catalog/my-schema".try_into().unwrap();
assert_eq!(key, parsed);
let value = SchemaNameValue {
ttl: Some(Duration::from_secs(10)),
};
let mut opts: HashMap<String, String> = HashMap::new();
opts.insert("ttl".to_string(), "10s".to_string());
let from_value = SchemaNameValue::try_from(&opts).unwrap();
assert_eq!(value, from_value);
let parsed = SchemaNameValue::try_from_raw_value("{\"ttl\":\"10s\"}".as_bytes()).unwrap();
assert_eq!(value, parsed);
}
#[tokio::test]
async fn test_key_exist() {
let manager = SchemaManager::new(Arc::new(MemoryKvBackend::default()));
let schema_key = SchemaNameKey::new("my-catalog", "my-schema");
manager.create(schema_key).await.unwrap();
manager.create(schema_key, None).await.unwrap();
assert!(manager.exist(schema_key).await.unwrap());

View File

@@ -12,7 +12,7 @@ common-error = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
futures.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
object-store = { workspace = true }
serde.workspace = true
serde_json = "1.0"

View File

@@ -40,7 +40,7 @@ datatypes = { workspace = true }
file-table-engine = { workspace = true }
futures = "0.3"
futures-util.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
key-lock = "0.1"
log-store = { workspace = true }

View File

@@ -365,6 +365,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
@@ -418,6 +419,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
@@ -485,6 +487,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
@@ -589,6 +592,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
@@ -661,6 +665,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();

View File

@@ -39,7 +39,7 @@ datatypes = { workspace = true }
file-table-engine = { workspace = true }
futures = "0.3"
futures-util.workspace = true
humantime-serde = "1.1"
humantime-serde.workspace = true
itertools.workspace = true
meta-client = { workspace = true }
# Although it is not used, please do not delete it.

View File

@@ -34,7 +34,7 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{Partition, Partition as MetaPartition, RouteRequest};
@@ -67,8 +67,9 @@ use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
TableNotFoundSnafu, TableSnafu, UnrecognizedTableOptionSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, SchemaNotFoundSnafu,
TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, TableSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::instance::distributed::deleter::DistDeleter;
@@ -104,6 +105,25 @@ impl DistInstance {
partitions: Option<Partitions>,
) -> Result<TableRef> {
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE);
// 1. get schema info
let schema = self
.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;
let Some(schema_opts) = schema else {
return SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
.fail();
};
let table_name = TableName::new(
&create_table.catalog_name,
&create_table.schema_name,
@@ -112,7 +132,7 @@ impl DistInstance {
let (partitions, partition_cols) = parse_partitions(create_table, partitions)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
@@ -340,6 +360,7 @@ impl DistInstance {
let expr = CreateDatabaseExpr {
database_name: stmt.name.to_string(),
create_if_not_exists: stmt.if_not_exists,
options: Default::default(),
};
self.handle_create_database(expr, query_ctx).await
}
@@ -477,10 +498,12 @@ impl DistInstance {
}
);
let schema_value =
SchemaNameValue::try_from(&expr.options).context(error::TableMetadataManagerSnafu)?;
self.catalog_manager
.table_metadata_manager_ref()
.schema_manager()
.create(schema)
.create(schema, Some(schema_value))
.await
.context(error::TableMetadataManagerSnafu)?;
@@ -772,6 +795,7 @@ fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Parti
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: SchemaNameValue,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
@@ -818,6 +842,10 @@ fn create_table_info(
})
.collect::<Result<Vec<_>>>()?;
let table_options = TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);
let meta = RawTableMeta {
schema: raw_schema,
primary_key_indices,
@@ -826,8 +854,7 @@ fn create_table_info(
next_column_id: column_schemas.len() as u32,
region_numbers: vec![],
engine_options: HashMap::new(),
options: TableOptions::try_from(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?,
options: table_options,
created_on: DateTime::default(),
partition_key_indices,
};
@@ -854,6 +881,11 @@ fn create_table_info(
Ok(table_info)
}
fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
}
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,

View File

@@ -205,7 +205,7 @@ mod tests {
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default())
.create(SchemaNameKey::default(), None)
.await
.unwrap();

View File

@@ -90,7 +90,7 @@ impl MetadataService for DefaultMetadataService {
if !exist {
self.table_metadata_manager
.schema_manager()
.create(schema)
.create(schema, None)
.await
.context(error::TableMetadataManagerSnafu)?;

View File

@@ -44,7 +44,7 @@ headers = "0.3"
hex = { version = "0.4" }
hostname = "0.3.1"
http-body = "0.4"
humantime-serde = "1.1"
humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true

View File

@@ -27,7 +27,7 @@ datatypes = { workspace = true }
derive_builder.workspace = true
futures.workspace = true
humantime = "2.1"
humantime-serde = "1.1"
humantime-serde.workspace = true
paste = "1.0"
serde.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -69,6 +69,7 @@ mod test {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
options: Default::default(),
})),
});
let output = query(instance, request).await;