IOCP Thread Pooling in C#

80酷酷网    80kuku.com

  IOCP Thread Pooling in C#By William KennedyContinuum Technology CenterIntroduction

 


When building server based applications in C#, it is important to have the ability to create thread pools.  Thread pools allow our server to queue and perform work in the most efficient and scalable way possible.  Without thread pooling we are left with two options.  The first option is to perform all of the work on a single thread.  The second option is to spawn a thread every time some piece of work needs to be done.  For this article, work is defined as an event that requires the processing of code.  Work may or may not be associated with data, and it is our job to process all of the work our server receives in the most efficient and fastest way possible.


 


As a general rule, if you can accomplish all of the work required with a single thread, then only use a single thread.   Having multiple threads performing work at the same time does not necessarily mean our application is getting more work done, or getting work done faster.  This is true for many reasons. 


 


For example, if you spawn multiple threads which attempt to access the same resource bound to a synchronization object, like a monitor object, these threads will serialize and fall in line waiting for the resource to become available.  As each thread tries to access the resource, it has the potential to block, and wait for the thread that owns the resource to release the resource.  At that point, these waiting threads are put to sleep, and not getting any work done.  In fact, these waiting threads have caused more work for the operating system to perform.  Now the operating system must task another thread to perform work, and then determine which thread, waiting for the resource, may access the resource next, once it becomes available.  If the threads that need to perform work are sleeping, because they are waiting for the resource to become available, we have actually created a performance problem.  In this case it would be more efficient to queue up this work and have a single thread process the queue. 

 

Threads that start waiting for a resource before other threads, are not guaranteed to be given the resource first.  In diagram A, thread 1 requests access to the resource before thread 2, and thread 2 requests access to the resource before thread 3.  The operating system however decides to give the resource to thread 1 first, then thread 3, and then thread 2.  This scenario causes work to be performed in an undetermined order.  The possible issues are endless when dealing with multi-threaded applications.


 






If work received can be performed independent of each other, we could always spawn a thread for processing that piece of work.  The problem here is that an operating system like Windows has severe performance problems when a large number of threads are created or running at the same time, waiting to have access to the CPU.  The Windows operating system needs to manage all of these threads, and compared to the UNIX operating system, it just doesn’t hold up.  If large amounts of work are issued to the server, this model will most likely cause the Windows operating system to become overloaded.  System performance will degrade drastically.


 


This article is a case study comparing thread performance between Windows NT and Solaris.

http://www.usenix.org/publications/library/proceedings/usenix-nt98/full_papers/zabatta/zabatta_html/zabatta.html


 


In the .NET framework, the “System.Threading” namespace has a ThreadPool class.  Unfortunately, it is a static class and therefore our server can only have a single thread pool.  This isn’t the only issue.  The ThreadPool class does not allow us to set the concurrency level of the thread pool.  The concurrency level is the most important setting when configuring a thread pool.  The concurrency level defines how many threads in the pool may be in an “active state” at the same time.  If we set this parameter correctly, we will have the most efficient, performance enhanced thread pool for the work being processed.


 


Imagine we have a thread pool with 4 threads and a concurrency level of 1.  Then, three pieces of work are queued up for processing in the pool.  Since the concurrency level for the thread pool is 1, only a single thread from the pool is activated and given work from the queue.  Even though there are two pieces of work queued up, no other threads are activated.  This is because the concurrency level is set to 1.  If the concurrency level was set to 2, then another thread would have been activated immediately and given work from the queue.  In diagram B we have thread 1 running and all of the other threads sleeping with two pieces of work queued.


 





 


So the question exists, why have more than 1 thread in the pool if the concurrency level is set to 1?  If thread 1 in diagram B ever goes to sleep before it completes its work, another thread from the pool will be activated.  When thread 1 goes to sleep, there are 0 threads “active” in the pool and it is ok to activate a new thread based on the concurrency level.  In diagram C, we now have thread 1 sleeping and thread 4 running with one piece of work queued.


 





 


