Handle COPY error

This commit is contained in:
Konstantin Knizhnik
2021-08-20 11:45:33 +03:00
committed by Stas Kelvich
parent c4450907e5
commit beaa2cd0a2
4 changed files with 13 additions and 3 deletions

View File

@@ -320,7 +320,7 @@ impl PostgresNode {
// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeepr or
// from removing WAL that hasn't been streamed to the safekeeper or
// page server yet. (gh issue #349)
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?;

View File

@@ -76,8 +76,10 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
subscriber.add_hs_feedback(feedback);
}
FeMessage::Sync => {}
FeMessage::CopyFail => return Err(anyhow!("Copy failed")),
_ => {
// We only handle `CopyData` messages. Anything else is ignored.
// We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored.
info!("unexpected message {:?}", msg);
}
}

View File

@@ -341,7 +341,7 @@ impl PostgresBackend {
// We prefer explicit pattern matching to wildcards, because
// this helps us spot the places where new variants are missing
FeMessage::CopyData(_) | FeMessage::CopyDone => {
FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFail => {
bail!("unexpected message type: {:?}", msg);
}
}

View File

@@ -31,6 +31,7 @@ pub enum FeMessage {
Terminate,
CopyData(Bytes),
CopyDone,
CopyFail,
PasswordMessage(Bytes),
}
@@ -138,6 +139,7 @@ impl FeMessage {
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => Ok(Some(FeMessage::CopyData(body))),
b'c' => Ok(Some(FeMessage::CopyDone)),
b'f' => Ok(Some(FeMessage::CopyFail)),
b'p' => Ok(Some(FeMessage::PasswordMessage(body))),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
@@ -338,6 +340,7 @@ pub enum BeMessage<'a> {
ControlFile,
CopyData(&'a [u8]),
CopyDone,
CopyFail,
CopyInResponse,
CopyOutResponse,
CopyBothResponse,
@@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> {
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyFail => {
buf.put_u8(b'f');
write_body(buf, |_| Ok::<(), io::Error>(())).unwrap();
}
BeMessage::CopyInResponse => {
buf.put_u8(b'G');
write_body(buf, |buf| {