CActiveQueueWithTimeout

A C++ class that provides a thread-safe FIFO queue with timeout.
by
Dr. Thomas Becker
tmbecker@compuserve.com


Contents

  1. Copyright and Permission Notice
  2. Overview of CActiveQueueWithTimeout
  3. CActiveQueueWithTimeout Public Members
  4. CActiveQueueWithTimeout Exception Handling
  5. CActiveQueueWithTimeout Example

Copyright and Permission Notice

COPYRIGHT (C) 1997 Thomas Becker

PERMISSION NOTICE:

PERMISSION TO USE, COPY, MODIFY, AND DISTRIBUTE THIS SOFTWARE FOR ANY PURPOSE AND WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT ALL COPIES ARE ACCOMPANIED BY THE COMPLETE MACHINE-READABLE SOURCE CODE, ALL MODIFIED FILES CARRY PROMINENT NOTICES AS TO WHEN AND BY WHOM THEY WERE MODIFIED, THE ABOVE COPYRIGHT NOTICE, THIS PERMISSION NOTICE AND THE NO-WARRANTY NOTICE BELOW APPEAR IN ALL SOURCE FILES AND IN ALL SUPPORTING DOCUMENTATION.

NO-WARRANTY NOTICE:

THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT EXPRESS OR IMPLIED WARRANTY. ALL IMPLIED WARRANTIES OF FITNESS FOR ANY PARTICULAR PURPOSE AND OF MERCHANTABILITY ARE HEREBY DISCLAIMED.

Overview of CActiveQueueWithTimeout

template<class T, DWORD dwLEN>
class CActiveQueueWithTimeout

The CActiveQueueWithTimout class template realizes a thread-safe queue that is administered by an agent thread and allows timing out of elements that have already been placed into the queue.

The first template argument indicates the type of the queue elements, the second one indicates the length of the queue. It is easy to change the source code so that the length of the queue can be set at runtime: make the template argument a class member that is set by the constructor, find the places where it occurs as the length parameter in array declarations, and replace these array declarations with heap allocations using the new operator.

The constructor takes as its first mandatory argument the number of worker threads that will be retrieving elements from the queue. If this number is less than the actual number of worker threads, the Retrieve member function will throw a CWin32Exception with error code 298L (ERROR_TOO_MANY_POSTS). If this exception is caught and ignored, the queue will operate, but it will allow only the specified number of worker threads to operate on queue elements simultaneously.

The second mandatory argument to the constructor is a function pointer of type void (*)(T&), where T is the type of the queue elements. This function will be called on elements that are removed from the queue because their timeout interval has elapsed. Typically, this function reconnects a communications device such as a named pipe.

The third argument to the constructor is optional. It is the handle to an exit event. Setting this event will cause the Insert and Retrieve member functions to return immediately and terminate the agent thread. This event handle can also be specified later by means of the SetHandleToExitEvent member function. Note that an exit event must be specified before any of the member functions can be called.

The Start member function starts the agent thread that administers the queue. This function must be called before the first call to the Insert and Retrieve member function. The Start member function will also check the handles that were created by the constructor and throw CWin32Exceptions in case something went wrong.

The Insert member function inserts an element into the queue. This is the function that is typically called by a listening thread. Such a thread accepts clients from a communications device and then places in the queue a pointer or an array index that identifies e.g. a socket descriptor or a pipe handle. Elements are always copied into the queue; therefore, the element type must possess a public copy constructor if it is a user-defined class. The Insert member function takes two timeout values as arguments: the timeout for the element once it has entered the queue, and the timeout for the Insert member function itself if it has to wait because the queue is full.

The Retrieve member function retrieves an element from the queue. This is the function that is typically called by worker threads to retrieve a pointer or an array index leading to a communications device.

The GetQueueAgentThread member function returns a handle to the agent thread that administers the queue, so that a supervising thread can watch it.

All CActiveQueueWithTimeout member functions throw CWin32Exceptions if Win32 errors are encountered. The agent thread terminates with the last error code as its return value if it encounters a Win32 error.

If the destructor encounters an error, it throws a CWin32Exception but does not propagate it.

