fix: correctly open table when distributed datanode restart (#576)

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-11-21 15:15:14 +08:00
committed by GitHub
parent 2b6b979d5a
commit 62fcb54258
20 changed files with 262 additions and 151 deletions

4
Cargo.lock generated
View File

@@ -3616,9 +3616,9 @@ dependencies = [
[[package]]
name = "opensrv-mysql"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bcb5fc2fda7e5e5f8478cd637285bbdd6196a9601e32293d0897e469a7dd020"
checksum = "e4c24c12fd688cb5aa5b1a54c6ccb2e30fb9b5132debb0e89fcb432b3f73db8f"
dependencies = [
"async-trait",
"byteorder",

38
src/api/src/column_def.rs Normal file
View File

@@ -0,0 +1,38 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::helper::ColumnDataTypeWrapper;
use crate::v1::ColumnDef;
impl ColumnDef {
pub fn try_as_column_schema(&self) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(self.datatype)?;
let constraint = match &self.default_constraint {
None => None,
Some(v) => Some(
ColumnDefaultConstraint::try_from(&v[..])
.context(error::ConvertColumnDefaultConstraintSnafu { column: &self.name })?,
),
};
ColumnSchema::new(&self.name, data_type.into(), self.is_nullable)
.with_default_constraint(constraint)
.context(error::InvalidColumnDefaultConstraintSnafu { column: &self.name })
}
}

View File

@@ -33,6 +33,28 @@ pub enum Error {
from: ConcreteDataType,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to convert column default constraint, column: {}, source: {}",
column,
source
))]
ConvertColumnDefaultConstraint {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Invalid column default constraint, column: {}, source: {}",
column,
source
))]
InvalidColumnDefaultConstraint {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
impl ErrorExt for Error {
@@ -40,6 +62,8 @@ impl ErrorExt for Error {
match self {
Error::UnknownColumnDataType { .. } => StatusCode::InvalidArguments,
Error::IntoColumnDataType { .. } => StatusCode::Unexpected,
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::InvalidColumnDefaultConstraint { source, .. } => source.status_code(),
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod column_def;
pub mod error;
pub mod helper;
pub mod prometheus;

View File

@@ -241,6 +241,7 @@ impl LocalCatalogManager {
schema_name: t.schema_name.clone(),
table_name: t.table_name.clone(),
table_id: t.table_id,
region_numbers: vec![0],
};
let option = self

View File

@@ -316,11 +316,15 @@ impl RemoteCatalogManager {
..
} = table_value;
// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&self.node_id).unwrap();
let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
region_numbers: region_numbers.clone(),
};
match self
.engine
@@ -361,7 +365,7 @@ impl RemoteCatalogManager {
table_name: table_name.clone(),
desc: None,
schema: Arc::new(schema),
region_numbers: regions_id_map.get(&self.node_id).unwrap().clone(), // this unwrap is safe because region_id_map is checked in `iter_remote_tables`
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),

View File

@@ -87,6 +87,7 @@ impl SystemCatalogTable {
schema_name: INFORMATION_SCHEMA_NAME.to_string(),
table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(),
table_id: SYSTEM_CATALOG_TABLE_ID,
region_numbers: vec![0],
};
let schema = Arc::new(build_system_catalog_schema());
let ctx = EngineContext::default();

View File

@@ -14,7 +14,7 @@
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions};
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use meta_client::MetaClientOpts;
use servers::Mode;
use snafu::ResultExt;
@@ -47,7 +47,7 @@ impl SubCommand {
}
}
#[derive(Debug, Parser)]
#[derive(Debug, Parser, Default)]
struct StartCommand {
#[clap(long)]
node_id: Option<u64>,
@@ -59,6 +59,10 @@ struct StartCommand {
metasrv_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(long)]
data_dir: Option<String>,
#[clap(long)]
wal_dir: Option<String>,
}
impl StartCommand {
@@ -115,6 +119,14 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}
.fail();
}
if let Some(data_dir) = cmd.data_dir {
opts.storage = ObjectStoreConfig::File { data_dir };
}
if let Some(wal_dir) = cmd.wal_dir {
opts.wal_dir = wal_dir;
}
Ok(opts)
}
}
@@ -131,14 +143,11 @@ mod tests {
#[test]
fn test_read_from_config_file() {
let cmd = StartCommand {
node_id: None,
rpc_addr: None,
mysql_addr: None,
metasrv_addr: None,
config_file: Some(format!(
"{}/../../config/datanode.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
..Default::default()
};
let options: DatanodeOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
@@ -168,44 +177,30 @@ mod tests {
fn test_try_from_cmd() {
assert_eq!(
Mode::Standalone,
DatanodeOptions::try_from(StartCommand {
node_id: None,
rpc_addr: None,
mysql_addr: None,
metasrv_addr: None,
config_file: None
})
.unwrap()
.mode
DatanodeOptions::try_from(StartCommand::default())
.unwrap()
.mode
);
let mode = DatanodeOptions::try_from(StartCommand {
node_id: Some(42),
rpc_addr: None,
mysql_addr: None,
metasrv_addr: Some("127.0.0.1:3002".to_string()),
config_file: None,
..Default::default()
})
.unwrap()
.mode;
assert_matches!(mode, Mode::Distributed);
assert!(DatanodeOptions::try_from(StartCommand {
node_id: None,
rpc_addr: None,
mysql_addr: None,
metasrv_addr: Some("127.0.0.1:3002".to_string()),
config_file: None,
..Default::default()
})
.is_err());
// Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
DatanodeOptions::try_from(StartCommand {
node_id: Some(42),
rpc_addr: None,
mysql_addr: None,
metasrv_addr: None,
config_file: None,
..Default::default()
})
.unwrap();
}
@@ -213,14 +208,11 @@ mod tests {
#[test]
fn test_merge_config() {
let dn_opts = DatanodeOptions::try_from(StartCommand {
node_id: None,
rpc_addr: None,
mysql_addr: None,
metasrv_addr: None,
config_file: Some(format!(
"{}/../../config/datanode.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
)),
..Default::default()
})
.unwrap();
assert_eq!(Some(42), dn_opts.node_id);

View File

@@ -208,10 +208,15 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Invalid column default constraint, source: {}", source))]
ColumnDefaultConstraint {
#[snafu(display(
"Invalid column proto definition, column: {}, source: {}",
column,
source
))]
InvalidColumnDef {
column: String,
#[snafu(backtrace)]
source: datatypes::error::Error,
source: api::error::Error,
},
#[snafu(display("Failed to parse SQL, source: {}", source))]
@@ -311,8 +316,7 @@ impl ErrorExt for Error {
source.status_code()
}
Error::ColumnDefaultConstraint { source, .. }
| Error::CreateSchema { source, .. }
Error::CreateSchema { source, .. }
| Error::ConvertSchema { source, .. }
| Error::VectorComputation { source } => source.status_code(),
@@ -337,9 +341,12 @@ impl ErrorExt for Error {
| Error::RegisterSchema { .. }
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. }
| Error::ColumnDataType { .. }
| Error::Catalog { .. } => StatusCode::Internal,
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
}
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::StartScriptManager { source } => source.status_code(),

View File

@@ -14,23 +14,20 @@
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::AdminResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::{AdminResult, AlterExpr, ColumnDef, CreateExpr, DropColumns};
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropColumns};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::{ErrorExt, StatusCode};
use common_query::Output;
use common_telemetry::{error, info};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
use futures::TryFutureExt;
use snafu::prelude::*;
use table::metadata::TableId;
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest};
use crate::error::{
self, BumpTableIdSnafu, ColumnDefaultConstraintSnafu, MissingFieldSnafu, Result,
};
use crate::error::{self, BumpTableIdSnafu, MissingFieldSnafu, Result};
use crate::instance::Instance;
use crate::sql::SqlRequest;
@@ -172,7 +169,12 @@ fn alter_expr_to_request(expr: AlterExpr) -> Result<Option<AlterTableRequest>> {
field: "column_def",
})?;
let schema = create_column_schema(&column_def)?;
let schema =
column_def
.try_as_column_schema()
.context(error::InvalidColumnDefSnafu {
column: &column_def.name,
})?;
add_column_requests.push(AddColumnRequest {
column_schema: schema,
is_key: add_column_expr.is_key,
@@ -212,7 +214,10 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
let column_schemas = expr
.column_defs
.iter()
.map(create_column_schema)
.map(|x| {
x.try_as_column_schema()
.context(error::InvalidColumnDefSnafu { column: &x.name })
})
.collect::<Result<Vec<ColumnSchema>>>()?;
ensure!(
@@ -243,28 +248,12 @@ fn create_table_schema(expr: &CreateExpr) -> Result<SchemaRef> {
))
}
fn create_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type =
ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
let default_constraint = match &column_def.default_constraint {
None => None,
Some(v) => {
Some(ColumnDefaultConstraint::try_from(&v[..]).context(ColumnDefaultConstraintSnafu)?)
}
};
ColumnSchema::new(
column_def.name.clone(),
data_type.into(),
column_def.is_nullable,
)
.with_default_constraint(default_constraint)
.context(ColumnDefaultConstraintSnafu)
}
#[cfg(test)]
mod tests {
use api::v1::ColumnDef;
use common_catalog::consts::MIN_USER_TABLE_ID;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnDefaultConstraint;
use datatypes::value::Value;
use super::*;
@@ -321,12 +310,11 @@ mod tests {
is_nullable: true,
default_constraint: None,
};
let result = create_column_schema(&column_def);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Column datatype error, source: Unknown proto column datatype: 1024"
);
let result = column_def.try_as_column_schema();
assert!(matches!(
result.unwrap_err(),
api::error::Error::UnknownColumnDataType { .. }
));
let column_def = ColumnDef {
name: "a".to_string(),
@@ -334,7 +322,7 @@ mod tests {
is_nullable: true,
default_constraint: None,
};
let column_schema = create_column_schema(&column_def).unwrap();
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
@@ -346,7 +334,7 @@ mod tests {
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
let column_schema = create_column_schema(&column_def).unwrap();
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());

View File

@@ -65,6 +65,17 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display(
"Invalid column proto definition, column: {}, source: {}",
column,
source
))]
InvalidColumnDef {
column: String,
#[snafu(backtrace)]
source: api::error::Error,
},
#[snafu(display(
"Failed to convert column default constraint, column: {}, source: {}",
column_name,
@@ -452,8 +463,11 @@ impl ErrorExt for Error {
| Error::RequestDatanode { source }
| Error::InvalidAdminResult { source } => source.status_code(),
Error::ColumnDataType { .. }
| Error::FindDatanode { .. }
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
}
Error::FindDatanode { .. }
| Error::GetCache { .. }
| Error::FindTableRoutes { .. }
| Error::SerializeJson { .. }

