mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
use tokio::time::Instant internally, chrono::DateTime<Utc> externally; commuicate expiration through rfc3339 format; chrono::DateTime has good Debug fmt so this also serves observability; finish implementing release valve mechanism
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4538,6 +4538,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"futures",
|
||||
"pageserver_api",
|
||||
"postgres_ffi_types",
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
use std::{
|
||||
pin::Pin,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
use std::{pin::Pin, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::Context;
|
||||
use chrono::Utc;
|
||||
use compute_api::spec::PageserverProtocol;
|
||||
use futures::{StreamExt, stream::FuturesUnordered};
|
||||
use postgres::SimpleQueryMessage;
|
||||
@@ -90,12 +87,15 @@ async fn bg_task(compute: Arc<ComputeNode>) {
|
||||
|
||||
let mut obtained = ObtainedLease {
|
||||
lsn: Lsn(0),
|
||||
nearest_expiration: SystemTime::UNIX_EPOCH,
|
||||
nearest_expiration: Utc::now(),
|
||||
};
|
||||
loop {
|
||||
let valid_duration = obtained
|
||||
let valid_duration: Duration = obtained
|
||||
.nearest_expiration
|
||||
.duration_since(SystemTime::now())
|
||||
.signed_duration_since(Utc::now())
|
||||
.to_std()
|
||||
// to_std() errors if the duration is less than zero, i.e,. if the lease already expired;
|
||||
// try to renew anyway in that case;
|
||||
.unwrap_or_default();
|
||||
// Sleep for 60 seconds less than the valid duration but no more than half of the valid duration.
|
||||
let sleep_duration = valid_duration
|
||||
@@ -139,7 +139,7 @@ async fn bg_task(compute: Arc<ComputeNode>) {
|
||||
|
||||
struct ObtainedLease {
|
||||
lsn: Lsn,
|
||||
nearest_expiration: SystemTime,
|
||||
nearest_expiration: chrono::DateTime<Utc>,
|
||||
}
|
||||
|
||||
async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result<ObtainedLease> {
|
||||
@@ -164,7 +164,7 @@ async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result
|
||||
timeline_id=%timeline_id,
|
||||
);
|
||||
let logging_wrapper =
|
||||
|fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<SystemTime>>>>>| {
|
||||
|fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<chrono::DateTime<Utc>>>>>>| {
|
||||
async move {
|
||||
// TODO: timeout?
|
||||
match fut.await {
|
||||
@@ -233,7 +233,7 @@ async fn attempt_one_libpq(
|
||||
timeline_id: TimelineId,
|
||||
lease_id: String,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<SystemTime>> {
|
||||
) -> anyhow::Result<Option<chrono::DateTime<Utc>>> {
|
||||
let ConnectInfo {
|
||||
tenant_shard_id,
|
||||
connstring,
|
||||
@@ -256,14 +256,12 @@ async fn attempt_one_libpq(
|
||||
_ => anyhow::bail!("expected row message type"),
|
||||
};
|
||||
|
||||
// Note: this will be None if a lease is explicitly not granted.
|
||||
let Some(expiration) = row.get("expiration") else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let expiration =
|
||||
SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?));
|
||||
Ok(expiration)
|
||||
// Note: this will be NULL (=> None) if a lease is explicitly not granted.
|
||||
row.get("expiration")
|
||||
.map(|s| {
|
||||
chrono::DateTime::<Utc>::from_str(s).with_context(|| format!("parse expiration: {s:?}"))
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
async fn attempt_one_grpc(
|
||||
@@ -271,7 +269,7 @@ async fn attempt_one_grpc(
|
||||
timeline_id: TimelineId,
|
||||
lease_id: String,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<Option<SystemTime>> {
|
||||
) -> anyhow::Result<Option<chrono::DateTime<Utc>>> {
|
||||
let ConnectInfo {
|
||||
tenant_shard_id,
|
||||
connstring,
|
||||
@@ -289,7 +287,7 @@ async fn attempt_one_grpc(
|
||||
|
||||
let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn };
|
||||
match client.lease_standby_horizon(req).await {
|
||||
Ok(expires) => Ok(Some(expires)),
|
||||
Ok(pageserver_page_api::LeaseStandbyHorizonResponse { expiration }) => Ok(Some(expiration)),
|
||||
// Lease couldn't be acquired
|
||||
Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
|
||||
Err(err) => Err(err.into()),
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
chrono.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -15,10 +15,10 @@
|
||||
//! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
|
||||
//! stream combinators without dealing with errors, and avoids validating the same message twice.
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use std::{fmt::Display, time::{Duration, SystemTime, UNIX_EPOCH}};
|
||||
|
||||
use bytes::Bytes;
|
||||
use chrono::Utc;
|
||||
use postgres_ffi_types::Oid;
|
||||
// TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid
|
||||
// pulling in all of their other crate dependencies when building the client.
|
||||
@@ -789,29 +789,35 @@ impl From<LeaseStandbyHorizonRequest> for proto::LeaseStandbyHorizonRequest {
|
||||
|
||||
/// Lease expiration time. If the lease could not be granted because the LSN has already been
|
||||
/// garbage collected, a FailedPrecondition status will be returned instead.
|
||||
pub type LeaseStandbyHorizonResponse = SystemTime;
|
||||
pub struct LeaseStandbyHorizonResponse {
|
||||
pub expiration: chrono::DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl TryFrom<proto::LeaseStandbyHorizonResponse> for LeaseStandbyHorizonResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result<Self, Self::Error> {
|
||||
let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?;
|
||||
UNIX_EPOCH
|
||||
.checked_add(Duration::new(
|
||||
expiration.seconds as u64,
|
||||
expiration.nanos as u32,
|
||||
))
|
||||
.ok_or_else(|| ProtocolError::invalid("expiration", expiration))
|
||||
Ok(Self {
|
||||
// TODO: upgrade prost-type to 0.14.1 and use `chrono` feature?
|
||||
expiration: chrono::DateTime::<Utc>::from_timestamp(
|
||||
expiration.seconds,
|
||||
expiration
|
||||
.nanos
|
||||
.try_into()
|
||||
.map_err(|_| ProtocolError::invalid("expiration.nanos", expiration.nanos))?,
|
||||
)
|
||||
.ok_or_else(|| ProtocolError::invalid("expiration", expiration))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
|
||||
fn from(response: LeaseStandbyHorizonResponse) -> Self {
|
||||
let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default();
|
||||
Self {
|
||||
expiration: Some(prost_types::Timestamp {
|
||||
seconds: expiration.as_secs() as i64,
|
||||
nanos: expiration.subsec_nanos() as i32,
|
||||
seconds: response.expiration.timestamp() as i64,
|
||||
nanos: response.expiration.timestamp_subsec_nanos() as i32,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ use crate::tenant::mgr::{
|
||||
};
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle};
|
||||
use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter};
|
||||
use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter, standby_horizon};
|
||||
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
|
||||
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
|
||||
|
||||
@@ -2236,7 +2236,7 @@ impl PageServerHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id, %lsn), ret)]
|
||||
#[instrument(skip_all, fields(shard_id, %lsn))]
|
||||
async fn handle_lease_standby_horizon<IO>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -2263,26 +2263,17 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
set_tracing_field_shard_id(&timeline);
|
||||
|
||||
let result: Option<SystemTime> = timeline
|
||||
.lease_standby_horizon(lease_id, lsn, ctx) // logs errors internally
|
||||
let result = timeline
|
||||
// logs both Ok() and Err() internally, no need to do it here
|
||||
.lease_standby_horizon(lease_id, lsn, ctx)
|
||||
.ok();
|
||||
debug!(
|
||||
result = result.map(|x| chrono::DateTime::<Utc>::from(x).to_rfc3339()),
|
||||
"result"
|
||||
); // XXX better observability isn't great
|
||||
|
||||
// Encode result as Option<millis since epoch>
|
||||
let bytes = result.map(|t| {
|
||||
t.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("we wouldn't allow a lease at epoch, system time would be horribly off")
|
||||
.as_millis()
|
||||
.to_string()
|
||||
.into_bytes()
|
||||
});
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
|
||||
b"expiration",
|
||||
)]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?;
|
||||
.write_message_noflush(&BeMessage::DataRow(&[result
|
||||
.map(|standby_horizon::LeaseInfo { valid_until }| valid_until.to_rfc3339().into_bytes())
|
||||
.as_deref()]))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -3952,12 +3943,17 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
span_record!(lease_id=%lease_id, lsn=%lsn);
|
||||
|
||||
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
|
||||
let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
|
||||
let standby_horizon::LeaseInfo {
|
||||
valid_until: expiration,
|
||||
} = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
|
||||
Ok(expiration) => expiration,
|
||||
// Use Display so the error towards the client is crisp; the function already logged the error with backtrace.
|
||||
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
|
||||
};
|
||||
|
||||
Ok(tonic::Response::new(expiration.into()))
|
||||
Ok(tonic::Response::new(
|
||||
page_api::LeaseStandbyHorizonResponse { expiration }.into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4880,7 +4880,9 @@ impl TenantShard {
|
||||
// Cull any expired leases
|
||||
let now = SystemTime::now();
|
||||
target.lsn_leases.retain(|_, lease| !lease.is_expired(&now));
|
||||
timeline.standby_horizons.cull_leases(now);
|
||||
timeline
|
||||
.standby_horizons
|
||||
.cull_leases(tokio::time::Instant::now());
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
@@ -6218,6 +6220,7 @@ mod tests {
|
||||
use crate::keyspace::KeySpaceAccum;
|
||||
use crate::tenant::harness::*;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::standby_horizon::LeaseInfo;
|
||||
|
||||
static TEST_KEY: Lazy<Key> =
|
||||
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
|
||||
@@ -9609,11 +9612,13 @@ mod tests {
|
||||
// NB: leases are tracked by SystemTime, which is not monotonic, but we want to assert monotonicity of the lease below.
|
||||
// Sleep a second to make flakiness less likely.
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let renewed_lease = timeline
|
||||
let LeaseInfo {
|
||||
valid_until: renewed_lease,
|
||||
} = timeline
|
||||
.lease_standby_horizon("renewed_lease_0".to_string(), Lsn(initial_leases[0]), &ctx)
|
||||
.expect("standby horizon lease renewal should succeed");
|
||||
assert!(
|
||||
renewed_lease >= expirations[0],
|
||||
renewed_lease >= expirations[0].valid_until,
|
||||
"New lease should have expiration time at least as good as original"
|
||||
);
|
||||
|
||||
@@ -9696,7 +9701,7 @@ mod tests {
|
||||
//
|
||||
timeline
|
||||
.standby_horizons
|
||||
.cull_leases(SystemTime::now() + lease_length + Duration::from_secs(1));
|
||||
.cull_leases(tokio::time::Instant::now() + lease_length + Duration::from_secs(1));
|
||||
|
||||
assert_eq!(timeline.standby_horizons.get_leases().len(), 0);
|
||||
|
||||
@@ -9846,6 +9851,46 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_standby_horizon_emergency_forget_all_leases() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) =
|
||||
TenantHarness::create("test_standby_horizon_emergency_forget_all_leases")
|
||||
.await
|
||||
.unwrap()
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
//
|
||||
// Empty call works
|
||||
//
|
||||
timeline.standby_horizons.validate_invariants(&timeline);
|
||||
timeline
|
||||
.standby_horizons
|
||||
.emergency_forget_all_leases_immediately();
|
||||
timeline.standby_horizons.validate_invariants(&timeline);
|
||||
|
||||
//
|
||||
// Leases are cleared but legacy remains
|
||||
//
|
||||
let legacy_lsn = Lsn(0x10);
|
||||
timeline.standby_horizons.register_legacy_update(legacy_lsn);
|
||||
timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?;
|
||||
timeline.lease_standby_horizon("otherlease".to_string(), Lsn(0x30), &ctx)?;
|
||||
timeline.standby_horizons.validate_invariants(&timeline);
|
||||
timeline
|
||||
.standby_horizons
|
||||
.emergency_forget_all_leases_immediately();
|
||||
timeline.standby_horizons.validate_invariants(&timeline);
|
||||
assert_eq!(timeline.standby_horizons.get_leases().len(), 0);
|
||||
assert_eq!(timeline.standby_horizons.legacy(), Some(legacy_lsn));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> {
|
||||
//
|
||||
|
||||
@@ -1868,22 +1868,24 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Logs return value at info level and errors at warn level in Debug::fmt.
|
||||
#[instrument(skip(_ctx), ret(level = tracing::Level::INFO, Debug), err(level = tracing::Level::WARN, Debug))]
|
||||
#[instrument(skip_all, fields(?lease_id, %lsn), ret(level = tracing::Level::INFO, Debug), err(level = tracing::Level::WARN, Debug))]
|
||||
pub(crate) fn lease_standby_horizon(
|
||||
&self,
|
||||
lease_id: String,
|
||||
lsn: Lsn,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<SystemTime> {
|
||||
) -> anyhow::Result<standby_horizon::LeaseInfo> {
|
||||
if self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("standby-horizon-leases-track-disable")
|
||||
.is_ok()
|
||||
{
|
||||
// Use feature-flagging as a fail-safe in case something is wrong with the data structure (memory consumption, etc.)
|
||||
return Ok(SystemTime::now()
|
||||
.checked_add(Duration::from_secs(5 * 60))
|
||||
.unwrap());
|
||||
// Use feature-flagging as an emergency switch in case something is wrong with the data structure (memory consumption, etc.)
|
||||
self.standby_horizons
|
||||
.emergency_forget_all_leases_immediately();
|
||||
return Ok(standby_horizon::LeaseInfo {
|
||||
valid_until: Utc::now() + self.get_standby_horizon_lease_length(),
|
||||
});
|
||||
}
|
||||
let applied_gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn();
|
||||
if lsn < *applied_gc_cutoff_lsn_guard {
|
||||
@@ -1894,9 +1896,7 @@ impl Timeline {
|
||||
);
|
||||
}
|
||||
let length = self.get_standby_horizon_lease_length();
|
||||
self.standby_horizons
|
||||
.upsert_lease(lease_id, lsn, length)
|
||||
.map(|lease| lease.valid_until)
|
||||
self.standby_horizons.upsert_lease(lease_id, lsn, length)
|
||||
}
|
||||
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
|
||||
@@ -12,10 +12,12 @@
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, hash_map},
|
||||
time::{Duration, SystemTime},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use chrono::Utc;
|
||||
use metrics::{IntGauge, UIntGauge};
|
||||
use tokio::time::Instant;
|
||||
use tracing::{instrument, warn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -43,7 +45,7 @@ pub struct Metrics {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Lease {
|
||||
valid_until: SystemTime,
|
||||
valid_until: tokio::time::Instant,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
@@ -62,7 +64,7 @@ impl Lease {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LeaseInfo {
|
||||
pub valid_until: SystemTime,
|
||||
pub valid_until: chrono::DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Returned by [`Self::min_and_clear_legacy`].
|
||||
@@ -129,6 +131,8 @@ impl Horizons {
|
||||
}
|
||||
}
|
||||
|
||||
/// Renew a lease. Due to their nonpersistent nature we can't tell the difference between
|
||||
/// a renewal and an initial lease, so, let's call this `upsert` as a neutral term.
|
||||
pub fn upsert_lease(
|
||||
&self,
|
||||
id: String,
|
||||
@@ -136,7 +140,8 @@ impl Horizons {
|
||||
length: Duration,
|
||||
) -> anyhow::Result<LeaseInfo> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let valid_until = SystemTime::now() + length;
|
||||
let now = Instant::now();
|
||||
let valid_until = now + length;
|
||||
let update = Lease { valid_until, lsn };
|
||||
let updated = match inner.leases_by_id.entry(id) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
@@ -145,18 +150,26 @@ impl Horizons {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => entry.insert(update),
|
||||
};
|
||||
let res = LeaseInfo {
|
||||
valid_until: updated.valid_until,
|
||||
// Convert the internal expiration time to a wallclock time to return it to the caller.
|
||||
let lease_info = LeaseInfo {
|
||||
valid_until: Utc::now()
|
||||
+ updated
|
||||
.valid_until
|
||||
// reuse now as the upsert is assumed to be fast
|
||||
.checked_duration_since(now)
|
||||
.expect("reuse of now() + guarantee of monotonicity in try_update"),
|
||||
};
|
||||
|
||||
let new_count = inner.leases_by_id.len().into_u64();
|
||||
inner.metrics.leases_count.set(new_count);
|
||||
let leases_min = inner.leases_by_id.values().map(|v| v.lsn).min();
|
||||
inner.leases_min = leases_min;
|
||||
inner.metrics.leases_min.set(leases_min.unwrap_or(Lsn(0)).0);
|
||||
Ok(res)
|
||||
|
||||
Ok(lease_info)
|
||||
}
|
||||
|
||||
pub fn cull_leases(&self, now: SystemTime) {
|
||||
pub fn cull_leases(&self, now: Instant) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let mut min = None;
|
||||
inner.leases_by_id.retain(|_, l| {
|
||||
@@ -176,6 +189,23 @@ impl Horizons {
|
||||
inner.metrics.leases_min.set(min.unwrap_or(Lsn(0)).0);
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn emergency_forget_all_leases_immediately(&self) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
for (lease_id, _) in inner.leases_by_id.drain() {
|
||||
warn!("dropping lease early {}", lease_id);
|
||||
}
|
||||
inner
|
||||
.metrics
|
||||
.leases_count
|
||||
.set(inner.leases_by_id.len().into_u64());
|
||||
inner.leases_min = None;
|
||||
inner
|
||||
.metrics
|
||||
.leases_min
|
||||
.set(inner.leases_min.unwrap_or(Lsn(0)).0);
|
||||
}
|
||||
|
||||
pub fn dump(&self) -> serde_json::Value {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let Inner {
|
||||
@@ -252,7 +282,7 @@ impl Horizons {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn get_leases(&self) -> Vec<(Lsn, SystemTime)> {
|
||||
pub fn get_leases(&self) -> Vec<(Lsn, tokio::time::Instant)> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner
|
||||
.leases_by_id
|
||||
|
||||
Reference in New Issue
Block a user