CActiveQueueWithTimeout Public Members

Enums
waitResult Return values from functions that perform wait operations.
Construction
CActiveQueueWithTimeout Constructs a CActiveQueueWithTimeout object.
Operations
Start Starts the queue.
Insert Inserts an element into the queue.
Retrieve Retrieves an element from the queue.
SetHandleToExitEvent Places the handle to the exit event in a private member variable.
GetQueueAgentThread Returns a handle to the queue agent thread.

CActiveQueueWithTimeout::waitResult

Return values from functions that perform wait operations.
enum waitResult
{
  waitTimeout, // timeout occurred
  waitExitEvent, // exit event was set
  waitSuccess  // operation completed successfully
} ;

CActiveQueueWithTimeout::CActiveQueueWithTimeout

The constructor creates the synchronization objects and initializes the member variables. To avoid exceptions from constructors, no error check is made on the Create... APIs. The Start member function will throw an exception if the handles are NULL.
template<class T, DWORD dwLEN>
CActiveQueueWithTimeout<T, dwLEN>::CActiveQueueWithTimeout( 
  DWORD dwNumWorkerThreads, // number of working threads
  void (*DealWithTimedOutElement)(T&),  // function pointer
  HANDLE hExitEvent // = NULL exit event
  );

Parameters

dwNumWorkerThreads
Indicates the number of worker threads that will be retrieving from the queue. If this number is less than the actual number of worker threads, the Retrieve member function will throw a CWin32Exception with error code 298L (ERROR_TOO_MANY_POSTS). If this exception is caught and ignored, the queue will operate, but it will allow only the specified number of worker threads to operate on queue elements simultaneously.
DealWithTimedOutElement
Points to the function that is to be applied to elements that are timed out of the queue.
hExitEvent
Indicates the handle to the exit event that causes all member functions to return immediately. This handle can be set later by means of the SetHandleToExitEvent member function. However, the handle must be set to a valid event handle before the Start member function is called.

CActiveQueueWithTimeout::Start

The Start member function checks if the object handles created in the constructor are valid and starts the queue agent thread.
template<class T, DWORD dwLEN>
CActiveQueueWithTimeout<T, dwLEN>::waitResult 
CActiveQueueWithTimeout<T, dwLEN>::Start(
  DWORD dwMilliseconds // = INFINITE timeout
  );

Parameters

dwMilliseconds
Indicates the timeout value in milliseconds or INFINITE.

Return Value and Exceptions

If the function succeeds, the return value is waitSuccess.
If the function fails, a CWin32Exception is thrown.

CActiveQueueWithTimeout::Insert

The Insert member function places a new element into the queue.
template<class T, DWORD dwLEN>
CActiveQueueWithTimeout<T, dwLEN>::waitResult 
CActiveQueueWithTimeout<T, dwLEN>::Insert(
  const T& refNewElement, // new element
  DWORD dwMillisecondsInQueue, // timeout in queue
  DWORD dwMilliseconds // timeout
  );

Parameters

refNewElement
Refers to the new element that is to be inserted.
dwMillisecondsInQueue
Indicates the timeout value in milliseconds or INFINITE for elements in the queue.
dwMilliseconds
Indicates the timeout value in milliseconds or INFINITE for the wait operation when the queue is full.

Return Value and Exceptions

If the function succeeds, the return value is waitSuccess.
If the function notices that the exit event was set, the return value is waitExitEvent.
If the function fails, a CWin32Exception is thrown.

CActiveQueueWithTimeout::Retrieve

The Retrieve member function retrieves an element from the queue.
template<class T, DWORD dwLEN>
CActiveQueueWithTimeout<T, dwLEN>::waitResult 
CActiveQueueWithTimeout<T, dwLEN>::Retrieve(
  T& refLocation, // location to retrieve to
  DWORD dwMilliseconds // timeout
  );

Parameters

refLocation
Refers to the T location that will receive the result.
dwMilliseconds
Indicates the timeout value in milliseconds or INFINITE.

Return Value and Exceptions

If the function succeeds, the return value is waitSuccess.
If the function notices that the exit event was set, the return value is waitExitEvent.
If the function fails, a CWin32Exception is thrown.

