feat: support "use" in GRPC requests (#922)

* feat: support "use catalog and schema"(behave like the "use" in MySQL) in GRPC requests

* fix: rebase develop
This commit is contained in:
LFC
2023-02-02 20:02:56 +08:00
committed by GitHub
parent 74adb077bc
commit af935671b2
76 changed files with 957 additions and 963 deletions

9
Cargo.lock generated
View File

@@ -1304,6 +1304,7 @@ dependencies = [
"arrow-flight",
"async-stream",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-grpc-expr",
@@ -6697,6 +6698,7 @@ name = "session"
version = "0.1.0"
dependencies = [
"arc-swap",
"common-catalog",
"common-telemetry",
]
@@ -6938,9 +6940,9 @@ dependencies = [
[[package]]
name = "sqlness"
version = "0.1.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ffa69a2ae10018ec72a3cb7574e3a33a3fc322ed03740f6e435fd7f0c1db4a7"
checksum = "16a494ea677f9de93e8c25ec33b1073f8f72d61466d4595ecf1462ba877fe924"
dependencies = [
"async-trait",
"derive_builder 0.11.2",
@@ -6962,7 +6964,10 @@ dependencies = [
"common-error",
"common-grpc",
"common-query",
"common-time",
"serde",
"sqlness",
"tinytemplate",
"tokio",
]

View File

@@ -32,7 +32,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;
const DATABASE_NAME: &str = "greptime";
const CATALOG_NAME: &str = "greptime";
const SCHEMA_NAME: &str = "public";
const TABLE_NAME: &str = "nyc_taxi";
@@ -100,7 +99,6 @@ async fn write_data(
let record_batch = record_batch.unwrap();
let (columns, row_count) = convert_record_batch(record_batch);
let request = InsertRequest {
schema_name: "public".to_string(),
table_name: TABLE_NAME.to_string(),
region_number: 0,
columns,
@@ -424,7 +422,7 @@ fn main() {
.unwrap()
.block_on(async {
let client = Client::with_urls(vec![&args.endpoint]);
let db = Database::new(DATABASE_NAME, client);
let db = Database::with_client(client);
if !args.skip_write {
do_write(&args, &db).await;

View File

@@ -5,11 +5,19 @@ package greptime.v1;
import "greptime/v1/ddl.proto";
import "greptime/v1/column.proto";
message RequestHeader {
// The `catalog` that is selected to be used in this request.
string catalog = 1;
// The `schema` that is selected to be used in this request.
string schema = 2;
}
message GreptimeRequest {
RequestHeader header = 1;
oneof request {
InsertRequest insert = 1;
QueryRequest query = 2;
DdlRequest ddl = 3;
InsertRequest insert = 2;
QueryRequest query = 3;
DdlRequest ddl = 4;
}
}
@@ -21,8 +29,7 @@ message QueryRequest {
}
message InsertRequest {
string schema_name = 1;
string table_name = 2;
string table_name = 1;
// Data is represented here.
repeated Column columns = 3;

View File

@@ -9,6 +9,7 @@ api = { path = "../api" }
arrow-flight.workspace = true
async-stream.workspace = true
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }

View File

@@ -65,13 +65,12 @@ async fn run() {
region_ids: vec![0],
};
let db = Database::new("create table", client.clone());
let db = Database::with_client(client);
let result = db.create(create_table_expr).await.unwrap();
event!(Level::INFO, "create table result: {:#?}", result);
let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let db = Database::new("greptime", client);
let result = db.logical_plan(logical).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);

View File

@@ -19,9 +19,10 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest,
QueryRequest,
QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::*;
use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage};
use common_query::Output;
@@ -34,83 +35,89 @@ use crate::{error, Client, Result};
#[derive(Clone, Debug)]
pub struct Database {
name: String,
// The "catalog" and "schema" to be used in processing the requests at the server side.
// They are the "hint" or "context", just like how the "database" in "USE" statement is treated in MySQL.
// They will be carried in the request header.
catalog: String,
schema: String,
client: Client,
}
impl Database {
pub fn new(name: impl Into<String>, client: Client) -> Self {
pub fn new(catalog: impl Into<String>, schema: impl Into<String>, client: Client) -> Self {
Self {
name: name.into(),
catalog: catalog.into(),
schema: schema.into(),
client,
}
}
pub fn name(&self) -> &str {
&self.name
pub fn with_client(client: Client) -> Self {
Self::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client)
}
pub fn set_schema(&mut self, schema: impl Into<String>) {
self.schema = schema.into();
}
pub async fn insert(&self, request: InsertRequest) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Insert(request)),
})
.await
self.do_get(Request::Insert(request)).await
}
pub async fn sql(&self, sql: &str) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
})),
})
self.do_get(Request::Query(QueryRequest {
query: Some(Query::Sql(sql.to_string())),
}))
.await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
})),
})
self.do_get(Request::Query(QueryRequest {
query: Some(Query::LogicalPlan(logical_plan)),
}))
.await
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
})),
})
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
}))
.await
}
pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
})
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
}))
.await
}
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<Output> {
self.do_get(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
})),
})
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
}))
.await
}
async fn do_get(&self, request: GreptimeRequest) -> Result<Output> {
async fn do_get(&self, request: Request) -> Result<Output> {
let request = GreptimeRequest {
header: Some(RequestHeader {
catalog: self.catalog.clone(),
schema: self.schema.clone(),
}),
request: Some(request),
};
let request = Ticket {
ticket: request.encode_to_vec(),
};
let mut client = self.client.make_client()?;
// TODO(LFC): Streaming get flight data.
let flight_data: Vec<FlightData> = client
.mut_inner()
.do_get(Ticket {
ticket: request.encode_to_vec(),
})
.do_get(request)
.and_then(|response| response.into_inner().try_collect())
.await
.map_err(|e| {

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use clap::Parser;
use common_telemetry::{info, logging};
use common_telemetry::{info, logging, warn};
use meta_srv::bootstrap;
use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt;
@@ -58,6 +58,8 @@ struct StartCommand {
config_file: Option<String>,
#[clap(short, long)]
selector: Option<String>,
#[clap(long)]
use_memory_store: bool,
}
impl StartCommand {
@@ -100,6 +102,11 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
info!("Using {} selector", selector_type);
}
if cmd.use_memory_store {
warn!("Using memory store for Meta. Make sure you are in running tests.");
opts.use_memory_store = true;
}
Ok(opts)
}
}
@@ -118,6 +125,7 @@ mod tests {
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
selector: Some("LoadBased".to_string()),
use_memory_store: false,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
@@ -137,6 +145,7 @@ mod tests {
"{}/../../config/metasrv.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
use_memory_store: false,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);

View File

@@ -29,16 +29,8 @@ use crate::error::{
/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = if expr.catalog_name.is_empty() {
None
} else {
Some(expr.catalog_name)
};
let schema_name = if expr.schema_name.is_empty() {
None
} else {
Some(expr.schema_name)
};
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
match kind {
Kind::AddColumns(add_columns) => {
@@ -219,8 +211,8 @@ mod tests {
};
let alter_request = alter_expr_to_request(expr).unwrap();
assert_eq!(None, alter_request.catalog_name);
assert_eq!(None, alter_request.schema_name);
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
let add_column = match alter_request.alter_kind {
AlterKind::AddColumns { mut columns } => columns.pop().unwrap(),
@@ -250,8 +242,8 @@ mod tests {
};
let alter_request = alter_expr_to_request(expr).unwrap();
assert_eq!(Some("test_catalog".to_string()), alter_request.catalog_name);
assert_eq!(Some("test_schema".to_string()), alter_request.schema_name);
assert_eq!(alter_request.catalog_name, "test_catalog");
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);
let mut drop_names = match alter_request.alter_kind {

View File

@@ -21,7 +21,6 @@ use api::v1::{
InsertRequest as GrpcInsertRequest,
};
use common_base::BitVec;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime};
use datatypes::data_type::{ConcreteDataType, DataType};
@@ -31,7 +30,7 @@ use datatypes::value::Value;
use datatypes::vectors::MutableVector;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest};
use table::requests::InsertRequest;
use crate::error::{
ColumnDataTypeSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu,
@@ -81,20 +80,6 @@ pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result<Option
}
}
/// Build a alter table rqeusts that adding new columns.
#[inline]
pub fn build_alter_table_request(
table_name: &str,
columns: Vec<AddColumnRequest>,
) -> AlterTableRequest {
AlterTableRequest {
catalog_name: None,
schema_name: None,
table_name: table_name.to_string(),
alter_kind: AlterKind::AddColumns { columns },
}
}
pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let wrapper = ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?;
let column_datatype = wrapper.datatype();
@@ -281,9 +266,11 @@ pub fn build_create_expr_from_insertion(
Ok(expr)
}
pub fn to_table_insert_request(request: GrpcInsertRequest) -> Result<InsertRequest> {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = &request.schema_name;
pub fn to_table_insert_request(
catalog_name: &str,
schema_name: &str,
request: GrpcInsertRequest,
) -> Result<InsertRequest> {
let table_name = &request.table_name;
let row_count = request.row_count as usize;
@@ -617,13 +604,12 @@ mod tests {
fn test_to_table_insert_request() {
let (columns, row_count) = mock_insert_batch();
let request = GrpcInsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns,
row_count,
region_number: 0,
};
let insert_req = to_table_insert_request(request).unwrap();
let insert_req = to_table_insert_request("greptime", "public", request).unwrap();
assert_eq!("greptime", insert_req.catalog_name);
assert_eq!("public", insert_req.schema_name);

View File

@@ -17,6 +17,4 @@ pub mod error;
pub mod insert;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{
build_alter_table_request, build_create_expr_from_insertion, column_to_vector, find_new_columns,
};
pub use insert::{build_create_expr_from_insertion, column_to_vector, find_new_columns};

View File

@@ -80,7 +80,10 @@ pub enum Error {
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound { table_name: String },
TableNotFound {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Column {} not found in table {}", column_name, table_name))]
ColumnNotFound {

View File

@@ -53,7 +53,7 @@ use crate::sql::SqlHandler;
mod grpc;
mod script;
mod sql;
pub mod sql;
pub(crate) type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;

View File

@@ -15,14 +15,13 @@
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{CreateDatabaseExpr, DdlRequest, GreptimeRequest, InsertRequest};
use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest};
use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use session::context::QueryContextRef;
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::CreateDatabaseRequest;
@@ -50,26 +49,31 @@ impl Instance {
.context(ExecuteSqlSnafu)
}
async fn handle_query(&self, query: Query) -> Result<Output> {
async fn handle_query(&self, query: Query, ctx: QueryContextRef) -> Result<Output> {
Ok(match query {
Query::Sql(sql) => {
let stmt = QueryLanguageParser::parse_sql(&sql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, QueryContext::arc()).await?
self.execute_stmt(stmt, ctx).await?
}
Query::LogicalPlan(plan) => self.execute_logical(plan).await?,
})
}
pub async fn handle_insert(&self, request: InsertRequest) -> Result<Output> {
pub async fn handle_insert(
&self,
request: InsertRequest,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name.clone();
// TODO(LFC): InsertRequest should carry catalog name, too.
let table = self
.catalog_manager
.table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name)
.table(catalog, schema, table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu { table_name })?;
let request = common_grpc_expr::insert::to_table_insert_request(request)
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(error::InsertDataSnafu)?;
let affected_rows = table
@@ -96,19 +100,16 @@ impl Instance {
impl GrpcQueryHandler for Instance {
type Error = error::Error;
async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
let request = query.request.context(error::MissingRequiredFieldSnafu {
name: "GreptimeRequest.request",
})?;
async fn do_query(&self, request: GrpcRequest, ctx: QueryContextRef) -> Result<Output> {
match request {
GrpcRequest::Insert(request) => self.handle_insert(request).await,
GrpcRequest::Insert(request) => self.handle_insert(request, ctx).await,
GrpcRequest::Query(query_request) => {
let query = query_request
.query
.context(error::MissingRequiredFieldSnafu {
name: "QueryRequest.query",
})?;
self.handle_query(query).await
self.handle_query(query, ctx).await
}
GrpcRequest::Ddl(request) => self.handle_ddl(request).await,
}
@@ -124,6 +125,7 @@ mod test {
};
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
use session::context::QueryContext;
use super::*;
use crate::tests::test_util::{self, MockInstance};
@@ -133,67 +135,61 @@ mod test {
let instance = MockInstance::new("test_handle_ddl").await;
let instance = instance.inner();
let query = GreptimeRequest {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
})),
let query = GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
create_if_not_exists: true,
})),
};
let output = instance.do_query(query).await.unwrap();
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let query = GreptimeRequest {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
..Default::default()
})),
let query = GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
..Default::default()
})),
};
let output = instance.do_query(query).await.unwrap();
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let query = GreptimeRequest {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
}],
})),
let query = GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
}],
})),
})),
};
let output = instance.do_query(query).await.unwrap();
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let output = instance
@@ -232,7 +228,6 @@ mod test {
.unwrap();
let insert = InsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns: vec![
Column {
@@ -274,10 +269,8 @@ mod test {
..Default::default()
};
let query = GreptimeRequest {
request: Some(GrpcRequest::Insert(insert)),
};
let output = instance.do_query(query).await.unwrap();
let query = GrpcRequest::Insert(insert);
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
let output = instance
@@ -305,27 +298,23 @@ mod test {
.await
.unwrap();
let query = GreptimeRequest {
request: Some(GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
let query = GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"INSERT INTO demo(host, cpu, memory, ts) VALUES \
('host1', 66.6, 1024, 1672201025000),\
('host2', 88.8, 333.3, 1672201026000)"
.to_string(),
)),
})),
};
let output = instance.do_query(query).await.unwrap();
.to_string(),
)),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
assert!(matches!(output, Output::AffectedRows(2)));
let query = GreptimeRequest {
request: Some(GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, host, cpu, memory FROM demo".to_string(),
)),
})),
};
let output = instance.do_query(query).await.unwrap();
let query = GrpcRequest::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, host, cpu, memory FROM demo".to_string(),
)),
});
let output = instance.do_query(query, QueryContext::arc()).await.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatch = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
@@ -25,7 +24,7 @@ use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::CreateDatabaseRequest;
use table::requests::{CreateDatabaseRequest, DropTableRequest};
use crate::error::{self, BumpTableIdSnafu, ExecuteSqlSnafu, Result, TableIdProviderNotFoundSnafu};
use crate::instance::Instance;
@@ -89,12 +88,11 @@ impl Instance {
let name = c.name.clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let request = self.sql_handler.create_to_request(table_id, c, table_ref)?;
let request = self
.sql_handler
.create_to_request(table_id, c, &table_ref)?;
let table_id = request.id;
info!(
"Creating table, catalog: {:?}, schema: {:?}, table name: {:?}, table id: {}",
catalog, schema, table, table_id
);
info!("Creating table: {table_ref}, table id = {table_id}",);
self.sql_handler
.execute(SqlRequest::CreateTable(request), query_ctx)
@@ -110,7 +108,13 @@ impl Instance {
.await
}
QueryStatement::Sql(Statement::DropTable(drop_table)) => {
let req = self.sql_handler.drop_table_to_request(drop_table);
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
let req = DropTableRequest {
catalog_name,
schema_name,
table_name,
};
self.sql_handler
.execute(SqlRequest::DropTable(req), query_ctx)
.await
@@ -138,16 +142,14 @@ impl Instance {
QueryStatement::Sql(Statement::ShowCreateTable(_stmt)) => {
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
}
QueryStatement::Sql(Statement::Use(schema)) => {
let catalog = query_ctx.current_catalog();
let catalog = catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
QueryStatement::Sql(Statement::Use(ref schema)) => {
let catalog = &query_ctx.current_catalog();
ensure!(
self.is_valid_schema(catalog, &schema)?,
self.is_valid_schema(catalog, schema)?,
error::DatabaseNotFoundSnafu { catalog, schema }
);
query_ctx.set_current_schema(&schema);
query_ctx.set_current_schema(schema);
Ok(Output::RecordBatches(RecordBatches::empty()))
}
@@ -168,18 +170,18 @@ impl Instance {
// TODO(LFC): Refactor consideration: move this function to some helper mod,
// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved.
/// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.
fn table_idents_to_full_name(
pub fn table_idents_to_full_name(
obj_name: &ObjectName,
query_ctx: QueryContextRef,
) -> Result<(String, String, String)> {
match &obj_name.0[..] {
[table] => Ok((
DEFAULT_CATALOG_NAME.to_string(),
query_ctx.current_schema().unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string()),
query_ctx.current_catalog(),
query_ctx.current_schema(),
table.value.clone(),
)),
[schema, table] => Ok((
DEFAULT_CATALOG_NAME.to_string(),
query_ctx.current_catalog(),
schema.value.clone(),
table.value.clone(),
)),
@@ -229,6 +231,7 @@ impl SqlQueryHandler for Instance {
mod test {
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use session::context::QueryContext;
use super::*;
@@ -244,10 +247,7 @@ mod test {
let bare = ObjectName(vec![my_table.into()]);
let using_schema = "foo";
let query_ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
using_schema.to_string(),
));
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, using_schema));
let empty_ctx = Arc::new(QueryContext::new());
assert_eq!(

View File

@@ -26,7 +26,8 @@ use table::engine::{EngineContext, TableEngineRef, TableReference};
use table::requests::*;
use table::TableRef;
use crate::error::{ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
use crate::error::{self, ExecuteSqlSnafu, GetTableSnafu, Result, TableNotFoundSnafu};
use crate::instance::sql::table_idents_to_full_name;
mod alter;
mod create;
@@ -81,17 +82,29 @@ impl SqlHandler {
show_databases(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}
SqlRequest::ShowTables(stmt) => {
show_tables(stmt, self.catalog_manager.clone(), query_ctx).context(ExecuteSqlSnafu)
show_tables(stmt, self.catalog_manager.clone(), query_ctx.clone())
.context(ExecuteSqlSnafu)
}
SqlRequest::DescribeTable(stmt) => {
describe_table(stmt, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
let (catalog, schema, table) =
table_idents_to_full_name(stmt.name(), query_ctx.clone())?;
let table = self
.catalog_manager
.table(&catalog, &schema, &table)
.context(error::CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: stmt.name().to_string(),
})?;
describe_table(table).context(ExecuteSqlSnafu)
}
SqlRequest::Explain(stmt) => {
explain(stmt, self.query_engine.clone(), query_ctx.clone())
.await
.context(ExecuteSqlSnafu)
}
SqlRequest::Explain(stmt) => explain(stmt, self.query_engine.clone(), query_ctx)
.await
.context(ExecuteSqlSnafu),
};
if let Err(e) = &result {
error!("Datanode execution error: {:?}", e);
error!(e; "{query_ctx}");
}
result
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use catalog::RenameTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
@@ -27,12 +26,10 @@ use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
let ctx = EngineContext {};
let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let table_name = req.table_name.clone();
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
catalog: &req.catalog_name,
schema: &req.schema_name,
table: &table_name,
};
@@ -98,8 +95,8 @@ impl SqlHandler {
},
};
Ok(AlterTableRequest {
catalog_name: Some(table_ref.catalog.to_string()),
schema_name: Some(table_ref.schema.to_string()),
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
alter_kind,
})
@@ -134,10 +131,13 @@ mod tests {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE my_metric_1 ADD tagk_i STRING Null;");
let req = handler
.alter_to_request(alter_table, TableReference::bare("my_metric_1"))
.alter_to_request(
alter_table,
TableReference::full("greptime", "public", "my_metric_1"),
)
.unwrap();
assert_eq!(req.catalog_name, Some("greptime".to_string()));
assert_eq!(req.schema_name, Some("public".to_string()));
assert_eq!(req.catalog_name, "greptime");
assert_eq!(req.schema_name, "public");
assert_eq!(req.table_name, "my_metric_1");
let alter_kind = req.alter_kind;
@@ -159,10 +159,13 @@ mod tests {
let handler = create_mock_sql_handler().await;
let alter_table = parse_sql("ALTER TABLE test_table RENAME table_t;");
let req = handler
.alter_to_request(alter_table, TableReference::bare("test_table"))
.alter_to_request(
alter_table,
TableReference::full("greptime", "public", "test_table"),
)
.unwrap();
assert_eq!(req.catalog_name, Some("greptime".to_string()));
assert_eq!(req.schema_name, Some("public".to_string()));
assert_eq!(req.catalog_name, "greptime");
assert_eq!(req.schema_name, "public");
assert_eq!(req.table_name, "test_table");
let alter_kind = req.alter_kind;

View File

@@ -122,7 +122,7 @@ impl SqlHandler {
&self,
table_id: TableId,
stmt: CreateTable,
table_ref: TableReference,
table_ref: &TableReference,
) -> Result<CreateTableRequest> {
let mut ts_index = usize::MAX;
let mut primary_keys = vec![];
@@ -259,7 +259,7 @@ mod tests {
PRIMARY KEY(host)) engine=mito with(regions=1);"#,
);
let c = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert_eq!("demo_table", c.table_name);
assert_eq!(42, c.id);
@@ -282,7 +282,7 @@ mod tests {
TIME INDEX (ts)) engine=mito with(regions=1);"#,
);
let c = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap();
assert!(c.primary_key_indices.is_empty());
assert_eq!(c.schema.timestamp_index(), Some(1));
@@ -300,7 +300,7 @@ mod tests {
);
let error = handler
.create_to_request(42, parsed_stmt, TableReference::bare("demo_table"))
.create_to_request(42, parsed_stmt, &TableReference::bare("demo_table"))
.unwrap_err();
assert_matches!(error, Error::KeyColumnNotFound { .. });
}
@@ -322,7 +322,7 @@ mod tests {
let handler = create_mock_sql_handler().await;
let error = handler
.create_to_request(42, create_table, TableReference::full("c", "s", "demo"))
.create_to_request(42, create_table, &TableReference::full("c", "s", "demo"))
.unwrap_err();
assert_matches!(error, Error::InvalidPrimaryKey { .. });
}
@@ -344,7 +344,7 @@ mod tests {
let handler = create_mock_sql_handler().await;
let request = handler
.create_to_request(42, create_table, TableReference::full("c", "s", "demo"))
.create_to_request(42, create_table, &TableReference::full("c", "s", "demo"))
.unwrap();
assert_eq!(42, request.id);

View File

@@ -17,7 +17,6 @@ use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::info;
use snafu::ResultExt;
use sql::statements::drop::DropTable;
use table::engine::{EngineContext, TableReference};
use table::requests::DropTableRequest;
@@ -60,12 +59,4 @@ impl SqlHandler {
Ok(Output::AffectedRows(1))
}
pub fn drop_table_to_request(&self, drop_table: DropTable) -> DropTableRequest {
DropTableRequest {
catalog_name: drop_table.catalog_name,
schema_name: drop_table.schema_name,
table_name: drop_table.table_name,
}
}
}

View File

@@ -647,10 +647,7 @@ async fn try_execute_sql_in_db(
sql: &str,
db: &str,
) -> Result<Output, crate::error::Error> {
let query_ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
db.to_string(),
));
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
instance.inner().execute_sql(sql, query_ctx).await
}

View File

@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use session::context::QueryContext;
@@ -23,10 +20,7 @@ use crate::tests::test_util::{check_output_stream, setup_test_instance};
#[tokio::test(flavor = "multi_thread")]
async fn sql_insert_promql_query_ceil() {
let instance = setup_test_instance("test_execute_insert").await;
let query_ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
DEFAULT_SCHEMA_NAME.to_owned(),
));
let query_ctx = QueryContext::arc();
let put_output = instance
.inner()
.execute_sql(

View File

@@ -20,6 +20,12 @@ use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("{source}"))]
External {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to request Datanode, source: {}", source))]
RequestDatanode {
#[snafu(backtrace)]
@@ -401,6 +407,7 @@ impl ErrorExt for Error {
Error::InvokeDatanode { source } => source.status_code(),
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
Error::External { source } => source.status_code(),
Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. } => {
source.status_code()
}

View File

@@ -25,17 +25,16 @@ use std::time::Duration;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::{
AddColumns, AlterExpr, Column, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest,
};
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, DropTableExpr, InsertRequest};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::{debug, info};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
@@ -194,10 +193,14 @@ impl Instance {
}
/// Handle batch inserts
pub async fn handle_inserts(&self, requests: Vec<InsertRequest>) -> Result<Output> {
pub async fn handle_inserts(
&self,
requests: Vec<InsertRequest>,
ctx: QueryContextRef,
) -> Result<Output> {
let mut success = 0;
for request in requests {
match self.handle_insert(request).await? {
match self.handle_insert(request, ctx.clone()).await? {
Output::AffectedRows(rows) => success += rows,
_ => unreachable!("Insert should not yield output other than AffectedRows"),
}
@@ -205,20 +208,12 @@ impl Instance {
Ok(Output::AffectedRows(success))
}
async fn handle_insert(&self, request: InsertRequest) -> Result<Output> {
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let catalog_name = DEFAULT_CATALOG_NAME;
let columns = &request.columns;
self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns)
async fn handle_insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output> {
self.create_or_alter_table_on_demand(ctx.clone(), &request.table_name, &request.columns)
.await?;
let query = GreptimeRequest {
request: Some(Request::Insert(request)),
};
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await
let query = Request::Insert(request);
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await
}
// check if table already exist:
@@ -226,11 +221,13 @@ impl Instance {
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
async fn create_or_alter_table_on_demand(
&self,
catalog_name: &str,
schema_name: &str,
ctx: QueryContextRef,
table_name: &str,
columns: &[Column],
) -> Result<()> {
let catalog_name = &ctx.current_catalog();
let schema_name = &ctx.current_schema();
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
@@ -241,7 +238,7 @@ impl Instance {
"Table {}.{}.{} does not exist, try create table",
catalog_name, schema_name, table_name,
);
self.create_table_by_columns(catalog_name, schema_name, table_name, columns)
self.create_table_by_columns(ctx, table_name, columns)
.await?;
info!(
"Successfully created table on insertion: {}.{}.{}",
@@ -258,13 +255,8 @@ impl Instance {
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
add_columns, catalog_name, schema_name, table_name
);
self.add_new_columns_to_table(
catalog_name,
schema_name,
table_name,
add_columns,
)
.await?;
self.add_new_columns_to_table(ctx, table_name, add_columns)
.await?;
info!(
"Successfully altered table on insertion: {}.{}.{}",
catalog_name, schema_name, table_name
@@ -278,11 +270,13 @@ impl Instance {
/// Infer create table expr from inserting data
async fn create_table_by_columns(
&self,
catalog_name: &str,
schema_name: &str,
ctx: QueryContextRef,
table_name: &str,
columns: &[Column],
) -> Result<Output> {
let catalog_name = &ctx.current_catalog();
let schema_name = &ctx.current_schema();
// Create table automatically, build schema from data.
let create_expr = self
.create_expr_factory
@@ -295,18 +289,18 @@ impl Instance {
);
self.grpc_query_handler
.do_query(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
.do_query(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(create_expr)),
})),
})
}),
ctx,
)
.await
}
async fn add_new_columns_to_table(
&self,
catalog_name: &str,
schema_name: &str,
ctx: QueryContextRef,
table_name: &str,
add_columns: AddColumns,
) -> Result<Output> {
@@ -315,25 +309,24 @@ impl Instance {
add_columns, table_name
);
let expr = AlterExpr {
catalog_name: ctx.current_catalog(),
schema_name: ctx.current_schema(),
table_name: table_name.to_string(),
schema_name: schema_name.to_string(),
catalog_name: catalog_name.to_string(),
kind: Some(Kind::AddColumns(add_columns)),
};
self.grpc_query_handler
.do_query(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
.do_query(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
})
}),
ctx,
)
.await
}
fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
let catalog = query_ctx.current_catalog();
let catalog = catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let catalog = &query_ctx.current_catalog();
ensure!(
self.catalog_manager
.schema(catalog, &db)
@@ -380,34 +373,28 @@ impl Instance {
| Statement::DescribeTable(_)
| Statement::Explain(_)
| Statement::Query(_)
| Statement::Insert(_) => {
| Statement::Insert(_)
| Statement::Alter(_) => {
return self.sql_handler.do_statement_query(stmt, query_ctx).await;
}
Statement::Alter(alter_stmt) => {
let expr =
AlterExpr::try_from(alter_stmt).context(error::AlterExprFromStmtSnafu)?;
return self
.grpc_query_handler
.do_query(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
})
.await;
}
Statement::DropTable(drop_stmt) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_stmt.table_name(), query_ctx.clone())
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let expr = DropTableExpr {
catalog_name: drop_stmt.catalog_name,
schema_name: drop_stmt.schema_name,
table_name: drop_stmt.table_name,
catalog_name,
schema_name,
table_name,
};
return self
.grpc_query_handler
.do_query(GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
.do_query(
Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
})),
})
}),
query_ctx,
)
.await;
}
Statement::ShowCreateTable(_) => error::NotSupportedSnafu { feat: query }.fail(),

