feat: add remote dyn filter region rpc scaffolding

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-02 16:25:33 +08:00
parent 727e681fd5
commit 550985cded
5 changed files with 294 additions and 13 deletions

16
Cargo.lock generated
View File

@@ -2139,7 +2139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -5721,7 +5721,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a#9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",
@@ -6241,7 +6241,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.0",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -7330,7 +7330,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -10361,7 +10361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
dependencies = [
"heck 0.5.0",
"itertools 0.14.0",
"itertools 0.10.5",
"log",
"multimap",
"once_cell",
@@ -10409,7 +10409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.14.0",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -10422,7 +10422,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
dependencies = [
"anyhow",
"itertools 0.14.0",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 2.0.117",
@@ -15080,7 +15080,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]

View File

@@ -155,7 +155,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -16,7 +16,10 @@ use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::ResponseHeader;
use api::v1::region::RegionRequest;
use api::v1::region::{
RegionRequest, RegionRequestHeader, RemoteDynFilterRequest, RemoteDynFilterUnregister,
RemoteDynFilterUpdate, region_request, remote_dyn_filter_request,
};
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
@@ -284,6 +287,48 @@ impl RegionRequester {
pub async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handle_inner(request).await
}
pub async fn handle_remote_dyn_filter_update(
&self,
query_id: impl Into<String>,
update: RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Update(update),
))
.await
}
pub async fn handle_remote_dyn_filter_unregister(
&self,
query_id: impl Into<String>,
unregister: RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Unregister(unregister),
))
.await
}
}
fn build_remote_dyn_filter_request(
query_id: String,
action: remote_dyn_filter_request::Action,
) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::RemoteDynFilter(
RemoteDynFilterRequest {
query_id,
action: Some(action),
},
)),
}
}
pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
@@ -312,6 +357,7 @@ pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
#[cfg(test)]
mod test {
use api::v1::Status as PbStatus;
use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request};
use super::*;
use crate::Error::{IllegalDatabaseResponse, Server};
@@ -361,4 +407,30 @@ mod test {
assert_eq!(code, StatusCode::Internal);
assert_eq!(msg, "blabla");
}
#[test]
fn test_build_remote_dyn_filter_request_sets_header_and_body() {
let request = build_remote_dyn_filter_request(
"query-1".to_string(),
remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1, 2, 3],
generation: 7,
is_complete: false,
}),
);
request.header.expect("remote dyn filter header must exist");
let body = request.body.expect("remote dyn filter body must exist");
let region_request::Body::RemoteDynFilter(remote_request) = body else {
panic!("expected remote dyn filter request body");
};
assert_eq!(remote_request.query_id, "query-1");
assert!(matches!(
remote_request.action,
Some(remote_dyn_filter_request::Action::Update(_))
));
}
}

View File

@@ -25,7 +25,8 @@ use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
ListMetadataRequest, RegionResponse as RegionResponseV1, RemoteDynFilterRequest, SyncRequest,
region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -84,8 +85,9 @@ use crate::error::{
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
@@ -696,6 +698,70 @@ impl RegionServer {
Ok(response)
}
async fn handle_remote_dyn_filter_request(
&self,
request: &RemoteDynFilterRequest,
) -> Result<RegionResponse> {
if request.query_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "query_id" }.fail();
}
match request
.action
.as_ref()
.context(error::MissingRequiredFieldSnafu { name: "action" })?
{
api::v1::region::remote_dyn_filter_request::Action::Update(update) => {
self.handle_remote_dyn_filter_update(&request.query_id, update)
.await
}
api::v1::region::remote_dyn_filter_request::Action::Unregister(unregister) => {
self.handle_remote_dyn_filter_unregister(&request.query_id, unregister)
.await
}
}
}
async fn handle_remote_dyn_filter_update(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
if request.payload.is_empty() {
return error::MissingRequiredFieldSnafu { name: "payload" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter update unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
async fn handle_remote_dyn_filter_unregister(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter unregister unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -767,6 +833,10 @@ impl RegionServerHandler for RegionServer {
self.handle_list_metadata_request(list_metadata_request)
.await
}
region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => {
self.handle_remote_dyn_filter_request(remote_dyn_filter_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -1670,6 +1740,10 @@ mod tests {
use std::assert_matches;
use api::v1::SemanticType;
use api::v1::region::{
RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
remote_dyn_filter_request,
};
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use mito2::test_util::CreateRequestBuilder;
@@ -2304,4 +2378,135 @@ mod tests {
.await
.unwrap_err();
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_query_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: String::new(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_action() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: None,
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "action"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_filter_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: String::new(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_payload() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: Vec::new(),
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_unregister_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
}

View File

@@ -184,6 +184,10 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu {
reason: "RemoteDynFilter request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ApplyStagingManifest(apply) => {
make_region_apply_staging_manifest(apply)
}