SCTPAssociationEventProc.cc

Go to the documentation of this file.
00001 //
00002 // Copyright (C) 2005-2010 by Irene Ruengeler
00003 // Copyright (C) 2009-2010 by Thomas Dreibholz
00004 //
00005 // This program is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU General Public License
00007 // as published by the Free Software Foundation; either version 2
00008 // of the License, or (at your option) any later version.
00009 //
00010 // This program is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00013 // GNU General Public License for more details.
00014 //
00015 // You should have received a copy of the GNU General Public License
00016 // along with this program; if not, see <http://www.gnu.org/licenses/>.
00017 //
00018 
00019 
00020 #include <string.h>
00021 #include "SCTP.h"
00022 #include "SCTPAssociation.h"
00023 #include "SCTPCommand_m.h"
00024 #include "IPControlInfo_m.h"
00025 #include "SCTPAlgorithm.h"
00026 
00027 //
00028 // Event processing code
00029 //
00030 
00031 void SCTPAssociation::process_ASSOCIATE(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00032 {
00033     IPvXAddress lAddr, rAddr;
00034 
00035     SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
00036 
00037     ev<<"SCTPAssociationEventProc:process_ASSOCIATE\n";
00038 
00039     switch(fsm->getState())
00040     {
00041         case SCTP_S_CLOSED:
00042         initAssociation(openCmd);
00043         state->active = true;
00044         localAddressList = openCmd->getLocalAddresses();
00045         lAddr = openCmd->getLocalAddresses().front();
00046         if (!(openCmd->getRemoteAddresses().empty()))
00047         {
00048             remoteAddressList = openCmd->getRemoteAddresses();
00049             rAddr = openCmd->getRemoteAddresses().front();
00050         }
00051         else
00052             rAddr = openCmd->getRemoteAddr();
00053         localPort = openCmd->getLocalPort();
00054         remotePort = openCmd->getRemotePort();
00055         state->numRequests = openCmd->getNumRequests();
00056         if (rAddr.isUnspecified() || remotePort==0)
00057         opp_error("Error processing command OPEN_ACTIVE: remote address and port must be specified");
00058 
00059         if (localPort==0)
00060         {
00061         localPort = sctpMain->getEphemeralPort();
00062         }
00063         ev << "OPEN: " << lAddr << ":" << localPort << " --> " << rAddr << ":" << remotePort << "\n";
00064 
00065         sctpMain->updateSockPair(this, lAddr, rAddr, localPort, remotePort);
00066         state->localRwnd = (long)sctpMain->par("arwnd");
00067         sendInit();
00068         startTimer(T1_InitTimer,state->initRexmitTimeout);
00069         break;
00070 
00071     default:
00072         opp_error("Error processing command OPEN_ACTIVE: connection already exists");
00073     }
00074 
00075 }
00076 
00077 void SCTPAssociation::process_OPEN_PASSIVE(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00078 {
00079     IPvXAddress lAddr;
00080     int16 localPort;
00081 
00082     SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
00083 
00084     sctpEV3<<"SCTPAssociationEventProc:process_OPEN_PASSIVE\n";
00085 
00086     switch(fsm->getState())
00087     {
00088         case SCTP_S_CLOSED:
00089             initAssociation(openCmd);
00090             state->fork = openCmd->getFork();
00091             localAddressList = openCmd->getLocalAddresses();
00092             sctpEV3<<"process_OPEN_PASSIVE: number of local addresses="<<localAddressList.size()<<"\n";
00093             lAddr = openCmd->getLocalAddresses().front();
00094             localPort = openCmd->getLocalPort();
00095             inboundStreams = openCmd->getInboundStreams();
00096             outboundStreams = openCmd->getOutboundStreams();
00097             state->localRwnd = (long)sctpMain->par("arwnd");
00098             state->numRequests = openCmd->getNumRequests();
00099             state->messagesToPush = openCmd->getMessagesToPush();
00100 
00101             if (localPort==0)
00102                 opp_error("Error processing command OPEN_PASSIVE: local port must be specified");
00103             sctpEV3 << "Assoc "<<assocId<<"::Starting to listen on: " << lAddr << ":" << localPort << "\n";
00104 
00105             sctpMain->updateSockPair(this, lAddr, IPvXAddress(), localPort, 0);
00106             break;
00107         default:
00108         opp_error("Error processing command OPEN_PASSIVE: connection already exists");
00109     }
00110 }
00111 
00112 void SCTPAssociation::process_SEND(SCTPEventCode& event, SCTPCommand* sctpCommand, cPacket* msg)
00113 {
00114     SCTPSendCommand* sendCommand = check_and_cast<SCTPSendCommand*>(sctpCommand);
00115 
00116     if(fsm->getState() != SCTP_S_ESTABLISHED) {
00117         // TD 12.03.2009: since SCTP_S_ESTABLISHED is the only case, the
00118         // switch(...)-block has been removed for enhanced readability.
00119         sctpEV3 << "process_SEND: state is not SCTP_S_ESTABLISHED -> returning" << endl;
00120         return;
00121     }
00122 
00123     sctpEV3 << "process_SEND:"
00124               << " assocId="      << assocId
00125               << " localAddr="    << localAddr
00126               << " remoteAddr="   << remoteAddr
00127               << " cmdRemoteAddr="<< sendCommand->getRemoteAddr()
00128               << " cmdPrimary="   << (sendCommand->getPrimary() ? "true" : "false")
00129               << " appGateIndex=" << appGateIndex
00130               << " streamId="     << sendCommand->getSid() << endl;
00131 
00132     SCTPSimpleMessage* smsg = check_and_cast<SCTPSimpleMessage*>((msg->decapsulate()));
00133     SCTP::AssocStatMap::iterator iter = sctpMain->assocStatMap.find(assocId);
00134     iter->second.sentBytes += smsg->getBitLength() / 8;
00135 
00136   // ------ Prepare SCTPDataMsg -----------------------------------------
00137   const uint32 streamId       = sendCommand->getSid();
00138   const uint32 sendUnordered = sendCommand->getSendUnordered();
00139   const uint32 ppid           = sendCommand->getPpid();
00140   SCTPSendStream* stream = NULL;
00141   SCTPSendStreamMap::iterator associter = sendStreams.find(streamId);
00142   if (associter != sendStreams.end()) {
00143      stream = associter->second;
00144   }
00145   else {
00146      opp_error("stream with id %d not found", streamId);
00147   }
00148 
00149   char name[64];
00150   snprintf(name, sizeof(name), "SDATA-%d-%d", streamId, state->msgNum);
00151   smsg->setName(name);
00152 
00153   SCTPDataMsg* datMsg = new SCTPDataMsg();
00154   datMsg->encapsulate(smsg);
00155   datMsg->setSid(streamId);
00156   datMsg->setPpid(ppid);
00157   datMsg->setEnqueuingTime(simulation.getSimTime());
00158 
00159   // ------ Set initial destination address -----------------------------
00160   if (sendCommand->getPrimary()) {
00161      if (sendCommand->getRemoteAddr() == IPvXAddress("0.0.0.0")) {
00162             datMsg->setInitialDestination(remoteAddr);
00163      }
00164      else {
00165         datMsg->setInitialDestination(sendCommand->getRemoteAddr());
00166      }
00167   }
00168   else {
00169      datMsg->setInitialDestination(state->getPrimaryPathIndex());
00170   }
00171 
00172   // ------ Optional padding and size calculations ----------------------
00173      datMsg->setBooksize(smsg->getBitLength() / 8 + state->header);
00174      qCounter.roomSumSendStreams += ADD_PADDING(smsg->getBitLength() / 8 + SCTP_DATA_CHUNK_LENGTH);
00175      qCounter.bookedSumSendStreams += datMsg->getBooksize();
00176      state->sendBuffer += smsg->getByteLength();
00177 
00178   datMsg->setMsgNum(++state->msgNum);
00179 
00180   // ------ Ordered/Unordered modes -------------------------------------
00181   if (sendUnordered == 1) {
00182      datMsg->setOrdered(false);
00183      stream->getUnorderedStreamQ()->insert(datMsg);
00184   }
00185   else {
00186      datMsg->setOrdered(true);
00187      stream->getStreamQ()->insert(datMsg);
00188 
00189      if ((state->appSendAllowed)        &&
00190           (state->sendQueueLimit > 0) &&
00191           ((uint64)state->sendBuffer >= state->sendQueueLimit) ) {
00192         sendIndicationToApp(SCTP_I_SENDQUEUE_FULL);
00193         state->appSendAllowed = false;
00194      }
00195      sendQueue->record(stream->getStreamQ()->getLength());
00196   }
00197 
00198   state->queuedMessages++;
00199   if ((state->queueLimit > 0) && (state->queuedMessages > state->queueLimit)) {
00200      state->queueUpdate = false;
00201   }
00202   sctpEV3 << "process_SEND:"
00203              << " last="         << sendCommand->getLast()
00204              <<"    queueLimit=" << state->queueLimit << endl;
00205 
00206   // ------ Call sendCommandInvoked() to send message -------------------
00207   // sendCommandInvoked() itself will call sendOnAllPaths() ...
00208   if (sendCommand->getLast() == true) {
00209      if (sendCommand->getPrimary()) {
00210         sctpAlgorithm->sendCommandInvoked(NULL);
00211      }
00212      else {
00213         sctpAlgorithm->sendCommandInvoked(getPath(datMsg->getInitialDestination()));
00214      }
00215   }
00216 }
00217 
00218 void SCTPAssociation::process_RECEIVE_REQUEST(SCTPEventCode& event, SCTPCommand *sctpCommand)
00219 {
00220      SCTPSendCommand *sendCommand = check_and_cast<SCTPSendCommand *>(sctpCommand);
00221     if ((uint32)sendCommand->getSid() > inboundStreams || sendCommand->getSid() < 0)
00222     {
00223         sctpEV3<<"Application tries to read from invalid stream id....\n";
00224     }
00225     state->numMsgsReq[sendCommand->getSid()]+= sendCommand->getNumMsgs();
00226     pushUlp();
00227 }
00228 
00229 void SCTPAssociation::process_PRIMARY(SCTPEventCode& event, SCTPCommand *sctpCommand)
00230 {
00231     SCTPPathInfo *pinfo = check_and_cast<SCTPPathInfo *>(sctpCommand);
00232     state->setPrimaryPath(getPath(pinfo->getRemoteAddress()));
00233 }
00234 
00235 
00236 void SCTPAssociation::process_QUEUE_MSGS_LIMIT(const SCTPCommand* sctpCommand)
00237 {
00238     const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
00239     state->queueLimit = qinfo->getText();
00240     sctpEV3<<"state->queueLimit set to "<<state->queueLimit<<"\n";
00241 }
00242 
00243 void SCTPAssociation::process_QUEUE_BYTES_LIMIT(const SCTPCommand* sctpCommand)
00244 {
00245     const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
00246     state->sendQueueLimit = qinfo->getText();
00247 }
00248 
00249 void SCTPAssociation::process_CLOSE(SCTPEventCode& event)
00250 {
00251     sctpEV3 << "SCTPAssociationEventProc:process_CLOSE; assoc=" << assocId << endl;
00252     switch(fsm->getState()) {
00253         case SCTP_S_ESTABLISHED:
00254             sendOnAllPaths(state->getPrimaryPath());
00255             sendShutdown();
00256             break;
00257         case SCTP_S_SHUTDOWN_RECEIVED:
00258             if (getOutstandingBytes() == 0) {
00259                 sendShutdownAck(remoteAddr);
00260             }
00261             break;
00262      }
00263 }
00264 
00265 void SCTPAssociation::process_ABORT(SCTPEventCode& event)
00266 {
00267     sctpEV3 << "SCTPAssociationEventProc:process_ABORT; assoc=" << assocId << endl;
00268     switch(fsm->getState()) {
00269         case SCTP_S_ESTABLISHED:
00270             sendOnAllPaths(state->getPrimaryPath());
00271             sendAbort();
00272             break;
00273     }
00274 }
00275 
00276 void SCTPAssociation::process_STATUS(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00277 {
00278     SCTPStatusInfo *statusInfo = new SCTPStatusInfo();
00279     statusInfo->setState(fsm->getState());
00280     statusInfo->setStateName(stateName(fsm->getState()));
00281     statusInfo->setPathId(remoteAddr);
00282     statusInfo->setActive(getPath(remoteAddr)->activePath);
00283     msg->setControlInfo(statusInfo);
00284     sendToApp(msg);
00285 }