Eventually, thread 1 will wake up, and it is possible for thread 4 to still be active.  We have 2 threads active in the pool, even though the concurrency level is set to 1.  In diagram D, we now have thread 1 and thread 4 running and one piece of work still queued.


 





 


The last piece of work in the queue will need to wait until both threads return to a sleeping state.  This is because the concurrency level is set to 1.  As we can see, even though the concurrency level restricts the number of active threads in the pool at any given time, we could have more active threads then the concurrency level allows.  It all depends on the state of the threads in the pool and how fast the threads can complete the work they are processing.


 


A good rule of thumb is to set the concurrency level to match the number of CPU’s in the system.  If the machine our server is running on only has one CPU, then only one thread can be executing at any given time.  It will require a task swap to have another thread get CPU time.  We want to reduce the number of active threads at any given time to maximize performance.  This also leads to scalability.  As the number of CPU’s increase, we can increase the concurrency level because there is a CPU to execute that thread.  This is a general rule and is always a good starting point for configuring our thread pools.


 


The bottom line is, if the CPU is available, and there is work to perform, activate a thread.  If the CPU is not available, do not activate a thread.  One other thing, we need to be careful that we don’t cause a situation where the threads in the pool are constantly being put to sleep for long periods of time during the processing of work.  This may cause all of the threads in the pool to constantly be in an active state, defeating the efficiency of the pool and the performance of the server.


 


The remaining scope of this article will show you how to add IOCP thread pools to your C# server based applications.  How to configure the thread pools for your specific application will not be covered.  It is suggested to use the general rules as discussed.


 

System Requirements

 


A basic understanding of C# is required to follow through the examples and the classes.  Basic concepts of type, properties, threading, synchronization, and delegates are required.
Defining the Problem

 


IOCP thread support has not been made available to C# developers through the “System.Threading” namespace.  We need to access the Win32 API calls from the Kernel32.dll.  This requires us to write unsafe code.  This is really not a problem, but something that needs to be discussed.  Let’s take a look at the Win32 API calls we need to implement an IOCP thread pool.


 


[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);


 


This Win32 API call is used to create an IOCP thread pool.  The first argument will always be set to INVALID_HANDLE_VALUE, which is 0xFFFFFFFF.  This tells the operating system this IOCP thread pool is not linked to a device.  The second argument will always be set to 0.  There is no existing IOCP thread pool because we are creating this for the first time.  The third argument will always be null.  We do not require a key because we have not associated this IOCP thread pool with a device.  The last argument is the important argument.  Here we define the concurrency level of the thread pool.  If we pass a 0 for this argument the operating system will set the concurrency level to match the number of CPU’s in the machine.  This option gives us our best chance to be scalable and take advantage of the number of CPU’s present in the machine.  This API call will return a handle to the newly created IOCP thread pool.  If the API call fails, it will return null.


 


