diff --git a/src/flow/src/compute/render/mod.rs b/src/flow/src/compute/render/mod.rs
index bf4ae204d9..24c265343c 100644
--- a/src/flow/src/compute/render/mod.rs
+++ b/src/flow/src/compute/render/mod.rs
@@ -11,6 +11,7 @@
 use timely::progress::Timestamp;
 use crate::compute::context::CollectionBundle;
 use crate::compute::plan::Plan;
+use crate::compute::types::BuildDesc;
 use crate::compute::Context;
 use crate::expr::Id;
 use crate::repr::{self, Row};
@@ -53,6 +54,19 @@ impl RenderTimestamp for repr::Timestamp {
     }
 }
 
+// This implementation block allows child timestamps to vary from parent timestamps.
+impl<G> Context<G, Row>
+where
+    G: Scope,
+    G::Timestamp: RenderTimestamp,
+{
+    pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>) {
+        // First, transform the relation expression into a render plan.
+        let bundle = self.render_plan(object.plan);
+        self.insert_id(Id::Global(object.id), bundle);
+    }
+}
+
 impl<S> Context<S, Row>
 where
     S: Scope,
@@ -188,3 +202,35 @@
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use differential_dataflow::input::InputSession;
+    use timely::dataflow::scopes::Child;
+
+    use super::*;
+    use crate::expr::LocalId;
+    use crate::repr::Diff;
+
+    #[test]
+    fn test_simple_plan_render() {
+        timely::execute_from_args(std::env::args(), move |worker| {
+            let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
+            worker.dataflow(|scope: &mut Child<'_, _, repr::Timestamp>| {
+                let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(scope.clone());
+                let plan = Plan::Constant {
+                    rows: Ok(vec![(Row::default(), 0, 1)]),
+                };
+                let input_collection = input.to_collection(scope);
+                let err_collection = InputSession::new().to_collection(scope);
+                let input_collection =
+                    CollectionBundle::from_collections(input_collection, err_collection);
+                // insert collection
+                test_ctx.insert_id(Id::Local(LocalId(0)), input_collection);
+                // Rendering must not panic; the resulting bundle is unused in this smoke test.
+                let _bundle = test_ctx.render_plan(plan);
+            });
+        })
+        .unwrap();
+    }
+}
diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs
index 3878a11506..bb1ac438d4
100644
--- a/src/flow/src/compute/render/reduce.rs
+++ b/src/flow/src/compute/render/reduce.rs
@@ -100,11 +100,10 @@ where
         }
     });
 
-    // TODO(discord9): find out how to do `consolidate_stream` without Abonomation
     // Demux out the potential errors from key and value selector evaluation.
     let (ok, mut err) = key_val_input
         .as_collection()
-        // .consolidate_stream()
+        .consolidate_stream()
        .flat_map_fallible("OkErrDemux", Some);
 
     err = err.concat(&err_input);
