feat: add extension field to HeartbeatRequest (#4688)

* feat: add extension field to HeartbeatRequest

* chore: extension to extensions

* chore: upgrade proto
This commit is contained in:
jeremyhi
2024-09-06 16:29:20 +08:00
committed by GitHub
parent 67d95d2088
commit e88465840d
14 changed files with 26 additions and 16 deletions

2
Cargo.lock generated
View File

@@ -4300,7 +4300,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c437b55725b7f5224fe9d46db21072b4a682ee4b#c437b55725b7f5224fe9d46db21072b4a682ee4b"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=157cfdb52709e489cf1f3ce8e3042ed4ee8a524a#157cfdb52709e489cf1f3ce8e3042ed4ee8a524a"
dependencies = [
"prost 0.12.6",
"serde",

View File

@@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -21,14 +21,14 @@ use greptime_proto::v1::region::RegionResponse as RegionResponseV1;
#[derive(Debug)]
pub struct RegionResponse {
pub affected_rows: AffectedRows,
pub extension: HashMap<String, Vec<u8>>,
pub extensions: HashMap<String, Vec<u8>>,
}
impl RegionResponse {
pub fn from_region_response(region_response: RegionResponseV1) -> Self {
Self {
affected_rows: region_response.affected_rows as _,
extension: region_response.extension,
extensions: region_response.extensions,
}
}
@@ -36,7 +36,7 @@ impl RegionResponse {
pub fn new(affected_rows: AffectedRows) -> Self {
Self {
affected_rows,
extension: Default::default(),
extensions: Default::default(),
}
}
}

View File

@@ -131,7 +131,7 @@ impl AlterLogicalTablesProcedure {
let phy_raw_schemas = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;
if phy_raw_schemas.is_empty() {

View File

@@ -157,7 +157,7 @@ impl CreateLogicalTablesProcedure {
let phy_raw_schemas = join_all(create_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.map(|res| res.map(|mut res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;
if phy_raw_schemas.is_empty() {

View File

@@ -324,10 +324,12 @@ impl HeartbeatTask {
region_id: stat.region_id.as_u64(),
engine: stat.engine,
role: RegionRole::from(stat.role).into(),
// TODO(jeremy): w/rcus
// TODO(weny): w/rcus
rcus: 0,
wcus: 0,
approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0),
// TODO(weny): add extensions
extensions: Default::default(),
})
.collect()
}

View File

@@ -366,10 +366,10 @@ impl RegionServerHandler for RegionServer {
// merge results by sum up affected rows and merge extensions.
let mut affected_rows = 0;
let mut extension = HashMap::new();
let mut extensions = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extension.extend(result.extension);
extensions.extend(result.extensions);
}
Ok(RegionResponseV1 {
@@ -380,7 +380,7 @@ impl RegionServerHandler for RegionServer {
}),
}),
affected_rows: affected_rows as _,
extension,
extensions,
})
}
}
@@ -708,7 +708,7 @@ impl RegionServerInner {
.await?;
Ok(RegionResponse {
affected_rows: result.affected_rows,
extension: result.extension,
extensions: result.extensions,
})
}
Err(err) => {

View File

@@ -93,6 +93,7 @@ mod tests {
approximate_bytes: 0,
engine: default_engine().to_string(),
role: RegionRole::Follower,
extensions: Default::default(),
}
}
acc.stat = Some(Stat {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use api::v1::meta::HeartbeatRequest;
use common_meta::ClusterId;
@@ -57,6 +57,8 @@ pub struct RegionStat {
pub engine: String,
/// The region role.
pub role: RegionRole,
/// The extension info of this region
pub extensions: HashMap<String, Vec<u8>>,
}
impl Stat {
@@ -142,6 +144,7 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
approximate_bytes: value.approximate_bytes,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
extensions: value.extensions,
})
}
}

View File

@@ -135,6 +135,7 @@ mod test {
wcus: 0,
approximate_bytes: 0,
engine: String::new(),
extensions: Default::default(),
}
}

View File

@@ -100,7 +100,7 @@ pub mod mock {
}),
}),
affected_rows: 0,
extension: Default::default(),
extensions: Default::default(),
})
}
}

View File

@@ -199,6 +199,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}
@@ -215,6 +216,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}
@@ -231,6 +233,7 @@ mod tests {
approximate_bytes: 1,
engine: "mito2".to_string(),
role: RegionRole::Leader,
extensions: Default::default(),
}],
..Default::default()
}

View File

@@ -162,7 +162,7 @@ impl RegionEngine for MetricEngine {
result.map_err(BoxedError::new).map(|rows| RegionResponse {
affected_rows: rows,
extension: extension_return_value,
extensions: extension_return_value,
})
}

View File

@@ -119,7 +119,7 @@ impl FlowServiceOperator {
if let Some(prev) = &mut final_result {
prev.affected_rows = res.affected_rows;
prev.affected_flows.extend(res.affected_flows);
prev.extension.extend(res.extension);
prev.extensions.extend(res.extensions);
} else {
final_result = Some(res);
}