Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ReceiveWindow.cpp

Go to the documentation of this file.
00001 #include "headers.h"
00002 
00010 
00011 
00016 ReceiveWindow::ReceiveWindow(int size, SAR* sar) 
00017 {
00018     m_size = size;
00019     m_window.reserve(m_size);
00020     for (int i = 0; i < m_size; i++) {
00021         m_window.push_back(NULL);
00022     }
00023 
00024     m_nextExpected = WindowPosition(SEQ_NUM_SPACE, 0);
00025     m_lastAccepted = WindowPosition(SEQ_NUM_SPACE, size-1);
00026     m_lastRead = WindowPosition(SEQ_NUM_SPACE);
00027     m_sar = sar;
00028     m_destroy = false;
00029     m_reading = false;
00030     m_closed = false;
00031     //m_firstPacket = true;
00032     m_firstAck = true;
00033     // make sure our internal buffer is big enough to store the excess data of a single packet
00034     m_excessBuffer.reserve(MAX_DATASIZE);
00035 } // constructor
00036 
00037 
00041 ReceiveWindow::~ReceiveWindow()
00042 {
00043     if (!m_destroy) {
00044         destroy();
00045     }
00046 
00047     Guard guard(&m_slidingWindowLock);
00048     
00049     for (int i = 0; i < m_size; i++) {
00050         delete m_window[i];
00051         m_window[i] = NULL;
00052     }
00053 
00054 } // destructor
00055 
00056 
00061 int 
00062 ReceiveWindow::addPacket(TpPacket* packet)
00063 {
00064     WindowPosition pos = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00065     removePacket(pos.getPosition());
00066     Guard guard(&m_slidingWindowLock);
00067     m_window[pos.getPosition()] = packet;
00068     return pos.getPosition();
00069 } // fn addPacket
00070 
00071 
00076 void 
00077 ReceiveWindow::removePacket(int index)
00078 {
00079     if ((index < 0) || (index > m_size-1)) {
00080         debug(DEBUG_ERR, "sequence number out of range: %d", index);
00081         return;
00082     }
00083 
00084     Guard guard(&m_slidingWindowLock);
00085     if (index < (int)m_window.size()) {
00086         delete m_window[index];
00087         m_window[index] = NULL;
00088     }
00089 } // fn removePacket
00090 
00091 
00095 int
00096 ReceiveWindow::receive(u_char* buffer, int bufferLength) {
00097     return reassemble(buffer, bufferLength);
00098 } // fn receive
00099 
00100 
00105 void
00106 ReceiveWindow::setInitialSequenceNumber(int sequenceNumber) {
00107     m_nextExpected = WindowPosition(SEQ_NUM_SPACE, sequenceNumber);
00108     m_lastAccepted = WindowPosition(SEQ_NUM_SPACE, sequenceNumber + m_size - 1);
00109     m_lastRead = WindowPosition(SEQ_NUM_SPACE, sequenceNumber) - 1;
00110 } // fn setInitialSequenceNumber
00111 
00112 
00116 void
00117 ReceiveWindow::sendAck() {
00118     // find out which seq num to ACK
00119     WindowPosition seqNum = calcMaxAck();
00120 
00121     // figure out how many credits there are
00122     int credits = calcCredits();
00123 
00124     // send the ACK
00125     TpPacket* ackPacket = new TpPacket(credits, seqNum.getSeqNum(), false);
00126     MEMCHECK(ackPacket);
00127     if (m_sar->sendDataPacket(ackPacket) < 0) {
00128         debug(DEBUG_RECVW, "Could not send ACK.");
00129     }
00130     debug(DEBUG_RECVW, "[Sent ACK for packet %d, credits: %d]", seqNum.getSeqNum(), credits);
00131     
00132     // clean up
00133     delete ackPacket;
00134 } // fn sendAck
00135 
00136 
00143 bool 
00144 ReceiveWindow::addReceive(TpPacket* receivedPacket)
00145 {
00146     Guard guard(&m_slidingWindowLock);
00147     debug(DEBUG_RECVW, "[R window: received %d]", receivedPacket->getSeqNum());
00148     bool retval = false;
00149 
00150     if (m_closed || m_destroy) {
00151         debug(DEBUG_RECVW, "Received packet on closed service.");
00152         delete receivedPacket;
00153         return false;
00154     }
00155 
00156     // if this is the first packet received, set our window pointers up
00157     //if (m_firstPacket) {
00158     //    setInitialSequenceNumber(receivedPacket->getSeqNum());
00159     //    m_firstPacket = false;
00160     //}
00161 
00162     // Check if it is a keep-alive packet
00163     if (receivedPacket->getDataLength() == 0) {
00164         // Do nothing.
00165         debug(DEBUG_RECVW, "Got keep-alive packet");
00166     }
00167     // if we cant add the packet to our queue
00168     else if (!canAdd(receivedPacket)) {
00169         debug(DEBUG_RECVW, "Cant add packet");
00170         delete receivedPacket;
00171         retval = false;
00172     }
00173     else {
00174         //debug(DEBUG_RECVW, "Adding packet %d", receivedPacket->getSeqNum());
00175         // add the packet to the sliding window
00176         addPacket(receivedPacket);
00177 
00178         // figure out which packet we should expect next
00179         WindowPosition savedNextExpected = m_nextExpected;
00180         m_nextExpected = (calcMaxAck() + 1);
00181         if ((savedNextExpected + 1) != m_nextExpected) {
00182             debug(DEBUG_RECVW, "Error : Skipped a sequence number.  Why?");
00183             dumpDebug();
00184         }
00185 
00186         // signal waiting threads that we have received some data
00187         m_dataSignal.signal();
00188         retval = true;
00189     }
00190 
00191     // send an ACK to the sender
00192     sendAck();
00193 
00194     dumpDebug();
00195 
00196     return retval;
00197 } // fn addReceive
00198 
00199 
00209 TpPacket* 
00210 ReceiveWindow::getNext(bool blocking)
00211 {
00212     Guard guard(&m_slidingWindowLock);
00213     if (m_reading) {
00214         debug(DEBUG_RECVW, "ERROR ERROR ERROR: You called getNext() twice in a row!!!");
00215     }
00216     ScopedBool sbool(&m_reading, true);
00217     WindowPosition readPosition;
00218 
00219     debug(DEBUG_RECVW, "Before read packet %d (%d)", m_lastRead.getSeqNum(), m_lastRead.getPosition());
00220 
00221     // figure out which packet to read
00222     if (m_lastRead.getSeqNum() == WindowPosition::DONT_CARE) {
00223         readPosition = WindowPosition(SEQ_NUM_SPACE, 0);
00224     }
00225     else {
00226         readPosition = m_lastRead + 1;
00227     }
00228 
00229     //debug(DEBUG_RECVW, "getNext(): closed ?= %s, blocking ?= %s, last seq num read = %d, readPosition = %d", 
00230     //    boolToString(m_closed), boolToString(blocking), m_lastRead.getSeqNum(), readPosition.getPosition());
00231 
00232     // if the connection has not been closed
00233     if (!m_closed) {
00234         if (blocking) {
00235             // Wait for the next expected packet.  Note: we can receive other packets while
00236             // we are waiting for the next one, which is why we have a while loop
00237             while (m_window[readPosition.getPosition()] == NULL) {
00238                 m_dataSignal.wait(&m_slidingWindowLock);
00239                 if (m_destroy) {
00240                     debug(DEBUG_RECVW, "getNext(): Receive window asked to die");
00241                     m_terminate.signal();
00242                     return NULL;
00243                 }
00244                 if (m_closed) {
00245                     debug(DEBUG_RECVW, "getNext(): Connection closed");
00246                     return NULL;
00247                 }
00248             }
00249         }
00250         else {
00251             // not blocking
00252             if (m_window[readPosition.getPosition()] == NULL) {
00253                 debug(DEBUG_RECVW, "getNext(): Not blocking, no pending packets, returning NULL");
00254                 return NULL;
00255             }
00256         }
00257     }
00258     else {
00259         debug(DEBUG_RECVW, "getNext(): Receive window is closed");
00260         if (m_window[readPosition.getPosition()] == NULL) {
00261             debug(DEBUG_RECVW, "getNext(): And no packets left to read");
00262             return NULL;
00263         }
00264     }
00265 
00266     // remember the last packet we read
00267     m_lastRead = readPosition;
00268 
00269     // Take the packet out of the window
00270     TpPacket* receivedPacket = m_window[readPosition.getPosition()];
00271 
00272     // mark the entry NULL - note we cant use "removePacket" here because it deletes the memory
00273     m_window[readPosition.getPosition()] = NULL;
00274     
00275     // we can now accept another packet
00276     m_lastAccepted.incrementSeqNum();
00277 
00278     if (receivedPacket == NULL) {
00279         debug(DEBUG_RECVW, "MAJOR ERROR!!! RECEIVED PACKET IS NULL!!!! NOT POSSIBLE!!!!");
00280     }
00281 
00282     debug(DEBUG_RECVW, "Read packet %d (%d)", readPosition.getSeqNum(), readPosition.getPosition());
00283     dumpDebug();
00284     return receivedPacket;
00285 } // fn getNext
00286 
00287 
00295 bool 
00296 ReceiveWindow::canAdd(TpPacket* packet)
00297 {
00298     WindowPosition num(SEQ_NUM_SPACE, packet->getSeqNum());
00299     Guard guard(&m_slidingWindowLock);
00300     bool retval = num.withinRange(m_nextExpected, (m_lastAccepted-1));
00301     if (!retval) {
00302         debug(DEBUG_RECVW, "Cannot add packet because %d(%d) is not within (%d, %d)", 
00303             num.getPosition(), packet->getSeqNum(), m_nextExpected.getPosition(), (m_lastAccepted-1).getPosition());
00304     }
00305     return retval;
00306 } // fn canAdd
00307 
00308 
00318 int
00319 ReceiveWindow::calcCredits()
00320 {
00321     int credits = 0;
00322     int count = 0;
00323     WindowPosition i = m_nextExpected;
00324     while (i.withinRange(m_nextExpected, m_lastAccepted) && (count < m_size)) {
00325     //for (WindowPosition i = m_lastAccepted; i.getSeqNum() >= m_nextExpected.getSeqNum(); i.decrementSeqNum()) {
00326         // if there is a hole in the window
00327         if (m_window[i.getPosition()] == NULL) {
00328             // add a credit
00329             ++credits;
00330         }
00331         i.incrementSeqNum();
00332         ++count;
00333     }
00334 
00335     //debug(DEBUG_RECVW, "calcCredits(): %d", credits);
00336     return credits;
00337 } // fn calcCredits
00338 
00339 
00346 WindowPosition
00347 ReceiveWindow::calcMaxAck() {
00348     bool allFull = true;
00349     
00350     WindowPosition maxAck;
00351 
00352     // Set the default value to ACK.
00353     if (m_firstAck) {
00354         // If this is the first ACK, start at zero.
00355         m_firstAck = false;
00356         maxAck = 0;
00357     }
00358     else {
00359         maxAck = m_nextExpected - 1;
00360     }
00361 
00362     //if (m_nextExpected.getSeqNum() != 0) {
00363     //    maxAck = m_nextExpected - 1;
00364     //}
00365     //else {
00366     //    maxAck = m_nextExpected;
00367     //}
00368 
00369     WindowPosition stop = m_lastAccepted;
00370     stop.incrementSeqNum();
00371 
00372     for (WindowPosition i = m_nextExpected; i.getSeqNum() != stop.getSeqNum(); i.incrementSeqNum()) {
00373         // if there is a hole in the window
00374         if (m_window[i.getPosition()] == NULL) {
00375             break;
00376         }
00377 
00378         // remember the maximum position in the window we can ACK
00379         maxAck = i;
00380     }
00381 
00382     debug(DEBUG_RECVW, "calcMaxAck(): m_nextExpected = %d, m_lastAccepted = %d, maxAck = %d", 
00383         m_nextExpected.getSeqNum(), m_lastAccepted.getSeqNum(), maxAck.getSeqNum());
00384     // maxAck is pointing to the spot before the first hole
00385     return maxAck;
00386 } // fn calcMaxAck
00387 
00388 
00403 int 
00404 ReceiveWindow::reassemble(u_char* buffer, int bufferLength)
00405 {
00406     //debug(DEBUG_RECVW, "[Reassembling]");
00407 
00408     int remainingBufferBytes = bufferLength;
00409     int currentPosition = 0;
00410 
00411     //
00412     // if there was any data left over from last time, put that in the buffer first
00413     //
00414     if (!m_excessBuffer.empty()) {
00415         //debug(DEBUG_RECVW, "Copying leftover data (%d bytes) from previous call into buffer", m_excessBuffer.size());
00416         // find out how much we can copy into the buffer
00417         int copyAmount = (bufferLength > (int)m_excessBuffer.size()) ? m_excessBuffer.size() : bufferLength;
00418         // copy it
00419         memcpy(&buffer[0], &m_excessBuffer[0], copyAmount);
00420         // did we copy it all?
00421         if (copyAmount < (int)m_excessBuffer.size()) {
00422             // No, save the rest of the data for later
00423             debug(DEBUG_RECVW, "Could not fit excess data into supplied buffer - you should really consider increasing the size of the buffer you passed in.");
00424             vector<u_char> tmp;
00425             vector<u_char>::iterator begin = m_excessBuffer.begin();
00426             copy(begin + copyAmount, m_excessBuffer.end(), tmp.begin());
00427             copy(tmp.begin(), tmp.end(), m_excessBuffer.begin());
00428             return copyAmount;
00429         }
00430         else {
00431             // Yes, adjust the buffer pointers
00432             remainingBufferBytes -= copyAmount;
00433             currentPosition += copyAmount;
00434 
00435             // clear excess buffer
00436             m_excessBuffer.clear();
00437         }
00438     }
00439 
00440     // this is for the case where the other side has closed connection,
00441     // but the application hasnt read all the data yet
00442     if ((currentPosition > 0) && (m_closed)) {
00443         return currentPosition;
00444     }
00445     
00446     //
00447     // take as many packets as you can out of the receive window, 
00448     // up to the maximum we can fit into the supplied buffer
00449     // and always make sure you get at least one
00450     //
00451     TpPacket* next;
00452     ThreadMessageQueue<TpPacket> queue;
00453     int dataLength = 0;
00454 
00455     bool blocking = true;
00456     bool done = false;
00457     do {
00458         next = getNext(blocking);
00459         if (next != NULL) {
00460             queue.add(next);
00461             dataLength += next->getDataLength();
00462             done = (dataLength > remainingBufferBytes);
00463         }
00464         blocking = false;
00465     } while( (next != NULL) && (!done) );
00466     
00467     // could have been killed at this point, but we want to return any data we got
00468     // (the only way the queue can be empty is if we have been killed)
00469     if (queue.isEmpty() && (currentPosition > 0) ) {
00470         debug(DEBUG_RECVW, "No packets read, but returning some leftover data");
00471         // the data has already been copied into supplied buffer
00472         return currentPosition;
00473     }
00474 
00475     // Otherwise, we want to return an error
00476     if (queue.isEmpty()) {
00477         debug(DEBUG_RECVW, "Connection has been closed.");
00478         return PB_UNREACHABLE;
00479     }
00480     
00481     //debug(DEBUG_RECVW, "[Reassemble] Message length: %d, Remaining buffer length: %d", dataLength, remainingBufferBytes);
00482     
00483     //
00484     // copy as much as we can into supplied buffer
00485     //
00486     int copyAmount;
00487     int packetLength;
00488     TpPacket* currentPacket;
00489     // while we still have room in supplied buffer and there are still packets to process
00490     while ( (remainingBufferBytes > 0) && (!queue.isEmpty()) ) {
00491         // get a packet out of the queue
00492         currentPacket = (TpPacket*)queue.getNext();
00493         packetLength = currentPacket->getDataLength();
00494 
00495         // figure out how much of current packet we can copy into the buffer
00496         copyAmount = (remainingBufferBytes > packetLength) ? packetLength : remainingBufferBytes ;
00497 
00498         // copy as much as we can into the buffer
00499         memcpy(buffer + currentPosition, currentPacket->getData(), copyAmount);
00500 
00501         // adjust indexes into buffer
00502         remainingBufferBytes -= copyAmount;
00503         currentPosition += copyAmount;
00504 
00505         // if we copied the whole packet, delete it
00506         if (copyAmount == packetLength) {
00507             delete currentPacket;
00508         }
00509     }
00510 
00511     //
00512     // we have at most one excess packet
00513     // copy anything we couldnt fit into supplied buffer into internal buffer
00514     //
00515     if (copyAmount < packetLength) {
00516         memcpy(&m_excessBuffer[0], currentPacket->getData() + copyAmount, packetLength - copyAmount);
00517         m_excessBuffer.resize(packetLength - copyAmount);
00518         //debug(DEBUG_RECVW, "Saving %d bytes for later", packetLength - copyAmount);
00519         delete currentPacket;
00520     }
00521 
00522     //debug(DEBUG_RECVW, "[Reassemble complete]");
00523 
00524     return currentPosition;
00525 } //fn reassemble
00526 
00527 
00531 void 
00532 ReceiveWindow::destroy()
00533 {
00534     Guard guard(&m_slidingWindowLock);
00535     debug(DEBUG_RECVW, "Receive window being destroyed");
00536     m_destroy = true;
00537     m_closed = true;
00538     if (m_reading) {
00539         int i = 0;
00540         do {
00541             m_dataSignal.signal();
00542         } while (!m_terminate.timedWait(&m_slidingWindowLock, 100) && (++i < 10));
00543     }
00544 } // fn destroy
00545 
00546 
00551 void
00552 ReceiveWindow::close() {
00553     Guard guard(&m_slidingWindowLock);
00554     debug(DEBUG_RECVW, "Receive Window being closed");
00555     m_closed = true;
00556     m_dataSignal.signal();
00557 } // fn close
00558 
00559 
00563 void
00564 ReceiveWindow::dumpDebug() {
00565 #ifdef DEBUG_ON
00566     stringstream tmp;
00567     tmp << *this;
00568     debug(DEBUG_RECVW, "Current state of receive window:\n%s", tmp.str().c_str());
00569 #endif
00570 } // fn dumpDebug
00571 
00572 
00576 void
00577 ReceiveWindow::toStream(std::ostream& out)
00578 {
00579     Guard guard(&m_slidingWindowLock);
00580     out << "---------- RECEIVE WINDOW ----------" << "\n";
00581     for (int i = 0; i < m_size; i++) {
00582         if (m_window[i] != NULL) {
00583             out << m_window[i]->getSeqNum();
00584         }
00585         else {
00586             out << "[null]";
00587         }
00588         if (m_nextExpected.getPosition() == i) {
00589             out << "  <-- (next expected packet)";
00590         }
00591         if (m_lastAccepted.getPosition() == i) {
00592             out << "  <-- (last acceptable packet)";
00593         }
00594         if (m_lastRead.getPosition() == i) {
00595             out << "  <-- (last packet read)";
00596         }
00597         out << "\n";
00598     }
00599     out << "------------------------------------\n";
00600 } // fn toStream
00601 
00602 

Generated at Thu Jul 11 13:31:51 2002 for Peekabooty by doxygen1.2.9 written by Dimitri van Heesch, © 1997-2001