CActiveQueueWithTimeout::SetHandleToExitEvent

Places the handle to the exit event in a private member variable.
template<class T, DWORD dwLEN>
void CActiveQueueWithTimeout<T, dwLEN>::SetHandleToExitEvent(
  HANDLE hExitEvent 
  );

CActiveQueueWithTimeout::GetQueueAgentThread

Returns a handle to the queue agent thread. This is guaranteed to be meaningful only after the Start member function has been called. The handle is closed by the destructor.
template<class T, DWORD dwLEN>
HANDLE CActiveQueueWithTimeout<T, dwLEN>::GetQueueAgentThread(
  );

CActiveQueueWithTimeout Exception Handling

The implementation of CActiveQueueWithTimeout uses Win32 APIs. All errors are thus identified by the error code that is returned by the GetLastError() API. When such an error occurs, the CActiveQueueWithTimeout functions throw an exception of type CWin32Exception. CWin32Exception is a simple class that stores the error code as well as the name of the source file and the number of the line where the error occurred. The declaration of CWin32Exception can be found in the file Win32Exception.h.

Win32Exception.h also contains the declaration of a class named CSehException, which can be used to catch operating system exceptions (SEH-exceptions) such as access violations via the C++ exception handling mechanism. See the documentation of the _set_se_translator() function and the article "Structured exception handling" in the C++ Language Reference for a detailed explanation.

CActiveQueueWithTimeout Example

A complete example of an application of the CActiveQueueWithTimeout class can be found in the file MTServerV2.cpp. The following code fragments are excerpts from that file.

The programm creates an instance of the CActiveQueueWithTimeout class as a global variable:

#define NUM_THREADS 3
#define LEN_QUEUE 5
CActiveQueueWithTimeout<DWORD, LEN_QUEUE> 
  cTheQueue(NUM_THREADS, Reconnect) ;
Here, Reconnect is a function that reconnects a named pipe that is identified by an index:
static void Reconnect(DWORD&) ;
The program creates an exit event with handle hExitEvent. This handle is passed to the queue:
cTheQueue.SetHandleToExitEvent(hExitEvent) ;
The queue is started as part of an initialization routine:
DWORD dwRetVal = cTheQueue.Start() ;
A separate thread waits in an infinite loop for clients to connect to an instance of a named pipe. Everytime a client has connected, the index of the pipe instance that the client is using is placed into the queue:
DWORD dwPipeIndex ;
CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitResult enInsertRetVal ;

while ( 1 )
{

  //
  // Wait for a client to connect to an instance of a named pipe.
  // When a client has connected, place the index of the pipe
  // pipe index in the dwPipeIndex variable.
  //  

  // Place index dwIndex of connected pipe in the queue.
  //
  enInsertRetVal = cTheQueue.Insert(
    dwPipeIndex,
    QUEUE_TIMEOUT,
    INFINITE
    ) ;

  // Break if exit event was set
  //
  if (CActiveQueueWithTimeout::waitExitEvent 
        == enInsertRetVal) break ;

}
The indices of connected pipe instances are retrieved from the queue by worker threads. These threads perform read and write operations on the pipe instance and then reconnect it:
DWORD dwPipeIndex ;
CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitResult enRetrieveRetVal ;

while(1)
{
      
  // Retrieve a pipe index from queue. 
  //
  enRetrieveRetVal = cTheQueue.Retrieve(
    dwPipeIndex,
    INFINITE
    ) ;
      
  // Break if exit event was set
  //
  if (CActiveQueueWithTimeout::waitExitEvent 
    == enRetrieveRetVal) break ;
  
  //
  // Server a client on the named pipe instance with index dwPipeIndex,
  // then reconnect the pipe.
  //

}
In order to obtain an acceptable timeout behavior, the number of available pipe instances should be LEN_QUEUE + NUM_THREADS. That way, a client that connects to a pipe instance will be inserted in the queue with practically no delay. It is thus guatanteed that after QUEUE_TIMEOUT milliseconds, the client will either be served by a worker thread or be disconnected.