feat: add remote dyn filter region rpc scaffolding

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-02 16:25:33 +08:00
parent decef45603
commit 287bdd9711
4 changed files with 286 additions and 5 deletions

View File

@@ -159,7 +159,7 @@ fs2 = "0.4"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0de5437582920c8b30d6c34212f161db71d95c50" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1b1812e839930f2037992d98657c0ff04fa5b2ee" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -16,7 +16,10 @@ use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::ResponseHeader;
use api::v1::region::RegionRequest;
use api::v1::region::{
RegionRequest, RegionRequestHeader, RemoteDynFilterRequest, RemoteDynFilterUnregister,
RemoteDynFilterUpdate, region_request, remote_dyn_filter_request,
};
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
@@ -284,6 +287,48 @@ impl RegionRequester {
pub async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handle_inner(request).await
}
pub async fn handle_remote_dyn_filter_update(
&self,
query_id: impl Into<String>,
update: RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Update(update),
))
.await
}
pub async fn handle_remote_dyn_filter_unregister(
&self,
query_id: impl Into<String>,
unregister: RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Unregister(unregister),
))
.await
}
}
fn build_remote_dyn_filter_request(
query_id: String,
action: remote_dyn_filter_request::Action,
) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::RemoteDynFilter(
RemoteDynFilterRequest {
query_id,
action: Some(action),
},
)),
}
}
pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
@@ -312,6 +357,7 @@ pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
#[cfg(test)]
mod test {
use api::v1::Status as PbStatus;
use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request};
use super::*;
use crate::Error::{IllegalDatabaseResponse, Server};
@@ -361,4 +407,30 @@ mod test {
assert_eq!(code, StatusCode::Internal);
assert_eq!(msg, "blabla");
}
#[test]
fn test_build_remote_dyn_filter_request_sets_header_and_body() {
let request = build_remote_dyn_filter_request(
"query-1".to_string(),
remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1, 2, 3],
generation: 7,
is_complete: false,
}),
);
request.header.expect("remote dyn filter header must exist");
let body = request.body.expect("remote dyn filter body must exist");
let region_request::Body::RemoteDynFilter(remote_request) = body else {
panic!("expected remote dyn filter request body");
};
assert_eq!(remote_request.query_id, "query-1");
assert!(matches!(
remote_request.action,
Some(remote_dyn_filter_request::Action::Update(_))
));
}
}

View File

@@ -27,7 +27,8 @@ use api::region::RegionResponse;
use api::v1::meta::TopicStat;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
ListMetadataRequest, RegionResponse as RegionResponseV1, RemoteDynFilterRequest, SyncRequest,
region_request,
};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
@@ -89,8 +90,9 @@ use crate::error::{
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
@@ -713,6 +715,70 @@ impl RegionServer {
Ok(response)
}
async fn handle_remote_dyn_filter_request(
&self,
request: &RemoteDynFilterRequest,
) -> Result<RegionResponse> {
if request.query_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "query_id" }.fail();
}
match request
.action
.as_ref()
.context(error::MissingRequiredFieldSnafu { name: "action" })?
{
api::v1::region::remote_dyn_filter_request::Action::Update(update) => {
self.handle_remote_dyn_filter_update(&request.query_id, update)
.await
}
api::v1::region::remote_dyn_filter_request::Action::Unregister(unregister) => {
self.handle_remote_dyn_filter_unregister(&request.query_id, unregister)
.await
}
}
}
async fn handle_remote_dyn_filter_update(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
if request.payload.is_empty() {
return error::MissingRequiredFieldSnafu { name: "payload" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter update unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
async fn handle_remote_dyn_filter_unregister(
&self,
query_id: &str,
request: &api::v1::region::RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
if request.filter_id.is_empty() {
return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail();
}
NotYetImplementedSnafu {
what: format!(
"remote dyn filter unregister unary RPC placeholder for query_id {query_id}, filter_id {}",
request.filter_id
),
}
.fail()
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region(
&self,
@@ -877,6 +943,10 @@ impl RegionServerHandler for RegionServer {
self.handle_list_metadata_request(list_metadata_request)
.await
}
region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => {
self.handle_remote_dyn_filter_request(remote_dyn_filter_request)
.await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
@@ -1782,6 +1852,10 @@ mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use api::v1::region::{
RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
remote_dyn_filter_request,
};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
@@ -2560,4 +2634,135 @@ mod tests {
.await
.unwrap_err();
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_query_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: String::new(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_request_requires_action() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: None,
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "action"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_filter_id() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: String::new(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_requires_payload() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: Vec::new(),
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(
err,
crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload"
);
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_update_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Update(
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1],
generation: 1,
is_complete: false,
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
#[tokio::test]
async fn test_handle_remote_dyn_filter_unregister_placeholder() {
let mock_region_server = mock_region_server();
let err = mock_region_server
.handle_remote_dyn_filter_request(&RemoteDynFilterRequest {
query_id: "query-1".to_string(),
action: Some(remote_dyn_filter_request::Action::Unregister(
RemoteDynFilterUnregister {
filter_id: "filter-1".to_string(),
},
)),
})
.await
.unwrap_err();
assert_matches!(err, crate::error::Error::NotYetImplemented { .. });
}
}

View File

@@ -184,6 +184,10 @@ impl RegionRequest {
reason: "ListMetadata request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu {
reason: "RemoteDynFilter request should be handled separately by RegionServer",
}
.fail(),
region_request::Body::ApplyStagingManifest(apply) => {
make_region_apply_staging_manifest(apply)
}