00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <string.h>
00021 #include <stdlib.h>
00022 #include <assert.h>
00023 #include "SCTP.h"
00024 #include "SCTPAssociation.h"
00025 #include "SCTPCommand_m.h"
00026 #include "IPv6ControlInfo.h"
00027 #include "SCTPQueue.h"
00028 #include "SCTPAlgorithm.h"
00029 #include "RoutingTable.h"
00030 #include "RoutingTableAccess.h"
00031 #include "InterfaceTable.h"
00032 #include "InterfaceTableAccess.h"
00033 #include "IPv4InterfaceData.h"
00034 #include "IPv6InterfaceData.h"
00035 #include "IPv6Address.h"
00036 #include "UDPControlInfo_m.h"
00037
00038
00039
00040 void SCTPAssociation::printSctpPathMap() const
00041 {
00042 sctpEV3 <<"SCTP PathMap:" << endl;
00043 for (SCTPPathMap::const_iterator iterator = sctpPathMap.begin();
00044 iterator != sctpPathMap.end(); ++iterator) {
00045 const SCTPPathVariables* path = iterator->second;
00046 sctpEV3 << " - " << path->remoteAddress << ": osb=" << path->outstandingBytes
00047 << " cwnd=" << path->cwnd << endl;
00048 }
00049 }
00050
00051 const char* SCTPAssociation::stateName(const int32 state)
00052 {
00053 #define CASE(x) case x: s=#x+7; break
00054 const char* s = "unknown";
00055 switch (state) {
00056 CASE(SCTP_S_CLOSED);
00057 CASE(SCTP_S_COOKIE_WAIT);
00058 CASE(SCTP_S_COOKIE_ECHOED);
00059 CASE(SCTP_S_ESTABLISHED);
00060 CASE(SCTP_S_SHUTDOWN_PENDING);
00061 CASE(SCTP_S_SHUTDOWN_SENT);
00062 CASE(SCTP_S_SHUTDOWN_RECEIVED);
00063 CASE(SCTP_S_SHUTDOWN_ACK_SENT);
00064 }
00065 return s;
00066 #undef CASE
00067 }
00068
00069 const char* SCTPAssociation::eventName(const int32 event)
00070 {
00071 #define CASE(x) case x: s=#x+7; break
00072 const char* s = "unknown";
00073 switch (event) {
00074 CASE(SCTP_E_OPEN_PASSIVE);
00075 CASE(SCTP_E_ASSOCIATE);
00076 CASE(SCTP_E_SHUTDOWN);
00077 CASE(SCTP_E_CLOSE);
00078 CASE(SCTP_E_ABORT);
00079 CASE(SCTP_E_SEND);
00080 CASE(SCTP_E_RCV_INIT);
00081 CASE(SCTP_E_RCV_ABORT);
00082 CASE(SCTP_E_RCV_VALID_COOKIE_ECHO);
00083 CASE(SCTP_E_RCV_INIT_ACK);
00084 CASE(SCTP_E_RCV_COOKIE_ACK);
00085 CASE(SCTP_E_RCV_SHUTDOWN);
00086 CASE(SCTP_E_RCV_SHUTDOWN_ACK);
00087 CASE(SCTP_E_RCV_SHUTDOWN_COMPLETE);
00088 CASE(SCTP_E_TIMEOUT_INIT_TIMER);
00089 CASE(SCTP_E_TIMEOUT_SHUTDOWN_TIMER);
00090 CASE(SCTP_E_TIMEOUT_RTX_TIMER);
00091 CASE(SCTP_E_TIMEOUT_HEARTBEAT_TIMER);
00092 CASE(SCTP_E_RECEIVE);
00093 CASE(SCTP_E_DUP_RECEIVED);
00094 CASE(SCTP_E_PRIMARY);
00095 CASE(SCTP_E_QUEUE_MSGS_LIMIT);
00096 CASE(SCTP_E_QUEUE_BYTES_LIMIT);
00097 CASE(SCTP_E_NO_MORE_OUTSTANDING);
00098 CASE(SCTP_E_IGNORE);
00099 CASE(SCTP_E_DELIVERED);
00100 CASE(SCTP_E_SEND_SHUTDOWN_ACK);
00101 CASE(SCTP_E_STOP_SENDING);
00102 }
00103 return s;
00104 #undef CASE
00105 }
00106
00107 const char* SCTPAssociation::indicationName(const int32 code)
00108 {
00109 #define CASE(x) case x: s=#x+7; break
00110 const char* s = "unknown";
00111 switch (code) {
00112 CASE(SCTP_I_DATA);
00113 CASE(SCTP_I_DATA_NOTIFICATION);
00114 CASE(SCTP_I_ESTABLISHED);
00115 CASE(SCTP_I_PEER_CLOSED);
00116 CASE(SCTP_I_CLOSED);
00117 CASE(SCTP_I_CONNECTION_REFUSED);
00118 CASE(SCTP_I_CONNECTION_RESET);
00119 CASE(SCTP_I_TIMED_OUT);
00120 CASE(SCTP_I_STATUS);
00121 CASE(SCTP_I_ABORT);
00122 CASE(SCTP_I_SHUTDOWN_RECEIVED);
00123 CASE(SCTP_I_SEND_MSG);
00124 CASE(SCTP_I_SENDQUEUE_FULL);
00125 CASE(SCTP_I_SENDQUEUE_ABATED);
00126 }
00127 return s;
00128 #undef CASE
00129 }
00130
00131
00132 uint32 SCTPAssociation::chunkToInt(const char* type)
00133 {
00134 if (strcmp(type, "DATA")==0) return 0;
00135 if (strcmp(type, "INIT")==0) return 1;
00136 if (strcmp(type, "INIT_ACK")==0) return 2;
00137 if (strcmp(type, "SACK")==0) return 3;
00138 if (strcmp(type, "HEARTBEAT")==0) return 4;
00139 if (strcmp(type, "HEARTBEAT_ACK")==0) return 5;
00140 if (strcmp(type, "ABORT")==0) return 6;
00141 if (strcmp(type, "SHUTDOWN")==0) return 7;
00142 if (strcmp(type, "SHUTDOWN_ACK")==0) return 8;
00143 if (strcmp(type, "ERRORTYPE")==0) return 9;
00144 if (strcmp(type, "COOKIE_ECHO")==0) return 10;
00145 if (strcmp(type, "COOKIE_ACK")==0) return 11;
00146 if (strcmp(type, "SHUTDOWN_COMPLETE")==0) return 14;
00147 sctpEV3<<"ChunkConversion not successful\n";
00148 return 0;
00149 }
00150
00151 void SCTPAssociation::printConnBrief()
00152 {
00153 sctpEV3 << "Connection " << this << " ";
00154 sctpEV3 << localAddr << ":" << localPort << " to " << remoteAddr << ":" << remotePort;
00155 sctpEV3 << " on app[" << appGateIndex << "],assocId=" << assocId;
00156 sctpEV3 << " in " << stateName(fsm->getState()) << "\n";
00157 }
00158
00159 void SCTPAssociation::printSegmentBrief(SCTPMessage *sctpmsg)
00160 {
00161 sctpEV3 << "." << sctpmsg->getSrcPort() << " > ";
00162 sctpEV3 << "." << sctpmsg->getDestPort() << ": ";
00163 sctpEV3 << "initTag "<< sctpmsg->getTag() << "\n";
00164 }
00165
00166 SCTPAssociation* SCTPAssociation::cloneAssociation()
00167 {
00168 SCTPAssociation* assoc = new SCTPAssociation(sctpMain,appGateIndex,assocId);
00169 const char* queueClass = transmissionQ->getClassName();
00170 assoc->transmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
00171 assoc->retransmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
00172
00173 const char* sctpAlgorithmClass = sctpAlgorithm->getClassName();
00174 assoc->sctpAlgorithm = check_and_cast<SCTPAlgorithm *>(createOne(sctpAlgorithmClass));
00175 assoc->sctpAlgorithm->setAssociation(assoc);
00176 assoc->sctpAlgorithm->initialize();
00177 assoc->state = assoc->sctpAlgorithm->createStateVariables();
00178
00179 assoc->state->active = false;
00180 assoc->state->fork = true;
00181 assoc->localAddr = localAddr;
00182 assoc->localPort = localPort;
00183 assoc->localAddressList = localAddressList;
00184
00185 FSM_Goto((*assoc->fsm), SCTP_S_CLOSED);
00186 sctpMain->printInfoConnMap();
00187 return assoc;
00188 }
00189
00190 void SCTPAssociation::recordInPathVectors(SCTPMessage* pMsg,
00191 const IPvXAddress& rDest)
00192 {
00193 uint32 n_chunks = pMsg->getChunksArraySize();
00194 if (n_chunks == 0)
00195 return;
00196
00197 SCTPPathVariables* p_path = getPath(rDest);
00198
00199 for (uint32 i = 0 ; i < n_chunks ; i++) {
00200 const SCTPChunk* p_chunk = check_and_cast<const SCTPChunk *>(pMsg->getChunks(i));
00201 if (p_chunk->getChunkType() == DATA) {
00202 const SCTPDataChunk* p_data_chunk = check_and_cast<const SCTPDataChunk *>(p_chunk);
00203 p_path->pathTSN->record(p_data_chunk->getTsn());
00204 } else if (p_chunk->getChunkType() == HEARTBEAT) {
00205 p_path->numberOfHeartbeatsSent++;
00206 p_path->pathHb->record(p_path->numberOfHeartbeatsSent);
00207 } else if (p_chunk->getChunkType() == HEARTBEAT_ACK) {
00208 p_path->numberOfHeartbeatAcksSent++;
00209 p_path->pathHbAck->record(p_path->numberOfHeartbeatAcksSent);
00210 }
00211 }
00212 }
00213
00214 void SCTPAssociation::sendToIP(SCTPMessage* sctpmsg,
00215 const IPvXAddress& dest,
00216 const bool qs)
00217 {
00218
00219 sctpmsg->setSrcPort(localPort);
00220 sctpmsg->setDestPort(remotePort);
00221 sctpmsg->setChecksumOk(true);
00222 const SCTPChunk* chunk = (const SCTPChunk*)(sctpmsg->peekFirstChunk());
00223 if (chunk->getChunkType() == ABORT) {
00224 const SCTPAbortChunk* abortChunk = check_and_cast<const SCTPAbortChunk *>(chunk);
00225 if (abortChunk->getT_Bit() == 1) {
00226 sctpmsg->setTag(peerVTag);
00227 }
00228 else {
00229 sctpmsg->setTag(localVTag);
00230 }
00231 }
00232 else if (sctpmsg->getTag() == 0) {
00233 sctpmsg->setTag(localVTag);
00234 }
00235
00236 if ((bool)sctpMain->par("udpEncapsEnabled")) {
00237 sctpmsg->setKind(UDP_C_DATA);
00238 UDPControlInfo* controlInfo = new UDPControlInfo();
00239 controlInfo->setSrcPort(9899);
00240 controlInfo->setDestAddr(remoteAddr.get4());
00241 controlInfo->setDestPort(9899);
00242 sctpmsg->setControlInfo(controlInfo);
00243 }
00244 else {
00245 if (dest.isIPv6()) {
00246 IPv6ControlInfo* controlInfo = new IPv6ControlInfo();
00247 controlInfo->setProtocol(IP_PROT_SCTP);
00248 controlInfo->setSrcAddr(IPv6Address());
00249 controlInfo->setDestAddr(dest.get6());
00250 sctpmsg->setControlInfo(controlInfo);
00251 sctpMain->send(sctpmsg, "to_ipv6");
00252 }
00253 else {
00254 IPControlInfo* controlInfo = new IPControlInfo();
00255 controlInfo->setProtocol(IP_PROT_SCTP);
00256 controlInfo->setSrcAddr(IPAddress("0.0.0.0"));
00257 controlInfo->setDestAddr(dest.get4());
00258 sctpmsg->setControlInfo(controlInfo);
00259 sctpMain->send(sctpmsg, "to_ip");
00260 }
00261 recordInPathVectors(sctpmsg, dest);
00262 }
00263 sctpEV3 << "Sent to " << dest << endl;
00264 }
00265
00266
00267 void SCTPAssociation::signalConnectionTimeout()
00268 {
00269 sendIndicationToApp(SCTP_I_TIMED_OUT);
00270 }
00271
00272 void SCTPAssociation::sendIndicationToApp(const int32 code, const int32 value)
00273 {
00274 sctpEV3<<"sendIndicationToApp: " << indicationName(code) << endl;
00275
00276 cPacket* msg = new cPacket(indicationName(code));
00277 msg->setKind(code);
00278
00279 SCTPCommand* indication = new SCTPCommand(indicationName(code));
00280 indication->setAssocId(assocId);
00281 indication->setLocalAddr(localAddr);
00282 indication->setRemoteAddr(remoteAddr);
00283 if (code == SCTP_I_SENDQUEUE_ABATED) {
00284 indication->setNumMsgs(value);
00285 }
00286 msg->setControlInfo(indication);
00287 sctpMain->send(msg, "to_appl", appGateIndex);
00288 }
00289
00290 void SCTPAssociation::sendEstabIndicationToApp()
00291 {
00292 sctpEV3 << "sendEstabIndicationToApp: localPort="
00293 << localPort << " remotePort=" << remotePort << endl;
00294
00295 cPacket* msg = new cPacket(indicationName(SCTP_I_ESTABLISHED));
00296 msg->setKind(SCTP_I_ESTABLISHED);
00297
00298 SCTPConnectInfo* establishIndication = new SCTPConnectInfo("CI");
00299 establishIndication->setAssocId(assocId);
00300 establishIndication->setLocalAddr(localAddr);
00301 establishIndication->setRemoteAddr(remoteAddr);
00302 establishIndication->setLocalPort(localPort);
00303 establishIndication->setRemotePort(remotePort);
00304 establishIndication->setRemoteAddresses(remoteAddressList);
00305 establishIndication->setInboundStreams(inboundStreams);
00306 establishIndication->setOutboundStreams(outboundStreams);
00307 establishIndication->setNumMsgs(state->sendQueueLimit);
00308 msg->setControlInfo(establishIndication);
00309 sctpMain->send(msg, "to_appl", appGateIndex);
00310
00311 }
00312
00313 void SCTPAssociation::sendToApp(cPacket *msg)
00314 {
00315 sctpMain->send(msg, "to_appl", appGateIndex);
00316 }
00317
00318 void SCTPAssociation::initAssociation(SCTPOpenCommand *openCmd)
00319 {
00320 sctpEV3<<"SCTPAssociationUtil:initAssociation\n";
00321
00322 const char *queueClass = openCmd->getQueueClass();
00323 transmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
00324
00325 retransmissionQ = check_and_cast<SCTPQueue *>(createOne(queueClass));
00326 outboundStreams = openCmd->getOutboundStreams();
00327
00328 const char *sctpAlgorithmClass = openCmd->getSctpAlgorithmClass();
00329 if (!sctpAlgorithmClass || !sctpAlgorithmClass[0])
00330 sctpAlgorithmClass = sctpMain->par("sctpAlgorithmClass");
00331 sctpAlgorithm = check_and_cast<SCTPAlgorithm *>(createOne(sctpAlgorithmClass));
00332 sctpAlgorithm->setAssociation(this);
00333 sctpAlgorithm->initialize();
00334
00335 state = sctpAlgorithm->createStateVariables();
00336 }
00337
00338
00339 void SCTPAssociation::sendInit()
00340 {
00341
00342 InterfaceTableAccess interfaceTableAccess;
00343 AddressVector adv;
00344 uint32 length = SCTP_INIT_CHUNK_LENGTH;
00345
00346 if (remoteAddr.isUnspecified() || remotePort==0)
00347 opp_error("Error processing command ASSOCIATE: foreign socket unspecified");
00348 if (localPort==0)
00349 opp_error("Error processing command ASSOCIATE: local port unspecified");
00350 state->setPrimaryPath(getPath(remoteAddr));
00351
00352 SCTPMessage *sctpmsg = new SCTPMessage();
00353 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00354 SCTPInitChunk *initChunk = new SCTPInitChunk("INIT");
00355 initChunk->setChunkType(INIT);
00356 initChunk->setInitTag((uint32)(fmod(intrand(INT32_MAX), 1.0+(double)(unsigned)0xffffffffUL)) & 0xffffffffUL);
00357
00358 peerVTag = initChunk->getInitTag();
00359 sctpEV3<<"INIT from "<<localAddr<<":InitTag="<<peerVTag<<"\n";
00360 initChunk->setA_rwnd(sctpMain->par("arwnd"));
00361 state->localRwnd = (long)sctpMain->par("arwnd");
00362 initChunk->setNoOutStreams(outboundStreams);
00363 initChunk->setNoInStreams(inboundStreams);
00364 initChunk->setInitTSN(1000);
00365 state->nextTSN=initChunk->getInitTSN();
00366 state->lastTSN = initChunk->getInitTSN() + state->numRequests - 1;
00367 initTsn=initChunk->getInitTSN();
00368 IInterfaceTable *ift = interfaceTableAccess.get();
00369 sctpEV3<<"add local address\n";
00370 if (localAddressList.front() == IPvXAddress("0.0.0.0"))
00371 {
00372 for (int32 i=0; i<ift->getNumInterfaces(); ++i)
00373 {
00374 if (ift->getInterface(i)->ipv4Data()!=NULL)
00375 {
00376 adv.push_back(ift->getInterface(i)->ipv4Data()->getIPAddress());
00377 }
00378 else if (ift->getInterface(i)->ipv6Data()!=NULL)
00379 {
00380 for (int32 j=0; j<ift->getInterface(i)->ipv6Data()->getNumAddresses(); j++)
00381 {
00382 sctpEV3<<"add address "<<ift->getInterface(i)->ipv6Data()->getAddress(j)<<"\n";
00383 adv.push_back(ift->getInterface(i)->ipv6Data()->getAddress(j));
00384 }
00385 }
00386 }
00387 }
00388 else
00389 {
00390 adv = localAddressList;
00391 sctpEV3<<"gebundene Adresse "<<localAddr<<" wird hinzugefuegt\n";
00392 }
00393 uint32 addrNum=0;
00394 bool friendly = false;
00395 if (remoteAddr.isIPv6())
00396 {
00397 for (AddressVector::iterator i=adv.begin(); i!=adv.end(); ++i)
00398 {
00399 if (!friendly)
00400 {
00401 initChunk->setAddressesArraySize(addrNum+1);
00402 initChunk->setAddresses(addrNum++,(*i));
00403 length+=20;
00404 }
00405 sctpMain->addLocalAddress(this, (*i));
00406 state->localAddresses.push_back((*i));
00407 if (localAddr.isUnspecified())
00408 localAddr=(*i);
00409 }
00410 }
00411 else
00412 {
00413 uint32 rlevel = getLevel(remoteAddr);
00414 sctpEV3<<"level of remote address="<<rlevel<<"\n";
00415 for (AddressVector::iterator i=adv.begin(); i!=adv.end(); ++i)
00416 {
00417 sctpEV3<<"level of address "<<(*i)<<" = "<<getLevel((*i))<<"\n";
00418 if (getLevel((*i))>=rlevel)
00419 {
00420 initChunk->setAddressesArraySize(addrNum+1);
00421 initChunk->setAddresses(addrNum++,(*i));
00422 length+=8;
00423 sctpMain->addLocalAddress(this, (*i));
00424 state->localAddresses.push_back((*i));
00425 if (localAddr.get4().getInt()==0)
00426 localAddr=(*i);
00427 }
00428 else if (rlevel==4 && getLevel((*i))==3 && friendly)
00429 {
00430 sctpMain->addLocalAddress(this, (*i));
00431 state->localAddresses.push_back((*i));
00432 if (localAddr.get4().getInt()==0)
00433 localAddr=(*i);
00434 }
00435 }
00436 }
00437 sctpMain->printInfoConnMap();
00438 initChunk->setBitLength(length*8);
00439 sctpmsg->addChunk(initChunk);
00440
00441 if (remoteAddressList.size()>0)
00442 {
00443 for (AddressVector::iterator it=remoteAddressList.begin(); it!=remoteAddressList.end(); it++)
00444 {
00445 sctpEV3<<__LINE__<<" get new path for "<<(*it)<<"\n";
00446 SCTPPathVariables* path = new SCTPPathVariables((*it), this);
00447 sctpPathMap[(*it)] = path;
00448 qCounter.roomTransQ[(*it)] = 0;
00449 qCounter.bookedTransQ[(*it)] = 0;
00450 qCounter.roomRetransQ[(*it)] = 0;
00451 }
00452 }
00453 else
00454 {
00455 sctpEV3<<__LINE__<<" get new path for "<<remoteAddr<<"\n";
00456 SCTPPathVariables* path = new SCTPPathVariables(remoteAddr, this);
00457 sctpPathMap[remoteAddr] = path;
00458 qCounter.roomTransQ[remoteAddr] = 0;
00459 qCounter.bookedTransQ[remoteAddr] = 0;
00460 qCounter.roomRetransQ[remoteAddr] = 0;
00461 }
00462
00463 state->initChunk=check_and_cast<SCTPInitChunk *>(initChunk->dup());
00464 state->initChunk->setName("StateInitChunk");
00465 printSctpPathMap();
00466 sctpEV3<<getFullPath()<<" sendInit: localVTag="<<localVTag<<" peerVTag="<<peerVTag<<"\n";
00467 sendToIP(sctpmsg);
00468 sctpMain->assocList.push_back(this);
00469 }
00470
00471 void SCTPAssociation::retransmitInit()
00472 {
00473 SCTPMessage *sctpmsg = new SCTPMessage();
00474 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00475 SCTPInitChunk *sctpinit;
00476
00477 sctpEV3<<"Retransmit InitChunk="<<&sctpinit<<"\n";
00478
00479 sctpinit=check_and_cast<SCTPInitChunk *>(state->initChunk->dup());
00480 sctpinit->setChunkType(INIT);
00481 sctpmsg->addChunk(sctpinit);
00482
00483 sendToIP(sctpmsg);
00484 }
00485
00486
00487 void SCTPAssociation::sendInitAck(SCTPInitChunk* initChunk)
00488 {
00489 uint32 length = SCTP_INIT_CHUNK_LENGTH;
00490
00491 state->setPrimaryPath(getPath(remoteAddr));
00492
00493 SCTPMessage *sctpinitack = new SCTPMessage();
00494 sctpinitack->setBitLength(SCTP_COMMON_HEADER*8);
00495
00496 sctpinitack->setSrcPort(localPort);
00497 sctpinitack->setDestPort(remotePort);
00498 sctpEV3<<"sendInitAck at "<<localAddr<<". Provided InitTag="<<initChunk->getInitTag()<<"\n";
00499 SCTPInitAckChunk *initAckChunk = new SCTPInitAckChunk("INIT_ACK");
00500 initAckChunk->setChunkType(INIT_ACK);
00501 SCTPCookie *cookie = new SCTPCookie("CookieUtil");
00502 cookie->setCreationTime(simTime());
00503 cookie->setLocalTieTagArraySize(32);
00504 cookie->setPeerTieTagArraySize(32);
00505 if (fsm->getState()==SCTP_S_CLOSED)
00506 {
00507 while (peerVTag==0)
00508 {
00509 peerVTag = (uint32)intrand(INT32_MAX);
00510 }
00511 initAckChunk->setInitTag(peerVTag);
00512 initAckChunk->setInitTSN(2000);
00513 state->nextTSN=initAckChunk->getInitTSN();
00514 state->lastTSN = initAckChunk->getInitTSN() + state->numRequests - 1;
00515 cookie->setLocalTag(localVTag);
00516 cookie->setPeerTag(peerVTag);
00517 for (int32 i=0; i<32; i++)
00518 {
00519 cookie->setLocalTieTag(i,0);
00520 cookie->setPeerTieTag(i,0);
00521 }
00522 sctpinitack->setTag(localVTag);
00523 sctpEV3<<"state=closed: localVTag="<<localVTag<<" peerVTag="<<peerVTag<<"\n";
00524 }
00525 else if (fsm->getState()==SCTP_S_COOKIE_WAIT || fsm->getState()==SCTP_S_COOKIE_ECHOED)
00526 {
00527 initAckChunk->setInitTag(peerVTag);
00528 sctpEV3<<"different state:set InitTag in InitAck: "<<initAckChunk->getInitTag()<<"\n";
00529 initAckChunk->setInitTSN(state->nextTSN);
00530 initPeerTsn=initChunk->getInitTSN();
00531 state->cTsnAck = initPeerTsn - 1;
00532 cookie->setLocalTag(initChunk->getInitTag());
00533 cookie->setPeerTag(peerVTag);
00534 for (int32 i=0; i<32; i++)
00535 {
00536 cookie->setPeerTieTag(i,(uint8)(intrand(256)));
00537 state->peerTieTag[i] = cookie->getPeerTieTag(i);
00538 if (fsm->getState()==SCTP_S_COOKIE_ECHOED)
00539 {
00540 cookie->setLocalTieTag(i,(uint8)(intrand(256)));
00541 state->localTieTag[i] = cookie->getLocalTieTag(i);
00542 }
00543 else
00544 cookie->setLocalTieTag(i,0);
00545 }
00546 sctpinitack->setTag(initChunk->getInitTag());
00547 sctpEV3<<"VTag in InitAck: "<<sctpinitack->getTag()<<"\n";
00548 }
00549 else
00550 {
00551 sctpEV3<<"other state\n";
00552 uint32 tag=0;
00553 while (tag==0)
00554 {
00555 tag = (uint32)(fmod(intrand(INT32_MAX), 1.0+(double)(unsigned)0xffffffffUL)) & 0xffffffffUL;
00556 }
00557 initAckChunk->setInitTag(tag);
00558 initAckChunk->setInitTSN(state->nextTSN);
00559 cookie->setLocalTag(localVTag);
00560 cookie->setPeerTag(peerVTag);
00561 for (int32 i=0; i<32; i++)
00562 {
00563 cookie->setPeerTieTag(i,state->peerTieTag[i]);
00564 cookie->setLocalTieTag(i,state->localTieTag[i]);
00565 }
00566 sctpinitack->setTag(initChunk->getInitTag());
00567 }
00568 cookie->setBitLength(SCTP_COOKIE_LENGTH*8);
00569 initAckChunk->setStateCookie(cookie);
00570 initAckChunk->setCookieArraySize(0);
00571 initAckChunk->setA_rwnd(sctpMain->par("arwnd"));
00572 state->localRwnd = (long)sctpMain->par("arwnd");
00573 initAckChunk->setNoOutStreams((unsigned int)min(outboundStreams,initChunk->getNoInStreams()));
00574 initAckChunk->setNoInStreams((unsigned int)min(inboundStreams,initChunk->getNoOutStreams()));
00575 initTsn=initAckChunk->getInitTSN();
00576 uint32 addrNum=0;
00577 bool friendly = false;
00578 if (!friendly)
00579 for (AddressVector::iterator k=state->localAddresses.begin(); k!=state->localAddresses.end(); ++k)
00580 {
00581 initAckChunk->setAddressesArraySize(addrNum+1);
00582 initAckChunk->setAddresses(addrNum++,(*k));
00583 length+=8;
00584 }
00585 uint32 unknownLen = initChunk->getUnrecognizedParametersArraySize();
00586 if (unknownLen>0)
00587 {
00588 sctpEV3<<"Found unrecognized Parameters in INIT chunk with a length of "<<unknownLen<<" bytes.\n";
00589 initAckChunk->setUnrecognizedParametersArraySize(unknownLen);
00590 for (uint32 i=0; i<unknownLen; i++)
00591 initAckChunk->setUnrecognizedParameters(i,initChunk->getUnrecognizedParameters(i));
00592 length+=unknownLen;
00593 }
00594 else
00595 initAckChunk->setUnrecognizedParametersArraySize(0);
00596
00597 initAckChunk->setBitLength((length+initAckChunk->getCookieArraySize())*8 + cookie->getBitLength());
00598 inboundStreams = ((initChunk->getNoOutStreams()<initAckChunk->getNoInStreams())?initChunk->getNoOutStreams():initAckChunk->getNoInStreams());
00599 outboundStreams = ((initChunk->getNoInStreams()<initAckChunk->getNoOutStreams())?initChunk->getNoInStreams():initAckChunk->getNoOutStreams());
00600 (this->*ssFunctions.ssInitStreams)(inboundStreams, outboundStreams);
00601 sctpinitack->addChunk(initAckChunk);
00602 if (fsm->getState()==SCTP_S_CLOSED)
00603 {
00604 sendToIP(sctpinitack, state->initialPrimaryPath);
00605 }
00606 else
00607 {
00608 sendToIP(sctpinitack);
00609
00610 }
00611 sctpMain->assocList.push_back(this);
00612 printSctpPathMap();
00613 }
00614
00615 void SCTPAssociation::sendCookieEcho(SCTPInitAckChunk* initAckChunk)
00616 {
00617 SCTPMessage *sctpcookieecho = new SCTPMessage();
00618 sctpcookieecho->setBitLength(SCTP_COMMON_HEADER*8);
00619
00620 sctpEV3<<"SCTPAssociationUtil:sendCookieEcho\n";
00621
00622 sctpcookieecho->setSrcPort(localPort);
00623 sctpcookieecho->setDestPort(remotePort);
00624 SCTPCookieEchoChunk* cookieEchoChunk=new SCTPCookieEchoChunk("COOKIE_ECHO");
00625 cookieEchoChunk->setChunkType(COOKIE_ECHO);
00626 int32 len = initAckChunk->getCookieArraySize();
00627 cookieEchoChunk->setCookieArraySize(len);
00628 if (len>0)
00629 {
00630 for (int32 i=0; i<len; i++)
00631 cookieEchoChunk->setCookie(i, initAckChunk->getCookie(i));
00632 cookieEchoChunk->setBitLength((SCTP_COOKIE_ACK_LENGTH+len)*8);
00633 }
00634 else
00635 {
00636 SCTPCookie* cookie = check_and_cast <SCTPCookie*> (initAckChunk->getStateCookie());
00637 cookieEchoChunk->setStateCookie(cookie);
00638 cookieEchoChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8 + cookie->getBitLength());
00639 }
00640 uint32 unknownLen = initAckChunk->getUnrecognizedParametersArraySize();
00641 if (unknownLen>0)
00642 {
00643 sctpEV3<<"Found unrecognized Parameters in INIT-ACK chunk with a length of "<<unknownLen<<" bytes.\n";
00644 cookieEchoChunk->setUnrecognizedParametersArraySize(unknownLen);
00645 for (uint32 i=0; i<unknownLen; i++)
00646 cookieEchoChunk->setUnrecognizedParameters(i,initAckChunk->getUnrecognizedParameters(i));
00647 }
00648 else
00649 cookieEchoChunk->setUnrecognizedParametersArraySize(0);
00650 state->cookieChunk=check_and_cast<SCTPCookieEchoChunk*>(cookieEchoChunk->dup());
00651 if (len==0)
00652 {
00653 state->cookieChunk->setStateCookie(initAckChunk->getStateCookie()->dup());
00654 }
00655 sctpcookieecho->addChunk(cookieEchoChunk);
00656 sendToIP(sctpcookieecho);
00657 }
00658
00659
00660 void SCTPAssociation::retransmitCookieEcho()
00661 {
00662 SCTPMessage* sctpmsg = new SCTPMessage();
00663 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00664 SCTPCookieEchoChunk* cookieEchoChunk=check_and_cast<SCTPCookieEchoChunk*>(state->cookieChunk->dup());
00665 if (cookieEchoChunk->getCookieArraySize()==0)
00666 {
00667 cookieEchoChunk->setStateCookie(state->cookieChunk->getStateCookie()->dup());
00668 }
00669 sctpmsg->addChunk(cookieEchoChunk);
00670
00671 sctpEV3<<"retransmitCookieEcho localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
00672
00673 sendToIP(sctpmsg);
00674 }
00675
00676 void SCTPAssociation::sendHeartbeat(const SCTPPathVariables* path)
00677 {
00678 SCTPMessage* sctpHeartbeatbeat = new SCTPMessage();
00679 sctpHeartbeatbeat->setBitLength(SCTP_COMMON_HEADER*8);
00680
00681 sctpHeartbeatbeat->setSrcPort(localPort);
00682 sctpHeartbeatbeat->setDestPort(remotePort);
00683 SCTPHeartbeatChunk* heartbeatChunk = new SCTPHeartbeatChunk("HEARTBEAT");
00684 heartbeatChunk->setChunkType(HEARTBEAT);
00685 heartbeatChunk->setRemoteAddr(path->remoteAddress);
00686 heartbeatChunk->setTimeField(simTime());
00687 heartbeatChunk->setBitLength((SCTP_HEARTBEAT_CHUNK_LENGTH+12)*8);
00688 sctpHeartbeatbeat->addChunk(heartbeatChunk);
00689 sctpEV3 << "sendHeartbeat: sendToIP to " << path->remoteAddress << endl;
00690 sendToIP(sctpHeartbeatbeat, path->remoteAddress);
00691 }
00692
00693 void SCTPAssociation::sendHeartbeatAck(const SCTPHeartbeatChunk* heartbeatChunk,
00694 const IPvXAddress& src,
00695 const IPvXAddress& dest)
00696 {
00697 SCTPMessage* sctpHeartbeatAck = new SCTPMessage();
00698 sctpHeartbeatAck->setBitLength(SCTP_COMMON_HEADER*8);
00699 sctpHeartbeatAck->setSrcPort(localPort);
00700 sctpHeartbeatAck->setDestPort(remotePort);
00701 SCTPHeartbeatAckChunk* heartbeatAckChunk=new SCTPHeartbeatAckChunk("HEARTBEAT_ACK");
00702 heartbeatAckChunk->setChunkType(HEARTBEAT_ACK);
00703 heartbeatAckChunk->setRemoteAddr(heartbeatChunk->getRemoteAddr());
00704 heartbeatAckChunk->setTimeField(heartbeatChunk->getTimeField());
00705 const int32 len = heartbeatChunk->getInfoArraySize();
00706 if (len > 0){
00707 heartbeatAckChunk->setInfoArraySize(len);
00708 for (int32 i=0; i<len; i++)
00709 heartbeatAckChunk->setInfo(i,heartbeatChunk->getInfo(i));
00710 }
00711
00712 heartbeatAckChunk->setBitLength(heartbeatChunk->getBitLength());
00713 sctpHeartbeatAck->addChunk(heartbeatAckChunk);
00714
00715 sctpEV3 << "sendHeartbeatAck: sendToIP from " << src << " to " << dest << endl;
00716 sendToIP(sctpHeartbeatAck, dest);
00717 }
00718
00719 void SCTPAssociation::sendCookieAck(const IPvXAddress& dest)
00720 {
00721 SCTPMessage *sctpcookieack = new SCTPMessage();
00722 sctpcookieack->setBitLength(SCTP_COMMON_HEADER*8);
00723
00724 sctpEV3<<"SCTPAssociationUtil:sendCookieACK\n";
00725
00726 sctpcookieack->setSrcPort(localPort);
00727 sctpcookieack->setDestPort(remotePort);
00728 SCTPCookieAckChunk* cookieAckChunk=new SCTPCookieAckChunk("COOKIE_ACK");
00729 cookieAckChunk->setChunkType(COOKIE_ACK);
00730 cookieAckChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8);
00731 sctpcookieack->addChunk(cookieAckChunk);
00732 sendToIP(sctpcookieack, dest);
00733 }
00734
00735 void SCTPAssociation::sendShutdownAck(const IPvXAddress& dest)
00736 {
00737 sendOnAllPaths(getPath(dest));
00738 if (getOutstandingBytes() == 0) {
00739 performStateTransition(SCTP_E_NO_MORE_OUTSTANDING);
00740 SCTPMessage *sctpshutdownack = new SCTPMessage();
00741 sctpshutdownack->setBitLength(SCTP_COMMON_HEADER*8);
00742
00743 sctpEV3 << "SCTPAssociationUtil:sendShutdownACK" << endl;
00744
00745 sctpshutdownack->setSrcPort(localPort);
00746 sctpshutdownack->setDestPort(remotePort);
00747 SCTPShutdownAckChunk* shutdownAckChunk=new SCTPShutdownAckChunk("SHUTDOWN_ACK");
00748 shutdownAckChunk->setChunkType(SHUTDOWN_ACK);
00749 shutdownAckChunk->setBitLength(SCTP_COOKIE_ACK_LENGTH*8);
00750 sctpshutdownack->addChunk(shutdownAckChunk);
00751 state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT;
00752 state->initRetransCounter = 0;
00753 stopTimer(T2_ShutdownTimer);
00754 startTimer(T2_ShutdownTimer,state->initRexmitTimeout);
00755 stopTimer(T5_ShutdownGuardTimer);
00756 startTimer(T5_ShutdownGuardTimer,SHUTDOWN_GUARD_TIMEOUT);
00757 state->shutdownAckChunk=check_and_cast<SCTPShutdownAckChunk*>(shutdownAckChunk->dup());
00758 sendToIP(sctpshutdownack, dest);
00759 }
00760 }
00761
00762 void SCTPAssociation::sendShutdownComplete()
00763 {
00764 SCTPMessage *sctpshutdowncomplete = new SCTPMessage();
00765 sctpshutdowncomplete->setBitLength(SCTP_COMMON_HEADER*8);
00766
00767 sctpEV3<<"SCTPAssociationUtil:sendShutdownComplete\n";
00768
00769 sctpshutdowncomplete->setSrcPort(localPort);
00770 sctpshutdowncomplete->setDestPort(remotePort);
00771 SCTPShutdownCompleteChunk* shutdownCompleteChunk=new SCTPShutdownCompleteChunk("SHUTDOWN_COMPLETE");
00772 shutdownCompleteChunk->setChunkType(SHUTDOWN_COMPLETE);
00773 shutdownCompleteChunk->setTBit(0);
00774 shutdownCompleteChunk->setBitLength(SCTP_SHUTDOWN_ACK_LENGTH*8);
00775 sctpshutdowncomplete->addChunk(shutdownCompleteChunk);
00776 sendToIP(sctpshutdowncomplete);
00777 }
00778
00779
00780 void SCTPAssociation::sendAbort()
00781 {
00782 SCTPMessage *msg = new SCTPMessage();
00783 msg->setBitLength(SCTP_COMMON_HEADER*8);
00784
00785 sctpEV3<<"SCTPAssociationUtil:sendABORT localPort="<<localPort<<" remotePort="<<remotePort<<"\n";
00786
00787 msg->setSrcPort(localPort);
00788 msg->setDestPort(remotePort);
00789 SCTPAbortChunk* abortChunk = new SCTPAbortChunk("ABORT");
00790 abortChunk->setChunkType(ABORT);
00791 abortChunk->setT_Bit(0);
00792 abortChunk->setBitLength(SCTP_ABORT_CHUNK_LENGTH*8);
00793 msg->addChunk(abortChunk);
00794 sendToIP(msg, remoteAddr);
00795 }
00796
00797 void SCTPAssociation::sendShutdown()
00798 {
00799 SCTPMessage *msg = new SCTPMessage();
00800 msg->setBitLength(SCTP_COMMON_HEADER*8);
00801
00802 sctpEV3<<"SCTPAssociationUtil:sendShutdown localPort="<<localPort<<" remotePort="<<remotePort<<"\n";
00803
00804 msg->setSrcPort(localPort);
00805 msg->setDestPort(remotePort);
00806 SCTPShutdownChunk* shutdownChunk = new SCTPShutdownChunk("SHUTDOWN");
00807 shutdownChunk->setChunkType(SHUTDOWN);
00808
00809 shutdownChunk->setCumTsnAck(state->cTsnAck);
00810 shutdownChunk->setBitLength(SCTP_SHUTDOWN_CHUNK_LENGTH*8);
00811 state->initRexmitTimeout = SCTP_TIMEOUT_INIT_REXMIT;
00812 state->initRetransCounter = 0;
00813 stopTimer(T5_ShutdownGuardTimer);
00814 startTimer(T5_ShutdownGuardTimer,SHUTDOWN_GUARD_TIMEOUT);
00815 stopTimer(T2_ShutdownTimer);
00816 startTimer(T2_ShutdownTimer,state->initRexmitTimeout);
00817 state->shutdownChunk=check_and_cast<SCTPShutdownChunk*>(shutdownChunk->dup());
00818 msg->addChunk(shutdownChunk);
00819 sendToIP(msg, remoteAddr);
00820 performStateTransition(SCTP_E_NO_MORE_OUTSTANDING);
00821 }
00822
00823
00824 void SCTPAssociation::retransmitShutdown()
00825 {
00826 SCTPMessage *sctpmsg = new SCTPMessage();
00827 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00828 SCTPShutdownChunk* shutdownChunk;
00829 shutdownChunk=check_and_cast<SCTPShutdownChunk*>(state->shutdownChunk->dup());
00830 sctpmsg->addChunk(shutdownChunk);
00831
00832 sctpEV3<<"retransmitShutdown localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
00833
00834 sendToIP(sctpmsg);
00835 }
00836
00837 void SCTPAssociation::retransmitShutdownAck()
00838 {
00839 SCTPMessage *sctpmsg = new SCTPMessage();
00840 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00841 SCTPShutdownAckChunk* shutdownAckChunk;
00842 shutdownAckChunk=check_and_cast<SCTPShutdownAckChunk*>(state->shutdownAckChunk->dup());
00843 sctpmsg->addChunk(shutdownAckChunk);
00844
00845 sctpEV3<<"retransmitShutdownAck localAddr="<<localAddr<<" remoteAddr"<<remoteAddr<<"\n";
00846
00847 sendToIP(sctpmsg);
00848 }
00849
00850
00851 void SCTPAssociation::scheduleSack()
00852 {
00853
00854 if (state->firstChunkReceived)
00855 state->ackState++;
00856 else
00857 {
00858 state->ackState = sackFrequency;
00859 state->firstChunkReceived = true;
00860 }
00861
00862 sctpEV3<<"scheduleSack() : ackState is now: "<<state->ackState<<"\n";
00863
00864 if (state->ackState <= sackFrequency - 1)
00865 {
00866
00867 if (!SackTimer->isScheduled())
00868 {
00869 startTimer(SackTimer, sackPeriod);
00870 }
00871 else {
00872
00873
00874 sctpEV3<<"SACK timer running, but scheduleSack() called\n";
00875
00876 }
00877 }
00878 }
00879
00880
00881 SCTPSackChunk* SCTPAssociation::createSack()
00882 {
00883 uint32 key=0, arwnd=0;
00884
00885 sctpEV3<<"SCTPAssociationUtil:createSACK localAddress="<<localAddr<<" remoteAddress="<<remoteAddr<<"\n";
00886
00887 sctpEV3<<" localRwnd="<<state->localRwnd<<" queuedBytes="<<state->queuedReceivedBytes<<"\n";
00888 if ((int32)(state->localRwnd - state->queuedReceivedBytes) <= 0)
00889 {
00890 arwnd = 0;
00891 if (state->swsLimit > 0)
00892 state->swsAvoidanceInvoked = true;
00893 }
00894 else if (state->localRwnd - state->queuedReceivedBytes < state->swsLimit || state->swsAvoidanceInvoked == true)
00895 {
00896 arwnd = 1;
00897 if (state->swsLimit > 0)
00898 state->swsAvoidanceInvoked = true;
00899 sctpEV3<<"arwnd=1; createSack : SWS Avoidance ACTIVE !!!\n";
00900 }
00901 else
00902 {
00903 arwnd = state->localRwnd - state->queuedReceivedBytes;
00904 sctpEV3<<simTime()<<" arwnd = "<<state->localRwnd<<" - "<<state->queuedReceivedBytes<<" = "<<arwnd<<"\n";
00905 }
00906 advRwnd->record(arwnd);
00907 SCTPSackChunk* sackChunk=new SCTPSackChunk("SACK");
00908 sackChunk->setChunkType(SACK);
00909 sackChunk->setCumTsnAck(state->cTsnAck);
00910 sackChunk->setA_rwnd(arwnd);
00911 uint32 numGaps=state->numGaps;
00912 uint32 numDups=state->dupList.size();
00913 uint16 sackLength=SCTP_SACK_CHUNK_LENGTH + numGaps*4 + numDups*4;
00914 uint32 mtu = getPath(remoteAddr)->pmtu;
00915
00916 if (sackLength > mtu-32)
00917 {
00918 if (SCTP_SACK_CHUNK_LENGTH + numGaps*4 > mtu-32)
00919 {
00920 numDups = 0;
00921 numGaps = (uint32)((mtu-32-SCTP_SACK_CHUNK_LENGTH)/4);
00922 }
00923 else
00924 {
00925 numDups = (uint32)((mtu-32-SCTP_SACK_CHUNK_LENGTH - numGaps*4)/4);
00926 }
00927 sackLength=SCTP_SACK_CHUNK_LENGTH + numGaps*4 + numDups*4;
00928 }
00929 sackChunk->setNumGaps(numGaps);
00930 sackChunk->setNumDupTsns(numDups);
00931 sackChunk->setBitLength(sackLength*8);
00932
00933 sctpEV3<<"Sack arwnd="<<sackChunk->getA_rwnd()<<" ctsnAck="<<state->cTsnAck<<" numGaps="<<numGaps<<" numDups="<<numDups<<"\n";
00934
00935 if (numGaps > 0)
00936 {
00937 sackChunk->setGapStartArraySize(numGaps);
00938 sackChunk->setGapStopArraySize(numGaps);
00939
00940 uint32 last = state->cTsnAck;
00941 for (key=0; key<numGaps; key++)
00942 {
00943
00944 assert(tsnGt(state->gapStartList[key], last + 1));
00945 assert(tsnGe(state->gapStopList[key], state->gapStartList[key]));
00946 last = state->gapStopList[key];
00947
00948 sackChunk->setGapStart(key, state->gapStartList[key]);
00949 sackChunk->setGapStop(key, state->gapStopList[key]);
00950 }
00951 }
00952 if (numDups > 0)
00953 {
00954 sackChunk->setDupTsnsArraySize(numDups);
00955 key=0;
00956 for(std::list<uint32>::iterator iter=state->dupList.begin(); iter!=state->dupList.end(); iter++)
00957 {
00958 sackChunk->setDupTsns(key, (*iter));
00959 key++;
00960 if (key == numDups)
00961 break;
00962 }
00963 state->dupList.clear();
00964 }
00965 sctpEV3<<endl;
00966 for (uint32 i=0; i<numGaps; i++)
00967 sctpEV3<<sackChunk->getGapStart(i)<<" - "<<sackChunk->getGapStop(i)<<"\n";
00968
00969 sctpEV3<<"send "<<sackChunk->getName()<<" from "<<localAddr<<" to "<<state->lastDataSourceAddress<<"\n";
00970 return sackChunk;
00971 }
00972
00973 void SCTPAssociation::sendSack()
00974 {
00975 SCTPSackChunk* sackChunk;
00976
00977 sctpEV3 << "Sending SACK" << endl;
00978
00979
00980 stopTimer(SackTimer);
00981 state->ackState = 0;
00982 sackChunk = createSack();
00983
00984 SCTPMessage* sctpmsg = new SCTPMessage();
00985 sctpmsg->setBitLength(SCTP_COMMON_HEADER*8);
00986 sctpmsg->addChunk(sackChunk);
00987
00988
00989 sendToIP(sctpmsg, state->lastDataSourceAddress);
00990 }
00991
00992 void SCTPAssociation::sendDataArrivedNotification(uint16 sid)
00993 {
00994
00995 sctpEV3<<"SendDataArrivedNotification\n";
00996
00997 cPacket* cmsg = new cPacket("DataArrivedNotification");
00998 cmsg->setKind(SCTP_I_DATA_NOTIFICATION);
00999 SCTPCommand *cmd = new SCTPCommand("notification");
01000 cmd->setAssocId(assocId);
01001 cmd->setSid(sid);
01002 cmd->setNumMsgs(1);
01003 cmsg->setControlInfo(cmd);
01004
01005 sendToApp(cmsg);
01006 }
01007
01008
01009 void SCTPAssociation::putInDeliveryQ(uint16 sid)
01010 {
01011 SCTPReceiveStreamMap::iterator iter=receiveStreams.find(sid);
01012 SCTPReceiveStream* rStream = iter->second;
01013 sctpEV3 << "putInDeliveryQ: SSN=" << rStream->getExpectedStreamSeqNum()
01014 << " SID=" << sid
01015 << " QueueSize="<< rStream->getOrderedQ()->getQueueSize() << endl;
01016 while (rStream->getOrderedQ()->getQueueSize()>0)
01017 {
01018
01019 SCTPDataVariables* chunk =
01020 rStream->getOrderedQ()-> dequeueChunkBySSN(rStream->getExpectedStreamSeqNum());
01021 if (chunk) {
01022 sctpEV3 << "putInDeliveryQ::chunk " <<chunk->tsn
01023 <<", sid " << chunk->sid <<" and ssn " << chunk->ssn
01024 <<" dequeued from ordered queue. queuedReceivedBytes="
01025 << state->queuedReceivedBytes << " will be reduced by "
01026 << chunk->len/8 << endl;
01027 state->queuedReceivedBytes-=chunk->len/8;
01028
01029
01030 qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
01031 if (rStream->getDeliveryQ()->checkAndInsertChunk(chunk->tsn, chunk)) {
01032 state->queuedReceivedBytes += chunk->len/8;
01033
01034 sctpEV3 << "data put in deliveryQ; queuedBytes now "
01035 << state->queuedReceivedBytes << endl;
01036 qCounter.roomSumRcvStreams += ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
01037 int32 seqnum=rStream->getExpectedStreamSeqNum();
01038 rStream->setExpectedStreamSeqNum(++seqnum);
01039 if (rStream->getExpectedStreamSeqNum() > 65535) {
01040 rStream->setExpectedStreamSeqNum(0);
01041 }
01042 sendDataArrivedNotification(sid);
01043 }
01044 }
01045 else {
01046 break;
01047 }
01048 }
01049 }
01050
01051 void SCTPAssociation::pushUlp()
01052 {
01053 int32 count = 0;
01054
01055 for (unsigned int i = 0; i < inboundStreams; i++) {
01056 putInDeliveryQ(i);
01057 }
01058 if (state->pushMessagesLeft <= 0) {
01059 state->pushMessagesLeft = state->messagesToPush;
01060 }
01061 bool restrict = false;
01062 if (state->pushMessagesLeft > 0) {
01063 restrict = true;
01064 }
01065
01066
01067 sctpEV3 << simTime() << " Calling pushUlp(" << state->queuedReceivedBytes
01068 << " bytes queued) ..." << endl;
01069 uint32 i = state->nextRSid;
01070 do {
01071 SCTPReceiveStreamMap::iterator iter = receiveStreams.find(i);
01072 SCTPReceiveStream* rStream = iter->second;
01073 sctpEV3 << "Size of stream " << iter->first << ": "
01074 << rStream->getDeliveryQ()->getQueueSize() << endl;
01075
01076 while ( (!rStream->getDeliveryQ()->payloadQueue.empty()) &&
01077 (!restrict || (restrict && state->pushMessagesLeft>0)) ) {
01078 SCTPDataVariables* chunk = rStream->getDeliveryQ()->extractMessage();
01079 qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
01080
01081 if (state->pushMessagesLeft >0)
01082 state->pushMessagesLeft--;
01083
01084 state->queuedReceivedBytes-=chunk->len/8;
01085 if (state->swsAvoidanceInvoked) {
01086 if ((int32)(state->localRwnd - state->queuedReceivedBytes) >= (int32)(state->swsLimit) &&
01087 (int32)(state->localRwnd - state->queuedReceivedBytes) <= (int32)(state->swsLimit+state->assocPmtu)) {
01088
01089 state->swsAvoidanceInvoked = false;
01090 sctpEV3<<"pushUlp: Window opens up to "<<(int32)state->localRwnd-state->queuedReceivedBytes<<" bytes: sending a SACK. SWS Avoidance INACTIVE\n";
01091
01092 sendSack();
01093 }
01094 }
01095 else if ((int32)(state->swsLimit) == 0) {
01096 sendSack();
01097 }
01098 sctpEV3 << "Push TSN " << chunk->tsn
01099 << ": sid=" << chunk->sid << " ssn=" << chunk->ssn << endl;
01100 cPacket* msg= (cPacket *)chunk->userData;
01101 msg->setKind(SCTP_I_DATA);
01102 SCTPRcvCommand *cmd = new SCTPRcvCommand("push");
01103 cmd->setAssocId(assocId);
01104 cmd->setGate(appGateIndex);
01105 cmd->setSid(chunk->sid);
01106 cmd->setSsn(chunk->ssn);
01107 cmd->setSendUnordered(!chunk->ordered);
01108 cmd->setLocalAddr(localAddr);
01109 cmd->setRemoteAddr(remoteAddr);
01110 cmd->setPpid(chunk->ppid);
01111 cmd->setTsn(chunk->tsn);
01112 cmd->setCumTsn(state->lastTsnAck);
01113 msg->setControlInfo(cmd);
01114 state->numMsgsReq[count]--;
01115 delete chunk;
01116 sendToApp(msg);
01117 }
01118 i = (i + 1) % inboundStreams;
01119 count++;
01120 } while (i != state->nextRSid);
01121
01122 state->nextRSid = (state->nextRSid + 1) % inboundStreams;
01123 if ( (state->queuedReceivedBytes == 0) && (fsm->getState() == SCTP_S_SHUTDOWN_ACK_SENT)) {
01124 sctpEV3 << "SCTP_E_CLOSE" << endl;
01125 performStateTransition(SCTP_E_CLOSE);
01126 }
01127 }
01128
01129 SCTPDataChunk* SCTPAssociation::transformDataChunk(SCTPDataVariables* chunk)
01130 {
01131 SCTPDataChunk* dataChunk = new SCTPDataChunk("DATA");
01132 SCTPSimpleMessage* msg = check_and_cast<SCTPSimpleMessage*>(chunk->userData->dup());
01133 dataChunk->setChunkType(DATA);
01134 dataChunk->setBBit(chunk->bbit);
01135 dataChunk->setEBit(chunk->ebit);
01136 if (chunk->ordered) {
01137 dataChunk->setUBit(0);
01138 }
01139 else {
01140 dataChunk->setUBit(1);
01141 }
01142 dataChunk->setTsn(chunk->tsn);
01143 dataChunk->setSid(chunk->sid);
01144 dataChunk->setSsn(chunk->ssn);
01145 dataChunk->setPpid(chunk->ppid);
01146 dataChunk->setEnqueuingTime(chunk->enqueuingTime);
01147 dataChunk->setBitLength(SCTP_DATA_CHUNK_LENGTH*8);
01148 msg->setBitLength(chunk->len);
01149 dataChunk->encapsulate(msg);
01150 return dataChunk;
01151 }
01152
01153 void SCTPAssociation::addPath(const IPvXAddress& addr)
01154 {
01155 sctpEV3<<"Add Path remote address: "<<addr<<"\n";
01156
01157 SCTPPathMap::iterator i = sctpPathMap.find(addr);
01158 if (i==sctpPathMap.end())
01159 {
01160 sctpEV3<<__LINE__<<" get new path for "<<addr<<"\n";
01161 SCTPPathVariables* path = new SCTPPathVariables(addr, this);
01162 sctpPathMap[addr] = path;
01163 qCounter.roomTransQ[addr] = 0;
01164 qCounter.bookedTransQ[addr] = 0;
01165 qCounter.roomRetransQ[addr] = 0;
01166 }
01167 sctpEV3<<"path added\n";
01168 }
01169
01170 void SCTPAssociation::removePath(const IPvXAddress& addr)
01171 {
01172 SCTPPathMap::iterator pathIterator = sctpPathMap.find(addr);
01173 if (pathIterator != sctpPathMap.end())
01174 {
01175 SCTPPathVariables* path = pathIterator->second;
01176 path->cwnd = 0;
01177 path->ssthresh = 0;
01178 recordCwndUpdate(path);
01179
01180 stopTimer(path->HeartbeatTimer);
01181 delete path->HeartbeatTimer;
01182 stopTimer(path->HeartbeatIntervalTimer);
01183 delete path->HeartbeatIntervalTimer;
01184 stopTimer(path->T3_RtxTimer);
01185 delete path->T3_RtxTimer;
01186 stopTimer(path->CwndTimer);
01187 delete path->CwndTimer;
01188 sctpPathMap.erase(pathIterator);
01189 delete path;
01190 }
01191 }
01192
01193 void SCTPAssociation::deleteStreams()
01194 {
01195 for (SCTPSendStreamMap::iterator it=sendStreams.begin(); it != sendStreams.end(); it++)
01196 {
01197 it->second->deleteQueue();
01198 }
01199 for (SCTPReceiveStreamMap::iterator it=receiveStreams.begin(); it != receiveStreams.end(); it++)
01200 {
01201 delete it->second;
01202 }
01203 }
01204
01205 bool SCTPAssociation::makeRoomForTsn(const uint32 tsn, const uint32 length, const bool uBit)
01206 {
01207 SCTPQueue* stream, dStream;
01208 uint32 sum = 0;
01209 uint32 comp = 0;
01210 bool delQ = false;
01211 uint32 high = state->highestTsnStored;
01212
01213 sctpEV3 << "makeRoomForTsn: tsn=" << tsn
01214 << ", length=" << length << " high=" << high << endl;
01215 while ((sum < length) && (state->highestTsnReceived>state->lastTsnAck)) {
01216 comp = sum;
01217 for (SCTPReceiveStreamMap::iterator iter = receiveStreams.begin();
01218 iter!=receiveStreams.end(); iter++) {
01219 if (tsn > high) {
01220 return false;
01221 }
01222 if (uBit) {
01223 stream = iter->second->getUnorderedQ();
01224 }
01225 else {
01226 stream = iter->second->getOrderedQ();
01227 }
01228 SCTPDataVariables* chunk = stream->getChunk(high);
01229 if (chunk == NULL) {
01230 sctpEV3 << high << " not found in orderedQ. Try deliveryQ" << endl;
01231 stream = iter->second->getDeliveryQ();
01232 chunk = stream->getChunk(high);
01233 delQ = true;
01234 }
01235 if (chunk != NULL) {
01236 sum+=chunk->len;
01237 if (stream->deleteMsg(high)) {
01238 sctpEV3 << high << " found and deleted" << endl;
01239
01240 state->queuedReceivedBytes-=chunk->len/8;
01241 if (ssnGt(iter->second->getExpectedStreamSeqNum(),chunk->ssn)) {
01242 iter->second->setExpectedStreamSeqNum(chunk->ssn);
01243 }
01244 }
01245 qCounter.roomSumRcvStreams -= ADD_PADDING(chunk->len/8 + SCTP_DATA_CHUNK_LENGTH);
01246 if (high == state->highestTsnReceived) {
01247 state->highestTsnReceived--;
01248 }
01249 removeFromGapList(high);
01250
01251 if (tsn > state->highestTsnReceived) {
01252 state->highestTsnReceived = tsn;
01253 }
01254 high--;
01255 break;
01256 }
01257 else {
01258 sctpEV3 << "TSN " << high << " not found in stream "
01259 << iter->second->getStreamId() << endl;
01260 }
01261 }
01262
01263 if (comp == sum) {
01264 sctpEV3 << high << " not found in any stream" << endl;
01265 high--;
01266 }
01267 state->highestTsnStored = high;
01268
01269 if (tsn > state->highestTsnReceived) {
01270 return false;
01271 }
01272 }
01273 return true;
01274 }
01275
01276 bool SCTPAssociation::tsnIsDuplicate(const uint32 tsn) const
01277 {
01278 for (std::list<uint32>::const_iterator iterator = state->dupList.begin();
01279 iterator != state->dupList.end(); iterator++)
01280 {
01281 if ((*iterator) == tsn)
01282 return true;
01283 }
01284 for (uint32 i=0; i < state->numGaps; i++) {
01285 if (tsnBetween(state->gapStartList[i], tsn, state->gapStopList[i])) {
01286 return true;
01287 }
01288 }
01289 return false;
01290 }
01291
01292 void SCTPAssociation::removeFromGapList(uint32 removedTsn)
01293 {
01294 int32 gapsize, numgaps;
01295
01296 numgaps = state->numGaps;
01297 sctpEV3<<"remove TSN "<<removedTsn<<" from GapList. "<<numgaps<<" gaps present, cumTsnAck="<<state->cTsnAck<<"\n";
01298 for (int32 j=0; j<numgaps; j++)
01299 sctpEV3<<state->gapStartList[j]<<" - "<<state->gapStopList[j]<<"\n";
01300 for (int32 i=numgaps-1; i>=0; i--)
01301 {
01302 sctpEV3<<"gapStartList["<<i<<"]="<<state->gapStartList[i]<<", state->gapStopList["<<i<<"]="<<state->gapStopList[i]<<"\n";
01303 if (tsnBetween(state->gapStartList[i], removedTsn, state->gapStopList[i]))
01304 {
01305 gapsize = (int32)(state->gapStopList[i] - state->gapStartList[i]+1);
01306 if (gapsize>1)
01307 {
01308 if (state->gapStopList[i]==removedTsn)
01309 {
01310 state->gapStopList[i]--;
01311 }
01312 else if (state->gapStartList[i]==removedTsn)
01313 {
01314 state->gapStartList[i]++;
01315 }
01316 else
01317 {
01318 for (int32 j=numgaps-1; j>=i; j--)
01319 {
01320 state->gapStopList[j+1] = state->gapStopList[j];
01321 state->gapStartList[j+1] = state->gapStartList[j];
01322 }
01323 state->gapStopList[i] = removedTsn-1;
01324 state->gapStartList[i+1] = removedTsn+1;
01325 state->numGaps = min(state->numGaps + 1, MAX_GAP_COUNT);
01326 }
01327 }
01328 else
01329 {
01330 for (int32 j=i; j<=numgaps-1; j++)
01331 {
01332 state->gapStopList[j] = state->gapStopList[j+1];
01333 state->gapStartList[j] = state->gapStartList[j+1];
01334 }
01335 state->gapStartList[numgaps-1]=0;
01336 state->gapStopList[numgaps-1]=0;
01337 state->numGaps--;
01338 if (state->numGaps == 0)
01339 {
01340 if (removedTsn == state->lastTsnAck+1)
01341 {
01342 state->lastTsnAck = removedTsn;
01343 }
01344 }
01345 }
01346 }
01347 }
01348 if (state->numGaps>0)
01349 state->highestTsnReceived = state->gapStopList[state->numGaps-1];
01350 else
01351 state->highestTsnReceived = state->cTsnAck;
01352 }
01353
01354 bool SCTPAssociation::updateGapList(const uint32 receivedTsn)
01355 {
01356 sctpEV3 << "Entering updateGapList (tsn=" << receivedTsn
01357 << " cTsnAck=" <<state->cTsnAck << " Number of Gaps="
01358 << state->numGaps << endl;
01359
01360 uint32 lo = state->cTsnAck + 1;
01361 if ((int32)(state->localRwnd-state->queuedReceivedBytes) <= 0)
01362 {
01363 sctpEV3 << "Window full" << endl;
01364
01365 if (receivedTsn == lo) {
01366 sctpEV3 << "Window full, but cumTsnAck can be advanced:" << lo << endl;
01367 }
01368 else
01369 return false;
01370 }
01371
01372 if (tsnGt(receivedTsn, state->highestTsnStored)) {
01373 state->highestTsnStored = receivedTsn;
01374 }
01375
01376 for (uint32 i = 0; i<state->numGaps; i++) {
01377 if (state->gapStartList[i] > 0) {
01378 const uint32 hi = state->gapStartList[i] - 1;
01379 if (tsnBetween(lo, receivedTsn, hi)) {
01380 const uint32 gapsize = hi - lo + 1;
01381 if (gapsize > 1) {
01386 if (receivedTsn == hi) {
01387 state->gapStartList[i] = receivedTsn;
01388 state->newChunkReceived = true;
01389 return true;
01390 }
01391 else if (receivedTsn == lo) {
01392 if (receivedTsn == (state->cTsnAck + 1)) {
01393 state->cTsnAck++;
01394 state->newChunkReceived = true;
01395 return true;
01396 }
01397
01398 state->gapStopList[i-1] = receivedTsn;
01399 state->newChunkReceived = true;
01400 return true;
01401 }
01402 else {
01403 state->numGaps = min(state->numGaps + 1, MAX_GAP_COUNT);
01404
01405 for (uint32 j = state->numGaps - 1; j > i; j--) {
01406 state->gapStartList[j] = state->gapStartList[j-1];
01407 state->gapStopList[j] = state->gapStopList[j-1];
01408 }
01409 state->gapStartList[i]=receivedTsn;
01410 state->gapStopList[i]=receivedTsn;
01411 state->newChunkReceived = true;
01412 return true;
01413 }
01414 }
01415 else {
01416 if (lo == state->cTsnAck + 1) {
01417 state->cTsnAck = state->gapStopList[i];
01418 if (i == state->numGaps-1) {
01419 state->gapStartList[i] = 0;
01420 state->gapStopList[i] = 0;
01421 }
01422 else {
01423 for (uint32 j = i; j < state->numGaps - 1; j++) {
01424 state->gapStartList[j] = state->gapStartList[j + 1];
01425 state->gapStopList[j] = state->gapStopList[j + 1];
01426 }
01427 }
01428 state->numGaps--;
01429 state->newChunkReceived = true;
01430 return true;
01431 }
01432 else {
01433 state->gapStopList[i-1] = state->gapStopList[i];
01434 if (i == state->numGaps-1) {
01435 state->gapStartList[i] = 0;
01436 state->gapStopList[i] = 0;
01437 }
01438 else {
01439 for (uint32 j = i; j < state->numGaps - 1; j++) {
01440 state->gapStartList[j] = state->gapStartList[j + 1];
01441 state->gapStopList[j] = state->gapStopList[j + 1];
01442 }
01443 }
01444 state->numGaps--;
01445 state->newChunkReceived = true;
01446 return true;
01447 }
01448 }
01449 }
01450 else {
01451 lo = state->gapStopList[i] + 1;
01452 }
01453 }
01454 }
01455
01456
01457 if (receivedTsn == lo) {
01458 if (receivedTsn == state->cTsnAck + 1) {
01459 state->cTsnAck = receivedTsn;
01460 state->newChunkReceived = true;
01461 return true;
01462 }
01463
01464 state->gapStopList[state->numGaps-1]++;
01465
01466 state->newChunkReceived = true;
01467 return true;
01468
01469 }
01470 else {
01471 if(state->numGaps + 1 <= MAX_GAP_COUNT) {
01472 state->gapStartList[state->numGaps] = receivedTsn;
01473 state->gapStopList[state->numGaps] = receivedTsn;
01474 state->numGaps++;
01475 state->newChunkReceived = true;
01476 }
01477 return true;
01478 }
01479
01480 return false;
01481 }
01482
01483 bool SCTPAssociation::advanceCtsna()
01484 {
01485 int32 listLength, counter;
01486
01487 ev<<"Entering advanceCtsna(ctsna now =="<< state->cTsnAck<<"\n";;
01488
01489 listLength = state->numGaps;
01490
01491
01492 if (listLength == 0) return false;
01493 counter = 0;
01494
01495 while(counter < listLength)
01496 {
01497
01498
01499 if (state->cTsnAck + 1 == state->gapStartList[0])
01500 {
01501
01502 state->cTsnAck = state->gapStopList[0];
01503
01504 counter++;
01505 for (uint32 i=1; i<state->numGaps; i++)
01506 {
01507 state->gapStartList[i-1] = state->gapStartList[i];
01508 state->gapStopList[i-1] = state->gapStopList[i];
01509 }
01510
01511 }
01512 else
01513 {
01514 ev<<"Entering advanceCtsna(when leaving: ctsna=="<<state->cTsnAck<<"\n";
01515 return false;
01516 }
01517
01518 }
01519
01520 ev<<"Entering advanceCtsna(when leaving: ctsna=="<< state->cTsnAck<<"\n";
01521 return true;
01522 }
01523
01524 SCTPDataVariables* SCTPAssociation::makeVarFromMsg(SCTPDataChunk* dataChunk)
01525 {
01526 SCTPDataVariables* chunk = new SCTPDataVariables();
01527
01528 chunk->bbit = dataChunk->getBBit();
01529 chunk->ebit = dataChunk->getEBit();
01530 chunk->sid = dataChunk->getSid();
01531 chunk->ssn = dataChunk->getSsn();
01532 chunk->ppid = dataChunk->getPpid();
01533 chunk->tsn = dataChunk->getTsn();
01534 if (!dataChunk->getUBit()) {
01535 chunk->ordered = true;
01536 }
01537 else {
01538 chunk->ordered = false;
01539 }
01540 SCTPSimpleMessage* smsg=check_and_cast<SCTPSimpleMessage*>(dataChunk->decapsulate());
01541
01542 chunk->userData = smsg;
01543 chunk->len = smsg->getDataLen()*8;
01544
01545 sctpEV3 << "makeVarFromMsg: queuedBytes has been increased to "
01546 << state->queuedReceivedBytes << endl;
01547 return chunk;
01548 }
01549
01550
01551
01552 SCTPDataVariables* SCTPAssociation::getOutboundDataChunk(const SCTPPathVariables* path,
01553 const int32 availableSpace,
01554 const int32 availableCwnd)
01555 {
01556
01557 sctpEV3 << "getOutboundDataChunk(" << path->remoteAddress << "):"
01558 << " availableSpace=" << availableSpace
01559 << " availableCwnd=" << availableCwnd
01560 << endl;
01561 if (!transmissionQ->payloadQueue.empty()) {
01562 for(SCTPQueue::PayloadQueue::iterator it = transmissionQ->payloadQueue.begin();
01563 it != transmissionQ->payloadQueue.end(); it++) {
01564 SCTPDataVariables* chunk = it->second;
01565 if( (chunkHasBeenAcked(chunk) == false) &&
01566 (chunk->getNextDestinationPath() == path) ) {
01567 const int32 len = ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
01568 sctpEV3 << "getOutboundDataChunk() found chunk " << chunk->tsn
01569 <<" in the transmission queue, length=" << len << endl;
01570 if ((len <= availableSpace) &&
01571 ((int32)chunk->booksize <= availableCwnd)) {
01572
01573
01574
01575
01576 transmissionQ->payloadQueue.erase(it);
01577 chunk->enqueuedInTransmissionQ = false;
01578 CounterMap::iterator i = qCounter.roomTransQ.find(path->remoteAddress);
01579 i->second -= ADD_PADDING(chunk->len/8+SCTP_DATA_CHUNK_LENGTH);
01580 CounterMap::iterator ib = qCounter.bookedTransQ.find(path->remoteAddress);
01581 ib->second -= chunk->booksize;
01582 return chunk;
01583 }
01584 }
01585 }
01586 }
01587 return NULL;
01588 }
01589
01590
01591 SCTPDataVariables* SCTPAssociation::peekAbandonedChunk(const SCTPPathVariables* path)
01592 {
01593
01594 if (!retransmissionQ->payloadQueue.empty())
01595 {
01596 for(SCTPQueue::PayloadQueue::iterator it = retransmissionQ->payloadQueue.begin();
01597 it != retransmissionQ->payloadQueue.end(); it++) {
01598 SCTPDataVariables* chunk = it->second;
01599 sctpEV3<<"peek Chunk "<<chunk->tsn<<"\n";
01600 if (chunk->getLastDestinationPath() == path && chunk->hasBeenAbandoned) {
01601 sctpEV3<<"peekAbandonedChunk() found chunk in the retransmission queue\n";
01602 return chunk;
01603 }
01604 }
01605 }
01606 return NULL;
01607 }
01608
01609
01610 SCTPDataMsg* SCTPAssociation::peekOutboundDataMsg()
01611 {
01612 SCTPDataMsg* datMsg=NULL;
01613 int32 nextStream = -1;
01614 nextStream = (this->*ssFunctions.ssGetNextSid)(true);
01615
01616 if (nextStream == -1)
01617 {
01618
01619 sctpEV3<<"peekOutboundDataMsg(): no valid stream found -> returning NULL !\n";
01620
01621 return NULL;
01622 }
01623
01624
01625 for (SCTPSendStreamMap::iterator iter=sendStreams.begin(); iter!=sendStreams.end(); ++iter)
01626 {
01627 if ((int32)iter->first==nextStream)
01628 {
01629 SCTPSendStream* stream=iter->second;
01630 if (!stream->getUnorderedStreamQ()->empty())
01631 {
01632 return (datMsg);
01633
01634 }
01635 if (!stream->getStreamQ()->empty())
01636 {
01637 return (datMsg);
01638
01639 }
01640 }
01641 }
01642 return NULL;
01643
01644 }
01645
01646 SCTPDataMsg* SCTPAssociation::dequeueOutboundDataMsg(const int32 availableSpace,
01647 const int32 availableCwnd)
01648 {
01649 SCTPDataMsg* datMsg=NULL;
01650 int32 nextStream = -1;
01651
01652 sctpEV3<<"dequeueOutboundDataMsg: " << availableSpace <<" bytes left to be sent" << endl;
01653
01654 if (state->lastMsgWasFragment)
01655 nextStream = state->lastStreamScheduled;
01656 else
01657 nextStream = (this->*ssFunctions.ssGetNextSid)(false);
01658
01659 if (nextStream == -1)
01660 return NULL;
01661
01662 sctpEV3<<"dequeueOutboundDataMsg: now stream "<< nextStream << endl;
01663
01664 for (SCTPSendStreamMap::iterator iter=sendStreams.begin(); iter!=sendStreams.end(); ++iter)
01665 {
01666 if ((int32)iter->first==nextStream)
01667 {
01668 SCTPSendStream* stream=iter->second;
01669 cQueue* streamQ = NULL;
01670
01671 if (!stream->getUnorderedStreamQ()->empty())
01672 {
01673 streamQ = stream->getUnorderedStreamQ();
01674 sctpEV3<<"DequeueOutboundDataMsg() found chunks in stream "<<iter->first<<" unordered queue, queue size="<<stream->getUnorderedStreamQ()->getLength()<<"\n";
01675 }
01676 else if (!stream->getStreamQ()->empty())
01677 {
01678 streamQ = stream->getStreamQ();
01679 sctpEV3<<"DequeueOutboundDataMsg() found chunks in stream "<<iter->first<<" ordered queue, queue size="<<stream->getStreamQ()->getLength()<<"\n";
01680 }
01681
01682 if (streamQ)
01683 {
01684 int32 b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedMsg())->getByteLength()+SCTP_DATA_CHUNK_LENGTH));
01685
01686
01687 if (b > (int32)state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER)
01688 {
01689
01690 SCTPDataMsg* datMsgQueued = (SCTPDataMsg*)streamQ->pop();
01691 SCTPSimpleMessage *datMsgQueuedSimple = check_and_cast<SCTPSimpleMessage*>(datMsgQueued->getEncapsulatedMsg());
01692
01693 SCTPDataMsg* datMsgLastFragment = NULL;
01694 uint32 offset = 0;
01695
01696 sctpEV3<<"Fragmentation: chunk " << &datMsgQueued << ", size = " << datMsgQueued->getByteLength() << endl;
01697
01698 while (datMsgQueued)
01699 {
01700
01701 uint32 msgbytes = state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER - SCTP_DATA_CHUNK_LENGTH;
01702 if (msgbytes > datMsgQueuedSimple->getDataLen() - offset)
01703 msgbytes = datMsgQueuedSimple->getDataLen() - offset;
01704
01705
01706 SCTPDataMsg* datMsgFragment = new SCTPDataMsg();
01707 datMsgFragment->setSid(datMsgQueued->getSid());
01708 datMsgFragment->setPpid(datMsgQueued->getPpid());
01709 datMsgFragment->setInitialDestination(datMsgQueued->getInitialDestination());
01710 datMsgFragment->setEnqueuingTime(datMsgQueued->getEnqueuingTime());
01711 datMsgFragment->setMsgNum(datMsgQueued->getMsgNum());
01712 datMsgFragment->setOrdered(datMsgQueued->getOrdered());
01713 datMsgFragment->setExpiryTime(datMsgQueued->getExpiryTime());
01714 datMsgFragment->setRtx(datMsgQueued->getRtx());
01715 datMsgFragment->setFragment(true);
01716 datMsgFragment->setBooksize(msgbytes + state->header);
01717
01718
01719 if (offset == 0)
01720 datMsgFragment->setBBit(true);
01721
01722
01723 SCTPSimpleMessage *datMsgFragmentSimple = new SCTPSimpleMessage();
01724
01725 datMsgFragmentSimple->setName(datMsgQueuedSimple->getName());
01726 datMsgFragmentSimple->setCreationTime(datMsgQueuedSimple->getCreationTime());
01727
01728 datMsgFragmentSimple->setDataArraySize(msgbytes);
01729 datMsgFragmentSimple->setDataLen(msgbytes);
01730 datMsgFragmentSimple->setByteLength(msgbytes);
01731
01732
01733 for (uint32 i = offset; i < offset + msgbytes; i++)
01734 datMsgFragmentSimple->setData(i - offset, datMsgQueuedSimple->getData(i));
01735
01736 offset += msgbytes;
01737 datMsgFragment->encapsulate(datMsgFragmentSimple);
01738
01739
01740 if (!streamQ->empty())
01741 {
01742 if (!datMsgLastFragment)
01743 {
01744
01745 streamQ->insertBefore((SCTPDataMsg*)streamQ->front(), datMsgFragment);
01746 }
01747 else
01748 {
01749
01750 streamQ->insertAfter(datMsgLastFragment, datMsgFragment);
01751 }
01752 }
01753 else
01754 streamQ->insert(datMsgFragment);
01755
01756 state->queuedMessages++;
01757 qCounter.roomSumSendStreams += ADD_PADDING(datMsgFragment->getByteLength() + SCTP_DATA_CHUNK_LENGTH);
01758 qCounter.bookedSumSendStreams += datMsgFragment->getBooksize();
01759 sctpEV3<<"Fragmentation: fragment " << &datMsgFragment << " created, length = " << datMsgFragmentSimple->getByteLength() << ", queue size = " << streamQ->getLength() << endl;
01760
01761 datMsgLastFragment = datMsgFragment;
01762
01763
01764 if (datMsgQueuedSimple->getDataLen() == offset)
01765 {
01766 datMsgFragment->setEBit(true);
01767
01768
01769 sctpEV3<<"Fragmentation: delete " << &datMsgQueued << endl;
01770
01771 qCounter.roomSumSendStreams -= ADD_PADDING(datMsgQueued->getByteLength() + SCTP_DATA_CHUNK_LENGTH);
01772 qCounter.bookedSumSendStreams -= datMsgQueued->getBooksize();
01773 delete datMsgQueued;
01774 datMsgQueued = NULL;
01775 state->queuedMessages--;
01776 }
01777 }
01778
01779
01780 state->lastMsgWasFragment = true;
01781
01782 b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedMsg())->getBitLength()/8+SCTP_DATA_CHUNK_LENGTH));
01783
01784 }
01785
01786 if ((b <= availableSpace) &&
01787 ( (int32)((SCTPDataMsg*)streamQ->front())->getBooksize() <= availableCwnd)) {
01788 datMsg = (SCTPDataMsg*)streamQ->pop();
01789
01790
01791
01792
01793
01794 sendQueue->record(streamQ->getLength());
01795
01796 if (!datMsg->getFragment())
01797 {
01798 datMsg->setBBit(true);
01799 datMsg->setEBit(true);
01800 state->lastMsgWasFragment = false;
01801 }
01802 else
01803 {
01804 if (datMsg->getEBit())
01805 state->lastMsgWasFragment = false;
01806 else
01807 state->lastMsgWasFragment = true;
01808 }
01809
01810 sctpEV3<<"DequeueOutboundDataMsg() found chunk ("<<&datMsg<<") in the stream queue "<<&iter->first<<"("<<streamQ<<") queue size="<<streamQ->getLength()<<"\n";
01811 }
01812 }
01813 break;
01814 }
01815 }
01816 if (datMsg != NULL)
01817 {
01818 qCounter.roomSumSendStreams -= ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(datMsg->getEncapsulatedMsg())->getBitLength()/8+SCTP_DATA_CHUNK_LENGTH));
01819 qCounter.bookedSumSendStreams -= datMsg->getBooksize();
01820 }
01821 return (datMsg);
01822 }
01823
01824
01825 bool SCTPAssociation::nextChunkFitsIntoPacket(int32 bytes)
01826 {
01827 int32 nextStream = -1;
01828 SCTPSendStream* stream;
01829
01830
01831 if (state->lastMsgWasFragment)
01832 nextStream = state->lastStreamScheduled;
01833 else
01834 nextStream = (this->*ssFunctions.ssGetNextSid)(true);
01835
01836 if (nextStream == -1)
01837 return false;
01838
01839 stream = sendStreams.find(nextStream)->second;
01840
01841 if (stream)
01842 {
01843 cQueue* streamQ = NULL;
01844
01845 if (!stream->getUnorderedStreamQ()->empty())
01846 streamQ = stream->getUnorderedStreamQ();
01847 else if (!stream->getStreamQ()->empty())
01848 streamQ = stream->getStreamQ();
01849
01850 if (streamQ)
01851 {
01852 int32 b=ADD_PADDING( (check_and_cast<SCTPSimpleMessage*>(((SCTPDataMsg*)streamQ->front())->getEncapsulatedMsg())->getByteLength()+SCTP_DATA_CHUNK_LENGTH));
01853
01854
01855 if (b > (int32) state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER)
01856 {
01857
01858 if (bytes >= (int32) state->assocPmtu - IP_HEADER_LENGTH - SCTP_COMMON_HEADER - SCTP_DATA_CHUNK_LENGTH)
01859 return true;
01860 else
01861 return false;
01862 }
01863
01864
01865 if (b <= bytes)
01866 return true;
01867 else
01868 return false;
01869 }
01870 }
01871
01872 return false;
01873 }
01874
01875
01876 SCTPPathVariables* SCTPAssociation::getNextPath(const SCTPPathVariables* oldPath) const
01877 {
01878 int32 hit = 0;
01879 if (sctpPathMap.size() > 1) {
01880 for (SCTPPathMap::const_iterator iterator = sctpPathMap.begin();
01881 iterator != sctpPathMap.end(); iterator++) {
01882 if (iterator->second == oldPath) {
01883 if (++hit == 1) {
01884 continue;
01885 }
01886 else {
01887 break;
01888 }
01889 }
01890 if (iterator->second->activePath) {
01891 return iterator->second;
01892 }
01893 }
01894 }
01895 return(NULL);
01896 }
01897
01898 SCTPPathVariables* SCTPAssociation::getNextDestination(SCTPDataVariables* chunk) const
01899 {
01900 SCTPPathVariables* next;
01901 SCTPPathVariables* last;
01902
01903 sctpEV3 << "Running getNextDestination()" << endl;
01904 if (chunk->numberOfTransmissions == 0) {
01905 if (chunk->getInitialDestinationPath() == NULL) {
01906 next = state->getPrimaryPath();
01907 }
01908 else {
01909 next = chunk->getInitialDestinationPath();
01910 }
01911 }
01912 else {
01913 if (chunk->hasBeenFastRetransmitted) {
01914 sctpEV3 << "Chunk is scheduled for FastRetransmission. Next destination = "
01915 << chunk->getLastDestination() << endl;
01916 return(chunk->getLastDestinationPath());
01917 }
01918
01919 last = chunk->getLastDestinationPath();
01920 next = getNextPath(last);
01921 if( (next == NULL) || (next->confirmed == false) ) {
01922 next = last;
01923 }
01924 }
01925
01926 sctpEV3 << "getNextDestination(): chunk was last sent to " << last->remoteAddress
01927 << ", will next be sent to path " << next->remoteAddress << endl;
01928 return (next);
01929 }
01930
01931
01932 void SCTPAssociation::pmDataIsSentOn(SCTPPathVariables* path)
01933 {
01934
01935 stopTimer(path->HeartbeatTimer);
01936 if (state->enableHeartbeats)
01937 {
01938 path->heartbeatTimeout = path->pathRto + (double)sctpMain->par("hbInterval");
01939 startTimer(path->HeartbeatTimer, path->heartbeatTimeout);
01940 sctpEV3 << "Restarting HB timer on path "<< path->remoteAddress
01941 << " to expire at time " << path->heartbeatTimeout << endl;
01942 }
01943
01944 path->cwndTimeout = path->pathRto;
01945 stopTimer(path->CwndTimer);
01946 startTimer(path->CwndTimer, path->cwndTimeout);
01947
01948 sctpEV3 << "Restarting CWND timer on path "<< path->remoteAddress
01949 << " to expire at time " << path->cwndTimeout << endl;
01950 }
01951
01952 void SCTPAssociation::pmStartPathManagement()
01953 {
01954 RoutingTableAccess routingTableAccess;
01955 SCTPPathVariables* path;
01956 int32 i=0;
01957
01958
01959 state->assocPmtu = state->localRwnd;
01960 for(SCTPPathMap::iterator piter=sctpPathMap.begin(); piter!=sctpPathMap.end(); piter++)
01961 {
01962 path=piter->second;
01963 path->pathErrorCount = 0;
01964 InterfaceEntry *rtie = routingTableAccess.get()->getInterfaceForDestAddr(path->remoteAddress.get4());
01965 path->pmtu = rtie->getMTU();
01966 sctpEV3 << "Path MTU of Interface "<< i << " = " << path->pmtu <<"\n";
01967 if (path->pmtu < state->assocPmtu)
01968 {
01969 state->assocPmtu = path->pmtu;
01970 }
01971 initCCParameters(path);
01972 path->pathRto = (double)sctpMain->par("rtoInitial");
01973 path->srtt = path->pathRto;
01974 path->rttvar = SIMTIME_ZERO;
01975
01976 path->updateTime = SIMTIME_ZERO;
01977
01978
01979 path->partialBytesAcked = 0;
01980 path->outstandingBytes = 0;
01981 path->activePath = true;
01982
01983 stopTimer(path->T3_RtxTimer);
01984
01985 if (path->remoteAddress == state->initialPrimaryPath && !path->confirmed) {
01986 path->confirmed = true;
01987 }
01988 sctpEV3<<getFullPath()<<" numberOfLocalAddresses="<<state->localAddresses.size()<<"\n";
01989 path->heartbeatTimeout= (double)sctpMain->par("hbInterval")+i*path->pathRto;
01990 stopTimer(path->HeartbeatTimer);
01991 sendHeartbeat(path);
01992 startTimer(path->HeartbeatTimer, path->heartbeatTimeout);
01993 startTimer(path->HeartbeatIntervalTimer, path->heartbeatIntervalTimeout);
01994 path->statisticsPathRTO->record(path->pathRto);
01995 i++;
01996 }
01997 }
01998
01999
02000 int32 SCTPAssociation::getOutstandingBytes() const
02001 {
02002 int32 osb = 0;
02003 for (SCTPPathMap::const_iterator pm = sctpPathMap.begin(); pm != sctpPathMap.end(); pm++) {
02004 osb += pm->second->outstandingBytes;
02005 }
02006 return osb;
02007 }
02008
02009 void SCTPAssociation::pmClearPathCounter(SCTPPathVariables* path)
02010 {
02011 path->pathErrorCount = 0;
02012 if (path->activePath == false) {
02013
02014 pathStatusIndication(path, true);
02015 sctpEV3 << "Path " << path->remoteAddress
02016 << " state changes from INACTIVE to ACTIVE !!!" << endl;
02017 }
02018 }
02019
02020 void SCTPAssociation::pathStatusIndication(const SCTPPathVariables* path,
02021 const bool status)
02022 {
02023 cPacket* msg = new cPacket("StatusInfo");
02024 msg->setKind(SCTP_I_STATUS);
02025 SCTPStatusInfo* cmd = new SCTPStatusInfo();
02026 cmd->setPathId(path->remoteAddress);
02027 cmd->setAssocId(assocId);
02028 cmd->setActive(status);
02029 msg->setControlInfo(cmd);
02030 if (!status) {
02031 SCTP::AssocStatMap::iterator iter=sctpMain->assocStatMap.find(assocId);
02032 iter->second.numPathFailures++;
02033 }
02034 sendToApp(msg);
02035 }
02036
02037 void SCTPAssociation::pmRttMeasurement(SCTPPathVariables* path,
02038 const simtime_t& rttEstimation)
02039 {
02040 if (rttEstimation < MAXTIME) {
02041 if (simTime() > path->updateTime) {
02042 if (path->updateTime == SIMTIME_ZERO) {
02043 path->rttvar = rttEstimation.dbl() / 2;
02044 path->srtt = rttEstimation;
02045 path->pathRto = 3.0 * rttEstimation.dbl();
02046 path->pathRto = max(min(path->pathRto.dbl(), (double)sctpMain->par("rtoMax")),
02047 (double)sctpMain->par("rtoMin"));
02048 }
02049 else {
02050 path->rttvar = (1.0 - (double)sctpMain->par("rtoBeta")) * path->rttvar.dbl() +
02051 (double)sctpMain->par("rtoBeta") * fabs(path->srtt.dbl() - rttEstimation.dbl());
02052 path->srtt = (1.0 - (double)sctpMain->par("rtoAlpha")) * path->srtt.dbl() +
02053 (double)sctpMain->par("rtoAlpha") * rttEstimation.dbl();
02054 path->pathRto = path->srtt.dbl() + 4.0 * path->rttvar.dbl();
02055 path->pathRto = max(min(path->pathRto.dbl(), (double)sctpMain->par("rtoMax")),
02056 (double)sctpMain->par("rtoMin"));
02057 }
02058
02059
02060
02061
02062
02063
02064
02065
02066
02067 path->updateTime = simTime() + path->srtt;
02068 path->statisticsPathRTO->record(path->pathRto);
02069 path->statisticsPathRTT->record(rttEstimation);
02070 }
02071 }
02072 }
02073
02074 bool SCTPAssociation::allPathsInactive() const
02075 {
02076 for(SCTPPathMap::const_iterator it = sctpPathMap.begin(); it != sctpPathMap.end(); it++) {
02077 if (it->second->activePath) {
02078 return false;
02079 }
02080 }
02081 return true;
02082 }
02083
02084
02085 void SCTPAssociation::disposeOf(SCTPMessage* sctpmsg)
02086 {
02087 SCTPChunk* chunk;
02088 uint32 numberOfChunks = sctpmsg->getChunksArraySize();
02089 if (numberOfChunks>0)
02090 for (uint32 i=0; i<numberOfChunks; i++)
02091 {
02092 chunk = (SCTPChunk*)(sctpmsg->removeChunk());
02093 if (chunk->getChunkType()==DATA)
02094 delete (SCTPSimpleMessage*)chunk->decapsulate();
02095 delete chunk;
02096 }
02097 delete sctpmsg;
02098 }
02099