From 43afea1a9d8392de8d0b361a0427879739933744 Mon Sep 17 00:00:00 2001
From: Jeremyhi
Date: Fri, 31 May 2024 14:41:53 +0800
Subject: [PATCH 1/3] refactor!: remove the table_id in ddl response since
 table_ids is enough (#4080)

* refactor: remove the table_id in ddl response since table_ids is enough

* chore: upgrade proto
---
 Cargo.lock                         |  2 +-
 Cargo.toml                         |  2 +-
 src/common/meta/src/ddl_manager.rs |  7 ++-----
 src/common/meta/src/rpc/ddl.rs     | 10 +---------
 src/operator/src/statement/ddl.rs  | 20 ++++++++++++++------
 5 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4af34e8623..007f777441 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4188,7 +4188,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/killme2008/greptime-proto.git?rev=a15a54a714fe117d7e9f7635e149c4eecac773fa#a15a54a714fe117d7e9f7635e149c4eecac773fa"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ae26136accd82fbdf8be540cd502f2e94951077e#ae26136accd82fbdf8be540cd502f2e94951077e"
 dependencies = [
  "prost 0.12.4",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 7d03eb0f43..734d2438b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "a15a54a714fe117d7e9f7635e149c4eecac773fa" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ae26136accd82fbdf8be540cd502f2e94951077e" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 03cd00d13f..ba6199c738 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -489,8 +489,7 @@ async fn handle_create_table_task(
 
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
-        table_id: Some(table_id),
-        ..Default::default()
+        table_ids: vec![table_id],
     })
 }
 
@@ -534,7 +533,6 @@ async fn handle_create_logical_table_tasks(
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
         table_ids,
-        ..Default::default()
     })
 }
 
@@ -690,8 +688,7 @@ async fn handle_create_view_task(
 
     Ok(SubmitDdlTaskResponse {
         key: procedure_id.into(),
-        table_id: Some(view_id),
-        ..Default::default()
+        table_ids: vec![view_id],
     })
 }
 
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index e6140cdaef..0389be9708 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -274,10 +274,7 @@ impl TryFrom<DdlTask> for PbDdlTaskRequest {
 #[derive(Debug, Default)]
 pub struct SubmitDdlTaskResponse {
     pub key: Vec<u8>,
-    // For create physical table
-    // TODO(jeremy): remove it?
-    pub table_id: Option<TableId>,
-    // For create multi logical tables
+    // `table_id`s for `CREATE TABLE` or `CREATE LOGICAL TABLES` task.
     pub table_ids: Vec<TableId>,
 }
 
@@ -285,11 +282,9 @@ impl TryFrom<PbDdlTaskResponse> for SubmitDdlTaskResponse {
     type Error = error::Error;
 
     fn try_from(resp: PbDdlTaskResponse) -> Result<Self> {
-        let table_id = resp.table_id.map(|t| t.id);
         let table_ids = resp.table_ids.into_iter().map(|t| t.id).collect();
         Ok(Self {
             key: resp.pid.map(|pid| pid.key).unwrap_or_default(),
-            table_id,
             table_ids,
         })
     }
@@ -299,9 +294,6 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
     fn from(val: SubmitDdlTaskResponse) -> Self {
         Self {
             pid: Some(ProcedureId { key: val.key }),
-            table_id: val
-                .table_id
-                .map(|table_id| api::v1::TableId { id: table_id }),
             table_ids: val
                 .table_ids
                 .into_iter()
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 2cfe71fd0d..2cce8f38d7 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -238,9 +238,13 @@ impl StatementExecutor {
         )
         .await?;
 
-        let table_id = resp.table_id.context(error::UnexpectedSnafu {
-            violated: "expected table_id",
-        })?;
+        let table_id = resp
+            .table_ids
+            .into_iter()
+            .next()
+            .context(error::UnexpectedSnafu {
+                violated: "expected table_id",
+            })?;
         info!("Successfully created table '{table_name}' with table id {table_id}");
 
         table_info.ident.table_id = table_id;
@@ -531,9 +535,13 @@ impl StatementExecutor {
             resp
         );
 
-        let view_id = resp.table_id.context(error::UnexpectedSnafu {
-            violated: "expected table_id",
-        })?;
+        let view_id = resp
+            .table_ids
+            .into_iter()
+            .next()
+            .context(error::UnexpectedSnafu {
+                violated: "expected table_id",
+            })?;
         info!("Successfully created view '{view_name}' with view id {view_id}");
 
         // Invalidates local cache ASAP.
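With the scalar `table_id` field gone, callers read ids from `table_ids`; single-table DDLs carry exactly one element. A minimal, self-contained sketch of the new calling convention (a simplified stand-in for `SubmitDdlTaskResponse`, with `u32` standing in for `TableId`; not the real definitions):

```rust
// Simplified stand-in for common_meta's SubmitDdlTaskResponse: a procedure
// key plus the ids of every table the DDL task created.
#[derive(Debug, Default)]
struct SubmitDdlTaskResponse {
    key: Vec<u8>,
    // One id for CREATE TABLE / CREATE VIEW, several for CREATE LOGICAL TABLES.
    table_ids: Vec<u32>,
}

// Single-table DDLs take the first (and only) element, which replaces the
// old dedicated `table_id: Option<u32>` field.
fn first_table_id(resp: &SubmitDdlTaskResponse) -> Option<u32> {
    resp.table_ids.first().copied()
}

fn main() {
    let resp = SubmitDdlTaskResponse {
        key: b"procedure-key".to_vec(),
        table_ids: vec![1024],
    };
    assert_eq!(first_table_id(&resp), Some(1024));
    println!("created table id: {:?}", first_table_id(&resp));
}
```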
From ea49f8a5c43a138fe84207bcf2a0aadd7cfe4da7 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Fri, 31 May 2024 14:50:22 +0800
Subject: [PATCH 2/3] feat(flow): make write path faster with shared lock
 (#4073)

* feat(WIP): make write faster

* feat: read lock on fast path

* chore: per review
---
 src/flow/src/adapter.rs               | 20 +++++------
 src/flow/src/adapter/flownode_impl.rs |  2 +-
 src/flow/src/adapter/node_context.rs  | 48 ++++++++++++++++-----------
 3 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 52209a172f..2e66f3850b 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -159,7 +159,7 @@ pub struct FlownodeManager {
     table_info_source: TableSource,
     frontend_invoker: RwLock<Option<Box<FrontendInvoker>>>,
     /// contains mapping from table name to global id, and table schema
-    node_context: Mutex<FlownodeContext>,
+    node_context: RwLock<FlownodeContext>,
     flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
     src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
     tick_manager: FlowTickManager,
@@ -194,7 +194,7 @@ impl FlownodeManager {
             query_engine,
             table_info_source: srv_map,
             frontend_invoker: RwLock::new(None),
-            node_context: Mutex::new(node_context),
+            node_context: RwLock::new(node_context),
             flow_err_collectors: Default::default(),
             src_send_buf_lens: Default::default(),
             tick_manager,
@@ -298,7 +298,7 @@ impl FlownodeManager {
         } else {
             // TODO(discord9): consider removing buggy auto create by schema
-            let node_ctx = self.node_context.lock().await;
+            let node_ctx = self.node_context.read().await;
             let gid: GlobalId = node_ctx
                 .table_repr
                 .get_by_name(&table_name)
@@ -462,7 +462,7 @@ impl FlownodeManager {
         let mut output = BTreeMap::new();
         for (name, sink_recv) in self
             .node_context
-            .lock()
+            .write()
             .await
             .sink_receiver
             .iter_mut()
@@ -542,11 +542,11 @@ impl FlownodeManager {
         }
         // first check how many inputs were sent
        let (flush_res, buf_len) = if blocking {
-            let mut ctx = self.node_context.lock().await;
-            (ctx.flush_all_sender(), ctx.get_send_buf_size())
+            let ctx = self.node_context.read().await;
+            (ctx.flush_all_sender().await, ctx.get_send_buf_size().await)
         } else {
-            match self.node_context.try_lock() {
-                Ok(mut ctx) => (ctx.flush_all_sender(), ctx.get_send_buf_size()),
+            match self.node_context.try_read() {
+                Ok(ctx) => (ctx.flush_all_sender().await, ctx.get_send_buf_size().await),
                 Err(_) => return Ok(()),
             }
         };
@@ -580,7 +580,7 @@ impl FlownodeManager {
             rows.len()
         );
         let table_id = region_id.table_id();
-        self.node_context.lock().await.send(table_id, rows)?;
+        self.node_context.read().await.send(table_id, rows).await?;
         // TODO(discord9): put it in a background task?
         // self.run_available(false).await?;
         Ok(())
@@ -628,7 +628,7 @@ impl FlownodeManager {
             }
         }
 
-        let mut node_ctx = self.node_context.lock().await;
+        let mut node_ctx = self.node_context.write().await;
         // assign global id to source and sink table
         for source in source_table_ids {
             node_ctx
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 152251975a..11b2f6d04f 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -116,7 +116,7 @@ impl Flownode for FlownodeManager {
         let now = self.tick_manager.tick();
 
         let fetch_order = {
-            let ctx = self.node_context.lock().await;
+            let ctx = self.node_context.read().await;
             let table_col_names = ctx
                 .table_repr
                 .get_by_table_id(&table_id)
diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs
index ffaa3cc702..fdcc150697 100644
--- a/src/flow/src/adapter/node_context.rs
+++ b/src/flow/src/adapter/node_context.rs
@@ -21,7 +21,7 @@ use common_telemetry::debug;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::{broadcast, mpsc, RwLock};
 
 use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::adapter::{FlowId, TableName, TableSource};
@@ -65,7 +65,7 @@ pub struct FlownodeContext {
 #[derive(Debug)]
 pub struct SourceSender {
     sender: broadcast::Sender<DiffRow>,
-    send_buf: VecDeque<DiffRow>,
+    send_buf: RwLock<VecDeque<DiffRow>>,
 }
 
 impl Default for SourceSender {
@@ -78,6 +78,7 @@
     }
 }
 
+// TODO: make all send operations immutable
 impl SourceSender {
     pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
         self.sender.subscribe()
     }
@@ -85,15 +86,16 @@ impl SourceSender {
 
     /// send as many rows as possible from send buf
     /// until send buf is empty or the broadcast channel is full
-    pub fn try_send_all(&mut self) -> Result<usize, Error> {
+    pub async fn try_send_all(&self) -> Result<usize, Error> {
         let mut row_cnt = 0;
         loop {
+            let mut send_buf = self.send_buf.write().await;
             // if inner sender channel is empty or send buf is empty, there
             // is nothing to do for now, just break
-            if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
+            if self.sender.len() >= BROADCAST_CAP || send_buf.is_empty() {
                 break;
             }
-            if let Some(row) = self.send_buf.pop_front() {
+            if let Some(row) = send_buf.pop_front() {
                 self.sender
                     .send(row)
                     .map_err(|err| {
@@ -108,17 +110,20 @@ impl SourceSender {
         }
         if row_cnt > 0 {
             debug!("Send {} rows", row_cnt);
-            debug!("Remaining Send buf.len() = {}", self.send_buf.len());
+            debug!(
+                "Remaining Send buf.len() = {}",
+                self.send_buf.read().await.len()
+            );
         }
 
         Ok(row_cnt)
     }
 
     /// return the number of rows it actually sends (including what's in the buffer)
-    pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
-        self.send_buf.extend(rows);
+    pub async fn send_rows(&self, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        self.send_buf.write().await.extend(rows);
 
-        let row_cnt = self.try_send_all()?;
+        let row_cnt = self.try_send_all().await?;
 
         Ok(row_cnt)
     }
@@ -128,30 +133,35 @@ impl FlownodeContext {
     /// return the number of rows it actually sends (including what's in the buffer)
     ///
     /// TODO(discord9): make this concurrent
-    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
+    pub async fn send(&self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
         let sender = self
             .source_sender
-            .get_mut(&table_id)
+            .get(&table_id)
             .with_context(|| TableNotFoundSnafu {
                 name: table_id.to_string(),
             })?;
         // debug!("FlownodeContext::send: trying to send {} rows", rows.len());
-        sender.send_rows(rows)
+        sender.send_rows(rows).await
     }
 
     /// flush all senders' bufs
    ///
    /// return the number of rows sent
-    pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
-        self.source_sender
-            .iter_mut()
-            .map(|(_table_id, src_sender)| src_sender.try_send_all())
-            .try_fold(0, |acc, x| x.map(|x| x + acc))
+    pub async fn flush_all_sender(&self) -> Result<usize, Error> {
+        let mut sum = 0;
+        for sender in self.source_sender.values() {
+            sender.try_send_all().await.inspect(|x| sum += x)?;
+        }
+        Ok(sum)
     }
 
     /// Return the total number of rows in all send bufs
-    pub fn get_send_buf_size(&self) -> usize {
-        self.source_sender.values().map(|v| v.send_buf.len()).sum()
+    pub async fn get_send_buf_size(&self) -> usize {
+        let mut sum = 0;
+        for sender in self.source_sender.values() {
+            sum += sender.send_buf.read().await.len();
+        }
+        sum
     }
 }
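The core of this patch: each `SourceSender`'s buffer moves behind its own `RwLock`, so sending needs only `&self`, and callers take a shared read lock on the surrounding context instead of serializing every write on one big `Mutex`. A runnable sketch of that pattern, with `u64` standing in for `DiffRow` and everything else simplified (assumes `tokio = { version = "1", features = ["full"] }`; not the actual GreptimeDB types):

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};

// Sketch of the locking pattern the diff adopts: the per-sender buffer gets
// its own RwLock, which gives `send_rows` a `&self` signature.
struct SourceSender {
    sender: broadcast::Sender<u64>,  // stands in for Sender<DiffRow>
    send_buf: RwLock<VecDeque<u64>>, // interior mutability per sender
}

impl SourceSender {
    async fn send_rows(&self, rows: Vec<u64>) -> usize {
        self.send_buf.write().await.extend(rows);
        self.try_send_all().await
    }

    // Drain the buffer into the broadcast channel; errors are simplified away
    // here, while the real code surfaces them through its Error type.
    async fn try_send_all(&self) -> usize {
        let mut sent = 0;
        let mut buf = self.send_buf.write().await;
        while let Some(row) = buf.pop_front() {
            if self.sender.send(row).is_err() {
                break; // no receivers left
            }
            sent += 1;
        }
        sent
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);
    let sender = Arc::new(SourceSender {
        sender: tx,
        send_buf: RwLock::new(VecDeque::new()),
    });

    // Both calls go through `&self`; under the old `&mut self` signature they
    // would have contended on an exclusive lock over the whole context.
    let (a, b) = tokio::join!(sender.send_rows(vec![1, 2]), sender.send_rows(vec![3]));
    assert_eq!(a + b, 3);

    while let Ok(v) = rx.try_recv() {
        println!("got {v}");
    }
}
```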
From 45fee948e9cb02ecbd9f224385d16f87e7c1860d Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Fri, 31 May 2024 17:25:14 +0800
Subject: [PATCH 3/3] fix: display error in correct format (#4082)

* fix: display error in correct format

Signed-off-by: Ruihang Xia

* add address to RegionServer error

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 src/client/src/client.rs                        |  6 +++---
 src/client/src/error.rs                         |  3 ++-
 src/client/src/region.rs                        |  3 ++-
 src/datanode/src/event_listener.rs              | 10 ++--------
 src/file-engine/src/engine.rs                   | 15 +++++++++------
 src/flow/src/adapter/worker.rs                  |  4 ++--
 .../inverted_index/create/sort/external_sort.rs |  2 +-
 src/operator/src/insert.rs                      |  7 ++++---
 src/servers/src/export_metrics.rs               |  4 ++--
 9 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/client/src/client.rs b/src/client/src/client.rs
index df8943ad15..e9dc07b230 100644
--- a/src/client/src/client.rs
+++ b/src/client/src/client.rs
@@ -173,14 +173,14 @@ impl Client {
         Ok(FlightClient { addr, client })
     }
 
-    pub(crate) fn raw_region_client(&self) -> Result<PbRegionClient<Channel>> {
-        let (_, channel) = self.find_channel()?;
+    pub(crate) fn raw_region_client(&self) -> Result<(String, PbRegionClient<Channel>)> {
+        let (addr, channel) = self.find_channel()?;
         let client = PbRegionClient::new(channel)
             .max_decoding_message_size(self.max_grpc_recv_message_size())
             .max_encoding_message_size(self.max_grpc_send_message_size())
             .accept_compressed(CompressionEncoding::Zstd)
             .send_compressed(CompressionEncoding::Zstd);
-        Ok(client)
+        Ok((addr, client))
     }
 
     pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
diff --git a/src/client/src/error.rs b/src/client/src/error.rs
index e265662e9f..2e3e78d19b 100644
--- a/src/client/src/error.rs
+++ b/src/client/src/error.rs
@@ -89,8 +89,9 @@ pub enum Error {
         source: common_grpc::error::Error,
     },
 
-    #[snafu(display("Failed to request RegionServer, code: {}", code))]
+    #[snafu(display("Failed to request RegionServer {}, code: {}", addr, code))]
     RegionServer {
+        addr: String,
         code: Code,
         source: BoxedError,
         #[snafu(implicit)]
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index f00bb4265c..51517b5af1 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -177,7 +177,7 @@ impl RegionRequester {
             .with_label_values(&[request_type.as_str()])
             .start_timer();
 
-        let mut client = self.client.raw_region_client()?;
+        let (addr, mut client) = self.client.raw_region_client()?;
 
         let response = client
             .handle(request)
@@ -187,6 +187,7 @@
                 let err: error::Error = e.into();
                 // Uses `Error::RegionServer` instead of `Error::Server`
                 error::Error::RegionServer {
+                    addr,
                     code,
                     source: BoxedError::new(err),
                     location: location!(),
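The error.rs/region.rs pair above is the standard snafu recipe for enriching an error variant with call-site data: add the field, reference it in `display`, and supply it through the context selector. A self-contained sketch of the same shape (snafu 0.8 assumed; `std::io::Error` stands in for `BoxedError` and `u32` for tonic's `Code`; not the real GreptimeDB definitions):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // Same display shape as the patched variant: the peer address is now
    // part of the rendered message.
    #[snafu(display("Failed to request RegionServer {}, code: {}", addr, code))]
    RegionServer {
        addr: String,           // the new field threaded in from the client
        code: u32,              // tonic::Code in the real definition
        source: std::io::Error, // BoxedError in the real definition
    },
}

fn main() {
    let res: Result<(), std::io::Error> = Err(std::io::Error::new(
        std::io::ErrorKind::ConnectionRefused,
        "connection refused",
    ));
    // The context selector fills the non-source fields, as region.rs does.
    let err = res
        .context(RegionServerSnafu {
            addr: "10.0.0.1:4001",
            code: 14u32,
        })
        .unwrap_err();
    println!("{err}"); // Failed to request RegionServer 10.0.0.1:4001, code: 14
}
```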
diff --git a/src/datanode/src/event_listener.rs b/src/datanode/src/event_listener.rs
index 1c48f98d6c..5402dcb2b5 100644
--- a/src/datanode/src/event_listener.rs
+++ b/src/datanode/src/event_listener.rs
@@ -41,19 +41,13 @@ pub struct RegionServerEventSender(pub(crate) UnboundedSender<RegionServerEvent>);
 impl RegionServerEventListener for RegionServerEventSender {
     fn on_region_registered(&self, region_id: RegionId) {
         if let Err(e) = self.0.send(RegionServerEvent::Registered(region_id)) {
-            error!(
-                "Failed to send registering region: {region_id} event, source: {}",
-                e
-            );
+            error!(e; "Failed to send registering region: {region_id} event");
         }
     }
 
     fn on_region_deregistered(&self, region_id: RegionId) {
         if let Err(e) = self.0.send(RegionServerEvent::Deregistered(region_id)) {
-            error!(
-                "Failed to send deregistering region: {region_id} event, source: {}",
-                e
-            );
+            error!(e; "Failed to send deregistering region: {region_id} event");
         }
     }
 }
diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs
index f71622178d..4ecfba9019 100644
--- a/src/file-engine/src/engine.rs
+++ b/src/file-engine/src/engine.rs
@@ -229,8 +229,9 @@ impl EngineInner {
         let res = FileRegion::create(region_id, request, &self.object_store).await;
         let region = res.inspect_err(|err| {
             error!(
-                "Failed to create region, region_id: {}, err: {}",
-                region_id, err
+                err;
+                "Failed to create region, region_id: {}",
+                region_id
             );
         })?;
         self.regions.write().unwrap().insert(region_id, region);
@@ -259,8 +260,9 @@ impl EngineInner {
         let res = FileRegion::open(region_id, request, &self.object_store).await;
         let region = res.inspect_err(|err| {
             error!(
-                "Failed to open region, region_id: {}, err: {}",
-                region_id, err
+                err;
+                "Failed to open region, region_id: {}",
+                region_id
             );
         })?;
         self.regions.write().unwrap().insert(region_id, region);
@@ -302,8 +304,9 @@ impl EngineInner {
             let res = FileRegion::drop(&region, &self.object_store).await;
             res.inspect_err(|err| {
                 error!(
-                    "Failed to drop region, region_id: {}, err: {}",
-                    region_id, err
+                    err;
+                    "Failed to drop region, region_id: {}",
+                    region_id
                 );
             })?;
         }
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 9df68c6e5d..0f948c48b5 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -285,8 +285,8 @@ impl<'s> Worker<'s> {
                 Ok(Some((id, resp))) => {
                     if let Err(err) = self.itc_server.blocking_lock().resp(id, resp) {
                         common_telemetry::error!(
-                            "Worker's itc server has been closed unexpectedly, shutting down worker: {}",
-                            err
+                            err;
+                            "Worker's itc server has been closed unexpectedly, shutting down worker"
                         );
                         break;
                     };
diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs
index f43e3df204..17afd7ced3 100644
--- a/src/index/src/inverted_index/create/sort/external_sort.rs
+++ b/src/index/src/inverted_index/create/sort/external_sort.rs
@@ -256,7 +256,7 @@ impl ExternalSorter {
         IntermediateWriter::new(writer).write_all(values, bitmap_leading_zeros as _).await.inspect(|_|
             debug!("Dumped {entries} entries ({memory_usage} bytes) to intermediate file {file_id} for index {index_name}")
         ).inspect_err(|e|
-            error!("Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}. Error: {e}")
+            error!(e; "Failed to dump {entries} entries to intermediate file {file_id} for index {index_name}")
         )
     }
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index b54efc06da..5cd6846c0e 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -501,7 +501,7 @@ impl Inserter {
                 Ok(())
             }
             Err(err) => {
-                error!("Failed to create table {table_reference}: {err}",);
+                error!(err; "Failed to create table {table_reference}");
                 Err(err)
             }
         }
@@ -634,8 +634,9 @@ impl Inserter {
                     })
                     .collect::<Vec<_>>();
                 error!(
-                    "Failed to create logical tables {:?}: {}",
-                    failed_tables, err
+                    err;
+                    "Failed to create logical tables {:?}",
+                    failed_tables
                 );
                 Err(err)
             }
diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs
index 3f595fc943..d71dbcaa88 100644
--- a/src/servers/src/export_metrics.rs
+++ b/src/servers/src/export_metrics.rs
@@ -226,7 +226,7 @@ pub async fn write_system_metric_by_network(
                     error!("report export metrics error, msg: {:#?}", resp);
                 }
             }
-            Err(e) => error!("report export metrics failed, error {}", e),
+            Err(e) => error!(e; "report export metrics failed"),
         };
     }
 }
@@ -265,7 +265,7 @@ pub async fn write_system_metric_by_handler(
         };
 
         if let Err(e) = handler.write(requests, ctx.clone(), false).await {
-            error!("report export metrics by handler failed, error {}", e);
+            error!(e; "report export metrics by handler failed");
         } else {
             crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
         }
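Every hunk in this patch makes the same conversion: the error moves out of the hand-written format string and into the macro's dedicated slot before a `;`, which lets the telemetry layer render errors in one correct, uniform format (the point of the PR title) instead of whatever each call site's `{}` or `{err}` produced. The toy macro below only mimics the call shape for readers unfamiliar with the `err;` syntax; it is an illustration, not common_telemetry's implementation, which logs the full source chain through the logging backend:

```rust
use std::io::{Error, ErrorKind};

// Toy stand-in for common_telemetry's `error!`: the first rule accepts an
// error expression before a `;`, the second is the plain format-string form.
macro_rules! error {
    ($err:expr; $fmt:literal $(, $arg:expr)* $(,)?) => {
        eprintln!(concat!($fmt, ", error: {}"), $($arg,)* $err)
    };
    ($fmt:literal $(, $arg:expr)* $(,)?) => {
        eprintln!($fmt $(, $arg)*)
    };
}

fn main() {
    let e = Error::new(ErrorKind::Other, "boom");
    let table_reference = "greptime.public.t1";
    // New form: the error rides in a dedicated slot, so the message format
    // stays uniform across call sites.
    error!(e; "Failed to create table {table_reference}");
    // Old form for comparison: the error is interpolated by hand.
    error!("Failed to create table {}: {}", table_reference, e);
}
```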