diff --git a/Cargo.lock b/Cargo.lock index c61ea14587..7a714e7389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -633,6 +633,19 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atoi" version = "1.0.0" @@ -3961,7 +3974,9 @@ name = "index" version = "0.4.4" dependencies = [ "async-trait", + "asynchronous-codec", "bytemuck", + "bytes", "common-base", "common-error", "common-macro", diff --git a/Cargo.toml b/Cargo.toml index 7b2e23c7e9..f5f4873b67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ base64 = "0.21" bigdecimal = "0.4.2" bitflags = "2.4.1" bytemuck = "1.12" +bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 8bcae21f08..54926afcc4 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] anymap = "1.0.0-beta.2" bitvec = "1.0" -bytes = { version = "1.1", features = ["serde"] } +bytes.workspace = true common-error.workspace = true common-macro.workspace = true paste = "1.0" diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 3de0d2fbae..3027052e4c 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -16,7 +16,7 @@ async-compression = { version = "0.3", features = [ "tokio", ] } async-trait.workspace = true -bytes = "1.1" 
+bytes.workspace = true common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 891076f914..3415c841ae 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -13,7 +13,7 @@ async-recursion = "1.0" async-stream.workspace = true async-trait.workspace = true base64.workspace = true -bytes = "1.4" +bytes.workspace = true common-catalog.workspace = true common-error.workspace = true common-grpc-expr.workspace = true diff --git a/src/common/substrait/Cargo.toml b/src/common/substrait/Cargo.toml index 3137cf6290..9ae3660067 100644 --- a/src/common/substrait/Cargo.toml +++ b/src/common/substrait/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] async-recursion = "1.0" async-trait.workspace = true -bytes = "1.1" +bytes.workspace = true catalog.workspace = true common-catalog.workspace = true common-error.workspace = true diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b5fe76a08d..ee568a3c06 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -12,7 +12,7 @@ async-stream.workspace = true async-trait.workspace = true axum = "0.6" axum-macros = "0.3" -bytes = "1.1" +bytes.workspace = true catalog.workspace = true client.workspace = true common-base.workspace = true diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 800420164a..bd5b560ce8 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -6,7 +6,9 @@ license.workspace = true [dependencies] async-trait.workspace = true +asynchronous-codec = "0.7.0" bytemuck.workspace = true +bytes.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true diff --git a/src/index/src/inverted_index.rs b/src/index/src/inverted_index.rs index 96db32a0cb..a793d1a252 100644 --- a/src/index/src/inverted_index.rs +++ b/src/index/src/inverted_index.rs @@ -12,6 +12,7 @@ // See the License for the 
specific language governing permissions and // limitations under the License. +pub mod create; pub mod error; pub mod format; pub mod search; diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs new file mode 100644 index 0000000000..a57c2c36b7 --- /dev/null +++ b/src/index/src/inverted_index/create.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod sort; diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs new file mode 100644 index 0000000000..2331ed8dcb --- /dev/null +++ b/src/index/src/inverted_index/create/sort.rs @@ -0,0 +1,24 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use common_base::BitVec;
+use futures::Stream;
+
+use crate::inverted_index::error::Result;
+use crate::inverted_index::Bytes;
+
+mod intermediate_rw;
+
+/// A stream of sorted values along with their associated bitmap
+pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;
diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs
new file mode 100644
index 0000000000..754a219155
--- /dev/null
+++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs
@@ -0,0 +1,141 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod codec_v1;
+
+use std::collections::BTreeMap;
+
+use asynchronous_codec::{FramedRead, FramedWrite};
+use common_base::BitVec;
+use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
+use snafu::ResultExt;
+
+use crate::inverted_index::create::sort::SortedStream;
+use crate::inverted_index::error::{
+    CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
+};
+use crate::inverted_index::Bytes;
+
+/// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer`
+pub struct IntermediateWriter<W> {
+    writer: W,
+}
+
+impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
+    /// Creates a new `IntermediateWriter` wrapping an `AsyncWrite`
+    pub fn new(writer: W) -> IntermediateWriter<W> {
+        IntermediateWriter { writer }
+    }
+
+    /// Serializes and writes all provided values to the wrapped writer
+    pub async fn write_all(mut self, values: BTreeMap<Bytes, BitVec>) -> Result<()> {
+        let (codec_magic, encoder) = (codec_v1::CODEC_V1_MAGIC, codec_v1::IntermediateCodecV1);
+
+        self.writer
+            .write_all(codec_magic)
+            .await
+            .context(WriteSnafu)?;
+
+        let value_stream = stream::iter(values.into_iter().map(Ok));
+        let frame_write = FramedWrite::new(&mut self.writer, encoder);
+        value_stream.forward(frame_write).await?;
+
+        self.writer.flush().await.context(FlushSnafu)?;
+        self.writer.close().await.context(CloseSnafu)
+    }
+}
+
+/// Reads intermediate serialized data from an `AsyncRead` source and converts it to a [`SortedStream`]
+pub struct IntermediateReader<R> {
+    reader: R,
+}
+
+impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
+    pub fn new(reader: R) -> IntermediateReader<R> {
+        IntermediateReader { reader }
+    }
+
+    /// Reads the magic header, determines the codec, and returns a stream of deserialized values.
+    pub async fn into_stream(mut self) -> Result<SortedStream> {
+        let mut magic = [0u8; 4];
+        self.reader
+            .read_exact(&mut magic)
+            .await
+            .context(ReadSnafu)?;
+
+        let decoder = match &magic {
+            codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateCodecV1,
+            _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
+        };
+
+        Ok(Box::new(FramedRead::new(self.reader, decoder)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::io::Cursor;
+
+    use super::*;
+    use crate::inverted_index::error::Error;
+
+    #[tokio::test]
+    async fn test_intermediate_read_write_basic() {
+        let mut buf = vec![];
+
+        let values = BTreeMap::from_iter([
+            (Bytes::from("a"), BitVec::from_slice(&[0b10101010])),
+            (Bytes::from("b"), BitVec::from_slice(&[0b01010101])),
+        ]);
+
+        let writer = IntermediateWriter::new(&mut buf);
+        writer.write_all(values.clone()).await.unwrap();
+
+        let reader = IntermediateReader::new(Cursor::new(buf));
+        let mut stream = reader.into_stream().await.unwrap();
+
+        let a = stream.next().await.unwrap().unwrap();
+        assert_eq!(a, (Bytes::from("a"), BitVec::from_slice(&[0b10101010])));
+        let b = stream.next().await.unwrap().unwrap();
+        assert_eq!(b, (Bytes::from("b"), BitVec::from_slice(&[0b01010101])));
+        assert!(stream.next().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_intermediate_read_write_empty() {
+        let mut buf = vec![];
+
+        let values = BTreeMap::new();
+
+        let writer = IntermediateWriter::new(&mut buf);
+        writer.write_all(values.clone()).await.unwrap();
+
+        let reader = IntermediateReader::new(Cursor::new(buf));
+        let mut stream = reader.into_stream().await.unwrap();
+
+        assert!(stream.next().await.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_intermediate_read_with_invalid_magic() {
+        let buf = b"invalid".to_vec();
+
+        let reader = IntermediateReader::new(Cursor::new(buf));
+        let result = reader.into_stream().await;
+        assert!(matches!(
+            result,
+            Err(Error::UnknownIntermediateCodecMagic { .. })
+        ))
+    }
+}
diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs
new file mode 100644
index 0000000000..14d1dcb9d3
--- /dev/null
+++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs
@@ -0,0 +1,166 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+
+use asynchronous_codec::{BytesMut, Decoder, Encoder};
+use bytes::{Buf, BufMut};
+use common_base::BitVec;
+use snafu::{location, Location};
+
+use crate::inverted_index::error::{Error, Result};
+use crate::inverted_index::Bytes;
+
+const U64_LENGTH: usize = std::mem::size_of::<u64>();
+
+/// Magic bytes for this intermediate codec version
+pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
+
+/// Codec for serializing and deserializing intermediate data for external sorting.
+///
+/// Binary format serialization. The item is laid out as follows:
+/// ```text
+/// [value len][value][bitmap len][bitmap]
+///     [8]       [?]       [8]        [?]
+/// ```
+pub struct IntermediateCodecV1;
+
+/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
+impl Encoder for IntermediateCodecV1 {
+    type Item<'a> = (Bytes, BitVec);
+    type Error = Error;
+
+    fn encode(&mut self, item: (Bytes, BitVec), dst: &mut BytesMut) -> Result<()> {
+        let value_bytes = item.0;
+        let bitmap_bytes = item.1.into_vec();
+
+        dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_bytes.len());
+        dst.put_u64_le(value_bytes.len() as u64);
+        dst.extend_from_slice(&value_bytes);
+        dst.put_u64_le(bitmap_bytes.len() as u64);
+        dst.extend_from_slice(&bitmap_bytes);
+        Ok(())
+    }
+}
+
+/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
+impl Decoder for IntermediateCodecV1 {
+    type Item = (Bytes, BitVec);
+    type Error = Error;
+
+    /// Decodes the `src` into `(Bytes, BitVec)`. Returns `None` if
+    /// the `src` does not contain enough data for a complete item.
+    ///
+    /// Only after successful decoding, the `src` is advanced. Otherwise,
+    /// it is left untouched to wait for filling more data and retrying.
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
+        // [value len][value][bitmap len][bitmap]
+        //     [8]       [?]       [8]        [?]
+
+        // decode value len
+        if src.len() < U64_LENGTH {
+            return Ok(None);
+        }
+        let (value_len, buf) = src.split_at(U64_LENGTH);
+        let value_len = u64::from_le_bytes(value_len.try_into().unwrap()) as usize;
+
+        // decode value
+        if buf.len() < value_len {
+            return Ok(None);
+        }
+        let (value_bytes, buf) = buf.split_at(value_len);
+
+        // decode bitmap len
+        if buf.len() < U64_LENGTH {
+            return Ok(None);
+        }
+        let (bitmap_len, buf) = buf.split_at(U64_LENGTH);
+        let bitmap_len = u64::from_le_bytes(bitmap_len.try_into().unwrap()) as usize;
+
+        // decode bitmap
+        if buf.len() < bitmap_len {
+            return Ok(None);
+        }
+        let bitmap_bytes = &buf[..bitmap_len];
+
+        let item = (value_bytes.to_vec(), BitVec::from_slice(bitmap_bytes));
+
+        src.advance(U64_LENGTH * 2 + value_len + bitmap_len);
+        Ok(Some(item))
+    }
+}
+
+/// Required for [`Encoder`] and [`Decoder`] implementations.
+impl From<io::Error> for Error {
+    fn from(error: io::Error) -> Self {
+        Error::CommonIoError {
+            error,
+            location: location!(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_intermediate_codec_basic() {
+        let mut codec = IntermediateCodecV1;
+        let mut buf = BytesMut::new();
+
+        let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
+        codec.encode(item.clone(), &mut buf).unwrap();
+        assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
+        assert_eq!(codec.decode(&mut buf).unwrap(), None);
+
+        let item1 = (b"world".to_vec(), BitVec::from_slice(&[0b01010101]));
+        codec.encode(item.clone(), &mut buf).unwrap();
+        codec.encode(item1.clone(), &mut buf).unwrap();
+        assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
+        assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item1);
+        assert_eq!(codec.decode(&mut buf).unwrap(), None);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn test_intermediate_codec_empty_item() {
+        let mut codec = IntermediateCodecV1;
+        let mut buf = BytesMut::new();
+
+        let item = (b"".to_vec(), BitVec::from_slice(&[]));
+        codec.encode(item.clone(), &mut buf).unwrap();
+        assert_eq!(codec.decode(&mut buf).unwrap().unwrap(), item);
+        assert_eq!(codec.decode(&mut buf).unwrap(), None);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn test_intermediate_codec_partial() {
+        let mut codec = IntermediateCodecV1;
+        let mut buf = BytesMut::new();
+
+        let item = (b"hello".to_vec(), BitVec::from_slice(&[0b10101010]));
+        codec.encode(item.clone(), &mut buf).unwrap();
+
+        let partial_length = U64_LENGTH + 3;
+        let mut partial_bytes = buf.split_to(partial_length);
+
+        assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None); // not enough data
+        partial_bytes.extend_from_slice(&buf[..]);
+        assert_eq!(codec.decode(&mut partial_bytes).unwrap().unwrap(), item);
+        assert_eq!(codec.decode(&mut partial_bytes).unwrap(), None);
+        assert!(partial_bytes.is_empty());
+    }
+}
diff --git a/src/index/src/inverted_index/error.rs
b/src/index/src/inverted_index/error.rs index 44fd77c413..afb8ae1283 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -150,6 +150,16 @@ pub enum Error { error: fst::Error, location: Location, }, + + #[snafu(display("Failed to perform IO operation"))] + CommonIoError { + #[snafu(source)] + error: IoError, + location: Location, + }, + + #[snafu(display("Unknown intermediate codec magic: {magic:?}"))] + UnknownIntermediateCodecMagic { magic: [u8; 4], location: Location }, } impl ErrorExt for Error { @@ -168,6 +178,8 @@ impl ErrorExt for Error { | DecodeProto { .. } | DecodeFst { .. } | KeysApplierUnexpectedPredicates { .. } + | CommonIoError { .. } + | UnknownIntermediateCodecMagic { .. } | FstCompile { .. } => StatusCode::Unexpected, ParseRegex { .. } diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index e7f448c398..296efb315d 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -13,5 +13,7 @@ // limitations under the License. 
#![feature(iter_partition_in_place)] +// TODO(zhongzc): remove once further code is added +#![allow(dead_code)] pub mod inverted_index; diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index c745c61e3d..b3ce74640a 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -13,7 +13,7 @@ protobuf-build = { version = "0.15", default-features = false, features = [ async-stream.workspace = true async-trait.workspace = true byteorder = "1.4" -bytes = "1.1" +bytes.workspace = true common-base.workspace = true common-config.workspace = true common-error.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0e2bd5ad39..13d48636d6 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -17,7 +17,7 @@ async-channel = "1.9" async-compat = "0.2" async-stream.workspace = true async-trait = "0.1" -bytes = "1.4" +bytes.workspace = true chrono.workspace = true common-base.workspace = true common-catalog.workspace = true diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 8e192136c7..5cf792fa01 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [dependencies] async-trait = "0.1" -bytes = "1.4" +bytes.workspace = true common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 893f07f6c6..8b8ed8ac53 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -19,7 +19,7 @@ auth.workspace = true axum = { version = "0.6", features = ["headers"] } axum-macros = "0.3.8" base64.workspace = true -bytes = "1.2" +bytes.workspace = true catalog.workspace = true chrono.workspace = true common-base.workspace = true diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index f172256c4e..3e6b92c95f 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true api.workspace = true 
aquamarine.workspace = true async-trait.workspace = true -bytes = "1.1" +bytes.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true