feat: create view (#3807)

* add statement

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: rebase with main

* fix: create flow

* feat: adds gRPC stuff

* feat: impl create_view ddl in operator

* feat: impl CreateViewProcedure

* chore: update cargo lock

* fix: format

* chore: compile error after rebasing main

* chore: refactor and test create view parser

* chore: fixed todo list and comments

* fix: compile error after rebasing

* test: add create view test

* test: test view_info keys

* test: adds test for CreateViewProcedure and clean code

* test: adds more sqlness test for creating views

* chore: update cargo lock

* fix: don't replace normal table in CreateViewProcedure

* chore: apply suggestion

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

* chore: style

Co-authored-by: Jeremyhi <jiachun_feng@proton.me>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
dennis zhuang
2024-05-14 16:03:29 +08:00
committed by GitHub
parent f16ce3ca27
commit efd3f04b7c
36 changed files with 1966 additions and 116 deletions

104
Cargo.lock generated
View File

@@ -581,11 +581,11 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e9eabd7a98fe442131a17c316bd9349c43695e49e730c3c8e12cfb5f4da2693"
checksum = "9c90a406b4495d129f00461241616194cb8a032c8d1c53c657f0961d5f8e0498"
dependencies = [
"brotli 5.0.0",
"brotli 6.0.0",
"bzip2",
"flate2",
"futures-core",
@@ -1068,9 +1068,9 @@ dependencies = [
[[package]]
name = "brotli"
version = "5.0.0"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67"
checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -2589,7 +2589,7 @@ dependencies = [
"arrow-array",
"arrow-ipc",
"arrow-schema",
"async-compression 0.4.9",
"async-compression 0.4.10",
"async-trait",
"bytes",
"bzip2",
@@ -2641,7 +2641,7 @@ dependencies = [
"arrow-array",
"arrow-ipc",
"arrow-schema",
"async-compression 0.4.9",
"async-compression 0.4.10",
"async-trait",
"bytes",
"bzip2",
@@ -3577,9 +3577,9 @@ dependencies = [
[[package]]
name = "errno"
version = "0.3.8"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba"
dependencies = [
"libc",
"windows-sys 0.52.0",
@@ -4200,7 +4200,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=219b2409bb701f75b43fc0ba64967d2ed8e75491#219b2409bb701f75b43fc0ba64967d2ed8e75491"
source = "git+https://github.com/killme2008/greptime-proto.git?rev=57e186d572c6c5898e90d6ab9e91b0867c30d1da#57e186d572c6c5898e90d6ab9e91b0867c30d1da"
dependencies = [
"prost 0.12.4",
"serde",
@@ -4550,7 +4550,7 @@ dependencies = [
"clap 4.5.4",
"data-encoding",
"itertools 0.10.5",
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"proc-macro2",
"quote",
"regex",
@@ -6173,9 +6173,9 @@ dependencies = [
[[package]]
name = "num"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3135b08af27d103b0a51f2ae0f8632117b7b185ccf931445affa8df530576a41"
checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
dependencies = [
"num-bigint",
"num-complex",
@@ -6214,9 +6214,9 @@ dependencies = [
[[package]]
name = "num-complex"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23c6602fda94a57c990fe0df199a035d83576b496aa29f4e634a8ac6004e68a6"
checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
dependencies = [
"num-traits",
]
@@ -6270,11 +6270,10 @@ dependencies = [
[[package]]
name = "num-rational"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
@@ -6643,6 +6642,7 @@ dependencies = [
"sql",
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
"store-api",
"substrait 0.7.2",
"table",
"tokio",
"tonic 0.11.0",
@@ -7022,9 +7022,9 @@ dependencies = [
[[package]]
name = "petgraph"
version = "0.6.4"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
dependencies = [
"fixedbitset",
"indexmap 2.2.6",
@@ -7313,7 +7313,7 @@ dependencies = [
"parking_lot 0.12.2",
"prost 0.12.4",
"prost-build 0.12.4",
"prost-derive 0.12.4",
"prost-derive 0.12.5",
"protobuf",
"sha2",
"smallvec",
@@ -7386,9 +7386,9 @@ dependencies = [
[[package]]
name = "prettyplease"
version = "0.2.19"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550"
checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e"
dependencies = [
"proc-macro2",
"syn 2.0.61",
@@ -7439,9 +7439,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.81"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba"
checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b"
dependencies = [
"unicode-ident",
]
@@ -7561,7 +7561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922"
dependencies = [
"bytes",
"prost-derive 0.12.4",
"prost-derive 0.12.5",
]
[[package]]
@@ -7599,7 +7599,7 @@ dependencies = [
"multimap 0.10.0",
"once_cell",
"petgraph",
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"prost 0.12.4",
"prost-types 0.12.4",
"regex",
@@ -7622,9 +7622,9 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.12.4"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48"
checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a"
dependencies = [
"anyhow",
"itertools 0.12.1",
@@ -8087,18 +8087,18 @@ dependencies = [
[[package]]
name = "ref-cast"
version = "1.0.22"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4846d4c50d1721b1a3bef8af76924eef20d5e723647333798c1b519b3a9473f"
checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931"
dependencies = [
"ref-cast-impl",
]
[[package]]
name = "ref-cast-impl"
version = "1.0.22"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc"
checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6"
dependencies = [
"proc-macro2",
"quote",
@@ -8591,9 +8591,9 @@ dependencies = [
[[package]]
name = "rustc-demangle"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc-hash"
@@ -8694,9 +8694,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
version = "1.5.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54"
checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
[[package]]
name = "rustls-webpki"
@@ -9318,18 +9318,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.200"
version = "1.0.201"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f"
checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.200"
version = "1.0.201"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb"
checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865"
dependencies = [
"proc-macro2",
"quote",
@@ -9349,9 +9349,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.116"
version = "1.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813"
checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3"
dependencies = [
"indexmap 2.2.6",
"itoa",
@@ -10310,7 +10310,7 @@ checksum = "f1e8440a1c9b95a7c9a00a19f78b980749e8c945eb880687a5d673cea83729c5"
dependencies = [
"git2",
"heck 0.4.1",
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"prost 0.12.4",
"prost-build 0.12.4",
"prost-types 0.12.4",
@@ -10331,7 +10331,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba959c71b2a1a341a94e1f362615d7e5f1a4de9d25d82fceea8160f79f1e1dfb"
dependencies = [
"heck 0.5.0",
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"prost 0.12.4",
"prost-build 0.12.4",
"prost-types 0.12.4",
@@ -11189,7 +11189,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889"
dependencies = [
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"proc-macro2",
"prost-build 0.12.4",
"quote",
@@ -11202,7 +11202,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2"
dependencies = [
"prettyplease 0.2.19",
"prettyplease 0.2.20",
"proc-macro2",
"prost-build 0.12.4",
"quote",
@@ -11249,7 +11249,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
dependencies = [
"async-compression 0.4.9",
"async-compression 0.4.10",
"base64 0.21.7",
"bitflags 2.5.0",
"bytes",
@@ -12605,18 +12605,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.7.33"
version = "0.7.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "087eca3c1eaf8c47b94d02790dd086cd594b912d2043d4de4bfdd466b3befb7c"
checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.33"
version = "0.7.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f4b6c273f496d8fd4eaf18853e6b448760225dc030ff2c485a786859aea6393"
checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -117,7 +117,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "219b2409bb701f75b43fc0ba64967d2ed8e75491" }
greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "57e186d572c6c5898e90d6ab9e91b0867c30d1da" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -480,6 +480,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
None => "ddl.empty",
}
}

View File

@@ -38,6 +38,7 @@ pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;
pub mod drop_table;
@@ -89,6 +90,7 @@ pub struct TableMetadataAllocatorContext {
}
/// Metadata allocated to a table.
#[derive(Default)]
pub struct TableMetadata {
/// Table id.
pub table_id: TableId,

View File

@@ -43,7 +43,7 @@ impl CreateLogicalTablesProcedure {
} else {
self.context
.table_metadata_allocator
.allocate_table_id(task)
.allocate_table_id(&task.create_table.table_id)
.await?
};
task.set_table_id(table_id);

View File

@@ -0,0 +1,276 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableType};
use table::table_reference::TableReference;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::rpc::ddl::CreateViewTask;
use crate::{metrics, ClusterId};
/// The procedure to execute a `[CreateViewTask]`: it allocates (or resolves)
/// the view id and persists the view metadata, optionally replacing an
/// existing view when `OR REPLACE` is requested.
pub struct CreateViewProcedure {
    /// Shared DDL context (table metadata manager, metadata allocator, etc.).
    pub context: DdlContext,
    /// Holds the serializable procedure state and the task being executed.
    pub creator: ViewCreator,
}
impl CreateViewProcedure {
    /// Unique procedure type name, used for registration and crash recovery.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateView";

    /// Creates a new procedure for `task`, starting in the `Prepare` state.
    pub fn new(cluster_id: ClusterId, task: CreateViewTask, context: DdlContext) -> Self {
        Self {
            context,
            creator: ViewCreator::new(cluster_id, task),
        }
    }

    /// Restores a procedure from its serialized JSON state (used on recovery).
    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
        let creator = ViewCreator { data };
        Ok(CreateViewProcedure { context, creator })
    }

    /// Returns the raw table info of the view being created.
    fn view_info(&self) -> &RawTableInfo {
        &self.creator.data.task.view_info
    }

    /// Whether this execution replaces an existing view (updates its info)
    /// instead of creating fresh metadata.
    fn need_update(&self) -> bool {
        self.creator.data.need_update
    }

    /// The id of the view; valid after `on_prepare` has allocated or resolved it.
    pub(crate) fn view_id(&self) -> TableId {
        self.view_info().ident.table_id
    }

    // Test-only helper to inject an allocated view id without running `on_prepare`.
    #[cfg(any(test, feature = "testing"))]
    pub fn set_allocated_metadata(&mut self, view_id: TableId) {
        self.creator.set_allocated_metadata(view_id, false)
    }

    /// On the prepare step, it performs:
    /// - Checks whether the view exists.
    /// - Allocates the view id.
    ///
    /// Abort(non-retry):
    /// - ViewName exists and `create_if_not_exists` is false.
    /// - Failed to allocate [ViewMetadata].
    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
        let expr = &self.creator.data.task.create_view;
        // Look up the name key to see whether a table/view with this name exists.
        let view_name_value = self
            .context
            .table_metadata_manager
            .table_name_manager()
            .get(TableNameKey::new(
                &expr.catalog_name,
                &expr.schema_name,
                &expr.view_name,
            ))
            .await?;

        // If `view_id` stays `None`, a brand-new view is created; otherwise:
        // - replaces the existing one when `or_replace` is true.
        // - returns the existing one when `create_if_not_exists` is true.
        // - throws a `[ViewAlreadyExistsSnafu]` error.
        let mut view_id = None;

        if let Some(value) = view_name_value {
            ensure!(
                expr.create_if_not_exists || expr.or_replace,
                error::ViewAlreadyExistsSnafu {
                    view_name: self.creator.data.table_ref().to_string(),
                }
            );

            let exists_view_id = value.table_id();

            if !expr.or_replace {
                // `create_if_not_exists` without `or_replace`: keep the existing view.
                return Ok(Status::done_with_output(exists_view_id));
            }
            view_id = Some(exists_view_id);
        }

        if let Some(view_id) = view_id {
            let view_info_value = self
                .context
                .table_metadata_manager
                .table_info_manager()
                .get(view_id)
                .await?
                .with_context(|| error::TableInfoNotFoundSnafu {
                    table: self.creator.data.table_ref().to_string(),
                })?;

            // Ensure the existing object is a view; a normal table must not be replaced.
            ensure!(
                view_info_value.table_info.table_type == TableType::View,
                error::TableAlreadyExistsSnafu {
                    table_name: self.creator.data.table_ref().to_string(),
                }
            );

            // Reuse the existing view id and mark the info for update (replace path).
            self.creator.set_allocated_metadata(view_id, true);
        } else {
            // Allocate the new `view_id`.
            let TableMetadata { table_id, .. } = self
                .context
                .table_metadata_allocator
                .create_view(
                    &TableMetadataAllocatorContext {
                        cluster_id: self.creator.data.cluster_id,
                    },
                    &None,
                )
                .await?;
            self.creator.set_allocated_metadata(table_id, false);
        }

        self.creator.data.state = CreateViewState::CreateMetadata;

        // `true` asks the framework to persist the procedure state before continuing.
        Ok(Status::executing(true))
    }

    /// Creates view metadata
    ///
    /// Abort(not-retry):
    /// - Failed to create view metadata.
    async fn on_create_metadata(&mut self, ctx: &ProcedureContext) -> Result<Status> {
        let view_id = self.view_id();
        let manager = &self.context.table_metadata_manager;

        if self.need_update() {
            // Retrieve the current view info and try to update it (replace path).
            let current_view_info = manager
                .view_info_manager()
                .get(view_id)
                .await?
                .with_context(|| error::ViewNotFoundSnafu {
                    view_name: self.creator.data.table_ref().to_string(),
                })?;
            let new_logical_plan = self.creator.data.task.raw_logical_plan().clone();
            manager
                .update_view_info(view_id, &current_view_info, new_logical_plan)
                .await?;

            info!("Updated view metadata for view {view_id}");
        } else {
            // Fresh creation: persist both the table info and the logical plan.
            let raw_view_info = self.view_info().clone();
            manager
                .create_view_metadata(raw_view_info, self.creator.data.task.raw_logical_plan())
                .await?;

            info!(
                "Created view metadata for view {view_id} with procedure: {}",
                ctx.procedure_id
            );
        }

        Ok(Status::done_with_output(view_id))
    }
}
#[async_trait]
impl Procedure for CreateViewProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Drives the state machine: `Prepare` -> `CreateMetadata`.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &self.creator.data.state;
        // Time each step, labelled by the current state name.
        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW
            .with_label_values(&[state.as_ref()])
            .start_timer();

        match state {
            CreateViewState::Prepare => self.on_prepare().await,
            CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await,
        }
        .map_err(handle_retry_error)
    }

    /// Serializes the procedure state so it can be recovered after a restart.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
    }

    /// Read locks on the catalog and schema, plus a lock on the view name to
    /// serialize concurrent creations of the same view.
    fn lock_key(&self) -> LockKey {
        let table_ref = &self.creator.data.table_ref();
        LockKey::new(vec![
            CatalogLock::Read(table_ref.catalog).into(),
            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
        ])
    }
}
/// The VIEW creator: owns the serializable state of a [CreateViewProcedure].
pub struct ViewCreator {
    /// The serializable data.
    pub data: CreateViewData,
}
impl ViewCreator {
    /// Creates a new creator in the initial `Prepare` state for `task`.
    pub fn new(cluster_id: u64, task: CreateViewTask) -> Self {
        Self {
            data: CreateViewData {
                state: CreateViewState::Prepare,
                cluster_id,
                task,
                need_update: false,
            },
        }
    }

    /// Records the allocated (or resolved) view id, and whether an existing
    /// view's info should be updated (`true`) instead of freshly created.
    fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) {
        self.data.task.view_info.ident.table_id = view_id;
        self.data.need_update = need_update;
    }
}
/// The states of the create-view procedure's state machine.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateViewState {
    /// Prepares to create the view
    Prepare,
    /// Creates metadata
    CreateMetadata,
}
/// The serializable state of a [CreateViewProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateViewData {
    /// Current step of the state machine.
    pub state: CreateViewState,
    /// The task carrying the `CreateView` expression and the view's table info.
    pub task: CreateViewTask,
    /// The cluster this procedure runs against.
    pub cluster_id: ClusterId,
    /// Whether to update the view info.
    pub need_update: bool,
}
impl CreateViewData {
    /// Convenience accessor for the view's (catalog, schema, name) reference.
    fn table_ref(&self) -> TableReference<'_> {
        self.task.table_ref()
    }
}

View File

@@ -62,8 +62,11 @@ impl TableMetadataAllocator {
}
}
pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
pub(crate) async fn allocate_table_id(
&self,
table_id: &Option<api::v1::TableId>,
) -> Result<TableId> {
let table_id = if let Some(table_id) = table_id {
let table_id = table_id.id;
ensure!(
@@ -143,12 +146,26 @@ impl TableMetadataAllocator {
Ok(PhysicalTableRouteValue::new(region_routes))
}
/// Create VIEW metadata
pub async fn create_view(
&self,
_ctx: &TableMetadataAllocatorContext,
table_id: &Option<api::v1::TableId>,
) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(table_id).await?;
Ok(TableMetadata {
table_id,
..Default::default()
})
}
pub async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let table_route = self.create_table_route(ctx, table_id, task).await?;
let region_wal_options = self.create_wal_options(&table_route)?;

View File

@@ -17,6 +17,7 @@ mod alter_table;
mod create_flow;
mod create_logical_tables;
mod create_table;
mod create_view;
mod drop_database;
mod drop_flow;
mod drop_table;

View File

@@ -40,7 +40,7 @@ use crate::kv_backend::memory::MemoryKvBackend;
use crate::rpc::ddl::CreateTableTask;
use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager};
fn test_create_table_task(name: &str) -> CreateTableTask {
pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask {
let create_table = TestCreateTableExprBuilder::default()
.column_defs([
TestColumnDefBuilder::default()

View File

@@ -0,0 +1,227 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::sync::Arc;
use api::v1::CreateViewExpr;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use table::metadata;
use table::metadata::{RawTableInfo, RawTableMeta, TableType};
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::create_view::CreateViewProcedure;
use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use crate::ddl::tests::create_table::test_create_table_task;
use crate::error::Error;
use crate::rpc::ddl::CreateViewTask;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
/// Builds a `CreateViewTask` fixture named `name` under `greptime.public`,
/// with a dummy logical plan (`[1, 2, 3]`) and an unallocated view id (0).
fn test_create_view_task(name: &str) -> CreateViewTask {
    let expr = CreateViewExpr {
        catalog_name: "greptime".to_string(),
        schema_name: "public".to_string(),
        view_name: name.to_string(),
        or_replace: false,
        create_if_not_exists: false,
        logical_plan: vec![1, 2, 3],
    };

    let view_info = RawTableInfo {
        ident: metadata::TableIdent {
            // The id is allocated by the procedure's prepare step.
            table_id: 0,
            version: 0,
        },
        name: expr.view_name.clone(),
        desc: None,
        catalog_name: expr.catalog_name.clone(),
        schema_name: expr.schema_name.clone(),
        meta: RawTableMeta::default(),
        table_type: TableType::View,
    };

    CreateViewTask {
        create_view: expr,
        view_info,
    }
}
// When the view already exists and neither `create_if_not_exists` nor
// `or_replace` is set, prepare must fail with `ViewAlreadyExists`.
#[tokio::test]
async fn test_on_prepare_view_exists_err() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let task = test_create_view_task("foo");
    assert!(!task.create_view.create_if_not_exists);
    // Puts a value to table name key.
    ddl_context
        .table_metadata_manager
        .create_view_metadata(task.view_info.clone(), &task.create_view.logical_plan)
        .await
        .unwrap();

    let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context);
    let err = procedure.on_prepare().await.unwrap_err();
    assert_matches!(err, Error::ViewAlreadyExists { .. });
    assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
}
// With `create_if_not_exists` set and the view already present, prepare
// finishes immediately, returning the existing view id.
#[tokio::test]
async fn test_on_prepare_with_create_if_view_exists() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let mut task = test_create_view_task("foo");
    task.create_view.create_if_not_exists = true;
    task.view_info.ident.table_id = 1024;
    // Puts a value to table name key.
    ddl_context
        .table_metadata_manager
        .create_view_metadata(task.view_info.clone(), &task.create_view.logical_plan)
        .await
        .unwrap();

    let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context);
    let status = procedure.on_prepare().await.unwrap();
    assert_matches!(status, Status::Done { output: Some(..) });
    let table_id = *status.downcast_output_ref::<u32>().unwrap();
    // The pre-existing view's id is returned unchanged.
    assert_eq!(table_id, 1024);
}
// When no view with the name exists, prepare allocates a fresh view id and
// keeps executing (persisting state before the metadata step).
#[tokio::test]
async fn test_on_prepare_without_create_if_table_exists() {
    let node_manager = Arc::new(MockDatanodeManager::new(()));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let mut task = test_create_view_task("foo");
    task.create_view.create_if_not_exists = true;
    let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context);
    let status = procedure.on_prepare().await.unwrap();
    assert_matches!(status, Status::Executing { persist: true });
    // 1024 is presumably the test allocator's first id — TODO confirm against
    // the mock table-metadata allocator's configuration.
    assert_eq!(procedure.view_id(), 1024);
}
// Happy path: prepare then execute, which creates the view metadata and
// finishes with the allocated view id as output.
#[tokio::test]
async fn test_on_create_metadata() {
    common_telemetry::init_default_ut_logging();
    let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    let ddl_context = new_ddl_context(node_manager);
    let cluster_id = 1;
    let task = test_create_view_task("foo");
    assert!(!task.create_view.create_if_not_exists);
    let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context);
    procedure.on_prepare().await.unwrap();
    let ctx = ProcedureContext {
        procedure_id: ProcedureId::random(),
        provider: Arc::new(MockContextProvider::default()),
    };
    // Triggers procedure to create view metadata
    let status = procedure.execute(&ctx).await.unwrap();
    let view_id = status.downcast_output_ref::<u32>().unwrap();
    assert_eq!(*view_id, 1024);
}
// Creates a view, then re-creates it with `or_replace`: the view id must stay
// the same while the stored logical plan is replaced.
#[tokio::test]
async fn test_replace_view_metadata() {
    common_telemetry::init_default_ut_logging();
    let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    let ddl_context = new_ddl_context(node_manager.clone());
    let cluster_id = 1;
    let task = test_create_view_task("foo");
    assert!(!task.create_view.create_if_not_exists);
    let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone());
    procedure.on_prepare().await.unwrap();
    let ctx = ProcedureContext {
        procedure_id: ProcedureId::random(),
        provider: Arc::new(MockContextProvider::default()),
    };
    // Triggers procedure to create view metadata
    let status = procedure.execute(&ctx).await.unwrap();
    let view_id = status.downcast_output_ref::<u32>().unwrap();
    assert_eq!(*view_id, 1024);

    let current_view_info = ddl_context
        .table_metadata_manager
        .view_info_manager()
        .get(*view_id)
        .await
        .unwrap()
        .unwrap();
    // The original fixture plan was persisted.
    assert_eq!(current_view_info.view_info, vec![1, 2, 3]);

    // Create new task to replace the exists one.
    let mut task = test_create_view_task("foo");
    // The view already exists, prepare should fail
    {
        let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone());
        let err = procedure.on_prepare().await.unwrap_err();
        assert_matches!(err, Error::ViewAlreadyExists { .. });
        assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
    }

    // Set `or_replace` to be `true` and try again
    task.create_view.or_replace = true;
    task.create_view.logical_plan = vec![4, 5, 6];
    let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context.clone());
    procedure.on_prepare().await.unwrap();
    let ctx = ProcedureContext {
        procedure_id: ProcedureId::random(),
        provider: Arc::new(MockContextProvider::default()),
    };
    // Triggers procedure to replace view metadata, but the view_id is unchanged.
    let status = procedure.execute(&ctx).await.unwrap();
    let view_id = status.downcast_output_ref::<u32>().unwrap();
    assert_eq!(*view_id, 1024);

    let current_view_info = ddl_context
        .table_metadata_manager
        .view_info_manager()
        .get(*view_id)
        .await
        .unwrap()
        .unwrap();
    // The logical plan was replaced by the new task's plan.
    assert_eq!(current_view_info.view_info, vec![4, 5, 6]);
}
// `CREATE OR REPLACE VIEW` must not clobber an ordinary table of the same
// name: prepare fails with `TableAlreadyExists`.
#[tokio::test]
async fn test_replace_table() {
    common_telemetry::init_default_ut_logging();
    let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    let ddl_context = new_ddl_context(node_manager.clone());
    let cluster_id = 1;
    {
        // Create a `foo` table.
        let task = test_create_table_task("foo");
        let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context.clone());
        procedure.on_prepare().await.unwrap();
        let ctx = ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(MockContextProvider::default()),
        };
        // Two steps: create-on-datanodes then create-metadata — TODO confirm
        // against CreateTableProcedure's state machine.
        procedure.execute(&ctx).await.unwrap();
        procedure.execute(&ctx).await.unwrap();
    }

    // Try to replace a view named `foo` too.
    let mut task = test_create_view_task("foo");
    task.create_view.or_replace = true;
    let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone());
    let err = procedure.on_prepare().await.unwrap_err();
    assert_matches!(err, Error::TableAlreadyExists { .. });
    assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
}

View File

@@ -29,6 +29,7 @@ use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::create_view::CreateViewProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
@@ -45,12 +46,12 @@ use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::rpc::ddl::DdlTask::{
AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable,
DropDatabase, DropFlow, DropLogicalTables, DropTable, TruncateTable,
CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, TruncateTable,
};
use crate::rpc::ddl::{
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask,
DropDatabaseTask, DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest,
SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -122,6 +123,7 @@ impl DdlManager {
let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!(
CreateTableProcedure,
CreateLogicalTablesProcedure,
CreateViewProcedure,
CreateFlowProcedure,
AlterTableProcedure,
AlterLogicalTablesProcedure,
@@ -142,8 +144,8 @@ impl DdlManager {
Ok(())
}
#[tracing::instrument(skip_all)]
/// Submits and executes an alter table task.
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
&self,
cluster_id: ClusterId,
@@ -159,8 +161,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a create table task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_table_task(
&self,
cluster_id: ClusterId,
@@ -175,8 +177,24 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a [CreateViewTask].
///
/// Runs a `CreateViewProcedure` under a random procedure id and returns the
/// procedure id together with the procedure's output (if any); the output is
/// expected to carry the created view's table id — TODO confirm against
/// `CreateViewProcedure`.
#[tracing::instrument(skip_all)]
pub async fn submit_create_view_task(
    &self,
    cluster_id: ClusterId,
    create_view_task: CreateViewTask,
) -> Result<(ProcedureId, Option<Output>)> {
    let context = self.create_context();
    let procedure = CreateViewProcedure::new(cluster_id, create_view_task, context);
    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
    self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a create multiple logical table tasks.
#[tracing::instrument(skip_all)]
pub async fn submit_create_logical_table_tasks(
&self,
cluster_id: ClusterId,
@@ -197,8 +215,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes alter multiple table tasks.
#[tracing::instrument(skip_all)]
pub async fn submit_alter_logical_table_tasks(
&self,
cluster_id: ClusterId,
@@ -219,8 +237,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
#[tracing::instrument(skip_all)]
pub async fn submit_drop_table_task(
&self,
cluster_id: ClusterId,
@@ -235,8 +253,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a create database task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_database(
&self,
_cluster_id: ClusterId,
@@ -255,8 +273,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
#[tracing::instrument(skip_all)]
pub async fn submit_drop_database(
&self,
_cluster_id: ClusterId,
@@ -273,8 +291,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a create flow task.
#[tracing::instrument(skip_all)]
pub async fn submit_create_flow_task(
&self,
cluster_id: ClusterId,
@@ -302,8 +320,8 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
#[tracing::instrument(skip_all)]
pub async fn submit_truncate_table_task(
&self,
cluster_id: ClusterId,
@@ -649,6 +667,34 @@ async fn handle_alter_logical_table_tasks(
})
}
/// Handles a [CreateViewTask] and returns the DDL response on success.
///
/// Submits the task to the [DdlManager] and extracts the allocated view id
/// (`u32`) from the procedure output to build the response.
async fn handle_create_view_task(
    ddl_manager: &DdlManager,
    cluster_id: ClusterId,
    create_view_task: CreateViewTask,
) -> Result<SubmitDdlTaskResponse> {
    let (id, output) = ddl_manager
        .submit_create_view_task(cluster_id, create_view_task)
        .await?;

    let procedure_id = id.to_string();

    // The procedure must yield an output carrying the new view id.
    let output = output.context(ProcedureOutputSnafu {
        procedure_id: &procedure_id,
        err_msg: "empty output",
    })?;
    let view_id = output
        .downcast_ref::<u32>()
        .copied()
        .context(ProcedureOutputSnafu {
            procedure_id: &procedure_id,
            err_msg: "downcast to `u32`",
        })?;

    info!("View: {view_id} is created via procedure_id {id:?}");

    let mut response = SubmitDdlTaskResponse {
        key: procedure_id.into(),
        ..Default::default()
    };
    response.table_id = Some(view_id);
    Ok(response)
}
/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
#[async_trait::async_trait]
impl ProcedureExecutor for DdlManager {
@@ -704,6 +750,12 @@ impl ProcedureExecutor for DdlManager {
DropFlow(drop_flow_task) => {
handle_drop_flow_task(self, cluster_id, drop_flow_task).await
}
CreateView(create_view_task) => {
handle_create_view_task(self, cluster_id, create_view_task).await
}
DropView(_create_view_task) => {
todo!("implemented in the following PR");
}
}
}
.trace(span)

View File

@@ -313,6 +313,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("View already exists, view: {}", view_name))]
ViewAlreadyExists {
view_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Flow already exists: {}", flow_name))]
FlowAlreadyExists {
flow_name: String,
@@ -350,6 +357,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("View not found: '{}'", view_name))]
ViewNotFound {
view_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Flow not found: '{}'", flow_name))]
FlowNotFound {
flow_name: String,
@@ -378,6 +392,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid view info, err: {}", err_msg))]
InvalidViewInfo {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to get kv cache, err: {}", err_msg))]
GetKvCache { err_msg: String },
@@ -638,6 +659,7 @@ impl ErrorExt for Error {
| RouteInfoCorrupted { .. }
| InvalidProtoMsg { .. }
| InvalidTableMetadata { .. }
| InvalidViewInfo { .. }
| MoveRegion { .. }
| Unexpected { .. }
| TableInfoNotFound { .. }
@@ -688,8 +710,8 @@ impl ErrorExt for Error {
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
SubmitProcedure { source, .. }
| QueryProcedure { source, .. }

View File

@@ -48,6 +48,9 @@
//! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}`
//! - Mapping source table's {table_id} to {flownode_id}
//! - Used in `Flownode` booting.
//! 10. View info key: `__view_info/{view_id}`
//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan.
//! - This key is mainly used in constructing the view in Datanode and Frontend.
//!
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
@@ -89,6 +92,7 @@ pub mod table_name;
// TODO(weny): removes it.
#[allow(deprecated)]
pub mod table_region;
pub mod view_info;
// TODO(weny): removes it.
#[allow(deprecated)]
pub mod table_route;
@@ -117,6 +121,7 @@ use store_api::storage::RegionNumber;
use table::metadata::{RawTableInfo, TableId};
use table_info::{TableInfoKey, TableInfoManager, TableInfoValue};
use table_name::{TableNameKey, TableNameManager, TableNameValue};
use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue};
use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue};
use self::datanode_table::RegionInfo;
@@ -142,6 +147,7 @@ pub const MAINTENANCE_KEY: &str = "maintenance";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
@@ -166,6 +172,11 @@ lazy_static! {
Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
lazy_static! {
static ref VIEW_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}
lazy_static! {
static ref TABLE_ROUTE_KEY_PATTERN: Regex =
Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap();
@@ -188,7 +199,7 @@ lazy_static! {
static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$"
))
.unwrap();
.unwrap();
}
lazy_static! {
@@ -196,7 +207,7 @@ lazy_static! {
static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!(
"^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
.unwrap();
}
/// The key of metadata.
@@ -247,6 +258,7 @@ pub type TableMetadataManagerRef = Arc<TableMetadataManager>;
pub struct TableMetadataManager {
table_name_manager: TableNameManager,
table_info_manager: TableInfoManager,
view_info_manager: ViewInfoManager,
datanode_table_manager: DatanodeTableManager,
catalog_manager: CatalogManager,
schema_manager: SchemaManager,
@@ -390,6 +402,7 @@ impl TableMetadataManager {
TableMetadataManager {
table_name_manager: TableNameManager::new(kv_backend.clone()),
table_info_manager: TableInfoManager::new(kv_backend.clone()),
view_info_manager: ViewInfoManager::new(kv_backend.clone()),
datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()),
catalog_manager: CatalogManager::new(kv_backend.clone()),
schema_manager: SchemaManager::new(kv_backend.clone()),
@@ -427,6 +440,10 @@ impl TableMetadataManager {
&self.table_info_manager
}
pub fn view_info_manager(&self) -> &ViewInfoManager {
&self.view_info_manager
}
pub fn datanode_table_manager(&self) -> &DatanodeTableManager {
&self.datanode_table_manager
}
@@ -468,6 +485,69 @@ impl TableMetadataManager {
Ok((table_info_value, table_route_value))
}
/// Creates metadata for view and returns an error if different metadata exists.
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
///
/// Writes three keys in a single merged transaction:
/// - the table name key, mapping the view's name to `view_id`;
/// - the table info key (`__table_info/{view_id}`) holding the `RawTableInfo`;
/// - the view info key (`__view_info/{view_id}`) holding the encoded logical plan.
pub async fn create_view_metadata(
    &self,
    view_info: RawTableInfo,
    raw_logical_plan: &Vec<u8>,
) -> Result<()> {
    let view_id = view_info.ident.table_id;

    // Creates view name.
    let view_name = TableNameKey::new(
        &view_info.catalog_name,
        &view_info.schema_name,
        &view_info.name,
    );
    let create_table_name_txn = self
        .table_name_manager()
        .build_create_txn(&view_name, view_id)?;

    // Creates table info.
    let table_info_value = TableInfoValue::new(view_info);
    let (create_table_info_txn, on_create_table_info_failure) = self
        .table_info_manager()
        .build_create_txn(view_id, &table_info_value)?;

    // Creates view info
    let view_info_value = ViewInfoValue::new(raw_logical_plan);
    let (create_view_info_txn, on_create_view_info_failure) = self
        .view_info_manager()
        .build_create_txn(view_id, &view_info_value)?;

    // All three puts succeed or fail together.
    let txn = Txn::merge_all(vec![
        create_table_name_txn,
        create_table_info_txn,
        create_view_info_txn,
    ]);

    let mut r = self.kv_backend.txn(txn).await?;

    // Checks whether metadata was already created.
    // If the txn failed, compare the remote table/view info with the values we
    // attempted to write; `ensure_values!` turns a mismatch into an error, so a
    // retry that finds identical metadata still succeeds.
    if !r.succeeded {
        let mut set = TxnOpGetResponseSet::from(&mut r.responses);
        let remote_table_info = on_create_table_info_failure(&mut set)?
            .context(error::UnexpectedSnafu {
                err_msg: "Reads the empty table info during the create table metadata",
            })?
            .into_inner();

        let remote_view_info = on_create_view_info_failure(&mut set)?
            .context(error::UnexpectedSnafu {
                err_msg: "Reads the empty view info during the create view info",
            })?
            .into_inner();

        let op_name = "the creating view metadata";
        ensure_values!(remote_table_info, table_info_value, op_name);
        ensure_values!(remote_view_info, view_info_value, op_name);
    }

    Ok(())
}
/// Creates metadata for table and returns an error if different metadata exists.
/// The caller MUST ensure it has the exclusive access to `TableNameKey`.
pub async fn create_table_metadata(
@@ -817,6 +897,37 @@ impl TableMetadataManager {
Ok(())
}
/// Updates view info and returns an error if different metadata exists.
///
/// Performs a compare-and-put against the currently stored value: the write
/// succeeds only while the remote bytes still equal `current_view_info_value`,
/// which guards against lost updates (the ABA problem).
pub async fn update_view_info(
    &self,
    view_id: TableId,
    current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
    new_view_info: Vec<u8>,
) -> Result<()> {
    // Carries the new logical plan with the version bumped by one.
    let new_view_info_value = current_view_info_value.update(new_view_info);

    // Updates view info.
    let (update_view_info_txn, on_update_view_info_failure) = self
        .view_info_manager()
        .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?;

    let mut r = self.kv_backend.txn(update_view_info_txn).await?;

    // Checks whether metadata was already updated.
    // A failed compare-and-put may mean an identical update already landed;
    // succeed only when the remote value equals the one we tried to write.
    if !r.succeeded {
        let mut set = TxnOpGetResponseSet::from(&mut r.responses);
        let remote_view_info = on_update_view_info_failure(&mut set)?
            .context(error::UnexpectedSnafu {
                err_msg: "Reads the empty view info during the updating view info",
            })?
            .into_inner();

        let op_name = "the updating view info";
        ensure_values!(remote_view_info, new_view_info_value, op_name);
    }

    Ok(())
}
pub fn batch_update_table_info_value_chunk_size(&self) -> usize {
self.kv_backend.max_txn_ops()
}
@@ -1025,6 +1136,7 @@ macro_rules! impl_meta_key_get_txn_op {
impl_meta_key_get_txn_op! {
TableNameKey<'_>,
TableInfoKey,
ViewInfoKey,
TableRouteKey,
DatanodeTableKey
}
@@ -1049,6 +1161,7 @@ macro_rules! impl_optional_meta_value {
impl_table_meta_value! {
TableNameValue,
TableInfoValue,
ViewInfoValue,
DatanodeTableValue,
FlowInfoValue,
FlowNameValue
@@ -1080,7 +1193,7 @@ mod tests {
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus};
@@ -1836,4 +1949,87 @@ mod tests {
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
}
// Exercises create_view_metadata / update_view_info end to end against an
// in-memory KV backend: creation writes both table info and view info,
// updates are compare-and-put (idempotent on retry, rejected on stale input).
#[tokio::test]
async fn test_create_update_view_info() {
    let mem_kv = Arc::new(MemoryKvBackend::default());
    let table_metadata_manager = TableMetadataManager::new(mem_kv);
    let view_info: RawTableInfo = new_test_table_info(Vec::<u32>::new().into_iter()).into();
    let view_id = view_info.ident.table_id;
    let logical_plan: Vec<u8> = vec![1, 2, 3];

    // Create metadata
    table_metadata_manager
        .create_view_metadata(view_info.clone(), &logical_plan)
        .await
        .unwrap();

    {
        // assert view info: the stored plan equals what was written
        let current_view_info = table_metadata_manager
            .view_info_manager()
            .get(view_id)
            .await
            .unwrap()
            .unwrap()
            .into_inner();
        assert_eq!(current_view_info.view_info, logical_plan);
        // assert table info: views share the table-info keyspace
        let current_table_info = table_metadata_manager
            .table_info_manager()
            .get(view_id)
            .await
            .unwrap()
            .unwrap()
            .into_inner();
        assert_eq!(current_table_info.table_info, view_info);
    }

    let new_logical_plan: Vec<u8> = vec![4, 5, 6];
    let current_view_info_value =
        DeserializedValueWithBytes::from_inner(ViewInfoValue::new(&logical_plan));
    // First update with the correct expected value: should be ok.
    table_metadata_manager
        .update_view_info(view_id, &current_view_info_value, new_logical_plan.clone())
        .await
        .unwrap();
    // Retrying the exact same update is idempotent: the remote value already
    // equals the intended new value, so it should also be ok.
    table_metadata_manager
        .update_view_info(view_id, &current_view_info_value, new_logical_plan.clone())
        .await
        .unwrap();
    // updated view_info should equal the `new_logical_plan`
    let updated_view_info = table_metadata_manager
        .view_info_manager()
        .get(view_id)
        .await
        .unwrap()
        .unwrap()
        .into_inner();
    assert_eq!(updated_view_info.view_info, new_logical_plan);

    let wrong_view_info = logical_plan.clone();
    let wrong_view_info_value =
        DeserializedValueWithBytes::from_inner(current_view_info_value.update(wrong_view_info));
    // if the current_view_info_value is wrong (stale), it should return an error.
    // The ABA problem.
    assert!(table_metadata_manager
        .update_view_info(view_id, &wrong_view_info_value, new_logical_plan.clone())
        .await
        .is_err());
    // The view_info is not changed by the failed update.
    let current_view_info = table_metadata_manager
        .view_info_manager()
        .get(view_id)
        .await
        .unwrap()
        .unwrap()
        .into_inner();
    assert_eq!(current_view_info.view_info, new_logical_plan);
}
}

View File

@@ -56,7 +56,7 @@ pub enum TableRouteValue {
Logical(LogicalTableRouteValue),
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
pub struct PhysicalTableRouteValue {
pub region_routes: Vec<RegionRoute>,
version: u64,

View File

@@ -0,0 +1,265 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use super::VIEW_INFO_KEY_PATTERN;
use crate::error::{InvalidViewInfoSnafu, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetaKey, TableMetaValue, VIEW_INFO_KEY_PREFIX};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
/// The encoded bytes of a VIEW's logical plan.
type RawViewLogicalPlan = Vec<u8>;

/// The key stores the metadata of the view.
///
/// The layout: `__view_info/{view_id}`.
#[derive(Debug, PartialEq)]
pub struct ViewInfoKey {
    // The id of the view; shares the `TableId` namespace.
    view_id: TableId,
}
impl ViewInfoKey {
    /// Returns a new [ViewInfoKey] for the given view id.
    pub fn new(view_id: TableId) -> Self {
        Self { view_id }
    }
}
impl Display for ViewInfoKey {
    /// Renders the key in its storage layout: `__view_info/{view_id}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id)
    }
}
impl<'a> MetaKey<'a, ViewInfoKey> for ViewInfoKey {
    // Serializes via the `Display` layout `__view_info/{view_id}`.
    fn to_bytes(&self) -> Vec<u8> {
        self.to_string().into_bytes()
    }

    // Parses a raw key back into a typed key; rejects non-UTF8 bytes and
    // anything not matching `VIEW_INFO_KEY_PATTERN`.
    fn from_bytes(bytes: &[u8]) -> Result<ViewInfoKey> {
        let key = std::str::from_utf8(bytes).map_err(|e| {
            InvalidViewInfoSnafu {
                err_msg: format!(
                    "ViewInfoKey '{}' is not a valid UTF8 string: {e}",
                    String::from_utf8_lossy(bytes)
                ),
            }
            .build()
        })?;
        let captures = VIEW_INFO_KEY_PATTERN
            .captures(key)
            .context(InvalidViewInfoSnafu {
                err_msg: format!("Invalid ViewInfoKey '{key}'"),
            })?;
        // Safety: pass the regex check above
        let view_id = captures[1].parse::<TableId>().unwrap();
        Ok(ViewInfoKey { view_id })
    }
}
/// The VIEW info value that keeps the metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ViewInfoValue {
    // The encoded logical plan of the view.
    pub view_info: RawViewLogicalPlan,
    // Monotonic version, bumped by `update`; part of the serialized bytes
    // compared by the compare-and-put update transaction.
    version: u64,
}
impl ViewInfoValue {
    /// Creates a value holding a copy of the encoded plan, starting at version 0.
    pub fn new(view_info: &RawViewLogicalPlan) -> Self {
        Self {
            version: 0,
            view_info: view_info.to_vec(),
        }
    }

    /// Produces the successor value: carries `new_view_info` and bumps the version.
    pub(crate) fn update(&self, new_view_info: RawViewLogicalPlan) -> Self {
        Self {
            version: self.version + 1,
            view_info: new_view_info,
        }
    }
}
/// The manager of [ViewInfoValue] entries in the metadata KV backend.
pub struct ViewInfoManager {
    kv_backend: KvBackendRef,
}
impl ViewInfoManager {
    /// Creates a manager backed by the given KV store.
    pub fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }

    /// Builds a create view info transaction; it expects that `__view_info/{view_id}` is not occupied.
    ///
    /// Also returns a decoder that extracts the remote [ViewInfoValue] from the
    /// transaction responses when the put-if-not-exists fails.
    pub(crate) fn build_create_txn(
        &self,
        view_id: TableId,
        view_info_value: &ViewInfoValue,
    ) -> Result<(
        Txn,
        impl FnOnce(
            &mut TxnOpGetResponseSet,
        ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>,
    )> {
        let key = ViewInfoKey::new(view_id);
        let raw_key = key.to_bytes();

        let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?);

        Ok((
            txn,
            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
        ))
    }

    /// Builds an update view info transaction; it expects the remote value to equal
    /// `current_view_info_value` (compare-and-put on the raw bytes).
    /// The returned decoder retrieves the latest value if the comparison failed.
    pub(crate) fn build_update_txn(
        &self,
        view_id: TableId,
        current_view_info_value: &DeserializedValueWithBytes<ViewInfoValue>,
        new_view_info_value: &ViewInfoValue,
    ) -> Result<(
        Txn,
        impl FnOnce(
            &mut TxnOpGetResponseSet,
        ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>>,
    )> {
        let key = ViewInfoKey::new(view_id);
        let raw_key = key.to_bytes();
        // Compare against the exact bytes previously read, not a re-serialization.
        let raw_value = current_view_info_value.get_raw_bytes();
        let new_raw_value: Vec<u8> = new_view_info_value.try_as_raw_value()?;

        let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value);

        Ok((
            txn,
            TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)),
        ))
    }

    /// Returns the [ViewInfoValue] for the view id, if any.
    pub async fn get(
        &self,
        view_id: TableId,
    ) -> Result<Option<DeserializedValueWithBytes<ViewInfoValue>>> {
        let key = ViewInfoKey::new(view_id);
        let raw_key = key.to_bytes();
        self.kv_backend
            .get(&raw_key)
            .await?
            .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
            .transpose()
    }

    /// Returns the [ViewInfoValue]s for a slice of view ids in one batch get.
    /// Ids with no stored value are absent from the returned map.
    pub async fn batch_get(&self, view_ids: &[TableId]) -> Result<HashMap<TableId, ViewInfoValue>> {
        // Maps raw key bytes back to the originating view id.
        let lookup_table = view_ids
            .iter()
            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
            .collect::<HashMap<_, _>>();

        let resp = self
            .kv_backend
            .batch_get(BatchGetRequest {
                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
            })
            .await?;

        let values = resp
            .kvs
            .iter()
            .map(|kv| {
                Ok((
                    // Safety: must exist.
                    **lookup_table.get(kv.key()).unwrap(),
                    ViewInfoValue::try_from_raw_value(&kv.value)?,
                ))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        Ok(values)
    }

    /// Returns batch of `DeserializedValueWithBytes<ViewInfoValue>`, keeping the
    /// raw bytes alongside the decoded value (needed for later compare-and-put).
    pub async fn batch_get_raw(
        &self,
        view_ids: &[TableId],
    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<ViewInfoValue>>> {
        // Maps raw key bytes back to the originating view id.
        let lookup_table = view_ids
            .iter()
            .map(|id| (ViewInfoKey::new(*id).to_bytes(), id))
            .collect::<HashMap<_, _>>();

        let resp = self
            .kv_backend
            .batch_get(BatchGetRequest {
                keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
            })
            .await?;

        let values = resp
            .kvs
            .iter()
            .map(|kv| {
                Ok((
                    // Safety: must exist.
                    **lookup_table.get(kv.key()).unwrap(),
                    DeserializedValueWithBytes::from_inner_slice(&kv.value)?,
                ))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        Ok(values)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_serialization() {
        // The key serializes into its storage layout `__view_info/{view_id}`.
        let key = ViewInfoKey::new(42);
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__view_info/42");
    }

    #[test]
    fn test_key_deserialization() {
        // Parsing the raw bytes round-trips back to the typed key.
        let expected = ViewInfoKey::new(42);
        let key = ViewInfoKey::from_bytes(b"__view_info/42").unwrap();
        assert_eq!(key, expected);
    }

    #[test]
    fn test_value_serialization() {
        // Value serialization must round-trip, version included.
        let value = ViewInfoValue {
            view_info: vec![1, 2, 3],
            version: 1,
        };
        let serialized = value.try_as_raw_value().unwrap();
        let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap();
        assert_eq!(value, deserialized);
    }
}

View File

@@ -39,6 +39,12 @@ lazy_static! {
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_VIEW: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_view",
"meta procedure create view",
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_FLOW: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_flow",
"meta procedure create flow",

View File

@@ -20,14 +20,16 @@ use api::v1::meta::{
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId,
CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr,
DropTableExpr, QueryContext as PbQueryContext, TruncateTableExpr,
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, QueryContext as PbQueryContext,
TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
@@ -43,6 +45,7 @@ use crate::error::{self, Result};
use crate::key::FlowId;
use crate::table_name::TableName;
/// DDL tasks
#[derive(Debug, Clone)]
pub enum DdlTask {
CreateTable(CreateTableTask),
@@ -56,6 +59,8 @@ pub enum DdlTask {
DropDatabase(DropDatabaseTask),
CreateFlow(CreateFlowTask),
DropFlow(DropFlowTask),
CreateView(CreateViewTask),
DropView(DropViewTask),
}
impl DdlTask {
@@ -148,6 +153,14 @@ impl DdlTask {
table_id,
})
}
/// Creates a [DdlTask::CreateView] task from the `CREATE VIEW` expression
/// and the view's resolved table info.
pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self {
    DdlTask::CreateView(CreateViewTask {
        create_view,
        view_info,
    })
}
}
impl TryFrom<Task> for DdlTask {
@@ -197,6 +210,8 @@ impl TryFrom<Task> for DdlTask {
}
Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)),
Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)),
}
}
}
@@ -213,7 +228,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
let task = match request.task {
DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?),
DdlTask::DropTable(task) => Task::DropTableTask(task.try_into()?),
DdlTask::DropTable(task) => Task::DropTableTask(task.into()),
DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?),
DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?),
DdlTask::CreateLogicalTables(tasks) => {
@@ -227,8 +242,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropLogicalTables(tasks) => {
let tasks = tasks
.into_iter()
.map(|task| task.try_into())
.collect::<Result<Vec<_>>>()?;
.map(|task| task.into())
.collect::<Vec<_>>();
Task::DropTableTasks(PbDropTableTasks { tasks })
}
@@ -244,6 +259,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?),
DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()),
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()),
DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?),
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
};
Ok(Self {
@@ -295,6 +312,138 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
}
}
/// A `CREATE VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct CreateViewTask {
    // The gRPC expression; carries catalog/schema/view name and the encoded logical plan.
    pub create_view: CreateViewExpr,
    // The view's table info (views share the table metadata representation).
    pub view_info: RawTableInfo,
}
impl CreateViewTask {
    /// Returns the [TableReference] of the view.
    pub fn table_ref(&self) -> TableReference {
        TableReference {
            catalog: &self.create_view.catalog_name,
            schema: &self.create_view.schema_name,
            table: &self.create_view.view_name,
        }
    }

    /// Returns the encoded logical plan carried by the `CREATE VIEW` expression.
    pub fn raw_logical_plan(&self) -> &Vec<u8> {
        &self.create_view.logical_plan
    }
}
impl TryFrom<PbCreateViewTask> for CreateViewTask {
    type Error = error::Error;

    // Decodes the protobuf task; `view_info` is JSON-encoded `RawTableInfo`
    // bytes, and `create_view` is a required field.
    fn try_from(pb: PbCreateViewTask) -> Result<Self> {
        let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?;

        Ok(CreateViewTask {
            create_view: pb.create_view.context(error::InvalidProtoMsgSnafu {
                err_msg: "expected create view",
            })?,
            view_info,
        })
    }
}
impl TryFrom<CreateViewTask> for PbCreateViewTask {
    type Error = error::Error;

    // Encodes the task into its protobuf form; `view_info` becomes
    // JSON-encoded bytes (the inverse of `TryFrom<PbCreateViewTask>`).
    fn try_from(task: CreateViewTask) -> Result<PbCreateViewTask> {
        Ok(PbCreateViewTask {
            create_view: Some(task.create_view),
            view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?,
        })
    }
}
impl Serialize for CreateViewTask {
    /// Serializes the task as a single string: the protobuf-encoded
    /// `PbCreateViewTask` (with `view_info` as JSON bytes) wrapped in
    /// unpadded-standard base64. Must stay in sync with `Deserialize` below.
    fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let view_info = serde_json::to_vec(&self.view_info)
            .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

        let pb = PbCreateViewTask {
            create_view: Some(self.create_view.clone()),
            view_info,
        };
        let buf = pb.encode_to_vec();
        let encoded = general_purpose::STANDARD_NO_PAD.encode(buf);
        serializer.serialize_str(&encoded)
    }
}
impl<'de> Deserialize<'de> for CreateViewTask {
    /// Deserializes the base64 string produced by `Serialize`: decode base64,
    /// decode the protobuf `PbCreateViewTask`, then convert to [CreateViewTask].
    fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        let buf = general_purpose::STANDARD_NO_PAD
            .decode(encoded)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;
        let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        let expr = CreateViewTask::try_from(expr)
            .map_err(|err| serde::de::Error::custom(err.to_string()))?;

        Ok(expr)
    }
}
/// A `DROP VIEW` task.
#[derive(Debug, PartialEq, Clone)]
pub struct DropViewTask {
    // Catalog the view belongs to.
    pub catalog: String,
    // Schema the view belongs to.
    pub schema: String,
    // Name of the view to drop.
    pub view: String,
    // Id of the view to drop.
    pub view_id: TableId,
    // When true, a missing view is tolerated (presumably `DROP VIEW IF EXISTS`
    // semantics — confirm against the executor).
    pub drop_if_exists: bool,
}
impl TryFrom<PbDropViewTask> for DropViewTask {
    type Error = error::Error;

    // Decodes the protobuf task; both `drop_view` and its `view_id` are
    // required fields.
    fn try_from(pb: PbDropViewTask) -> Result<Self> {
        let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop view",
        })?;

        Ok(DropViewTask {
            catalog: expr.catalog_name,
            schema: expr.schema_name,
            view: expr.view_name,
            view_id: expr
                .view_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected view_id",
                })?
                .id,
            drop_if_exists: expr.drop_if_exists,
        })
    }
}
impl From<DropViewTask> for PbDropViewTask {
    /// Encodes the task into its protobuf form (the inverse of
    /// `TryFrom<PbDropViewTask>`); the expression is always present.
    fn from(task: DropViewTask) -> Self {
        let drop_view = DropViewExpr {
            catalog_name: task.catalog,
            schema_name: task.schema,
            view_name: task.view,
            view_id: Some(api::v1::TableId { id: task.view_id }),
            drop_if_exists: task.drop_if_exists,
        };
        PbDropViewTask {
            drop_view: Some(drop_view),
        }
    }
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
pub catalog: String,
@@ -346,11 +495,9 @@ impl TryFrom<PbDropTableTask> for DropTableTask {
}
}
impl TryFrom<DropTableTask> for PbDropTableTask {
type Error = error::Error;
fn try_from(task: DropTableTask) -> Result<Self> {
Ok(PbDropTableTask {
impl From<DropTableTask> for PbDropTableTask {
fn from(task: DropTableTask) -> Self {
PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
@@ -358,7 +505,7 @@ impl TryFrom<DropTableTask> for PbDropTableTask {
table_id: Some(api::v1::TableId { id: task.table_id }),
drop_if_exists: task.drop_if_exists,
}),
})
}
}
}

View File

@@ -20,7 +20,7 @@ use crate::schema::{ColumnSchema, Schema, SchemaBuilder};
/// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema).
///
/// This struct only contains necessary data to recover the Schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawSchema {
/// Schema of columns.
pub column_schemas: Vec<ColumnSchema>,

View File

@@ -511,7 +511,6 @@ pub fn check_permission(
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::DropFlow(_) => {}
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
@@ -522,6 +521,9 @@ pub fn check_permission(
// TODO: should also validate source table name here?
validate_param(&stmt.sink_table_name, query_ctx)?;
}
Statement::CreateView(stmt) => {
validate_param(&stmt.name, query_ctx)?;
}
Statement::Alter(stmt) => {
validate_param(stmt.table_name(), query_ctx)?;
}

View File

@@ -159,6 +159,17 @@ impl GrpcQueryHandler for Instance {
.drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
.await?
}
DdlExpr::CreateView(expr) => {
let _ = self
.statement_executor
.create_view_by_expr(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::DropView(_) => {
todo!("implemented in the following PR")
}
}
}
};
@@ -207,6 +218,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
expr.catalog_name = catalog.to_string();
}
}
Expr::CreateView(expr) => {
check_and_fill!(expr);
}
Expr::DropView(expr) => {
check_and_fill!(expr);
}
}
}

View File

@@ -53,6 +53,7 @@ snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true

View File

@@ -35,6 +35,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("View already exists: `{name}`"))]
ViewAlreadyExists {
name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
#[snafu(implicit)]
@@ -129,6 +136,12 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Invalid statement to create view"))]
InvalidViewStmt {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert column default constraint, column: {}", column_name))]
ConvertColumnDefaultConstraint {
column_name: String,
@@ -637,6 +650,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid view name: {name}"))]
InvalidViewName {
name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Do not support {} in multiple catalogs", ddl_name))]
DdlWithMultiCatalogs {
ddl_name: String,
@@ -686,6 +706,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert between logical plan and substrait plan"))]
SubstraitCodec {
#[snafu(implicit)]
location: Location,
source: substrait::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -714,10 +741,14 @@ impl ErrorExt for Error {
| Error::SchemaIncompatible { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::ConvertIdentifier { .. }
| Error::InvalidExpr { .. } => StatusCode::InvalidArguments,
| Error::InvalidViewName { .. }
| Error::InvalidExpr { .. }
| Error::InvalidViewStmt { .. }
| Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments,
Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => {
StatusCode::TableAlreadyExists
}
Error::NotSupported { .. } => StatusCode::Unsupported,
@@ -744,6 +775,7 @@ impl ErrorExt for Error {
Error::RequestInserts { source, .. } => source.status_code(),
Error::RequestRegion { source, .. } => source.status_code(),
Error::RequestDeletes { source, .. } => source.status_code(),
Error::SubstraitCodec { source, .. } => source.status_code(),
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
source.status_code()

View File

@@ -18,8 +18,8 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable,
SemanticType, TableName,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
DropColumns, RenameTable, SemanticType, TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
@@ -36,7 +36,9 @@ use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnDef, ColumnOption, TableConstraint};
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX};
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateView, TIME_INDEX,
};
use sql::statements::{
column_def_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
};
@@ -513,6 +515,28 @@ pub(crate) fn to_alter_expr(
})
}
/// Converts a SQL [`CreateView`] statement into a gRPC [`CreateViewExpr`].
///
/// `logical_plan` is the encoded logical plan of the view's defining query
/// (substrait bytes produced by the caller). The view name in `stmt` is
/// resolved to a fully qualified `(catalog, schema, view)` triple using the
/// defaults from `query_ctx`.
pub fn to_create_view_expr(
stmt: CreateView,
logical_plan: Vec<u8>,
query_ctx: QueryContextRef,
) -> Result<CreateViewExpr> {
// Resolve the (possibly partially qualified) view name against the
// session's current catalog/schema.
let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let expr = CreateViewExpr {
catalog_name,
schema_name,
view_name,
logical_plan,
create_if_not_exists: stmt.if_not_exists,
or_replace: stmt.or_replace,
};
Ok(expr)
}
pub fn to_create_flow_task_expr(
create_flow: CreateFlow,
query_ctx: &QueryContextRef,
@@ -767,4 +791,54 @@ mod tests {
);
assert!(change_column_type.target_type_extension.is_none());
}
#[test]
// A plain `CREATE VIEW` (no modifiers) resolves to the default
// catalog/schema and carries the encoded plan through unchanged.
fn test_to_create_view_expr() {
let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateView(stmt) = stmt else {
unreachable!()
};
// The plan bytes are opaque to the conversion; any payload will do.
let logical_plan = vec![1, 2, 3];
let expr = to_create_view_expr(stmt, logical_plan.clone(), QueryContext::arc()).unwrap();
assert_eq!("greptime", expr.catalog_name);
assert_eq!("public", expr.schema_name);
assert_eq!("test", expr.view_name);
assert!(!expr.create_if_not_exists);
assert!(!expr.or_replace);
assert_eq!(logical_plan, expr.logical_plan);
}
#[test]
// `OR REPLACE` / `IF NOT EXISTS` flags and an explicit schema qualifier
// must all survive the conversion to the gRPC expression.
fn test_to_create_view_expr_complex() {
let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateView(stmt) = stmt else {
unreachable!()
};
let logical_plan = vec![1, 2, 3];
let expr = to_create_view_expr(stmt, logical_plan.clone(), QueryContext::arc()).unwrap();
assert_eq!("greptime", expr.catalog_name);
assert_eq!("test", expr.schema_name);
assert_eq!("test_view", expr.view_name);
assert!(expr.create_if_not_exists);
assert!(expr.or_replace);
assert_eq!(logical_plan, expr.logical_plan);
}
}

View File

@@ -41,4 +41,9 @@ lazy_static! {
"table operator delete rows"
)
.unwrap();
pub static ref DIST_CREATE_VIEW: Histogram = register_histogram!(
"greptime_ddl_operator_create_view",
"DDL operator create view"
)
.unwrap();
}

View File

@@ -179,6 +179,10 @@ impl StatementExecutor {
)
.await
}
Statement::CreateView(stmt) => {
let _ = self.create_view(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
@@ -274,6 +278,13 @@ impl StatementExecutor {
.context(PlanStatementSnafu)
}
pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.query_engine
.planner()
.optimize(plan)
.context(PlanStatementSnafu)
}
#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr};
use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr};
use catalog::CatalogManagerRef;
use chrono::Utc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -34,7 +34,7 @@ use common_meta::rpc::ddl::{
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::{info, tracing};
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
@@ -42,6 +42,7 @@ use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
@@ -49,11 +50,13 @@ use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -65,8 +68,9 @@ use crate::error::{
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, FlowNotFoundSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaNotFoundSnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_factory;
use crate::statement::show::create_partitions_stmt;
@@ -374,6 +378,165 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
#[tracing::instrument(skip_all)]
/// Creates a view from a parsed `CREATE VIEW` statement.
///
/// Plans the view's defining statement (a SQL query or a TQL expression),
/// runs the optimizer over the plan, encodes the optimized plan with
/// substrait, and delegates the actual DDL to [`Self::create_view_by_expr`].
///
/// Returns an error if the defining statement is anything other than a
/// query or TQL (e.g. a DML statement).
pub async fn create_view(
&self,
create_view: CreateView,
ctx: QueryContextRef,
) -> Result<TableRef> {
// Plan the view definition. Only `Query` and `Tql` are valid here;
// anything else is rejected before we touch the planner.
let logical_plan = match &*create_view.query {
Statement::Query(query) => {
self.plan(
QueryStatement::Sql(Statement::Query(query.clone())),
ctx.clone(),
)
.await?
}
Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?,
_ => {
return InvalidViewStmtSnafu {}.fail();
}
};
// Optimize before encoding so the stored plan is the one that will
// actually be executed when the view is queried.
let optimized_plan = self.optimize_logical_plan(logical_plan)?;
// Encode the optimized plan with substrait so it can be persisted
// in the view's metadata.
let encoded_plan = DFLogicalSubstraitConvertor
.encode(&optimized_plan.unwrap_df_plan())
.context(SubstraitCodecSnafu)?;
let expr =
expr_factory::to_create_view_expr(create_view, encoded_plan.to_vec(), ctx.clone())?;
self.create_view_by_expr(expr, ctx).await
}
/// Creates a view from a gRPC [`CreateViewExpr`] by submitting a DDL task
/// to the procedure executor.
///
/// Behavior when a table/view with the same name already exists:
/// - existing view + `IF NOT EXISTS` (no `OR REPLACE`): return the
///   existing view unchanged;
/// - existing view, neither flag: `ViewAlreadyExists` error;
/// - existing view + `OR REPLACE`: proceed and replace it;
/// - existing *base table* (any flags): `TableAlreadyExists` error — a
///   view must never silently replace a normal table.
///
/// On success the local table-name and table-id caches are invalidated and
/// a `TableRef` for the new view is returned.
pub async fn create_view_by_expr(
&self,
expr: CreateViewExpr,
ctx: QueryContextRef,
) -> Result<TableRef> {
let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer();
// The target schema must exist before we attempt the DDL.
let schema_exists = self
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name))
.await
.context(TableMetadataManagerSnafu)?;
ensure!(
schema_exists,
SchemaNotFoundSnafu {
schema_info: &expr.schema_name,
}
);
// Check whether a view or table with this name already exists and
// resolve the conflict according to the flags (see doc comment).
if let Some(table) = self
.catalog_manager
.table(&expr.catalog_name, &expr.schema_name, &expr.view_name)
.await
.context(CatalogSnafu)?
{
let table_type = table.table_info().table_type;
match (table_type, expr.create_if_not_exists, expr.or_replace) {
(TableType::View, true, false) => {
return Ok(table);
}
(TableType::View, false, false) => {
return ViewAlreadyExistsSnafu {
name: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.view_name,
),
}
.fail();
}
(TableType::View, _, true) => {
// `OR REPLACE`: fall through and replace the existing view.
}
_ => {
// The existing object is a base table (or other non-view):
// refuse regardless of `IF NOT EXISTS` / `OR REPLACE`.
return TableAlreadyExistsSnafu {
table: format_full_table_name(
&expr.catalog_name,
&expr.schema_name,
&expr.view_name,
),
}
.fail();
}
}
}
ensure!(
NAME_PATTERN_REG.is_match(&expr.view_name),
InvalidViewNameSnafu {
name: expr.view_name.clone(),
}
);
let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name);
let mut view_info = RawTableInfo {
ident: metadata::TableIdent {
// The view id of distributed table is assigned by Meta, set "0" here as a placeholder.
table_id: 0,
version: 0,
},
name: expr.view_name.clone(),
desc: None,
catalog_name: expr.catalog_name.clone(),
schema_name: expr.schema_name.clone(),
// The meta doesn't make sense for views, so using a default one.
meta: RawTableMeta::default(),
table_type: TableType::View,
};
let request = SubmitDdlTaskRequest {
query_context: ctx,
task: DdlTask::new_create_view(expr, view_info.clone()),
};
let resp = self
.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)?;
debug!(
"Submit creating view '{view_name}' task response: {:?}",
resp
);
// The procedure assigns the real table id; its absence is a bug.
let view_id = resp.table_id.context(error::UnexpectedSnafu {
violated: "expected table_id",
})?;
info!("Successfully created view '{view_name}' with view id {view_id}");
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(
&Context::default(),
&[
CacheIdent::TableId(view_id),
CacheIdent::TableName(view_name.clone()),
],
)
.await
.context(error::InvalidateTableCacheSnafu)?;
// Patch the placeholder id with the real one before building the ref.
view_info.ident.table_id = view_id;
let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?);
let table = DistTable::table(view_info);
Ok(table)
}
#[tracing::instrument(skip_all)]
pub async fn drop_flow(
&self,
@@ -1175,7 +1338,7 @@ mod test {
.unwrap_err()
.to_string(),
"Invalid partition columns when creating table 'my_table', \
reason: partition column must belongs to primary keys or equals to time index",
reason: partition column must belongs to primary keys or equals to time index",
);
}

View File

@@ -20,6 +20,7 @@ use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;
@@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
use crate::statement::StatementExecutor;
impl StatementExecutor {
/// Plan the given [Tql] query and return the [LogicalPlan].
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
let stmt = match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
@@ -41,7 +43,7 @@ impl StatementExecutor {
.lookback
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()),
};
QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)?
QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)?
}
Tql::Explain(explain) => {
let promql = PromQuery {
@@ -58,7 +60,7 @@ impl StatementExecutor {
}
.to_string();
let params = HashMap::from([("name".to_string(), explain_node_name)]);
QueryLanguageParser::parse_promql(&promql, &query_ctx)
QueryLanguageParser::parse_promql(&promql, query_ctx)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
@@ -80,18 +82,23 @@ impl StatementExecutor {
}
.to_string();
let params = HashMap::from([("name".to_string(), analyze_node_name)]);
QueryLanguageParser::parse_promql(&promql, &query_ctx)
QueryLanguageParser::parse_promql(&promql, query_ctx)
.context(ParseQuerySnafu)?
.post_process(params)
.unwrap()
}
};
let plan = self
.query_engine
self.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
.context(PlanStatementSnafu)
}
/// Execute the given [Tql] query and return the result.
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan_tql(tql, &query_ctx).await?;
self.query_engine
.execute(plan, query_ctx)
.await

View File

@@ -87,6 +87,13 @@ impl LogicalPlan {
.context(DataFusionSnafu)
.map(LogicalPlan::DfPlan)
}
/// Unwraps the logical plan into the underlying DataFusion logical plan.
///
/// Infallible: `DfPlan` is currently the only variant of `LogicalPlan`,
/// so the match is exhaustive without a fallback arm.
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
match self {
LogicalPlan::DfPlan(plan) => plan,
}
}
}
impl From<DfLogicalPlan> for LogicalPlan {

View File

@@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
pub trait LogicalPlanner: Send + Sync {
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
fn as_any(&self) -> &dyn Any;
}
@@ -145,6 +147,14 @@ impl DfLogicalPlanner {
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
#[tracing::instrument(skip_all)]
/// Runs the engine's full logical-optimization phase on `plan`, converting
/// to and from the wrapped DataFusion representation around the call.
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.engine_state
.optimize_logical_plan(plan.unwrap_df_plan())
.context(DataFusionSnafu)
.map(Into::into)
}
}
#[async_trait]
@@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner {
}
}
// Trait entry point: delegates to the inherent `optimize_logical_plan`.
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.optimize_logical_plan(plan)
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -157,6 +157,11 @@ impl QueryEngineState {
})
}
/// Runs the full logical-plan optimize phase of the current session state
/// on the given DataFusion plan, returning the optimized plan.
pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
self.session_state().optimize(&plan)
}
/// Register an udf function.
/// Will override if the function with same name is already registered.
pub fn register_function(&self, func: FunctionRef) {

View File

@@ -33,8 +33,8 @@ use crate::error::{
};
use crate::parser::{ParserContext, FLOW};
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions,
TIME_INDEX,
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView,
Partitions, TIME_INDEX,
};
use crate::statements::statement::Statement;
use crate::statements::{get_data_type_by_alias_name, OptionMap};
@@ -70,6 +70,7 @@ impl<'a> ParserContext<'a> {
.context(SyntaxSnafu)?;
match self.parser.next_token().token {
Token::Word(w) => match w.keyword {
Keyword::VIEW => self.parse_create_view(true),
Keyword::NoKeyword => {
let uppercase = w.value.to_uppercase();
match uppercase.as_str() {
@@ -83,6 +84,11 @@ impl<'a> ParserContext<'a> {
}
}
Keyword::VIEW => {
let _ = self.parser.next_token();
self.parse_create_view(false)
}
Keyword::NoKeyword => {
let _ = self.parser.next_token();
let uppercase = w.value.to_uppercase();
@@ -91,13 +97,31 @@ impl<'a> ParserContext<'a> {
_ => self.unsupported(w.to_string()),
}
}
_ => self.unsupported(w.to_string()),
},
unexpected => self.unsupported(unexpected.to_string()),
}
}
/// Parses a `CREATE [OR REPLACE] VIEW [IF NOT EXISTS] <name> AS <query>`
/// statement. The `VIEW` keyword has already been consumed by the caller;
/// `or_replace` tells whether the caller saw `OR REPLACE`.
fn parse_create_view(&mut self, or_replace: bool) -> Result<Statement> {
let if_not_exists = self.parse_if_not_exist()?;
let view_name = self.intern_parse_table_name()?;
// `AS` is mandatory; a bare `CREATE VIEW name;` is a syntax error.
self.parser
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
let query = self.parse_query()?;
Ok(Statement::CreateView(CreateView {
name: view_name,
or_replace,
query: Box::new(query),
if_not_exists,
}))
}
fn parse_create_external_table(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();
self.parser
@@ -1770,4 +1794,46 @@ non TIMESTAMP(6) TIME INDEX,
_ => unreachable!(),
}
}
#[test]
// Round-trips `CREATE VIEW` statements through the parser and checks that
// the flags and the `Display` output match the original SQL.
fn test_parse_create_view() {
let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
match &result[0] {
Statement::CreateView(c) => {
assert_eq!(c.to_string(), sql);
assert!(!c.or_replace);
assert!(!c.if_not_exists);
assert_eq!("test", c.name.to_string());
}
_ => unreachable!(),
}
// Both modifiers at once: `OR REPLACE` and `IF NOT EXISTS`.
let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
match &result[0] {
Statement::CreateView(c) => {
assert_eq!(c.to_string(), sql);
assert!(c.or_replace);
assert!(c.if_not_exists);
assert_eq!("test", c.name.to_string());
}
_ => unreachable!(),
}
}
#[test]
// A view body must be a query; DML such as DELETE is a syntax error.
fn test_parse_create_view_invalid_query() {
let sql = "CREATE VIEW test AS DELETE from demo";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result.is_err());
assert_matches!(result, Err(crate::error::Error::Syntax { .. }));
}
}

