00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "SCTPPeer.h"
00020 #include "SCTPSocket.h"
00021 #include "SCTPCommand_m.h"
00022 #include "SCTPMessage_m.h"
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 #include "SCTPAssociation.h"
00026 #include "IPAddressResolver.h"
00027
00028 #define MSGKIND_CONNECT 0
00029 #define MSGKIND_SEND 1
00030 #define MSGKIND_ABORT 2
00031 #define MSGKIND_PRIMARY 3
00032 #define MSGKIND_STOP 5
00033
00034 Define_Module(SCTPPeer);
00035
00036 void SCTPPeer::initialize()
00037 {
00038 char * token;
00039 AddressVector addresses;
00040
00041 numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0;
00042 WATCH(numSessions);
00043 WATCH(packetsSent);
00044 WATCH(packetsRcvd);
00045 WATCH(bytesSent);
00046 WATCH(numRequestsToSend);
00047
00048 const char* address = par("address");
00049
00050 token = strtok((char*)address,",");
00051 while (token != NULL)
00052 {
00053 addresses.push_back(IPvXAddress(token));
00054 token = strtok(NULL, ",");
00055 }
00056 int32 port = par("port");
00057 echoFactor = par("echoFactor");
00058 delay = par("echoDelay");
00059 outboundStreams = par("outboundStreams");
00060 ordered = (bool)par("ordered");
00061 queueSize = par("queueSize");
00062 lastStream = 0;
00063 timeoutMsg = new cMessage("SrvAppTimer");
00064 SCTPSocket* socket = new SCTPSocket();
00065 socket->setOutputGate(gate("sctpOut"));
00066 socket->setOutboundStreams(outboundStreams);
00067 if (strcmp(address,"")==0)
00068 {
00069 socket->bind(port);
00070 clientSocket.bind(port);
00071 }
00072 else
00073 {
00074 socket->bindx(addresses, port);
00075 clientSocket.bindx(addresses, port);
00076 }
00077 socket->listen(true, par("numPacketsToSendPerClient"));
00078 sctpEV3<<"SCTPPeer::initialized listen port="<<port<<"\n";
00079 clientSocket.setCallbackObject(this);
00080 clientSocket.setOutputGate(gate("sctpOut"));
00081
00082 if ((simtime_t)par("startTime")>0)
00083 {
00084 connectTimer = new cMessage("ConnectTimer");
00085 connectTimer->setKind(MSGKIND_CONNECT);
00086 scheduleAt((simtime_t)par("startTime"), connectTimer);
00087 }
00088 schedule = false;
00089 shutdownReceived = false;
00090 sendAllowed = true;
00091 }
00092
00093 void SCTPPeer::sendOrSchedule(cPacket *msg)
00094 {
00095 if (delay==0)
00096 {
00097 send(msg, "sctpOut");
00098 }
00099 else
00100 {
00101 scheduleAt(simulation.getSimTime()+delay, msg);
00102 }
00103 }
00104
00105 void SCTPPeer::generateAndSend(SCTPConnectInfo *connectInfo)
00106 {
00107 uint32 numBytes;
00108 cPacket* cmsg = new cPacket("CMSG");
00109 SCTPSimpleMessage* msg=new SCTPSimpleMessage("Server");
00110 numBytes=(long)par("requestLength");
00111 msg->setDataArraySize(numBytes);
00112 for (uint32 i=0; i<numBytes; i++)
00113 {
00114 msg->setData(i, 's');
00115 }
00116 msg->setDataLen(numBytes);
00117 msg->setByteLength(numBytes);
00118 cmsg->encapsulate(msg);
00119 SCTPSendCommand *cmd = new SCTPSendCommand();
00120 cmd->setAssocId(serverAssocId);
00121 if (ordered)
00122 cmd->setSendUnordered(COMPLETE_MESG_ORDERED);
00123 else
00124 cmd->setSendUnordered(COMPLETE_MESG_UNORDERED);
00125 lastStream=(lastStream+1)%outboundStreams;
00126 cmd->setSid(lastStream);
00127 cmd->setLast(true);
00128 cmsg->setKind(SCTP_C_SEND);
00129 cmsg->setControlInfo(cmd);
00130 packetsSent++;
00131 bytesSent+=msg->getBitLength()/8;
00132 sendOrSchedule(cmsg);
00133 }
00134
00135 void SCTPPeer::connect()
00136 {
00137 const char *connectAddress = par("connectAddress");
00138 int32 connectPort = par("connectPort");
00139 uint32 outStreams = par("outboundStreams");
00140 clientSocket.setOutboundStreams(outStreams);
00141
00142 sctpEV3 << "issuing OPEN command\n";
00143 sctpEV3<<"Assoc "<<clientSocket.getConnectionId()<<"::connect to address "<<connectAddress<<", port "<<connectPort<<"\n";
00144 numSessions++;
00145 clientSocket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession"));
00146
00147 }
00148
00149 void SCTPPeer::handleMessage(cMessage *msg)
00150 {
00151 int32 id;
00152
00153 if (msg->isSelfMessage())
00154 {
00155
00156 handleTimer(msg);
00157 }
00158 switch (msg->getKind())
00159 {
00160 case SCTP_I_PEER_CLOSED:
00161 case SCTP_I_ABORT:
00162 {
00163 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo()->dup());
00164 cPacket* cmsg = new cPacket("Notification");
00165 SCTPSendCommand *cmd = new SCTPSendCommand();
00166 id = ind->getAssocId();
00167 cmd->setAssocId(id);
00168 cmd->setSid(ind->getSid());
00169 cmd->setNumMsgs(ind->getNumMsgs());
00170 cmsg->setControlInfo(cmd);
00171 delete ind;
00172 delete msg;
00173 cmsg->setKind(SCTP_C_ABORT);
00174 sendOrSchedule(cmsg);
00175 break;
00176 }
00177 case SCTP_I_ESTABLISHED:
00178 {
00179 if (clientSocket.getState()==SCTPSocket::CONNECTING)
00180 clientSocket.processMessage(PK(msg));
00181 else
00182 {
00183 int32 count=0;
00184 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo());
00185 numSessions++;
00186 serverAssocId = connectInfo->getAssocId();
00187 id = serverAssocId;
00188 outboundStreams = connectInfo->getOutboundStreams();
00189 rcvdPacketsPerAssoc[serverAssocId]= (long) par("numPacketsToReceivePerClient");
00190 sentPacketsPerAssoc[serverAssocId]= (long) par("numPacketsToSendPerClient");
00191 char text[30];
00192 sprintf(text, "App: Received Bytes of assoc %d",serverAssocId);
00193 bytesPerAssoc[serverAssocId] = new cOutVector(text);
00194 rcvdBytesPerAssoc[serverAssocId]= 0;
00195 sprintf(text, "App: EndToEndDelay of assoc %d",serverAssocId);
00196 endToEndDelay[serverAssocId] = new cOutVector(text);
00197 sprintf(text, "Hist: EndToEndDelay of assoc %d",serverAssocId);
00198 histEndToEndDelay[serverAssocId] = new cDoubleHistogram(text);
00199
00200
00201 delete msg;
00202 if ((long) par("numPacketsToSendPerClient") > 0)
00203 {
00204 SentPacketsPerAssoc::iterator i=sentPacketsPerAssoc.find(serverAssocId);
00205 numRequestsToSend = i->second;
00206 if ((simtime_t)par("thinkTime") > 0)
00207 {
00208 generateAndSend(connectInfo);
00209 timeoutMsg->setKind(SCTP_C_SEND);
00210 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00211 numRequestsToSend--;
00212 i->second = numRequestsToSend;
00213 }
00214 else
00215 {
00216 if (queueSize==0)
00217 {
00218 while (numRequestsToSend > 0)
00219 {
00220 generateAndSend(connectInfo);
00221 numRequestsToSend--;
00222 i->second = numRequestsToSend;
00223 }
00224 }
00225 else if (queueSize>0)
00226 {
00227 while (numRequestsToSend > 0 && count++ < queueSize*2)
00228 {
00229 generateAndSend(connectInfo);
00230 numRequestsToSend--;
00231 i->second = numRequestsToSend;
00232 }
00233
00234 cPacket* cmsg = new cPacket("Queue");
00235 SCTPInfo* qinfo = new SCTPInfo();
00236 qinfo->setText(queueSize);
00237 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00238 qinfo->setAssocId(id);
00239 cmsg->setControlInfo(qinfo);
00240 sendOrSchedule(cmsg);
00241 }
00242
00243 sctpEV3<<"!!!!!!!!!!!!!!!All data sent from Server !!!!!!!!!!\n";
00244
00245 RcvdPacketsPerAssoc::iterator j=rcvdPacketsPerAssoc.find(serverAssocId);
00246 if (j->second == 0 && (simtime_t)par("waitToClose")>0)
00247 {
00248 char as[5];
00249 sprintf(as, "%d",serverAssocId);
00250 cMessage* abortMsg = new cMessage(as);
00251 abortMsg->setKind(SCTP_I_ABORT);
00252 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg);
00253 }
00254 else
00255 {
00256 sctpEV3<<"no more packets to send, call shutdown for assoc "<<serverAssocId<<"\n";
00257 cPacket* cmsg = new cPacket("ShutdownRequest");
00258 SCTPCommand* cmd = new SCTPCommand();
00259 cmsg->setKind(SCTP_C_SHUTDOWN);
00260 cmd->setAssocId(serverAssocId);
00261 cmsg->setControlInfo(cmd);
00262 sendOrSchedule(cmsg);
00263 }
00264 }
00265 }
00266 }
00267 break;
00268 }
00269 case SCTP_I_DATA_NOTIFICATION:
00270 {
00271 notifications++;
00272 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00273 cPacket* cmsg = new cPacket("Notification");
00274 SCTPSendCommand *cmd = new SCTPSendCommand();
00275 id = ind->getAssocId();
00276 cmd->setAssocId(id);
00277 cmd->setSid(ind->getSid());
00278 cmd->setNumMsgs(ind->getNumMsgs());
00279 cmsg->setKind(SCTP_C_RECEIVE);
00280 cmsg->setControlInfo(cmd);
00281 delete ind;
00282 delete msg;
00283 if (!cmsg->isScheduled() && schedule==false)
00284 {
00285 scheduleAt(simulation.getSimTime()+(simtime_t)par("delayFirstRead"), cmsg);
00286 }
00287 else if (schedule==true)
00288 sendOrSchedule(cmsg);
00289 break;
00290 }
00291 case SCTP_I_DATA:
00292 {
00293 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo());
00294 id = ind->getAssocId();
00295 RcvdBytesPerAssoc::iterator j=rcvdBytesPerAssoc.find(id);
00296 if (j==rcvdBytesPerAssoc.end() && (clientSocket.getState()==SCTPSocket::CONNECTED))
00297 clientSocket.processMessage(PK(msg));
00298 else
00299 {
00300 j->second+= PK(msg)->getByteLength();
00301 BytesPerAssoc::iterator k=bytesPerAssoc.find(id);
00302 k->second->record(j->second);
00303 packetsRcvd++;
00304 if (echoFactor==0)
00305 {
00306 if ((long)par("numPacketsToReceivePerClient")>0)
00307 {
00308 RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id);
00309 i->second--;
00310 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg);
00311 EndToEndDelay::iterator j=endToEndDelay.find(id);
00312 j->second->record(simulation.getSimTime()-smsg->getCreationTime());
00313 HistEndToEndDelay::iterator k=histEndToEndDelay.find(id);
00314 k->second->collect(simulation.getSimTime()-smsg->getCreationTime());
00315 if (i->second == 0)
00316 {
00317 cPacket* cmsg = new cPacket("Request");
00318 SCTPInfo* qinfo = new SCTPInfo();
00319 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00320 qinfo->setAssocId(id);
00321 cmsg->setControlInfo(qinfo);
00322 sendOrSchedule(cmsg);
00323 }
00324 }
00325 delete msg;
00326 }
00327 else
00328 {
00329 SCTPSendCommand *cmd = new SCTPSendCommand();
00330 cmd->setAssocId(id);
00331
00332 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00333 EndToEndDelay::iterator j=endToEndDelay.find(id);
00334 j->second->record(simulation.getSimTime()-smsg->getCreationTime());
00335 HistEndToEndDelay::iterator k=histEndToEndDelay.find(id);
00336 k->second->collect(simulation.getSimTime()-smsg->getCreationTime());
00337 cPacket* cmsg = new cPacket("SVData");
00338 bytesSent+=smsg->getByteLength();
00339 cmd->setSendUnordered(cmd->getSendUnordered());
00340 lastStream=(lastStream+1)%outboundStreams;
00341 cmd->setSid(lastStream);
00342 cmd->setLast(true);
00343 cmsg->encapsulate(smsg);
00344 cmsg->setKind(SCTP_C_SEND);
00345 cmsg->setControlInfo(cmd);
00346 packetsSent++;
00347 delete msg;
00348 sendOrSchedule(cmsg);
00349 }
00350 }
00351
00352 break;
00353 }
00354 case SCTP_I_SHUTDOWN_RECEIVED:
00355 {
00356 SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00357 id = command->getAssocId();
00358 sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n";
00359 RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id);
00360 if (i==rcvdPacketsPerAssoc.end()&& (clientSocket.getState()==SCTPSocket::CONNECTED))
00361 clientSocket.processMessage(PK(msg));
00362 else
00363 {
00364 if (i->second == 0)
00365 {
00366 cPacket* cmsg = new cPacket("Request");
00367 SCTPInfo* qinfo = new SCTPInfo();
00368 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00369 qinfo->setAssocId(id);
00370 cmsg->setControlInfo(qinfo);
00371 sendOrSchedule(cmsg);
00372 }
00373 delete command;
00374 shutdownReceived = true;
00375 }
00376 delete msg;
00377 }
00378 case SCTP_I_CLOSED: delete msg;
00379 break;
00380 }
00381
00382 if (ev.isGUI())
00383 {
00384 char buf[32];
00385 RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.find(id);
00386 sprintf(buf, "rcvd: %ld bytes\nsent: %ld bytes", l->second, bytesSent);
00387 getDisplayString().setTagArg("t",0,buf);
00388 }
00389 }
00390
00391 void SCTPPeer::handleTimer(cMessage *msg)
00392 {
00393 cPacket* cmsg;
00394 SCTPCommand* cmd;
00395 int32 id;
00396
00397
00398 sctpEV3<<"SCTPPeer::handleTimer\n";
00399
00400 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->getControlInfo());
00401 switch (msg->getKind())
00402 {
00403 case MSGKIND_CONNECT:
00404 sctpEV3 << "starting session call connect\n";
00405 connect();
00406 break;
00407 case SCTP_C_SEND:
00408
00409 if (numRequestsToSend>0)
00410 {
00411 generateAndSend(connectInfo);
00412 if ((simtime_t)par("thinkTime") > 0)
00413 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00414 numRequestsToSend--;
00415 }
00416 break;
00417 case SCTP_I_ABORT:
00418
00419 cmsg = new cPacket("CLOSE", SCTP_C_CLOSE);
00420 cmd = new SCTPCommand();
00421 id = atoi(msg->getName());
00422 cmd->setAssocId(id);
00423 cmsg->setControlInfo(cmd);
00424 sendOrSchedule(cmsg);
00425 break;
00426 case SCTP_C_RECEIVE:
00427 schedule = true;
00428 sendOrSchedule(PK(msg));
00429 break;
00430 default:
00431
00432 break;
00433 }
00434 }
00435
00436 void SCTPPeer::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg)
00437 {
00438 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00439 cPacket* cmsg = new cPacket("CMSG");
00440 SCTPSendCommand *cmd = new SCTPSendCommand();
00441 cmd->setAssocId(ind->getAssocId());
00442 cmd->setSid(ind->getSid());
00443 cmd->setNumMsgs(ind->getNumMsgs());
00444 cmsg->setKind(SCTP_C_RECEIVE);
00445 cmsg->setControlInfo(cmd);
00446 delete ind;
00447 clientSocket.sendNotification(cmsg);
00448 }
00449
00450
00451 void SCTPPeer::socketPeerClosed(int32, void *)
00452 {
00453
00454 if (clientSocket.getState()==SCTPSocket::PEER_CLOSED)
00455 {
00456 ev << "remote SCTP closed, closing here as well\n";
00457 setStatusString("closing");
00458 clientSocket.close();
00459 }
00460 }
00461
00462 void SCTPPeer::socketClosed(int32, void *)
00463 {
00464
00465 ev << "connection closed\n";
00466 setStatusString("closed");
00467 }
00468
00469 void SCTPPeer::socketFailure(int32, void *, int32 code)
00470 {
00471
00472 ev << "connection broken\n";
00473 setStatusString("broken");
00474
00475 timeMsg->setKind(MSGKIND_CONNECT);
00476 scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
00477 }
00478
00479 void SCTPPeer::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status)
00480 {
00481 struct pathStatus ps;
00482 SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
00483 if (i!=sctpPathStatus.end())
00484 {
00485 ps = i->second;
00486 ps.active=status->getActive();
00487 }
00488 else
00489 {
00490 ps.active = status->getActive();
00491 ps.primaryPath = false;
00492 sctpPathStatus[ps.pid]=ps;
00493 }
00494 }
00495
00496 void SCTPPeer::setStatusString(const char *s)
00497 {
00498 if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
00499 }
00500
00501 void SCTPPeer::sendRequest(bool last)
00502 {
00503 sctpEV3 << "sending request, " << numRequestsToSend-1 << " more to go\n";
00504 long numBytes = par("requestLength");
00505 if (numBytes < 1)
00506 numBytes=1;
00507
00508 sctpEV3 << "SCTPClient: sending " << numBytes << " data bytes\n";
00509
00510 cPacket* cmsg = new cPacket("AppData");
00511 SCTPSimpleMessage* msg=new SCTPSimpleMessage("data");
00512
00513 msg->setDataArraySize(numBytes);
00514 for (int32 i=0; i<numBytes; i++)
00515 {
00516 msg->setData(i, 'a');
00517 }
00518 msg->setDataLen(numBytes);
00519 msg->setBitLength(numBytes * 8);
00520 msg->setCreationTime(simulation.getSimTime());
00521 cmsg->encapsulate(msg);
00522 if (ordered)
00523 cmsg->setKind(SCTP_C_SEND_ORDERED);
00524 else
00525 cmsg->setKind(SCTP_C_SEND_UNORDERED);
00526
00527 clientSocket.send(cmsg, last);
00528 bytesSent+=numBytes;
00529 }
00530
00531
00532 void SCTPPeer::socketEstablished(int32, void *)
00533 {
00534 int32 count = 0;
00535
00536 ev<<"SCTPClient: connected\n";
00537 setStatusString("connected");
00538
00539 numRequestsToSend = (long) par("numRequestsPerSession");
00540 numPacketsToReceive = (long) par("numPacketsToReceive");
00541 if (numRequestsToSend<1)
00542 numRequestsToSend = 0;
00543
00544 if (numRequestsToSend>0)
00545 {
00546 if ((simtime_t)par("thinkTime") > 0)
00547 {
00548 if (sendAllowed)
00549 {
00550 sendRequest();
00551 numRequestsToSend--;
00552 }
00553 timeMsg->setKind(MSGKIND_SEND);
00554 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00555
00556 }
00557 else
00558 {
00559 if (queueSize>0)
00560 {
00561 while (numRequestsToSend > 0 && count++ < queueSize*2 && sendAllowed)
00562 {
00563 if (count == queueSize*2)
00564 sendRequest();
00565 else
00566 sendRequest(false);
00567 numRequestsToSend--;
00568 }
00569 if (numRequestsToSend>0 && sendAllowed)
00570 sendQueueRequest();
00571 }
00572 else
00573 {
00574 while (numRequestsToSend > 0 && sendAllowed)
00575 {
00576 sendRequest();
00577 numRequestsToSend--;
00578 }
00579 }
00580
00581 if (numPacketsToReceive == 0 && (simtime_t)par("waitToClose")>0)
00582 {
00583 timeMsg->setKind(MSGKIND_ABORT);
00584 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
00585 }
00586 if (numRequestsToSend == 0 && (simtime_t)par("waitToClose")==0)
00587 {
00588 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00589 clientSocket.shutdown();
00590 }
00591 }
00592 }
00593 }
00594
00595 void SCTPPeer::sendQueueRequest()
00596 {
00597 cPacket* cmsg = new cPacket("Queue");
00598 SCTPInfo* qinfo = new SCTPInfo();
00599 qinfo->setText(queueSize);
00600 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00601 qinfo->setAssocId(clientSocket.getConnectionId());
00602 cmsg->setControlInfo(qinfo);
00603 clientSocket.sendRequest(cmsg);
00604
00605 }
00606
00607
00608 void SCTPPeer::sendRequestArrived()
00609 {
00610 int32 count = 0;
00611
00612 sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
00613 while (numRequestsToSend > 0 && count++ < queueSize && sendAllowed)
00614 {
00615 numRequestsToSend--;
00616 if (count == queueSize || numRequestsToSend==0)
00617 sendRequest();
00618 else
00619 sendRequest(false);
00620
00621 if (numRequestsToSend == 0)
00622 {
00623 sctpEV3<<"no more packets to send, call shutdown\n";
00624 clientSocket.shutdown();
00625 }
00626 }
00627
00628
00629 }
00630
00631 void SCTPPeer::socketDataArrived(int32, void *, cPacket *msg, bool)
00632 {
00633
00634 packetsRcvd++;
00635
00636 sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
00637
00638 SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->getControlInfo());
00639
00640 bytesRcvd+=msg->getByteLength();
00641
00642 if (echoFactor > 0)
00643 {
00644 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00645 cPacket* cmsg = new cPacket("SVData");
00646 echoedBytesSent+=smsg->getBitLength()/8;
00647 cmsg->encapsulate(smsg);
00648 if (ind->getSendUnordered())
00649 cmsg->setKind(SCTP_C_SEND_UNORDERED);
00650 else
00651 cmsg->setKind(SCTP_C_SEND_ORDERED);
00652 packetsSent++;
00653 delete msg;
00654 clientSocket.send(cmsg,1);
00655 }
00656 if ((long)par("numPacketsToReceive")>0)
00657 {
00658 numPacketsToReceive--;
00659 if (numPacketsToReceive == 0)
00660 {
00661 setStatusString("closing");
00662 clientSocket.close();
00663 }
00664 }
00665 }
00666
00667
00668
00669 void SCTPPeer::shutdownReceivedArrived(int32 connId)
00670 {
00671 if (numRequestsToSend==0)
00672 {
00673 cPacket* cmsg = new cPacket("Request");
00674 SCTPInfo* qinfo = new SCTPInfo();
00675 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00676 qinfo->setAssocId(connId);
00677 cmsg->setControlInfo(qinfo);
00678 clientSocket.sendNotification(cmsg);
00679 }
00680 }
00681
00682
00683
00684 void SCTPPeer::sendqueueFullArrived(int32 assocId)
00685 {
00686 sendAllowed = false;
00687 }
00688
00689
00690 void SCTPPeer::finish()
00691 {
00692 delete timeoutMsg;
00693 delete connectTimer;
00694 ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00695 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00696 for (RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.begin(); l!=rcvdBytesPerAssoc.end(); l++)
00697 {
00698 ev << getFullPath() << ": received " << l->second << " bytes in assoc " << l->first<< "\n";
00699 }
00700 ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n ";
00701 ev << getFullPath() << "Over all " << notifications << " notifications received\n ";
00702 for (BytesPerAssoc::iterator j = bytesPerAssoc.begin(); j!= bytesPerAssoc.end(); j++)
00703 {
00704 delete j->second;
00705 bytesPerAssoc.erase(j);
00706 }
00707 for (EndToEndDelay::iterator k = endToEndDelay.begin(); k!= endToEndDelay.end(); k++)
00708 {
00709 delete k->second;
00710 endToEndDelay.erase(k);
00711 }
00712 for (HistEndToEndDelay::iterator l = histEndToEndDelay.begin(); l!= histEndToEndDelay.end(); l++)
00713 {
00714 delete l->second;
00715 histEndToEndDelay.erase(l);
00716 }
00717 rcvdPacketsPerAssoc.clear();
00718 sentPacketsPerAssoc.clear();
00719 rcvdBytesPerAssoc.clear();
00720 }
00721