Aggregation.cc

00001 #include "Aggregation.h"
00002 #include "AggrPkt.h"
00003 #include <iostream>
00004 #include <omnetpp.h>
00005 #include <cassert>
00006 
00007 Define_Module(Aggregation);
00008 
00009 Aggregation::Aggregation() :
00010     aggregationTimer(NULL)
00011 {}
00012 
00013 void Aggregation::initialize(int stage) {
00014     BaseLayer::initialize(stage);
00015   if(stage == 0) {
00016     interPacketDelay = par("interPacketDelay").doubleValue();
00017     if(interPacketDelay > 0) {
00018       nbMaxPacketsPerAggregation = par("nbMaxPacketsPerAggregation");
00019       assert(nbMaxPacketsPerAggregation > 0);
00020       aggregationTimer = new cMessage("AggregationTimer");
00021       nbAggrPktSentDown = 0;
00022       nbAggrPktReceived = 0;
00023     } else {
00024       interPacketDelay = 0;
00025     }
00026   }
00027 }
00028 
00029 bool Aggregation::isOkToSendNow(int dest) {
00030   bool isOkToSendNow = false;
00031     map<int, destInfo>::iterator iter = destInfos.find(dest);
00032   if(iter == destInfos.end()) {
00033     // we can send directly if we meet this node for the first time
00034     isOkToSendNow = true;
00035   } else if(destInfos[dest].first + interPacketDelay < simTime()) {
00036     // we can send directly if the interPacketDelay time has expired since last transmission
00037     isOkToSendNow = true;
00038     assert(destInfos[dest].second.size() == 0); // otherwise the aggregation timer should have fired
00039   }
00040   return isOkToSendNow;
00041 }
00042 
00043 void Aggregation::handleUpperMsg(cMessage* msg) {
00044   ApplPkt* pkt = check_and_cast<ApplPkt*> (msg);
00045   if (interPacketDelay == 0) {
00046     sendDown(msg);
00047   } else {
00048     int dest = pkt->getDestAddr();
00049     if (!isOkToSendNow(dest)) {
00050       // store packet
00051       destInfos[dest].second.push_back(pkt);
00052       // reschedule aggregation timer to "earliest destination"
00053       simtime_t destTxTime = destInfos[dest].first + interPacketDelay;
00054       if (aggregationTimer->isScheduled()) {
00055         if (aggregationTimer->getArrivalTime() > destTxTime) {
00056           cancelEvent( aggregationTimer);
00057           scheduleAt(destTxTime, aggregationTimer);
00058         }
00059       } else {
00060         scheduleAt(destTxTime, aggregationTimer);
00061       }
00062     } else {
00063       // send now
00064       destInfos[dest].second.push_back(pkt);
00065       sendAggregatedPacketNow(dest);
00066     }
00067   }
00068 }
00069 
00070 void Aggregation::sendAggregatedPacketNow(int dest) {
00071   AggrPkt* aggr = new AggrPkt("AggregationPacket", 1);
00072   aggr->setBitLength(8);
00073   int nbAggr = 0;
00074   int pktSize = 0;
00075   cObject* ctrlInfo = NULL;
00076   while(nbAggr < nbMaxPacketsPerAggregation && destInfos[dest].second.size() > 0) {
00077     pktSize = pktSize + destInfos[dest].second.front()->getByteLength();
00078     if(ctrlInfo != NULL) {
00079       delete ctrlInfo; // we delete all ctrlInfo except the last, which we attach to our message
00080     }
00081     ctrlInfo = destInfos[dest].second.front()->getControlInfo();
00082     aggr->storePacket(destInfos[dest].second.front());
00083     destInfos[dest].second.pop_front();
00084     nbAggr = nbAggr + 1;
00085   }
00086   aggr->setByteLength(pktSize); // why doesn't this compile ?
00087   aggr->setControlInfo(ctrlInfo);
00088   sendDown(aggr);
00089   destInfos[dest].first = simTime();
00090   nbAggrPktSentDown++;
00091 }
00092 
00093 void Aggregation::handleLowerMsg(cMessage * msg) {
00094   if(interPacketDelay == 0) {
00095     sendUp(msg);
00096   } else {
00097     AggrPkt* aggr = check_and_cast<AggrPkt*>(msg);
00098     while(!aggr->isEmpty()) {
00099     ApplPkt* aPacket = aggr->popFrontPacket();
00100     sendUp(aPacket);
00101     }
00102     delete aggr;
00103     nbAggrPktReceived++;
00104   }
00105 }
00106 
00107 void Aggregation::handleSelfMsg(cMessage* msg) {
00108   ASSERT(msg == aggregationTimer);
00109   // loop over all destinations
00110   // and send their packets if the time has come
00111   map<int, destInfo>::iterator iter = destInfos.begin();
00112   // simultaneously, compute next trigger time for aggregate timer (if required)
00113   simtime_t nextTxTime = simTime() + 2*interPacketDelay;
00114   while(iter != destInfos.end()) {
00115     if(iter->second.first + interPacketDelay <= simTime() && iter->second.second.size() > 0) {
00116       sendAggregatedPacketNow(iter->first);
00117     }
00118     if(iter->second.second.size() > 0 && iter->second.first + interPacketDelay < nextTxTime) {
00119       nextTxTime = iter->second.first + interPacketDelay;
00120     }
00121     iter++;
00122   }
00123   if(nextTxTime < simTime() + 2*interPacketDelay) {
00124     scheduleAt(nextTxTime, aggregationTimer);
00125   }
00126 }
00127 
00128 void Aggregation::finish() {
00129   // clean up memory
00130   cancelAndDelete(aggregationTimer);
00131   map<int, destInfo>::iterator iter = destInfos.begin();
00132   while(iter != destInfos.end()) {
00133     while(iter->second.second.size() > 0) {
00134       ApplPkt* pkt = iter->second.second.front();
00135       delete pkt;
00136       iter->second.second.pop_front();
00137     }
00138     iter++;
00139   }
00140   // save counter values
00141   recordScalar("nbAggrPktReceived", nbAggrPktReceived);
00142   recordScalar("nbAggrPktSentDown ", nbAggrPktSentDown);
00143 }
00144 
00145 void Aggregation::handleLowerControl(cMessage *msg) {
00146   sendControlUp(msg);
00147 }
00148 
00149 void Aggregation::handleUpperControl(cMessage *msg) {
00150   sendControlDown(msg);
00151 }