View File

@@ -24,7 +24,7 @@ use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::{debug, info};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema};
use datatypes::schema::RawSchema;
use meta_client::client::MetaClient;
use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse,
@@ -42,8 +42,8 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, StartMetaClientSnafu,
self, CatalogEntrySerdeSnafu, ColumnDataTypeSnafu, PrimaryKeyNotFoundSnafu, RequestMetaSnafu,
Result, StartMetaClientSnafu,
};
use crate::partitioning::{PartitionBound, PartitionDef};
@@ -229,17 +229,40 @@ fn create_table_global_value(
let node_id = region_routes[0]
.leader_peer
.as_ref()
.context(error::FindLeaderPeerSnafu {
.with_context(|| error::FindLeaderPeerSnafu {
region: region_routes[0].region.id,
table_name: table_name.to_string(),
})?
.id;
let mut regions_id_map = HashMap::new();
for route in region_routes.iter() {
let node_id = route
.leader_peer
.as_ref()
.with_context(|| error::FindLeaderPeerSnafu {
region: route.region.id,
table_name: table_name.to_string(),
})?
.id;
regions_id_map
.entry(node_id)
.or_insert_with(Vec::new)
.push(route.region.id as u32);
}
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
for (idx, column) in create_table.column_defs.iter().enumerate() {
column_schemas.push(create_column_schema(column)?);
let schema = column
.try_as_column_schema()
.context(error::InvalidColumnDefSnafu {
column: &column.name,
})?;
let schema = schema.with_time_index(column.name == create_table.time_index);
column_schemas.push(schema);
column_name_to_index_map.insert(column.name.clone(), idx);
}
@@ -291,34 +314,11 @@ fn create_table_global_value(
Ok(TableGlobalValue {
node_id,
regions_id_map: HashMap::new(),
regions_id_map,
table_info,
})
}
// Remove this duplication in the future
fn create_column_schema(column_def: &api::v1::ColumnDef) -> Result<ColumnSchema> {
let data_type =
ColumnDataTypeWrapper::try_new(column_def.datatype).context(error::ColumnDataTypeSnafu)?;
let default_constraint = match &column_def.default_constraint {
None => None,
Some(v) => Some(ColumnDefaultConstraint::try_from(&v[..]).context(
ConvertColumnDefaultConstraintSnafu {
column_name: &column_def.name,
},
)?),
};
ColumnSchema::new(
column_def.name.clone(),
data_type.into(),
column_def.is_nullable,
)
.with_default_constraint(default_constraint)
.context(ConvertColumnDefaultConstraintSnafu {
column_name: &column_def.name,
})
}
fn parse_partitions(
create_table: &CreateExpr,
partitions: Option<Partitions>,

View File

@@ -404,7 +404,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_id = request.table_id;
// TODO(dennis): supports multi regions;
let region_number = 0;
assert_eq!(request.region_numbers.len(), 1);
let region_number = request.region_numbers[0];
let region_name = region_name(table_id, region_number);
let region = match self
@@ -804,6 +805,7 @@ mod tests {
table_name: test_util::TABLE_NAME.to_string(),
// the test table id is 1
table_id: 1,
region_numbers: vec![0],
};
let (engine, table, object_store, _dir) = {

View File

@@ -15,7 +15,6 @@
use std::str::FromStr;
use std::sync::Arc;
use common_telemetry::debug;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::plan::Filter;
@@ -169,10 +168,6 @@ impl<'a> TypeConverter<'a> {
match (left, right) {
(Expr::Column(col), Expr::Literal(value)) => {
let casted_right = Self::cast_scalar_value(value, left_type)?;
debug!(
"Converting type, origin_left:{:?}, type:{:?}, right:{:?}, casted_right:{:?}",
col, left_type, value, casted_right
);
if casted_right.is_null() {
return Err(DataFusionError::Plan(format!(
"column:{:?} value:{:?} is invalid",

View File

@@ -29,7 +29,7 @@ metrics = "0.20"
num_cpus = "1.13"
once_cell = "1.16"
openmetrics-parser = "0.4"
opensrv-mysql = "0.1"
opensrv-mysql = "0.2"
pgwire = "0.5"
prost = "0.11"
regex = "1.6"

View File

@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use common_telemetry::error;
use common_telemetry::{debug, error};
use opensrv_mysql::{
AsyncMysqlShim, ErrorKind, ParamParser, QueryResultWriter, StatementMetaWriter,
};
use rand::RngCore;
use tokio::io::AsyncWrite;
use tokio::sync::RwLock;
use crate::context::AuthHashMethod::DoubleSha1;
@@ -63,7 +64,7 @@ impl MysqlInstanceShim {
}
#[async_trait]
impl<W: io::Write + Send + Sync> AsyncMysqlShim<W> for MysqlInstanceShim {
impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShim {
type Error = error::Error;
fn salt(&self) -> [u8; 20] {
@@ -108,15 +109,12 @@ impl<W: io::Write + Send + Sync> AsyncMysqlShim<W> for MysqlInstanceShim {
};
}
async fn on_prepare<'a>(
&'a mut self,
_: &'a str,
writer: StatementMetaWriter<'a, W>,
) -> Result<()> {
writer.error(
async fn on_prepare<'a>(&'a mut self, _: &'a str, w: StatementMetaWriter<'a, W>) -> Result<()> {
w.error(
ErrorKind::ER_UNKNOWN_ERROR,
"prepare statement is not supported yet".as_bytes(),
)?;
b"prepare statement is not supported yet",
)
.await?;
Ok(())
}
@@ -124,12 +122,13 @@ impl<W: io::Write + Send + Sync> AsyncMysqlShim<W> for MysqlInstanceShim {
&'a mut self,
_: u32,
_: ParamParser<'a>,
writer: QueryResultWriter<'a, W>,
w: QueryResultWriter<'a, W>,
) -> Result<()> {
writer.error(
w.error(
ErrorKind::ER_UNKNOWN_ERROR,
"prepare statement is not supported yet".as_bytes(),
)?;
b"prepare statement is not supported yet",
)
.await?;
Ok(())
}
@@ -145,6 +144,9 @@ impl<W: io::Write + Send + Sync> AsyncMysqlShim<W> for MysqlInstanceShim {
query: &'a str,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
debug!("Start executing query: '{}'", query);
let start = Instant::now();
// TODO(LFC): Find a better way:
// `check` uses regex to filter out unsupported statements emitted by MySQL's federated
// components, this is quick and dirty, there must be a better way to do it.
@@ -154,7 +156,13 @@ impl<W: io::Write + Send + Sync> AsyncMysqlShim<W> for MysqlInstanceShim {
self.query_handler.do_query(query).await
};
debug!(
"Finished executing query: '{}', total time costs in microseconds: {}",
query,
start.elapsed().as_micros()
);
let mut writer = MysqlResultWriter::new(writer);
writer.write(output).await
writer.write(query, output).await
}
}

View File

@@ -22,6 +22,7 @@ use common_telemetry::logging::{error, info};
use futures::StreamExt;
use opensrv_mysql::AsyncMysqlIntermediary;
use tokio;
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use crate::error::Result;
@@ -29,6 +30,9 @@ use crate::mysql::handler::MysqlInstanceShim;
use crate::query_handler::SqlQueryHandlerRef;
use crate::server::{AbortableStream, BaseTcpServer, Server};
// Default size of ResultSet write buffer: 100KB
const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024;
pub struct MysqlServer {
base_server: BaseTcpServer,
query_handler: SqlQueryHandlerRef,
@@ -58,7 +62,8 @@ impl MysqlServer {
match tcp_stream {
Err(error) => error!("Broken pipe: {}", error), // IoError doesn't impl ErrorExt.
Ok(io_stream) => {
if let Err(error) = Self::handle(io_stream, io_runtime, query_handler) {
if let Err(error) = Self::handle(io_stream, io_runtime, query_handler).await
{
error!(error; "Unexpected error when handling TcpStream");
};
}
@@ -67,15 +72,30 @@ impl MysqlServer {
})
}
pub fn handle(
async fn handle(
stream: TcpStream,
io_runtime: Arc<Runtime>,
query_handler: SqlQueryHandlerRef,
) -> Result<()> {
info!("MySQL connection coming from: {}", stream.peer_addr()?);
let shim = MysqlInstanceShim::create(query_handler, stream.peer_addr()?.to_string());
// TODO(LFC): Relate "handler" with MySQL session; also deal with panics there.
let _handler = io_runtime.spawn(AsyncMysqlIntermediary::run_on(shim, stream));
let (r, w) = stream.into_split();
let w = BufWriter::with_capacity(DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE, w);
// TODO(LFC): Use `output_stream` to write large MySQL ResultSet to client.
let spawn_result = io_runtime
.spawn(AsyncMysqlIntermediary::run_on(shim, r, w))
.await;
match spawn_result {
Ok(run_result) => {
if let Err(e) = run_result {
// TODO(LFC): Write this error and the below one to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?
error!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time.")
}
}
Err(e) => error!("IO runtime cannot execute task, error: {}", e),
}
Ok(())
}
}

View File

@@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::ops::Deref;
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
use common_telemetry::error;
use common_time::datetime::DateTime;
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
@@ -25,6 +25,7 @@ use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
};
use snafu::prelude::*;
use tokio::io::AsyncWrite;
use crate::error::{self, Error, Result};
@@ -33,18 +34,18 @@ struct QueryResult {
schema: SchemaRef,
}
pub struct MysqlResultWriter<'a, W: io::Write> {
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
// `QueryResultWriter` will be consumed when the write completed (see
// QueryResultWriter::completed), thus we use an option to wrap it.
inner: Option<QueryResultWriter<'a, W>>,
}
impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
pub fn new(inner: QueryResultWriter<'a, W>) -> MysqlResultWriter<'a, W> {
MysqlResultWriter::<'a, W> { inner: Some(inner) }
}
pub async fn write(&mut self, output: Result<Output>) -> Result<()> {
pub async fn write(&mut self, query: &str, output: Result<Output>) -> Result<()> {
let writer = self.inner.take().context(error::InternalSnafu {
err_msg: "inner MySQL writer is consumed",
})?;
@@ -59,48 +60,53 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
recordbatches,
schema,
};
Self::write_query_result(query_result, writer)?
Self::write_query_result(query, query_result, writer).await?
}
Output::RecordBatches(recordbatches) => {
let query_result = QueryResult {
schema: recordbatches.schema(),
recordbatches: recordbatches.take(),
};
Self::write_query_result(query_result, writer)?
Self::write_query_result(query, query_result, writer).await?
}
Output::AffectedRows(rows) => Self::write_affected_rows(writer, rows)?,
Output::AffectedRows(rows) => Self::write_affected_rows(writer, rows).await?,
},
Err(error) => Self::write_query_error(error, writer)?,
Err(error) => Self::write_query_error(query, error, writer).await?,
}
Ok(())
}
fn write_affected_rows(writer: QueryResultWriter<W>, rows: usize) -> Result<()> {
writer.completed(OkResponse {
async fn write_affected_rows(w: QueryResultWriter<'a, W>, rows: usize) -> Result<()> {
w.completed(OkResponse {
affected_rows: rows as u64,
..Default::default()
})?;
})
.await?;
Ok(())
}
fn write_query_result(
async fn write_query_result(
query: &str,
query_result: QueryResult,
writer: QueryResultWriter<'a, W>,
) -> Result<()> {
match create_mysql_column_def(&query_result.schema) {
Ok(column_def) => {
let mut row_writer = writer.start(&column_def)?;
let mut row_writer = writer.start(&column_def).await?;
for recordbatch in &query_result.recordbatches {
Self::write_recordbatch(&mut row_writer, recordbatch)?;
Self::write_recordbatch(&mut row_writer, recordbatch).await?;
}
row_writer.finish()?;
row_writer.finish().await?;
Ok(())
}
Err(error) => Self::write_query_error(error, writer),
Err(error) => Self::write_query_error(query, error, writer).await,
}
}
fn write_recordbatch(row_writer: &mut RowWriter<W>, recordbatch: &RecordBatch) -> Result<()> {
async fn write_recordbatch(
row_writer: &mut RowWriter<'_, W>,
recordbatch: &RecordBatch,
) -> Result<()> {
for row in recordbatch.rows() {
let row = row.context(error::CollectRecordbatchSnafu)?;
for value in row.into_iter() {
@@ -133,13 +139,20 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
}
}
}
row_writer.end_row()?;
row_writer.end_row().await?;
}
Ok(())
}
fn write_query_error(error: Error, writer: QueryResultWriter<'a, W>) -> Result<()> {
writer.error(ErrorKind::ER_INTERNAL_ERROR, error.to_string().as_bytes())?;
async fn write_query_error(
query: &str,
error: Error,
w: QueryResultWriter<'a, W>,
) -> Result<()> {
error!(error; "Failed to execute query '{}'", query);
let kind = ErrorKind::ER_INTERNAL_ERROR;
w.error(kind, error.to_string().as_bytes()).await?;
Ok(())
}
}

View File

@@ -184,6 +184,7 @@ async fn test_query_concurrently() -> Result<()> {
let should_recreate_conn = expected == 1;
if should_recreate_conn {
connection.disconnect().await.unwrap();
connection = create_connection(server_port, index % 2 == 0)
.await
.unwrap();

View File

@@ -17,6 +17,7 @@ use std::collections::HashMap;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, SchemaRef};
use store_api::storage::RegionNumber;
use crate::metadata::TableId;
@@ -56,6 +57,7 @@ pub struct OpenTableRequest {
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
pub region_numbers: Vec<RegionNumber>,
}
/// Alter table request