diff --git a/src/flow/src/compute/types/dataflow.rs b/src/flow/src/compute/types/dataflow.rs
new file mode 100644
index 0000000000..c14f0af96d
--- /dev/null
+++ b/src/flow/src/compute/types/dataflow.rs
@@ -0,0 +1,53 @@
+use std::collections::BTreeMap;
+
+use serde::{Deserialize, Serialize};
+use timely::progress::Antichain;
+
+use crate::compute::plan::Plan;
+use crate::compute::types::sinks::ComputeSinkDesc;
+use crate::compute::types::sources::SourceInstanceDesc;
+use crate::compute::types::BuildDesc;
+use crate::expr::{GlobalId, ScalarExpr};
+use crate::repr::{self, RelationType};
+
+/// A description of a dataflow to construct and results to surface.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct DataflowDescription<S> {
+    /// Sources instantiations made available to the dataflow pair with monotonicity information.
+    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool)>,
+    /// Indexes made available to the dataflow.
+    /// (id of new index, description of index, relationtype of base source/view, monotonic)
+    pub index_imports: BTreeMap<GlobalId, (IndexDesc, RelationType, bool)>,
+    /// Views and indexes to be built and stored in the local context.
+    /// Objects must be built in the specific order, as there may be
+    /// dependencies of later objects on prior identifiers.
+    pub objects_to_build: Vec<BuildDesc<Plan>>,
+    /// Indexes to be made available to be shared with other dataflows
+    /// (id of new index, description of index, relationtype of base source/view)
+    pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
+    /// sinks to be created
+    /// (id of new sink, description of sink)
+    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S>>,
+    /// An optional frontier to which inputs should be advanced.
+    ///
+    /// If this is set, it should override the default setting determined by
+    /// the upper bound of `since` frontiers contributing to the dataflow.
+    /// It is an error for this to be set to a frontier not beyond that default.
+    pub as_of: Option<Antichain<repr::Timestamp>>,
+    /// Frontier beyond which the dataflow should not execute.
+    /// Specifically, updates at times greater or equal to this frontier are suppressed.
+    /// This is often set to `as_of + 1` to enable "batch" computations.
+    pub until: Antichain<repr::Timestamp>,
+    /// Human readable name
+    pub debug_name: String,
+}
+
+/// An index storing processed updates so they can be queried
+/// or reused in other computations
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
+pub struct IndexDesc {
+    /// Identity of the collection the index is on.
+    pub on_id: GlobalId,
+    /// Expressions to be arranged, in order of decreasing primacy.
+    pub key: Vec<ScalarExpr>,
+}
diff --git a/src/flow/src/compute/types/mod.rs b/src/flow/src/compute/types/mod.rs
index 8b13789179..7d8bc74309 100644
--- a/src/flow/src/compute/types/mod.rs
+++ b/src/flow/src/compute/types/mod.rs
@@ -1 +1,13 @@
-
+use serde::{Deserialize, Serialize};
+
+use crate::expr::GlobalId;
+mod dataflow;
+mod sinks;
+mod sources;
+
+/// An association of a global identifier to an expression.
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct BuildDesc<P> {
+    pub id: GlobalId,
+    pub plan: P,
+}
diff --git a/src/flow/src/compute/types/sinks.rs b/src/flow/src/compute/types/sinks.rs
new file mode 100644
index 0000000000..83b198fe5b
--- /dev/null
+++ b/src/flow/src/compute/types/sinks.rs
@@ -0,0 +1,28 @@
+use serde::{Deserialize, Serialize};
+use timely::progress::Antichain;
+
+use crate::expr::GlobalId;
+use crate::repr::{self, RelationDesc};
+
+/// A sink for updates to a relational collection.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct ComputeSinkDesc<S> {
+    pub from: GlobalId,
+    pub from_desc: RelationDesc,
+    pub connection: ComputeSinkConnection<S>,
+    pub with_snapshot: bool,
+    pub up_to: Antichain<repr::Timestamp>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub enum ComputeSinkConnection<S> {
+    // TODO(discord9): consider if ever needed
+    Subscribe,
+    Persist(PersistSinkConnection<S>),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct PersistSinkConnection<S> {
+    pub value_desc: RelationDesc,
+    pub storage_metadata: S,
+}
diff --git a/src/flow/src/compute/types/sources.rs b/src/flow/src/compute/types/sources.rs
new file mode 100644
index 0000000000..b84cab75fe
--- /dev/null
+++ b/src/flow/src/compute/types/sources.rs
@@ -0,0 +1,26 @@
+use serde::{Deserialize, Serialize};
+
+use crate::expr::MapFilterProject;
+use crate::repr::RelationType;
+
+/// A description of an instantiation of a source.
+///
+/// This includes a description of the source, but additionally any
+/// context-dependent options like the ability to apply filtering and
+/// projection to the records as they emerge.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct SourceInstanceDesc<M> {
+    /// Arguments for this instantiation of the source.
+    pub arguments: SourceInstanceArguments,
+    /// Additional metadata used by the storage client of a compute instance to read it.
+    pub storage_metadata: M,
+    /// The relation type of this source
+    pub typ: RelationType,
+}
+
+/// Per-source construction arguments.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct SourceInstanceArguments {
+    /// Linear operators to be applied record-by-record.
+    pub operators: Option<MapFilterProject>,
+}
diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index 53f0662476..e4f8080cbe 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -5,7 +5,7 @@ use super::ScalarExpr;
 // TODO(discord9): more function & eval
 use crate::{repr::Row, storage::errors::EvalError};
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
 pub enum UnaryFunc {
     Not,
     IsNull,
@@ -19,7 +19,7 @@ impl UnaryFunc {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
 pub enum BinaryFunc {}
 
 impl BinaryFunc {
@@ -33,7 +33,7 @@
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
 pub enum VariadicFunc {}
 
 impl VariadicFunc {
diff --git a/src/flow/src/expr/id.rs b/src/flow/src/expr/id.rs
index 7f055a793f..4841883f11 100644
--- a/src/flow/src/expr/id.rs
+++ b/src/flow/src/expr/id.rs
@@ -1,4 +1,6 @@
-#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
 pub enum GlobalId {
     /// System namespace.
     System(u64),
diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs
index e72ef5420a..04868d4f80 100644
--- a/src/flow/src/expr/mod.rs
+++ b/src/flow/src/expr/mod.rs
@@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
 use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
 use crate::storage::errors::EvalError;
 
-#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub enum ScalarExpr {
     /// A column of the input row
     Column(usize),
diff --git a/src/flow/src/repr/mod.rs b/src/flow/src/repr/mod.rs
index a29757efe0..00ded4c4bc 100644
--- a/src/flow/src/repr/mod.rs
+++ b/src/flow/src/repr/mod.rs
@@ -4,11 +4,14 @@
 use std::borrow::Borrow;
 use std::slice::SliceIndex;
 
 use datatypes::value::Value;
+pub(crate) use relation::{RelationDesc, RelationType};
 use serde::{Deserialize, Serialize};
-
 /// System-wide Record count difference type.
 pub type Diff = i64;
 
+mod relation;
+mod timestamp;
+
 /// A row is a vector of values.
 ///
 /// TODO(discord9): use a more efficient representation
diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs
new file mode 100644
index 0000000000..b9e9c87074
--- /dev/null
+++ b/src/flow/src/repr/relation.rs
@@ -0,0 +1,342 @@
+use datatypes::prelude::ConcreteDataType;
+use serde::{Deserialize, Serialize};
+
+/// The type of a relation.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
+pub struct RelationType {
+    /// The type for each column, in order.
+    pub column_types: Vec<ColumnType>,
+    /// Sets of indices that are "keys" for the collection.
+    ///
+    /// Each element in this list is a set of column indices, each with the
+    /// property that the collection contains at most one record with each
+    /// distinct set of values for each column. Alternately, for a specific set
+    /// of values assigned to the these columns there is at most one record.
+    ///
+    /// A collection can contain multiple sets of keys, although it is common to
+    /// have either zero or one sets of key indices.
+    #[serde(default)]
+    pub keys: Vec<Vec<usize>>,
+}
+
+impl RelationType {
+    /// Constructs a `RelationType` representing the relation with no columns and
+    /// no keys.
+    pub fn empty() -> Self {
+        RelationType::new(vec![])
+    }
+
+    /// Constructs a new `RelationType` from specified column types.
+    ///
+    /// The `RelationType` will have no keys.
+    pub fn new(column_types: Vec<ColumnType>) -> Self {
+        RelationType {
+            column_types,
+            keys: Vec::new(),
+        }
+    }
+
+    /// Adds a new key for the relation.
+    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
+        indices.sort_unstable();
+        if !self.keys.contains(&indices) {
+            self.keys.push(indices);
+        }
+        self
+    }
+
+    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
+        for key in keys {
+            self = self.with_key(key)
+        }
+        self
+    }
+
+    /// Computes the number of columns in the relation.
+    pub fn arity(&self) -> usize {
+        self.column_types.len()
+    }
+
+    /// Gets the index of the columns used when creating a default index.
+    pub fn default_key(&self) -> Vec<usize> {
+        if let Some(key) = self.keys.first() {
+            if key.is_empty() {
+                (0..self.column_types.len()).collect()
+            } else {
+                key.clone()
+            }
+        } else {
+            (0..self.column_types.len()).collect()
+        }
+    }
+
+    /// True if any collection described by `self` could safely be described by `other`.
+    ///
+    /// In practice this means checking that the scalar types match exactly, and that the
+    /// nullability of `self` is at least as strict as `other`, and that all keys of `other`
+    /// contain some key of `self` (as a set of key columns is less strict than any subset).
+    pub fn subtypes(&self, other: &RelationType) -> bool {
+        let all_keys = other.keys.iter().all(|key1| {
+            self.keys
+                .iter()
+                .any(|key2| key1.iter().all(|k| key2.contains(k)))
+        });
+        if !all_keys {
+            return false;
+        }
+
+        if self.column_types.len() != other.column_types.len() {
+            return false;
+        }
+
+        for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
+            if col1.nullable && !col2.nullable {
+                return false;
+            }
+            if col1.scalar_type != col2.scalar_type {
+                return false;
+            }
+        }
+        true
+    }
+}
+
+/// The type of a `Value`
+///
+/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
+/// Int32 or String) with its nullability.
+///
+/// To construct a column type, initialize the struct directly.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
+pub struct ColumnType {
+    /// The underlying scalar type (e.g., Int32 or String) of this column.
+    pub scalar_type: ConcreteDataType,
+    /// Whether this datum can be null.
+    #[serde(default = "return_true")]
+    pub nullable: bool,
+}
+
+/// This method exists solely for the purpose of making ColumnType nullable by
+/// default in unit tests. The default value of a bool is false, and the only
+/// way to make an object take on any other value by default is to pass it a
+/// function that returns the desired default value. See
+/// <https://github.com/serde-rs/serde/issues/1030>
+#[inline(always)]
+fn return_true() -> bool {
+    true
+}
+
+/// A description of the shape of a relation.
+///
+/// It bundles a [`RelationType`] with the name of each column in the relation.
+/// Individual column names are optional.
+///
+/// # Examples
+///
+/// A `RelationDesc`s is typically constructed via its builder API:
+///
+/// ```ignore
+/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
+///
+/// let desc = RelationDesc::empty()
+///     .with_column("id", ScalarType::Int64.nullable(false))
+///     .with_column("price", ScalarType::Float64.nullable(true));
+/// ```
+///
+/// In more complicated cases, like when constructing a `RelationDesc` in
+/// response to user input, it may be more convenient to construct a relation
+/// type first, and imbue it with column names to form a `RelationDesc` later:
+///
+/// ```ignore
+/// use mz_repr::RelationDesc;
+///
+/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
+/// let relation_type = plan_query("SELECT * FROM table");
+/// let names = (0..relation_type.arity()).map(|i| match i {
+///     0 => "first",
+///     1 => "second",
+///     _ => "unknown",
+/// });
+/// let desc = RelationDesc::new(relation_type, names);
+/// ```
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
+pub struct RelationDesc {
+    typ: RelationType,
+    names: Vec<ColumnName>,
+}
+
+impl RelationDesc {
+    /// Constructs a new `RelationDesc` that represents the empty relation
+    /// with no columns and no keys.
+    pub fn empty() -> Self {
+        RelationDesc {
+            typ: RelationType::empty(),
+            names: vec![],
+        }
+    }
+
+    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
+    /// over column names.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the arity of the `RelationType` is not equal to the number of
+    /// items in `names`.
+    pub fn new<I, N>(typ: RelationType, names: I) -> Self
+    where
+        I: IntoIterator<Item = N>,
+        N: Into<ColumnName>,
+    {
+        let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
+        assert_eq!(typ.column_types.len(), names.len());
+        RelationDesc { typ, names }
+    }
+
+    pub fn from_names_and_types<I, N, T>(iter: I) -> Self
+    where
+        I: IntoIterator<Item = (N, T)>,
+        T: Into<ColumnType>,
+        N: Into<ColumnName>,
+    {
+        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
+        let types = types.into_iter().map(Into::into).collect();
+        let typ = RelationType::new(types);
+        Self::new(typ, names)
+    }
+    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
+    pub fn concat(mut self, other: Self) -> Self {
+        let self_len = self.typ.column_types.len();
+        self.names.extend(other.names);
+        self.typ.column_types.extend(other.typ.column_types);
+        for k in other.typ.keys {
+            let k = k.into_iter().map(|idx| idx + self_len).collect();
+            self = self.with_key(k);
+        }
+        self
+    }
+
+    /// Appends a column with the specified name and type.
+    pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
+    where
+        N: Into<ColumnName>,
+    {
+        self.typ.column_types.push(column_type);
+        self.names.push(name.into());
+        self
+    }
+
+    /// Adds a new key for the relation.
+    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
+        self.typ = self.typ.with_key(indices);
+        self
+    }
+
+    /// Drops all existing keys.
+    pub fn without_keys(mut self) -> Self {
+        self.typ.keys.clear();
+        self
+    }
+
+    /// Builds a new relation description with the column names replaced with
+    /// new names.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the arity of the relation type does not match the number of
+    /// items in `names`.
+    pub fn with_names<I, N>(self, names: I) -> Self
+    where
+        I: IntoIterator<Item = N>,
+        N: Into<ColumnName>,
+    {
+        Self::new(self.typ, names)
+    }
+
+    /// Computes the number of columns in the relation.
+    pub fn arity(&self) -> usize {
+        self.typ.arity()
+    }
+
+    /// Returns the relation type underlying this relation description.
+    pub fn typ(&self) -> &RelationType {
+        &self.typ
+    }
+
+    /// Returns an iterator over the columns in this relation.
+    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
+        self.iter_names().zip(self.iter_types())
+    }
+
+    /// Returns an iterator over the types of the columns in this relation.
+    pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
+        self.typ.column_types.iter()
+    }
+
+    /// Returns an iterator over the names of the columns in this relation.
+    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
+        self.names.iter()
+    }
+
+    /// Finds a column by name.
+    ///
+    /// Returns the index and type of the column named `name`. If no column with
+    /// the specified name exists, returns `None`. If multiple columns have the
+    /// specified name, the leftmost column is returned.
+    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
+        self.iter_names()
+            .position(|n| n == name)
+            .map(|i| (i, &self.typ.column_types[i]))
+    }
+
+    /// Gets the name of the `i`th column.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `i` is not a valid column index.
+    pub fn get_name(&self, i: usize) -> &ColumnName {
+        &self.names[i]
+    }
+
+    /// Mutably gets the name of the `i`th column.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `i` is not a valid column index.
+    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
+        &mut self.names[i]
+    }
+
+    /// Gets the name of the `i`th column if that column name is unambiguous.
+    ///
+    /// If at least one other column has the same name as the `i`th column,
+    /// returns `None`. If the `i`th column has no name, returns `None`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `i` is not a valid column index.
+    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
+        let name = &self.names[i];
+        if self.iter_names().filter(|n| *n == name).count() == 1 {
+            Some(name)
+        } else {
+            None
+        }
+    }
+}
+
+/// The name of a column in a [`RelationDesc`].
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
+pub struct ColumnName(pub(crate) String);
+
+impl ColumnName {
+    /// Returns this column name as a `str`.
+    pub fn as_str(&self) -> &str {
+        &self.0
+    }
+
+    /// Returns a mutable reference to the string underlying this column name.
+    pub fn as_mut_str(&mut self) -> &mut String {
+        &mut self.0
+    }
+}
diff --git a/src/flow/src/repr/timestamp.rs b/src/flow/src/repr/timestamp.rs
new file mode 100644
index 0000000000..8b13789179
--- /dev/null
+++ b/src/flow/src/repr/timestamp.rs
@@ -0,0 +1 @@
+