ProbabilisticBroadcast.cc

00001 /*
00002  * ProbabilisticBroadcast.cc
00003  *
00004  *  Created on: Nov 4, 2008
00005  *      Author: Damien Piguet
00006  */
00007 
00008 #include <cassert>
00009 #include "ProbabilisticBroadcast.h"
00010 #include "SimpleAddress.h"
00011 
// Invokes the Define_Module macro for the ProbabilisticBroadcast class.
00012 Define_Module(ProbabilisticBroadcast);
00013 
// Definition of the class-wide static counter; it starts at zero.
// NOTE(review): presumably the value source for getNextID(), which is
// declared outside this file -- confirm against the header.
00014 long ProbabilisticBroadcast::id_counter = 0;
00015 
00016 void ProbabilisticBroadcast::initialize(int stage)
00017 {
00018   BaseNetwLayer::initialize(stage);
00019 
00020   if(stage == 0){
00021       stats = par("stats");
00022       trace = par("trace");
00023       debug = par("debug");
00024       broadcastPeriod = par("bcperiod");
00025     beta = par("beta");
00026     maxNbBcast = par("maxNbBcast");
00027       headerLength = par("headerLength");
00028       timeInQueueAfterDeath = par("timeInQueueAfterDeath");
00029       timeToLive = par("timeToLive");
00030       broadcastTimer = new cMessage("broadcastTimer");
00031       convertedMacBroadcastAddr = L2BROADCAST;
00032       maxFirstBcastBackoff = par("maxFirstBcastBackoff");
00033       oneHopLatencies.setName("oneHopLatencies");
00034       nbDataPacketsReceived = 0;
00035       nbDataPacketsSent = 0;
00036       debugNbMessageKnown = 0;
00037       nbDataPacketsForwarded = 0;
00038       nbHops = 0;
00039   }
00040 }
00041 
00042 void ProbabilisticBroadcast::handleUpperMsg(cMessage* msg)
00043 {
00044   ProbabilisticBroadcastPkt* pkt;
00045 
00046   // encapsulate message in a network layer packet.
00047   pkt = encapsMsg(msg);
00048   nbDataPacketsSent++;
00049   EV << "PBr: " << simTime() << " n"  << myNetwAddr << " handleUpperMsg(): Pkt ID = " << pkt->getId() << " TTL = " << pkt->getAppTtl() << endl;
00050   // submit packet for first insertion in queue.
00051   insertNewMessage(pkt, true);
00052 }
00053 
00054 void ProbabilisticBroadcast::handleLowerMsg(cMessage* msg)
00055 {
00056   unsigned long macSrcAddr;
00057   double oneHopLatency;
00058   ProbabilisticBroadcastPkt* m = check_and_cast<ProbabilisticBroadcastPkt*>(msg);
00059   MacToNetwControlInfo* cInfo = check_and_cast<MacToNetwControlInfo*>(m->removeControlInfo());
00060   m->setNbHops(m->getNbHops()+1);
00061   macSrcAddr = cInfo->getLastHopMac();
00062   delete cInfo;
00063   ++nbDataPacketsReceived;
00064   nbHops = nbHops + m->getNbHops();
00065   oneHopLatency = simTime().dbl() - m->getTimestamp().dbl();
00066   if(trace) {
00067     oneHopLatencies.record(oneHopLatency);
00068   }
00069   // oneHopLatency gives us an estimate of how long the message spent in the MAC queue of
00070   // its sender (compared to that, transmission delay is negligible). Use this value
00071   // to update the TTL of the message. Dump it if it is dead.
00072   //m->setAppTtl(m->getAppTtl().dbl() - oneHopLatency);
00073   if (/*(m->getAppTtl() <= 0) || */(messageKnown(m->getId()))) {
00074     // we got this message already, ignore it.
00075     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " handleLowerMsg(): Dead or Known message ID=" << m->getId() << " from node "
00076        << macSrcAddr << " TTL = " << m->getAppTtl() << endl;
00077     delete m;
00078   }
00079   else {
00080     if (debugMessageKnown(m->getId())) {
00081       ++debugNbMessageKnown;
00082       EV << "PBr: " << simTime() << " n"  << myNetwAddr << " ERROR Message should be known TTL= " << m->getAppTtl() << endl;
00083     }
00084     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " handleLowerMsg(): Unknown message ID=" << m->getId() << " from node "
00085        << macSrcAddr << endl;
00086     // Unknown message. Insert message in queue with random backoff broadcast delay.
00087     // Because we got the message from lower layer, we need to create and add a new
00088     // control info with the MAC destination address = broadcast address.
00089     m->setControlInfo(new NetwToMacControlInfo(convertedMacBroadcastAddr));
00090     // before inserting message, update source address (for this hop, not the initial source)
00091     m->setSrcAddr(myNetwAddr);
00092     insertNewMessage(m);
00093 
00094     // until a subscription mechanism is implemented, duplicate and pass all received packets
00095     // to the application layer who will be able to compute statistics.
00096     // TODO: implement an application subscription mechanism.
00097     if (true) {
00098       ProbabilisticBroadcastPkt* mCopy = static_cast<ProbabilisticBroadcastPkt*>(m->dup());
00099       sendUp(decapsMsg(mCopy));
00100     }
00101   }
00102 }
00103 
00104 void ProbabilisticBroadcast::handleSelfMsg(cMessage* msg)
00105 {
00106   if (msg == broadcastTimer) {
00107     tMsgDesc* msgDesc;
00108     ProbabilisticBroadcastPkt* pkt;
00109 
00110     // called method pops the first message from the message queue and
00111     // schedules the message timer for the next one. The message is embedded
00112     // into a container of type tMsgDesc.
00113     msgDesc = popFirstMessageUpdateQueue();
00114     pkt = msgDesc->pkt;
00115     // if the packet is alive, duplicate it and insert the copy in the queue,
00116     // then perform a broadcast attempt.
00117     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " handleSelfMsg(): Message ID= " << pkt->getId() << " TTL= " << pkt->getAppTtl() << endl;
00118     if (pkt->getAppTtl() > 0) {
00119       // check if we are allowed to re-transmit the message on more time.
00120       if (msgDesc->nbBcast < maxNbBcast) {
00121         ProbabilisticBroadcastPkt* pktCopy;
00122         bool sendForSure = msgDesc->initialSend;
00123 
00124         // duplicate packet and insert the copy in the queue.
00125         // two possibilities: the packet will be alive at next
00126         // broadcast period => insert it with delay = broadcastPeriod.
00127         // Or the packet will be dead at next broadcast period (TTL <= broadcastPeriod)
00128         // => insert it with delay = TTL. So when the copy will be popped out of the
00129         // queue, it will be considered as dead and discarded.
00130         pktCopy = static_cast<ProbabilisticBroadcastPkt*>(pkt->dup());
00131         // control info is not duplicated with the message, so we have to re-create one here.
00132         pktCopy->setControlInfo(new NetwToMacControlInfo(convertedMacBroadcastAddr));
00133         // it the copy that is re-inserted into the queue so update the container accordingly
00134         msgDesc->pkt = pktCopy;
00135         // increment nbBcast field of the descriptor because at this point, it is sure that
00136         // the message will go through one more broadcast attempt.
00137         msgDesc->nbBcast++;
00138         // for sure next broadcast attempt will not be the initial one.
00139         msgDesc->initialSend = false;
00140         // if msg TTL > broadcast period, the message will be broadcasted one more
00141         // time, insert it with delay = broadcast period. Otherwise, the message
00142         // will be dead at next broadcast attempt. Keep it in the list with
00143         // delay = TTL + timeInQueueAfterDeath. insertMessage() will update its
00144         // TTL to -timeInQueueAfterDeath, a negative value. That way, the message
00145         // is known to the system, de-synchronization between copies of the same message
00146         // is therefore handled and when the message will be popped out, its TTL will
00147         // be smaller than zero, thus the message will be discarded, not broadcasted.
00148         if (pktCopy->getAppTtl() > broadcastPeriod)
00149           insertMessage(broadcastPeriod, msgDesc);
00150         else
00151           insertMessage(pktCopy->getAppTtl()+timeInQueueAfterDeath, msgDesc);
00152         // broadcast the message with probability beta
00153         if (sendForSure) {
00154           EV << "PBr: " << simTime() << " n"  << myNetwAddr << "     Send packet down for sure." << endl;
00155           pkt->setTimestamp();
00156           sendDown(pkt);
00157           ++nbDataPacketsForwarded;
00158         }
00159         else {
00160           if (bernoulli(beta)) {
00161             EV << "PBr: " << simTime() << " n"  << myNetwAddr << "     Bernoulli test result: TRUE. Send packet down." << endl;
00162             pkt->setTimestamp();
00163             sendDown(pkt);
00164             ++nbDataPacketsForwarded;
00165           }
00166           else {
00167             EV << "PBr: " << simTime() << " n"  << myNetwAddr << "     Bernoulli test result: FALSE" << endl;
00168             delete pkt;
00169           }
00170         }
00171       }
00172       else {
00173         // we can't re-transmit the message because maxNbBcast is reached.
00174         // re-insert-it in the queue with delay = TTL so that its ID is still
00175         // known by the system.
00176         EV << "PBr: " << simTime() << " n"  << myNetwAddr << "     maxNbBcast reached." << endl;
00177         insertMessage(pkt->getAppTtl()+timeInQueueAfterDeath, msgDesc);
00178       }
00179     }
00180     else {
00181       EV << "PBr: " << simTime() << " n"  << myNetwAddr << "     Message TTL zero, discard." << endl;
00182       delete msgDesc;
00183       delete pkt;
00184     }
00185   }
00186   else {
00187     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " Received unexpected self message" << endl;
00188   }
00189 }
00190 
00191 void ProbabilisticBroadcast::handleLowerControl(cMessage* msg)
00192 {
00193   EV << "PBr: " << simTime() << " n"  << myNetwAddr << " Received lower control message. Name: "
00194      << msg->getName() << " type: " << msg->getKind() << endl;
00195   delete msg;
00196 }
00197 
00198 void ProbabilisticBroadcast::finish()
00199 {
00200   EV << "PBr: " << simTime() << " n"  << myNetwAddr << " finish()" << endl;
00201   cancelAndDelete(broadcastTimer);
00202   // if some messages are still in the queue, delete them.
00203   while (!msgQueue.empty()) {
00204     TimeMsgMap::iterator pos = msgQueue.begin();
00205     tMsgDesc* msgDesc = pos->second;
00206     msgQueue.erase(pos);
00207     delete msgDesc->pkt;
00208     delete msgDesc;
00209   }
00210   if (stats) {
00211     recordScalar("nbDataPacketsReceived", nbDataPacketsReceived);
00212     recordScalar("debugNbMessageKnown", debugNbMessageKnown);
00213     recordScalar("nbDataPacketsForwarded", nbDataPacketsForwarded);
00214     if(nbDataPacketsReceived > 0) {
00215       recordScalar("meanNbHops", (double) nbHops / (double) nbDataPacketsReceived);
00216     } else {
00217       recordScalar("meanNbHops", 0);
00218     }
00219   }
00220 }
00221 
00222 bool ProbabilisticBroadcast::messageKnown(unsigned int msgId)
00223 {
00224   MsgIdSet::iterator pos;
00225 
00226   pos = knownMsgIds.find(msgId);
00227   return pos != knownMsgIds.end();
00228 }
00229 
00230 bool ProbabilisticBroadcast::debugMessageKnown(unsigned int msgId)
00231 {
00232   MsgIdSet::iterator pos;
00233 
00234   pos = debugMsgIdSet.find(msgId);
00235   return pos != debugMsgIdSet.end();
00236 }
00237 
00238 void ProbabilisticBroadcast::insertMessage(simtime_t bcastDelay, tMsgDesc* msgDesc)
00239 {
00240   TimeMsgMap::iterator pos;
00241   simtime_t bcastTime = simTime() + bcastDelay;
00242 
00243   EV << "PBr: " << simTime() << " n"  << myNetwAddr << "         insertMessage() bcastDelay = " << bcastDelay << " Msg ID = " << msgDesc->pkt->getId() << endl;
00244   // update TTL field of the message to the value it will have when taken out of the list
00245   msgDesc->pkt->setAppTtl(msgDesc->pkt->getAppTtl() - bcastDelay);
00246   // insert message ID in ID list.
00247   knownMsgIds.insert(msgDesc->pkt->getId());
00248   // insert key value pair <broadcast time, pointer to message> in message queue.
00249   pos = msgQueue.insert(make_pair(bcastTime, msgDesc));
00250   // if the message has been inserted in the front of the list, it means that it
00251   // will be the next message to be broadcasted, therefore we have to re-schedule
00252   // the broadcast timer to the message's broadcast instant.
00253   if (pos == msgQueue.begin()) {
00254     EV << "PBr: " << simTime() << " n"  << myNetwAddr << "         message inserted in the front, reschedule it." << endl;
00255     cancelEvent(broadcastTimer);
00256     scheduleAt(bcastTime, broadcastTimer);
00257   }
00258 }
00259 
00260 ProbabilisticBroadcast::tMsgDesc* ProbabilisticBroadcast::popFirstMessageUpdateQueue(void)
00261 {
00262   TimeMsgMap::iterator pos;
00263   tMsgDesc* msgDesc;
00264 
00265   // get first message.
00266   ASSERT(!msgQueue.empty());
00267   pos = msgQueue.begin();
00268   msgDesc = pos->second;
00269   // remove first message from message queue and from ID list
00270   msgQueue.erase(pos);
00271   knownMsgIds.erase(msgDesc->pkt->getId());
00272   EV << "PBr: " << simTime() << " n"  << myNetwAddr << "         pop(): just popped msg " << msgDesc->pkt->getId() << endl;
00273   if (!msgQueue.empty()) {
00274     // schedule broadcast of new first message
00275     EV << "PBr: " << simTime() << " n"  << myNetwAddr << "         pop(): schedule next message." << endl;
00276     pos = msgQueue.begin();
00277     scheduleAt(pos->first, broadcastTimer);
00278   }
00279   return msgDesc;
00280 }
00281 
00282 ProbabilisticBroadcastPkt* ProbabilisticBroadcast::encapsMsg(cMessage* message)
00283 {
00284   assert(static_cast<cPacket*>(message));
00285   cPacket* msg = static_cast<cPacket*>(message);
00286   ProbabilisticBroadcastPkt* pkt = new ProbabilisticBroadcastPkt(msg->getName(), DATA);
00287 //  ProbBcastNetwControlInfo* cInfo = dynamic_cast<ProbBcastNetwControlInfo*>(msg->removeControlInfo());
00288   cObject* cInfo = msg->removeControlInfo();
00289   int bcastIpAddr = L3BROADCAST;
00290 
00291   ASSERT(cInfo);
00292   pkt->setByteLength(headerLength);
00293   pkt->setSrcAddr(myNetwAddr);
00294   pkt->setDestAddr(bcastIpAddr);
00295   pkt->setInitialSrcAddr(myNetwAddr);
00296   pkt->setFinalDestAddr(bcastIpAddr);
00297   pkt->setAppTtl(timeToLive);
00298   pkt->setId(getNextID());
00299   pkt->setControlInfo(new NetwToMacControlInfo(convertedMacBroadcastAddr));
00300   //encapsulate the application packet
00301   pkt->encapsulate(msg);
00302 
00303   // clean-up
00304   delete cInfo;
00305 
00306   return pkt;
00307 }
00308 
00309 void ProbabilisticBroadcast::insertNewMessage(ProbabilisticBroadcastPkt* pkt, bool iAmInitialSender)
00310 {
00311   simtime_t ttl = pkt->getAppTtl();
00312 
00313   if (ttl > 0) {
00314     simtime_t bcastDelay;
00315     tMsgDesc* msgDesc;
00316 
00317     // insert packet in queue with delay in [0, min(TTL, broadcast period)].
00318     // since the insertion schedules the message for its first broadcast attempt,
00319     // we use a uniform random back-off taken between now and the broadcast delay
00320     // to avoid having all nodes in the neighborhood forward the packet at the same
00321     // time. Backoffs used at MAC layer are thought to be too short.
00322     if (broadcastPeriod < maxFirstBcastBackoff)
00323       bcastDelay = broadcastPeriod;
00324     else
00325       bcastDelay = maxFirstBcastBackoff;
00326     if (bcastDelay > ttl)
00327       bcastDelay = ttl;
00328     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " insertNewMessage(): insert packet " << pkt->getId() << " with delay "
00329        << bcastDelay << endl;
00330     // create container for message and initialize container's values.
00331     msgDesc = new tMsgDesc;
00332     msgDesc->pkt = pkt;
00333     msgDesc->nbBcast = 0;  // so far, pkt has been forwarded zero times.
00334     msgDesc->initialSend = iAmInitialSender;
00335     debugMsgIdSet.insert(pkt->getId());
00336     insertMessage(uniform(0, bcastDelay), msgDesc);
00337   }
00338   else {
00339     EV << "PBr: " << simTime() << " n"  << myNetwAddr << " insertNewMessage(): got new packet with TTL = 0." << endl;
00340     delete pkt;
00341   }
00342 }
00343 
00344 cMessage* ProbabilisticBroadcast::decapsMsg(ProbabilisticBroadcastPkt *msg)
00345 {
00346   cMessage *m = msg->decapsulate();
00347   m->setControlInfo(new ProbBcastNetwControlInfo(msg->getSrcAddr()));
00348   // delete the network layer packet
00349   delete msg;
00350   return m;
00351 }