From beaa2cd0a22f3991a10152343d435eb6f3091f21 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 20 Aug 2021 11:45:33 +0300 Subject: [PATCH] Handle COPY error --- control_plane/src/compute.rs | 2 +- walkeeper/src/replication.rs | 4 +++- zenith_utils/src/postgres_backend.rs | 2 +- zenith_utils/src/pq_proto.rs | 8 ++++++++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index fb68007d8f..89f56f9f4f 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -320,7 +320,7 @@ impl PostgresNode { // Never clean up old WAL. TODO: We should use a replication // slot or something proper, to prevent the compute node - // from removing WAL that hasn't been streamed to the safekeepr or + // from removing WAL that hasn't been streamed to the safekeeper or // page server yet. (gh issue #349) self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n")?; diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 907e4868cd..9012e8e10d 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -76,8 +76,10 @@ impl ReplicationConn { let feedback = HotStandbyFeedback::des(&m)?; subscriber.add_hs_feedback(feedback); } + FeMessage::Sync => {} + FeMessage::CopyFail => return Err(anyhow!("Copy failed")), _ => { - // We only handle `CopyData` messages. Anything else is ignored. + // We only handle `CopyData`, 'Sync', 'CopyFail' messages. Anything else is ignored. info!("unexpected message {:?}", msg); } } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index e730178823..27aa0e4277 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -341,7 +341,7 @@ impl PostgresBackend { // We prefer explicit pattern matching to wildcards, because // this helps us spot the places where new variants are missing - FeMessage::CopyData(_) | FeMessage::CopyDone => { + FeMessage::CopyData(_) | FeMessage::CopyDone | FeMessage::CopyFail => { bail!("unexpected message type: {:?}", msg); } } diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index 780474a4b9..b2722f3b13 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -31,6 +31,7 @@ pub enum FeMessage { Terminate, CopyData(Bytes), CopyDone, + CopyFail, PasswordMessage(Bytes), } @@ -138,6 +139,7 @@ impl FeMessage { b'X' => Ok(Some(FeMessage::Terminate)), b'd' => Ok(Some(FeMessage::CopyData(body))), b'c' => Ok(Some(FeMessage::CopyDone)), + b'f' => Ok(Some(FeMessage::CopyFail)), b'p' => Ok(Some(FeMessage::PasswordMessage(body))), tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)), } @@ -338,6 +340,7 @@ pub enum BeMessage<'a> { ControlFile, CopyData(&'a [u8]), CopyDone, + CopyFail, CopyInResponse, CopyOutResponse, CopyBothResponse, @@ -546,6 +549,11 @@ impl<'a> BeMessage<'a> { write_body(buf, |_| Ok::<(), io::Error>(())).unwrap(); } + BeMessage::CopyFail => { + buf.put_u8(b'f'); + write_body(buf, |_| Ok::<(), io::Error>(())).unwrap(); + } + BeMessage::CopyInResponse => { buf.put_u8(b'G'); write_body(buf, |buf| {