A C++ class that provides a thread-safe FIFO queue with timeout.
by
Dr. Thomas Becker
tmbecker@compuserve.com
COPYRIGHT (C) 1997 Thomas Becker
PERMISSION NOTICE:
PERMISSION TO USE, COPY, MODIFY, AND DISTRIBUTE THIS SOFTWARE FOR ANY PURPOSE AND WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT ALL COPIES ARE ACCOMPANIED BY THE COMPLETE MACHINE-READABLE SOURCE CODE, ALL MODIFIED FILES CARRY PROMINENT NOTICES AS TO WHEN AND BY WHOM THEY WERE MODIFIED, THE ABOVE COPYRIGHT NOTICE, THIS PERMISSION NOTICE AND THE NO-WARRANTY NOTICE BELOW APPEAR IN ALL SOURCE FILES AND IN ALL SUPPORTING DOCUMENTATION.
NO-WARRANTY NOTICE:
THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT EXPRESS OR IMPLIED WARRANTY. ALL IMPLIED WARRANTIES OF FITNESS FOR ANY PARTICULAR PURPOSE AND OF MERCHANTABILITY ARE HEREBY DISCLAIMED.
The first template argument indicates the type of the queue elements, the second one indicates the length of the queue. It is easy to change the source code so that the length of the queue can be set at runtime: make the template argument a class member that is set by the constructor, find the places where it occurs as the length parameter in array declarations, and replace these array declarations with heap allocations using the new operator.
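The runtime-length change described above might look roughly like the following. This is a minimal sketch, not the class's actual source: RuntimeRing, Push, Pop, Length and the member names are hypothetical, and std::unique_ptr<T[]> is used here in place of a raw new[]/delete[] pair so that the array is released automatically.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// Hypothetical ring buffer whose length is a constructor argument
// instead of a template argument.
template <class T>
class RuntimeRing {
public:
    // The heap allocation replaces an array declaration such as T slots[dwLEN].
    explicit RuntimeRing(std::size_t len)
        : m_len(len), m_slots(new T[len]) {}

    // Copies value into the buffer; returns false if the buffer is full.
    bool Push(const T& value) {
        if (m_count == m_len) return false;
        m_slots[(m_head + m_count) % m_len] = value;
        ++m_count;
        return true;
    }

    // Copies the oldest element to out; returns false if the buffer is empty.
    bool Pop(T& out) {
        if (m_count == 0) return false;
        out = m_slots[m_head];
        m_head = (m_head + 1) % m_len;
        --m_count;
        return true;
    }

    std::size_t Length() const { return m_len; }

private:
    std::size_t m_len;              // was the template argument
    std::unique_ptr<T[]> m_slots;   // was a fixed-size array
    std::size_t m_head = 0;
    std::size_t m_count = 0;
};
```

With this layout, a declaration such as RuntimeRing<int> ring(5) takes the place of a template instantiation with a length of 5.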
The constructor takes as its first mandatory argument the number of worker threads that will be retrieving elements from the queue. If this number is less than the actual number of worker threads, the Retrieve member function will throw a CWin32Exception with error code 298L (ERROR_TOO_MANY_POSTS). If this exception is caught and ignored, the queue will operate, but it will allow only the specified number of worker threads to operate on queue elements simultaneously.
The second mandatory argument to the constructor is a function pointer of type void (*)(T&), where T is the type of the queue elements. This function will be called on elements that are removed from the queue because their timeout interval has elapsed. Typically, this function reconnects a communications device such as a named pipe.
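To illustrate how a callback of type void (*)(T&) is applied to timed-out elements, here is a small portable sketch. It is not the agent thread's actual code: TimedEntry, ExpireEntries and g_reconnected are hypothetical names, plain millisecond counters stand in for the system tick count, and unsigned long stands in for DWORD so that the example compiles without the Windows headers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical queue entry: the element plus the time at which it expires.
template <class T>
struct TimedEntry {
    T element;
    unsigned long expiresAtMs;
};

// Removes every entry whose deadline has passed and hands its element to the
// callback, which has the same void (*)(T&) shape the constructor expects.
// Returns the number of entries removed.
template <class T>
std::size_t ExpireEntries(std::vector<TimedEntry<T>>& entries,
                          unsigned long nowMs,
                          void (*DealWithTimedOutElement)(T&)) {
    std::size_t removed = 0;
    for (std::size_t i = 0; i < entries.size();) {
        if (entries[i].expiresAtMs <= nowMs) {
            DealWithTimedOutElement(entries[i].element);
            entries.erase(entries.begin() + i);
            ++removed;
        } else {
            ++i;
        }
    }
    return removed;
}

// Example callback: records which pipe index would be reconnected.
static std::vector<unsigned long> g_reconnected;
static void Reconnect(unsigned long& pipeIndex) {
    g_reconnected.push_back(pipeIndex);
}
```

In a real server the callback would disconnect and reconnect the pipe instance identified by the index rather than merely record it.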
The third argument to the constructor is optional. It is the handle to an exit event. Setting this event causes the Insert and Retrieve member functions to return immediately and causes the agent thread to terminate. This event handle can also be specified later by means of the SetHandleToExitEvent member function. Note that an exit event must be specified before any of the other member functions can be called.
The Start member function starts the agent thread that administers the queue. This function must be called before the first call to the Insert and Retrieve member functions. The Start member function will also check the handles that were created by the constructor and throw CWin32Exceptions in case something went wrong.
The Insert member function inserts an element into the queue. This is the function that is typically called by a listening thread. Such a thread accepts clients from a communications device and then places in the queue a pointer or an array index that identifies e.g. a socket descriptor or a pipe handle. Elements are always copied into the queue; therefore, the element type must possess a public copy constructor if it is a user-defined class. The Insert member function takes two timeout values as arguments: the timeout for the element once it has entered the queue, and the timeout for the Insert member function itself if it has to wait because the queue is full.
The Retrieve member function retrieves an element from the queue. This is the function that is typically called by worker threads to retrieve a pointer or an array index leading to a communications device.
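The interplay of the wait timeouts, the exit event and the three waitResult values can be sketched with standard C++ primitives. The following is an illustration under stated assumptions, not the author's Win32 implementation: SketchQueue and SetExit are hypothetical names, a std::condition_variable stands in for the Win32 synchronization objects, a boolean flag stands in for the exit event, and the agent thread that evicts expired elements is omitted, so the in-queue deadline is stored but not enforced.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

enum waitResult { waitTimeout, waitExitEvent, waitSuccess };

// Hypothetical portable stand-in for the queue.
template <class T, std::size_t LEN>
class SketchQueue {
public:
    // msInQueue: how long the element may sit in the queue once inserted.
    // msWait:    how long Insert itself may block while the queue is full.
    waitResult Insert(const T& element,
                      std::chrono::milliseconds msInQueue,
                      std::chrono::milliseconds msWait) {
        std::unique_lock<std::mutex> lock(m_mutex);
        bool ready = m_changed.wait_for(lock, msWait, [this] {
            return m_exit || m_entries.size() < LEN;
        });
        if (m_exit) return waitExitEvent;
        if (!ready) return waitTimeout;
        // The element is copied into the queue together with its deadline.
        m_entries.push_back({element, std::chrono::steady_clock::now() + msInQueue});
        m_changed.notify_all();
        return waitSuccess;
    }

    // msWait: how long Retrieve may block while the queue is empty.
    waitResult Retrieve(T& location, std::chrono::milliseconds msWait) {
        std::unique_lock<std::mutex> lock(m_mutex);
        bool ready = m_changed.wait_for(lock, msWait, [this] {
            return m_exit || !m_entries.empty();
        });
        if (m_exit) return waitExitEvent;
        if (!ready) return waitTimeout;
        location = m_entries.front().element;
        m_entries.pop_front();
        m_changed.notify_all();
        return waitSuccess;
    }

    // Stand-in for setting the exit event: wakes every waiting caller.
    void SetExit() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_exit = true;
        m_changed.notify_all();
    }

private:
    struct Entry {
        T element;
        std::chrono::steady_clock::time_point deadline;  // not enforced in this sketch
    };
    std::mutex m_mutex;
    std::condition_variable m_changed;
    std::deque<Entry> m_entries;
    bool m_exit = false;
};
```

Checking the exit flag before the timeout result means that a caller woken by the exit signal always sees waitExitEvent, which mirrors the documented behavior that setting the exit event makes Insert and Retrieve return immediately.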
The GetQueueAgentThread member function returns a handle to the agent thread that administers the queue, so that a supervising thread can watch it.
All CActiveQueueWithTimeout member functions throw CWin32Exceptions if Win32 errors are encountered. The agent thread terminates with the last error code as its return value if it encounters a Win32 error.
If the destructor encounters an error, it throws a CWin32Exception internally but catches it again, so the exception never propagates out of the destructor.
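The pattern of throwing inside a destructor without letting the exception escape can be sketched as follows. The names SketchResource, Close and g_swallowed are hypothetical, and std::runtime_error stands in for CWin32Exception so that the example compiles without the Windows headers.

```cpp
#include <cassert>
#include <stdexcept>

// Counts errors that were caught inside destructors (for demonstration only).
static int g_swallowed = 0;

// Hypothetical resource holder whose cleanup step may fail.
class SketchResource {
public:
    explicit SketchResource(bool failOnClose) : m_failOnClose(failOnClose) {}

    ~SketchResource() {
        try {
            Close();                       // may throw
        } catch (const std::runtime_error&) {
            ++g_swallowed;                 // handled here; nothing escapes the destructor
        }
    }

private:
    void Close() {
        if (m_failOnClose) throw std::runtime_error("close failed");
    }

    bool m_failOnClose;
};
```

Keeping the exception inside the destructor matters because an exception that leaves a destructor during stack unwinding terminates the program.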
Member                  | Description
waitResult              | Return values from functions that perform wait operations.
CActiveQueueWithTimeout | Constructs a CActiveQueueWithTimeout object.
Start                   | Starts the queue.
Insert                  | Inserts an element into the queue.
Retrieve                | Retrieves an element from the queue.
SetHandleToExitEvent    | Places the handle to the exit event in a private member variable.
GetQueueAgentThread     | Returns a handle to the queue agent thread.
    enum waitResult
    {
        waitTimeout,    // timeout occurred
        waitExitEvent,  // exit event was set
        waitSuccess     // operation completed successfully
    } ;

    template<class T, DWORD dwLEN>
    CActiveQueueWithTimeout<T, dwLEN>::CActiveQueueWithTimeout(
        DWORD dwNumWorkerThreads,             // number of working threads
        void (*DealWithTimedOutElement)(T&),  // function pointer
        HANDLE hExitEvent                     // = NULL exit event
        );

    template<class T, DWORD dwLEN>
    CActiveQueueWithTimeout<T, dwLEN>::waitResult
    CActiveQueueWithTimeout<T, dwLEN>::Start(
        DWORD dwMilliseconds  // = INFINITE timeout
        );

    template<class T, DWORD dwLEN>
    CActiveQueueWithTimeout<T, dwLEN>::waitResult
    CActiveQueueWithTimeout<T, dwLEN>::Insert(
        const T& refNewElement,       // new element
        DWORD dwMillisecondsInQueue,  // timeout in queue
        DWORD dwMilliseconds          // timeout
        );

    template<class T, DWORD dwLEN>
    CActiveQueueWithTimeout<T, dwLEN>::waitResult
    CActiveQueueWithTimeout<T, dwLEN>::Retrieve(
        T& refLocation,       // location to retrieve to
        DWORD dwMilliseconds  // timeout
        );

    template<class T, DWORD dwLEN>
    void CActiveQueueWithTimeout<T, dwLEN>::SetHandleToExitEvent(
        HANDLE hExitEvent
        );

    template<class T, DWORD dwLEN>
    HANDLE CActiveQueueWithTimeout<T, dwLEN>::GetQueueAgentThread(
        );
Win32Exception.h also contains the declaration of a class named CSehException, which can be used to catch operating system exceptions (SEH-exceptions) such as access violations via the C++ exception handling mechanism. See the documentation of the _set_se_translator() function and the article "Structured exception handling" in the C++ Language Reference for a detailed explanation.
The program creates an instance of the CActiveQueueWithTimeout class as a global variable:
    #define NUM_THREADS 3
    #define LEN_QUEUE 5
    CActiveQueueWithTimeout<DWORD, LEN_QUEUE> cTheQueue(NUM_THREADS, Reconnect) ;

Here, Reconnect is a function that reconnects a named pipe that is identified by an index:
    static void Reconnect(DWORD&) ;

The program creates an exit event with handle hExitEvent. This handle is passed to the queue:
    cTheQueue.SetHandleToExitEvent(hExitEvent) ;

The queue is started as part of an initialization routine:
    DWORD dwRetVal = cTheQueue.Start() ;

A separate thread waits in an infinite loop for clients to connect to an instance of a named pipe. Every time a client has connected, the index of the pipe instance that the client is using is placed into the queue:
    DWORD dwPipeIndex ;
    CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitResult enInsertRetVal ;
    while ( 1 )
    {
        //
        // Wait for a client to connect to an instance of a named pipe.
        // When a client has connected, place the index of the pipe
        // instance in the dwPipeIndex variable.
        //

        // Place index dwPipeIndex of connected pipe in the queue.
        //
        enInsertRetVal = cTheQueue.Insert(
            dwPipeIndex,
            QUEUE_TIMEOUT,
            INFINITE
            ) ;

        // Break if exit event was set
        //
        if (CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitExitEvent == enInsertRetVal)
            break ;
    }

The indices of connected pipe instances are retrieved from the queue by worker threads. These threads perform read and write operations on the pipe instance and then reconnect it:
    DWORD dwPipeIndex ;
    CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitResult enRetrieveRetVal ;
    while ( 1 )
    {
        // Retrieve a pipe index from queue.
        //
        enRetrieveRetVal = cTheQueue.Retrieve(
            dwPipeIndex,
            INFINITE
            ) ;

        // Break if exit event was set
        //
        if (CActiveQueueWithTimeout<DWORD, LEN_QUEUE>::waitExitEvent == enRetrieveRetVal)
            break ;

        //
        // Serve a client on the named pipe instance with index dwPipeIndex,
        // then reconnect the pipe.
        //
    }

In order to obtain an acceptable timeout behavior, the number of available pipe instances should be LEN_QUEUE + NUM_THREADS. That way, a client that connects to a pipe instance will be inserted in the queue with practically no delay. It is thus guaranteed that after QUEUE_TIMEOUT milliseconds, the client will either be served by a worker thread or be disconnected.