Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include "SCTPServer.h"

#include <stdio.h>
#include <stdlib.h>

#include <string>

#include "SCTPSocket.h"
#include "SCTPCommand_m.h"
#include "SCTPMessage_m.h"
#include "SCTPAssociation.h"
00027
// Message-kind constants. None of these three macros is referenced in the
// visible part of this file.
// NOTE(review): MSGKIND_ looks like a truncated identifier -- confirm the
// intended name or remove it.
00028 #define MSGKIND_CONNECT 0
00029 #define MSGKIND_SEND 1
00030 #define MSGKIND_ 2
00031
// NOTE(review): presumably registers the SCTPServer class with the simulation
// framework so it can be instantiated as a module -- confirm against the
// framework documentation.
00032 Define_Module(SCTPServer);
00033
00034 void SCTPServer::initialize()
00035 {
00036 char * token;
00037 cPar *delT;
00038 AddressVector addresses;
00039 socket = NULL;
00040 sctpEV3<<"initialize SCTP Server\n";
00041 numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0;
00042 WATCH(numSessions);
00043 WATCH(packetsSent);
00044 WATCH(packetsRcvd);
00045 WATCH(bytesSent);
00046 WATCH(numRequestsToSend);
00047
00048 finishEndsSimulation = (bool)par("finishEndsSimulation");
00049 const char* address = par("address");
00050 token = strtok((char*)address,",");
00051 while (token != NULL)
00052 {
00053 addresses.push_back(IPvXAddress(token));
00054 token = strtok(NULL, ",");
00055 }
00056 int32 port = par("port");
00057 echoFactor = par("echoFactor");
00058 delay = par("echoDelay");
00059 delayFirstRead = par("delayFirstRead");
00060 delT = &par("readingInterval");
00061 if (delT->isNumeric() && (double)*delT==0)
00062 readInt=false;
00063 else
00064 readInt=true;
00065 int32 messagesToPush = par("messagesToPush");
00066 inboundStreams = par("inboundStreams");
00067 outboundStreams = par("outboundStreams");
00068 ordered = (bool)par("ordered");
00069 queueSize = par("queueSize");
00070 lastStream = 0;
00071
00072
00073 timeoutMsg = new cMessage("SrvAppTimer");
00074 delayTimer = new cMessage("delayTimer");
00075 delayTimer->setContextPointer(this);
00076 delayFirstReadTimer = new cMessage("delayFirstReadTimer");
00077 firstData = true;
00078 SCTPSocket *socket = new SCTPSocket();
00079 socket->setOutputGate(gate("sctpOut"));
00080 socket->setInboundStreams(inboundStreams);
00081 socket->setOutboundStreams(outboundStreams);
00082 if (strcmp(address,"")==0)
00083 socket->bind(port);
00084 else
00085 {
00086 socket->bindx(addresses, port);
00087 }
00088 socket->listen(true, par("numPacketsToSendPerClient"), messagesToPush);
00089 sctpEV3<<"SCTPServer::initialized listen port="<<port<<"\n";
00090 schedule = false;
00091 shutdownReceived = false;
00092 }
00093
00094 void SCTPServer::sendOrSchedule(cPacket *msg)
00095 {
00096 if (delay==0)
00097 {
00098 send(msg, "sctpOut");
00099 }
00100 else
00101 {
00102 scheduleAt(simulation.getSimTime()+delay, msg);
00103 }
00104 }
00105
00106 void SCTPServer::generateAndSend()
00107 {
00108 uint32 numBytes;
00109
00110 cPacket* cmsg = new cPacket("CMSG");
00111 SCTPSimpleMessage* msg = new SCTPSimpleMessage("Server");
00112 numBytes = (uint32)par("requestLength");
00113 msg->setDataArraySize(numBytes);
00114 for (uint32 i=0; i<numBytes; i++)
00115 {
00116 msg->setData(i, 's');
00117 }
00118 msg->setDataLen(numBytes);
00119 msg->setBitLength(numBytes * 8);
00120 cmsg->encapsulate(msg);
00121 SCTPSendCommand *cmd = new SCTPSendCommand("Send1");
00122 cmd->setAssocId(assocId);
00123 if (ordered)
00124 cmd->setSendUnordered(COMPLETE_MESG_ORDERED);
00125 else
00126 cmd->setSendUnordered(COMPLETE_MESG_UNORDERED);
00127 lastStream=(lastStream+1)%outboundStreams;
00128 cmd->setSid(lastStream);
00129 if (queueSize>0 && numRequestsToSend > 0 && count < queueSize*2)
00130 cmd->setLast(false);
00131 else
00132 cmd->setLast(true);
00133 cmsg->setKind(SCTP_C_SEND);
00134 cmsg->setControlInfo(cmd);
00135 packetsSent++;
00136 bytesSent+=msg->getBitLength()/8;
00137 sendOrSchedule(cmsg);
00138 }
00139
00140 cPacket* SCTPServer::makeReceiveRequest(cPacket* msg)
00141 {
00142 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00143 cPacket* cmsg = new cPacket("ReceiveRequest");
00144 SCTPSendCommand *cmd = new SCTPSendCommand("Send2");
00145 cmd->setAssocId(ind->getAssocId());
00146 cmd->setSid(ind->getSid());
00147 cmd->setNumMsgs(ind->getNumMsgs());
00148 cmsg->setKind(SCTP_C_RECEIVE);
00149 cmsg->setControlInfo(cmd);
00150 delete ind;
00151 return cmsg;
00152 }
00153
00154 cPacket* SCTPServer::makeDefaultReceive()
00155 {
00156 cPacket* cmsg = new cPacket("DefaultReceive");
00157 SCTPSendCommand *cmd = new SCTPSendCommand("Send3");
00158 cmd->setAssocId(assocId);
00159 cmd->setSid(0);
00160 cmd->setNumMsgs(1);
00161 cmsg->setKind(SCTP_C_RECEIVE);
00162 cmsg->setControlInfo(cmd);
00163 return cmsg;
00164 }
00165
00166 cPacket* SCTPServer::makeAbortNotification(SCTPCommand* msg)
00167 {
00168 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg);
00169 cPacket* cmsg = new cPacket("AbortNotification");
00170 SCTPSendCommand *cmd = new SCTPSendCommand("Send4");
00171 assocId = ind->getAssocId();
00172 cmd->setAssocId(assocId);
00173 cmd->setSid(ind->getSid());
00174 cmd->setNumMsgs(ind->getNumMsgs());
00175 cmsg->setControlInfo(cmd);
00176 delete ind;
00177
00178 cmsg->setKind(SCTP_C_ABORT);
00179 return cmsg;
00180 }
00181
00182 void SCTPServer::handleMessage(cMessage *msg)
00183 {
00184 int32 id;
00185 cPacket* cmsg;
00186
00187 if (msg->isSelfMessage())
00188 {
00189
00190 handleTimer(msg);
00191 }
00192 else
00193 {
00194 switch (msg->getKind())
00195 {
00196 case SCTP_I_PEER_CLOSED:
00197 case SCTP_I_ABORT:
00198 {
00199 SCTPCommand *command = dynamic_cast<SCTPCommand *>(msg->removeControlInfo());
00200 assocId = command->getAssocId();
00201 serverAssocStatMap[assocId].peerClosed = true;
00202 if ((long) par("numPacketsToReceivePerClient")==0)
00203 {
00204 if (serverAssocStatMap[assocId].abortSent==false)
00205 {
00206 sendOrSchedule(makeAbortNotification(command->dup()));
00207 serverAssocStatMap[assocId].abortSent = true;
00208 }
00209 }
00210 else
00211 {
00212 if (serverAssocStatMap[assocId].rcvdPackets==(unsigned long) par("numPacketsToReceivePerClient") &&
00213 serverAssocStatMap[assocId].abortSent==false)
00214 {
00215 sendOrSchedule(makeAbortNotification(command->dup()));
00216 serverAssocStatMap[assocId].abortSent = true;
00217 }
00218 }
00219 if (delayTimer->isScheduled())
00220 cancelEvent(delayTimer);
00221 if (delayFirstReadTimer->isScheduled())
00222 cancelEvent(delayFirstReadTimer);
00223 delete command;
00224 delete msg;
00225 break;
00226 }
00227 case SCTP_I_ESTABLISHED:
00228 {
00229 count=0;
00230 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo());
00231 numSessions++;
00232 assocId = connectInfo->getAssocId();
00233 inboundStreams = connectInfo->getInboundStreams();
00234 outboundStreams = connectInfo->getOutboundStreams();
00235 serverAssocStatMap[assocId].rcvdPackets= (long) par("numPacketsToReceivePerClient");
00236 serverAssocStatMap[assocId].sentPackets= (long) par("numPacketsToSendPerClient");
00237 serverAssocStatMap[assocId].rcvdBytes=0;
00238 serverAssocStatMap[assocId].start=0;
00239 serverAssocStatMap[assocId].stop=0;
00240 serverAssocStatMap[assocId].lifeTime=0;
00241 serverAssocStatMap[assocId].abortSent=false;
00242 serverAssocStatMap[assocId].peerClosed = false;
00243 char text[30];
00244 sprintf(text, "App: Received Bytes of assoc %d",assocId);
00245 bytesPerAssoc[assocId] = new cOutVector(text);
00246 sprintf(text, "App: EndToEndDelay of assoc %d",assocId);
00247 endToEndDelay[assocId] = new cOutVector(text);
00248
00249 delete connectInfo;
00250 delete msg;
00251 if ((long) par("numPacketsToSendPerClient") > 0)
00252 {
00253 ServerAssocStatMap::iterator i = serverAssocStatMap.find(assocId);
00254 numRequestsToSend = i->second.sentPackets;
00255 if ((simtime_t)par("thinkTime") > 0)
00256 {
00257 generateAndSend();
00258 timeoutMsg->setKind(SCTP_C_SEND);
00259 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00260 numRequestsToSend--;
00261 i->second.sentPackets = numRequestsToSend;
00262 }
00263 else
00264 {
00265 if (queueSize==0)
00266 {
00267 while (numRequestsToSend > 0)
00268 {
00269 generateAndSend();
00270 numRequestsToSend--;
00271 i->second.sentPackets = numRequestsToSend;
00272 }
00273 }
00274 else if (queueSize>0)
00275 {
00276 while (numRequestsToSend > 0 && count++ < queueSize*2)
00277 {
00278 generateAndSend();
00279 numRequestsToSend--;
00280 i->second.sentPackets = numRequestsToSend;
00281 }
00282
00283 cPacket* cmsg = new cPacket("Queue");
00284 SCTPInfo* qinfo = new SCTPInfo("Info1");
00285 qinfo->setText(queueSize);
00286 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00287 qinfo->setAssocId(id);
00288 cmsg->setControlInfo(qinfo);
00289 sendOrSchedule(cmsg);
00290 }
00291 ServerAssocStatMap::iterator j=serverAssocStatMap.find(assocId);
00292 if (j->second.rcvdPackets == 0 && (simtime_t)par("waitToClose")>0)
00293 {
00294 char as[5];
00295 sprintf(as, "%d",assocId);
00296 cPacket* abortMsg = new cPacket(as);
00297 abortMsg->setKind(SCTP_I_ABORT);
00298 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg);
00299 }
00300 else
00301 {
00302 sctpEV3<<"no more packets to send, call shutdown for assoc "<<assocId<<"\n";
00303 cPacket* cmsg = new cPacket("ShutdownRequest");
00304 SCTPCommand* cmd = new SCTPCommand("Send5");
00305 cmsg->setKind(SCTP_C_SHUTDOWN);
00306 cmd->setAssocId(assocId);
00307 cmsg->setControlInfo(cmd);
00308 sendOrSchedule(cmsg);
00309 }
00310 }
00311 }
00312 break;
00313 }
00314 case SCTP_I_DATA_NOTIFICATION:
00315 {
00316 notifications++;
00317
00318
00319 if (schedule==false)
00320 {
00321 if (delayFirstRead>0 && !delayFirstReadTimer->isScheduled())
00322 {
00323
00324 cmsg=makeReceiveRequest(PK(msg));
00325 scheduleAt(simulation.getSimTime()+delayFirstRead, cmsg);
00326 scheduleAt(simulation.getSimTime()+delayFirstRead, delayFirstReadTimer);
00327 }
00328 else if (readInt && firstData)
00329 {
00330 firstData=false;
00331 cmsg=makeReceiveRequest(PK(msg));
00332 scheduleAt(simulation.getSimTime()+(simtime_t)par("readingInterval"), delayTimer);
00333 sendOrSchedule(cmsg);
00334 }
00335 else if (delayFirstRead==0 && readInt==false)
00336 {
00337 cmsg=makeReceiveRequest(PK(msg));
00338 sendOrSchedule(cmsg);
00339 }
00340
00341 }
00342 else
00343 {
00344 sctpEV3<<simulation.getSimTime()<<" makeReceiveRequest\n";
00345 cmsg=makeReceiveRequest(PK(msg));
00346 sendOrSchedule(cmsg);
00347 }
00348 delete msg;
00349 break;
00350 }
00351 case SCTP_I_DATA:
00352 {
00353 notifications--;
00354 packetsRcvd++;
00355 sctpEV3<<simulation.getSimTime()<<" server: data arrived. "<<packetsRcvd<<" Packets received now\n";
00356 SCTPRcvCommand *ind = check_and_cast<SCTPRcvCommand *>(msg->removeControlInfo());
00357 id = ind->getAssocId();
00358 ServerAssocStatMap::iterator j=serverAssocStatMap.find(id);
00359 BytesPerAssoc::iterator k=bytesPerAssoc.find(id);
00360 if (j->second.rcvdBytes == 0)
00361 j->second.start = simulation.getSimTime();
00362
00363 j->second.rcvdBytes+= PK(msg)->getByteLength();
00364 k->second->record(j->second.rcvdBytes);
00365
00366 if (echoFactor==0)
00367 {
00368 if ((uint32)par("numPacketsToReceivePerClient")>0)
00369 {
00370 j->second.rcvdPackets--;
00371 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg);
00372 EndToEndDelay::iterator m=endToEndDelay.find(id);
00373 m->second->record(simulation.getSimTime()-smsg->getCreationTime());
00374 sctpEV3<<"server: Data received. Left packets to receive="<<j->second.rcvdPackets<<"\n";
00375
00376 if (j->second.rcvdPackets == 0)
00377 {
00378 if (serverAssocStatMap[assocId].peerClosed==true && serverAssocStatMap[assocId].abortSent==false)
00379 {
00380 sendOrSchedule(makeAbortNotification(ind));
00381 serverAssocStatMap[assocId].abortSent = true;
00382 j->second.stop = simulation.getSimTime();
00383 j->second.lifeTime = j->second.stop - j->second.start;
00384 break;
00385 }
00386 else
00387 {
00388 cPacket* cmsg = new cPacket("Request");
00389 SCTPInfo* qinfo = new SCTPInfo("Info2");
00390 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00391 qinfo->setAssocId(id);
00392 cmsg->setControlInfo(qinfo);
00393 sendOrSchedule(cmsg);
00394 j->second.stop = simulation.getSimTime();
00395 j->second.lifeTime = j->second.stop - j->second.start;
00396 }
00397 }
00398 }
00399 delete msg;
00400 }
00401 else
00402 {
00403 SCTPSendCommand *cmd = new SCTPSendCommand("Send6");
00404 cmd->setAssocId(id);
00405 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00406 EndToEndDelay::iterator n=endToEndDelay.find(id);
00407 n->second->record(simulation.getSimTime()-smsg->getCreationTime());
00408 cPacket* cmsg = new cPacket("SVData");
00409 bytesSent+=smsg->getBitLength()/8;
00410 cmd->setSendUnordered(cmd->getSendUnordered());
00411 lastStream=(lastStream+1)%outboundStreams;
00412 cmd->setSid(lastStream);
00413 cmd->setLast(true);
00414 cmsg->encapsulate(smsg);
00415 cmsg->setKind(SCTP_C_SEND);
00416 cmsg->setControlInfo(cmd);
00417 packetsSent++;
00418 delete msg;
00419 sendOrSchedule(cmsg);
00420 }
00421 delete ind;
00422 break;
00423 }
00424 case SCTP_I_SHUTDOWN_RECEIVED:
00425 {
00426 SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00427 id = command->getAssocId();
00428 sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n";
00429 ServerAssocStatMap::iterator i=serverAssocStatMap.find(id);
00430 if (i->second.sentPackets == 0 || (long) par("numPacketsToSendPerClient")==0)
00431 {
00432 cPacket* cmsg = new cPacket("Request");
00433 SCTPInfo* qinfo = new SCTPInfo("Info3");
00434 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00435 qinfo->setAssocId(id);
00436 cmsg->setControlInfo(qinfo);
00437 sendOrSchedule(cmsg);
00438 i->second.stop = simulation.getSimTime();
00439 i->second.lifeTime = i->second.stop - i->second.start;
00440 }
00441 delete command;
00442 shutdownReceived = true;
00443 delete msg;
00444 break;
00445 }
00446 case SCTP_I_CLOSED:
00447 if (delayTimer->isScheduled())
00448 cancelEvent(delayTimer);
00449 if (finishEndsSimulation) {
00450 endSimulation();
00451 }
00452 delete msg;
00453 break;
00454 default: delete msg;
00455 }
00456 }
00457 }
00458
00459 void SCTPServer::handleTimer(cMessage *msg)
00460 {
00461 cPacket* cmsg;
00462 SCTPCommand* cmd;
00463 int32 id;
00464 double tempInterval;
00465
00466 if (msg==delayTimer)
00467 {
00468 ServerAssocStatMap::iterator i=serverAssocStatMap.find(assocId);
00469 sctpEV3<<simulation.getSimTime()<<" delayTimer expired\n";
00470 sendOrSchedule(makeDefaultReceive());
00471 scheduleAt(simulation.getSimTime()+(double)par("readingInterval"), delayTimer);
00472 return;
00473 }
00474 else if (msg==delayFirstReadTimer)
00475 {
00476 delayFirstRead = 0;
00477
00478 if (readInt && !delayTimer->isScheduled())
00479 {
00480 tempInterval = (double)par("readingInterval");
00481 scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, delayTimer);
00482 scheduleAt(simulation.getSimTime()+(simtime_t)tempInterval, makeDefaultReceive());
00483 }
00484 return;
00485 }
00486
00487 switch (msg->getKind())
00488 {
00489 case SCTP_C_SEND:
00490 if (numRequestsToSend>0)
00491 {
00492 generateAndSend();
00493 if ((simtime_t)par("thinkTime") > 0)
00494 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00495 numRequestsToSend--;
00496 }
00497 break;
00498 case SCTP_I_ABORT:
00499
00500 cmsg = new cPacket("CLOSE", SCTP_C_CLOSE);
00501 cmd = new SCTPCommand("Send6");
00502 id = atoi(msg->getName());
00503 cmd->setAssocId(id);
00504 cmsg->setControlInfo(cmd);
00505 sendOrSchedule(cmsg);
00506 break;
00507 case SCTP_C_RECEIVE:
00508 sctpEV3<<simulation.getSimTime()<<" SCTPServer:SCTP_C_RECEIVE\n";
00509 if (readInt || delayFirstRead > 0)
00510 schedule = false;
00511 else
00512 schedule = true;
00513 sendOrSchedule(PK(msg));
00514 break;
00515 default:
00516
00517 sctpEV3<<"MsgKind ="<<msg->getKind()<<" unknown\n";
00518
00519 break;
00520 }
00521 }
00522
00523 void SCTPServer::finish()
00524 {
00525 delete timeoutMsg;
00526 if (delayTimer->isScheduled())
00527 cancelEvent(delayTimer);
00528 delete delayTimer;
00529 delete delayFirstReadTimer;
00530
00531 ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00532 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00533 for (ServerAssocStatMap::iterator l=serverAssocStatMap.begin(); l!=serverAssocStatMap.end(); l++)
00534 {
00535 ev << getFullPath() << " Assoc: "<<l->first<<"\n";
00536 ev << "\tstart time: "<<l->second.start <<"\n";
00537 ev << "\tstop time: "<<l->second.stop <<"\n";
00538 ev << "\tlife time: "<<l->second.lifeTime <<"\n";
00539 ev << "\treceived bytes:" << l->second.rcvdBytes << "\n";
00540 ev << "\tthroughput: "<<(l->second.rcvdBytes / l->second.lifeTime.dbl())*8 <<" bit/sec\n";
00541 recordScalar("bytes rcvd", l->second.rcvdBytes);
00542 recordScalar("throughput", (l->second.rcvdBytes / l->second.lifeTime.dbl())*8);
00543
00544 }
00545 ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n ";
00546 ev << getFullPath() << "Over all " << notifications << " notifications received\n ";
00547
00548 BytesPerAssoc::iterator j;
00549 while ((j = bytesPerAssoc.begin())!= bytesPerAssoc.end())
00550 {
00551 delete j->second;
00552 bytesPerAssoc.erase(j);
00553 }
00554 EndToEndDelay::iterator k;
00555 while ((k = endToEndDelay.begin())!= endToEndDelay.end())
00556 {
00557 delete k->second;
00558 endToEndDelay.erase(k);
00559 }
00560 serverAssocStatMap.clear();
00561 sctpEV3<<"Server finished\n";
00562 }
00563
00564 SCTPServer::~SCTPServer()
00565 {
00566 delete socket;
00567 }