Merge branch 'main' into avoid-query-meta

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date:   2024-05-31 18:16:32 +08:00
17 changed files with 86 additions and 79 deletions

Cargo.lock (generated)

@@ -4188,7 +4188,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/killme2008/greptime-proto.git?rev=a15a54a714fe117d7e9f7635e149c4eecac773fa#a15a54a714fe117d7e9f7635e149c4eecac773fa"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ae26136accd82fbdf8be540cd502f2e94951077e#ae26136accd82fbdf8be540cd502f2e94951077e"
 dependencies = [
  "prost 0.12.4",
  "serde",

File: (name not shown)

@@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "a15a54a714fe117d7e9f7635e149c4eecac773fa" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ae26136accd82fbdf8be540cd502f2e94951077e" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"

File: (name not shown)

@@ -173,14 +173,14 @@ impl Client {
         Ok(FlightClient { addr, client })
     }

-    pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
-        let (_, channel) = self.find_channel()?;
+    pub(crate) fn raw_region_client(&self) -> Result<(String, PbRegionClient<Channel>)> {
+        let (addr, channel) = self.find_channel()?;
         let client = PbRegionClient::new(channel)
             .max_decoding_message_size(self.max_grpc_recv_message_size())
             .max_encoding_message_size(self.max_grpc_send_message_size())
             .accept_compressed(CompressionEncoding::Zstd)
             .send_compressed(CompressionEncoding::Zstd);
-        Ok(client)
+        Ok((addr, client))
     }

     pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {

File: (name not shown)

@@ -89,8 +89,9 @@ pub enum Error {
         source: common_grpc::error::Error,
     },

-    #[snafu(display("Failed to request RegionServer, code: {}", code))]
+    #[snafu(display("Failed to request RegionServer {}, code: {}", addr, code))]
     RegionServer {
+        addr: String,
         code: Code,
         source: BoxedError,
         #[snafu(implicit)]

File: (name not shown)

@@ -177,7 +177,7 @@ impl RegionRequester {
             .with_label_values(&[request_type.as_str()])
             .start_timer();

-        let mut client = self.client.raw_region_client()?;
+        let (addr, mut client) = self.client.raw_region_client()?;

         let response = client
             .handle(request)
@@ -187,6 +187,7 @@ impl RegionRequester {
                 let err: error::Error = e.into();
                 // Uses `Error::RegionServer` instead of `Error::Server`
                 error::Error::RegionServer {
+                    addr,
                     code,
                     source: BoxedError::new(err),
                     location: location!(),
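
Taken together, the three changes above thread the peer address from `find_channel` through `raw_region_client` and into the error the caller finally sees. A minimal standalone sketch of the pattern in plain Rust (the tonic/snafu machinery is elided; `Channel` and the error enum here are hypothetical stand-ins, not the crate's real types):

use std::fmt;

struct Channel; // stand-in for tonic's transport channel

#[derive(Debug)]
enum Error {
    // Mirrors `Error::RegionServer { addr, .. }` above: the variant now
    // records which server the failed request went to.
    RegionServer { addr: String, code: u32 },
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::RegionServer { addr, code } => {
                write!(f, "Failed to request RegionServer {addr}, code: {code}")
            }
        }
    }
}

struct Client;

impl Client {
    // Return the address alongside the channel so every caller can tag
    // its errors with the peer it actually talked to.
    fn find_channel(&self) -> (String, Channel) {
        ("127.0.0.1:4001".to_string(), Channel)
    }

    fn raw_region_client(&self) -> (String, Channel) {
        self.find_channel()
    }
}

fn main() {
    let (addr, _client) = Client.raw_region_client();
    let err = Error::RegionServer { addr, code: 14 };
    println!("{err}"); // Failed to request RegionServer 127.0.0.1:4001, code: 14
}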

File: (name not shown)

@@ -489,8 +489,7 @@ async fn handle_create_table_task(
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
-        table_id: Some(table_id),
-        ..Default::default()
+        table_ids: vec![table_id],
     })
 }
@@ -534,7 +533,6 @@ async fn handle_create_logical_table_tasks(
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
         table_ids,
-        ..Default::default()
     })
 }
@@ -690,8 +688,7 @@ async fn handle_create_view_task(
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
-        table_id: Some(view_id),
-        ..Default::default()
+        table_ids: vec![view_id],
     })
 }

File: (name not shown)

@@ -274,10 +274,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
 #[derive(Debug, Default)]
 pub struct SubmitDdlTaskResponse {
     pub key: Vec<u8>,
-    // For create physical table
-    // TODO(jeremy): remove it?
-    pub table_id: Option<TableId>,
-    // For create multi logical tables
+    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
     pub table_ids: Vec<TableId>,
 }
@@ -285,11 +282,9 @@ impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
     type Error = error::Error;

     fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
-        let table_id = resp.table_id.map(|t| t.id);
         let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
         Ok(Self {
             key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
-            table_id,
             table_ids,
         })
     }
@@ -299,9 +294,6 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
     fn from(val: SubmitDdlTaskResponse) -> Self {
         Self {
             pid: Some(ProcedureId { key: val.key }),
-            table_id: val
-                .table_id
-                .map(|table_id| api::v1::TableId { id: table_id }),
             table_ids: val
                 .table_ids
                 .into_iter()
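
With the struct change above, `table_ids` is the single source of truth: the one-table case stores a one-element Vec, and callers that used to read `table_id` now take the first element. A standalone sketch of both sides of the migration (a simplified mirror of the response type, not the real `SubmitDdlTaskResponse`):

type TableId = u32;

// Simplified mirror of the new response shape.
#[derive(Debug, Default)]
struct SubmitDdlTaskResponse {
    key: Vec<u8>,
    table_ids: Vec<TableId>,
}

fn main() {
    // Producer side: wrap the single id in a Vec; no `..Default::default()` needed.
    let resp = SubmitDdlTaskResponse {
        key: b"procedure-1".to_vec(),
        table_ids: vec![1024],
    };
    assert_eq!(resp.key, b"procedure-1");

    // Consumer side: take the first id, turning an empty Vec into an error.
    let table_id = resp
        .table_ids
        .into_iter()
        .next()
        .ok_or("expected table_id")
        .unwrap();
    assert_eq!(table_id, 1024);
}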

File: (name not shown)

@@ -41,19 +41,13 @@ pub struct RegionServerEventSender(pub(crate) UnboundedSender<RegionServerEvent>
 impl RegionServerEventListener for RegionServerEventSender {
     fn on_region_registered(&self, region_id: RegionId) {
         if let Err(e) = self.0.send(RegionServerEvent::Registered(region_id)) {
-            error!(
-                "Failed to send registering region: {region_id} event, source: {}",
-                e
-            );
+            error!(e; "Failed to send registering region: {region_id} event");
         }
     }

     fn on_region_deregistered(&self, region_id: RegionId) {
         if let Err(e) = self.0.send(RegionServerEvent::Deregistered(region_id)) {
-            error!(
-                "Failed to send deregistering region: {region_id} event, source: {}",
-                e
-            );
+            error!(e; "Failed to send deregistering region: {region_id} event");
         }
     }
 }
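
This commit repeatedly swaps string-interpolated errors for common_telemetry's `error!(err; "message")` form, which attaches the error itself to the log record instead of flattening it into the format string. A rough std-only illustration of what that buys (this is not the actual macro, just the concept of reporting the source chain separately):

use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct SendError(&'static str);

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "channel closed: {}", self.0)
    }
}

impl Error for SendError {}

// Emit the message, then walk the error's source chain, instead of
// relying on whatever `Display` happens to include.
fn log_error(err: &dyn Error, msg: &str) {
    eprint!("{msg}: {err}");
    let mut source = err.source();
    while let Some(cause) = source {
        eprint!(", caused by: {cause}");
        source = cause.source();
    }
    eprintln!();
}

fn main() {
    log_error(&SendError("receiver dropped"), "Failed to send registering region event");
}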

File: (name not shown)

@@ -229,8 +229,9 @@ impl EngineInner {
         let res = FileRegion::create(region_id, request, &self.object_store).await;
         let region = res.inspect_err(|err| {
             error!(
-                "Failed to create region, region_id: {}, err: {}",
-                region_id, err
+                err;
+                "Failed to create region, region_id: {}",
+                region_id
             );
         })?;

         self.regions.write().unwrap().insert(region_id, region);
@@ -259,8 +260,9 @@ impl EngineInner {
         let res = FileRegion::open(region_id, request, &self.object_store).await;
         let region = res.inspect_err(|err| {
             error!(
-                "Failed to open region, region_id: {}, err: {}",
-                region_id, err
+                err;
+                "Failed to open region, region_id: {}",
+                region_id
             );
         })?;

         self.regions.write().unwrap().insert(region_id, region);
@@ -302,8 +304,9 @@ impl EngineInner {
             let res = FileRegion::drop(&region, &self.object_store).await;
             res.inspect_err(|err| {
                 error!(
-                    "Failed to drop region, region_id: {}, err: {}",
-                    region_id, err
+                    err;
+                    "Failed to drop region, region_id: {}",
+                    region_id
                 );
             })?;
         }

File: (name not shown)

@@ -159,7 +159,7 @@ pub struct FlownodeManager {
     table_info_source: TableSource,
     frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
     /// contains the mapping from table name to global id, and the table schema
-    node_context: Mutex<FlownodeContext>,
+    node_context: RwLock<FlownodeContext>,
     flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
     src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
     tick_manager: FlowTickManager,
@@ -194,7 +194,7 @@ impl FlownodeManager {
             query_engine,
             table_info_source: srv_map,
             frontend_invoker: RwLock::new(None),
-            node_context: Mutex::new(node_context),
+            node_context: RwLock::new(node_context),
             flow_err_collectors: Default::default(),
             src_send_buf_lens: Default::default(),
             tick_manager,
@@ -298,7 +298,7 @@ impl FlownodeManager {
         } else {
             // TODO(discord9): consider removing buggy auto create by schema
-            let node_ctx = self.node_context.lock().await;
+            let node_ctx = self.node_context.read().await;
             let gid: GlobalId = node_ctx
                 .table_repr
                 .get_by_name(&table_name)
@@ -462,7 +462,7 @@ impl FlownodeManager {
         let mut output = BTreeMap::new();
         for (name, sink_recv) in self
             .node_context
-            .lock()
+            .write()
             .await
             .sink_receiver
             .iter_mut()
@@ -542,11 +542,11 @@ impl FlownodeManager {
         }
         // first check how many inputs were sent
         let (flush_res, buf_len) = if blocking {
-            let mut ctx = self.node_context.lock().await;
-            (ctx.flush_all_sender(), ctx.get_send_buf_size())
+            let ctx = self.node_context.read().await;
+            (ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
         } else {
-            match self.node_context.try_lock() {
-                Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
+            match self.node_context.try_read() {
+                Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
                 Err(_) => return Ok(()),
             }
         };
@@ -580,7 +580,7 @@ impl FlownodeManager {
             rows.len()
         );
         let table_id = region_id.table_id();
-        self.node_context.lock().await.send(table_id, rows)?;
+        self.node_context.read().await.send(table_id, rows).await?;
         // TODO(discord9): put it in a background task?
         // self.run_available(false).await?;
         Ok(())
@@ -628,7 +628,7 @@ impl FlownodeManager {
             }
         }

-        let mut node_ctx = self.node_context.lock().await;
+        let mut node_ctx = self.node_context.write().await;
         // assign global id to source and sink table
         for source in source_table_ids {
             node_ctx
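
The `Mutex<FlownodeContext>` to `RwLock<FlownodeContext>` switch above lets read-mostly paths (flushing, sending rows, looking up table ids) run concurrently, while flow creation still takes the write lock. A small sketch of the same shape with tokio's RwLock (a `BTreeMap` stands in for `FlownodeContext`):

use std::collections::BTreeMap;
use tokio::sync::RwLock;

struct Manager {
    node_context: RwLock<BTreeMap<u32, Vec<u64>>>, // stand-in for FlownodeContext
}

impl Manager {
    async fn flush(&self, blocking: bool) -> Option<usize> {
        if blocking {
            // Shared access is enough now that the inner buffers use
            // interior mutability (see the SourceSender diff below).
            let ctx = self.node_context.read().await;
            Some(ctx.values().map(Vec::len).sum())
        } else {
            // Non-blocking path: `try_read` replaces `try_lock`.
            match self.node_context.try_read() {
                Ok(ctx) => Some(ctx.values().map(Vec::len).sum()),
                Err(_) => None, // a writer holds the lock; skip this round
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager {
        node_context: RwLock::new(BTreeMap::from([(1, vec![10, 20])])),
    };
    assert_eq!(mgr.flush(true).await, Some(2));
    assert_eq!(mgr.flush(false).await, Some(2));
}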

File: (name not shown)

@@ -116,7 +116,7 @@ impl Flownode for FlownodeManager {
         let now = self.tick_manager.tick();

         let fetch_order = {
-            let ctx = self.node_context.lock().await;
+            let ctx = self.node_context.read().await;
             let table_col_names = ctx
                 .table_repr
                 .get_by_table_id(&table_id)

File: (name not shown)

@@ -21,7 +21,7 @@ use common_telemetry::debug;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::{broadcast, mpsc, RwLock};

 use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::adapter::{FlowId, TableName, TableSource};
@@ -65,7 +65,7 @@ pub struct FlownodeContext {
 #[derive(Debug)]
 pub struct SourceSender {
     sender: broadcast::Sender<DiffRow>,
-    send_buf: VecDeque<DiffRow>,
+    send_buf: RwLock<VecDeque<DiffRow>>,
 }

 impl Default for SourceSender {
@@ -78,6 +78,7 @@ impl Default for SourceSender {
     }
 }

+// TODO: make all send operations take `&self`
 impl SourceSender {
     pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
         self.sender.subscribe()
@@ -85,15 +86,16 @@ impl SourceSender {
     /// send as many rows as possible from the send buf
     /// until the send buf is empty or the broadcast channel is full
-    pub fn try_send_all(&mut self) -> Result<usize, Error> {
+    pub async fn try_send_all(&self) -> Result<usize, Error> {
         let mut row_cnt = 0;
         loop {
+            let mut send_buf = self.send_buf.write().await;
             // if inner sender channel is empty or send buf is empty, there
             // is nothing to do for now, just break
-            if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
+            if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
                 break;
             }
-            if let Some(row) = self.send_buf.pop_front() {
+            if let Some(row) = send_buf.pop_front() {
                 self.sender
                     .send(row)
                     .map_err(|err| {
@@ -108,17 +110,20 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
-            debug!("Remaining Send buf.len() = {}", self.send_buf.len());
+            debug!(
+                "Remaining Send buf.len() = {}",
+                self.send_buf.read().await.len()
+            );
         }

         Ok(row_cnt)
     }

     /// return the number of rows actually sent (including what's in the buffer)
-    pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
-        self.send_buf.extend(rows);
+    pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        self.send_buf.write().await.extend(rows);

-        let row_cnt = self.try_send_all()?;
+        let row_cnt = self.try_send_all().await?;

         Ok(row_cnt)
     }
@@ -128,30 +133,35 @@ impl FlownodeContext {
     /// return the number of rows actually sent (including what's in the buffer)
     ///
     /// TODO(discord9): make this concurrent
-    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
+    pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
         let sender = self
             .source_sender
-            .get_mut(&table_id)
+            .get(&table_id)
             .with_context(|| TableNotFoundSnafu {
                 name: table_id.to_string(),
             })?;
         // debug!("FlownodeContext::send: trying to send {} rows", rows.len());
-        sender.send_rows(rows)
+        sender.send_rows(rows).await
     }

     /// flush all senders' bufs
     ///
     /// return the number being sent
-    pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
-        self.source_sender
-            .iter_mut()
-            .map(|(_table_id, src_sender)| src_sender.try_send_all())
-            .try_fold(0, |acc, x| x.map(|x| x + acc))
+    pub async fn flush_all_sender(&self) -> Result<usize, Error> {
+        let mut sum = 0;
+        for sender in self.source_sender.values() {
+            sender.try_send_all().await.inspect(|x| sum += x)?;
+        }
+        Ok(sum)
     }

     /// Return the total number of rows in all send bufs
-    pub fn get_send_buf_size(&self) -> usize {
-        self.source_sender.values().map(|v| v.send_buf.len()).sum()
+    pub async fn get_send_buf_size(&self) -> usize {
+        let mut sum = 0;
+        for sender in self.source_sender.values() {
+            sum += sender.send_buf.read().await.len();
+        }
+        sum
     }
 }
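
The key enabler for taking read locks in the manager above is this file's interior-mutability change: `send_buf` moves behind its own `RwLock`, so `SourceSender`'s methods can take `&self` instead of `&mut self`. The pattern in isolation (u64 stands in for DiffRow):

use std::collections::VecDeque;
use tokio::sync::RwLock;

struct SourceSender {
    send_buf: RwLock<VecDeque<u64>>,
}

impl SourceSender {
    // &self, not &mut self: the lock, rather than the borrow checker,
    // serializes concurrent writers of the buffer.
    async fn send_rows(&self, rows: Vec<u64>) -> usize {
        let mut buf = self.send_buf.write().await;
        buf.extend(rows);
        buf.len()
    }
}

#[tokio::main]
async fn main() {
    let sender = SourceSender {
        send_buf: RwLock::new(VecDeque::new()),
    };
    assert_eq!(sender.send_rows(vec![1, 2, 3]).await, 3);
}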

File: (name not shown)

@@ -285,8 +285,8 @@ impl<'s> Worker<'s> {
                 Ok(Some((id, resp))) => {
                     if let Err(err) = self.itc_server.blocking_lock().resp(id, resp) {
                         common_telemetry::error!(
-                            "Worker's itc server has been closed unexpectedly, shutting down worker: {}",
-                            err
+                            err;
+                            "Worker's itc server has been closed unexpectedly, shutting down worker"
                         );
                         break;
                     };

File: (name not shown)

@@ -256,7 +256,7 @@ impl ExternalSorter {
         IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
             debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
         ).inspect_err(|e|
-            error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
+            error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
         )
     }

File: (name not shown)

@@ -501,7 +501,7 @@ impl Inserter {
                 Ok(())
             }
             Err(err) => {
-                error!("Failed to create table {table_reference}: {err}",);
+                error!(err; "Failed to create table {table_reference}");
                 Err(err)
             }
         }
@@ -634,8 +634,9 @@ impl Inserter {
                     })
                     .collect::<Vec<_>>();
                 error!(
-                    "Failed to create logical tables {:?}: {}",
-                    failed_tables, err
+                    err;
+                    "Failed to create logical tables {:?}",
+                    failed_tables
                 );
                 Err(err)
             }

File: (name not shown)

@@ -238,9 +238,13 @@ impl StatementExecutor {
         )
         .await?;

-        let table_id = resp.table_id.context(error::UnexpectedSnafu {
-            violated: "expected table_id",
-        })?;
+        let table_id = resp
+            .table_ids
+            .into_iter()
+            .next()
+            .context(error::UnexpectedSnafu {
+                violated: "expected table_id",
+            })?;
         info!("Successfully created table '{table_name}' with table id {table_id}");

         table_info.ident.table_id = table_id;
@@ -531,9 +535,13 @@ impl StatementExecutor {
             resp
         );

-        let view_id = resp.table_id.context(error::UnexpectedSnafu {
-            violated: "expected table_id",
-        })?;
+        let view_id = resp
+            .table_ids
+            .into_iter()
+            .next()
+            .context(error::UnexpectedSnafu {
+                violated: "expected table_id",
+            })?;
         info!("Successfully created view '{view_name}' with view id {view_id}");

         // Invalidates local cache ASAP.
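
Both call sites above use snafu's `OptionExt::context` to turn "no id in the response" into a proper error. A minimal standalone version of that conversion (assuming the snafu crate; `UnexpectedError` is a hypothetical simplified error, not the real `error::Error`):

use snafu::{OptionExt, Snafu};

// snafu derives an `UnexpectedSnafu` context selector from this struct.
#[derive(Debug, Snafu)]
#[snafu(display("Unexpected, violated: {violated}"))]
struct UnexpectedError {
    violated: String,
}

fn first_table_id(table_ids: Vec<u32>) -> Result<u32, UnexpectedError> {
    // `next()` yields an Option; `context` maps None to the error.
    table_ids.into_iter().next().context(UnexpectedSnafu {
        violated: "expected table_id",
    })
}

fn main() {
    assert_eq!(first_table_id(vec![42, 43]).unwrap(), 42);
    assert!(first_table_id(vec![]).is_err());
}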

File: (name not shown)

@@ -226,7 +226,7 @@ pub async fn write_system_metric_by_network(
                     error!("report export metrics error, msg: {:#?}", resp);
                 }
             }
-            Err(e) => error!("report export metrics failed, error {}", e),
+            Err(e) => error!(e; "report export metrics failed"),
         };
     }
 }
@@ -265,7 +265,7 @@ pub async fn write_system_metric_by_handler(
         };

         if let Err(e) = handler.write(requests, ctx.clone(), false).await {
-            error!("report export metrics by handler failed, error {}", e);
+            error!(e; "report export metrics by handler failed");
         } else {
             crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
         }