View File

@@ -20,6 +20,7 @@ use sqlparser::ast::{Expr, Query};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, TableConstraint, Value as SqlValue};
use crate::statements::statement::Statement;
use crate::statements::OptionMap;
const LINE_SEP: &str = ",\n";
@@ -284,6 +285,35 @@ impl Display for CreateFlow {
}
}
/// `CREATE [OR REPLACE] VIEW [IF NOT EXISTS] <name> AS <query>` statement.
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct CreateView {
/// View name
pub name: ObjectName,
/// The clause after `As` that defines the VIEW.
/// Can only be either [Statement::Query] or [Statement::Tql].
pub query: Box<Statement>,
/// Whether to replace an existing VIEW (`OR REPLACE`)
pub or_replace: bool,
/// Create the VIEW only when it doesn't exist (`IF NOT EXISTS`)
pub if_not_exists: bool,
}
impl Display for CreateView {
    /// Renders the statement back to SQL, e.g.
    /// `CREATE OR REPLACE VIEW IF NOT EXISTS v AS SELECT ...`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Pre-compute the optional keyword fragments (each carries its
        // own trailing space) so the final output is a single write.
        let or_replace = if self.or_replace { "OR REPLACE " } else { "" };
        let if_not_exists = if self.if_not_exists {
            "IF NOT EXISTS "
        } else {
            ""
        };
        write!(
            f,
            "CREATE {or_replace}VIEW {if_not_exists}{} AS {}",
            &self.name, &self.query
        )
    }
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@@ -21,7 +21,7 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike,
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView,
};
use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable;
@@ -56,6 +56,8 @@ pub enum Statement {
CreateFlow(CreateFlow),
// DROP FLOW
DropFlow(DropFlow),
// CREATE VIEW ... AS
CreateView(CreateView),
// DROP TABLE
DropTable(DropTable),
// DROP DATABASE
@@ -126,6 +128,7 @@ impl Display for Statement {
Statement::ShowCollation(kind) => {
write!(f, "SHOW COLLATION {kind}")
}
Statement::CreateView(s) => s.fmt(f),
}
}
}