[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean CloseHandle(UInt32 hObject);


 


This Win32 API call is used to close our thread pool.  The only argument is the handle to the IOCP thread pool.  This API call will return TRUE or FALSE if the handle can not be closed.


 


[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped);


 


This Win32 API call is used to post work in the IOCP thread pool queue.  Other threads in our application will make this Win32 API call.  The first argument is the handle to the IOCP thread pool.  The second argument is the size of the data we are posting to the queue.  The third argument is a value or a reference to an object or data structure we are posting to the queue.  The last argument will always be null.  The following diagram shows how the data is associated with the posted work.


 







 


In diagram E, we have two threads actively processing posted work and one piece of work on the queue waiting for its data to be processed.  The thing to note here is that each piece of work was given a reference to its specific data.  I am calling this variable pData to help describe what is happening in the IOCP thread pool.  The actual name or structure of this variable is undocumented.


 


When we make this API call in a C++ application, we can pass the address of any object in memory we wish, as in diagram E.  In C#, we don’t have the same luxury because of the managed heap.  The managed heap is a contiguous region of address space that contains all of the memory allocated for reference variables.  The heap maintains a pointer that indicates where the next object is to be allocated, and all allocations are contiguous from that point.  This is much different from the C-runtime heap. 


 


The C-runtime heap uses a link list of data structures to reference available memory blocks.  For the C-runtime heap to allocate memory, it must walk through the link list until a large enough block of free memory is found.  Then the free block of memory must be resized, and the link list adjusted.  If objects are allocated consecutively in a C++ application, those objects could be allocated anywhere on the heap.  This can never happen with the managed heap.  Objects that are allocated consecutively in a C# application will always be allocated consecutively on the managed heap.  The catch is that the managed heap must be compacted to guarantee the heap does not run out of memory.  That is the job of garbage collection.


 


For more information on garbage collection, try these links:

http://msdn.microsoft.com/library/default.asp?url=/library/en-us/cpguide/html/cpconautomaticmemorymanagement.asp

http://msdn.microsoft.com/msdnmag/issues/1100/GCI/default.aspx

http://msdn.microsoft.com/msdnmag/issues/1200/GCI2/default.aspx


 


In diagram F, we have allocated four objects on the managed heap.  Imagine that the managed heap has allocated memory for these objects at address FDEO, FDDO, FDCO, and FDBO.  This would mean the value of pClass1 is FDEO, the value of pClass2 is FDDO, the value of pClass3 is FDCO, and the value of pClass4 is FDBO. 

 

MyClass pClass1 = new MyClass();



MyClass pClass2 = new MyClass();



MyClass pClass3 = new MyClass();



MyClass pClass4 = new MyClass();





Now we write the following code.


 


pClass2 = null;




 


Diagram G shows what happens to the managed heap after garbage collection takes place and the managed heap is compacted.


 





 


The Class 2 object has been removed from the managed heap and the Class3 and Class 4 objects have been moved.  Now the value of pClass3 is FDDO and the value of pClass4 is FDCO.  The value that the pointer points to has changed.  The garbage collection process changes the values of all reference variables to make sure they are pointing to the correct objects after the managed heap is compacted.


 


So what does this mean for our IOCP thread pool implementation?  If we pass the reference of a managed object as the data for the work, there is a chance the reference is no longer valid when a thread in the pool is chosen to work on the data.


 





 


In diagram H, we have passed a reference to the Class 3 object as the data for the work posted to the IOCP thread pool.  This object is at address FDCO.  Before the work is given to thread 1, the Class 2 object is marked for deletion.  Then the garbage collection process runs, and the managed heap is compacted.  Now in diagram I, the work has been given to thread 1 for processing.  The value of pData is still FDCO, but Class 3 is no longer at address FDCO, it is at address FDDO.  The thread will perform the work, but using Class 4 instead of Class 3.


 


The garbage collection process can not change the value of pData, as it does with other variables, because this variable is not a managed variable.  It is a variable owned by the IOCP thread pool and exists outside the scope of the CLR.  The garbage collector has no knowledge of this variable or access to this variable.  The variable is set during the unsafe call to PostQueuedCompletionStatus.



 

Unfortunately, pinning the objects we want to pass as the data for the work posted to the IOCP thread pool is not a possible solution.  Pinning provides the ability to prevent an object from being moved on the manage heap during the garbage collection process.  We can not pin these objects because there is no way to pin an object in one thread and unpin the object in a different thread.  To pin an object, we need to use the fixed keyword.  This keyword can only be used in the context of a single method. Here is a quick example of pinning.


 


Int32 iArray = new Int32[5] {12, 34, 56, 78, 90};



unsafe



{



  fixed (Int32* piArray = iArray)



  {



    // Do Something



  }



}




 


The safest thing we can do is pass a value to the IOCP thread pool.  This value could be the index from a managed array, containing a reference to an object on the managed heap.   If the garbage collection process does compact the heap, the index values of the array will not change.  In diagram J and K, we can see one way to properly pass data for the work posted to the IOCP thread pool.  After the garbage collection process compacts the heap, the values of pData change, but the index positions to the pData variables do not change.


 





 


[DllImport("Kernel32", CharSet=CharSet.Auto)]

private static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);


 


