00001 #include "headers.h"
00002
00017
00021 LiveBroadcastTable::LiveBroadcastTable()
00022 {
00023 Guard guard(&m_lock);
00024
00025 m_curPos = 0;
00026
00027 m_broadcastId.reserve(LIVE_BROADCAST_TABLE_SIZE);
00028
00029 for (int i=0; i<LIVE_BROADCAST_TABLE_SIZE; i++) {
00030 vector<NodeId> tmp;
00031 m_broadcastId.push_back(tmp);
00032 m_broadcastId[i].reserve(5);
00033 }
00034
00035 m_dead = false;
00036
00037
00038 pthread_attr_t attr;
00039 pthread_attr_init(&attr);
00040 pthread_create(&m_liveBroadcastThreadId, &attr, LiveBroadcastTable::onTick, this);
00041 }
00042
00043
00047 LiveBroadcastTable::~LiveBroadcastTable()
00048 {
00049 void* statusp;
00050 Guard guard(&m_lock);
00051 m_dead = true;
00052 m_cond.signal();
00053 pthread_join(m_liveBroadcastThreadId, &statusp);
00054 }
00055
00056
00060 void LiveBroadcastTable::addId(unsigned int id, Node *from)
00061 {
00062 Guard guard(&m_lock);
00063 m_broadcastId[m_curPos].push_back(NodeId(id, from));
00064 }
00065
00066
00072 bool LiveBroadcastTable::hasId(unsigned int id, Node** from)
00073 {
00074 Guard guard(&m_lock);
00075
00076 for (int i = 0; i < LIVE_BROADCAST_TABLE_SIZE; i++) {
00077 vector<NodeId>::iterator iter;
00078 for (iter = m_broadcastId[i].begin(); iter != m_broadcastId[i].end(); ++iter) {
00079 if (iter->id == id) {
00080 if (from != NULL) {
00081 *from = iter->fromNode;
00082 }
00083 return true;
00084 }
00085 }
00086 }
00087 return false;
00088 }
00089
00090
00094 bool
00095 LiveBroadcastTable::hasNode(Node* node) {
00096 Guard guard(&m_lock);
00097
00098 for (int i = 0; i < LIVE_BROADCAST_TABLE_SIZE; i++) {
00099 vector<NodeId>::iterator iter;
00100 for (iter = m_broadcastId[i].begin(); iter != m_broadcastId[i].end(); ++iter) {
00101 if (iter->fromNode->equals(node)) {
00102 return true;
00103 }
00104 }
00105 }
00106 return false;
00107 }
00108
00109
00113 int
00114 LiveBroadcastTable::nodeCount(Node* node) {
00115 Guard guard(&m_lock);
00116 int count = 0;
00117 for (int i = 0; i < LIVE_BROADCAST_TABLE_SIZE; i++) {
00118 vector<NodeId>::iterator iter;
00119 for (iter = m_broadcastId[i].begin(); iter != m_broadcastId[i].end(); ++iter) {
00120 if (iter->fromNode->equals(node)) {
00121 ++count;
00122 }
00123 }
00124 }
00125 return count;
00126 }
00127
00128
00132 void* LiveBroadcastTable::onTick(void* me)
00133 {
00134 pthread_detach(pthread_self());
00135 LiveBroadcastTable* broadcastTable = (LiveBroadcastTable*)me;
00136 Guard guard(&broadcastTable->m_lock);
00137
00138 while (true) {
00139 broadcastTable->m_curPos = (broadcastTable->m_curPos + 1) % LIVE_BROADCAST_TABLE_SIZE;
00140
00141 broadcastTable->m_broadcastId[broadcastTable->m_curPos].clear();
00142
00143
00144 broadcastTable->m_cond.timedWait(&broadcastTable->m_lock, (int)BROADCAST_LIFE);
00145 if (broadcastTable->m_dead) {
00146 return NULL;
00147 }
00148 }
00149 return NULL;
00150 }
00151
00152
00156 void
00157 LiveBroadcastTable::toStream(std::ostream& out)
00158 {
00159 Guard guard(&m_lock);
00160
00161 for (int i=0; i < LIVE_BROADCAST_TABLE_SIZE; i++) {
00162 out << setw(10);
00163 if (i == m_curPos) {
00164 out << "--> Entry ";
00165 }
00166 else {
00167 out << " Entry ";
00168 }
00169 int size = m_broadcastId[i].size();
00170 out << i << "(size: " << size << "): ";
00171 vector<NodeId>::iterator iter;
00172 for (iter = m_broadcastId[i].begin(); iter != m_broadcastId[i].end(); ++iter) {
00173 out << iter->id << " ";
00174 }
00175 out << "\n";
00176 }
00177 }
00178
00179