diff --git a/Cargo.lock b/Cargo.lock
index dcb3480d46..65d6fbd0d5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2747,6 +2747,7 @@ dependencies = [
name = "common-session"
version = "0.17.0"
dependencies = [
+ "serde",
"strum 0.27.1",
]
diff --git a/config/config.md b/config/config.md
index 8ecde920e8..209bc48079 100644
--- a/config/config.md
+++ b/config/config.md
@@ -591,6 +591,7 @@
| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout
if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
+| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,
and used for connections from outside the host |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index a1d40f514b..bf2a2365c8 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -30,6 +30,8 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
+## Read preference of the Frontend client.
+#+read_preference="Leader"
## The gRPC server options.
[grpc]
diff --git a/src/common/session/Cargo.toml b/src/common/session/Cargo.toml
index cd9bfa626b..db3ca0361f 100644
--- a/src/common/session/Cargo.toml
+++ b/src/common/session/Cargo.toml
@@ -8,4 +8,5 @@ license.workspace = true
workspace = true
[dependencies]
+serde.workspace = true
strum.workspace = true
diff --git a/src/common/session/src/lib.rs b/src/common/session/src/lib.rs
index 51d7846532..f09baf9286 100644
--- a/src/common/session/src/lib.rs
+++ b/src/common/session/src/lib.rs
@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use serde::{Deserialize, Serialize};
use strum::{AsRefStr, Display, EnumString};
/// Defines the read preference for frontend route operations,
/// determining whether to read from the region leader or follower.
-#[derive(Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Eq)]
+#[derive(
+ Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Serialize, Deserialize,
+)]
pub enum ReadPreference {
#[default]
// Reads all operations from the region leader. This is the default mode.
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 7aab51020c..a4d1e138e3 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -705,6 +705,7 @@ async fn open_all_regions(
follower_regions.len()
);
let mut region_requests = Vec::with_capacity(follower_regions.len());
+ let num_regions = follower_regions.len();
for (region_id, engine, store_path, options) in follower_regions {
let table_dir = table_dir(&store_path, region_id.table_id());
region_requests.push((
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 65e678bd00..76702a2f73 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -17,6 +17,7 @@
use std::time::Duration;
use serde::{Deserialize, Serialize};
+use session::ReadPreference;
pub(crate) mod engine;
pub(crate) mod frontend_client;
@@ -54,6 +55,8 @@ pub struct BatchingModeOptions {
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
+ /// Read preference of the Frontend client.
+ pub read_preference: ReadPreference,
}
impl Default for BatchingModeOptions {
@@ -68,6 +71,7 @@ impl Default for BatchingModeOptions {
experimental_frontend_activity_timeout: Duration::from_secs(60),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
+ read_preference: Default::default(),
}
}
}
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index eb29b6c5cd..1c920b1db5 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -36,6 +36,7 @@ use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
+use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::BatchingModeOptions;
@@ -363,7 +364,10 @@ impl FrontendClient {
.handle_with_retry(
req.clone(),
batch_opts.experimental_grpc_max_retries,
- &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
+ &[
+ (QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
+ (READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
+ ],
)
.await
.with_context(|_| InvalidRequestSnafu {
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 8c11920ec6..c5cf7e89bd 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1269,6 +1269,7 @@ experimental_frontend_scan_timeout = "30s"
experimental_frontend_activity_timeout = "1m"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
+read_preference = "Leader"
[logging]
max_log_files = 720