SCTPServer.cc

Go to the documentation of this file.
00001 //
00002 // Copyright (C) 2008 Irene Ruengeler
00003 // Copyright (C) 2009 Thomas Dreibholz
00004 //
00005 // This program is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU General Public License
00007 // as published by the Free Software Foundation; either version 2
00008 // of the License, or (at your option) any later version.
00009 //
00010 // This program is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with this program; if not, see <http://www.gnu.org/licenses/>.
00017 //
00018 
00019 
00020 #include "SCTPServer.h"
00021 #include "SCTPSocket.h"
00022 #include "SCTPCommand_m.h"
00023 #include "SCTPMessage_m.h"
00024 #include <stdlib.h>
00025 #include <stdio.h>
00026 #include "SCTPAssociation.h"
00027 
00028 #define MSGKIND_CONNECT  0
00029 #define MSGKIND_SEND         1
00030 #define MSGKIND_      2
00031 
00032 Define_Module(SCTPServer);
00033 
00034 void SCTPServer::initialize()
00035 {
00036     char * token;
00037     cPar *delT;
00038     AddressVector addresses;
00039     socket = NULL;
00040     sctpEV3<<"initialize SCTP Server\n";
00041     numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0;
00042     WATCH(numSessions);
00043     WATCH(packetsSent);
00044     WATCH(packetsRcvd);
00045     WATCH(bytesSent);
00046     WATCH(numRequestsToSend);
00047     // parameters
00048     finishEndsSimulation = (bool)par("finishEndsSimulation");
00049     const char* address = par("address");
00050     token = strtok((char*)address,",");
00051     while (token != NULL)
00052     {
00053         addresses.push_back(IPvXAddress(token));
00054         token = strtok(NULL, ",");
00055     }
00056     int32 port = par("port");
00057     echoFactor = par("echoFactor");
00058     delay = par("echoDelay");
00059     delayFirstRead = par("delayFirstRead");
00060     delT = &par("readingInterval");
00061     if (delT->isNumeric() && (double)*delT==0)
00062         readInt=false;
00063     else
00064         readInt=true;
00065     int32 messagesToPush = par("messagesToPush");
00066     inboundStreams = par("inboundStreams");
00067     outboundStreams = par("outboundStreams");
00068     ordered = (bool)par("ordered");
00069     queueSize = par("queueSize");
00070     lastStream = 0;
00071     //abort = NULL;
00072     //abortSent = false;
00073     timeoutMsg = new cMessage("SrvAppTimer");
00074     delayTimer = new cMessage("delayTimer");
00075     delayTimer->setContextPointer(this);
00076     delayFirstReadTimer = new cMessage("delayFirstReadTimer");
00077     firstData = true;
00078     SCTPSocket *socket = new SCTPSocket();
00079     socket->setOutputGate(gate("sctpOut"));
00080     socket->setInboundStreams(inboundStreams);
00081     socket->setOutboundStreams(outboundStreams);
00082     if (strcmp(address,"")==0)
00083         socket->bind(port);
00084     else
00085     {
00086         socket->bindx(addresses, port);
00087     }
00088     socket->listen(true, par("numPacketsToSendPerClient"), messagesToPush);
00089     sctpEV3<<"SCTPServer::initialized listen port="<<port<<"\n";
00090     schedule = false;
00091     shutdownReceived = false;
00092 }
00093 
00094 void SCTPServer::sendOrSchedule(cPacket *msg)
00095 {
00096     if (delay==0)
00097     {
00098         send(msg, "sctpOut");
00099     }
00100     else
00101     {
00102         scheduleAt(simulation.getSimTime()+delay, msg);
00103     }
00104 }
00105 
00106 void SCTPServer::generateAndSend()
00107 {
00108 uint32 numBytes;
00109 
00110     cPacket* cmsg = new cPacket("CMSG");
00111     SCTPSimpleMessage* msg = new SCTPSimpleMessage("Server");
00112     numBytes = (uint32)par("requestLength");
00113     msg->setDataArraySize(numBytes);
00114     for (uint32 i=0; i<numBytes; i++)
00115     {
00116         msg->setData(i, 's');
00117     }
00118     msg->setDataLen(numBytes);
00119     msg->setBitLength(numBytes * 8);
00120     cmsg->encapsulate(msg);
00121     SCTPSendCommand *cmd = new SCTPSendCommand("Send1");
00122     cmd->setAssocId(assocId);
00123     if (ordered)
00124         cmd->setSendUnordered(COMPLETE_MESG_ORDERED);
00125     else
00126         cmd->setSendUnordered(COMPLETE_MESG_UNORDERED);
00127     lastStream=(lastStream+1)%outboundStreams;
00128     cmd->setSid(lastStream);
00129     if (queueSize>0 && numRequestsToSend > 0 && count < queueSize*2)
00130         cmd->setLast(false);
00131     else
00132         cmd->setLast(true);
00133     cmsg->setKind(SCTP_C_SEND);
00134     cmsg->setControlInfo(cmd);
00135     packetsSent++;
00136     bytesSent+=msg->getBitLength()/8;
00137     sendOrSchedule(cmsg);
00138 }
00139 
00140 cPacket* SCTPServer::makeReceiveRequest(cPacket* msg)
00141 {
00142     SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00143     cPacket* cmsg = new cPacket("ReceiveRequest");
00144     SCTPSendCommand *cmd = new SCTPSendCommand("Send2");
00145     cmd->setAssocId(ind->getAssocId());
00146     cmd->setSid(ind->getSid());
00147     cmd->setNumMsgs(ind->getNumMsgs());
00148     cmsg->setKind(SCTP_C_RECEIVE);
00149     cmsg->setControlInfo(cmd);
00150     delete ind;
00151     return cmsg;
00152 }
00153 
00154 cPacket* SCTPServer::makeDefaultReceive()
00155 {
00156     cPacket* cmsg = new cPacket("DefaultReceive");
00157     SCTPSendCommand *cmd = new SCTPSendCommand("Send3");
00158     cmd->setAssocId(assocId);
00159     cmd->setSid(0);
00160     cmd->setNumMsgs(1);
00161     cmsg->setKind(SCTP_C_RECEIVE);
00162     cmsg->setControlInfo(cmd);
00163     return cmsg;
00164 }
00165 
00166 cPacket* SCTPServer::makeAbortNotification(SCTPCommand* msg)
00167 {
00168     SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg);
00169     cPacket* cmsg = new cPacket("AbortNotification");
00170     SCTPSendCommand *cmd = new SCTPSendCommand("Send4");
00171     assocId = ind->getAssocId();
00172     cmd->setAssocId(assocId);
00173     cmd->setSid(ind->getSid());
00174     cmd->setNumMsgs(ind->getNumMsgs());
00175     cmsg->setControlInfo(cmd);
00176     delete ind;
00177     //delete msg;
00178     cmsg->setKind(SCTP_C_ABORT);
00179     return cmsg;
00180 }
00181 
00182 void SCTPServer::handleMessage(cMessage *msg)
00183 {
00184     int32 id;
00185     cPacket* cmsg;
00186 
00187     if (msg->isSelfMessage())
00188     {
00189 
00190         handleTimer(msg);
00191     }
00192     else
00193     {
00194     switch (msg->getKind())
00195     {
00196         case SCTP_I_PEER_CLOSED:
00197         case SCTP_I_ABORT:
00198         {
00199             SCTPCommand *command = dynamic_cast<SCTPCommand *>(msg->removeControlInfo());
00200             assocId = command->getAssocId();
00201             serverAssocStatMap[assocId].peerClosed = true;
00202             if ((long) par("numPacketsToReceivePerClient")==0)
00203             {
00204                 if (serverAssocStatMap[assocId].abortSent==false)
00205                 {
00206                     sendOrSchedule(makeAbortNotification(command->dup()));
00207                     serverAssocStatMap[assocId].abortSent = true;
00208                 }
00209             }
00210             else
00211             {
00212                 if (serverAssocStatMap[assocId].rcvdPackets==(unsigned long) par("numPacketsToReceivePerClient") &&
00213                     serverAssocStatMap[assocId].abortSent==false)
00214                 {
00215                     sendOrSchedule(makeAbortNotification(command->dup()));
00216                     serverAssocStatMap[assocId].abortSent = true;
00217                 }
00218             }
00219             if (delayTimer->isScheduled())
00220                 cancelEvent(delayTimer);
00221             if (delayFirstReadTimer->isScheduled())
00222                 cancelEvent(delayFirstReadTimer);
00223             delete command;
00224             delete msg;
00225             break;
00226         }
00227         case SCTP_I_ESTABLISHED:
00228         {
00229             count=0;
00230             SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo());
00231             numSessions++;
00232             assocId = connectInfo->getAssocId();
00233             inboundStreams = connectInfo->getInboundStreams();
00234             outboundStreams = connectInfo->getOutboundStreams();
00235             serverAssocStatMap[assocId].rcvdPackets= (long) par("numPacketsToReceivePerClient");
00236             serverAssocStatMap[assocId].sentPackets= (long) par("numPacketsToSendPerClient");
00237             serverAssocStatMap[assocId].rcvdBytes=0;
00238             serverAssocStatMap[assocId].start=0;
00239             serverAssocStatMap[assocId].stop=0;
00240             serverAssocStatMap[assocId].lifeTime=0;
00241             serverAssocStatMap[assocId].abortSent=false;
00242             serverAssocStatMap[assocId].peerClosed = false;
00243             char text[30];
00244             sprintf(text, "App: Received Bytes of assoc %d",assocId);
00245             bytesPerAssoc[assocId] = new cOutVector(text);
00246             sprintf(text, "App: EndToEndDelay of assoc %d",assocId);
00247             endToEndDelay[assocId] = new cOutVector(text);
00248 
00249             delete connectInfo;
00250             delete msg;
00251             if ((long) par("numPacketsToSendPerClient") > 0)
00252             {
00253                 ServerAssocStatMap::iterator i = serverAssocStatMap.find(assocId);
00254                 numRequestsToSend = i->second.sentPackets;
00255                 if ((simtime_t)par("thinkTime") > 0)
00256                 {
00257                     generateAndSend();
00258                     timeoutMsg->setKind(SCTP_C_SEND);
00259                     scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00260                     numRequestsToSend--;
00261                     i->second.sentPackets = numRequestsToSend;
00262                 }
00263                 else
00264                 {
00265                     if (queueSize==0)
00266                     {
00267                         while (numRequestsToSend > 0)
00268                         {
00269                             generateAndSend();
00270                             numRequestsToSend--;
00271                             i->second.sentPackets = numRequestsToSend;
00272                         }
00273                     }
00274                     else if (queueSize>0)
00275                     {
00276                         while (numRequestsToSend > 0 && count++ < queueSize*2)
00277                         {
00278                             generateAndSend();
00279                             numRequestsToSend--;
00280                             i->second.sentPackets = numRequestsToSend;
00281                         }
00282 
00283                         cPacket* cmsg = new cPacket("Queue");
00284                         SCTPInfo* qinfo = new SCTPInfo("Info1");
00285                         qinfo->setText(queueSize);
00286                         cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00287                         qinfo->setAssocId(id);
00288                         cmsg->setControlInfo(qinfo);
00289                         sendOrSchedule(cmsg);
00290                     }
00291                     ServerAssocStatMap::iterator j=serverAssocStatMap.find(assocId);
00292                     if (j->second.rcvdPackets == 0 && (simtime_t)par("waitToClose")>0)
00293                     {
00294                         char as[5];
00295                         sprintf(as, "%d",assocId);
00296                         cPacket* abortMsg = new cPacket(as);
00297                         abortMsg->setKind(SCTP_I_ABORT);
00298                         scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg);
00299                     }
00300                     else
00301                     {
00302                         sctpEV3<<"no more packets to send, call shutdown for assoc "<<assocId<<"\n";
00303                         cPacket* cmsg = new cPacket("ShutdownRequest");
00304                         SCTPCommand* cmd = new SCTPCommand("Send5");
00305                         cmsg->setKind(SCTP_C_SHUTDOWN);
00306                         cmd->setAssocId(assocId);
00307                         cmsg->setControlInfo(cmd);
00308                         sendOrSchedule(cmsg);
00309                     }
00310                 }
00311             }
00312             break;
00313         }
00314         case SCTP_I_DATA_NOTIFICATION:
00315         {
00316             notifications++;
00317 
00318 
00319             if (schedule==false)
00320             {
00321                 if (delayFirstRead>0 && !delayFirstReadTimer->isScheduled())
00322                 {
00323 
00324                     cmsg=makeReceiveRequest(PK(msg));
00325                     scheduleAt(simulation.getSimTime()+delayFirstRead, cmsg);
00326                     scheduleAt(simulation.getSimTime()+delayFirstRead, delayFirstReadTimer);
00327                 }
00328                 else if (readInt && firstData)
00329                 {
00330                     firstData=false;
00331                     cmsg=makeReceiveRequest(PK(msg));
00332                     scheduleAt(simulation.getSimTime()+(simtime_t)par("readingInterval"), delayTimer);
00333                     sendOrSchedule(cmsg);
00334                 }
00335                 else if (delayFirstRead==0 && readInt==false)
00336                 {
00337                     cmsg=makeReceiveRequest(PK(msg));
00338                     sendOrSchedule(cmsg);
00339                 }
00340 
00341             }
00342             else
00343             {
00344                 sctpEV3<<simulation.getSimTime()<<" makeReceiveRequest\n";
00345                 cmsg=makeReceiveRequest(PK(msg));
00346                 sendOrSchedule(cmsg);
00347             }
00348             delete msg;
00349             break;
00350         }
00351         case SCTP_I_DATA:
00352         {
00353             notifications--;
00354             packetsRcvd++;
00355             sctpEV3<<simulation.getSimTime()<<" server: data arrived. "<<packetsRcvd<<" Packets received now\n";
00356             SCTPRcvCommand *ind = check_and_cast<SCTPRcvCommand *>(msg->removeControlInfo());
00357             id = ind->getAssocId();
00358             ServerAssocStatMap::iterator j=serverAssocStatMap.find(id);
00359             BytesPerAssoc::iterator k=bytesPerAssoc.find(id);
00360             if (j->second.rcvdBytes == 0)
00361                 j->second.start = simulation.getSimTime();
00362 
00363             j->second.rcvdBytes+= PK(msg)->getByteLength();
00364             k->second->record(j->second.rcvdBytes);
00365 
00366             if (echoFactor==0)
00367             {
00368                 if ((uint32)par("numPacketsToReceivePerClient")>0)
00369                 {
00370                     j->second.rcvdPackets--;
00371                     SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg);
00372                     EndToEndDelay::iterator m=endToEndDelay.find(id);
00373                     m->second->record(simulation.getSimTime()-smsg->getCreationTime());
00374                     sctpEV3<<"server: Data received. Left packets to receive="<<j->second.rcvdPackets<<"\n";
00375 
00376                     if (j->second.rcvdPackets == 0)
00377                     {
00378                         if (serverAssocStatMap[assocId].peerClosed==true && serverAssocStatMap[assocId].abortSent==false)
00379                         {
00380                             sendOrSchedule(makeAbortNotification(ind));
00381                             serverAssocStatMap[assocId].abortSent = true;
00382                             j->second.stop = simulation.getSimTime();
00383                             j->second.lifeTime = j->second.stop - j->second.start;
00384                             break;
00385                         }
00386                         else
00387                         {
00388                             cPacket* cmsg = new cPacket("Request");
00389                             SCTPInfo* qinfo = new SCTPInfo("Info2");
00390                             cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00391                             qinfo->setAssocId(id);
00392                             cmsg->setControlInfo(qinfo);
00393                             sendOrSchedule(cmsg);
00394                             j->second.stop = simulation.getSimTime();
00395                             j->second.lifeTime = j->second.stop - j->second.start;
00396                         }
00397                     }
00398                 }
00399                 delete msg;
00400             }
00401             else
00402             {
00403                 SCTPSendCommand *cmd = new SCTPSendCommand("Send6");
00404                 cmd->setAssocId(id);
00405                 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00406                 EndToEndDelay::iterator n=endToEndDelay.find(id);
00407                 n->second->record(simulation.getSimTime()-smsg->getCreationTime());
00408                 cPacket* cmsg = new cPacket("SVData");
00409                 bytesSent+=smsg->getBitLength()/8;
00410                 cmd->setSendUnordered(cmd->getSendUnordered());
00411                 lastStream=(lastStream+1)%outboundStreams;
00412                 cmd->setSid(lastStream);
00413                 cmd->setLast(true);
00414                 cmsg->encapsulate(smsg);
00415                 cmsg->setKind(SCTP_C_SEND);
00416                 cmsg->setControlInfo(cmd);
00417                 packetsSent++;
00418                 delete msg;
00419                 sendOrSchedule(cmsg);
00420             }
00421             delete ind;
00422             break;
00423         }
00424         case SCTP_I_SHUTDOWN_RECEIVED:
00425         {
00426             SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00427             id = command->getAssocId();
00428             sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n";
00429             ServerAssocStatMap::iterator i=serverAssocStatMap.find(id);
00430             if (i->second.sentPackets == 0 || (long) par("numPacketsToSendPerClient")==0)
00431             {
00432                 cPacket* cmsg = new cPacket("Request");
00433                 SCTPInfo* qinfo = new SCTPInfo("Info3");
00434                 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00435                 qinfo->setAssocId(id);
00436                 cmsg->setControlInfo(qinfo);
00437                 sendOrSchedule(cmsg);
00438                 i->second.stop = simulation.getSimTime();
00439                 i->second.lifeTime = i->second.stop - i->second.start;
00440             }
00441             delete command;
00442             shutdownReceived = true;
00443             delete msg;
00444             break;
00445         }
00446         case SCTP_I_CLOSED:
00447             if (delayTimer->isScheduled())
00448                 cancelEvent(delayTimer);
00449             if (finishEndsSimulation) {
00450                 endSimulation();
00451             }
00452             delete msg;
00453         break;
00454         default: delete msg;
00455     }
00456     }
00457 }
00458 
00459 void SCTPServer::handleTimer(cMessage *msg)
00460 {
00461     cPacket* cmsg;
00462     SCTPCommand* cmd;
00463     int32 id;
00464     double tempInterval;
00465 
00466     if (msg==delayTimer)
00467     {
00468         ServerAssocStatMap::iterator i=serverAssocStatMap.find(assocId);
00469         sctpEV3<<simulation.getSimTime()<<" delayTimer expired\n";
00470         sendOrSchedule(makeDefaultReceive());
00471         scheduleAt(simulation.getSimTime()+(double)par("readingInterval"), delayTimer);
00472         return;
00473     }
00474     else if (msg==delayFirstReadTimer)
00475     {
00476         delayFirstRead = 0;
00477 
00478         if (readInt && !delayTimer->isScheduled())
00479         {
00480             tempInterval = (double)par("readingInterval");
00481             scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, delayTimer);
00482             scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, makeDefaultReceive());
00483         }
00484         return;
00485     }
00486 
00487     switch (msg->getKind())
00488     {
00489     case SCTP_C_SEND:
00490         if (numRequestsToSend>0)
00491         {
00492             generateAndSend();
00493             if ((simtime_t)par("thinkTime") > 0)
00494                 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00495             numRequestsToSend--;
00496         }
00497         break;
00498     case SCTP_I_ABORT:
00499 
00500         cmsg = new cPacket("CLOSE", SCTP_C_CLOSE);
00501         cmd = new SCTPCommand("Send6");
00502         id = atoi(msg->getName());
00503               cmd->setAssocId(id);
00504         cmsg->setControlInfo(cmd);
00505         sendOrSchedule(cmsg);
00506         break;
00507     case SCTP_C_RECEIVE:
00508         sctpEV3<<simulation.getSimTime()<<" SCTPServer:SCTP_C_RECEIVE\n";
00509         if (readInt || delayFirstRead > 0)
00510             schedule = false;
00511         else
00512             schedule = true;
00513         sendOrSchedule(PK(msg));
00514         break;
00515     default:
00516 
00517         sctpEV3<<"MsgKind ="<<msg->getKind()<<" unknown\n";
00518 
00519         break;
00520     }
00521 }
00522 
00523 void SCTPServer::finish()
00524 {
00525     delete timeoutMsg;
00526     if (delayTimer->isScheduled())
00527         cancelEvent(delayTimer);
00528     delete delayTimer;
00529     delete delayFirstReadTimer;
00530 
00531         ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00532     ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00533     for (ServerAssocStatMap::iterator l=serverAssocStatMap.begin(); l!=serverAssocStatMap.end(); l++)
00534     {
00535         ev << getFullPath() << " Assoc: "<<l->first<<"\n";
00536         ev << "\tstart time: "<<l->second.start <<"\n";
00537         ev << "\tstop time: "<<l->second.stop <<"\n";
00538         ev << "\tlife time: "<<l->second.lifeTime <<"\n";
00539         ev << "\treceived bytes:" << l->second.rcvdBytes << "\n";
00540         ev << "\tthroughput: "<<(l->second.rcvdBytes / l->second.lifeTime.dbl())*8 <<" bit/sec\n";
00541         recordScalar("bytes rcvd", l->second.rcvdBytes);
00542         recordScalar("throughput", (l->second.rcvdBytes / l->second.lifeTime.dbl())*8);
00543 
00544     }
00545     ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n ";
00546     ev << getFullPath() << "Over all " << notifications << " notifications received\n ";
00547 
00548     BytesPerAssoc::iterator j;
00549     while ((j = bytesPerAssoc.begin())!= bytesPerAssoc.end())
00550     {
00551         delete j->second;
00552         bytesPerAssoc.erase(j);
00553     }
00554     EndToEndDelay::iterator k;
00555     while ((k = endToEndDelay.begin())!= endToEndDelay.end())
00556     {
00557         delete k->second;
00558         endToEndDelay.erase(k);
00559     }
00560     serverAssocStatMap.clear();
00561     sctpEV3<<"Server finished\n";
00562 }
00563 
00564 SCTPServer::~SCTPServer()
00565 {
00566     delete socket;
00567 }