Tuesday, December 3, 2013

Simplify Asynchronous Programming with C# "yield return"

A few words ahead, please see http://msdn.microsoft.com/en-us/magazine/cc163323.aspx, for the article by inventor of this technology. My post is just a reinvent of the Jeffrey's idea. But my implementation is easy to understand. I read the code by Jeffrey, it's large and hard to digest.

(Next article: Nested IEnumerable functions.)

OK, here we started with my understandable implementation.

Asynchronous programming is extremely important and useful when you are developing an internet server which is going to support large concurrent requests. When dealing with internet data transmission, blocking mode with one thread handling one request is the one of the most popular traditional technologies. Programmers like it because it's easy to implement. On par with traditional programming technologies, asynchronous programming has many advantages.

  1. Far less threads involved, reduce large amount of memory usage.
  2. Since you don't have to create so many threads, its really responsive to handle an incoming request.
Though it's hard to implement, of course, and it's really easy to introduce bugs. Asynchronous calls always appear in Begin/End functions pair, for example, HttpWebRequest.BeginGetResponse() and  HttpWebRequest.EndGetResponse(). You have to call the BeginGetResponse() first, passing in a call back function, so that when it could be called and you got notified when BeginGetResponse() succeeded with a real response object. If your code flow involves several asynchronous calls, then your code is extremely hard to understand and maintain, since your code logic would be teared into several functions.

So, how to make your code stay in beautiful, logical, human readable form of linear structure, avoiding of tearing it into pieces, but still you could enjoy the high performance asynchronous programming technology? The answer is "yield return".

The basic idea of yield return is to form a state machine by writing the state changing logic into a single function or nested function calls. The coding style looks like the code of traditional technology of blocking mode data transmission. Every yield return will return from the function with all state reserved, and when last time the function being called, the code will be executed from last yield return statement.

See this simple example:

        private IEnumerable<int> Foo()
        {
            // some code here
            yield return 1;

            // some code here
            yield return 2;

            // some code here
            yield return 0;

            // some code here
        }


Foo() is a function, also it's a state machine. Every time you call it the function will resume execution at the point that last time returned. For sure that every local variable remains in its state of last yield return. OK, let's see how to execute this function:

        private void Driver()
        {
            foreach (var ele in Foo())
            {
                // do something
            }
        }

It's time to show the core of the technology. Let's incorporate the state machine function with asynchronous function calls.

        private IEnumerable<int> Foo()
        {
            var ar = HttpWebRequest.BeginGetResponse(callbackHandler);
            yield return 1;

            var response = HttpWebRequest.EndGetResponse();
            response.DoSomthing();
        }

The code isn't real working code, just for theory illustration. The first statement calls BeginGetResponse() passing a callback handler in. The callback handler won't be called until BeginGetResponse() finishing the work. During this period of time, no thread of your app work for these stuffs. So you don't have to pay the price to wait for BeginGetResponse() to return, for example, a thread waiting by semaphore or event. Thus we have a chance to make Foo() to execute the rest of the code. But Driver() can't drive the execution for us. So we have to upgrade our Driver() to a real driver which can drive Foo() through all of its async steps.

        public class AsyncDriver
        {
            public IEnumerator<int> iterator;

            public void AsyncCallback(IAsyncResult ar)
            {
                DriveToNext();
            }

            private void DriveToNext()
            {
                iterator.MoveNext();
            }

            public void AsyncCallback(IAsyncResult ar)
            {
                DriveToNext();
            }
        }

        public void Main()
        {
            AsyncDriver asyncDriver = new AsyncDriver();
            asyncDriver.iterator = Foo().GetEnumerator();
        }

        private IEnumerable<int> Foo(AsyncDriver asyncDriver)
        {
            var ar = HttpWebRequest.BeginGetResponse(asyncDriver.AsyncCallback);
            yield return 1;

            var response = HttpWebRequest.EndGetResponse();
            response.DoSomthing();
        }

So we have AsyncDriver class. Pay attention to BeginGetResponse(), as you can see the call back function is offered by AsyncDriver. When BeginGetResponse() finished its work, AsyncDriver will take control, and it drive the async steps in Foo() by calling iterator.MoveNext(). The 'iterator' is from Foo().

