feat: able to set read preference to flownode (#6696)

fix: correctly compare the opened follower regions in startup

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-08-08 17:08:09 +08:00
committed by GitHub
parent 3a2f5413e0
commit 253d89b5cc
9 changed files with 20 additions and 2 deletions

1
Cargo.lock generated
View File

@@ -2747,6 +2747,7 @@ dependencies = [
name = "common-session"
version = "0.17.0"
dependencies = [
"serde",
"strum 0.27.1",
]

View File

@@ -591,6 +591,7 @@
| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout<br/>if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,<br/>it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,<br/>and used for connections from outside the host |

View File

@@ -30,6 +30,8 @@ node_id = 14
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
## Read preference of the Frontend client.
#+read_preference="Leader"
## The gRPC server options.
[grpc]

View File

@@ -8,4 +8,5 @@ license.workspace = true
workspace = true
[dependencies]
serde.workspace = true
strum.workspace = true

View File

@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, Display, EnumString};
/// Defines the read preference for frontend route operations,
/// determining whether to read from the region leader or follower.
#[derive(Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Eq)]
#[derive(
Debug, Clone, Copy, Default, EnumString, Display, AsRefStr, PartialEq, Serialize, Deserialize,
)]
pub enum ReadPreference {
#[default]
// Reads all operations from the region leader. This is the default mode.

View File

@@ -705,6 +705,7 @@ async fn open_all_regions(
follower_regions.len()
);
let mut region_requests = Vec::with_capacity(follower_regions.len());
let num_regions = follower_regions.len();
for (region_id, engine, store_path, options) in follower_regions {
let table_dir = table_dir(&store_path, region_id.table_id());
region_requests.push((

View File

@@ -17,6 +17,7 @@
use std::time::Duration;
use serde::{Deserialize, Serialize};
use session::ReadPreference;
pub(crate) mod engine;
pub(crate) mod frontend_client;
@@ -54,6 +55,8 @@ pub struct BatchingModeOptions {
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
}
impl Default for BatchingModeOptions {
@@ -68,6 +71,7 @@ impl Default for BatchingModeOptions {
experimental_frontend_activity_timeout: Duration::from_secs(60),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
read_preference: Default::default(),
}
}
}

View File

@@ -36,6 +36,7 @@ use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::BatchingModeOptions;
@@ -363,7 +364,10 @@ impl FrontendClient {
.handle_with_retry(
req.clone(),
batch_opts.experimental_grpc_max_retries,
&[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
&[
(QUERY_PARALLELISM_HINT, &query.parallelism.to_string()),
(READ_PREFERENCE_HINT, batch_opts.read_preference.as_ref()),
],
)
.await
.with_context(|_| InvalidRequestSnafu {

View File

@@ -1269,6 +1269,7 @@ experimental_frontend_scan_timeout = "30s"
experimental_frontend_activity_timeout = "1m"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
read_preference = "Leader"
[logging]
max_log_files = 720