The final Win32 API call is used to add threads to the IOCP thread pool.  Any thread that makes this Win32 API call will become part of the IOCP thread pool.  This is a blocking call and the method will return when the IOCP thread pool chooses the thread to perform work.  The first argument is the handle to the IOCP thread pool.  The second argument is the size of the data associated with the work.  This value was provided when the work was posted.  The third argument is the data value or data reference associated with the work.  This value was provided when the work was posted.  The forth argument is the address to a pointer of type OVERLAPPED.  This address is returned after the call.  The last argument is the time in milliseconds the thread should wait to be activated to perform work.  We will always pass INFINITE or 0xFFFFFFFF.




 


These are the Win32 API calls we need to add IOCP thread pool support to our C# server based applications.  We need to encapsulate these Win32 API calls using .NET threads and minimize the sections of unsafe code.  We need to prevent the application developer from passing a reference variable into the IOCP thread pool, by restricting them to passing only integer values.
Defining the Solution
We will build a class that encapsulates a single IOCP thread pool.  The application developer will be able to instantiate as many thread pools as he wishes.  During construction, the application developer will be able to: set the concurrency level of the thread pool, set the minimum and maximum number of threads in the pool, and will be able to provide a method to be called when work posted to the thread pool needs to be processed.  The application developer will also be able to post work with data into the IOCP thread pool.
Component Design and Coding

 


Start by adding a new class to your C# project.  Remove all of the code provided by the Visual Studio .NET wizard.  Then add the following namespaces.  The System.Runtime.InteropServices namespace is required to access the Win32 API methods from the Kernel32 DLL. 


 


using System;

using System.Threading;

using System.Runtime.InteropServices;


 


Don’t forget to change the project properties to allow unsafe code blocks.  This can be done by opening the project properties and selecting the Configuration Properties.  Under the Build / Code Generation section you will see “Allow unsafe code blocks”.  Set this to true.


 


Next add the namespace.   You will notice that I have defined a two level namespace.  This is great when you are building a class library with many different classes.


 


namespace Continuum.Threading

{


 


The PostQueuedCompletionStatus and GetQueuedCompletionStats Win32 API methods both require a pointer to the Win32 OVERLAPPED structure.  Because this structure will be used by the unsafe Win32 API call, we need to make sure the structure is aligned exactly the same way it would be in our C++ applications.  This can be accomplished by using the StructLayout attribute.  By setting the attribute to “LayoutKind.Sequential”, the structure will be aligned based on the same rules as the C++ compiler.


 


The structure requires members that are pointers.  The only way to add pointers to this structure is to use the unsafe keyword.  We can still use the FCL types when defining the structure.  This is very important because we can make sure the structure is identical to the C++ version.


 


// Structures




 


  //=============================================================================



  /// <summary> This is the WIN32 OVERLAPPED structure </summary>



  [StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]

  public unsafe struct OVERLAPPED

  {

    UInt32* ulpInternal;

    UInt32* ulpInternalHigh;

    Int32   lOffset;

    Int32   lOffsetHigh;

    UInt32  hEvent;

  }


 


Now it is time to define the IOCP thread pool class.  I am using the keyword sealed in the definition of this class.  The sealed keyword tells the compiler that this class can not be inherited.  If you know there is no reason for a class to be inherited, use the sealed keyword.  Certain run-time optimizations are enabled for the class when the sealed keyword is used.


 


// Classes




 


  //=============================================================================



  /// <summary> This class provides the ability to create a thread pool to manage work.  The



  ///           class abstracts the Win32 IOCompletionPort API so it requires the use of



  ///           unmanaged code.  Unfortunately the .NET framework does not provide this functionality </summary>



  public sealed class IOCPThreadPool

