feat: postgre wire protocol for frontend (#269)

This commit is contained in:
Ning Sun
2022-09-19 15:39:53 +08:00
committed by GitHub
parent e697ba975b
commit 8a400669aa
5 changed files with 76 additions and 2 deletions

View File

@@ -38,6 +38,9 @@ struct StartCommand {
grpc_addr: Option<String>,
#[clap(long)]
mysql_addr: Option<String>,
#[cfg(feature = "postgres")]
#[clap(long)]
postgres_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
}
@@ -69,6 +72,10 @@ impl TryFrom<StartCommand> for FrontendOptions {
if let Some(addr) = cmd.mysql_addr {
opts.mysql_addr = Some(addr);
}
#[cfg(feature = "postgres")]
if let Some(addr) = cmd.postgres_addr {
opts.postgres_addr = Some(addr);
}
Ok(opts)
}
}
@@ -83,15 +90,24 @@ mod tests {
http_addr: Some("127.0.0.1:1234".to_string()),
grpc_addr: None,
mysql_addr: Some("127.0.0.1:5678".to_string()),
#[cfg(feature = "postgres")]
postgres_addr: Some("127.0.0.1:5432".to_string()),
config_file: None,
};
let opts: FrontendOptions = command.try_into().unwrap();
assert_eq!(opts.http_addr, Some("127.0.0.1:1234".to_string()));
assert_eq!(opts.mysql_addr, Some("127.0.0.1:5678".to_string()));
#[cfg(feature = "postgres")]
assert_eq!(opts.postgres_addr, Some("127.0.0.1:5432".to_string()));
let default_opts = FrontendOptions::default();
assert_eq!(opts.grpc_addr, default_opts.grpc_addr);
assert_eq!(opts.mysql_runtime_size, default_opts.mysql_runtime_size);
#[cfg(feature = "postgres")]
assert_eq!(
opts.postgres_runtime_size,
default_opts.postgres_runtime_size
);
}
}

View File

@@ -80,7 +80,9 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
mod test {
use arrow::array::Int64Array as ArrowI64Array;
use arrow::array::*;
use common_time::timestamp::TimeUnit;
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, TimeUnit as ArrowTimeUnit};
use common_time::timestamp::{TimeUnit, Timestamp};
use super::*;
use crate::prelude::Vector;
@@ -142,5 +144,25 @@ mod test {
value,
Value::Timestamp(Timestamp::new(2, TimeUnit::Millisecond))
);
let array4 = PrimitiveArray::<i64>::from_data(
DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
Buffer::from_slice(&vec![1, 2, 3, 4]),
None,
);
assert_eq!(
Value::Timestamp(Timestamp::new(1, TimeUnit::Millisecond)),
arrow_array_get(&array4, 0).unwrap()
);
let array4 = PrimitiveArray::<i64>::from_data(
DataType::Timestamp(ArrowTimeUnit::Nanosecond, None),
Buffer::from_slice(&vec![1, 2, 3, 4]),
None,
);
assert_eq!(
Value::Timestamp(Timestamp::new(1, TimeUnit::Nanosecond)),
arrow_array_get(&array4, 0).unwrap()
);
}
}

View File

@@ -37,3 +37,7 @@ futures = "0.3"
tempdir = "0.3"
tonic = "0.8"
tower = "0.4"
[features]
default = ["postgres"]
postgres = ["servers/postgres"]

View File

@@ -13,6 +13,10 @@ pub struct FrontendOptions {
pub grpc_addr: Option<String>,
pub mysql_addr: Option<String>,
pub mysql_runtime_size: u32,
#[cfg(feature = "postgres")]
pub postgres_addr: Option<String>,
#[cfg(feature = "postgres")]
pub postgres_runtime_size: u32,
}
impl Default for FrontendOptions {
@@ -22,6 +26,10 @@ impl Default for FrontendOptions {
grpc_addr: Some("0.0.0.0:4001".to_string()),
mysql_addr: Some("0.0.0.0:4002".to_string()),
mysql_runtime_size: 2,
#[cfg(feature = "postgres")]
postgres_addr: Some("0.0.0.0:4003".to_string()),
#[cfg(feature = "postgres")]
postgres_runtime_size: 2,
}
}
}

View File

@@ -5,6 +5,8 @@ use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::mysql::server::MysqlServer;
#[cfg(feature = "postgres")]
use servers::postgres::PostgresServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
@@ -55,10 +57,32 @@ impl Services {
None
};
#[cfg(feature = "postgres")]
let postgres_server_and_addr = if let Some(pg_addr) = &opts.postgres_addr {
let pg_addr = parse_addr(pg_addr)?;
let pg_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.postgres_runtime_size as usize)
.thread_name("pg-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let pg_server =
Box::new(PostgresServer::new(instance.clone(), pg_io_runtime)) as Box<dyn Server>;
Some((pg_server, pg_addr))
} else {
None
};
try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr)
start_server(mysql_server_and_addr),
#[cfg(feature = "postgres")]
start_server(postgres_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())