00001 #include "headers.h"
00002
00010
00011
00016 ReceiveWindow::ReceiveWindow(int size, SAR* sar)
00017 {
00018 m_size = size;
00019 m_window.reserve(m_size);
00020 for (int i = 0; i < m_size; i++) {
00021 m_window.push_back(NULL);
00022 }
00023
00024 m_nextExpected = WindowPosition(SEQ_NUM_SPACE, 0);
00025 m_lastAccepted = WindowPosition(SEQ_NUM_SPACE, size-1);
00026 m_lastRead = WindowPosition(SEQ_NUM_SPACE);
00027 m_sar = sar;
00028 m_destroy = false;
00029 m_reading = false;
00030 m_closed = false;
00031
00032 m_firstAck = true;
00033
00034 m_excessBuffer.reserve(MAX_DATASIZE);
00035 }
00036
00037
00041 ReceiveWindow::~ReceiveWindow()
00042 {
00043 if (!m_destroy) {
00044 destroy();
00045 }
00046
00047 Guard guard(&m_slidingWindowLock);
00048
00049 for (int i = 0; i < m_size; i++) {
00050 delete m_window[i];
00051 m_window[i] = NULL;
00052 }
00053
00054 }
00055
00056
00061 int
00062 ReceiveWindow::addPacket(TpPacket* packet)
00063 {
00064 WindowPosition pos = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00065 removePacket(pos.getPosition());
00066 Guard guard(&m_slidingWindowLock);
00067 m_window[pos.getPosition()] = packet;
00068 return pos.getPosition();
00069 }
00070
00071
00076 void
00077 ReceiveWindow::removePacket(int index)
00078 {
00079 if ((index < 0) || (index > m_size-1)) {
00080 debug(DEBUG_ERR, "sequence number out of range: %d", index);
00081 return;
00082 }
00083
00084 Guard guard(&m_slidingWindowLock);
00085 if (index < (int)m_window.size()) {
00086 delete m_window[index];
00087 m_window[index] = NULL;
00088 }
00089 }
00090
00091
00095 int
00096 ReceiveWindow::receive(u_char* buffer, int bufferLength) {
00097 return reassemble(buffer, bufferLength);
00098 }
00099
00100
00105 void
00106 ReceiveWindow::setInitialSequenceNumber(int sequenceNumber) {
00107 m_nextExpected = WindowPosition(SEQ_NUM_SPACE, sequenceNumber);
00108 m_lastAccepted = WindowPosition(SEQ_NUM_SPACE, sequenceNumber + m_size - 1);
00109 m_lastRead = WindowPosition(SEQ_NUM_SPACE, sequenceNumber) - 1;
00110 }
00111
00112
00116 void
00117 ReceiveWindow::sendAck() {
00118
00119 WindowPosition seqNum = calcMaxAck();
00120
00121
00122 int credits = calcCredits();
00123
00124
00125 TpPacket* ackPacket = new TpPacket(credits, seqNum.getSeqNum(), false);
00126 MEMCHECK(ackPacket);
00127 if (m_sar->sendDataPacket(ackPacket) < 0) {
00128 debug(DEBUG_RECVW, "Could not send ACK.");
00129 }
00130 debug(DEBUG_RECVW, "[Sent ACK for packet %d, credits: %d]", seqNum.getSeqNum(), credits);
00131
00132
00133 delete ackPacket;
00134 }
00135
00136
00143 bool
00144 ReceiveWindow::addReceive(TpPacket* receivedPacket)
00145 {
00146 Guard guard(&m_slidingWindowLock);
00147 debug(DEBUG_RECVW, "[R window: received %d]", receivedPacket->getSeqNum());
00148 bool retval = false;
00149
00150 if (m_closed || m_destroy) {
00151 debug(DEBUG_RECVW, "Received packet on closed service.");
00152 delete receivedPacket;
00153 return false;
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163 if (receivedPacket->getDataLength() == 0) {
00164
00165 debug(DEBUG_RECVW, "Got keep-alive packet");
00166 }
00167
00168 else if (!canAdd(receivedPacket)) {
00169 debug(DEBUG_RECVW, "Cant add packet");
00170 delete receivedPacket;
00171 retval = false;
00172 }
00173 else {
00174
00175
00176 addPacket(receivedPacket);
00177
00178
00179 WindowPosition savedNextExpected = m_nextExpected;
00180 m_nextExpected = (calcMaxAck() + 1);
00181 if ((savedNextExpected + 1) != m_nextExpected) {
00182 debug(DEBUG_RECVW, "Error : Skipped a sequence number. Why?");
00183 dumpDebug();
00184 }
00185
00186
00187 m_dataSignal.signal();
00188 retval = true;
00189 }
00190
00191
00192 sendAck();
00193
00194 dumpDebug();
00195
00196 return retval;
00197 }
00198
00199
00209 TpPacket*
00210 ReceiveWindow::getNext(bool blocking)
00211 {
00212 Guard guard(&m_slidingWindowLock);
00213 if (m_reading) {
00214 debug(DEBUG_RECVW, "ERROR ERROR ERROR: You called getNext() twice in a row!!!");
00215 }
00216 ScopedBool sbool(&m_reading, true);
00217 WindowPosition readPosition;
00218
00219 debug(DEBUG_RECVW, "Before read packet %d (%d)", m_lastRead.getSeqNum(), m_lastRead.getPosition());
00220
00221
00222 if (m_lastRead.getSeqNum() == WindowPosition::DONT_CARE) {
00223 readPosition = WindowPosition(SEQ_NUM_SPACE, 0);
00224 }
00225 else {
00226 readPosition = m_lastRead + 1;
00227 }
00228
00229
00230
00231
00232
00233 if (!m_closed) {
00234 if (blocking) {
00235
00236
00237 while (m_window[readPosition.getPosition()] == NULL) {
00238 m_dataSignal.wait(&m_slidingWindowLock);
00239 if (m_destroy) {
00240 debug(DEBUG_RECVW, "getNext(): Receive window asked to die");
00241 m_terminate.signal();
00242 return NULL;
00243 }
00244 if (m_closed) {
00245 debug(DEBUG_RECVW, "getNext(): Connection closed");
00246 return NULL;
00247 }
00248 }
00249 }
00250 else {
00251
00252 if (m_window[readPosition.getPosition()] == NULL) {
00253 debug(DEBUG_RECVW, "getNext(): Not blocking, no pending packets, returning NULL");
00254 return NULL;
00255 }
00256 }
00257 }
00258 else {
00259 debug(DEBUG_RECVW, "getNext(): Receive window is closed");
00260 if (m_window[readPosition.getPosition()] == NULL) {
00261 debug(DEBUG_RECVW, "getNext(): And no packets left to read");
00262 return NULL;
00263 }
00264 }
00265
00266
00267 m_lastRead = readPosition;
00268
00269
00270 TpPacket* receivedPacket = m_window[readPosition.getPosition()];
00271
00272
00273 m_window[readPosition.getPosition()] = NULL;
00274
00275
00276 m_lastAccepted.incrementSeqNum();
00277
00278 if (receivedPacket == NULL) {
00279 debug(DEBUG_RECVW, "MAJOR ERROR!!! RECEIVED PACKET IS NULL!!!! NOT POSSIBLE!!!!");
00280 }
00281
00282 debug(DEBUG_RECVW, "Read packet %d (%d)", readPosition.getSeqNum(), readPosition.getPosition());
00283 dumpDebug();
00284 return receivedPacket;
00285 }
00286
00287
00295 bool
00296 ReceiveWindow::canAdd(TpPacket* packet)
00297 {
00298 WindowPosition num(SEQ_NUM_SPACE, packet->getSeqNum());
00299 Guard guard(&m_slidingWindowLock);
00300 bool retval = num.withinRange(m_nextExpected, (m_lastAccepted-1));
00301 if (!retval) {
00302 debug(DEBUG_RECVW, "Cannot add packet because %d(%d) is not within (%d, %d)",
00303 num.getPosition(), packet->getSeqNum(), m_nextExpected.getPosition(), (m_lastAccepted-1).getPosition());
00304 }
00305 return retval;
00306 }
00307
00308
00318 int
00319 ReceiveWindow::calcCredits()
00320 {
00321 int credits = 0;
00322 int count = 0;
00323 WindowPosition i = m_nextExpected;
00324 while (i.withinRange(m_nextExpected, m_lastAccepted) && (count < m_size)) {
00325
00326
00327 if (m_window[i.getPosition()] == NULL) {
00328
00329 ++credits;
00330 }
00331 i.incrementSeqNum();
00332 ++count;
00333 }
00334
00335
00336 return credits;
00337 }
00338
00339
00346 WindowPosition
00347 ReceiveWindow::calcMaxAck() {
00348 bool allFull = true;
00349
00350 WindowPosition maxAck;
00351
00352
00353 if (m_firstAck) {
00354
00355 m_firstAck = false;
00356 maxAck = 0;
00357 }
00358 else {
00359 maxAck = m_nextExpected - 1;
00360 }
00361
00362
00363
00364
00365
00366
00367
00368
00369 WindowPosition stop = m_lastAccepted;
00370 stop.incrementSeqNum();
00371
00372 for (WindowPosition i = m_nextExpected; i.getSeqNum() != stop.getSeqNum(); i.incrementSeqNum()) {
00373
00374 if (m_window[i.getPosition()] == NULL) {
00375 break;
00376 }
00377
00378
00379 maxAck = i;
00380 }
00381
00382 debug(DEBUG_RECVW, "calcMaxAck(): m_nextExpected = %d, m_lastAccepted = %d, maxAck = %d",
00383 m_nextExpected.getSeqNum(), m_lastAccepted.getSeqNum(), maxAck.getSeqNum());
00384
00385 return maxAck;
00386 }
00387
00388
00403 int
00404 ReceiveWindow::reassemble(u_char* buffer, int bufferLength)
00405 {
00406
00407
00408 int remainingBufferBytes = bufferLength;
00409 int currentPosition = 0;
00410
00411
00412
00413
00414 if (!m_excessBuffer.empty()) {
00415
00416
00417 int copyAmount = (bufferLength > (int)m_excessBuffer.size()) ? m_excessBuffer.size() : bufferLength;
00418
00419 memcpy(&buffer[0], &m_excessBuffer[0], copyAmount);
00420
00421 if (copyAmount < (int)m_excessBuffer.size()) {
00422
00423 debug(DEBUG_RECVW, "Could not fit excess data into supplied buffer - you should really consider increasing the size of the buffer you passed in.");
00424 vector<u_char> tmp;
00425 vector<u_char>::iterator begin = m_excessBuffer.begin();
00426 copy(begin + copyAmount, m_excessBuffer.end(), tmp.begin());
00427 copy(tmp.begin(), tmp.end(), m_excessBuffer.begin());
00428 return copyAmount;
00429 }
00430 else {
00431
00432 remainingBufferBytes -= copyAmount;
00433 currentPosition += copyAmount;
00434
00435
00436 m_excessBuffer.clear();
00437 }
00438 }
00439
00440
00441
00442 if ((currentPosition > 0) && (m_closed)) {
00443 return currentPosition;
00444 }
00445
00446
00447
00448
00449
00450
00451 TpPacket* next;
00452 ThreadMessageQueue<TpPacket> queue;
00453 int dataLength = 0;
00454
00455 bool blocking = true;
00456 bool done = false;
00457 do {
00458 next = getNext(blocking);
00459 if (next != NULL) {
00460 queue.add(next);
00461 dataLength += next->getDataLength();
00462 done = (dataLength > remainingBufferBytes);
00463 }
00464 blocking = false;
00465 } while( (next != NULL) && (!done) );
00466
00467
00468
00469 if (queue.isEmpty() && (currentPosition > 0) ) {
00470 debug(DEBUG_RECVW, "No packets read, but returning some leftover data");
00471
00472 return currentPosition;
00473 }
00474
00475
00476 if (queue.isEmpty()) {
00477 debug(DEBUG_RECVW, "Connection has been closed.");
00478 return PB_UNREACHABLE;
00479 }
00480
00481
00482
00483
00484
00485
00486 int copyAmount;
00487 int packetLength;
00488 TpPacket* currentPacket;
00489
00490 while ( (remainingBufferBytes > 0) && (!queue.isEmpty()) ) {
00491
00492 currentPacket = (TpPacket*)queue.getNext();
00493 packetLength = currentPacket->getDataLength();
00494
00495
00496 copyAmount = (remainingBufferBytes > packetLength) ? packetLength : remainingBufferBytes ;
00497
00498
00499 memcpy(buffer + currentPosition, currentPacket->getData(), copyAmount);
00500
00501
00502 remainingBufferBytes -= copyAmount;
00503 currentPosition += copyAmount;
00504
00505
00506 if (copyAmount == packetLength) {
00507 delete currentPacket;
00508 }
00509 }
00510
00511
00512
00513
00514
00515 if (copyAmount < packetLength) {
00516 memcpy(&m_excessBuffer[0], currentPacket->getData() + copyAmount, packetLength - copyAmount);
00517 m_excessBuffer.resize(packetLength - copyAmount);
00518
00519 delete currentPacket;
00520 }
00521
00522
00523
00524 return currentPosition;
00525 }
00526
00527
00531 void
00532 ReceiveWindow::destroy()
00533 {
00534 Guard guard(&m_slidingWindowLock);
00535 debug(DEBUG_RECVW, "Receive window being destroyed");
00536 m_destroy = true;
00537 m_closed = true;
00538 if (m_reading) {
00539 int i = 0;
00540 do {
00541 m_dataSignal.signal();
00542 } while (!m_terminate.timedWait(&m_slidingWindowLock, 100) && (++i < 10));
00543 }
00544 }
00545
00546
00551 void
00552 ReceiveWindow::close() {
00553 Guard guard(&m_slidingWindowLock);
00554 debug(DEBUG_RECVW, "Receive Window being closed");
00555 m_closed = true;
00556 m_dataSignal.signal();
00557 }
00558
00559
00563 void
00564 ReceiveWindow::dumpDebug() {
00565 #ifdef DEBUG_ON
00566 stringstream tmp;
00567 tmp << *this;
00568 debug(DEBUG_RECVW, "Current state of receive window:\n%s", tmp.str().c_str());
00569 #endif
00570 }
00571
00572
00576 void
00577 ReceiveWindow::toStream(std::ostream& out)
00578 {
00579 Guard guard(&m_slidingWindowLock);
00580 out << "---------- RECEIVE WINDOW ----------" << "\n";
00581 for (int i = 0; i < m_size; i++) {
00582 if (m_window[i] != NULL) {
00583 out << m_window[i]->getSeqNum();
00584 }
00585 else {
00586 out << "[null]";
00587 }
00588 if (m_nextExpected.getPosition() == i) {
00589 out << " <-- (next expected packet)";
00590 }
00591 if (m_lastAccepted.getPosition() == i) {
00592 out << " <-- (last acceptable packet)";
00593 }
00594 if (m_lastRead.getPosition() == i) {
00595 out << " <-- (last packet read)";
00596 }
00597 out << "\n";
00598 }
00599 out << "------------------------------------\n";
00600 }
00601
00602