  {


 


The first section of the IOCP thread pool class is the Win32 function prototypes.  These are the same ones described earlier.


 


  // Win32 Function Prototypes




 


    /// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary>



    [DllImport("Kernel32", CharSet=CharSet.Auto)]

    private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);


 


    /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>



    [DllImport("Kernel32", CharSet=CharSet.Auto)]

    private unsafe static extern Boolean CloseHandle(UInt32 hObject);


 


    /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>



    [DllImport("Kernel32", CharSet=CharSet.Auto)]

    private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, OVERLAPPED* pOverlapped);


 


    /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.



    ///           All threads in the pool wait in this Win32 Function </summary>



    [DllImport("Kernel32", CharSet=CharSet.Auto)]

    private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, OVERLAPPED** ppOverlapped, UInt32 uiMilliseconds);


 


The next section is the constants section.  Here we need to define the Win32 constants required for the Win32 API calls we are going to make later.


 


  // Constants




 


    /// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary>



    private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;


 


    /// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary>



    private const UInt32 INIFINITE = 0xffffffff;


 


    /// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary>



    private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;


 


The delegate function type section is where we define any delegate functions.  We need one delegate function type to define the signature of the function we will call when work needs to be processed.


 


  // Delegate Function Types




 


    /// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary>



    public delegate void USER_FUNCTION(Int32 iValue);


 


These private properties are required to maintain the application developer’s settings.  The most interesting property is the GetUserFunction property.  This property contains a reference to a method supplied by the application developer.  We will use this property to call the application developers method.


 


  // Private Properties




 


    private UInt32 m_hHandle;

      /// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary>



      private UInt32 GetHandle { get { return m_hHandle; } set { m_hHandle = value; } }


 


    private Int32 m_uiMaxConcurrency;

      /// <summary> SimType: The maximum number of threads that may be running at the same time </summary>



      private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } }


 


    private Int32 m_iMinThreadsInPool;

      /// <summary> SimType: The minimal number of threads the thread pool maintains </summary>



      private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } }


 


    private Int32 m_iMaxThreadsInPool;

      /// <summary> SimType: The maximum number of threads the thread pool maintains </summary>



      private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } }


 


    private Object m_pCriticalSection;

      /// <summary> RefType: A serialization object to protect the class state </summary>



      private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } }


 


    private USER_FUNCTION m_pfnUserFunction;

      /// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary>



      private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } }


 


    private Boolean m_bDisposeFlag;

      /// <summary> SimType: Flag to indicate if the class is disposing </summary>



      private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } }

 

These public properties are used to determine if new threads need to be added to the thread pool.  These properties also provide statistical data about the thread pool.  Here we use the Interlocked class to provide serialization when we increment or decrement these properties.  This is the least expensive way to perform serialization.


 


  // Public Properties



 

    private Int32 m_iCurThreadsInPool;

      /// <summary> SimType: The current number of threads in the thread pool </summary>



      public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } }

      /// <summary> SimType: Increment current number of threads in the thread pool </summary>



      private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }

      /// <summary> SimType: Decrement current number of threads in the thread pool </summary>



      private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }


 


    private Int32 m_iActThreadsInPool;

      /// <summary> SimType: The current number of active threads in the thread pool </summary>



      public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } }

      /// <summary> SimType: Increment current number of active threads in the thread pool </summary>



      private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }

      /// <summary> SimType: Decrement current number of active threads in the thread pool </summary>



      private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }


 


    private Int32 m_iCurWorkInPool;

      /// <summary> SimType: The current number of Work posted in the thread pool </summary>



      public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } }

      /// <summary> SimType: Increment current number of Work posted in the thread pool </summary>



      private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }

      /// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>



      private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }


 