View File

@@ -25,8 +25,10 @@ use catalog::{CatalogList, CatalogManager};
use chrono::DateTime;
use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::Output;
use common_telemetry::{debug, error, info};
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use meta_client::client::MetaClient;
@@ -119,7 +121,7 @@ impl DistInstance {
for datanode in table_route.find_leaders() {
let client = self.datanode_clients.get_client(&datanode).await;
let client = Database::new("greptime", client);
let client = Database::with_client(client);
let regions = table_route.find_leader_regions(&datanode);
let mut create_expr_for_region = create_table.clone();
@@ -168,7 +170,19 @@ impl DistInstance {
Statement::ShowTables(stmt) => {
show_tables(stmt, self.catalog_manager.clone(), query_ctx)
}
Statement::DescribeTable(stmt) => describe_table(stmt, self.catalog_manager.clone()),
Statement::DescribeTable(stmt) => {
let (catalog, schema, table) = table_idents_to_full_name(stmt.name(), query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table = self
.catalog_manager
.table(&catalog, &schema, &table)
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: stmt.name().to_string(),
})?;
describe_table(table)
}
Statement::Explain(stmt) => {
explain(Box::new(stmt), self.query_engine.clone(), query_ctx).await
}
@@ -346,16 +360,21 @@ impl DistInstance {
// GRPC InsertRequest to Table InsertRequest, than split Table InsertRequest, than assemble each GRPC InsertRequest, is rather inefficient,
// should operate on GRPC InsertRequest directly.
// Also remember to check the "region_number" carried in InsertRequest, too.
async fn handle_dist_insert(&self, request: InsertRequest) -> Result<Output> {
async fn handle_dist_insert(
&self,
request: InsertRequest,
ctx: QueryContextRef,
) -> Result<Output> {
let catalog = &ctx.current_catalog();
let schema = &ctx.current_schema();
let table_name = &request.table_name;
// TODO(LFC): InsertRequest should carry catalog name, too.
let table = self
.catalog_manager
.table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name)
.table(catalog, schema, table_name)
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name })?;
let request = common_grpc_expr::insert::to_table_insert_request(request)
let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request)
.context(ToTableInsertRequestSnafu)?;
let affected_rows = table.insert(request).await.context(TableSnafu)?;

View File

@@ -14,10 +14,10 @@
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::GreptimeRequest;
use async_trait::async_trait;
use common_query::Output;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
use snafu::OptionExt;
use crate::error::{self, Result};
@@ -27,12 +27,9 @@ use crate::instance::distributed::DistInstance;
impl GrpcQueryHandler for DistInstance {
type Error = error::Error;
async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
let request = query.request.context(error::IncompleteGrpcResultSnafu {
err_msg: "Missing 'request' in GreptimeRequest",
})?;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Insert(request) => self.handle_dist_insert(request).await,
Request::Insert(request) => self.handle_dist_insert(request, ctx).await,
Request::Query(_) => {
unreachable!("Query should have been handled directly in Frontend Instance!")
}

View File

@@ -14,12 +14,11 @@
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::GreptimeRequest;
use async_trait::async_trait;
use common_query::Output;
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt};
use crate::error::{self, Result};
@@ -29,12 +28,9 @@ use crate::instance::Instance;
impl GrpcQueryHandler for Instance {
type Error = error::Error;
async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
let request = query.request.context(error::IncompleteGrpcResultSnafu {
err_msg: "Missing field 'GreptimeRequest.request'",
})?;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
let output = match request {
Request::Insert(request) => self.handle_insert(request).await?,
Request::Insert(request) => self.handle_insert(request, ctx).await?,
Request::Query(query_request) => {
let query = query_request
.query
@@ -43,8 +39,7 @@ impl GrpcQueryHandler for Instance {
})?;
match query {
Query::Sql(sql) => {
let mut result =
SqlQueryHandler::do_query(self, &sql, QueryContext::arc()).await;
let mut result = SqlQueryHandler::do_query(self, &sql, ctx).await;
ensure!(
result.len() == 1,
error::NotSupportedSnafu {
@@ -62,10 +57,8 @@ impl GrpcQueryHandler for Instance {
}
}
Request::Ddl(request) => {
let query = GreptimeRequest {
request: Some(Request::Ddl(request)),
};
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await?
let query = Request::Ddl(request);
GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await?
}
};
Ok(output)
@@ -86,6 +79,7 @@ mod test {
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_recordbatch::RecordBatches;
use session::context::QueryContext;
use super::*;
use crate::table::DistTable;
@@ -111,93 +105,83 @@ mod test {
}
async fn test_handle_ddl_request(instance: &Arc<Instance>) {
let query = GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
})),
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "database_created_through_grpc".to_string(),
create_if_not_exists: true,
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let query = GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as _,
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as _,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
..Default::default()
})),
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let query = Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
..Default::default()
}),
is_key: false,
}],
})),
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let query = GreptimeRequest {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "database_created_through_grpc".to_string(),
table_name: "table_created_through_grpc".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as _,
is_nullable: true,
default_constraint: vec![],
}),
is_key: false,
}],
})),
})),
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string()))
}))
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Query(QueryRequest {
query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string()))
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc"
.to_string(),
)),
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
@@ -327,12 +311,10 @@ CREATE TABLE {table_name} (
}
async fn create_table(frontend: &Arc<Instance>, sql: String) {
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
})),
};
let output = GrpcQueryHandler::do_query(frontend.as_ref(), query)
let query = Request::Query(QueryRequest {
query: Some(Query::Sql(sql)),
});
let output = GrpcQueryHandler::do_query(frontend.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
@@ -340,7 +322,6 @@ CREATE TABLE {table_name} (
async fn test_insert_and_query_on_existing_table(instance: &Arc<Instance>, table_name: &str) {
let insert = InsertRequest {
schema_name: "public".to_string(),
table_name: table_name.to_string(),
columns: vec![
Column {
@@ -377,22 +358,18 @@ CREATE TABLE {table_name} (
..Default::default()
};
let query = GreptimeRequest {
request: Some(Request::Insert(insert)),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Insert(insert);
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(8)));
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql(format!(
"SELECT ts, a FROM {table_name} ORDER BY ts"
))),
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Query(QueryRequest {
query: Some(Query::Sql(format!(
"SELECT ts, a FROM {table_name} ORDER BY ts"
))),
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
@@ -461,7 +438,6 @@ CREATE TABLE {table_name} (
async fn test_insert_and_query_on_auto_created_table(instance: &Arc<Instance>) {
let insert = InsertRequest {
schema_name: "public".to_string(),
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
@@ -490,16 +466,13 @@ CREATE TABLE {table_name} (
};
// Test auto create not existed table upon insertion.
let query = GreptimeRequest {
request: Some(Request::Insert(insert)),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Insert(insert);
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
let insert = InsertRequest {
schema_name: "public".to_string(),
table_name: "auto_created_table".to_string(),
columns: vec![
Column {
@@ -528,22 +501,18 @@ CREATE TABLE {table_name} (
};
// Test auto add not existed column upon insertion.
let query = GreptimeRequest {
request: Some(Request::Insert(insert)),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Insert(insert);
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(3)));
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM auto_created_table".to_string(),
)),
})),
};
let output = GrpcQueryHandler::do_query(instance.as_ref(), query)
let query = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM auto_created_table".to_string(),
)),
});
let output = GrpcQueryHandler::do_query(instance.as_ref(), query, QueryContext::arc())
.await
.unwrap();
let Output::Stream(stream) = output else { unreachable!() };

View File

@@ -16,15 +16,20 @@ use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::instance::Instance;
#[async_trait]
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
async fn exec(
&self,
request: &InfluxdbRequest,
ctx: QueryContextRef,
) -> servers::error::Result<()> {
let requests = request.try_into()?;
self.handle_inserts(requests)
self.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;
@@ -68,10 +73,9 @@ monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
monitor1,host=host2 memory=1027 1663840496400340001";
let request = InfluxdbRequest {
precision: None,
db: "public".to_string(),
lines: lines.to_string(),
};
instance.exec(&request).await.unwrap();
instance.exec(&request, QueryContext::arc()).await.unwrap();
let mut output = instance
.do_query(

View File

@@ -17,6 +17,7 @@ use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContext;
use snafu::prelude::*;
use crate::instance::Instance;
@@ -25,7 +26,7 @@ use crate::instance::Instance;
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint) -> server_error::Result<()> {
let request = data_point.as_grpc_insert();
self.handle_insert(request)
self.handle_insert(request, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {

View File

@@ -15,7 +15,7 @@
use api::prometheus::remote::read_request::ResponseType;
use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::greptime_request::Request;
use api::v1::{query_request, GreptimeRequest, QueryRequest};
use api::v1::{query_request, QueryRequest};
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::Output;
@@ -26,6 +26,7 @@ use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
@@ -74,26 +75,24 @@ async fn to_query_result(table_name: &str, output: Output) -> ServerResult<Query
impl Instance {
async fn handle_remote_queries(
&self,
db: &str,
ctx: QueryContextRef,
queries: &[Query],
) -> ServerResult<Vec<(String, Output)>> {
let mut results = Vec::with_capacity(queries.len());
for query in queries {
let (table_name, sql) = prometheus::query_to_sql(db, query)?;
let (table_name, sql) = prometheus::query_to_sql(query)?;
logging::debug!(
"prometheus remote read, table: {}, sql: {}",
table_name,
sql
);
let query = GreptimeRequest {
request: Some(Request::Query(QueryRequest {
query: Some(query_request::Query::Sql(sql.to_string())),
})),
};
let query = Request::Query(QueryRequest {
query: Some(query_request::Query::Sql(sql.to_string())),
});
let output = self
.do_query(query)
.do_query(query, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
@@ -106,22 +105,24 @@ impl Instance {
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
let requests = prometheus::to_grpc_insert_requests(database, request.clone())?;
self.handle_inserts(requests)
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
let requests = prometheus::to_grpc_insert_requests(request.clone())?;
self.handle_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
Ok(())
}
async fn read(&self, database: &str, request: ReadRequest) -> ServerResult<PrometheusResponse> {
async fn read(
&self,
request: ReadRequest,
ctx: QueryContextRef,
) -> ServerResult<PrometheusResponse> {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speedup query if possible
let results = self
.handle_remote_queries(database, &request.queries)
.await?;
let results = self.handle_remote_queries(ctx, &request.queries).await?;
match response_type {
ResponseType::Samples => {
@@ -159,6 +160,7 @@ mod tests {
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, LabelMatcher, Sample};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContext;
@@ -190,18 +192,19 @@ mod tests {
};
let db = "prometheus";
let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));
assert!(SqlQueryHandler::do_query(
instance.as_ref(),
"CREATE DATABASE IF NOT EXISTS prometheus",
QueryContext::arc()
ctx.clone(),
)
.await
.get(0)
.unwrap()
.is_ok());
instance.write(db, write_request).await.unwrap();
instance.write(write_request, ctx.clone()).await.unwrap();
let read_request = ReadRequest {
queries: vec![
@@ -236,7 +239,7 @@ mod tests {
..Default::default()
};
let resp = instance.read(db, read_request).await.unwrap();
let resp = instance.read(read_request, ctx).await.unwrap();
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.body).unwrap();

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::GreptimeRequest;
use api::v1::greptime_request::Request as GreptimeRequest;
use async_trait::async_trait;
use common_query::Output;
use datanode::error::Error as DatanodeError;
@@ -77,9 +77,9 @@ impl StandaloneGrpcQueryHandler {
impl GrpcQueryHandler for StandaloneGrpcQueryHandler {
type Error = error::Error;
async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
async fn do_query(&self, query: GreptimeRequest, ctx: QueryContextRef) -> Result<Output> {
self.0
.do_query(query)
.do_query(query, ctx)
.await
.context(error::InvokeDatanodeSnafu)
}

View File

@@ -20,7 +20,6 @@ use async_trait::async_trait;
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use catalog::remote::KvBackendRef;
use client::Database;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
@@ -117,15 +116,16 @@ impl Table for DistTable {
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let table_name = &self.table_name;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {
let client = self.datanode_clients.get_client(datanode).await;
let db = Database::new(&self.table_name.schema_name, client);
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
partition_execs.push(Arc::new(PartitionExec {
table_name: self.table_name.clone(),
table_name: table_name.clone(),
datanode_instance,
projection: projection.cloned(),
filters: filters.to_vec(),
@@ -258,10 +258,8 @@ impl DistTable {
}
);
for datanode in leaders {
let db = Database::new(
DEFAULT_CATALOG_NAME,
self.datanode_clients.get_client(&datanode).await,
);
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::with_client(client);
debug!("Sending {:?} to {:?}", expr, db);
let result = db
.alter(expr.clone())
@@ -405,6 +403,7 @@ mod test {
use partition::range::RangePartitionRule;
use partition::route::TableRoutes;
use partition::PartitionRuleRef;
use session::context::QueryContext;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use store_api::storage::RegionNumber;
@@ -921,13 +920,15 @@ mod test {
},
];
let request = InsertRequest {
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
columns,
row_count,
region_number: 0,
};
dn_instance.handle_insert(request).await.unwrap();
dn_instance
.handle_insert(request, QueryContext::arc())
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]

View File

@@ -35,13 +35,15 @@ impl DistTable {
&self,
inserts: HashMap<RegionNumber, InsertRequest>,
) -> Result<Output> {
let table_name = &self.table_name;
let route = self
.partition_manager
.find_table_route(&self.table_name)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: self.table_name.to_string(),
table_name: table_name.to_string(),
})?;
let mut joins = Vec::with_capacity(inserts.len());
for (region_id, insert) in inserts {
let datanode = route
@@ -57,7 +59,7 @@ impl DistTable {
.context(error::FindDatanodeSnafu { region: region_id })?;
let client = self.datanode_clients.get_client(&datanode).await;
let db = Database::new(&self.table_name.schema_name, client);
let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
let instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);
// TODO(fys): a separate runtime should be used here.
@@ -136,7 +138,6 @@ fn to_grpc_insert_request(
let table_name = insert.table_name.clone();
let (columns, row_count) = insert_request_to_insert_batch(&insert)?;
Ok(GrpcInsertRequest {
schema_name: insert.schema_name,
table_name,
region_number,
columns,

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
@@ -24,6 +26,7 @@ use crate::election::etcd::EtcdElection;
use crate::metasrv::{MetaSrv, MetaSrvOptions};
use crate::service::admin;
use crate::service::store::etcd::EtcdStore;
use crate::service::store::memory::MemStore;
use crate::{error, Result};
// Bootstrap the rpc server to serve incoming request
@@ -58,10 +61,16 @@ pub fn router(meta_srv: MetaSrv) -> Router {
}
pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?;
let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?;
let (kv_store, election) = if opts.use_memory_store {
(Arc::new(MemStore::new()) as _, None)
} else {
(
EtcdStore::with_endpoints([&opts.store_addr]).await?,
Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?),
)
};
let selector = opts.selector.clone().into();
let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), Some(election), None).await;
let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), election, None).await;
meta_srv.start().await;
Ok(meta_srv)
}

View File

@@ -40,6 +40,7 @@ pub struct MetaSrvOptions {
pub store_addr: String,
pub datanode_lease_secs: i64,
pub selector: SelectorType,
pub use_memory_store: bool,
}
impl Default for MetaSrvOptions {
@@ -50,6 +51,7 @@ impl Default for MetaSrvOptions {
store_addr: "127.0.0.1:2379".to_string(),
datanode_lease_secs: 15,
selector: SelectorType::default(),
use_memory_store: false,
}
}
}

View File

@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_telemetry::logging;
use datatypes::schema::SchemaRef;
@@ -489,9 +488,9 @@ impl<S: StorageEngine> MitoEngineInner<S> {
}
async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result<TableRef> {
let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let table_name = &req.table_name.clone();
let catalog_name = &req.catalog_name;
let schema_name = &req.schema_name;
let table_name = &req.table_name;
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let table_ref = TableReference {
@@ -562,6 +561,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::physical_plan::SessionContext;
use common_recordbatch::util;
use datatypes::prelude::ConcreteDataType;
@@ -982,8 +982,8 @@ mod tests {
fn new_add_columns_req(new_tag: &ColumnSchema, new_field: &ColumnSchema) -> AlterTableRequest {
AlterTableRequest {
catalog_name: None,
schema_name: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::AddColumns {
columns: vec![
@@ -1061,8 +1061,8 @@ mod tests {
// Then remove memory and my_field from the table.
let req = AlterTableRequest {
catalog_name: None,
schema_name: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::DropColumns {
names: vec![String::from("memory"), String::from("my_field")],
@@ -1116,8 +1116,8 @@ mod tests {
.expect("create table must succeed");
// test renaming a table with an existing name.
let req = AlterTableRequest {
catalog_name: None,
schema_name: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::RenameTable {
new_table_name: another_name.to_string(),
@@ -1132,8 +1132,8 @@ mod tests {
let new_table_name = "test_table";
// test rename table
let req = AlterTableRequest {
catalog_name: None,
schema_name: None,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),

View File

@@ -112,8 +112,7 @@ impl DfContextProviderAdapter {
impl ContextProvider for DfContextProviderAdapter {
fn get_table_provider(&self, name: TableReference) -> DfResult<Arc<dyn TableSource>> {
let schema = self.query_ctx.current_schema();
self.state.get_table_provider(schema.as_deref(), name)
self.state.get_table_provider(self.query_ctx.clone(), name)
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {

View File

@@ -35,6 +35,7 @@ use datafusion_optimizer::optimizer::Optimizer;
use datafusion_sql::planner::ContextProvider;
use datatypes::arrow::datatypes::DataType;
use promql::extension_plan::PromExtensionPlanner;
use session::context::QueryContextRef;
use crate::datafusion::DfCatalogListAdapter;
use crate::optimizer::TypeConversionRule;
@@ -115,15 +116,19 @@ impl QueryEngineState {
pub(crate) fn get_table_provider(
&self,
schema: Option<&str>,
query_ctx: QueryContextRef,
name: TableReference,
) -> DfResult<Arc<dyn TableSource>> {
let name = if let (Some(schema), TableReference::Bare { table }) = (schema, name) {
TableReference::Partial { schema, table }
let state = self.df_context.state();
if let TableReference::Bare { table } = name {
let name = TableReference::Partial {
schema: &query_ctx.current_schema(),
table,
};
state.get_table_provider(name)
} else {
name
};
self.df_context.state().get_table_provider(name)
state.get_table_provider(name)
}
}
pub(crate) fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
@@ -24,10 +24,10 @@ use datatypes::vectors::{Helper, StringVector};
use once_cell::sync::Lazy;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::describe::DescribeTable;
use sql::statements::explain::Explain;
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use sql::statements::statement::Statement;
use table::TableRef;
use crate::error::{self, Result};
use crate::parser::QueryStatement;
@@ -129,15 +129,11 @@ pub fn show_tables(
let schema = if let Some(database) = stmt.database {
database
} else {
query_ctx
.current_schema()
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string())
query_ctx.current_schema()
};
// TODO(sunng87): move this function into query_ctx
let catalog = query_ctx.current_catalog();
let catalog = catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema = catalog_manager
.schema(catalog, &schema)
.schema(&query_ctx.current_catalog(), &schema)
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu { schema })?;
let mut tables = schema.table_names().context(error::CatalogSnafu)?;
@@ -170,24 +166,7 @@ pub async fn explain(
query_engine.execute(&plan).await
}
pub fn describe_table(stmt: DescribeTable, catalog_manager: CatalogManagerRef) -> Result<Output> {
let catalog = stmt.catalog_name.as_str();
let schema = stmt.schema_name.as_str();
catalog_manager
.catalog(catalog)
.context(error::CatalogSnafu)?
.context(error::CatalogNotFoundSnafu { catalog })?;
let schema = catalog_manager
.schema(catalog, schema)
.context(error::CatalogSnafu)?
.context(error::SchemaNotFoundSnafu { schema })?;
let table = schema
.table(&stmt.table_name)
.context(error::CatalogSnafu)?
.context(error::TableNotFoundSnafu {
table: &stmt.table_name,
})?;
pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();
let columns = vec![
@@ -263,10 +242,6 @@ fn describe_column_semantic_types(
mod test {
use std::sync::Arc;
use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogManagerRef, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
@@ -274,8 +249,8 @@ mod test {
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
use snafu::ResultExt;
use sql::statements::describe::DescribeTable;
use table::test_util::MemTable;
use table::TableRef;
use crate::error;
use crate::error::Result;
@@ -284,94 +259,8 @@ mod test {
SEMANTIC_TYPE_TIME_INDEX, SEMANTIC_TYPE_VALUE,
};
#[test]
fn test_describe_table_catalog_not_found() -> Result<()> {
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let table_name = "test_table";
let table_schema = SchemaRef::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::uint32_datatype(),
false,
)]));
let data = vec![Arc::new(UInt32Vector::from_vec(vec![0])) as _];
let catalog_manager =
prepare_describe_table(&catalog_name, &schema_name, table_name, table_schema, data);
let stmt = DescribeTable::new("unknown".to_string(), schema_name, table_name.to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::Error::CatalogNotFound { catalog, .. } = err {
assert_eq!(catalog, "unknown");
} else {
panic!("describe table returned incorrect error");
}
Ok(())
}
#[test]
fn test_describe_table_schema_not_found() -> Result<()> {
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let table_name = "test_table";
let table_schema = SchemaRef::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::uint32_datatype(),
false,
)]));
let data = vec![Arc::new(UInt32Vector::from_vec(vec![0])) as _];
let catalog_manager =
prepare_describe_table(&catalog_name, &schema_name, table_name, table_schema, data);
let stmt = DescribeTable::new(catalog_name, "unknown".to_string(), table_name.to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::Error::SchemaNotFound { schema, .. } = err {
assert_eq!(schema, "unknown");
} else {
panic!("describe table returned incorrect error");
}
Ok(())
}
#[test]
fn test_describe_table_table_not_found() -> Result<()> {
let catalog_name = DEFAULT_CATALOG_NAME.to_string();
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let table_name = "test_table";
let table_schema = SchemaRef::new(Schema::new(vec![ColumnSchema::new(
"test_col",
ConcreteDataType::uint32_datatype(),
false,
)]));
let data = vec![Arc::new(UInt32Vector::from_vec(vec![0])) as _];
let catalog_manager =
prepare_describe_table(&catalog_name, &schema_name, table_name, table_schema, data);
let stmt = DescribeTable::new(catalog_name, schema_name, "unknown".to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::Error::TableNotFound { table, .. } = err {
assert_eq!(table, "unknown");
} else {
panic!("describe table returned incorrect error");
}
Ok(())
}
#[test]
fn test_describe_table_multiple_columns() -> Result<()> {
let catalog_name = DEFAULT_CATALOG_NAME;
let schema_name = DEFAULT_SCHEMA_NAME;
let table_name = "test_table";
let schema = vec![
ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
@@ -401,38 +290,23 @@ mod test {
])) as _,
];
describe_table_test_by_schema(
catalog_name,
schema_name,
table_name,
schema,
data,
expected_columns,
)
describe_table_test_by_schema(table_name, schema, data, expected_columns)
}
fn describe_table_test_by_schema(
catalog_name: &str,
schema_name: &str,
table_name: &str,
schema: Vec<ColumnSchema>,
data: Vec<VectorRef>,
expected_columns: Vec<VectorRef>,
) -> Result<()> {
let table_schema = SchemaRef::new(Schema::new(schema));
let catalog_manager =
prepare_describe_table(catalog_name, schema_name, table_name, table_schema, data);
let table = prepare_describe_table(table_name, table_schema, data);
let expected =
RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
.context(error::CreateRecordBatchSnafu)?;
let stmt = DescribeTable::new(
catalog_name.to_string(),
schema_name.to_string(),
table_name.to_string(),
);
if let Output::RecordBatches(res) = describe_table(stmt, catalog_manager)? {
if let Output::RecordBatches(res) = describe_table(table)? {
assert_eq!(res.take(), expected.take());
} else {
panic!("describe table must return record batch");
@@ -442,28 +316,11 @@ mod test {
}
fn prepare_describe_table(
catalog_name: &str,
schema_name: &str,
table_name: &str,
table_schema: SchemaRef,
data: Vec<VectorRef>,
) -> CatalogManagerRef {
) -> TableRef {
let record_batch = RecordBatch::new(table_schema, data).unwrap();
let table = Arc::new(MemTable::new(table_name, record_batch));
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
let catalog_manager = Arc::new(MemoryCatalogManager::default());
schema_provider
.register_table(table_name.to_string(), table)
.unwrap();
catalog_provider
.register_schema(schema_name.to_string(), schema_provider)
.unwrap();
catalog_manager
.register_catalog(catalog_name.to_string(), catalog_provider)
.unwrap();
catalog_manager
Arc::new(MemTable::new(table_name, record_batch))
}
}

View File

@@ -17,7 +17,7 @@ mod stream;
use std::pin::Pin;
use std::sync::Arc;
use api::v1::GreptimeRequest;
use api::v1::{GreptimeRequest, RequestHeader};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -29,7 +29,8 @@ use common_query::Output;
use common_runtime::Runtime;
use futures::Stream;
use prost::Message;
use snafu::ResultExt;
use session::context::{QueryContext, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use tokio::sync::oneshot;
use tonic::{Request, Response, Status, Streaming};
@@ -92,6 +93,11 @@ impl FlightService for FlightHandler {
let request =
GreptimeRequest::decode(ticket.as_slice()).context(error::InvalidFlightTicketSnafu)?;
let query = request.request.context(error::InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
let query_ctx = create_query_context(request.header.as_ref());
let (tx, rx) = oneshot::channel();
let handler = self.handler.clone();
@@ -99,7 +105,7 @@ impl FlightService for FlightHandler {
// 1. prevent the execution from being cancelled unexpected by Tonic runtime;
// 2. avoid the handler blocks the gRPC runtime incidentally.
self.runtime.spawn(async move {
let result = handler.do_query(request).await;
let result = handler.do_query(query, query_ctx).await;
// Ignore the sending result.
// Usually an error indicates the rx at Tonic side is dropped (due to request timeout).
@@ -166,3 +172,17 @@ fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
}
}
}
fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
let ctx = QueryContext::arc();
if let Some(header) = header {
if !header.catalog.is_empty() {
ctx.set_current_catalog(&header.catalog);
}
if !header.schema.is_empty() {
ctx.set_current_schema(&header.schema);
}
};
ctx
}

View File

@@ -30,7 +30,6 @@ use axum::body::BoxBody;
use axum::error_handling::HandleErrorLayer;
use axum::response::{Html, Json};
use axum::{routing, BoxError, Extension, Router};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
@@ -71,10 +70,7 @@ pub(crate) fn query_context_from_db(
let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db);
match query_handler.is_valid_schema(catalog, schema) {
Ok(true) => Ok(Arc::new(QueryContext::with(
catalog.to_owned(),
schema.to_owned(),
))),
Ok(true) => Ok(Arc::new(QueryContext::with(catalog, schema))),
Ok(false) => Err(JsonResponse::with_error(
format!("Database not found: {db}"),
StatusCode::DatabaseNotFound,
@@ -85,10 +81,7 @@ pub(crate) fn query_context_from_db(
)),
}
} else {
Ok(Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME.to_owned(),
DEFAULT_SCHEMA_NAME.to_owned(),
)))
Ok(QueryContext::arc())
}
}

View File

@@ -13,11 +13,13 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::writer::Precision;
use session::context::QueryContext;
use crate::error::{Result, TimePrecisionSnafu};
use crate::influxdb::InfluxdbRequest;
@@ -32,17 +34,15 @@ pub async fn influxdb_write(
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db));
let precision = params
.get("precision")
.map(|val| parse_time_precision(val))
.transpose()?;
let request = InfluxdbRequest {
precision,
lines,
db,
};
handler.exec(&request).await?;
let request = InfluxdbRequest { precision, lines };
handler.exec(&request, ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

View File

@@ -12,15 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use hyper::Body;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::QueryContext;
use snafu::prelude::*;
use crate::error::{self, Result};
@@ -48,10 +51,13 @@ pub async fn remote_write(
) -> Result<(StatusCode, ())> {
let request = decode_remote_write_request(body).await?;
handler
.write(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
.await?;
let ctx = if let Some(db) = params.db {
Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db))
} else {
QueryContext::arc()
};
handler.write(request, ctx).await?;
Ok((StatusCode::NO_CONTENT, ()))
}
@@ -76,9 +82,13 @@ pub async fn remote_read(
) -> Result<PrometheusResponse> {
let request = decode_remote_read_request(body).await?;
handler
.read(params.db.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME), request)
.await
let ctx = if let Some(db) = params.db {
Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, &db))
} else {
QueryContext::arc()
};
handler.read(request, ctx).await
}
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {

View File

@@ -27,7 +27,6 @@ pub const DEFAULT_TIME_PRECISION: Precision = Precision::Nanosecond;
#[derive(Debug)]
pub struct InfluxdbRequest {
pub precision: Option<Precision>,
pub db: String,
pub lines: String,
}
@@ -37,8 +36,6 @@ impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
type Error = Error;
fn try_from(value: &InfluxdbRequest) -> Result<Self, Self::Error> {
let schema_name = value.db.to_string();
let mut writers: HashMap<TableName, LinesWriter> = HashMap::new();
let lines = parse_lines(&value.lines)
.collect::<influxdb_line_protocol::Result<Vec<_>>>()
@@ -111,7 +108,6 @@ impl TryFrom<&InfluxdbRequest> for Vec<GrpcInsertRequest> {
.map(|(table_name, writer)| {
let (columns, row_count) = writer.finish();
GrpcInsertRequest {
schema_name: schema_name.clone(),
table_name,
region_number: 0,
columns,
@@ -140,7 +136,6 @@ monitor2,host=host3 cpu=66.5 1663840496100023102
monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
let influxdb_req = &InfluxdbRequest {
db: "public".to_string(),
precision: None,
lines: lines.to_string(),
};
@@ -149,7 +144,6 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003";
assert_eq!(2, requests.len());
for request in requests {
assert_eq!("public", request.schema_name);
match &request.table_name[..] {
"monitor1" => assert_monitor_1(&request.columns),
"monitor2" => assert_monitor_2(&request.columns),

View File

@@ -260,9 +260,7 @@ fn check_others(query: &str, query_ctx: QueryContextRef) -> Option<Output> {
let recordbatches = if SELECT_VERSION_PATTERN.is_match(query) {
Some(select_function("version()", MYSQL_VERSION))
} else if SELECT_DATABASE_PATTERN.is_match(query) {
let schema = query_ctx
.current_schema()
.unwrap_or_else(|| "NULL".to_string());
let schema = query_ctx.current_schema();
Some(select_function("database()", &schema))
} else if SELECT_TIME_DIFF_FUNC_PATTERN.is_match(query) {
Some(select_function(

View File

@@ -14,7 +14,6 @@
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest};
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use crate::error::{self, Result};
@@ -126,7 +125,6 @@ impl DataPoint {
}
pub fn as_grpc_insert(&self) -> GrpcInsertRequest {
let schema_name = DEFAULT_SCHEMA_NAME.to_string();
let mut columns = Vec::with_capacity(2 + self.tags.len());
let ts_column = Column {
@@ -167,7 +165,6 @@ impl DataPoint {
}
GrpcInsertRequest {
schema_name,
table_name: self.metric.clone(),
region_number: 0,
columns,

View File

@@ -41,7 +41,7 @@ pub struct Metrics {
/// Generate a sql from a remote request query
/// TODO(dennis): maybe use logical plan in future to prevent sql injection
pub fn query_to_sql(db: &str, q: &Query) -> Result<(String, String)> {
pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
let start_timestamp_ms = q.start_timestamp_ms;
let end_timestamp_ms = q.end_timestamp_ms;
@@ -100,9 +100,7 @@ pub fn query_to_sql(db: &str, q: &Query) -> Result<(String, String)> {
Ok((
table_name.to_string(),
format!(
"select * from {db}.{table_name} where {conditions} order by {TIMESTAMP_COLUMN_NAME}",
),
format!("select * from {table_name} where {conditions} order by {TIMESTAMP_COLUMN_NAME}",),
))
}
@@ -284,21 +282,12 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
Ok(timeseries_map.into_values().collect())
}
pub fn to_grpc_insert_requests(
database: &str,
mut request: WriteRequest,
) -> Result<Vec<GrpcInsertRequest>> {
pub fn to_grpc_insert_requests(mut request: WriteRequest) -> Result<Vec<GrpcInsertRequest>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(|timeseries| to_grpc_insert_request(database, timeseries))
.collect()
timeseries.into_iter().map(to_grpc_insert_request).collect()
}
fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result<GrpcInsertRequest> {
let schema_name = database.to_string();
fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result<GrpcInsertRequest> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
@@ -355,7 +344,6 @@ fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result<
}
Ok(GrpcInsertRequest {
schema_name,
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
@@ -467,7 +455,7 @@ mod tests {
matchers: vec![],
..Default::default()
};
let err = query_to_sql("public", &q).unwrap_err();
let err = query_to_sql(&q).unwrap_err();
assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
let q = Query {
@@ -480,9 +468,9 @@ mod tests {
}],
..Default::default()
};
let (table, sql) = query_to_sql("public", &q).unwrap();
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
let q = Query {
start_timestamp_ms: 1000,
@@ -506,9 +494,9 @@ mod tests {
],
..Default::default()
};
let (table, sql) = query_to_sql("public", &q).unwrap();
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from public.test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
}
#[test]
@@ -518,11 +506,8 @@ mod tests {
..Default::default()
};
let exprs = to_grpc_insert_requests("prometheus", write_request).unwrap();
let exprs = to_grpc_insert_requests(write_request).unwrap();
assert_eq!(3, exprs.len());
assert_eq!("prometheus", exprs[0].schema_name);
assert_eq!("prometheus", exprs[1].schema_name);
assert_eq!("prometheus", exprs[2].schema_name);
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
@@ -51,7 +52,7 @@ pub trait ScriptHandler {
pub trait InfluxdbLineProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, request: &InfluxdbRequest) -> Result<()>;
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()>;
}
#[async_trait]
@@ -70,9 +71,9 @@ pub struct PrometheusResponse {
#[async_trait]
pub trait PrometheusProtocolHandler {
/// Handling prometheus remote write requests
async fn write(&self, database: &str, request: WriteRequest) -> Result<()>;
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, database: &str, request: ReadRequest) -> Result<PrometheusResponse>;
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PrometheusResponse>;
/// Handling push gateway requests
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}

View File

@@ -14,10 +14,11 @@
use std::sync::Arc;
use api::v1::GreptimeRequest;
use api::v1::greptime_request::Request as GreptimeRequest;
use async_trait::async_trait;
use common_error::prelude::*;
use common_query::Output;
use session::context::QueryContextRef;
use crate::error::{self, Result};
@@ -28,7 +29,11 @@ pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
pub trait GrpcQueryHandler {
type Error: ErrorExt;
async fn do_query(&self, query: GreptimeRequest) -> std::result::Result<Output, Self::Error>;
async fn do_query(
&self,
query: GreptimeRequest,
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
}
pub struct ServerGrpcQueryHandlerAdaptor<E>(GrpcQueryHandlerRef<E>);
@@ -46,9 +51,9 @@ where
{
type Error = error::Error;
async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
async fn do_query(&self, query: GreptimeRequest, ctx: QueryContextRef) -> Result<Output> {
self.0
.do_query(query)
.do_query(query, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -35,11 +35,11 @@ struct DummyInstance {
#[async_trait]
impl InfluxdbLineProtocolHandler for DummyInstance {
async fn exec(&self, request: &InfluxdbRequest) -> Result<()> {
async fn exec(&self, request: &InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
let requests: Vec<InsertRequest> = request.try_into()?;
for expr in requests {
let _ = self.tx.send((expr.schema_name, expr.table_name)).await;
let _ = self.tx.send((ctx.current_schema(), expr.table_name)).await;
}
Ok(())

View File

@@ -37,18 +37,18 @@ struct DummyInstance {
#[async_trait]
impl PrometheusProtocolHandler for DummyInstance {
async fn write(&self, db: &str, request: WriteRequest) -> Result<()> {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> Result<()> {
let _ = self
.tx
.send((db.to_string(), request.encode_to_vec()))
.send((ctx.current_schema(), request.encode_to_vec()))
.await;
Ok(())
}
async fn read(&self, db: &str, request: ReadRequest) -> Result<PrometheusResponse> {
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PrometheusResponse> {
let _ = self
.tx
.send((db.to_string(), request.encode_to_vec()))
.send((ctx.current_schema(), request.encode_to_vec()))
.await;
let response = ReadResponse {

View File

@@ -6,4 +6,5 @@ license.workspace = true
[dependencies]
arc-swap = "1.5"
common-catalog = { path = "../common/catalog" }
common-telemetry = { path = "../common/telemetry" }

View File

@@ -12,18 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use common_telemetry::info;
use arc_swap::ArcSwap;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::debug;
pub type QueryContextRef = Arc<QueryContext>;
pub type ConnInfoRef = Arc<ConnInfo>;
pub struct QueryContext {
current_catalog: ArcSwapOption<String>,
current_schema: ArcSwapOption<String>,
current_catalog: ArcSwap<String>,
current_schema: ArcSwap<String>,
}
impl Default for QueryContext {
@@ -32,6 +34,17 @@ impl Default for QueryContext {
}
}
impl Display for QueryContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"QueryContext{{catalog: {}, schema: {}}}",
self.current_catalog(),
self.current_schema()
)
}
}
impl QueryContext {
pub fn arc() -> QueryContextRef {
Arc::new(QueryContext::new())
@@ -39,39 +52,37 @@ impl QueryContext {
pub fn new() -> Self {
Self {
current_catalog: ArcSwapOption::new(None),
current_schema: ArcSwapOption::new(None),
current_catalog: ArcSwap::new(Arc::new(DEFAULT_CATALOG_NAME.to_string())),
current_schema: ArcSwap::new(Arc::new(DEFAULT_SCHEMA_NAME.to_string())),
}
}
pub fn with(catalog: String, schema: String) -> Self {
pub fn with(catalog: &str, schema: &str) -> Self {
Self {
current_catalog: ArcSwapOption::new(Some(Arc::new(catalog))),
current_schema: ArcSwapOption::new(Some(Arc::new(schema))),
current_catalog: ArcSwap::new(Arc::new(catalog.to_string())),
current_schema: ArcSwap::new(Arc::new(schema.to_string())),
}
}
pub fn current_schema(&self) -> Option<String> {
self.current_schema.load().as_deref().cloned()
pub fn current_schema(&self) -> String {
self.current_schema.load().as_ref().clone()
}
pub fn current_catalog(&self) -> Option<String> {
self.current_catalog.load().as_deref().cloned()
pub fn current_catalog(&self) -> String {
self.current_catalog.load().as_ref().clone()
}
pub fn set_current_schema(&self, schema: &str) {
let last = self.current_schema.swap(Some(Arc::new(schema.to_string())));
info!(
let last = self.current_schema.swap(Arc::new(schema.to_string()));
debug!(
"set new session default schema: {:?}, swap old: {:?}",
schema, last
)
}
pub fn set_current_catalog(&self, catalog: &str) {
let last = self
.current_catalog
.swap(Some(Arc::new(catalog.to_string())));
info!(
let last = self.current_catalog.swap(Arc::new(catalog.to_string()));
debug!(
"set new session default catalog: {:?}, swap old: {:?}",
catalog, last
)

View File

@@ -26,7 +26,6 @@ use crate::statements::drop::DropTable;
use crate::statements::explain::Explain;
use crate::statements::show::{ShowCreateTable, ShowDatabases, ShowKind, ShowTables};
use crate::statements::statement::Statement;
use crate::statements::table_idents_to_full_name;
/// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser.
pub struct ParserContext<'a> {
@@ -267,12 +266,7 @@ impl<'a> ParserContext<'a> {
name: table_idents.to_string(),
}
);
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_idents)?;
Ok(Statement::DescribeTable(DescribeTable {
catalog_name,
schema_name,
table_name,
}))
Ok(Statement::DescribeTable(DescribeTable::new(table_idents)))
}
fn parse_explain(&mut self) -> Result<Statement> {
@@ -310,12 +304,7 @@ impl<'a> ParserContext<'a> {
}
);
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_ident)?;
Ok(Statement::DropTable(DropTable {
catalog_name,
schema_name,
table_name,
}))
Ok(Statement::DropTable(DropTable::new(table_ident)))
}
// Report unexpected token
@@ -384,8 +373,9 @@ impl<'a> ParserContext<'a> {
mod tests {
use std::assert_matches::assert_matches;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use sqlparser::ast::{Query as SpQuery, Statement as SpStatement, WildcardAdditionalOptions};
use sqlparser::ast::{
Ident, ObjectName, Query as SpQuery, Statement as SpStatement, WildcardAdditionalOptions,
};
use sqlparser::dialect::GenericDialect;
use super::*;
@@ -584,11 +574,7 @@ mod tests {
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropTable(DropTable {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "foo".to_string()
})
Statement::DropTable(DropTable::new(ObjectName(vec![Ident::new("foo")])))
);
let sql = "DROP TABLE my_schema.foo";
@@ -596,11 +582,10 @@ mod tests {
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropTable(DropTable {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: "my_schema".to_string(),
table_name: "foo".to_string()
})
Statement::DropTable(DropTable::new(ObjectName(vec![
Ident::new("my_schema"),
Ident::new("foo")
])))
);
let sql = "DROP TABLE my_catalog.my_schema.foo";
@@ -608,11 +593,11 @@ mod tests {
let mut stmts = result.unwrap();
assert_eq!(
stmts.pop().unwrap(),
Statement::DropTable(DropTable {
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
table_name: "foo".to_string()
})
Statement::DropTable(DropTable::new(ObjectName(vec![
Ident::new("my_catalog"),
Ident::new("my_schema"),
Ident::new("foo")
])))
)
}
}

View File

@@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use sqlparser::ast::ObjectName;
/// SQL structure for `DESCRIBE TABLE`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeTable {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
name: ObjectName,
}
impl DescribeTable {
/// Creates a statement for `DESCRIBE TABLE`
pub fn new(catalog_name: String, schema_name: String, table_name: String) -> Self {
DescribeTable {
catalog_name,
schema_name,
table_name,
}
pub fn new(name: ObjectName) -> Self {
Self { name }
}
pub fn name(&self) -> &ObjectName {
&self.name
}
}
@@ -49,7 +49,7 @@ mod tests {
assert_matches!(&stmts[0], Statement::DescribeTable { .. });
match &stmts[0] {
Statement::DescribeTable(show) => {
assert_eq!(show.table_name.as_str(), "test");
assert_eq!(show.name.to_string(), "test");
}
_ => {
unreachable!();
@@ -66,8 +66,7 @@ mod tests {
assert_matches!(&stmts[0], Statement::DescribeTable { .. });
match &stmts[0] {
Statement::DescribeTable(show) => {
assert_eq!(show.schema_name.as_str(), "test_schema");
assert_eq!(show.table_name.as_str(), "test");
assert_eq!(show.name.to_string(), "test_schema.test");
}
_ => {
unreachable!();
@@ -84,9 +83,7 @@ mod tests {
assert_matches!(&stmts[0], Statement::DescribeTable { .. });
match &stmts[0] {
Statement::DescribeTable(show) => {
assert_eq!(show.catalog_name.as_str(), "test_catalog");
assert_eq!(show.schema_name.as_str(), "test_schema");
assert_eq!(show.table_name.as_str(), "test");
assert_eq!(show.name.to_string(), "test_catalog.test_schema.test");
}
_ => {
unreachable!();

View File

@@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use sqlparser::ast::ObjectName;
/// DROP TABLE statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DropTable {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
table_name: ObjectName,
}
impl DropTable {
/// Creates a statement for `DROP TABLE`
pub fn new(catalog_name: String, schema_name: String, table_name: String) -> Self {
DropTable {
catalog_name,
schema_name,
table_name,
}
pub fn new(table_name: ObjectName) -> Self {
Self { table_name }
}
pub fn table_name(&self) -> &ObjectName {
&self.table_name
}
}

View File

@@ -20,6 +20,7 @@ use crate::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, O
use crate::TableRef;
/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, PartialEq)]
pub struct TableReference<'a> {
pub catalog: &'a str,
pub schema: &'a str,

View File

@@ -64,8 +64,8 @@ pub struct OpenTableRequest {
/// Alter table request
#[derive(Debug)]
pub struct AlterTableRequest {
pub catalog_name: Option<String>,
pub schema_name: Option<String>,
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub alter_kind: AlterKind,
}

View File

@@ -18,7 +18,7 @@ use api::v1::{
InsertRequest, TableId,
};
use client::{Client, Database};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_query::Output;
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
@@ -65,7 +65,7 @@ pub async fn test_auto_create_table(store_type: StorageType) {
setup_grpc_server(store_type, "auto_create_table").await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client);
let db = Database::with_client(grpc_client);
insert_and_assert(&db).await;
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
@@ -131,8 +131,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
setup_grpc_server(store_type, "insert_and_select").await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client.clone());
let db = Database::with_client(grpc_client);
// create
let expr = testing_create_expr();
@@ -153,9 +152,9 @@ pub async fn test_insert_and_select(store_type: StorageType) {
}],
});
let expr = AlterExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "demo".to_string(),
catalog_name: "".to_string(),
schema_name: "".to_string(),
kind: Some(kind),
};
let result = db.alter(expr).await.unwrap();
@@ -173,7 +172,6 @@ async fn insert_and_assert(db: &Database) {
let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data();
let request = InsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
region_number: 0,
columns: vec![

View File

@@ -1,12 +1,21 @@
CREATE TABLE test_distinct (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
CREATE SCHEMA test_distinct;
Affected Rows: 1
USE test_distinct;
++
++
CREATE TABLE test (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
Affected Rows: 0
INSERT INTO test_distinct VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
INSERT INTO test VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
Affected Rows: 4
SELECT DISTINCT a, b FROM test_distinct ORDER BY a, b;
SELECT DISTINCT a, b FROM test ORDER BY a, b;
+----+----+
| a | b |
@@ -16,7 +25,7 @@ SELECT DISTINCT a, b FROM test_distinct ORDER BY a, b;
| 13 | 22 |
+----+----+
SELECT DISTINCT test_distinct.a, b FROM test_distinct ORDER BY a, b;
SELECT DISTINCT test.a, b FROM test ORDER BY a, b;
+----+----+
| a | b |
@@ -26,7 +35,7 @@ SELECT DISTINCT test_distinct.a, b FROM test_distinct ORDER BY a, b;
| 13 | 22 |
+----+----+
SELECT DISTINCT a FROM test_distinct ORDER BY a;
SELECT DISTINCT a FROM test ORDER BY a;
+----+
| a |
@@ -35,7 +44,7 @@ SELECT DISTINCT a FROM test_distinct ORDER BY a;
| 13 |
+----+
SELECT DISTINCT b FROM test_distinct ORDER BY b;
SELECT DISTINCT b FROM test ORDER BY b;
+----+
| b |
@@ -44,32 +53,32 @@ SELECT DISTINCT b FROM test_distinct ORDER BY b;
| 22 |
+----+
SELECT DISTINCT a, SUM(B) FROM test_distinct GROUP BY a ORDER BY a;
SELECT DISTINCT a, SUM(B) FROM test GROUP BY a ORDER BY a;
+----+----------------------+
| a | SUM(test_distinct.b) |
+----+----------------------+
| 11 | 65 |
| 13 | 22 |
+----+----------------------+
+----+-------------+
| a | SUM(test.b) |
+----+-------------+
| 11 | 65 |
| 13 | 22 |
+----+-------------+
SELECT DISTINCT MAX(b) FROM test_distinct GROUP BY a;
SELECT DISTINCT MAX(b) FROM test GROUP BY a;
+----------------------+
| MAX(test_distinct.b) |
+----------------------+
| 22 |
+----------------------+
+-------------+
| MAX(test.b) |
+-------------+
| 22 |
+-------------+
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test_distinct;
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test;
+-------------------------------------------------------------------------------+
| CASE WHEN test_distinct.a > Int64(11) THEN Int64(11) ELSE test_distinct.a END |
+-------------------------------------------------------------------------------+
| 11 |
+-------------------------------------------------------------------------------+
+-------------------------------------------------------------+
| CASE WHEN test.a > Int64(11) THEN Int64(11) ELSE test.a END |
+-------------------------------------------------------------+
| 11 |
+-------------------------------------------------------------+
DROP TABLE test_distinct;
DROP TABLE test;
Affected Rows: 1

View File

@@ -1,19 +1,23 @@
CREATE TABLE test_distinct (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
CREATE SCHEMA test_distinct;
INSERT INTO test_distinct VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
USE test_distinct;
SELECT DISTINCT a, b FROM test_distinct ORDER BY a, b;
CREATE TABLE test (a INTEGER, b INTEGER, t BIGINT TIME INDEX);
SELECT DISTINCT test_distinct.a, b FROM test_distinct ORDER BY a, b;
INSERT INTO test VALUES (11, 22, 1), (13, 22, 2), (11, 21, 3), (11, 22, 4);
SELECT DISTINCT a FROM test_distinct ORDER BY a;
SELECT DISTINCT a, b FROM test ORDER BY a, b;
SELECT DISTINCT b FROM test_distinct ORDER BY b;
SELECT DISTINCT test.a, b FROM test ORDER BY a, b;
SELECT DISTINCT a, SUM(B) FROM test_distinct GROUP BY a ORDER BY a;
SELECT DISTINCT a FROM test ORDER BY a;
SELECT DISTINCT MAX(b) FROM test_distinct GROUP BY a;
SELECT DISTINCT b FROM test ORDER BY b;
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test_distinct;
SELECT DISTINCT a, SUM(B) FROM test GROUP BY a ORDER BY a;
DROP TABLE test_distinct;
SELECT DISTINCT MAX(b) FROM test GROUP BY a;
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test;
DROP TABLE test;

View File

@@ -1,3 +1,8 @@
USE public;
++
++
SELECT SUM(number) FROM numbers;
+---------------------+

View File

@@ -1,3 +1,5 @@
USE public;
SELECT SUM(number) FROM numbers;
SELECT SUM(1) FROM numbers;

View File

@@ -1,16 +1,25 @@
CREATE TABLE test_add_col(i INTEGER, j BIGINT TIME INDEX);
CREATE SCHEMA test_add_col;
Affected Rows: 1
USE test_add_col;
++
++
CREATE TABLE test(i INTEGER, j BIGINT TIME INDEX);
Affected Rows: 0
INSERT INTO test_add_col VALUES (1, 1), (2, 2);
INSERT INTO test VALUES (1, 1), (2, 2);
Affected Rows: 2
ALTER TABLE test_add_col ADD COLUMN k INTEGER;
ALTER TABLE test ADD COLUMN k INTEGER;
Affected Rows: 0
SELECT * FROM test_add_col;
SELECT * FROM test;
+---+---+---+
| i | j | k |
@@ -19,7 +28,7 @@ SELECT * FROM test_add_col;
| 2 | 2 | |
+---+---+---+
DROP TABLE test_add_col;
DROP TABLE test;
Affected Rows: 1

View File

@@ -1,9 +1,13 @@
CREATE TABLE test_add_col(i INTEGER, j BIGINT TIME INDEX);
CREATE SCHEMA test_add_col;
INSERT INTO test_add_col VALUES (1, 1), (2, 2);
USE test_add_col;
ALTER TABLE test_add_col ADD COLUMN k INTEGER;
CREATE TABLE test(i INTEGER, j BIGINT TIME INDEX);
SELECT * FROM test_add_col;
INSERT INTO test VALUES (1, 1), (2, 2);
DROP TABLE test_add_col;
ALTER TABLE test ADD COLUMN k INTEGER;
SELECT * FROM test;
DROP TABLE test;

View File

@@ -1,3 +1,12 @@
CREATE SCHEMA test_rename_table;
Affected Rows: 1
USE test_rename_table;
++
++
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
Affected Rows: 0
@@ -31,11 +40,11 @@ Affected Rows: 0
DESC TABLE t;
Error: 1004(InvalidArguments), Table not found: t
Error: 4001(TableNotFound), Table not found: t
SELECT * FROM t;
Error: 3000(PlanQuery), Error during planning: table 'greptime.public.t' not found
Error: 3000(PlanQuery), Error during planning: table 'greptime.test_rename_table.t' not found
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
@@ -62,11 +71,11 @@ SELECT * FROM new_table;
ALTER TABLE new_table RENAME new_table;
Error: 1004(InvalidArguments), Table already exists: greptime.public.new_table
Error: 1004(InvalidArguments), Table already exists: greptime.test_rename_table.new_table
ALTER TABLE new_table RENAME t;
Error: 1004(InvalidArguments), Table already exists: greptime.public.t
Error: 1004(InvalidArguments), Table already exists: greptime.test_rename_table.t
DROP TABLE t;

View File

@@ -1,3 +1,7 @@
CREATE SCHEMA test_rename_table;
USE test_rename_table;
CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);
DESC TABLE t;

View File

@@ -1,41 +1,42 @@
CREATE SCHEMA test_schema;
CREATE SCHEMA test_public_schema;
Affected Rows: 1
SHOW DATABASES;
CREATE SCHEMA test_public_schema;
+-------------+
| Schemas |
+-------------+
| public |
| test_schema |
+-------------+
Error: 1003(Internal), Schema test_public_schema already exists
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
SHOW DATABASES LIKE '%public%';
+--------------------+
| Schemas |
+--------------------+
| public |
| test_public_schema |
+--------------------+
USE test_public_schema;
++
++
CREATE TABLE hello(i BIGINT TIME INDEX);
Affected Rows: 0
DROP TABLE test_schema.hello;
DROP TABLE hello;
Affected Rows: 1
DROP SCHEMA test_schema;
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_schema;, keyword: SCHEMA
CREATE SCHEMA test_schema;
Error: 1003(Internal), Schema test_schema already exists
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
CREATE TABLE hello(i BIGINT TIME INDEX);
Affected Rows: 0
INSERT INTO test_schema.hello VALUES (2), (3), (4);
INSERT INTO hello VALUES (2), (3), (4);
Affected Rows: 3
SELECT * FROM test_schema.hello;
SELECT * FROM hello;
+---+
| i |
@@ -47,6 +48,29 @@ SELECT * FROM test_schema.hello;
SHOW TABLES;
+--------+
| Tables |
+--------+
| hello |
+--------+
DROP TABLE hello;
Affected Rows: 1
DROP TABLE hello;
Error: 4001(TableNotFound), Table `greptime.test_public_schema.hello` not exist
SHOW TABLES FROM test_public_schema;
+--------+
| Tables |
+--------+
+--------+
SHOW TABLES FROM public;
+---------+
| Tables |
+---------+
@@ -54,34 +78,11 @@ SHOW TABLES;
| scripts |
+---------+
SHOW TABLES FROM test_schema;
DROP SCHEMA test_public_schema;
+--------+
| Tables |
+--------+
| hello |
+--------+
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_public_schema;, keyword: SCHEMA
DROP TABLE test_schema.hello;
SELECT * FROM test_public_schema.hello;
Affected Rows: 1
DROP TABLE test_schema.hello;
Error: 4001(TableNotFound), Table `greptime.test_schema.hello` not exist
SHOW TABLES FROM test_schema;
+--------+
| Tables |
+--------+
+--------+
DROP SCHEMA test_schema;
Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_schema;, keyword: SCHEMA
SELECT * FROM test_schema.hello;
Error: 3000(PlanQuery), Error during planning: table 'greptime.test_schema.hello' not found
Error: 3000(PlanQuery), Error during planning: table 'greptime.test_public_schema.hello' not found

View File

@@ -1,31 +1,31 @@
CREATE SCHEMA test_schema;
CREATE SCHEMA test_public_schema;
SHOW DATABASES;
CREATE SCHEMA test_public_schema;
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
SHOW DATABASES LIKE '%public%';
DROP TABLE test_schema.hello;
USE test_public_schema;
DROP SCHEMA test_schema;
CREATE TABLE hello(i BIGINT TIME INDEX);
CREATE SCHEMA test_schema;
DROP TABLE hello;
CREATE TABLE test_schema.hello(i BIGINT TIME INDEX);
CREATE TABLE hello(i BIGINT TIME INDEX);
INSERT INTO test_schema.hello VALUES (2), (3), (4);
INSERT INTO hello VALUES (2), (3), (4);
SELECT * FROM test_schema.hello;
SELECT * FROM hello;
SHOW TABLES;
SHOW TABLES FROM test_schema;
DROP TABLE hello;
DROP TABLE test_schema.hello;
DROP TABLE hello;
DROP TABLE test_schema.hello;
SHOW TABLES FROM test_public_schema;
SHOW TABLES FROM test_schema;
SHOW TABLES FROM public;
DROP SCHEMA test_schema;
DROP SCHEMA test_public_schema;
SELECT * FROM test_schema.hello;
SELECT * FROM test_public_schema.hello;

View File

@@ -1,16 +1,25 @@
CREATE TABLE insert_invalid_strings(i STRING, t BIGINT, time index(t));
Affected Rows: 0
INSERT INTO insert_invalid_strings VALUES ('â‚(', 1);
CREATE SCHEMA insert_invalid;
Affected Rows: 1
INSERT INTO insert_invalid_strings VALUES (3, 4);
USE insert_invalid;
++
++
CREATE TABLE strings(i STRING, t BIGINT, time index(t));
Affected Rows: 0
INSERT INTO strings VALUES ('â‚(', 1);
Affected Rows: 1
INSERT INTO strings VALUES (3, 4);
Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType)
SELECT * FROM insert_invalid_strings WHERE i = 'â‚(';
SELECT * FROM strings WHERE i = 'â‚(';
+-----+---+
| i | t |

View File

@@ -1,10 +1,14 @@
CREATE TABLE insert_invalid_strings(i STRING, t BIGINT, time index(t));
CREATE SCHEMA insert_invalid;
INSERT INTO insert_invalid_strings VALUES ('â‚(', 1);
USE insert_invalid;
INSERT INTO insert_invalid_strings VALUES (3, 4);
CREATE TABLE strings(i STRING, t BIGINT, time index(t));
SELECT * FROM insert_invalid_strings WHERE i = 'â‚(';
INSERT INTO strings VALUES ('â‚(', 1);
INSERT INTO strings VALUES (3, 4);
SELECT * FROM strings WHERE i = 'â‚(';
CREATE TABLE a(i integer, j BIGINT, time index(j));

View File

@@ -1,3 +1,8 @@
USE public;
++
++
SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT 0;
++

View File

@@ -1,3 +1,5 @@
USE public;
SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT 0;
EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT 0;

View File

@@ -1,3 +1,12 @@
CREATE SCHEMA order_variable_size_payload;
Affected Rows: 1
USE order_variable_size_payload;
++
++
create table t0 (c0 varchar, t BIGINT TIME INDEX);
Affected Rows: 0
@@ -137,7 +146,7 @@ SELECT * FROM tpch_q1_agg ORDER BY l_returnflag, l_linestatus;
| R | F | 3785523 | 5337950526.47 | 5071818532.942 | 5274405503.049367 | 25.5259438574251 | 35994.029214030925 | 0.04998927856184382 | 148301 | 2 |
+--------------+--------------+---------+----------------+-----------------+--------------------+--------------------+--------------------+---------------------+-------------+---+
create table order_variable_size_payload_test5 (i int, s varchar, t BIGINT TIME INDEX);
create table test5 (i int, s varchar, t BIGINT TIME INDEX);
Affected Rows: 0

View File

@@ -1,3 +1,7 @@
CREATE SCHEMA order_variable_size_payload;
USE order_variable_size_payload;
create table t0 (c0 varchar, t BIGINT TIME INDEX);
insert into t0 values ('a', 1), (NULL,2), (NULL, 3), (NULL, 4), (NULL, 5), (NULL,6), (NULL,7);
@@ -35,7 +39,7 @@ INSERT INTO tpch_q1_agg VALUES ('N', 'O', 7459297, 10512270008.90, 9986238338.38
SELECT * FROM tpch_q1_agg ORDER BY l_returnflag, l_linestatus;
create table order_variable_size_payload_test5 (i int, s varchar, t BIGINT TIME INDEX);
create table test5 (i int, s varchar, t BIGINT TIME INDEX);
CREATE TABLE test6 (i1 INT, s1 VARCHAR, i2 int, s2 VARCHAR, t BIGINT TIME INDEX);

View File

@@ -0,0 +1,18 @@
mode = 'standalone'
enable_memory_catalog = false
[wal]
dir = '{wal_dir}'
file_size = '1GB'
purge_interval = '10m'
purge_threshold = '50GB'
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '{data_dir}'
[grpc_options]
addr = '127.0.0.1:4001'
runtime_size = 8

View File

@@ -11,5 +11,8 @@ common-base = { path = "../../src/common/base" }
common-error = { path = "../../src/common/error" }
common-grpc = { path = "../../src/common/grpc" }
common-query = { path = "../../src/common/query" }
sqlness = "0.1"
common-time = { path = "../../src/common/time" }
serde.workspace = true
sqlness = "0.2"
tinytemplate = "1.2"
tokio.workspace = true

View File

@@ -14,7 +14,7 @@
use std::fmt::Display;
use std::fs::OpenOptions;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;
@@ -23,8 +23,11 @@ use client::{Client, Database as DB, Error as ClientError};
use common_error::ext::ErrorExt;
use common_error::snafu::ErrorCompat;
use common_query::Output;
use serde::Serialize;
use sqlness::{Database, EnvController};
use tinytemplate::TinyTemplate;
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use crate::util;
@@ -38,6 +41,7 @@ const DATANODE_LOG_FILE: &str = "/tmp/greptime-sqlness-datanode.log";
pub struct Env {}
#[allow(clippy::print_stdout)]
#[async_trait]
impl EnvController for Env {
type DB = GreptimeDB;
@@ -51,7 +55,6 @@ impl EnvController for Env {
}
/// Stop one [`Database`].
#[allow(clippy::print_stdout)]
async fn stop(&self, _mode: &str, mut database: Self::DB) {
let mut server = database.server_process;
Env::stop_server(&mut server).await;
@@ -65,8 +68,8 @@ impl EnvController for Env {
}
}
#[allow(clippy::print_stdout)]
impl Env {
#[allow(clippy::print_stdout)]
pub async fn start_standalone() -> GreptimeDB {
// Build the DB with `cargo build --bin greptime`
println!("Going to build the DB...");
@@ -90,10 +93,12 @@ impl Env {
.truncate(true)
.open(SERVER_LOG_FILE)
.unwrap_or_else(|_| panic!("Cannot open log file at {SERVER_LOG_FILE}"));
let conf = Self::generate_standalone_config_file();
// Start the DB
let server_process = Command::new("./greptime")
.current_dir(util::get_binary_dir("debug"))
.args(["standalone", "start"])
.args(["--log-level=debug", "standalone", "start", "-c", &conf])
.stdout(log_file)
.spawn()
.expect("Failed to start the DB");
@@ -105,17 +110,47 @@ impl Env {
println!("Started, going to test. Log will be write to {SERVER_LOG_FILE}");
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new("greptime", client.clone());
let db = DB::with_client(client);
GreptimeDB {
server_process,
metasrv_process: None,
datanode_process: None,
client,
db,
client: Mutex::new(db),
}
}
fn generate_standalone_config_file() -> String {
let mut tt = TinyTemplate::new();
let mut template_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
template_file.push("../conf/standalone-test.toml.template");
let path = template_file.as_path();
let template = std::fs::read_to_string(path)
.unwrap_or_else(|_| panic!("Failed to read template config file: {}", path.display()));
tt.add_template("standalone", &template).unwrap();
#[derive(Serialize)]
struct Context {
wal_dir: String,
data_dir: String,
}
let current_time = common_time::util::current_time_millis();
let greptimedb_dir = format!("/tmp/greptimedb-{current_time}");
let ctx = Context {
wal_dir: format!("{greptimedb_dir}/wal/"),
data_dir: format!("{greptimedb_dir}/data/"),
};
let rendered = tt.render("standalone", &ctx).unwrap();
let conf_file = format!("/tmp/standalone-{current_time}.toml");
println!("Generating standalone config file in {conf_file}, full content:\n{rendered}");
std::fs::write(&conf_file, rendered).unwrap();
conf_file
}
pub async fn start_distributed() -> GreptimeDB {
let cargo_build_result = Command::new("cargo")
.current_dir(util::get_workspace_root())
@@ -147,14 +182,13 @@ impl Env {
}
let client = Client::with_urls(vec![SERVER_ADDR]);
let db = DB::new("greptime", client.clone());
let db = DB::with_client(client);
GreptimeDB {
server_process: frontend,
metasrv_process: Some(meta_server),
datanode_process: Some(datanode),
client,
db,
client: Mutex::new(db),
}
}
@@ -186,6 +220,8 @@ impl Env {
args.push("--node-id=1");
args.push("--data-dir=/tmp/greptimedb_node_1/data");
args.push("--wal-dir=/tmp/greptimedb_node_1/wal");
} else if subcommand == "metasrv" {
args.push("--use-memory-store");
}
let process = Command::new("./greptime")
@@ -202,15 +238,23 @@ pub struct GreptimeDB {
server_process: Child,
metasrv_process: Option<Child>,
datanode_process: Option<Child>,
#[allow(dead_code)]
client: Client,
db: DB,
client: Mutex<DB>,
}
#[async_trait]
impl Database for GreptimeDB {
async fn query(&self, query: String) -> Box<dyn Display> {
let result = self.db.sql(&query).await;
let mut client = self.client.lock().await;
if query.trim().starts_with("USE ") {
let database = query
.split_ascii_whitespace()
.nth(1)
.expect("Illegal `USE` statement: expecting a database.")
.trim_end_matches(';');
client.set_schema(database);
}
let result = client.sql(&query).await;
Box::new(ResultDisplayer { result }) as _
}
}