From 550985cded0ae7f9e40d4d4c0905d6b775508987 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 2 Apr 2026 16:25:33 +0800 Subject: [PATCH] feat: add remote dyn filter region rpc scaffolding Signed-off-by: discord9 --- Cargo.lock | 16 +-- Cargo.toml | 2 +- src/client/src/region.rs | 74 +++++++++- src/datanode/src/region_server.rs | 211 +++++++++++++++++++++++++++- src/store-api/src/region_request.rs | 4 + 5 files changed, 294 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c7df9083b..6081085ba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2139,7 +2139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -5721,7 +5721,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a#9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", @@ -6241,7 +6241,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.0", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -7330,7 +7330,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -10361,7 +10361,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" dependencies = [ "heck 0.5.0", - "itertools 0.14.0", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -10409,7 +10409,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.117", @@ -10422,7 +10422,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.117", @@ -15080,7 +15080,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9ebcfc8627..5b5033455d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -155,7 +155,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 8eefb16e0d..72321b2659 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -16,7 +16,10 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::ResponseHeader; -use api::v1::region::RegionRequest; +use api::v1::region::{ + RegionRequest, RegionRequestHeader, RemoteDynFilterRequest, RemoteDynFilterUnregister, + RemoteDynFilterUpdate, region_request, remote_dyn_filter_request, +}; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; use async_stream::stream; @@ -284,6 +287,48 @@ impl RegionRequester { pub async fn handle(&self, request: RegionRequest) -> Result { self.handle_inner(request).await } + + pub async fn handle_remote_dyn_filter_update( + &self, + query_id: impl Into, + update: RemoteDynFilterUpdate, + ) -> Result { + self.handle_inner(build_remote_dyn_filter_request( + query_id.into(), + remote_dyn_filter_request::Action::Update(update), + )) + .await + } + + pub async fn handle_remote_dyn_filter_unregister( + &self, + query_id: impl Into, + unregister: RemoteDynFilterUnregister, + ) -> Result { + self.handle_inner(build_remote_dyn_filter_request( + query_id.into(), + remote_dyn_filter_request::Action::Unregister(unregister), + )) + .await + } +} + +fn build_remote_dyn_filter_request( + query_id: String, + action: remote_dyn_filter_request::Action, +) -> RegionRequest { + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::RemoteDynFilter( + RemoteDynFilterRequest { + query_id, + action: Some(action), + }, + )), + } } pub fn check_response_header(header: &Option) -> Result<()> { @@ -312,6 +357,7 @@ pub fn check_response_header(header: &Option) -> Result<()> { #[cfg(test)] mod test { use api::v1::Status as PbStatus; + use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request}; use super::*; use crate::Error::{IllegalDatabaseResponse, Server}; @@ -361,4 +407,30 @@ mod test { assert_eq!(code, StatusCode::Internal); assert_eq!(msg, "blabla"); } + + #[test] + fn test_build_remote_dyn_filter_request_sets_header_and_body() { + let request = build_remote_dyn_filter_request( + "query-1".to_string(), + remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: vec![1, 2, 3], + generation: 7, + is_complete: false, + }), + ); + + request.header.expect("remote dyn filter header must exist"); + + let body = request.body.expect("remote dyn filter body must exist"); + let region_request::Body::RemoteDynFilter(remote_request) = body else { + panic!("expected remote dyn filter request body"); + }; + + assert_eq!(remote_request.query_id, "query-1"); + assert!(matches!( + remote_request.action, + Some(remote_dyn_filter_request::Action::Update(_)) + )); + } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ec10691bea..9bf028fef7 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -25,7 +25,8 @@ use api::region::RegionResponse; use api::v1::meta::TopicStat; use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ - ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request, + ListMetadataRequest, RegionResponse as RegionResponseV1, RemoteDynFilterRequest, SyncRequest, + region_request, }; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; @@ -84,8 +85,9 @@ use crate::error::{ ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, - NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, - Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, + RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, + UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; @@ -696,6 +698,70 @@ impl RegionServer { Ok(response) } + async fn handle_remote_dyn_filter_request( + &self, + request: &RemoteDynFilterRequest, + ) -> Result { + if request.query_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "query_id" }.fail(); + } + + match request + .action + .as_ref() + .context(error::MissingRequiredFieldSnafu { name: "action" })? + { + api::v1::region::remote_dyn_filter_request::Action::Update(update) => { + self.handle_remote_dyn_filter_update(&request.query_id, update) + .await + } + api::v1::region::remote_dyn_filter_request::Action::Unregister(unregister) => { + self.handle_remote_dyn_filter_unregister(&request.query_id, unregister) + .await + } + } + } + + async fn handle_remote_dyn_filter_update( + &self, + query_id: &str, + request: &api::v1::region::RemoteDynFilterUpdate, + ) -> Result { + if request.filter_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail(); + } + + if request.payload.is_empty() { + return error::MissingRequiredFieldSnafu { name: "payload" }.fail(); + } + + NotYetImplementedSnafu { + what: format!( + "remote dyn filter update unary RPC placeholder for query_id {query_id}, filter_id {}", + request.filter_id + ), + } + .fail() + } + + async fn handle_remote_dyn_filter_unregister( + &self, + query_id: &str, + request: &api::v1::region::RemoteDynFilterUnregister, + ) -> Result { + if request.filter_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail(); + } + + NotYetImplementedSnafu { + what: format!( + "remote dyn filter unregister unary RPC placeholder for query_id {query_id}, filter_id {}", + request.filter_id + ), + } + .fail() + } + /// Sync region manifest and registers new opened logical regions. pub async fn sync_region( &self, @@ -767,6 +833,10 @@ impl RegionServerHandler for RegionServer { self.handle_list_metadata_request(list_metadata_request) .await } + region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => { + self.handle_remote_dyn_filter_request(remote_dyn_filter_request) + .await + } _ => self.handle_requests_in_serial(request).await, } .map_err(BoxedError::new) @@ -1670,6 +1740,10 @@ mod tests { use std::assert_matches; use api::v1::SemanticType; + use api::v1::region::{ + RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate, + remote_dyn_filter_request, + }; use common_error::ext::ErrorExt; use datatypes::prelude::ConcreteDataType; use mito2::test_util::CreateRequestBuilder; @@ -2304,4 +2378,135 @@ mod tests { .await .unwrap_err(); } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_request_requires_query_id() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: String::new(), + action: Some(remote_dyn_filter_request::Action::Unregister( + RemoteDynFilterUnregister { + filter_id: "filter-1".to_string(), + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_request_requires_action() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: None, + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "action" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_requires_filter_id() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: String::new(), + payload: vec![1], + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_requires_payload() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: Vec::new(), + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_placeholder() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: vec![1], + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!(err, crate::error::Error::NotYetImplemented { .. }); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_unregister_placeholder() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Unregister( + RemoteDynFilterUnregister { + filter_id: "filter-1".to_string(), + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!(err, crate::error::Error::NotYetImplemented { .. }); + } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 99d3a87dd3..33a66d0d0d 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -184,6 +184,10 @@ impl RegionRequest { reason: "ListMetadata request should be handled separately by RegionServer", } .fail(), + region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu { + reason: "RemoteDynFilter request should be handled separately by RegionServer", + } + .fail(), region_request::Body::ApplyStagingManifest(apply) => { make_region_apply_staging_manifest(apply) }