The constructor method does several things.  The class state is initialized and then the IOCP thread pool is created with a call to the CreateIoCompletionPort method.  Notice the method call is within the scope of the unsafe keyword.  This is required because we are passing pointers into the Win32 API call.  The last thing we do is create the minimal number of threads specified by the application developer.  Notice we use the .NET threading classes to create the threads.  We do not need to use the unsafe CreateThread method.  One might think we need to because these threads will be calling the GetQueuedCompletionStatus Win32 API method.


 


  // Constructor, Finalize, and Dispose




 


    //********************************************************************



    /// <summary> Constructor </summary>



    /// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param>



    /// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param>



    /// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param>



    /// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param>



    /// <exception cref = "Exception"> Unhandled Exception </exception>



    public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)

    {

      try



      {

        // Set initial class state



        GetMaxConcurrency   = iMaxConcurrency;

        GetMinThreadsInPool = iMinThreadsInPool;

        GetMaxThreadsInPool = iMaxThreadsInPool;

        GetUserFunction     = pfnUserFunction;


 


        // Init the thread counters



        GetCurThreadsInPool = 0;

        GetActThreadsInPool = 0;

        GetCurWorkInPool    = 0;


 


        // Initialize the Monitor Object



        GetCriticalSection = new Object();


 


        // Set the disposing flag to false



        IsDisposed = false;


 


        unsafe



        {

          // Create an IO Completion Port for Thread Pool use



          GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);

        }


 


        // Test to make sure the IO Completion Port was created



        if (GetHandle == 0)

          throw new Exception("Unable To Create IO Completion Port");


 


        // Allocate and start the Minimum number of threads specified



        Int32 iStartingCount = GetCurThreadsInPool;

       

        ThreadStart tsThread = new ThreadStart(IOCPFunction);

        for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)

        {

          // Create a thread and start it



          Thread thThread = new Thread(tsThread);

          thThread.Name = "IOCP " + thThread.GetHashCode();

          thThread.Start();


 


          // Increment the thread pool count



          IncCurThreadsInPool();

        }

      }


 


      catch



      {

        throw new Exception("Unhandled Exception");

      }

    }


 


The finalize method is only required to guarantee the IOCP thread pool handle is closed.  As a general rule, if a class allocates a resource outside the scope of the .NET framework, a finalize method is required else do not add a finalize method.  A finalize method will cause the garbage collection process to spend more time trying to release the memory for the object.


 


    //********************************************************************



    /// <summary> Finalize called by the GC </summary>



    ~IOCPThreadPool()

    {

      if (!IsDisposed)

        Dispose();

    }


 


The dispose method will not return until all of the threads in the pool have been terminated.  We can’t use the Abort method to kill the threads in the pool because any thread blocked, via the call to the GetQueuedCompletionStatus Win32 API method, will not respond to the Abort message.  The GetQueuedCompletionStatus Win32 API method will cause the thread to run outside the scope of the CLR and the .NET framework will lose access to the thread.  So what we do is post work into the IOCP thread pool.  We pass the SHUTDOWN_IOCPTHREAD data when we post the work.  This will tell the thread to terminate.  Then, we wait in a spin lock, until all of the threads have terminated.  The last thing is to close the IOCP thread pool.


 


    //********************************************************************



    /// <summary> Called when the object will be shutdown.  This



    ///           function will wait for all of the work to be completed



    ///           inside the queue before completing </summary>



    public void Dispose()

    {

      try



      {

        // Flag that we are disposing this object



        IsDisposed = true;


 


        // Get the current number of threads in the pool



        Int32 iCurThreadsInPool = GetCurThreadsInPool;


 


        // Shutdown all thread in the pool



        for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)

        {

          unsafe



          {

            bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);

          }

        }


 


        // Wait here until all the threads are gone



        while (GetCurThreadsInPool != 0) Thread.Sleep(100);


 


        unsafe



        {

          // Close the IOCP Handle



          CloseHandle(GetHandle);

        }

      }


 


      catch



      {

      }

    }


 