View File

@@ -722,7 +722,7 @@ impl From<TableId> for TableIdent {
}
/// Struct used to serialize and deserialize [`TableMeta`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct RawTableMeta {
pub schema: RawSchema,
/// The indices of columns in primary key. Note that the index of timestamp column

View File

@@ -0,0 +1,148 @@
--- test CREATE VIEW ---
CREATE DATABASE for_test_view;
Affected Rows: 1
USE for_test_view;
Affected Rows: 0
CREATE TABLE test_table(a STRING, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE VIEW test_view;
Error: 2000(InvalidSyntax), sql parser error: Expected AS, found: ; at Line: 1, Column 22
CREATE VIEW test_view as DELETE FROM public.numbers;
Error: 2000(InvalidSyntax), sql parser error: Expected SELECT, VALUES, or a subquery in the query body, found: DELETE at Line: 1, Column 26
--- Table already exists ---
CREATE VIEW test_table as SELECT * FROM public.numbers;
Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table`
--- Table already exists even when create_if_not_exists ---
CREATE VIEW IF NOT EXISTS test_table as SELECT * FROM public.numbers;
Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table`
--- Table already exists even when or_replace ---
CREATE OR REPLACE VIEW test_table as SELECT * FROM public.numbers;
Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table`
CREATE VIEW test_view as SELECT * FROM public.numbers;
Affected Rows: 0
--- View already exists ----
CREATE VIEW test_view as SELECT * FROM public.numbers;
Error: 4000(TableAlreadyExists), View already exists: `greptime.for_test_view.test_view`
CREATE VIEW IF NOT EXISTS test_view as SELECT * FROM public.numbers;
Affected Rows: 0
CREATE OR REPLACE VIEW test_view as SELECT * FROM public.numbers;
Affected Rows: 0
SHOW TABLES;
+------------+
| Tables |
+------------+
| test_table |
| test_view |
+------------+
SHOW FULL TABLES;
+------------+------------+
| Tables | Table_type |
+------------+------------+
| test_table | BASE TABLE |
| test_view | VIEW |
+------------+------------+
-- SQLNESS REPLACE (\s\d+\s) ID
SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+
| table_catalog | table_schema | table_name | table_type | table_id | engine |
+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+
| greptime | information_schema | build_info | LOCAL TEMPORARY |ID | |
| greptime | information_schema | character_sets | LOCAL TEMPORARY |ID | |
| greptime | information_schema | check_constraints | LOCAL TEMPORARY |ID | |
| greptime | information_schema | cluster_info | LOCAL TEMPORARY |ID | |
| greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY |ID | |
| greptime | information_schema | collations | LOCAL TEMPORARY |ID | |
| greptime | information_schema | column_privileges | LOCAL TEMPORARY |ID | |
| greptime | information_schema | column_statistics | LOCAL TEMPORARY |ID | |
| greptime | information_schema | columns | LOCAL TEMPORARY |ID | |
| greptime | information_schema | engines | LOCAL TEMPORARY |ID | |
| greptime | information_schema | events | LOCAL TEMPORARY |ID | |
| greptime | information_schema | files | LOCAL TEMPORARY |ID | |
| greptime | information_schema | global_status | LOCAL TEMPORARY |ID | |
| greptime | information_schema | key_column_usage | LOCAL TEMPORARY |ID | |
| greptime | public | numbers | LOCAL TEMPORARY |ID | test_engine |
| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY |ID | |
| greptime | information_schema | parameters | LOCAL TEMPORARY |ID | |
| greptime | information_schema | partitions | LOCAL TEMPORARY |ID | |
| greptime | information_schema | profiling | LOCAL TEMPORARY |ID | |
| greptime | information_schema | referential_constraints | LOCAL TEMPORARY |ID | |
| greptime | information_schema | region_peers | LOCAL TEMPORARY |ID | |
| greptime | information_schema | routines | LOCAL TEMPORARY |ID | |
| greptime | information_schema | runtime_metrics | LOCAL TEMPORARY |ID | |
| greptime | information_schema | schema_privileges | LOCAL TEMPORARY |ID | |
| greptime | information_schema | schemata | LOCAL TEMPORARY |ID | |
| greptime | information_schema | session_status | LOCAL TEMPORARY |ID | |
| greptime | information_schema | table_constraints | LOCAL TEMPORARY |ID | |
| greptime | information_schema | table_privileges | LOCAL TEMPORARY |ID | |
| greptime | information_schema | tables | LOCAL TEMPORARY |ID | |
| greptime | for_test_view | test_table | BASE TABLE |ID | mito |
| greptime | for_test_view | test_view | VIEW |ID | |
| greptime | information_schema | triggers | LOCAL TEMPORARY |ID | |
+---------------+--------------------+---------------------------------------+-----------------+----------+-------------+
-- SQLNESS REPLACE (\s\d+\s) ID
SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW';
+---------------+---------------+------------+------------+----------+--------+
| table_catalog | table_schema | table_name | table_type | table_id | engine |
+---------------+---------------+------------+------------+----------+--------+
| greptime | for_test_view | test_view | VIEW |ID | |
+---------------+---------------+------------+------------+----------+--------+
SHOW COLUMNS FROM test_view;
++
++
SHOW FULL COLUMNS FROM test_view;
++
++
SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'test_view';
++
++
--- FIXED in the following PR ---
SELECT * FROM test_view;
Error: 3001(EngineExecuteQuery), DataFusion error: Unsupported operation: get stream from a distributed table
USE public;
Affected Rows: 0
DROP DATABASE for_test_view;
Affected Rows: 0

View File

@@ -0,0 +1,52 @@
--- test CREATE VIEW ---
CREATE DATABASE for_test_view;
USE for_test_view;
CREATE TABLE test_table(a STRING, ts TIMESTAMP TIME INDEX);
CREATE VIEW test_view;
CREATE VIEW test_view as DELETE FROM public.numbers;
--- Table already exists ---
CREATE VIEW test_table as SELECT * FROM public.numbers;
--- Table already exists even when create_if_not_exists ---
CREATE VIEW IF NOT EXISTS test_table as SELECT * FROM public.numbers;
--- Table already exists even when or_replace ---
CREATE OR REPLACE VIEW test_table as SELECT * FROM public.numbers;
CREATE VIEW test_view as SELECT * FROM public.numbers;
--- View already exists ----
CREATE VIEW test_view as SELECT * FROM public.numbers;
CREATE VIEW IF NOT EXISTS test_view as SELECT * FROM public.numbers;
CREATE OR REPLACE VIEW test_view as SELECT * FROM public.numbers;
SHOW TABLES;
SHOW FULL TABLES;
-- SQLNESS REPLACE (\s\d+\s) ID
SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
-- SQLNESS REPLACE (\s\d+\s) ID
SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW';
SHOW COLUMNS FROM test_view;
SHOW FULL COLUMNS FROM test_view;
SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'test_view';
--- FIXED in the following PR ---
SELECT * FROM test_view;
USE public;
DROP DATABASE for_test_view;