That explains the theory of driving async steps of an IEnumerable function. Real code would be more complex.

Here is the complete code of AsyncDriver. And follow that is the usage of AsyncDriver.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Common
{
    public class AsyncDriver
    {
        private class AsyncDriverResult : IAsyncResult
        {
            private object asyncState;

            public object AsyncState
            {
                get { return asyncState; }
                set { asyncState = value; }
            }

            public WaitHandle AsyncWaitHandle
            {
                get { throw new NotImplementedException(); }
            }

            public bool CompletedSynchronously
            {
                get { throw new NotImplementedException(); }
            }

            private bool isCompleted;

            public bool IsCompleted
            {
                get { return isCompleted; }
                set { isCompleted = value; }
            }
        }

        private AutoResetEvent executionFinishedEvent;
        private Mutex mutex;
        private AsyncDriverResult asyncDriverResult;

        private AsyncCallback AsyncDriverCallback;
        private Exception terminationException;

        private IEnumerable<int> enumerable;

        private IEnumerator<int> iterator;

        public delegate IEnumerable<int> AsyncEnumerator(AsyncDriver asyncDriver, object state);

        public AsyncDriver()
        {
            mutex = new Mutex();
        }

        public void AsyncCallback(IAsyncResult ar)
        {
            DriveToNext();
        }

        private void DriveToNext()
        {
            ThreadPool.QueueUserWorkItem((aeObject) =>
            {
                mutex.WaitOne();

                try
                {
                    bool tonext = iterator.MoveNext();
                    if (tonext)
                    {
                        if (iterator.Current == 0)
                        {
                            OnExecutionFinished();
                        }
                    }
                    else
                    {
                        OnExecutionFinished();
                    }
                }
                catch (Exception ex)
                {
                    terminationException = ex;
                    ex.Data.Add("PtolemaicCallStack", ex.StackTrace);
                    OnExecutionFinished();
                }
                finally
                {
                    mutex.ReleaseMutex();
                }
            },
            this);
        }

        private void OnExecutionFinished()
        {
            iterator.Dispose();
            executionFinishedEvent.Set();
            AsyncDriverCallback(asyncDriverResult);
        }

        public IAsyncResult BeginExecute(AsyncEnumerator asyncEnumerator, AsyncCallback callback, object state)
        {
            asyncDriverResult = new AsyncDriverResult();
            asyncDriverResult.AsyncState = state;

            enumerable = asyncEnumerator(this, state);
            AsyncDriverCallback = callback;
            Execute();
            return asyncDriverResult;
        }

        public void EndExecute(IAsyncResult ar)
        {
            executionFinishedEvent.WaitOne();
            if (terminationException != null)
            {
                throw terminationException;
            }
        }

        public void Execute()
        {
            executionFinishedEvent = new AutoResetEvent(false);
            iterator = enumerable.GetEnumerator();
            DriveToNext();
        }

        public static string DumpException(Exception ex)
        {
            string msg = "\n=================================================================\n"
                + ex.ToString() + "\n"
                + ex.Message + "\n\n" + ex.StackTrace + "\n";

            if (ex.Data.Contains("PtolemaicCallStack"))
            {
                msg += "\n---- PtolemaicCallStack ----\n" + ex.Data["PtolemaicCallStack"];
            }

            msg += "\n=================================================================\n";

            return msg;
        }
    }
}


The usage of AsyncDriver:

        public void Main()
        {
            Common.AsyncDriver asyncDriver = new Common.AsyncDriver();
            var serverContext = new ServerContext();
            serverContext.AsyncDriver = asyncDriver;
            asyncDriver.BeginExecute(Foo, FooCallback, asyncDriver);
        }

        private void FooCallback(IAsyncResult ar)
        {
            var asyncDriver = ar.AsyncState as Common.AsyncDriver;
            asyncDriver.EndExecute(ar);
        }

        private IEnumerable<int> Foo(Common.AsyncDriver asyncDriver, object state)
        {
            var ar = HttpWebRequest.BeginGetResponse(asyncDriver.AsyncCallback);
            yield return 1;

            var response = HttpWebRequest.EndGetResponse();
            response.DoSomthing();
        }