From 8a467556a27a8a59cc687ea10deed7535390910f Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 01/13] Initial implementation of ParallelIterator for AxisIter --- Cargo.toml | 2 ++ src/iterators/mod.rs | 3 ++ src/iterators/par.rs | 78 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ tests/rayon.rs | 18 ++++++++++ 5 files changed, 103 insertions(+) create mode 100644 src/iterators/par.rs create mode 100644 tests/rayon.rs diff --git a/Cargo.toml b/Cargo.toml index 3daa21ff8..d04d644eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,8 @@ optional = true blas-sys = { version = "0.6.5", optional = true, default-features = false } matrixmultiply = { version = "0.1.13" } +rayon = { version = "0.5.0", optional = true, default-features = false } + [dependencies.serde] version = "0.8.20" optional = true diff --git a/src/iterators/mod.rs b/src/iterators/mod.rs index e379fb96f..762648e55 100644 --- a/src/iterators/mod.rs +++ b/src/iterators/mod.rs @@ -21,6 +21,9 @@ use super::{ Axis, }; +#[cfg(feature = "rayon")] +mod par; + /// Base for array iterators /// /// Iterator element type is `&'a A`. diff --git a/src/iterators/par.rs b/src/iterators/par.rs new file mode 100644 index 000000000..0e1da784b --- /dev/null +++ b/src/iterators/par.rs @@ -0,0 +1,78 @@ + + +use rayon::par_iter::ParallelIterator; +use rayon::par_iter::IndexedParallelIterator; +use rayon::par_iter::ExactParallelIterator; +use rayon::par_iter::BoundedParallelIterator; +use rayon::par_iter::internal::{Consumer, UnindexedConsumer}; +use rayon::par_iter::internal::bridge; +use rayon::par_iter::internal::ProducerCallback; +use rayon::par_iter::internal::Producer; + +use super::AxisIter; +use imp_prelude::*; + + + +impl<'a, A, D> ParallelIterator for AxisIter<'a, A, D> + where D: Dimension, + A: Sync, +{ + type Item = ::Item; + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge(self, consumer) + } +} + +impl<'a, A, D> IndexedParallelIterator for AxisIter<'a, A, D> + where D: Dimension, + A: Sync, +{ + fn with_producer(self, callback: Cb) -> Cb::Output + where Cb: ProducerCallback + { + callback.callback(self) + } +} + +impl<'a, A, D> ExactParallelIterator for AxisIter<'a, A, D> + where D: Dimension, + A: Sync, +{ + fn len(&mut self) -> usize { + self.size_hint().0 + } +} + +impl<'a, A, D> BoundedParallelIterator for AxisIter<'a, A, D> + where D: Dimension, + A: Sync, +{ + fn upper_bound(&mut self) -> usize { + ExactParallelIterator::len(self) + } + + fn drive(self, consumer: C) -> C::Result + where C: Consumer + { + bridge(self, consumer) + } +} + +// This is the real magic, I guess + +impl<'a, A, D> Producer for AxisIter<'a, A, D> + where D: Dimension, + A: Sync, +{ + fn cost(&mut self, len: usize) -> f64 { + // FIXME: No idea about what this is + len as f64 + } + + fn split_at(self, i: usize) -> (Self, Self) { + self.split_at(i) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5e049b13e..ad7560ce3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,6 +77,8 @@ extern crate matrixmultiply; extern crate itertools; extern crate num_traits as libnum; extern crate num_complex; +#[cfg(feature = "rayon")] +extern crate rayon; use std::iter::Zip; use std::marker::PhantomData; diff --git a/tests/rayon.rs b/tests/rayon.rs new file mode 100644 index 000000000..548814f25 --- /dev/null +++ b/tests/rayon.rs @@ -0,0 +1,18 @@ +#![cfg(feature = "rayon")] + +extern crate rayon; +extern crate ndarray; + +use ndarray::prelude::*; + +use rayon::par_iter::ParallelIterator; + +#[test] +fn test_axis_iter() { + let mut a = Array2::::zeros((10240, 10240)); + for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { + v.fill(i as _); + } + let s = ParallelIterator::map(a.axis_iter(Axis(0)), |x| x.scalar_sum()).sum(); + assert_eq!(s, a.scalar_sum()); +} From bbfd18525c3f3b00351977131311e18ada5b920a Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 02/13] Use a separate type for the ParallelIterator --- src/iterators/par.rs | 40 ++++++++++++++++++++++++++++++++++------ tests/rayon.rs | 7 ++++--- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/src/iterators/par.rs b/src/iterators/par.rs index 0e1da784b..0ee9c3af9 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -1,6 +1,7 @@ use rayon::par_iter::ParallelIterator; +use rayon::par_iter::IntoParallelIterator; use rayon::par_iter::IndexedParallelIterator; use rayon::par_iter::ExactParallelIterator; use rayon::par_iter::BoundedParallelIterator; @@ -12,13 +13,40 @@ use rayon::par_iter::internal::Producer; use super::AxisIter; use imp_prelude::*; +/// Parallel iterator wrapper. +pub struct Parallel { + pub iter: I, +} +impl From for Parallel + where I: IntoIterator, +{ + fn from(iter: I) -> Self { + Parallel { + iter: iter.into_iter() + } + } +} -impl<'a, A, D> ParallelIterator for AxisIter<'a, A, D> +impl<'a, A, D> IntoParallelIterator for AxisIter<'a, A, D> where D: Dimension, A: Sync, { type Item = ::Item; + type Iter = Parallel; + fn into_par_iter(self) -> Self::Iter { + Parallel { + iter: self, + } + } +} + + +impl<'a, A, D> ParallelIterator for Parallel> + where D: Dimension, + A: Sync, +{ + type Item = as Iterator>::Item; fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer { @@ -26,27 +54,27 @@ impl<'a, A, D> ParallelIterator for AxisIter<'a, A, D> } } -impl<'a, A, D> IndexedParallelIterator for AxisIter<'a, A, D> +impl<'a, A, D> IndexedParallelIterator for Parallel> where D: Dimension, A: Sync, { fn with_producer(self, callback: Cb) -> Cb::Output where Cb: ProducerCallback { - callback.callback(self) + callback.callback(self.iter) } } -impl<'a, A, D> ExactParallelIterator for AxisIter<'a, A, D> +impl<'a, A, D> ExactParallelIterator for Parallel> where D: Dimension, A: Sync, { fn len(&mut self) -> usize { - self.size_hint().0 + self.iter.len() } } -impl<'a, A, D> BoundedParallelIterator for AxisIter<'a, A, D> +impl<'a, A, D> BoundedParallelIterator for Parallel> where D: Dimension, A: Sync, { diff --git a/tests/rayon.rs b/tests/rayon.rs index 548814f25..6d55c64db 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -5,14 +5,15 @@ extern crate ndarray; use ndarray::prelude::*; -use rayon::par_iter::ParallelIterator; +use rayon::prelude::*; #[test] fn test_axis_iter() { - let mut a = Array2::::zeros((10240, 10240)); + let mut a = Array2::::zeros((1024 * 1024, 100)); for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { v.fill(i as _); } - let s = ParallelIterator::map(a.axis_iter(Axis(0)), |x| x.scalar_sum()).sum(); + assert_eq!(a.axis_iter(Axis(0)).len(), 1024 * 1024); + let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum(); assert_eq!(s, a.scalar_sum()); } From 4c769d56db470c5587b9bd2f108c706ff1846a68 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 03/13] Enable parallel iterator for both AxisIter and AxisIterMut --- src/iterators/par.rs | 124 +++++++++++++++++++++++-------------------- tests/rayon.rs | 11 +++- 2 files changed, 76 insertions(+), 59 deletions(-) diff --git a/src/iterators/par.rs b/src/iterators/par.rs index 0ee9c3af9..b830efef9 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -11,11 +11,13 @@ use rayon::par_iter::internal::ProducerCallback; use rayon::par_iter::internal::Producer; use super::AxisIter; +use super::AxisIterMut; use imp_prelude::*; /// Parallel iterator wrapper. +#[derive(Copy, Clone, Debug)] pub struct Parallel { - pub iter: I, + iter: I, } impl From for Parallel @@ -28,79 +30,85 @@ impl From for Parallel } } -impl<'a, A, D> IntoParallelIterator for AxisIter<'a, A, D> - where D: Dimension, - A: Sync, -{ - type Item = ::Item; - type Iter = Parallel; - fn into_par_iter(self) -> Self::Iter { - Parallel { - iter: self, +macro_rules! par_iter_wrapper { + // thread_bounds are either Sync or Send + Sync + ($iter_name:ident, [$($thread_bounds:tt)*]) => { + impl<'a, A, D> IntoParallelIterator for $iter_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = ::Item; + type Iter = Parallel; + fn into_par_iter(self) -> Self::Iter { + Parallel::from(self) } } -} -impl<'a, A, D> ParallelIterator for Parallel> - where D: Dimension, - A: Sync, -{ - type Item = as Iterator>::Item; - fn drive_unindexed(self, consumer: C) -> C::Result - where C: UnindexedConsumer + impl<'a, A, D> ParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, { - bridge(self, consumer) + type Item = <$iter_name<'a, A, D> as Iterator>::Item; + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge(self, consumer) + } } -} -impl<'a, A, D> IndexedParallelIterator for Parallel> - where D: Dimension, - A: Sync, -{ - fn with_producer(self, callback: Cb) -> Cb::Output - where Cb: ProducerCallback + impl<'a, A, D> IndexedParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, { - callback.callback(self.iter) - } -} - -impl<'a, A, D> ExactParallelIterator for Parallel> - where D: Dimension, - A: Sync, -{ - fn len(&mut self) -> usize { - self.iter.len() + fn with_producer(self, callback: Cb) -> Cb::Output + where Cb: ProducerCallback + { + callback.callback(self.iter) + } } -} -impl<'a, A, D> BoundedParallelIterator for Parallel> - where D: Dimension, - A: Sync, -{ - fn upper_bound(&mut self) -> usize { - ExactParallelIterator::len(self) + impl<'a, A, D> ExactParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + fn len(&mut self) -> usize { + ExactSizeIterator::len(&self.iter) + } } - fn drive(self, consumer: C) -> C::Result - where C: Consumer + impl<'a, A, D> BoundedParallelIterator for Parallel<$iter_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, { - bridge(self, consumer) + fn upper_bound(&mut self) -> usize { + ExactSizeIterator::len(&self.iter) + } + + fn drive(self, consumer: C) -> C::Result + where C: Consumer + { + bridge(self, consumer) + } } -} -// This is the real magic, I guess + // This is the real magic, I guess -impl<'a, A, D> Producer for AxisIter<'a, A, D> - where D: Dimension, - A: Sync, -{ - fn cost(&mut self, len: usize) -> f64 { - // FIXME: No idea about what this is - len as f64 - } + impl<'a, A, D> Producer for $iter_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + fn cost(&mut self, len: usize) -> f64 { + // FIXME: No idea about what this is + len as f64 + } - fn split_at(self, i: usize) -> (Self, Self) { - self.split_at(i) + fn split_at(self, i: usize) -> (Self, Self) { + self.split_at(i) + } + } } } + +par_iter_wrapper!(AxisIter, [Sync]); +par_iter_wrapper!(AxisIterMut, [Send + Sync]); diff --git a/tests/rayon.rs b/tests/rayon.rs index 6d55c64db..53ed9781a 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -1,7 +1,7 @@ #![cfg(feature = "rayon")] extern crate rayon; -extern crate ndarray; +#[macro_use(s)] extern crate ndarray; use ndarray::prelude::*; @@ -17,3 +17,12 @@ fn test_axis_iter() { let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum(); assert_eq!(s, a.scalar_sum()); } + +#[test] +fn test_axis_iter_mut() { + let mut a = Array2::::zeros((1024 * 1024, 100)); + a.axis_iter_mut(Axis(0)).into_par_iter().enumerate().for_each(|(i, mut v)| v.fill(i as _)); + assert_eq!(a.scalar_sum(), + (0..a.len_of(Axis(0))).map(|n| n as u32 * a.len_of(Axis(1)) as u32).sum::()); + println!("{:?}", a.slice(s![..10, ..10])); +} From 17a1180f3ca34ae534e70a341f2ce4a5d8669649 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 04/13] Update tests for parallel axis iterators --- tests/rayon.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/rayon.rs b/tests/rayon.rs index 53ed9781a..b3b539d98 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -7,22 +7,26 @@ use ndarray::prelude::*; use rayon::prelude::*; +const M: usize = 1024 * 10; +const N: usize = 100; + #[test] fn test_axis_iter() { - let mut a = Array2::::zeros((1024 * 1024, 100)); + let mut a = Array2::::zeros((M, N)); for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { v.fill(i as _); } - assert_eq!(a.axis_iter(Axis(0)).len(), 1024 * 1024); + assert_eq!(a.axis_iter(Axis(0)).len(), M); let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum(); + println!("{:?}", a.slice(s![..10, ..5])); assert_eq!(s, a.scalar_sum()); } #[test] fn test_axis_iter_mut() { - let mut a = Array2::::zeros((1024 * 1024, 100)); - a.axis_iter_mut(Axis(0)).into_par_iter().enumerate().for_each(|(i, mut v)| v.fill(i as _)); - assert_eq!(a.scalar_sum(), - (0..a.len_of(Axis(0))).map(|n| n as u32 * a.len_of(Axis(1)) as u32).sum::()); - println!("{:?}", a.slice(s![..10, ..10])); + let mut a = Array::linspace(0., 1.0f64, M * N).into_shape((M, N)).unwrap(); + let b = a.mapv(|x| x.exp()); + a.axis_iter_mut(Axis(0)).into_par_iter().for_each(|mut v| v.mapv_inplace(|x| x.exp())); + println!("{:?}", a.slice(s![..10, ..5])); + assert!(a.all_close(&b, 0.001)); } From 0a226b6e30d8462f7437b9df2e6075550f9a05fd Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 05/13] Enable "rayon" feature in CI and docs --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d04d644eb..6ef66fff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,10 +50,10 @@ blas = ["blas-sys"] # These features are used for testing blas-openblas-sys = ["blas"] -test = ["blas-openblas-sys"] +test = ["blas-openblas-sys", "rayon"] # This feature is used for docs -docs = ["rustc-serialize", "serde"] +docs = ["rustc-serialize", "serde", "rayon"] [profile.release] [profile.bench] From 5bd033854c22e5ef5c0d3312ff9e8819f25667b7 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 06/13] Expose `Parallel` in docs and document crate feature --- README.rst | 5 +++++ src/iterators/mod.rs | 2 +- src/iterators/par.rs | 8 ++++++-- src/lib.rs | 5 +++++ src/par/mod.rs | 6 ++++++ 5 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 src/par/mod.rs diff --git a/README.rst b/README.rst index a771f307f..4db9999b1 100644 --- a/README.rst +++ b/README.rst @@ -69,6 +69,11 @@ your `Cargo.toml`. Uses ``blas-sys`` for pluggable backend, which needs to be configured separately. +- ``rayon`` + + - Optional, compatible with Rust stable + - Implement rayon 0.5 parallelization traits for ``AxisIter``. + How to use with cargo:: [dependencies] diff --git a/src/iterators/mod.rs b/src/iterators/mod.rs index 762648e55..4f91c4b61 100644 --- a/src/iterators/mod.rs +++ b/src/iterators/mod.rs @@ -22,7 +22,7 @@ use super::{ }; #[cfg(feature = "rayon")] -mod par; +pub mod par; /// Base for array iterators /// diff --git a/src/iterators/par.rs b/src/iterators/par.rs index b830efef9..50a1ca954 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -14,7 +14,9 @@ use super::AxisIter; use super::AxisIterMut; use imp_prelude::*; -/// Parallel iterator wrapper. +/// Iterator wrapper for parallelized implementations. +/// +/// **Requires crate feature `"rayon"`** #[derive(Copy, Clone, Debug)] pub struct Parallel { iter: I, @@ -33,6 +35,9 @@ impl From for Parallel macro_rules! par_iter_wrapper { // thread_bounds are either Sync or Send + Sync ($iter_name:ident, [$($thread_bounds:tt)*]) => { + /// This iterator can be turned into a parallel iterator (rayon crate). + /// + /// **Requires crate feature `"rayon"`** impl<'a, A, D> IntoParallelIterator for $iter_name<'a, A, D> where D: Dimension, A: $($thread_bounds)*, @@ -44,7 +49,6 @@ macro_rules! par_iter_wrapper { } } - impl<'a, A, D> ParallelIterator for Parallel<$iter_name<'a, A, D>> where D: Dimension, A: $($thread_bounds)*, diff --git a/src/lib.rs b/src/lib.rs index ad7560ce3..94e027c67 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,9 @@ //! - Enable transparent BLAS support for matrix multiplication. //! Uses ``blas-sys`` for pluggable backend, which needs to be configured //! separately. +//! - `rayon` +//! - Optional. +//! - Implement rayon 0.5 parallelization traits for `AxisIter`. //! #[cfg(feature = "serde")] @@ -126,6 +129,8 @@ mod array_serde; mod array_serialize; mod arrayformat; mod data_traits; +#[cfg(feature = "rayon")] +pub mod par; pub use aliases::*; diff --git a/src/par/mod.rs b/src/par/mod.rs new file mode 100644 index 000000000..717e35775 --- /dev/null +++ b/src/par/mod.rs @@ -0,0 +1,6 @@ + +//! Parallelization features for ndarray. +//! +//! **Requires crate feature `"rayon"`** + +pub use iterators::par::Parallel; From 5c5c6abc4eff9e7d8a9c2cb6451a5cd7f2aef402 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 07/13] Use rayon 0.6 to implement IntoParallelIterator for array views --- Cargo.toml | 2 +- README.rst | 2 +- src/iterators/par.rs | 86 ++++++++++++++++++++++++++++++++++++++------ src/lib.rs | 2 +- tests/rayon.rs | 11 ++++++ 5 files changed, 89 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6ef66fff9..035c4642d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ optional = true blas-sys = { version = "0.6.5", optional = true, default-features = false } matrixmultiply = { version = "0.1.13" } -rayon = { version = "0.5.0", optional = true, default-features = false } +rayon = { version = "0.6.0", optional = true } [dependencies.serde] version = "0.8.20" diff --git a/README.rst b/README.rst index 4db9999b1..ff018ea78 100644 --- a/README.rst +++ b/README.rst @@ -72,7 +72,7 @@ your `Cargo.toml`. - ``rayon`` - Optional, compatible with Rust stable - - Implement rayon 0.5 parallelization traits for ``AxisIter``. + - Implement rayon 0.6 parallelization. How to use with cargo:: diff --git a/src/iterators/par.rs b/src/iterators/par.rs index 50a1ca954..9eb45caef 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -7,8 +7,12 @@ use rayon::par_iter::ExactParallelIterator; use rayon::par_iter::BoundedParallelIterator; use rayon::par_iter::internal::{Consumer, UnindexedConsumer}; use rayon::par_iter::internal::bridge; +use rayon::par_iter::internal::bridge_unindexed; use rayon::par_iter::internal::ProducerCallback; use rayon::par_iter::internal::Producer; +use rayon::par_iter::internal::UnindexedProducer; +#[cfg(rayon_fold_with)] +use rayon::par_iter::internal::Folder; use super::AxisIter; use super::AxisIterMut; @@ -22,16 +26,6 @@ pub struct Parallel { iter: I, } -impl From for Parallel - where I: IntoIterator, -{ - fn from(iter: I) -> Self { - Parallel { - iter: iter.into_iter() - } - } -} - macro_rules! par_iter_wrapper { // thread_bounds are either Sync or Send + Sync ($iter_name:ident, [$($thread_bounds:tt)*]) => { @@ -45,7 +39,9 @@ macro_rules! par_iter_wrapper { type Item = ::Item; type Iter = Parallel; fn into_par_iter(self) -> Self::Iter { - Parallel::from(self) + Parallel { + iter: self, + } } } @@ -59,6 +55,10 @@ macro_rules! par_iter_wrapper { { bridge(self, consumer) } + + fn opt_len(&mut self) -> Option { + Some(self.iter.len()) + } } impl<'a, A, D> IndexedParallelIterator for Parallel<$iter_name<'a, A, D>> @@ -116,3 +116,67 @@ macro_rules! par_iter_wrapper { par_iter_wrapper!(AxisIter, [Sync]); par_iter_wrapper!(AxisIterMut, [Send + Sync]); + +macro_rules! par_iter_view_wrapper { + // thread_bounds are either Sync or Send + Sync + ($view_name:ident, [$($thread_bounds:tt)*]) => { + impl<'a, A, D> IntoParallelIterator for $view_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = ::Item; + type Iter = Parallel; + fn into_par_iter(self) -> Self::Iter { + Parallel { + iter: self, + } + } + } + + + impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>> + where D: Dimension, + A: $($thread_bounds)*, + { + type Item = <$view_name<'a, A, D> as IntoIterator>::Item; + fn drive_unindexed(self, consumer: C) -> C::Result + where C: UnindexedConsumer + { + bridge_unindexed(self.iter, consumer) + } + + fn opt_len(&mut self) -> Option { + Some(self.iter.len()) + } + } + + impl<'a, A, D> UnindexedProducer for $view_name<'a, A, D> + where D: Dimension, + A: $($thread_bounds)*, + { + fn can_split(&self) -> bool { + self.len() > 1 + } + + fn split(self) -> (Self, Self) { + let max_axis = self.max_stride_axis(); + let mid = self.len_of(max_axis) / 2; + let (a, b) = self.split_at(max_axis, mid); + //println!("Split along axis {:?} at {}", max_axis, mid); + //println!("Result shapes {:?}, {:?}", a.shape(), b.shape()); + (a, b) + } + + #[cfg(rayon_fold_with)] + fn fold_with(self, folder: F) -> F + where F: Folder, + { + self.into_iter().fold(folder, move |f, elt| f.consume(elt)) + } + } + + } +} + +par_iter_view_wrapper!(ArrayView, [Sync]); +par_iter_view_wrapper!(ArrayViewMut, [Sync + Send]); diff --git a/src/lib.rs b/src/lib.rs index 94e027c67..cead81c6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,7 +64,7 @@ //! separately. //! - `rayon` //! - Optional. -//! - Implement rayon 0.5 parallelization traits for `AxisIter`. +//! - Implement rayon 0.6 parallelization. //! #[cfg(feature = "serde")] diff --git a/tests/rayon.rs b/tests/rayon.rs index b3b539d98..82875c875 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -30,3 +30,14 @@ fn test_axis_iter_mut() { println!("{:?}", a.slice(s![..10, ..5])); assert!(a.all_close(&b, 0.001)); } + +#[test] +fn test_regular_iter() { + let mut a = Array2::::zeros((M, N)); + for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { + v.fill(i as _); + } + let s = a.view().into_par_iter().map(|&x| x).sum(); + println!("{:?}", a.slice(s![..10, ..5])); + assert_eq!(s, a.scalar_sum()); +} From f3a1dbabe7ba6601d64aded6af44a8ab9748f880 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 08/13] Add rayon benchmarks --- benches/rayon.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 benches/rayon.rs diff --git a/benches/rayon.rs b/benches/rayon.rs new file mode 100644 index 000000000..8db74740e --- /dev/null +++ b/benches/rayon.rs @@ -0,0 +1,62 @@ + +#![feature(test)] + +extern crate test; +use test::Bencher; + +#[macro_use(s)] +extern crate ndarray; +use ndarray::prelude::*; + +extern crate rayon; +use rayon::prelude::*; + +const EXP_N: usize = 128; + +#[bench] +fn map_exp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((EXP_N, EXP_N)); + a.swap_axes(0, 1); + bench.iter(|| { + a.mapv_inplace(|x| x.exp()); + }); +} + +#[bench] +fn rayon_exp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((EXP_N, EXP_N)); + a.swap_axes(0, 1); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = x.exp()); + }); +} + +const FASTEXP: usize = 900; + +#[inline] +fn fastexp(x: f64) -> f64 { + let x = 1. + x/1024.; + x.powi(1024) +} + +#[bench] +fn map_fastexp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + let mut a = a.slice_mut(s![.., ..-1]); + bench.iter(|| { + a.mapv_inplace(|x| fastexp(x)) + }); +} + +#[bench] +fn rayon_fastexp_regular(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + let mut a = a.slice_mut(s![.., ..-1]); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x)); + }); +} From 122db3fb12e0cb0376c3067996368dcfb11ae9c4 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 09/13] Update parameters in rayon impl / benchmark --- Cargo.toml | 3 +++ benches/rayon.rs | 34 ++++++++++++++++++++++++++++++++-- src/iterators/par.rs | 3 +-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 035c4642d..3a3bb097e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,9 @@ rayon = { version = "0.6.0", optional = true } version = "0.8.20" optional = true +[dev-dependencies] +num_cpus = "1.2" + [features] blas = ["blas-sys"] diff --git a/benches/rayon.rs b/benches/rayon.rs index 8db74740e..e79447174 100644 --- a/benches/rayon.rs +++ b/benches/rayon.rs @@ -1,6 +1,7 @@ #![feature(test)] +extern crate num_cpus; extern crate test; use test::Bencher; @@ -13,6 +14,14 @@ use rayon::prelude::*; const EXP_N: usize = 128; +use std::cmp::max; + +fn set_threads() { + let n = max(1, num_cpus::get() / 2); + let cfg = rayon::Configuration::new().set_num_threads(n); + let _ = rayon::initialize(cfg); +} + #[bench] fn map_exp_regular(bench: &mut Bencher) { @@ -26,6 +35,7 @@ fn map_exp_regular(bench: &mut Bencher) #[bench] fn rayon_exp_regular(bench: &mut Bencher) { + set_threads(); let mut a = Array2::::zeros((EXP_N, EXP_N)); a.swap_axes(0, 1); bench.iter(|| { @@ -33,7 +43,7 @@ fn rayon_exp_regular(bench: &mut Bencher) }); } -const FASTEXP: usize = 900; +const FASTEXP: usize = 800; #[inline] fn fastexp(x: f64) -> f64 { @@ -45,7 +55,6 @@ fn fastexp(x: f64) -> f64 { fn map_fastexp_regular(bench: &mut Bencher) { let mut a = Array2::::zeros((FASTEXP, FASTEXP)); - let mut a = a.slice_mut(s![.., ..-1]); bench.iter(|| { a.mapv_inplace(|x| fastexp(x)) }); @@ -54,6 +63,27 @@ fn map_fastexp_regular(bench: &mut Bencher) #[bench] fn rayon_fastexp_regular(bench: &mut Bencher) { + set_threads(); + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x)); + }); +} + +#[bench] +fn map_fastexp_cut(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + let mut a = a.slice_mut(s![.., ..-1]); + bench.iter(|| { + a.mapv_inplace(|x| fastexp(x)) + }); +} + +#[bench] +fn rayon_fastexp_cut(bench: &mut Bencher) +{ + set_threads(); let mut a = Array2::::zeros((FASTEXP, FASTEXP)); let mut a = a.slice_mut(s![.., ..-1]); bench.iter(|| { diff --git a/src/iterators/par.rs b/src/iterators/par.rs index 9eb45caef..e92fbbdf1 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -162,8 +162,7 @@ macro_rules! par_iter_view_wrapper { let max_axis = self.max_stride_axis(); let mid = self.len_of(max_axis) / 2; let (a, b) = self.split_at(max_axis, mid); - //println!("Split along axis {:?} at {}", max_axis, mid); - //println!("Result shapes {:?}, {:?}", a.shape(), b.shape()); + //println!("Split along axis {:?} at {}\nshapes {:?}, {:?}", max_axis, mid, a.shape(), b.shape()); (a, b) } From 2730efb69e71eac3f6bc848829a615a549042735 Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 10/13] Add benchmark for AxisIterMut split --- benches/rayon.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/benches/rayon.rs b/benches/rayon.rs index e79447174..f51435df2 100644 --- a/benches/rayon.rs +++ b/benches/rayon.rs @@ -90,3 +90,25 @@ fn rayon_fastexp_cut(bench: &mut Bencher) a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x)); }); } + +#[bench] +fn map_fastexp_by_axis(bench: &mut Bencher) +{ + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + for mut sheet in a.axis_iter_mut(Axis(0)) { + sheet.mapv_inplace(fastexp) + } + }); +} + +#[bench] +fn rayon_fastexp_by_axis(bench: &mut Bencher) +{ + set_threads(); + let mut a = Array2::::zeros((FASTEXP, FASTEXP)); + bench.iter(|| { + a.axis_iter_mut(Axis(0)).into_par_iter() + .for_each(|mut sheet| sheet.mapv_inplace(fastexp)); + }); +} From 587be30e1096876dbecddda18b553b84c236978b Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 11/13] Implement into parallel iterator for array, rcarray --- src/iterators/par.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ tests/rayon.rs | 2 +- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/iterators/par.rs b/src/iterators/par.rs index e92fbbdf1..75cc038c2 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -117,6 +117,50 @@ macro_rules! par_iter_wrapper { par_iter_wrapper!(AxisIter, [Sync]); par_iter_wrapper!(AxisIterMut, [Send + Sync]); +impl<'a, A, D> IntoParallelIterator for &'a Array + where D: Dimension, + A: Sync +{ + type Item = &'a A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a RcArray + where D: Dimension, + A: Sync +{ + type Item = &'a A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a mut Array + where D: Dimension, + A: Sync + Send +{ + type Item = &'a mut A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view_mut().into_par_iter() + } +} + +impl<'a, A, D> IntoParallelIterator for &'a mut RcArray + where D: Dimension, + A: Sync + Send + Clone, +{ + type Item = &'a mut A; + type Iter = Parallel>; + fn into_par_iter(self) -> Self::Iter { + self.view_mut().into_par_iter() + } +} + macro_rules! par_iter_view_wrapper { // thread_bounds are either Sync or Send + Sync ($view_name:ident, [$($thread_bounds:tt)*]) => { diff --git a/tests/rayon.rs b/tests/rayon.rs index 82875c875..8ff19ae84 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -37,7 +37,7 @@ fn test_regular_iter() { for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() { v.fill(i as _); } - let s = a.view().into_par_iter().map(|&x| x).sum(); + let s = a.par_iter().map(|&x| x).sum(); println!("{:?}", a.slice(s![..10, ..5])); assert_eq!(s, a.scalar_sum()); } From 51fbf590d5c7c7e76688e8b1a3ca97b9c5fa539b Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 12/13] =?UTF-8?q?Move=20par=20=E2=86=92=20parallel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lib.rs | 2 +- src/{par => parallel}/mod.rs | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/{par => parallel}/mod.rs (100%) diff --git a/src/lib.rs b/src/lib.rs index cead81c6c..1ac9423a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -130,7 +130,7 @@ mod array_serialize; mod arrayformat; mod data_traits; #[cfg(feature = "rayon")] -pub mod par; +pub mod parallel; pub use aliases::*; diff --git a/src/par/mod.rs b/src/parallel/mod.rs similarity index 100% rename from src/par/mod.rs rename to src/parallel/mod.rs From 45dc5b35e28cde0e01539aa5d59db2e60e68623d Mon Sep 17 00:00:00 2001 From: bluss Date: Wed, 21 Dec 2016 21:38:13 +0100 Subject: [PATCH 13/13] Add docs & examples to ndarray::parallel --- src/iterators/par.rs | 2 +- src/parallel/mod.rs | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/iterators/par.rs b/src/iterators/par.rs index 75cc038c2..554577210 100644 --- a/src/iterators/par.rs +++ b/src/iterators/par.rs @@ -18,7 +18,7 @@ use super::AxisIter; use super::AxisIterMut; use imp_prelude::*; -/// Iterator wrapper for parallelized implementations. +/// Wrapper type for parallelized implementations. /// /// **Requires crate feature `"rayon"`** #[derive(Copy, Clone, Debug)] diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 717e35775..417900cc1 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -2,5 +2,42 @@ //! Parallelization features for ndarray. //! //! **Requires crate feature `"rayon"`** +//! +//! The array views and references to owned arrays all implement +//! `IntoParallelIterator`; the default parallel iterators (each element by +//! reference or mutable reference) have no ordering guarantee in their parallel +//! implementations. +//! +//! `.axis_iter()` and `.axis_iter_mut()` also have parallel counterparts. +//! +//! # Examples +//! +//! Compute the exponential of each element in an array, parallelized. +//! +//! ``` +//! use ndarray::Array2; +//! use ndarray::parallel::rayon_prelude::*; +//! +//! let mut a = Array2::::zeros((128, 128)); +//! a.par_iter_mut().for_each(|x| *x = x.exp()); +//! ``` +//! +//! Use the parallel `.axis_iter()` to compute the sum of each row. +//! +//! ``` +//! use ndarray::Array; +//! use ndarray::Axis; +//! use ndarray::parallel::rayon_prelude::*; +//! +//! let a = Array::linspace(0., 63., 64).into_shape((4, 16)).unwrap(); +//! let mut sums = Vec::new(); +//! a.axis_iter(Axis(0)) +//! .into_par_iter() +//! .map(|row| row.scalar_sum()) +//! .collect_into(&mut sums); +//! +//! assert_eq!(sums, [120., 376., 632., 888.]); +//! ``` +pub use rayon::prelude as rayon_prelude; pub use iterators::par::Parallel;