SCTPPeer.cc

Go to the documentation of this file.
00001 //
00002 // Copyright (C) 2008 Irene Ruengeler
00003 //
00004 // This program is free software; you can redistribute it and/or
00005 // modify it under the terms of the GNU General Public License
00006 // as published by the Free Software Foundation; either version 2
00007 // of the License, or (at your option) any later version.
00008 //
00009 // This program is distributed in the hope that it will be useful,
00010 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00011 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00012 // GNU General Public License for more details.
00013 //
00014 // You should have received a copy of the GNU General Public License
00015 // along with this program; if not, see <http://www.gnu.org/licenses/>.
00016 //
00017 
00018 
00019 #include "SCTPPeer.h"
00020 #include "SCTPSocket.h"
00021 #include "SCTPCommand_m.h"
00022 #include "SCTPMessage_m.h"
00023 #include <stdlib.h>
00024 #include <stdio.h>
00025 #include "SCTPAssociation.h"
00026 #include "IPAddressResolver.h"
00027 
00028 #define MSGKIND_CONNECT  0
00029 #define MSGKIND_SEND         1
00030 #define MSGKIND_ABORT    2
00031 #define MSGKIND_PRIMARY  3
00032 #define MSGKIND_STOP     5
00033 
00034 Define_Module(SCTPPeer);
00035 
00036 void SCTPPeer::initialize()
00037 {
00038     char * token;
00039     AddressVector addresses;
00040 
00041     numSessions = packetsSent = packetsRcvd = bytesSent = notifications = 0;
00042     WATCH(numSessions);
00043     WATCH(packetsSent);
00044     WATCH(packetsRcvd);
00045     WATCH(bytesSent);
00046     WATCH(numRequestsToSend);
00047     // parameters
00048     const char* address = par("address");
00049 
00050     token = strtok((char*)address,",");
00051     while (token != NULL)
00052     {
00053         addresses.push_back(IPvXAddress(token));
00054         token = strtok(NULL, ",");
00055     }
00056     int32 port = par("port");
00057     echoFactor = par("echoFactor");
00058     delay = par("echoDelay");
00059     outboundStreams = par("outboundStreams");
00060     ordered = (bool)par("ordered");
00061     queueSize = par("queueSize");
00062     lastStream = 0;
00063     timeoutMsg = new cMessage("SrvAppTimer");
00064     SCTPSocket* socket = new SCTPSocket();
00065     socket->setOutputGate(gate("sctpOut"));
00066     socket->setOutboundStreams(outboundStreams);
00067     if (strcmp(address,"")==0)
00068     {
00069         socket->bind(port);
00070         clientSocket.bind(port);
00071     }
00072     else
00073     {
00074         socket->bindx(addresses, port);
00075         clientSocket.bindx(addresses, port);
00076     }
00077     socket->listen(true, par("numPacketsToSendPerClient"));
00078     sctpEV3<<"SCTPPeer::initialized listen port="<<port<<"\n";
00079     clientSocket.setCallbackObject(this);
00080     clientSocket.setOutputGate(gate("sctpOut"));
00081 
00082     if ((simtime_t)par("startTime")>0)
00083     {
00084         connectTimer = new cMessage("ConnectTimer");
00085         connectTimer->setKind(MSGKIND_CONNECT);
00086         scheduleAt((simtime_t)par("startTime"), connectTimer);
00087     }
00088     schedule = false;
00089     shutdownReceived = false;
00090     sendAllowed = true;
00091 }
00092 
00093 void SCTPPeer::sendOrSchedule(cPacket *msg)
00094 {
00095     if (delay==0)
00096     {
00097         send(msg, "sctpOut");
00098     }
00099     else
00100     {
00101         scheduleAt(simulation.getSimTime()+delay, msg);
00102     }
00103 }
00104 
00105 void SCTPPeer::generateAndSend(SCTPConnectInfo *connectInfo)
00106 {
00107 uint32 numBytes;
00108     cPacket* cmsg = new cPacket("CMSG");
00109     SCTPSimpleMessage* msg=new SCTPSimpleMessage("Server");
00110     numBytes=(long)par("requestLength");
00111     msg->setDataArraySize(numBytes);
00112     for (uint32 i=0; i<numBytes; i++)
00113     {
00114         msg->setData(i, 's');
00115     }
00116     msg->setDataLen(numBytes);
00117     msg->setByteLength(numBytes);
00118     cmsg->encapsulate(msg);
00119     SCTPSendCommand *cmd = new SCTPSendCommand();
00120     cmd->setAssocId(serverAssocId);
00121     if (ordered)
00122         cmd->setSendUnordered(COMPLETE_MESG_ORDERED);
00123     else
00124         cmd->setSendUnordered(COMPLETE_MESG_UNORDERED);
00125     lastStream=(lastStream+1)%outboundStreams;
00126     cmd->setSid(lastStream);
00127     cmd->setLast(true);
00128     cmsg->setKind(SCTP_C_SEND);
00129     cmsg->setControlInfo(cmd);
00130     packetsSent++;
00131     bytesSent+=msg->getBitLength()/8;
00132     sendOrSchedule(cmsg);
00133 }
00134 
00135 void SCTPPeer::connect()
00136 {
00137     const char *connectAddress = par("connectAddress");
00138     int32 connectPort = par("connectPort");
00139     uint32 outStreams = par("outboundStreams");
00140     clientSocket.setOutboundStreams(outStreams);
00141 
00142     sctpEV3 << "issuing OPEN command\n";
00143     sctpEV3<<"Assoc "<<clientSocket.getConnectionId()<<"::connect to address "<<connectAddress<<", port "<<connectPort<<"\n";
00144     numSessions++;
00145     clientSocket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession"));
00146 
00147 }
00148 
00149 void SCTPPeer::handleMessage(cMessage *msg)
00150 {
00151     int32 id;
00152 
00153     if (msg->isSelfMessage())
00154     {
00155 
00156         handleTimer(msg);
00157     }
00158     switch (msg->getKind())
00159     {
00160         case SCTP_I_PEER_CLOSED:
00161         case SCTP_I_ABORT:
00162         {
00163             SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo()->dup());
00164             cPacket* cmsg = new cPacket("Notification");
00165             SCTPSendCommand *cmd = new SCTPSendCommand();
00166             id = ind->getAssocId();
00167             cmd->setAssocId(id);
00168             cmd->setSid(ind->getSid());
00169             cmd->setNumMsgs(ind->getNumMsgs());
00170             cmsg->setControlInfo(cmd);
00171             delete ind;
00172             delete msg;
00173             cmsg->setKind(SCTP_C_ABORT);
00174             sendOrSchedule(cmsg);
00175             break;
00176         }
00177         case SCTP_I_ESTABLISHED:
00178         {
00179             if (clientSocket.getState()==SCTPSocket::CONNECTING)
00180                 clientSocket.processMessage(PK(msg));
00181             else
00182             {
00183                 int32 count=0;
00184                 SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->removeControlInfo());
00185                 numSessions++;
00186                 serverAssocId = connectInfo->getAssocId();
00187                 id = serverAssocId;
00188                 outboundStreams = connectInfo->getOutboundStreams();
00189                 rcvdPacketsPerAssoc[serverAssocId]= (long) par("numPacketsToReceivePerClient");
00190                 sentPacketsPerAssoc[serverAssocId]= (long) par("numPacketsToSendPerClient");
00191                 char text[30];
00192                 sprintf(text, "App: Received Bytes of assoc %d",serverAssocId);
00193                 bytesPerAssoc[serverAssocId] = new cOutVector(text);
00194                 rcvdBytesPerAssoc[serverAssocId]= 0;
00195                 sprintf(text, "App: EndToEndDelay of assoc %d",serverAssocId);
00196                 endToEndDelay[serverAssocId] = new cOutVector(text);
00197                 sprintf(text, "Hist: EndToEndDelay of assoc %d",serverAssocId);
00198                 histEndToEndDelay[serverAssocId] = new cDoubleHistogram(text);
00199 
00200                 //delete connectInfo;
00201                 delete msg;
00202                 if ((long) par("numPacketsToSendPerClient") > 0)
00203                 {
00204                     SentPacketsPerAssoc::iterator i=sentPacketsPerAssoc.find(serverAssocId);
00205                     numRequestsToSend = i->second;
00206                     if ((simtime_t)par("thinkTime") > 0)
00207                     {
00208                         generateAndSend(connectInfo);
00209                         timeoutMsg->setKind(SCTP_C_SEND);
00210                         scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00211                         numRequestsToSend--;
00212                         i->second = numRequestsToSend;
00213                     }
00214                     else
00215                     {
00216                         if (queueSize==0)
00217                         {
00218                             while (numRequestsToSend > 0)
00219                             {
00220                                 generateAndSend(connectInfo);
00221                                 numRequestsToSend--;
00222                                 i->second = numRequestsToSend;
00223                             }
00224                         }
00225                         else if (queueSize>0)
00226                         {
00227                             while (numRequestsToSend > 0 && count++ < queueSize*2)
00228                             {
00229                                 generateAndSend(connectInfo);
00230                                 numRequestsToSend--;
00231                                 i->second = numRequestsToSend;
00232                             }
00233 
00234                             cPacket* cmsg = new cPacket("Queue");
00235                             SCTPInfo* qinfo = new SCTPInfo();
00236                             qinfo->setText(queueSize);
00237                             cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00238                             qinfo->setAssocId(id);
00239                             cmsg->setControlInfo(qinfo);
00240                             sendOrSchedule(cmsg);
00241                         }
00242 
00243                         sctpEV3<<"!!!!!!!!!!!!!!!All data sent from Server !!!!!!!!!!\n";
00244 
00245                         RcvdPacketsPerAssoc::iterator j=rcvdPacketsPerAssoc.find(serverAssocId);
00246                         if (j->second == 0 && (simtime_t)par("waitToClose")>0)
00247                         {
00248                             char as[5];
00249                             sprintf(as, "%d",serverAssocId);
00250                             cMessage* abortMsg = new cMessage(as);
00251                             abortMsg->setKind(SCTP_I_ABORT);
00252                             scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), abortMsg);
00253                         }
00254                         else
00255                         {
00256                             sctpEV3<<"no more packets to send, call shutdown for assoc "<<serverAssocId<<"\n";
00257                             cPacket* cmsg = new cPacket("ShutdownRequest");
00258                             SCTPCommand* cmd = new SCTPCommand();
00259                             cmsg->setKind(SCTP_C_SHUTDOWN);
00260                             cmd->setAssocId(serverAssocId);
00261                             cmsg->setControlInfo(cmd);
00262                             sendOrSchedule(cmsg);
00263                         }
00264                     }
00265                 }
00266             }
00267             break;
00268         }
00269         case SCTP_I_DATA_NOTIFICATION:
00270         {
00271             notifications++;
00272             SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00273             cPacket* cmsg = new cPacket("Notification");
00274             SCTPSendCommand *cmd = new SCTPSendCommand();
00275             id = ind->getAssocId();
00276             cmd->setAssocId(id);
00277             cmd->setSid(ind->getSid());
00278             cmd->setNumMsgs(ind->getNumMsgs());
00279             cmsg->setKind(SCTP_C_RECEIVE);
00280             cmsg->setControlInfo(cmd);
00281             delete ind;
00282             delete msg;
00283             if (!cmsg->isScheduled() && schedule==false)
00284             {
00285                 scheduleAt(simulation.getSimTime()+(simtime_t)par("delayFirstRead"), cmsg);
00286             }
00287             else if (schedule==true)
00288                 sendOrSchedule(cmsg);
00289             break;
00290         }
00291         case SCTP_I_DATA:
00292         {
00293             SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->getControlInfo());
00294             id = ind->getAssocId();
00295             RcvdBytesPerAssoc::iterator j=rcvdBytesPerAssoc.find(id);
00296             if (j==rcvdBytesPerAssoc.end() && (clientSocket.getState()==SCTPSocket::CONNECTED))
00297                 clientSocket.processMessage(PK(msg));
00298             else
00299             {
00300                 j->second+= PK(msg)->getByteLength();
00301                 BytesPerAssoc::iterator k=bytesPerAssoc.find(id);
00302                 k->second->record(j->second);
00303                 packetsRcvd++;
00304                 if (echoFactor==0)
00305                 {
00306                     if ((long)par("numPacketsToReceivePerClient")>0)
00307                     {
00308                         RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id);
00309                         i->second--;
00310                         SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg);
00311                         EndToEndDelay::iterator j=endToEndDelay.find(id);
00312                         j->second->record(simulation.getSimTime()-smsg->getCreationTime());
00313                         HistEndToEndDelay::iterator k=histEndToEndDelay.find(id);
00314                         k->second->collect(simulation.getSimTime()-smsg->getCreationTime());
00315                         if (i->second == 0)
00316                         {
00317                             cPacket* cmsg = new cPacket("Request");
00318                             SCTPInfo* qinfo = new SCTPInfo();
00319                             cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00320                             qinfo->setAssocId(id);
00321                             cmsg->setControlInfo(qinfo);
00322                             sendOrSchedule(cmsg);
00323                         }
00324                     }
00325                     delete msg;
00326                 }
00327                 else
00328                 {
00329                     SCTPSendCommand *cmd = new SCTPSendCommand();
00330                     cmd->setAssocId(id);
00331 
00332                     SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00333                     EndToEndDelay::iterator j=endToEndDelay.find(id);
00334                     j->second->record(simulation.getSimTime()-smsg->getCreationTime());
00335                     HistEndToEndDelay::iterator k=histEndToEndDelay.find(id);
00336                     k->second->collect(simulation.getSimTime()-smsg->getCreationTime());
00337                     cPacket* cmsg = new cPacket("SVData");
00338                     bytesSent+=smsg->getByteLength();
00339                     cmd->setSendUnordered(cmd->getSendUnordered());
00340                     lastStream=(lastStream+1)%outboundStreams;
00341                     cmd->setSid(lastStream);
00342                     cmd->setLast(true);
00343                     cmsg->encapsulate(smsg);
00344                     cmsg->setKind(SCTP_C_SEND);
00345                     cmsg->setControlInfo(cmd);
00346                     packetsSent++;
00347                     delete msg;
00348                     sendOrSchedule(cmsg);
00349                 }
00350             }
00351 
00352             break;
00353         }
00354         case SCTP_I_SHUTDOWN_RECEIVED:
00355         {
00356             SCTPCommand *command = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00357             id = command->getAssocId();
00358             sctpEV3<<"server: SCTP_I_SHUTDOWN_RECEIVED for assoc "<<id<<"\n";
00359             RcvdPacketsPerAssoc::iterator i=rcvdPacketsPerAssoc.find(id);
00360             if (i==rcvdPacketsPerAssoc.end()&& (clientSocket.getState()==SCTPSocket::CONNECTED))
00361                 clientSocket.processMessage(PK(msg));
00362             else
00363             {
00364                 if (i->second == 0)
00365                 {
00366                     cPacket* cmsg = new cPacket("Request");
00367                     SCTPInfo* qinfo = new SCTPInfo();
00368                     cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00369                     qinfo->setAssocId(id);
00370                     cmsg->setControlInfo(qinfo);
00371                     sendOrSchedule(cmsg);
00372                 }
00373                 delete command;
00374                 shutdownReceived = true;
00375             }
00376             delete msg;
00377         }
00378         case SCTP_I_CLOSED: delete msg;
00379         break;
00380     }
00381 
00382     if (ev.isGUI())
00383     {
00384         char buf[32];
00385         RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.find(id);
00386         sprintf(buf, "rcvd: %ld bytes\nsent: %ld bytes", l->second, bytesSent);
00387         getDisplayString().setTagArg("t",0,buf);
00388     }
00389 }
00390 
00391 void SCTPPeer::handleTimer(cMessage *msg)
00392 {
00393     cPacket* cmsg;
00394     SCTPCommand* cmd;
00395     int32 id;
00396 
00397 
00398     sctpEV3<<"SCTPPeer::handleTimer\n";
00399 
00400     SCTPConnectInfo *connectInfo = dynamic_cast<SCTPConnectInfo *>(msg->getControlInfo());
00401     switch (msg->getKind())
00402     {
00403         case MSGKIND_CONNECT:
00404             sctpEV3 << "starting session call connect\n";
00405             connect();
00406             break;
00407         case SCTP_C_SEND:
00408 
00409             if (numRequestsToSend>0)
00410             {
00411                 generateAndSend(connectInfo);
00412                 if ((simtime_t)par("thinkTime") > 0)
00413                     scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeoutMsg);
00414                 numRequestsToSend--;
00415             }
00416             break;
00417         case SCTP_I_ABORT:
00418 
00419             cmsg = new cPacket("CLOSE", SCTP_C_CLOSE);
00420             cmd = new SCTPCommand();
00421             id = atoi(msg->getName());
00422             cmd->setAssocId(id);
00423             cmsg->setControlInfo(cmd);
00424             sendOrSchedule(cmsg);
00425             break;
00426         case SCTP_C_RECEIVE:
00427             schedule = true;
00428             sendOrSchedule(PK(msg));
00429             break;
00430         default:
00431 
00432             break;
00433     }
00434 }
00435 
00436 void SCTPPeer::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg)
00437 {
00438     SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00439     cPacket* cmsg = new cPacket("CMSG");
00440     SCTPSendCommand *cmd = new SCTPSendCommand();
00441     cmd->setAssocId(ind->getAssocId());
00442     cmd->setSid(ind->getSid());
00443     cmd->setNumMsgs(ind->getNumMsgs());
00444     cmsg->setKind(SCTP_C_RECEIVE);
00445     cmsg->setControlInfo(cmd);
00446     delete ind;
00447     clientSocket.sendNotification(cmsg);
00448 }
00449 
00450 
00451 void SCTPPeer::socketPeerClosed(int32, void *)
00452 {
00453     // close the connection (if not already closed)
00454     if (clientSocket.getState()==SCTPSocket::PEER_CLOSED)
00455     {
00456         ev << "remote SCTP closed, closing here as well\n";
00457         setStatusString("closing");
00458         clientSocket.close();
00459     }
00460 }
00461 
00462 void SCTPPeer::socketClosed(int32, void *)
00463 {
00464     // *redefine* to start another session etc.
00465     ev << "connection closed\n";
00466     setStatusString("closed");
00467 }
00468 
00469 void SCTPPeer::socketFailure(int32, void *, int32 code)
00470 {
00471     // subclasses may override this function, and add code try to reconnect after a delay.
00472     ev << "connection broken\n";
00473     setStatusString("broken");
00474     // reconnect after a delay
00475     timeMsg->setKind(MSGKIND_CONNECT);
00476     scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
00477 }
00478 
00479 void SCTPPeer::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status)
00480 {
00481 struct pathStatus ps;
00482     SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
00483     if (i!=sctpPathStatus.end())
00484     {
00485         ps = i->second;
00486         ps.active=status->getActive();
00487     }
00488     else
00489     {
00490         ps.active = status->getActive();
00491         ps.primaryPath = false;
00492         sctpPathStatus[ps.pid]=ps;
00493     }
00494 }
00495 
00496 void SCTPPeer::setStatusString(const char *s)
00497 {
00498     if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
00499 }
00500 
00501 void SCTPPeer::sendRequest(bool last)
00502 {
00503     sctpEV3 << "sending request, " << numRequestsToSend-1 << " more to go\n";
00504     long numBytes = par("requestLength");
00505     if (numBytes < 1)
00506         numBytes=1;
00507 
00508     sctpEV3 << "SCTPClient: sending " << numBytes << " data bytes\n";
00509 
00510     cPacket* cmsg = new cPacket("AppData");
00511     SCTPSimpleMessage* msg=new SCTPSimpleMessage("data");
00512 
00513     msg->setDataArraySize(numBytes);
00514     for (int32 i=0; i<numBytes; i++)
00515     {
00516         msg->setData(i, 'a');
00517     }
00518     msg->setDataLen(numBytes);
00519     msg->setBitLength(numBytes * 8);
00520     msg->setCreationTime(simulation.getSimTime());
00521     cmsg->encapsulate(msg);
00522     if (ordered)
00523         cmsg->setKind(SCTP_C_SEND_ORDERED);
00524     else
00525         cmsg->setKind(SCTP_C_SEND_UNORDERED);
00526     // send SCTPMessage with SCTPSimpleMessage enclosed
00527     clientSocket.send(cmsg, last);
00528     bytesSent+=numBytes;
00529 }
00530 
00531 
00532 void SCTPPeer::socketEstablished(int32, void *)
00533 {
00534     int32 count = 0;
00535      // *redefine* to perform or schedule first sending
00536     ev<<"SCTPClient: connected\n";
00537     setStatusString("connected");
00538     // determine number of requests in this session
00539     numRequestsToSend = (long) par("numRequestsPerSession");
00540     numPacketsToReceive = (long) par("numPacketsToReceive");
00541     if (numRequestsToSend<1)
00542         numRequestsToSend = 0;
00543     // perform first request (next one will be sent when reply arrives)
00544     if (numRequestsToSend>0)
00545     {
00546         if ((simtime_t)par("thinkTime") > 0)
00547         {
00548             if (sendAllowed)
00549             {
00550                 sendRequest();
00551                 numRequestsToSend--;
00552             }
00553             timeMsg->setKind(MSGKIND_SEND);
00554             scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00555 
00556         }
00557         else
00558         {
00559             if (queueSize>0)
00560             {
00561                 while (numRequestsToSend > 0 && count++ < queueSize*2 && sendAllowed)
00562                 {
00563                     if (count == queueSize*2)
00564                         sendRequest();
00565                     else
00566                         sendRequest(false);
00567                     numRequestsToSend--;
00568                 }
00569                 if (numRequestsToSend>0 && sendAllowed)
00570                     sendQueueRequest();
00571             }
00572             else
00573             {
00574                 while (numRequestsToSend > 0 && sendAllowed)
00575                 {
00576                     sendRequest();
00577                     numRequestsToSend--;
00578                 }
00579             }
00580 
00581             if (numPacketsToReceive == 0 && (simtime_t)par("waitToClose")>0)
00582             {
00583                 timeMsg->setKind(MSGKIND_ABORT);
00584                 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
00585             }
00586             if (numRequestsToSend == 0 && (simtime_t)par("waitToClose")==0)
00587             {
00588                 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00589                 clientSocket.shutdown();
00590             }
00591         }
00592     }
00593 }
00594 
00595 void SCTPPeer::sendQueueRequest()
00596 {
00597     cPacket* cmsg = new cPacket("Queue");
00598     SCTPInfo* qinfo = new SCTPInfo();
00599     qinfo->setText(queueSize);
00600     cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00601     qinfo->setAssocId(clientSocket.getConnectionId());
00602     cmsg->setControlInfo(qinfo);
00603     clientSocket.sendRequest(cmsg);
00604 
00605 }
00606 
00607 
00608 void SCTPPeer::sendRequestArrived()
00609 {
00610 int32 count = 0;
00611 
00612     sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
00613     while (numRequestsToSend > 0 && count++ < queueSize && sendAllowed)
00614     {
00615         numRequestsToSend--;
00616         if (count == queueSize || numRequestsToSend==0)
00617             sendRequest();
00618         else
00619             sendRequest(false);
00620 
00621         if (numRequestsToSend == 0)
00622         {
00623             sctpEV3<<"no more packets to send, call shutdown\n";
00624             clientSocket.shutdown();
00625         }
00626     }
00627 
00628 
00629 }
00630 
00631 void SCTPPeer::socketDataArrived(int32, void *, cPacket *msg, bool)
00632 {
00633     // *redefine* to perform or schedule next sending
00634     packetsRcvd++;
00635 
00636     sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
00637 
00638     SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->getControlInfo());
00639 
00640     bytesRcvd+=msg->getByteLength();
00641 
00642     if (echoFactor > 0)
00643     {
00644         SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00645         cPacket* cmsg = new cPacket("SVData");
00646         echoedBytesSent+=smsg->getBitLength()/8;
00647         cmsg->encapsulate(smsg);
00648         if (ind->getSendUnordered())
00649             cmsg->setKind(SCTP_C_SEND_UNORDERED);
00650         else
00651             cmsg->setKind(SCTP_C_SEND_ORDERED);
00652         packetsSent++;
00653         delete msg;
00654         clientSocket.send(cmsg,1);
00655     }
00656     if ((long)par("numPacketsToReceive")>0)
00657     {
00658         numPacketsToReceive--;
00659         if (numPacketsToReceive == 0)
00660         {
00661             setStatusString("closing");
00662             clientSocket.close();
00663         }
00664     }
00665 }
00666 
00667 
00668 
00669 void SCTPPeer::shutdownReceivedArrived(int32 connId)
00670 {
00671     if (numRequestsToSend==0)
00672     {
00673         cPacket* cmsg = new cPacket("Request");
00674         SCTPInfo* qinfo = new SCTPInfo();
00675         cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00676         qinfo->setAssocId(connId);
00677         cmsg->setControlInfo(qinfo);
00678         clientSocket.sendNotification(cmsg);
00679     }
00680 }
00681 
00682 
00683 
00684 void SCTPPeer::sendqueueFullArrived(int32 assocId)
00685 {
00686     sendAllowed = false;
00687 }
00688 
00689 
00690 void SCTPPeer::finish()
00691 {
00692     delete timeoutMsg;
00693     delete connectTimer;
00694         ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00695     ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00696     for (RcvdBytesPerAssoc::iterator l=rcvdBytesPerAssoc.begin(); l!=rcvdBytesPerAssoc.end(); l++)
00697     {
00698         ev << getFullPath() << ": received " << l->second << " bytes in assoc " << l->first<< "\n";
00699     }
00700     ev << getFullPath() << "Over all " << packetsRcvd << " packets received\n ";
00701     ev << getFullPath() << "Over all " << notifications << " notifications received\n ";
00702     for (BytesPerAssoc::iterator j = bytesPerAssoc.begin(); j!= bytesPerAssoc.end(); j++)
00703     {
00704         delete j->second;
00705         bytesPerAssoc.erase(j);
00706     }
00707     for (EndToEndDelay::iterator k = endToEndDelay.begin(); k!= endToEndDelay.end(); k++)
00708     {
00709         delete k->second;
00710         endToEndDelay.erase(k);
00711     }
00712     for (HistEndToEndDelay::iterator l = histEndToEndDelay.begin(); l!= histEndToEndDelay.end(); l++)
00713     {
00714         delete l->second;
00715         histEndToEndDelay.erase(l);
00716     }
00717     rcvdPacketsPerAssoc.clear();
00718     sentPacketsPerAssoc.clear();
00719     rcvdBytesPerAssoc.clear();
00720 }
00721