SCTPClient.cc

Go to the documentation of this file.
00001 //
00002 // Copyright (C) 2008 Irene Ruengeler
00003 // Copyright (C) 2009 Thomas Dreibholz
00004 //
00005 // This program is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU General Public License
00007 // as published by the Free Software Foundation; either version 2
00008 // of the License, or (at your option) any later version.
00009 //
00010 // This program is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with this program; if not, see <http://www.gnu.org/licenses/>.
00017 //
00018 
00019 
00020 #include "IPAddressResolver.h"
00021 #include "SCTPAssociation.h"
00022 #include "SCTPClient.h"
00023 
// Kinds of the self-messages dispatched by SCTPClient::handleTimer().
// The value 4 is not used in this file.
#define MSGKIND_CONNECT 0   // timeMsg: open the association
#define MSGKIND_SEND    1   // timeMsg: send the next request
#define MSGKIND_ABORT   2   // timeMsg: close after the waitToClose delay
#define MSGKIND_PRIMARY 3   // primaryChangeTimer: switch the primary path
#define MSGKIND_STOP    5   // stopTimer: stop sending and tear down
00029 
00030 
00031 Define_Module(SCTPClient);
00032 
00033 void SCTPClient::initialize()
00034 {
00035     const char * address;
00036     char* token;
00037     AddressVector addresses;
00038     sctpEV3<<"initialize SCTP Client\n";
00039     numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0;
00040     WATCH(numSessions);
00041     WATCH(numBroken);
00042     WATCH(packetsSent);
00043     WATCH(packetsRcvd);
00044     WATCH(bytesSent);
00045     WATCH(bytesRcvd);
00046     // parameters
00047     address=par("address");
00048 
00049     token = strtok((char*)address,",");
00050     while (token != NULL)
00051     {
00052         addresses.push_back(IPvXAddress(token));
00053         token = strtok(NULL, ",");
00054     }
00055     int32 port = par("port");
00056     echoFactor = par("echoFactor");
00057     if (!echoFactor) echoFactor = false;
00058     ordered = (bool)par("ordered");
00059     finishEndsSimulation = (bool)par("finishEndsSimulation");
00060     if (strcmp(address,"")==0)
00061     {
00062         socket.bind(port);
00063     }
00064     else
00065     {
00066         socket.bindx(addresses, port);
00067     }
00068 
00069     socket.setCallbackObject(this);
00070     socket.setOutputGate(gate("sctpOut"));
00071     setStatusString("waiting");
00072 
00073     timeMsg = new cMessage("CliAppTimer");
00074     numRequestsToSend = 0;
00075     numPacketsToReceive = 0;
00076     queueSize = par("queueSize");
00077     WATCH(numRequestsToSend);
00078     recordScalar("ums", (uint32) par("requestLength"));
00079     timeMsg->setKind(MSGKIND_CONNECT);
00080     scheduleAt((simtime_t)par("startTime"), timeMsg);
00081     sendAllowed = true;
00082     bufferSize = 0;
00083     if ((simtime_t)par("stopTime")!=0)
00084     {
00085         stopTimer = new cMessage("StopTimer");
00086         stopTimer->setKind(MSGKIND_STOP);
00087         scheduleAt((simtime_t)par("stopTime"), stopTimer);
00088         timer = true;
00089     }
00090     else
00091     {
00092         timer = false;
00093         stopTimer = NULL;
00094     }
00095     if ((simtime_t)par("primaryTime")!=0)
00096     {
00097         primaryChangeTimer = new cMessage("PrimaryTime");
00098         primaryChangeTimer->setKind(MSGKIND_PRIMARY);
00099         scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer);
00100     }
00101     else
00102     {
00103         primaryChangeTimer = NULL;
00104     }
00105 }
00106 
00107 void SCTPClient::handleMessage(cMessage *msg)
00108 {
00109     if (msg->isSelfMessage())
00110         handleTimer(msg);
00111     else
00112     {
00113         socket.processMessage(PK(msg));
00114     }
00115 }
00116 
00117 void SCTPClient::connect()
00118 {
00119     const char *connectAddress = par("connectAddress");
00120     int32 connectPort = par("connectPort");
00121     inStreams = par("inboundStreams");
00122     outStreams = par("outboundStreams");
00123     socket.setInboundStreams(inStreams);
00124     socket.setOutboundStreams(outStreams);
00125     ev << "issuing OPEN command\n";
00126     setStatusString("connecting");
00127     ev<<"connect to address "<<connectAddress<<"\n";
00128     socket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession"));
00129     numSessions++;
00130 }
00131 
00132 void SCTPClient::close()
00133 {
00134     setStatusString("closing");
00135     socket.close();
00136 }
00137 
00138 
00139 void SCTPClient::setStatusString(const char *s)
00140 {
00141     if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
00142 }
00143 
00144 void SCTPClient::socketEstablished(int32, void *, uint64 buffer )
00145 {
00146       int32 count = 0;
00147      ev<<"SCTPClient: connected\n";
00148     setStatusString("connected");
00149     bufferSize = buffer;
00150     // determine number of requests in this session
00151     numRequestsToSend = (long) par("numRequestsPerSession");
00152     numPacketsToReceive = (long) par("numPacketsToReceive");
00153     if (numRequestsToSend<1)
00154         numRequestsToSend = 0;
00155         sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n";
00156     // perform first request (next one will be sent when reply arrives)
00157     if ((numRequestsToSend>0 && !timer) || timer)
00158     {
00159         if ((simtime_t)par("thinkTime") > 0)
00160         {
00161             if (sendAllowed)
00162             {
00163                 sendRequest();
00164                 if (!timer)
00165                     numRequestsToSend--;
00166             }
00167             timeMsg->setKind(MSGKIND_SEND);
00168             scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00169         }
00170         else
00171         {
00172             if (queueSize>0)
00173             {
00174                 while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize*2 && sendAllowed)
00175                 {
00176                     if (count == queueSize*2)
00177                         sendRequest();
00178                     else
00179                         sendRequest(false);
00180                     if (!timer)
00181                     {
00182                         if (--numRequestsToSend == 0)
00183                             sendAllowed = false;
00184                     }
00185                 }
00186                 if (((!timer && numRequestsToSend>0) || timer) && sendAllowed)
00187                     sendQueueRequest();
00188             }
00189             else
00190             {
00191                 while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
00192                     (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
00193                 {
00194                     if (!timer && numRequestsToSend==1)
00195                         sendRequest(true);
00196                     else
00197                         sendRequest(false);
00198                     if (!timer && (--numRequestsToSend == 0))
00199                             sendAllowed = false;
00200                     }
00201                 }
00202             }
00203             if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0)
00204             {
00205                 timeMsg->setKind(MSGKIND_ABORT);
00206                 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
00207             }
00208             if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00209             {
00210                 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00211                 socket.shutdown();
00212                 if (timeMsg->isScheduled())
00213                     cancelEvent(timeMsg);
00214                 if (finishEndsSimulation) {
00215                     endSimulation();
00216                 }
00217             }
00218         }
00219 }
00220 
00221 void SCTPClient::sendQueueRequest()
00222 {
00223     cPacket* cmsg = new cPacket("Queue");
00224     SCTPInfo* qinfo = new SCTPInfo();
00225     qinfo->setText(queueSize);
00226     cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00227     qinfo->setAssocId(socket.getConnectionId());
00228     cmsg->setControlInfo(qinfo);
00229         sctpEV3 << "Sending queue request ..." << endl;
00230     socket.sendRequest(cmsg);
00231 }
00232 
00233 void SCTPClient::sendRequestArrived()
00234 {
00235     int32 count = 0;
00236 
00237     sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
00238     while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize && sendAllowed)
00239     {
00240         if (count == queueSize)
00241             sendRequest();
00242         else
00243             sendRequest(false);
00244 
00245         if (!timer)
00246             numRequestsToSend--;
00247         if ((!timer && numRequestsToSend == 0))
00248         {
00249             sctpEV3<<"no more packets to send, call shutdown\n";
00250             socket.shutdown();
00251             if (timeMsg->isScheduled())
00252                 cancelEvent(timeMsg);
00253             if (finishEndsSimulation) {
00254                 endSimulation();
00255             }
00256         }
00257     }
00258 }
00259 
00260 void SCTPClient::socketDataArrived(int32, void *, cPacket *msg, bool)
00261 {
00262     packetsRcvd++;
00263     sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
00264     SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo());
00265     bytesRcvd+=msg->getByteLength();
00266     if (echoFactor > 0)
00267     {
00268         SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00269         cPacket* cmsg = new cPacket("SVData");
00270         echoedBytesSent+=smsg->getBitLength()/8;
00271         cmsg->encapsulate(smsg);
00272         if (ind->getSendUnordered())
00273             cmsg->setKind(SCTP_C_SEND_UNORDERED);
00274         else
00275             cmsg->setKind(SCTP_C_SEND_ORDERED);
00276         packetsSent++;
00277         delete msg;
00278         socket.send(cmsg, 1);
00279     }
00280     if ((long)par("numPacketsToReceive")>0)
00281     {
00282         numPacketsToReceive--;
00283         if (numPacketsToReceive == 0)
00284         {
00285             close();
00286         }
00287     }
00288     delete ind;
00289 }
00290 
00291 
00292 void SCTPClient::sendRequest(bool last)
00293 {
00294     uint32 i, sendBytes;
00295 
00296     sendBytes = par("requestLength");
00297 
00298 
00299     if (sendBytes < 1)
00300         sendBytes=1;
00301     cPacket* cmsg = new cPacket("AppData");
00302     SCTPSimpleMessage* msg=new SCTPSimpleMessage("data");
00303 
00304     msg->setDataArraySize(sendBytes);
00305     for (i=0; i<sendBytes; i++)
00306     {
00307         msg->setData(i, 'a');
00308     }
00309     msg->setDataLen(sendBytes);
00310     msg->setByteLength(sendBytes);
00311     msg->setCreationTime(simulation.getSimTime());
00312     cmsg->encapsulate(msg);
00313     if (ordered)
00314         cmsg->setKind(SCTP_C_SEND_ORDERED);
00315     else
00316         cmsg->setKind(SCTP_C_SEND_UNORDERED);
00317     // send SCTPMessage with SCTPSimpleMessage enclosed
00318     sctpEV3 << "Sending request ..." << endl;
00319     bufferSize -= sendBytes;
00320     if (bufferSize < 0)
00321         last = true;
00322     socket.send(cmsg, last);
00323     bytesSent+=sendBytes;
00324 }
00325 
00326 void SCTPClient::handleTimer(cMessage *msg)
00327 {
00328 
00329     switch (msg->getKind())
00330     {
00331         case MSGKIND_CONNECT:
00332             ev << "starting session call connect\n";
00333             connect();
00334             break;
00335         case MSGKIND_SEND:
00336 
00337             if (((!timer && numRequestsToSend>0) || timer))
00338             {
00339                 if (sendAllowed)
00340                 {
00341                     sendRequest();
00342                     if (!timer)
00343                         numRequestsToSend--;
00344                 }
00345                 if ((simtime_t)par("thinkTime") > 0)
00346                     scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00347                 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00348                 {
00349                     socket.shutdown();
00350                     if (timeMsg->isScheduled())
00351                         cancelEvent(timeMsg);
00352                     if (finishEndsSimulation) {
00353                         endSimulation();
00354                     }
00355                 }
00356             }
00357             else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00358             {
00359                     socket.shutdown();
00360                     if (timeMsg->isScheduled())
00361                         cancelEvent(timeMsg);
00362                     if (finishEndsSimulation) {
00363                         endSimulation();
00364                     }
00365             }
00366             break;
00367         case MSGKIND_ABORT:
00368             close();
00369             break;
00370         case MSGKIND_PRIMARY:
00371             setPrimaryPath((const char*)par("newPrimary"));
00372             break;
00373         case MSGKIND_STOP:
00374             numRequestsToSend=0;
00375             sendAllowed = false;
00376             socket.abort();
00377             socket.close();
00378             if (timeMsg->isScheduled())
00379                 cancelEvent(timeMsg);
00380             socket.close();
00381             if (finishEndsSimulation) {
00382                 endSimulation();
00383             }
00384             break;
00385         default:
00386             ev<<"MsgKind ="<<msg->getKind()<<" unknown\n";
00387             break;
00388     }
00389 }
00390 
00391 
00392 void SCTPClient::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg)
00393 {
00394     SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00395     cPacket* cmsg = new cPacket("CMSG-DataArr");
00396     SCTPSendCommand *cmd = new SCTPSendCommand();
00397     cmd->setAssocId(ind->getAssocId());
00398     cmd->setSid(ind->getSid());
00399     cmd->setNumMsgs(ind->getNumMsgs());
00400     cmsg->setKind(SCTP_C_RECEIVE);
00401     cmsg->setControlInfo(cmd);
00402     delete ind;
00403     socket.sendNotification(cmsg);
00404 }
00405 
00406 void SCTPClient::shutdownReceivedArrived(int32 connId)
00407 {
00408     if (numRequestsToSend==0)
00409     {
00410         cPacket* cmsg = new cPacket("Request");
00411         SCTPInfo* qinfo = new SCTPInfo();
00412         cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00413         qinfo->setAssocId(connId);
00414         cmsg->setControlInfo(qinfo);
00415         socket.sendNotification(cmsg);
00416     }
00417 }
00418 
00419 void SCTPClient::socketPeerClosed(int32, void *)
00420 {
00421     // close the connection (if not already closed)
00422     if (socket.getState()==SCTPSocket::PEER_CLOSED)
00423     {
00424         ev << "remote SCTP closed, closing here as well\n";
00425         close();
00426     }
00427 }
00428 
00429 void SCTPClient::socketClosed(int32, void *)
00430 {
00431     // *redefine* to start another session etc.
00432     ev << "connection closed\n";
00433     setStatusString("closed");
00434     if (primaryChangeTimer)
00435     {
00436         cancelEvent(primaryChangeTimer);
00437         delete primaryChangeTimer;
00438         primaryChangeTimer = NULL;
00439     }
00440 }
00441 
00442 void SCTPClient::socketFailure(int32, void *, int32 code)
00443 {
00444     // subclasses may override this function, and add code try to reconnect after a delay.
00445     ev << "connection broken\n";
00446     setStatusString("broken");
00447     numBroken++;
00448     // reconnect after a delay
00449     timeMsg->setKind(MSGKIND_CONNECT);
00450     scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
00451 }
00452 
00453 void SCTPClient::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status)
00454 {
00455 struct pathStatus ps;
00456     SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
00457     if (i!=sctpPathStatus.end())
00458     {
00459         ps = i->second;
00460         ps.active=status->getActive();
00461     }
00462     else
00463     {
00464         ps.active = status->getActive();
00465         ps.pid = status->getPathId();
00466         ps.primaryPath = false;
00467         sctpPathStatus[ps.pid]=ps;
00468     }
00469 }
00470 
00471 void SCTPClient::setPrimaryPath (const char* str)
00472 {
00473 
00474     cPacket* cmsg = new cPacket("CMSG-SetPrimary");
00475     SCTPPathInfo *pinfo = new SCTPPathInfo();
00476     if (strcmp(str,"")!=0)
00477     {
00478         pinfo->setRemoteAddress(IPvXAddress(str));
00479     }
00480     else
00481     {
00482         str = (const char*)par("newPrimary");
00483         if (strcmp(str, "")!=0)
00484             pinfo->setRemoteAddress(IPvXAddress(str));
00485         else
00486         {
00487             str = (const char*)par("connectAddress");
00488             pinfo->setRemoteAddress(IPvXAddress(str));
00489         }
00490     }
00491 
00492     pinfo->setAssocId(socket.getConnectionId());
00493     cmsg->setKind(SCTP_C_PRIMARY);
00494     cmsg->setControlInfo(pinfo);
00495     socket.sendNotification(cmsg);
00496 }
00497 
00498 
00499 
00500 
00501 void SCTPClient::sendqueueFullArrived(int32 assocId)
00502 {
00503     sendAllowed = false;
00504 }
00505 
00506 void SCTPClient::sendqueueAbatedArrived(int32 assocId, uint64 buffer)
00507 {
00508     bufferSize = buffer;
00509     sendAllowed = true;
00510     while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
00511                     (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
00512     {
00513             if (!timer && numRequestsToSend==1)
00514                         sendRequest(true);
00515                     else
00516                         sendRequest(false);
00517           if (!timer && (--numRequestsToSend == 0))
00518                 sendAllowed = false;
00519         }
00520         if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00521             {
00522                 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00523                 socket.shutdown();
00524                 if (timeMsg->isScheduled())
00525                     cancelEvent(timeMsg);
00526                 if (finishEndsSimulation) {
00527                     endSimulation();
00528                 }
00529             }
00530 }
00531 
00532 void SCTPClient::addressAddedArrived(int32 assocId, IPvXAddress remoteAddr)
00533 {
00534 }
00535 
00536 void SCTPClient::finish()
00537 {
00538     if (timeMsg->isScheduled())
00539         cancelEvent(timeMsg);
00540     delete timeMsg;
00541     if (stopTimer)
00542     {
00543         cancelEvent(stopTimer);
00544         delete stopTimer;
00545     }
00546     if (primaryChangeTimer)
00547     {
00548         cancelEvent(primaryChangeTimer);
00549         delete primaryChangeTimer;
00550         primaryChangeTimer = NULL;
00551     }
00552     ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00553     ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00554     ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
00555     sctpEV3<<"Client finished\n";
00556 }
00557