00001 #include "Aggregation.h"
00002 #include "AggrPkt.h"
00003 #include <iostream>
00004 #include <omnetpp.h>
00005 #include <cassert>
00006
00007 Define_Module(Aggregation);
00008
00009 Aggregation::Aggregation() :
00010 aggregationTimer(NULL)
00011 {}
00012
00013 void Aggregation::initialize(int stage) {
00014 BaseLayer::initialize(stage);
00015 if(stage == 0) {
00016 interPacketDelay = par("interPacketDelay").doubleValue();
00017 if(interPacketDelay > 0) {
00018 nbMaxPacketsPerAggregation = par("nbMaxPacketsPerAggregation");
00019 assert(nbMaxPacketsPerAggregation > 0);
00020 aggregationTimer = new cMessage("AggregationTimer");
00021 nbAggrPktSentDown = 0;
00022 nbAggrPktReceived = 0;
00023 } else {
00024 interPacketDelay = 0;
00025 }
00026 }
00027 }
00028
00029 bool Aggregation::isOkToSendNow(int dest) {
00030 bool isOkToSendNow = false;
00031 map<int, destInfo>::iterator iter = destInfos.find(dest);
00032 if(iter == destInfos.end()) {
00033
00034 isOkToSendNow = true;
00035 } else if(destInfos[dest].first + interPacketDelay < simTime()) {
00036
00037 isOkToSendNow = true;
00038 assert(destInfos[dest].second.size() == 0);
00039 }
00040 return isOkToSendNow;
00041 }
00042
00043 void Aggregation::handleUpperMsg(cMessage* msg) {
00044 ApplPkt* pkt = check_and_cast<ApplPkt*> (msg);
00045 if (interPacketDelay == 0) {
00046 sendDown(msg);
00047 } else {
00048 int dest = pkt->getDestAddr();
00049 if (!isOkToSendNow(dest)) {
00050
00051 destInfos[dest].second.push_back(pkt);
00052
00053 simtime_t destTxTime = destInfos[dest].first + interPacketDelay;
00054 if (aggregationTimer->isScheduled()) {
00055 if (aggregationTimer->getArrivalTime() > destTxTime) {
00056 cancelEvent( aggregationTimer);
00057 scheduleAt(destTxTime, aggregationTimer);
00058 }
00059 } else {
00060 scheduleAt(destTxTime, aggregationTimer);
00061 }
00062 } else {
00063
00064 destInfos[dest].second.push_back(pkt);
00065 sendAggregatedPacketNow(dest);
00066 }
00067 }
00068 }
00069
00070 void Aggregation::sendAggregatedPacketNow(int dest) {
00071 AggrPkt* aggr = new AggrPkt("AggregationPacket", 1);
00072 aggr->setBitLength(8);
00073 int nbAggr = 0;
00074 int pktSize = 0;
00075 cObject* ctrlInfo = NULL;
00076 while(nbAggr < nbMaxPacketsPerAggregation && destInfos[dest].second.size() > 0) {
00077 pktSize = pktSize + destInfos[dest].second.front()->getByteLength();
00078 if(ctrlInfo != NULL) {
00079 delete ctrlInfo;
00080 }
00081 ctrlInfo = destInfos[dest].second.front()->getControlInfo();
00082 aggr->storePacket(destInfos[dest].second.front());
00083 destInfos[dest].second.pop_front();
00084 nbAggr = nbAggr + 1;
00085 }
00086 aggr->setByteLength(pktSize);
00087 aggr->setControlInfo(ctrlInfo);
00088 sendDown(aggr);
00089 destInfos[dest].first = simTime();
00090 nbAggrPktSentDown++;
00091 }
00092
00093 void Aggregation::handleLowerMsg(cMessage * msg) {
00094 if(interPacketDelay == 0) {
00095 sendUp(msg);
00096 } else {
00097 AggrPkt* aggr = check_and_cast<AggrPkt*>(msg);
00098 while(!aggr->isEmpty()) {
00099 ApplPkt* aPacket = aggr->popFrontPacket();
00100 sendUp(aPacket);
00101 }
00102 delete aggr;
00103 nbAggrPktReceived++;
00104 }
00105 }
00106
00107 void Aggregation::handleSelfMsg(cMessage* msg) {
00108 ASSERT(msg == aggregationTimer);
00109
00110
00111 map<int, destInfo>::iterator iter = destInfos.begin();
00112
00113 simtime_t nextTxTime = simTime() + 2*interPacketDelay;
00114 while(iter != destInfos.end()) {
00115 if(iter->second.first + interPacketDelay <= simTime() && iter->second.second.size() > 0) {
00116 sendAggregatedPacketNow(iter->first);
00117 }
00118 if(iter->second.second.size() > 0 && iter->second.first + interPacketDelay < nextTxTime) {
00119 nextTxTime = iter->second.first + interPacketDelay;
00120 }
00121 iter++;
00122 }
00123 if(nextTxTime < simTime() + 2*interPacketDelay) {
00124 scheduleAt(nextTxTime, aggregationTimer);
00125 }
00126 }
00127
00128 void Aggregation::finish() {
00129
00130 cancelAndDelete(aggregationTimer);
00131 map<int, destInfo>::iterator iter = destInfos.begin();
00132 while(iter != destInfos.end()) {
00133 while(iter->second.second.size() > 0) {
00134 ApplPkt* pkt = iter->second.second.front();
00135 delete pkt;
00136 iter->second.second.pop_front();
00137 }
00138 iter++;
00139 }
00140
00141 recordScalar("nbAggrPktReceived", nbAggrPktReceived);
00142 recordScalar("nbAggrPktSentDown ", nbAggrPktSentDown);
00143 }
00144
00145 void Aggregation::handleLowerControl(cMessage *msg) {
00146 sendControlUp(msg);
00147 }
00148
00149 void Aggregation::handleUpperControl(cMessage *msg) {
00150 sendControlDown(msg);
00151 }