diff --git a/Cargo.toml b/Cargo.toml index 225e3141bb..65328d5f59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,7 +159,7 @@ fs2 = "0.4" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0de5437582920c8b30d6c34212f161db71d95c50" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1b1812e839930f2037992d98657c0ff04fa5b2ee" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 8eefb16e0d..72321b2659 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -16,7 +16,10 @@ use std::sync::Arc; use api::region::RegionResponse; use api::v1::ResponseHeader; -use api::v1::region::RegionRequest; +use api::v1::region::{ + RegionRequest, RegionRequestHeader, RemoteDynFilterRequest, RemoteDynFilterUnregister, + RemoteDynFilterUpdate, region_request, remote_dyn_filter_request, +}; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; use async_stream::stream; @@ -284,6 +287,48 @@ impl RegionRequester { pub async fn handle(&self, request: RegionRequest) -> Result { self.handle_inner(request).await } + + pub async fn handle_remote_dyn_filter_update( + &self, + query_id: impl Into, + update: RemoteDynFilterUpdate, + ) -> Result { + self.handle_inner(build_remote_dyn_filter_request( + query_id.into(), + remote_dyn_filter_request::Action::Update(update), + )) + .await + } + + pub async fn handle_remote_dyn_filter_unregister( + &self, + query_id: impl Into, + unregister: RemoteDynFilterUnregister, + ) -> Result { + self.handle_inner(build_remote_dyn_filter_request( + query_id.into(), + remote_dyn_filter_request::Action::Unregister(unregister), + )) + .await + } +} + +fn build_remote_dyn_filter_request( + query_id: String, + action: remote_dyn_filter_request::Action, +) -> RegionRequest { + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::RemoteDynFilter( + RemoteDynFilterRequest { + query_id, + action: Some(action), + }, + )), + } } pub fn check_response_header(header: &Option) -> Result<()> { @@ -312,6 +357,7 @@ pub fn check_response_header(header: &Option) -> Result<()> { #[cfg(test)] mod test { use api::v1::Status as PbStatus; + use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request}; use super::*; use crate::Error::{IllegalDatabaseResponse, Server}; @@ -361,4 +407,30 @@ mod test { assert_eq!(code, StatusCode::Internal); assert_eq!(msg, "blabla"); } + + #[test] + fn test_build_remote_dyn_filter_request_sets_header_and_body() { + let request = build_remote_dyn_filter_request( + "query-1".to_string(), + remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: vec![1, 2, 3], + generation: 7, + is_complete: false, + }), + ); + + request.header.expect("remote dyn filter header must exist"); + + let body = request.body.expect("remote dyn filter body must exist"); + let region_request::Body::RemoteDynFilter(remote_request) = body else { + panic!("expected remote dyn filter request body"); + }; + + assert_eq!(remote_request.query_id, "query-1"); + assert!(matches!( + remote_request.action, + Some(remote_dyn_filter_request::Action::Update(_)) + )); + } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ac8cf4df26..94e2a51ac3 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -27,7 +27,8 @@ use api::region::RegionResponse; use api::v1::meta::TopicStat; use api::v1::region::sync_request::ManifestInfo; use api::v1::region::{ - ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request, + ListMetadataRequest, RegionResponse as RegionResponseV1, RemoteDynFilterRequest, SyncRequest, + region_request, }; use api::v1::{ResponseHeader, Status}; use arrow_flight::{FlightData, Ticket}; @@ -89,8 +90,9 @@ use crate::error::{ ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu, HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, - NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, - Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, + NewPlanDecoderSnafu, NotYetImplementedSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, + RegionNotReadySnafu, Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, + UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; @@ -713,6 +715,70 @@ impl RegionServer { Ok(response) } + async fn handle_remote_dyn_filter_request( + &self, + request: &RemoteDynFilterRequest, + ) -> Result { + if request.query_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "query_id" }.fail(); + } + + match request + .action + .as_ref() + .context(error::MissingRequiredFieldSnafu { name: "action" })? + { + api::v1::region::remote_dyn_filter_request::Action::Update(update) => { + self.handle_remote_dyn_filter_update(&request.query_id, update) + .await + } + api::v1::region::remote_dyn_filter_request::Action::Unregister(unregister) => { + self.handle_remote_dyn_filter_unregister(&request.query_id, unregister) + .await + } + } + } + + async fn handle_remote_dyn_filter_update( + &self, + query_id: &str, + request: &api::v1::region::RemoteDynFilterUpdate, + ) -> Result { + if request.filter_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail(); + } + + if request.payload.is_empty() { + return error::MissingRequiredFieldSnafu { name: "payload" }.fail(); + } + + NotYetImplementedSnafu { + what: format!( + "remote dyn filter update unary RPC placeholder for query_id {query_id}, filter_id {}", + request.filter_id + ), + } + .fail() + } + + async fn handle_remote_dyn_filter_unregister( + &self, + query_id: &str, + request: &api::v1::region::RemoteDynFilterUnregister, + ) -> Result { + if request.filter_id.is_empty() { + return error::MissingRequiredFieldSnafu { name: "filter_id" }.fail(); + } + + NotYetImplementedSnafu { + what: format!( + "remote dyn filter unregister unary RPC placeholder for query_id {query_id}, filter_id {}", + request.filter_id + ), + } + .fail() + } + /// Sync region manifest and registers new opened logical regions. pub async fn sync_region( &self, @@ -877,6 +943,10 @@ impl RegionServerHandler for RegionServer { self.handle_list_metadata_request(list_metadata_request) .await } + region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => { + self.handle_remote_dyn_filter_request(remote_dyn_filter_request) + .await + } _ => self.handle_requests_in_serial(request).await, } .map_err(BoxedError::new) @@ -1782,6 +1852,10 @@ mod tests { use std::sync::Arc; use api::v1::SemanticType; + use api::v1::region::{ + RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate, + remote_dyn_filter_request, + }; use common_error::ext::ErrorExt; use common_recordbatch::RecordBatches; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; @@ -2560,4 +2634,135 @@ mod tests { .await .unwrap_err(); } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_request_requires_query_id() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: String::new(), + action: Some(remote_dyn_filter_request::Action::Unregister( + RemoteDynFilterUnregister { + filter_id: "filter-1".to_string(), + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "query_id" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_request_requires_action() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: None, + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "action" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_requires_filter_id() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: String::new(), + payload: vec![1], + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "filter_id" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_requires_payload() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: Vec::new(), + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!( + err, + crate::error::Error::MissingRequiredField { ref name, .. } if name == "payload" + ); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_update_placeholder() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Update( + RemoteDynFilterUpdate { + filter_id: "filter-1".to_string(), + payload: vec![1], + generation: 1, + is_complete: false, + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!(err, crate::error::Error::NotYetImplemented { .. }); + } + + #[tokio::test] + async fn test_handle_remote_dyn_filter_unregister_placeholder() { + let mock_region_server = mock_region_server(); + + let err = mock_region_server + .handle_remote_dyn_filter_request(&RemoteDynFilterRequest { + query_id: "query-1".to_string(), + action: Some(remote_dyn_filter_request::Action::Unregister( + RemoteDynFilterUnregister { + filter_id: "filter-1".to_string(), + }, + )), + }) + .await + .unwrap_err(); + + assert_matches!(err, crate::error::Error::NotYetImplemented { .. }); + } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 0759fccf67..e33e7613c5 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -184,6 +184,10 @@ impl RegionRequest { reason: "ListMetadata request should be handled separately by RegionServer", } .fail(), + region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu { + reason: "RemoteDynFilter request should be handled separately by RegionServer", + } + .fail(), region_request::Body::ApplyStagingManifest(apply) => { make_region_apply_staging_manifest(apply) }