The only private method is the IOCPFunction method.  This method is spawned as a thread and is made part of the IOCP thread pool by calling the GetQueuedCompletionStatus Win32 API method.  When the GetQueuedCompletionStatus Win32 API method returns, we check to make sure we are not being asked to shutdown the thread.  The third argument is the data associated with the posted work.  If the data is not SHUTDOWN_IOCPTHREAD, then real work has been posted into the IOCP thread pool and this thread has been chosen to process the work.  The application developer’s supplied user function is called since the application developer is the only one who knows what needs to be done.  Once that is complete, the method checks if a new thread should be added to the pool.  This is done by reviewing the number of active threads in the pool.


 


  // Private Methods



 

    //********************************************************************



    /// <summary> IOCP Worker Function that calls the specified user function </summary>



    private void IOCPFunction()

    {

      UInt32 uiNumberOfBytes;

      Int32  iValue;


 


      try



      {

        while (true)

        {

          unsafe



          {

            OVERLAPPED* pOv;


 


            // Wait for an event



            GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);

          }

         

          // Decrement the number of events in queue



          DecCurWorkInPool();


 


          // Was this thread told to shutdown



          if (iValue == SHUTDOWN_IOCPTHREAD)

            break;


 


          // Increment the number of active threads



          IncActThreadsInPool();


 


          try



          {

            // Call the user function



            GetUserFunction(iValue);

          }


 


          catch



          {

          }


 


          // Get a lock



          Monitor.Enter(GetCriticalSection);


 


          try



          {

            // If we have less than max threads currently in the pool



            if (GetCurThreadsInPool < GetMaxThreadsInPool)

            {

              // Should we add a new thread to the pool



              if (GetActThreadsInPool == GetCurThreadsInPool)

              {

                if (IsDisposed == false)

                {

                  // Create a thread and start it



                  ThreadStart tsThread = new ThreadStart(IOCPFunction);

                  Thread thThread = new Thread(tsThread);

                  thThread.Name = "IOCP " + thThread.GetHashCode();

                  thThread.Start();


 


                  // Increment the thread pool count



                  IncCurThreadsInPool();

                }

              }

            }

          }


 


          catch



          {

          }


 


          // Relase the lock



          Monitor.Exit(GetCriticalSection);


 


          // Increment the number of active threads



          DecActThreadsInPool();

        }

      }


 


      catch



      {

      }


 


      // Decrement the thread pool count



      DecCurThreadsInPool();

    }


 


