Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

ServiceTable.cpp

Go to the documentation of this file.
00001 #include "headers.h"
00002 
00015 
00019 ServiceTable::ServiceTable()
00020 {
00021         // m_serviceCount is used to find unused service numbers.  
00022     m_serviceCount.setTo(1);
00023         // used to assign a number to a connection, called the connection descriptor.  
00024     m_descriptorCount.setTo(1);
00025         // create the networkLayer
00026     m_networkLayer = new NetworkLayer(this);
00027     MEMCHECK(m_networkLayer);
00028 
00029     m_listenThreadKilled = false;
00030     m_listening = false;
00031     m_receiveWindowSize = 16;
00032 } // constructor
00033 
00034 
00040 ServiceTable::~ServiceTable()
00041 {
00042     // kill the listen thread
00043     // add a dummy entry to the service list in order to tell the listen thread to die
00044     m_listenThreadKilled = true;
00045     m_listenQueue.add(NULL);
00046 
00047     // destroy the service table
00048     m_serviceListLock.lock();
00049     list<ServiceEntry*>::iterator iter;
00050     for (iter = m_serviceList.begin(); iter != m_serviceList.end(); ++iter) {
00051         delete *iter;
00052     }
00053     m_serviceList.clear();
00054     m_serviceListLock.unlock();
00055 
00056     // destroy the network layer
00057     delete m_networkLayer;
00058     m_networkLayer = NULL;
00059 } // destructor
00060 
00061 
00065 NetworkLayer*
00066 ServiceTable::getNetworkLayer() {
00067     return m_networkLayer;
00068 } // fn getNetworkLayer
00069 
00070 
00074 ServiceEntry* 
00075 ServiceTable::getServiceEntry(int serviceNum)
00076 {
00077     Guard guard(&m_serviceListLock);
00078     list<ServiceEntry*>::iterator sIter;
00079     for (sIter = m_serviceList.begin(); sIter != m_serviceList.end(); ++sIter) {
00080         ServiceEntry* serviceEntry = *sIter;
00081         if (serviceEntry->getServiceNumber() == serviceNum) {
00082             return *sIter;
00083         }
00084     }
00085     return NULL;
00086 } // fn getService
00087 
00088 
00092 ServiceEntry* 
00093 ServiceTable::getServiceEntryByDescriptor(int connectionDescriptor)
00094 {
00095     Guard guard(&m_serviceListLock);
00096     list<ServiceEntry*>::iterator sIter;
00097     for (sIter = m_serviceList.begin(); sIter != m_serviceList.end(); ++sIter) {
00098         ServiceEntry* serviceEntry = *sIter;
00099         if (serviceEntry->getConnectionDescriptor() == connectionDescriptor) {
00100             return *sIter;
00101         }
00102     }
00103     return NULL;
00104 } // fn getService
00105 
00106 
00110 void
00111 ServiceTable::addServiceEntry(ServiceEntry* serviceEntry) {
00112     Guard guard(&m_serviceListLock);
00113     m_serviceList.push_front(serviceEntry);
00114 } // fn addServiceEntry
00115 
00116 
00120 bool
00121 ServiceTable::removeServiceEntry(int serviceNumber) {
00122     Guard guard(&m_serviceListLock);
00123     // iterate over the connection entry list inside the service entry
00124     // and find the service number that matches the parameter.  When we find the match, delete it.
00125     list<ServiceEntry*>::iterator sIter;
00126 
00127     for (sIter = m_serviceList.begin(); sIter != m_serviceList.end(); ++sIter) {
00128         ServiceEntry* serviceEntry = *sIter;
00129         if (serviceEntry->getServiceNumber() == serviceNumber) {
00130             m_serviceList.erase(sIter);
00131             delete serviceEntry;
00132             return true;
00133         }
00134     }
00135     return false;
00136 }
00137 
00138 
00144 int 
00145 ServiceTable::connectService()
00146 {
00147     // check if we are connected to any other nodes.
00148     if (!m_networkLayer->getLli()->isConnected()) {
00149         return PB_NOT_CONNECTED;
00150     }
00151 
00152     ServiceEntry* serviceEntry = new ServiceEntry(this);
00153     MEMCHECK(serviceEntry);
00154 
00155     serviceEntry->toConnectingState();
00156     
00157     addServiceEntry(serviceEntry);
00158     
00159     // try to connect up to CONNECTION_MAX_TRIES times
00160     while (serviceEntry->getRetries() <= CONNECTION_MAX_TRIES) {
00161         int returnValue = m_networkLayer->makeVc(serviceEntry->getConnectionDescriptor());
00162         
00163         if (returnValue == NetworkLayer::NL_UNREACHABLE) {
00164             return PB_NOT_CONNECTED;
00165         }
00166 
00167         debug(DEBUG_SRV, "Connection request sent");
00168         
00169         // wait for EST packet for CONNECTION_TIMEOUT amount of time
00170         if (serviceEntry->waitForConnectComplete(CONNECTION_TIMEOUT)) {
00171             break;
00172         }
00173 
00174         serviceEntry->incrementRetries();
00175     }
00176     
00177     // check if we failed to make a connection
00178     if (serviceEntry->getRetries() > CONNECTION_MAX_TRIES) {
00179         removeServiceEntry(serviceEntry->getServiceNumber());
00180         return PB_UNREACHABLE;
00181     }
00182     
00183     debug(DEBUG_SRV, "End-to-end connection created");
00184     
00185     return serviceEntry->getServiceNumber();
00186 } // fn connectService
00187 
00188   
00195 int
00196 ServiceTable::listenService()
00197 {
00198     ServiceEntry* serviceEntry;
00199     ScopedBool scopedBool(&m_listening, true);
00200 
00201     bool doReturn;
00202 
00203     do {
00204         debug(DEBUG_SRV, "Service Table Listening for new connections");
00205         
00206         doReturn = true;
00207         
00208         // wait for a new connection
00209         serviceEntry = m_listenQueue.getNext();
00210 
00211         if (serviceEntry != NULL) {
00212             debug(DEBUG_SRV, "Waking up for service number: %d, connection descriptor: %d", 
00213                 serviceEntry->getServiceNumber(), serviceEntry->getConnectionDescriptor());
00214         }
00215         
00216         // a request has been made that we should die
00217         if (m_listenThreadKilled) {
00218             debug(DEBUG_SRV, "listenService() killed");
00219             //while (true) { OS_SPEC_SLEEP(1000); }
00220             return TransportLayer::DEAD;
00221         }
00222         
00223         // if we have received a new connection
00224         if (serviceEntry->isConnected() && !serviceEntry->isOrigin()) {
00225             addServiceEntry(serviceEntry);
00226         }
00227         else {
00228             doReturn = false;
00229             debug(DEBUG_SRV, "Service entry added to listen queue that is not a new connection!!");
00230             delete serviceEntry;
00231         }
00232     } while (doReturn == false);
00233 
00234     return serviceEntry->getServiceNumber();
00235 } // fn listenService
00236 
00237 
00241 int 
00242 ServiceTable::disconnectService(int serviceNumber)
00243 {
00244     debug(DEBUG_SRV, "Disconnecting service number %d", serviceNumber);
00245     ServiceEntry* serviceEntry = getServiceEntry(serviceNumber);
00246 
00247     if (!serviceEntry) {
00248         return PB_ERROR;
00249     }
00250     m_networkLayer->destroyVc(serviceEntry->getConnectionDescriptor());
00251     
00252     serviceEntry->toDisconnectedState();
00253     //if (!serviceEntry->waitForDisconnect(DISCONNECT_TIMEOUT)) {
00254     removeServiceEntry(serviceNumber);
00255     //}
00256     return PB_OK;
00257 } // fn disconnectService
00258 
00259 
00279 int 
00280 ServiceTable::sendData(int serviceNumber, u_char* buf, int bufLength)
00281 {
00282     ServiceEntry* serviceEntry = getServiceEntry(serviceNumber);
00283 
00284     if (serviceEntry == NULL) {
00285         debug(DEBUG_SRV, "Invalid service number.");
00286         return PB_ERROR;
00287     }
00288 
00289     if (!m_networkLayer->hasVc(serviceEntry->getConnectionDescriptor())) {
00290         debug(DEBUG_SRV, "Invalid virtual circuit");
00291         return PB_ERROR;
00292     }
00293     
00294     // send all the data 
00295     int bytesSent = serviceEntry->getSar()->reliableSend(buf, bufLength);
00296     
00297     return bytesSent;
00298 } // fn sendData
00299 
00300 
00308 int 
00309 ServiceTable::receiveData(int serviceNumber, u_char* buf, int bufLength)
00310 {
00311     ServiceEntry* serviceEntry = getServiceEntry(serviceNumber);
00312     if (!serviceEntry || serviceEntry->isFailed()) {
00313         if (!serviceEntry) {
00314             debug(DEBUG_SRV, "Could not find service entry");
00315         }
00316         else {
00317             debug(DEBUG_SRV, "Service entry in failed state");
00318         }
00319         return PB_UNREACHABLE;
00320     }
00321     return serviceEntry->getSar()->receive(buf, bufLength);
00322 } // fn receiveData
00323 
00324 
00328 int
00329 ServiceTable::generateServiceNumber() {
00330     return this->m_serviceCount.increment();
00331 } // fn generateServiceNumber
00332 
00333 
00337 bool 
00338 ServiceTable::serviceExists(int serviceNumber)
00339 {
00340     return (getServiceEntry(serviceNumber) != NULL);
00341 } // fn serviceExists
00342 
00343 
00347 /*
00348 bool 
00349 ServiceTable::isBroken(int serviceNumber)
00350 {
00351     ServiceEntry* serviceEntry = getServiceEntry(serviceNumber);
00352     return (!serviceEntry || serviceEntry->isBroken());
00353 } // fn isBroken
00354 */
00355 
00356 
00362 void* ServiceTable::repairConnection(void* arg)
00363 {
00364     debug(DEBUG_SRV, "repair connection thread started");
00365     
00366     RepairConnectionObject* rco = (RepairConnectionObject*)arg;
00367     ServiceTable* serviceTable = rco->m_serviceTable;
00368     ServiceEntry* serviceEntry = rco->m_serviceEntry;
00369     delete rco;
00370 
00371     if (serviceEntry == NULL) {
00372         debug(DEBUG_ERR, "NULL service entry passed to repairConnection");
00373         return NULL;
00374     }
00375 
00376     serviceTable->repairConnectionImpl(serviceEntry);
00377     return NULL;
00378 } // thread repairConnection
00379 
00380 
00386 void
00387 ServiceTable::repairConnectionImpl(ServiceEntry* serviceEntry) {
00388     
00389     if (!serviceEntry->isBroken()) {
00390         debug(DEBUG_SRV, "Tried to repair a connection that isn't broken");
00391         return;
00392     }
00393     else {// connection is broken
00394         debug(DEBUG_SRV, "[Break]");
00395 
00396         // check if we are the origin of the connection
00397         if(serviceEntry->isOrigin()) {
00398             debug(DEBUG_SRV, "I am the origin of the connection");
00399 
00400             serviceEntry->toReconnectingState();
00401 
00402             // try to repair connection up to REPAIR_CONNECTION_MAX_RETRY times
00403             while (serviceEntry->getRetries() <= REPAIR_CONNECTION_MAX_RETRY) {
00404                 serviceEntry->incrementRetries();
00405                 // somehow we must specify that this is to be a reconnect
00406                 // by passing the old connId
00407                 if (!m_networkLayer) { //NetworkLayer is deleted
00408                     return;
00409                 }
00410                 m_networkLayer->makeVc(serviceEntry->getConnectionDescriptor());
00411                 
00412                 debug(DEBUG_SRV, "Reconnect #%d sent", serviceEntry->getRetries());
00413                 
00414                 if (serviceEntry->waitForConnectComplete(REPAIR_CONNECTION_TIMEOUT)) {
00415                     break;
00416                 }
00417             }
00418             
00419             // find out if we failed
00420             if (serviceEntry->getRetries() > REPAIR_CONNECTION_MAX_RETRY) {
00421                 // cannot reconnect, so destroy connection
00422                 // this is commented out right now because it could be dangerous
00423                 // to pull a connection out from under PAB, we will have to be
00424                 // careful to do this safely
00425                 serviceEntry->toFailedState();
00426                 debug(DEBUG_SRV, "[Reconnect Fail]");
00427                 //removeServiceEntry(serviceEntry->getServiceNumber());
00428             }
00429             else { // success
00430                 serviceEntry->toConnectedState();
00431                 debug(DEBUG_SRV, "[Reconnect]");
00432             }
00433         }
00434         else {
00435             debug(DEBUG_SRV, "I'm not origin, so I dont care about repairing this connection.");
00436             serviceEntry->toFailedState();
00437             removeServiceEntry(serviceEntry->getServiceNumber());
00438             //signalDisconnect(serviceEntry->getConnectionDescriptor());
00439         }
00440     }
00441     return;
00442 } // fn repairConnectionImpl
00443 
00444 
00448 /*
00449 void 
00450 ServiceTable::removeBrokenConnections()
00451 {
00452     Guard guard(&m_serviceListLock);
00453     list<ServiceEntry*>::iterator iter;
00454     for (iter = m_serviceList.begin(); iter != m_serviceList.end(); ++iter) {
00455         ServiceEntry* temp = *iter; 
00456         if (temp->isBroken())
00457         {
00458             removeServiceEntry(temp->getConnectionDescriptor());
00459         }
00460     }
00461 } // fn removeBrokenConnections
00462 */
00463 
00464 
00465 //
00466 // The next two functions are to satisfy the ServiceTable's NetworkLayerListener interface.
00467 //
00468 
00469 
00473 int 
00474 ServiceTable::generateConnectionDescriptor()
00475 {
00476     return this->m_descriptorCount.increment(); //Counter::incrementGlobalCount();
00477 } // generateConnectionDescriptor
00478 
00479 
00483 void 
00484 ServiceTable::handleNetworkEvent(int eventCode, int vcn, NpPacket* packet) {
00485     switch (eventCode) {
00486     case NetworkLayer::DATA_EVENT:
00487         handleDataPacket(vcn, packet);
00488         break;
00489     case NetworkLayer::BREAK_EVENT:
00490         signalBreak(vcn);
00491         break;
00492     case NetworkLayer::DISCONNECTION_EVENT:
00493         signalDisconnect(vcn, packet);
00494         break;
00495     case NetworkLayer::PASSIVE_CONNECTION_EVENT:
00496         signalListenComplete(vcn);
00497         break;
00498     case NetworkLayer::ACTIVE_CONNECTION_EVENT:
00499         signalConnectComplete(vcn);
00500         break;
00501     }
00502 }
00503 
00504 
00508 void
00509 ServiceTable::signalDisconnect(int connectionDescriptor, NpPacket* npPacket)
00510 {
00511     debug(DEBUG_SRV, "Disconnect signalled %d", connectionDescriptor);
00512     ServiceEntry* serviceEntry = getServiceEntryByDescriptor(connectionDescriptor);
00513     if (serviceEntry != NULL) {
00514         FinPacket* finPacket = dynamic_cast<FinPacket*>(npPacket);
00515         // check if this was a disconnect on purpose
00516         if (finPacket->getErrorCode() == FinPacket::NoError) {
00517             //serviceEntry->toDisconnectedState();
00518             disconnectService(serviceEntry->getServiceNumber());
00519         }
00520         else {
00521             // try to repair
00522             signalBreak(connectionDescriptor);
00523         }
00524     }
00525     else {
00526         debug(DEBUG_SRV, "Major error");
00527     }
00528 } // fn signalDisconnect
00529 
00530 
00535 void
00536 ServiceTable::signalBreak(int connectionDescriptor)
00537 {
00538     ServiceEntry* serviceEntry = getServiceEntryByDescriptor(connectionDescriptor);
00539 
00540     if (serviceEntry == NULL) {
00541         debug(DEBUG_SRV, "connection that broke not found %d", connectionDescriptor);
00542         return;
00543     }
00544     serviceEntry->toBrokenState();
00545 
00546     debug(DEBUG_SRV, "connection %d broken, spawning repair thread", connectionDescriptor);
00547     
00548     // spawn repairConnection thread
00549     pthread_attr_t attr;
00550     pthread_t tid;
00551     pthread_attr_init(&attr);
00552     RepairConnectionObject* rco = new RepairConnectionObject();
00553     rco->m_serviceTable = this;
00554     rco->m_serviceEntry = serviceEntry;
00555     pthread_create(&tid, &attr, repairConnection, rco);
00556     pthread_detach(tid);
00557     pthread_attr_destroy(&attr);
00558 } // fn signalBreak
00559 
00560 
00565 int 
00566 ServiceTable::signalListenComplete(int connectionDescriptor)
00567 {
00568     ServiceEntry* serviceEntry = getServiceEntryByDescriptor(connectionDescriptor);
00569 
00570     if (serviceEntry) {
00571         debug(DEBUG_SRV, "Duplicate service entry found!");
00572         return ST_NOT_EXPECTING_CONNECTION;
00573     }
00574     else {
00575         // we have accepted a new connection from a remote node
00576         // create a new service entry
00577         serviceEntry = new ServiceEntry(this);
00578         serviceEntry->connectionAccepted(connectionDescriptor);
00579         debug(DEBUG_SRV, "Waking listen thread");
00580         m_listenQueue.add(serviceEntry);
00581         return connectionDescriptor;
00582     }
00583 } // fn signalListenComplete
00584 
00585 
00590 int 
00591 ServiceTable::signalConnectComplete(int connectionDescriptor)
00592 {
00593     ServiceEntry* serviceEntry = getServiceEntryByDescriptor(connectionDescriptor);
00594     
00595     if (serviceEntry == NULL) {
00596         debug(DEBUG_SRV, "Invalid connection descriptor: %d", connectionDescriptor);
00597         return ST_INVALID_SERVICE;
00598     }
00599     
00600     if (serviceEntry->isReconnecting()) {
00601         serviceEntry->toConnectedState();
00602         debug(DEBUG_SRV, "Waking reconnect timer thread");
00603         serviceEntry->signalConnectComplete();
00604         return serviceEntry->getConnectionDescriptor();
00605     }       
00606     else if (serviceEntry->isConnecting()) {
00607         serviceEntry->toConnectedState();
00608         //debug(DEBUG_SRV, "Waking connect timer thread");
00609         serviceEntry->signalConnectComplete();
00610         return serviceEntry->getConnectionDescriptor();
00611     }
00612     else {
00613         debug(DEBUG_SRV, "Not expecting an EST message");
00614         return ST_NOT_EXPECTING_CONNECTION;
00615     }
00616 } // fn signalConnectComplete
00617 
00618 
00624 bool 
00625 ServiceTable::handleDataPacket(int connectionDescriptor, NpPacket* packet)
00626 {
00627     // convert NpPacket to TpPacket
00628     TpPacket* data = new TpPacket(packet->getData(), packet->getDataLength());
00629     MEMCHECK(data);
00630 
00631     debug(DEBUG_SRV, "handling a data packet for connection descriptor %d", connectionDescriptor);
00632     ServiceEntry* serviceEntry = getServiceEntryByDescriptor(connectionDescriptor);
00633     if (serviceEntry) {
00634         //debug(DEBUG_SRV, "found entry");
00635         serviceEntry->getSar()->receivePacket(data);
00636         return true;
00637     }
00638     else {
00639         debug(DEBUG_SRV, "service entry not in service table");
00640         return false;
00641     }
00642 } // fn handleDataPacket
00643 
00644 
00649 int 
00650 ServiceTable::sendDataPacket(int serviceNumber, TpPacket* packet)
00651 {
00652     ServiceEntry* serviceEntry = getServiceEntry(serviceNumber);
00653     if (serviceEntry && serviceEntry->isConnected()) {
00654         return m_networkLayer->send(packet, serviceEntry->getConnectionDescriptor());
00655     }
00656     return PB_BAD_SERVICE;
00657 }  // fn sendDataPacket
00658 
00659 
00660 void 
00661 ServiceTable::toStream(std::ostream& out)
00662 {
00663     out << "------------------------------------------------------" << "\n";
00664     out << "Service Table" << "\n";
00665     out << "------------------------------------------------------" << "\n";
00666     list<ServiceEntry*>::iterator sIter;
00667 
00668     for (sIter = m_serviceList.begin(); sIter != m_serviceList.end(); ++sIter) {
00669         ServiceEntry* serviceEntry = *sIter;
00670         out << *serviceEntry;
00671     }
00672 } // fn toStream
00673 
00674 

Generated at Thu Jul 11 13:31:52 2002 for Peekabooty by doxygen1.2.9 written by Dimitri van Heesch, © 1997-2001