address the "This is silly" comment

This commit is contained in:
Christian Schwarz
2024-09-14 15:38:46 +00:00
parent adc798b59e
commit c736f9d6ef
4 changed files with 40 additions and 16 deletions

View File

@@ -312,8 +312,7 @@ impl ValuesReconstructState {
&mut self,
key: &Key,
lsn: Lsn,
completes: bool,
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
future_value: FutureValue,
) -> ValueReconstructSituation {
let state = self
.keys
@@ -321,14 +320,22 @@ impl ValuesReconstructState {
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
match state.situation {
let key_done = match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => {
state.records.push((lsn, value));
}
}
ValueReconstructSituation::Continue => match future_value {
FutureValue::Img { rx } => {
assert!(state.img.is_none());
state.img = Some((lsn, rx));
true
}
FutureValue::WalRecord { will_init, rx } => {
state.records.push((lsn, rx));
will_init
}
},
};
if completes && state.situation == ValueReconstructSituation::Continue {
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
@@ -359,6 +366,16 @@ impl ValuesReconstructState {
}
}
enum FutureValue {
WalRecord {
will_init: bool,
rx: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
},
Img {
rx: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
},
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()

View File

@@ -1002,8 +1002,10 @@ impl DeltaLayerInner {
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
rx,
super::FutureValue::WalRecord {
will_init: blob_meta.will_init,
rx,
},
);
}

View File

@@ -588,8 +588,11 @@ impl ImageLayerInner {
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
super::FutureValue::Img { rx },
);
}
let buf_size = read.size();
@@ -624,9 +627,7 @@ impl ImageLayerInner {
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
// TODO: this is silly - sort it out
let bytes = Value::ser(&Value::Image(Bytes::copy_from_slice(buf)))
.expect("stupid but correct");
let bytes = Bytes::copy_from_slice(buf);
let _ = sender.send(Ok(bytes.into()));
}

View File

@@ -467,7 +467,11 @@ impl InMemoryLayer {
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
reconstruct_state.update_key(
&key,
*entry_lsn,
crate::tenant::storage_layer::FutureValue::WalRecord { will_init, rx },
);
if will_init {
break;