ParallelResult proposal

Sep 28, 2010 at 7:59 AM
Edited Sep 28, 2010 at 7:59 AM

Hi,

SequentialResult is pretty nice, but sometimes I will need multiple results to run in parallel, so I came up with a very simple implementation for  ParallelResult:

 

    public class ParallelResult : IResult
    {
        private readonly IEnumerator<IResult> _enumerator;

        public ParallelResult(IEnumerator<IResult> enumerator)
        {
            _enumerator = enumerator;
        }

        public event EventHandler<ResultCompletionEventArgs> Completed = delegate { };

        public void Execute()
        {
            int inExecution = 0;
            while (_enumerator.MoveNext())
            {
                var next = _enumerator.Current;
                next.Completed += (sender, args) =>
                                     {
                                         if (--inExecution == 0)
                                         {
                                             OnComplete(null, false);
                                         }
                                     };
                next.Execute();
                inExecution++;
            }
        }

        private void OnComplete(Exception error, bool wasCancelled)
        {
            Completed(this, new ResultCompletionEventArgs { Error = error, WasCancelled = wasCancelled });
        }
    }

 

 You can then use this like this:

 

ParallelResult parallelResult = new ParallelResult(Populate());
parallelResult.Execute();

private IEnumerator<IResult> Populate()
        {
ListService<VendorList, Business.Bos.Vendor> vendorListService = new ListService<VendorList, Business.Bos.Vendor>((sender, args) =>
            {
                Vendors = ((ListService<VendorList, Business.Bos.Vendor>)sender).Response;
                LoadingVendors = false;
            });
            yield return vendorListService;

            ListService<CategoryList, Business.Bos.Category> categoriesListService = new ListService<CategoryList, Business.Bos.Category>((sender, args) =>
            {
                Categories = ((ListService<CategoryList, Business.Bos.Category>)sender).Response;
                LoadingCategories = false;
            });
            yield return categoriesListService;
}

You can of course imagine alternating the SequentialResult and ParallelResult, for example you will need 1 think to load first and the rest should go and load in parallel.
The problem here is what do I do with cancel and error ?

Any suggestions and feedback will be greatly appreciated.
Thanks,

 

Sep 28, 2010 at 12:29 PM
Edited Sep 28, 2010 at 12:53 PM

Hi!

I would advice that you take an array of IResult's as argument to the constructor.  The reason is that, when executing the results in parallel like this, code that exists inbetween the yields in the enumerator will not really exute in serial anymore.  Meaning that your Populate() method is not really a sequential state-machine any longer.  This could cause undesireable effects in your application.  If you change your constructor to something like this, it would make it clearer:

        public ParallelResult(params IResult[] results)
        {
            _results = results;
        }

Your Excute method needs to change accordingly.

Below you will find an example where I am using the reactive framework (Rx) to achieve pretty much the same:

public class ForkJoinResult : IResult
{
    private readonly IResult[] _results;

    public ForkJoinResult(params IResult[] results)
    {
        _results = results;
    }

    public void Execute(ActionExecutionContext context = null)
    {
        _results
            .Select(x => x.AsObservable(context))
            .ForkJoin()
            .Subscribe(x => { },
                    ex => Completed(thisnew ResultCompletionEventArgs {Error = ex, WasCancelled = false}),
                    () => Completed(thisnew ResultCompletionEventArgs()));
    }

    public event EventHandler<ResultCompletionEventArgs> Completed = delegate { };
}
public static partial class Extensions
{
    public static IObservable<ResultCompletionEventArgs> AsObservable(this IResult result, ActionExecutionContext context = null)
    {
        var asyncSubject = new AsyncSubject<ResultCompletionEventArgs>();
        result.Completed += (s, e) =>
                                {
                                    if (e.Error != null)
                                        asyncSubject.OnError(e.Error);
                                    else
                                    {
                                        asyncSubject.OnNext(e);
                                        asyncSubject.OnCompleted();
                                    }
                                };
        ThreadPool.QueueUserWorkItem(_ => result.Execute(context));
        return asyncSubject;
    }
}

You could skip using the ThreadPool if your results are of an asynchronous type.  Meaning that their Excute normally returns immediately, and the Completed event fires later.  If your results are of a synchronous type, meaning that the Execute method fires the Complete event before returning, I would advice using the ThreadPool. 



Now you can do like this:

[SetBusy]
public IResult TestMe()
{
    return new ForkJoinResult(
        new SomeArbitraryResult(null),
        new SomeOtherArbitraryResult(null),
        new SomeThirdArbitraryResult(null)));
}
In my example the "TestMe"-method is bound to a button in the corresponding view.
As mentioned, you need to include the reactive framework to use the code above.


 

Sep 28, 2010 at 1:30 PM

Hi,

Very nice I will definitely have to look more carefully into this.

My goal here was to be able to write thinks like 

 

ParallelResult parallelResult = new ParallelResult(Populate());
parallelResult.Execute();
that can be interchangeable with 
SequentialResult sequentialResult = new SequentialResult(Populate());
sequentialResult.Execute();

and of course Populate can yield any of the Parallel and Sequential results thus obtaining thinks like: load this first and then go and load all the other stuff in parallel.

I might be very well able to achieve this using Rx bug I will have to look more closely into it.

Thanks for the reply,