The last two public methods are the PostEvent methods.  The first method takes an integer as an argument and the second version takes no argument at all.  The integer is the data the application developer wishes to pass with the work posted into the IOCP thread pool.  In the PostQueuedCompletionStatus Win32 API call, we can see that the third argument is where we pass the data value.   Since this value is always an integer we set the size of the data to four, as seen in the second argument.  Like in the IOCPFunction, we check to see if we need to add a new thread to the pool.


 


  // Public Methods




 


    //********************************************************************



    /// <summary> IOCP Worker Function that calls the specified user function </summary>



    /// <param name="iValue"> SimType: A value to be passed with the event </param>



    /// <exception cref = "Exception"> Unhandled Exception </exception>



    public void PostEvent(Int32 iValue)

    {

      try



      {

        // Only add work if we are not disposing



        if (IsDisposed == false)

        {

          unsafe



          {

            // Post an event into the IOCP Thread Pool



            PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);

          }


 


          // Increment the number of item of work



          IncCurWorkInPool();


 


          // Get a lock



          Monitor.Enter(GetCriticalSection);


 


          try



          {

            // If we have less than max threads currently in the pool



            if (GetCurThreadsInPool < GetMaxThreadsInPool)

            {

              // Should we add a new thread to the pool



              if (GetActThreadsInPool == GetCurThreadsInPool)

              {

                if (IsDisposed == false)

                {

                  // Create a thread and start it



                  ThreadStart tsThread = new ThreadStart(IOCPFunction);

                  Thread thThread = new Thread(tsThread);

                  thThread.Name = "IOCP " + thThread.GetHashCode();

                  thThread.Start();


 


                  // Increment the thread pool count



                  IncCurThreadsInPool();

                }

              }

            }

          }


 


          catch



          {

          }


 


          // Release the lock



          Monitor.Exit(GetCriticalSection);

        }

      }


 


      catch (Exception e)

      {

        throw e;

      }


 


      catch



      {

        throw new Exception("Unhandled Exception");

      }

    } 


 


    //********************************************************************



    /// <summary> IOCP Worker Function that calls the specified user function </summary>



    /// <exception cref = "Exception"> Unhandled Exception </exception>



    public void PostEvent()

    {

      try



      {

        // Only add work if we are not disposing



        if (IsDisposed == false)

        {

          unsafe



          {

            // Post an event into the IOCP Thread Pool



            PostQueuedCompletionStatus(GetHandle, 0, null, null);

          }


 


          // Increment the number of item of work



          IncCurWorkInPool();


 


          // Get a lock



          Monitor.Enter(GetCriticalSection);


 


          try



          {

            // If we have less than max threads currently in the pool



            if (GetCurThreadsInPool < GetMaxThreadsInPool)

            {

              // Should we add a new thread to the pool



              if (GetActThreadsInPool == GetCurThreadsInPool)

              {

                if (IsDisposed == false)

                {

                  // Create a thread and start it



                  ThreadStart tsThread = new ThreadStart(IOCPFunction);

                  Thread thThread = new Thread(tsThread);

                  thThread.Name = "IOCP " + thThread.GetHashCode();

                  thThread.Start();


 


                  // Increment the thread pool count



                  IncCurThreadsInPool();

                }

              }

            }

          }


 


          catch



          {

          }


 


          // Release the lock



          Monitor.Exit(GetCriticalSection);

        }

      }


 


      catch (Exception e)

      {

        throw e;

      }


 


      catch



      {

        throw new Exception("Unhandled Exception");

      }

    }

  }

}




 


We have now completed the implementation of the IOCP thread pool class.  Now it is time to test it.
The Sample Application

 


Start by adding a new class to your C# project.  Remove all of the code provided by the Visual Studio .NET wizard.  Then add all of the following code.  In
Main
, an IOCP thread pool is created, and a single piece of work is posted to the IOCP thread pool.  We pass the data value of 10 along with the posted work.  The main thread is then put to sleep. This gives the IOCP thread function time to wake up to process the work posted.  The last thing in main is to dispose the IOCP thread pool.  The IOCP thread function displays the value of the data passed into the IOCP thread pool.


 


using System;

using System.Threading;  // Included for the Thread.Sleep call



using Continuum.Threading;


 


namespace Sample

{

  //=============================================================================



  /// <summary> Sample class for the threading class </summary>



  public class UtilThreadingSample

  {

    //***************************************************************************** 



    /// <summary> Test Method </summary>



    static void
Main
()

    {

      // Create the MSSQL IOCP Thread Pool



      IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 5, 10, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

     

      pThreadPool.PostEvent(10);

     

      Thread.Sleep(100);

     

      pThreadPool.Dispose();

    }

   

    //********************************************************************



    /// <summary> Function to be called by the IOCP thread pool.  Called when



    ///           a command is posted for processing by the SocketManager </summary>



    /// <param name="iValue"> The value provided by the thread posting the event </param>



    static public void IOCPThreadFunction(Int32 iValue)

    {

      try



      {

        Console.WriteLine("Value: {0}", iValue);



      }

     

      catch (Exception pException)

      {

        Console.WriteLine(pException.Message);

      }

    }

  }

}


 


This is what you should see when you run the sample application.  On your own change the main function to call the PostEvent method several times and see how the IOCP thread pool performs.


 



 



Review
This class provides the IOCP thread pooling support your server based software requires.  By using this class, you can build efficient and robust servers that can scale as the amount of work increases.  Though there are some limitations due to the nature of the managed heap and unsafe code, these limitations can easily be overcome with a little ingenuity.


 

分享到
  • 微信分享
  • 新浪微博
  • QQ好友
  • QQ空间
点击: