00001 #include "headers.h" 00002 00013 00014 PriorityPacketQueue::PriorityPacketQueue() 00015 { 00016 m_dead = false; 00017 m_dataQueue = new ThreadMessageQueue<PacketInfo>(); 00018 MEMCHECK(m_dataQueue); 00019 debug(DEBUG_PPQ, "data packet queue created"); 00020 00021 m_controlQueue = new ThreadMessageQueue<PacketInfo>(); 00022 MEMCHECK(m_controlQueue); 00023 debug(DEBUG_PPQ, "control packet queue created"); 00024 } // ctor 00025 00026 00027 PriorityPacketQueue::~PriorityPacketQueue() 00028 { 00029 m_dead = true; 00030 m_ready.signal(); 00031 m_ready.timedWait(&m_lock, 1000); 00032 00033 delete m_dataQueue; 00034 m_dataQueue = NULL; 00035 delete m_controlQueue; 00036 m_controlQueue = NULL; 00037 } // detour 00038 00039 00043 void 00044 PriorityPacketQueue::getNext(Node** fromNode, NpPacket** packet) 00045 { 00046 Guard guard(&m_lock); 00047 00048 PacketInfo* packetInfo; 00049 00050 while (m_controlQueue->isEmpty() && m_dataQueue->isEmpty()) 00051 { 00052 m_ready.wait(&m_lock); 00053 if (m_dead) 00054 { 00055 m_dead = false; 00056 m_ready.signal(); 00057 packet = NULL; 00058 return; 00059 } 00060 } 00061 00062 if (!m_controlQueue->isEmpty()) 00063 { 00064 packetInfo = m_controlQueue->getNext(); 00065 } 00066 else 00067 { 00068 packetInfo = m_dataQueue->getNext(); 00069 } 00070 00071 *fromNode = packetInfo->getFromNode(); 00072 *packet = packetInfo->getPacket(); 00073 delete packetInfo; 00074 } // fn getNext 00075 00076 00080 void 00081 PriorityPacketQueue::add(Node* fromNode, NpPacket* packet) 00082 { 00083 // check args 00084 if ((fromNode == NULL) || (packet == NULL)) { 00085 return; 00086 } 00087 00088 Guard guard(&m_lock); 00089 00090 PacketInfo* packetInfo = new PacketInfo(fromNode, packet); 00091 00092 // Insert the packet depending on whether its a control or data packet 00093 // Put it in the proper queue. 00094 //if ((packetInfo->getPacket())->isControlPacket()) 00095 //{ 00096 // m_controlQueue->add(packetInfo); 00097 //} 00098 //else 00099 //{ 00100 // m_dataQueue->add(packetInfo); 00101 //} 00102 m_dataQueue->add(packetInfo); 00103 00104 m_ready.signal(); 00105 00106 } // fn add 00107 00108 00112 bool 00113 PriorityPacketQueue::isEmpty() 00114 { 00115 return (m_controlQueue->isEmpty() && m_dataQueue->isEmpty()); 00116 } // fn isEmpty 00117 00118 00122 void 00123 PriorityPacketQueue::print(std::ostream& out) 00124 { 00125 m_controlQueue->print(out); 00126 m_dataQueue->print(out); 00127 } // fn print 00128