Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "IPAddressResolver.h"
00021 #include "SCTPAssociation.h"
00022 #include "SCTPClient.h"
00023
// Kinds assigned to this module's self-messages; handleTimer() dispatches on them.
#define MSGKIND_CONNECT  0   // handleTimer() calls connect()
#define MSGKIND_SEND     1   // handleTimer() sends the next request
#define MSGKIND_ABORT    2   // handleTimer() calls close()
#define MSGKIND_PRIMARY  3   // handleTimer() calls setPrimaryPath()
#define MSGKIND_STOP     5   // handleTimer() stops sending and aborts/closes the socket
00029
00030
00031 Define_Module(SCTPClient);
00032
00033 void SCTPClient::initialize()
00034 {
00035 const char * address;
00036 char* token;
00037 AddressVector addresses;
00038 sctpEV3<<"initialize SCTP Client\n";
00039 numSessions = numBroken = packetsSent = packetsRcvd = bytesSent = echoedBytesSent = bytesRcvd = 0;
00040 WATCH(numSessions);
00041 WATCH(numBroken);
00042 WATCH(packetsSent);
00043 WATCH(packetsRcvd);
00044 WATCH(bytesSent);
00045 WATCH(bytesRcvd);
00046
00047 address=par("address");
00048
00049 token = strtok((char*)address,",");
00050 while (token != NULL)
00051 {
00052 addresses.push_back(IPvXAddress(token));
00053 token = strtok(NULL, ",");
00054 }
00055 int32 port = par("port");
00056 echoFactor = par("echoFactor");
00057 if (!echoFactor) echoFactor = false;
00058 ordered = (bool)par("ordered");
00059 finishEndsSimulation = (bool)par("finishEndsSimulation");
00060 if (strcmp(address,"")==0)
00061 {
00062 socket.bind(port);
00063 }
00064 else
00065 {
00066 socket.bindx(addresses, port);
00067 }
00068
00069 socket.setCallbackObject(this);
00070 socket.setOutputGate(gate("sctpOut"));
00071 setStatusString("waiting");
00072
00073 timeMsg = new cMessage("CliAppTimer");
00074 numRequestsToSend = 0;
00075 numPacketsToReceive = 0;
00076 queueSize = par("queueSize");
00077 WATCH(numRequestsToSend);
00078 recordScalar("ums", (uint32) par("requestLength"));
00079 timeMsg->setKind(MSGKIND_CONNECT);
00080 scheduleAt((simtime_t)par("startTime"), timeMsg);
00081 sendAllowed = true;
00082 bufferSize = 0;
00083 if ((simtime_t)par("stopTime")!=0)
00084 {
00085 stopTimer = new cMessage("StopTimer");
00086 stopTimer->setKind(MSGKIND_STOP);
00087 scheduleAt((simtime_t)par("stopTime"), stopTimer);
00088 timer = true;
00089 }
00090 else
00091 {
00092 timer = false;
00093 stopTimer = NULL;
00094 }
00095 if ((simtime_t)par("primaryTime")!=0)
00096 {
00097 primaryChangeTimer = new cMessage("PrimaryTime");
00098 primaryChangeTimer->setKind(MSGKIND_PRIMARY);
00099 scheduleAt((simtime_t)par("primaryTime"), primaryChangeTimer);
00100 }
00101 else
00102 {
00103 primaryChangeTimer = NULL;
00104 }
00105 }
00106
00107 void SCTPClient::handleMessage(cMessage *msg)
00108 {
00109 if (msg->isSelfMessage())
00110 handleTimer(msg);
00111 else
00112 {
00113 socket.processMessage(PK(msg));
00114 }
00115 }
00116
00117 void SCTPClient::connect()
00118 {
00119 const char *connectAddress = par("connectAddress");
00120 int32 connectPort = par("connectPort");
00121 inStreams = par("inboundStreams");
00122 outStreams = par("outboundStreams");
00123 socket.setInboundStreams(inStreams);
00124 socket.setOutboundStreams(outStreams);
00125 ev << "issuing OPEN command\n";
00126 setStatusString("connecting");
00127 ev<<"connect to address "<<connectAddress<<"\n";
00128 socket.connect(IPAddressResolver().resolve(connectAddress, 1), connectPort, (uint32)par("numRequestsPerSession"));
00129 numSessions++;
00130 }
00131
00132 void SCTPClient::close()
00133 {
00134 setStatusString("closing");
00135 socket.close();
00136 }
00137
00138
00139 void SCTPClient::setStatusString(const char *s)
00140 {
00141 if (ev.isGUI()) getDisplayString().setTagArg("t", 0, s);
00142 }
00143
00144 void SCTPClient::socketEstablished(int32, void *, uint64 buffer )
00145 {
00146 int32 count = 0;
00147 ev<<"SCTPClient: connected\n";
00148 setStatusString("connected");
00149 bufferSize = buffer;
00150
00151 numRequestsToSend = (long) par("numRequestsPerSession");
00152 numPacketsToReceive = (long) par("numPacketsToReceive");
00153 if (numRequestsToSend<1)
00154 numRequestsToSend = 0;
00155 sctpEV3<<"SCTPClient:numRequestsToSend="<<numRequestsToSend<<"\n";
00156
00157 if ((numRequestsToSend>0 && !timer) || timer)
00158 {
00159 if ((simtime_t)par("thinkTime") > 0)
00160 {
00161 if (sendAllowed)
00162 {
00163 sendRequest();
00164 if (!timer)
00165 numRequestsToSend--;
00166 }
00167 timeMsg->setKind(MSGKIND_SEND);
00168 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00169 }
00170 else
00171 {
00172 if (queueSize>0)
00173 {
00174 while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize*2 && sendAllowed)
00175 {
00176 if (count == queueSize*2)
00177 sendRequest();
00178 else
00179 sendRequest(false);
00180 if (!timer)
00181 {
00182 if (--numRequestsToSend == 0)
00183 sendAllowed = false;
00184 }
00185 }
00186 if (((!timer && numRequestsToSend>0) || timer) && sendAllowed)
00187 sendQueueRequest();
00188 }
00189 else
00190 {
00191 while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
00192 (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
00193 {
00194 if (!timer && numRequestsToSend==1)
00195 sendRequest(true);
00196 else
00197 sendRequest(false);
00198 if (!timer && (--numRequestsToSend == 0))
00199 sendAllowed = false;
00200 }
00201 }
00202 }
00203 if ((!timer && numPacketsToReceive == 0) && (simtime_t)par("waitToClose")>0)
00204 {
00205 timeMsg->setKind(MSGKIND_ABORT);
00206 scheduleAt(simulation.getSimTime()+(simtime_t)par("waitToClose"), timeMsg);
00207 }
00208 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00209 {
00210 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00211 socket.shutdown();
00212 if (timeMsg->isScheduled())
00213 cancelEvent(timeMsg);
00214 if (finishEndsSimulation) {
00215 endSimulation();
00216 }
00217 }
00218 }
00219 }
00220
00221 void SCTPClient::sendQueueRequest()
00222 {
00223 cPacket* cmsg = new cPacket("Queue");
00224 SCTPInfo* qinfo = new SCTPInfo();
00225 qinfo->setText(queueSize);
00226 cmsg->setKind(SCTP_C_QUEUE_MSGS_LIMIT);
00227 qinfo->setAssocId(socket.getConnectionId());
00228 cmsg->setControlInfo(qinfo);
00229 sctpEV3 << "Sending queue request ..." << endl;
00230 socket.sendRequest(cmsg);
00231 }
00232
00233 void SCTPClient::sendRequestArrived()
00234 {
00235 int32 count = 0;
00236
00237 sctpEV3<<"sendRequestArrived numRequestsToSend="<<numRequestsToSend<<"\n";
00238 while (((!timer && numRequestsToSend > 0) || timer) && count++ < queueSize && sendAllowed)
00239 {
00240 if (count == queueSize)
00241 sendRequest();
00242 else
00243 sendRequest(false);
00244
00245 if (!timer)
00246 numRequestsToSend--;
00247 if ((!timer && numRequestsToSend == 0))
00248 {
00249 sctpEV3<<"no more packets to send, call shutdown\n";
00250 socket.shutdown();
00251 if (timeMsg->isScheduled())
00252 cancelEvent(timeMsg);
00253 if (finishEndsSimulation) {
00254 endSimulation();
00255 }
00256 }
00257 }
00258 }
00259
00260 void SCTPClient::socketDataArrived(int32, void *, cPacket *msg, bool)
00261 {
00262 packetsRcvd++;
00263 sctpEV3<<"Client received packet Nr "<<packetsRcvd<<" from SCTP\n";
00264 SCTPCommand* ind = check_and_cast<SCTPCommand*>(msg->removeControlInfo());
00265 bytesRcvd+=msg->getByteLength();
00266 if (echoFactor > 0)
00267 {
00268 SCTPSimpleMessage *smsg=check_and_cast<SCTPSimpleMessage*>(msg->dup());
00269 cPacket* cmsg = new cPacket("SVData");
00270 echoedBytesSent+=smsg->getBitLength()/8;
00271 cmsg->encapsulate(smsg);
00272 if (ind->getSendUnordered())
00273 cmsg->setKind(SCTP_C_SEND_UNORDERED);
00274 else
00275 cmsg->setKind(SCTP_C_SEND_ORDERED);
00276 packetsSent++;
00277 delete msg;
00278 socket.send(cmsg, 1);
00279 }
00280 if ((long)par("numPacketsToReceive")>0)
00281 {
00282 numPacketsToReceive--;
00283 if (numPacketsToReceive == 0)
00284 {
00285 close();
00286 }
00287 }
00288 delete ind;
00289 }
00290
00291
00292 void SCTPClient::sendRequest(bool last)
00293 {
00294 uint32 i, sendBytes;
00295
00296 sendBytes = par("requestLength");
00297
00298
00299 if (sendBytes < 1)
00300 sendBytes=1;
00301 cPacket* cmsg = new cPacket("AppData");
00302 SCTPSimpleMessage* msg=new SCTPSimpleMessage("data");
00303
00304 msg->setDataArraySize(sendBytes);
00305 for (i=0; i<sendBytes; i++)
00306 {
00307 msg->setData(i, 'a');
00308 }
00309 msg->setDataLen(sendBytes);
00310 msg->setByteLength(sendBytes);
00311 msg->setCreationTime(simulation.getSimTime());
00312 cmsg->encapsulate(msg);
00313 if (ordered)
00314 cmsg->setKind(SCTP_C_SEND_ORDERED);
00315 else
00316 cmsg->setKind(SCTP_C_SEND_UNORDERED);
00317
00318 sctpEV3 << "Sending request ..." << endl;
00319 bufferSize -= sendBytes;
00320 if (bufferSize < 0)
00321 last = true;
00322 socket.send(cmsg, last);
00323 bytesSent+=sendBytes;
00324 }
00325
00326 void SCTPClient::handleTimer(cMessage *msg)
00327 {
00328
00329 switch (msg->getKind())
00330 {
00331 case MSGKIND_CONNECT:
00332 ev << "starting session call connect\n";
00333 connect();
00334 break;
00335 case MSGKIND_SEND:
00336
00337 if (((!timer && numRequestsToSend>0) || timer))
00338 {
00339 if (sendAllowed)
00340 {
00341 sendRequest();
00342 if (!timer)
00343 numRequestsToSend--;
00344 }
00345 if ((simtime_t)par("thinkTime") > 0)
00346 scheduleAt(simulation.getSimTime()+(simtime_t)par("thinkTime"), timeMsg);
00347 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00348 {
00349 socket.shutdown();
00350 if (timeMsg->isScheduled())
00351 cancelEvent(timeMsg);
00352 if (finishEndsSimulation) {
00353 endSimulation();
00354 }
00355 }
00356 }
00357 else if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00358 {
00359 socket.shutdown();
00360 if (timeMsg->isScheduled())
00361 cancelEvent(timeMsg);
00362 if (finishEndsSimulation) {
00363 endSimulation();
00364 }
00365 }
00366 break;
00367 case MSGKIND_ABORT:
00368 close();
00369 break;
00370 case MSGKIND_PRIMARY:
00371 setPrimaryPath((const char*)par("newPrimary"));
00372 break;
00373 case MSGKIND_STOP:
00374 numRequestsToSend=0;
00375 sendAllowed = false;
00376 socket.abort();
00377 socket.close();
00378 if (timeMsg->isScheduled())
00379 cancelEvent(timeMsg);
00380 socket.close();
00381 if (finishEndsSimulation) {
00382 endSimulation();
00383 }
00384 break;
00385 default:
00386 ev<<"MsgKind ="<<msg->getKind()<<" unknown\n";
00387 break;
00388 }
00389 }
00390
00391
00392 void SCTPClient::socketDataNotificationArrived(int32 connId, void *ptr, cPacket *msg)
00393 {
00394 SCTPCommand *ind = check_and_cast<SCTPCommand *>(msg->removeControlInfo());
00395 cPacket* cmsg = new cPacket("CMSG-DataArr");
00396 SCTPSendCommand *cmd = new SCTPSendCommand();
00397 cmd->setAssocId(ind->getAssocId());
00398 cmd->setSid(ind->getSid());
00399 cmd->setNumMsgs(ind->getNumMsgs());
00400 cmsg->setKind(SCTP_C_RECEIVE);
00401 cmsg->setControlInfo(cmd);
00402 delete ind;
00403 socket.sendNotification(cmsg);
00404 }
00405
00406 void SCTPClient::shutdownReceivedArrived(int32 connId)
00407 {
00408 if (numRequestsToSend==0)
00409 {
00410 cPacket* cmsg = new cPacket("Request");
00411 SCTPInfo* qinfo = new SCTPInfo();
00412 cmsg->setKind(SCTP_C_NO_OUTSTANDING);
00413 qinfo->setAssocId(connId);
00414 cmsg->setControlInfo(qinfo);
00415 socket.sendNotification(cmsg);
00416 }
00417 }
00418
00419 void SCTPClient::socketPeerClosed(int32, void *)
00420 {
00421
00422 if (socket.getState()==SCTPSocket::PEER_CLOSED)
00423 {
00424 ev << "remote SCTP closed, closing here as well\n";
00425 close();
00426 }
00427 }
00428
00429 void SCTPClient::socketClosed(int32, void *)
00430 {
00431
00432 ev << "connection closed\n";
00433 setStatusString("closed");
00434 if (primaryChangeTimer)
00435 {
00436 cancelEvent(primaryChangeTimer);
00437 delete primaryChangeTimer;
00438 primaryChangeTimer = NULL;
00439 }
00440 }
00441
00442 void SCTPClient::socketFailure(int32, void *, int32 code)
00443 {
00444
00445 ev << "connection broken\n";
00446 setStatusString("broken");
00447 numBroken++;
00448
00449 timeMsg->setKind(MSGKIND_CONNECT);
00450 scheduleAt(simulation.getSimTime()+(simtime_t)par("reconnectInterval"), timeMsg);
00451 }
00452
00453 void SCTPClient::socketStatusArrived(int32 assocId, void *yourPtr, SCTPStatusInfo *status)
00454 {
00455 struct pathStatus ps;
00456 SCTPPathStatus::iterator i=sctpPathStatus.find(status->getPathId());
00457 if (i!=sctpPathStatus.end())
00458 {
00459 ps = i->second;
00460 ps.active=status->getActive();
00461 }
00462 else
00463 {
00464 ps.active = status->getActive();
00465 ps.pid = status->getPathId();
00466 ps.primaryPath = false;
00467 sctpPathStatus[ps.pid]=ps;
00468 }
00469 }
00470
00471 void SCTPClient::setPrimaryPath (const char* str)
00472 {
00473
00474 cPacket* cmsg = new cPacket("CMSG-SetPrimary");
00475 SCTPPathInfo *pinfo = new SCTPPathInfo();
00476 if (strcmp(str,"")!=0)
00477 {
00478 pinfo->setRemoteAddress(IPvXAddress(str));
00479 }
00480 else
00481 {
00482 str = (const char*)par("newPrimary");
00483 if (strcmp(str, "")!=0)
00484 pinfo->setRemoteAddress(IPvXAddress(str));
00485 else
00486 {
00487 str = (const char*)par("connectAddress");
00488 pinfo->setRemoteAddress(IPvXAddress(str));
00489 }
00490 }
00491
00492 pinfo->setAssocId(socket.getConnectionId());
00493 cmsg->setKind(SCTP_C_PRIMARY);
00494 cmsg->setControlInfo(pinfo);
00495 socket.sendNotification(cmsg);
00496 }
00497
00498
00499
00500
00501 void SCTPClient::sendqueueFullArrived(int32 assocId)
00502 {
00503 sendAllowed = false;
00504 }
00505
00506 void SCTPClient::sendqueueAbatedArrived(int32 assocId, uint64 buffer)
00507 {
00508 bufferSize = buffer;
00509 sendAllowed = true;
00510 while ((((!timer && numRequestsToSend>0) || timer) && sendAllowed && bufferSize>0) ||
00511 (((!timer && numRequestsToSend>0) || timer) && sendAllowed && buffer==0))
00512 {
00513 if (!timer && numRequestsToSend==1)
00514 sendRequest(true);
00515 else
00516 sendRequest(false);
00517 if (!timer && (--numRequestsToSend == 0))
00518 sendAllowed = false;
00519 }
00520 if ((!timer && numRequestsToSend == 0) && (simtime_t)par("waitToClose")==0)
00521 {
00522 sctpEV3<<"socketEstablished:no more packets to send, call shutdown\n";
00523 socket.shutdown();
00524 if (timeMsg->isScheduled())
00525 cancelEvent(timeMsg);
00526 if (finishEndsSimulation) {
00527 endSimulation();
00528 }
00529 }
00530 }
00531
00532 void SCTPClient::addressAddedArrived(int32 assocId, IPvXAddress remoteAddr)
00533 {
00534 }
00535
00536 void SCTPClient::finish()
00537 {
00538 if (timeMsg->isScheduled())
00539 cancelEvent(timeMsg);
00540 delete timeMsg;
00541 if (stopTimer)
00542 {
00543 cancelEvent(stopTimer);
00544 delete stopTimer;
00545 }
00546 if (primaryChangeTimer)
00547 {
00548 cancelEvent(primaryChangeTimer);
00549 delete primaryChangeTimer;
00550 primaryChangeTimer = NULL;
00551 }
00552 ev << getFullPath() << ": opened " << numSessions << " sessions\n";
00553 ev << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
00554 ev << getFullPath() << ": received " << bytesRcvd << " bytes in " << packetsRcvd << " packets\n";
00555 sctpEV3<<"Client finished\n";
00556 }
00557