mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-30 09:30:41 +00:00
Compare commits
1 Commits
main
...
jack/read-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed6b166fc4 |
@@ -70,18 +70,29 @@ use tokio::sync::RwLock;
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
|
||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||
const MIN_READ_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-read-version");
|
||||
const VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-version");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
|
||||
/// `x-lancedb-min-timestamp`) sent on read requests.
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version`,
|
||||
/// `x-lancedb-min-timestamp`, and `x-lancedb-min-read-version`) sent on read
|
||||
/// requests.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessState {
|
||||
/// Provides read-your-write within a single handle: writes that return a
|
||||
/// version update this, and reads send it as `x-lancedb-min-version`.
|
||||
min_version: Option<u64>,
|
||||
/// Highest dataset version observed in a *read* response on this handle.
|
||||
/// Reads send it as `x-lancedb-min-read-version` so a load-balanced query
|
||||
/// node whose cache is behind this version must refresh before serving,
|
||||
/// giving monotonic reads across nodes regardless of which one the load
|
||||
/// balancer routes to. Sourced only from reads (always committed dataset
|
||||
/// versions), never from writes (which may return WAL entry ids), so it is
|
||||
/// unaffected by the WAL/version mismatch that retired `min_version`.
|
||||
min_read_version: Option<u64>,
|
||||
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
|
||||
/// call. Subsequent reads send
|
||||
/// `max(baseline, now - read_consistency_interval)` as
|
||||
@@ -102,6 +113,7 @@ struct FreshnessState {
|
||||
struct FreshnessHeaders {
|
||||
min_version: Option<u64>,
|
||||
min_timestamp: Option<SystemTime>,
|
||||
min_read_version: Option<u64>,
|
||||
}
|
||||
|
||||
impl FreshnessHeaders {
|
||||
@@ -113,6 +125,9 @@ impl FreshnessHeaders {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||
}
|
||||
if let Some(v) = self.min_read_version {
|
||||
request = request.header(MIN_READ_VERSION_HEADER, v.to_string());
|
||||
}
|
||||
request
|
||||
}
|
||||
}
|
||||
@@ -884,6 +899,7 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
self.client.read_consistency_interval,
|
||||
SystemTime::now(),
|
||||
),
|
||||
min_read_version: state.min_read_version,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -905,6 +921,30 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
|
||||
}
|
||||
|
||||
/// Record a dataset version observed in a *read* response so subsequent
|
||||
/// reads request at least this version via `x-lancedb-min-read-version`,
|
||||
/// giving monotonic reads across load-balanced query nodes. A returned `0`
|
||||
/// (or absent header from an old server) is ignored.
|
||||
fn track_read_version(&self, version: u64) {
|
||||
if version == 0 {
|
||||
return;
|
||||
}
|
||||
let mut state = self.freshness.lock().unwrap();
|
||||
state.min_read_version = Some(state.min_read_version.map_or(version, |v| v.max(version)));
|
||||
}
|
||||
|
||||
/// Parse the `x-lancedb-version` response header (the dataset version a read
|
||||
/// reflects) and fold it into the read-version watermark.
|
||||
fn track_read_version_from_headers(&self, headers: &reqwest::header::HeaderMap) {
|
||||
if let Some(version) = headers
|
||||
.get(&VERSION_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<u64>().ok())
|
||||
{
|
||||
self.track_read_version(version);
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
@@ -928,6 +968,7 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
|
||||
let futures = requests.into_iter().map(|req| async move {
|
||||
let (request_id, response) = self.send(req, true).await?;
|
||||
self.track_read_version_from_headers(response.headers());
|
||||
self.read_arrow_stream(&request_id, response).await
|
||||
});
|
||||
let streams = futures::future::try_join_all(futures);
|
||||
@@ -1545,11 +1586,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = None;
|
||||
drop(write_guard);
|
||||
|
||||
// Drop any per-handle write tracking; subsequent reads use the
|
||||
// Drop any per-handle read/write tracking; subsequent reads use the
|
||||
// baseline timestamp captured now to guarantee freshness.
|
||||
*self.freshness.lock().unwrap() = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(SystemTime::now()),
|
||||
min_read_version: None,
|
||||
};
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
@@ -1805,6 +1847,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
};
|
||||
|
||||
self.track_read_version_from_headers(response.headers());
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
@@ -7124,6 +7167,7 @@ mod tests {
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
min_read_version: None,
|
||||
};
|
||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||
|
||||
@@ -7148,6 +7192,7 @@ mod tests {
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
min_read_version: None,
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||
@@ -7159,6 +7204,7 @@ mod tests {
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(recent_baseline),
|
||||
min_read_version: None,
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
||||
@@ -7303,6 +7349,106 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// A handler that records every request's headers and answers each read with
|
||||
/// an `x-lancedb-version` response header taken from `versions` (by call
|
||||
/// index, saturating at the last entry). An empty string means "no header".
|
||||
fn read_version_handler(
|
||||
versions: &'static [&'static str],
|
||||
) -> (
|
||||
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
|
||||
Arc<std::sync::Mutex<Vec<http::HeaderMap>>>,
|
||||
) {
|
||||
let requests = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
let requests_c = requests.clone();
|
||||
let call = Arc::new(AtomicUsize::new(0));
|
||||
let handler = move |request: reqwest::Request| {
|
||||
requests_c.lock().unwrap().push(request.headers().clone());
|
||||
let i = call.fetch_add(1, Ordering::SeqCst).min(versions.len() - 1);
|
||||
let mut builder = http::Response::builder().status(200);
|
||||
if !versions[i].is_empty() {
|
||||
builder = builder.header("x-lancedb-version", versions[i]);
|
||||
}
|
||||
builder.body("42".to_string()).unwrap()
|
||||
};
|
||||
(handler, requests)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_version_watermark_tracked_and_sent() {
|
||||
let (handler, requests) = read_version_handler(&["100", "100"]);
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
// First read has no watermark yet; the response advertises version 100,
|
||||
// so the second read must floor the server at 100.
|
||||
table.count_rows(None).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let reqs = requests.lock().unwrap();
|
||||
assert!(!reqs[0].contains_key("x-lancedb-min-read-version"));
|
||||
assert_eq!(
|
||||
reqs[1]
|
||||
.get("x-lancedb-min-read-version")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
"100"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_version_watermark_keeps_max() {
|
||||
// Server reports 100 then a stale 50; the watermark must not regress.
|
||||
let (handler, requests) = read_version_handler(&["100", "50", "50"]);
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.count_rows(None).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let reqs = requests.lock().unwrap();
|
||||
assert_eq!(
|
||||
reqs[2]
|
||||
.get("x-lancedb-min-read-version")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
"100"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_version_absent_header_no_watermark() {
|
||||
// An old server that doesn't return the version header leaves the
|
||||
// watermark unset, preserving backward compatibility.
|
||||
let (handler, requests) = read_version_handler(&[""]);
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.count_rows(None).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let reqs = requests.lock().unwrap();
|
||||
assert!(!reqs[1].contains_key("x-lancedb-min-read-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_version_watermark_reset_on_checkout_latest() {
|
||||
let (handler, requests) = read_version_handler(&["100", "100"]);
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.count_rows(None).await.unwrap();
|
||||
table.checkout_latest().await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
// The read after checkout_latest starts from a clean slate.
|
||||
let reqs = requests.lock().unwrap();
|
||||
assert!(
|
||||
!reqs
|
||||
.last()
|
||||
.unwrap()
|
||||
.contains_key("x-lancedb-min-read-version")
|
||||
);
|
||||
}
|
||||
|
||||
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
||||
/// from every request so tests can assert on a specific endpoint.
|
||||
#[allow(clippy::type_complexity)]
|
||||
|
||||
Reference in New Issue
Block a user