Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <string.h>
00021 #include "SCTP.h"
00022 #include "SCTPAssociation.h"
00023 #include "SCTPCommand_m.h"
00024 #include "IPControlInfo_m.h"
00025 #include "SCTPAlgorithm.h"
00026
00027
00028
00029
00030
00031 void SCTPAssociation::process_ASSOCIATE(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00032 {
00033 IPvXAddress lAddr, rAddr;
00034
00035 SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
00036
00037 ev<<"SCTPAssociationEventProc:process_ASSOCIATE\n";
00038
00039 switch(fsm->getState())
00040 {
00041 case SCTP_S_CLOSED:
00042 initAssociation(openCmd);
00043 state->active = true;
00044 localAddressList = openCmd->getLocalAddresses();
00045 lAddr = openCmd->getLocalAddresses().front();
00046 if (!(openCmd->getRemoteAddresses().empty()))
00047 {
00048 remoteAddressList = openCmd->getRemoteAddresses();
00049 rAddr = openCmd->getRemoteAddresses().front();
00050 }
00051 else
00052 rAddr = openCmd->getRemoteAddr();
00053 localPort = openCmd->getLocalPort();
00054 remotePort = openCmd->getRemotePort();
00055 state->numRequests = openCmd->getNumRequests();
00056 if (rAddr.isUnspecified() || remotePort==0)
00057 opp_error("Error processing command OPEN_ACTIVE: remote address and port must be specified");
00058
00059 if (localPort==0)
00060 {
00061 localPort = sctpMain->getEphemeralPort();
00062 }
00063 ev << "OPEN: " << lAddr << ":" << localPort << " --> " << rAddr << ":" << remotePort << "\n";
00064
00065 sctpMain->updateSockPair(this, lAddr, rAddr, localPort, remotePort);
00066 state->localRwnd = (long)sctpMain->par("arwnd");
00067 sendInit();
00068 startTimer(T1_InitTimer,state->initRexmitTimeout);
00069 break;
00070
00071 default:
00072 opp_error("Error processing command OPEN_ACTIVE: connection already exists");
00073 }
00074
00075 }
00076
00077 void SCTPAssociation::process_OPEN_PASSIVE(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00078 {
00079 IPvXAddress lAddr;
00080 int16 localPort;
00081
00082 SCTPOpenCommand *openCmd = check_and_cast<SCTPOpenCommand *>(sctpCommand);
00083
00084 sctpEV3<<"SCTPAssociationEventProc:process_OPEN_PASSIVE\n";
00085
00086 switch(fsm->getState())
00087 {
00088 case SCTP_S_CLOSED:
00089 initAssociation(openCmd);
00090 state->fork = openCmd->getFork();
00091 localAddressList = openCmd->getLocalAddresses();
00092 sctpEV3<<"process_OPEN_PASSIVE: number of local addresses="<<localAddressList.size()<<"\n";
00093 lAddr = openCmd->getLocalAddresses().front();
00094 localPort = openCmd->getLocalPort();
00095 inboundStreams = openCmd->getInboundStreams();
00096 outboundStreams = openCmd->getOutboundStreams();
00097 state->localRwnd = (long)sctpMain->par("arwnd");
00098 state->numRequests = openCmd->getNumRequests();
00099 state->messagesToPush = openCmd->getMessagesToPush();
00100
00101 if (localPort==0)
00102 opp_error("Error processing command OPEN_PASSIVE: local port must be specified");
00103 sctpEV3 << "Assoc "<<assocId<<"::Starting to listen on: " << lAddr << ":" << localPort << "\n";
00104
00105 sctpMain->updateSockPair(this, lAddr, IPvXAddress(), localPort, 0);
00106 break;
00107 default:
00108 opp_error("Error processing command OPEN_PASSIVE: connection already exists");
00109 }
00110 }
00111
00112 void SCTPAssociation::process_SEND(SCTPEventCode& event, SCTPCommand* sctpCommand, cPacket* msg)
00113 {
00114 SCTPSendCommand* sendCommand = check_and_cast<SCTPSendCommand*>(sctpCommand);
00115
00116 if(fsm->getState() != SCTP_S_ESTABLISHED) {
00117
00118
00119 sctpEV3 << "process_SEND: state is not SCTP_S_ESTABLISHED -> returning" << endl;
00120 return;
00121 }
00122
00123 sctpEV3 << "process_SEND:"
00124 << " assocId=" << assocId
00125 << " localAddr=" << localAddr
00126 << " remoteAddr=" << remoteAddr
00127 << " cmdRemoteAddr="<< sendCommand->getRemoteAddr()
00128 << " cmdPrimary=" << (sendCommand->getPrimary() ? "true" : "false")
00129 << " appGateIndex=" << appGateIndex
00130 << " streamId=" << sendCommand->getSid() << endl;
00131
00132 SCTPSimpleMessage* smsg = check_and_cast<SCTPSimpleMessage*>((msg->decapsulate()));
00133 SCTP::AssocStatMap::iterator iter = sctpMain->assocStatMap.find(assocId);
00134 iter->second.sentBytes += smsg->getBitLength() / 8;
00135
00136
00137 const uint32 streamId = sendCommand->getSid();
00138 const uint32 sendUnordered = sendCommand->getSendUnordered();
00139 const uint32 ppid = sendCommand->getPpid();
00140 SCTPSendStream* stream = NULL;
00141 SCTPSendStreamMap::iterator associter = sendStreams.find(streamId);
00142 if (associter != sendStreams.end()) {
00143 stream = associter->second;
00144 }
00145 else {
00146 opp_error("stream with id %d not found", streamId);
00147 }
00148
00149 char name[64];
00150 snprintf(name, sizeof(name), "SDATA-%d-%d", streamId, state->msgNum);
00151 smsg->setName(name);
00152
00153 SCTPDataMsg* datMsg = new SCTPDataMsg();
00154 datMsg->encapsulate(smsg);
00155 datMsg->setSid(streamId);
00156 datMsg->setPpid(ppid);
00157 datMsg->setEnqueuingTime(simulation.getSimTime());
00158
00159
00160 if (sendCommand->getPrimary()) {
00161 if (sendCommand->getRemoteAddr() == IPvXAddress("0.0.0.0")) {
00162 datMsg->setInitialDestination(remoteAddr);
00163 }
00164 else {
00165 datMsg->setInitialDestination(sendCommand->getRemoteAddr());
00166 }
00167 }
00168 else {
00169 datMsg->setInitialDestination(state->getPrimaryPathIndex());
00170 }
00171
00172
00173 datMsg->setBooksize(smsg->getBitLength() / 8 + state->header);
00174 qCounter.roomSumSendStreams += ADD_PADDING(smsg->getBitLength() / 8 + SCTP_DATA_CHUNK_LENGTH);
00175 qCounter.bookedSumSendStreams += datMsg->getBooksize();
00176 state->sendBuffer += smsg->getByteLength();
00177
00178 datMsg->setMsgNum(++state->msgNum);
00179
00180
00181 if (sendUnordered == 1) {
00182 datMsg->setOrdered(false);
00183 stream->getUnorderedStreamQ()->insert(datMsg);
00184 }
00185 else {
00186 datMsg->setOrdered(true);
00187 stream->getStreamQ()->insert(datMsg);
00188
00189 if ((state->appSendAllowed) &&
00190 (state->sendQueueLimit > 0) &&
00191 ((uint64)state->sendBuffer >= state->sendQueueLimit) ) {
00192 sendIndicationToApp(SCTP_I_SENDQUEUE_FULL);
00193 state->appSendAllowed = false;
00194 }
00195 sendQueue->record(stream->getStreamQ()->getLength());
00196 }
00197
00198 state->queuedMessages++;
00199 if ((state->queueLimit > 0) && (state->queuedMessages > state->queueLimit)) {
00200 state->queueUpdate = false;
00201 }
00202 sctpEV3 << "process_SEND:"
00203 << " last=" << sendCommand->getLast()
00204 <<" queueLimit=" << state->queueLimit << endl;
00205
00206
00207
00208 if (sendCommand->getLast() == true) {
00209 if (sendCommand->getPrimary()) {
00210 sctpAlgorithm->sendCommandInvoked(NULL);
00211 }
00212 else {
00213 sctpAlgorithm->sendCommandInvoked(getPath(datMsg->getInitialDestination()));
00214 }
00215 }
00216 }
00217
00218 void SCTPAssociation::process_RECEIVE_REQUEST(SCTPEventCode& event, SCTPCommand *sctpCommand)
00219 {
00220 SCTPSendCommand *sendCommand = check_and_cast<SCTPSendCommand *>(sctpCommand);
00221 if ((uint32)sendCommand->getSid() > inboundStreams || sendCommand->getSid() < 0)
00222 {
00223 sctpEV3<<"Application tries to read from invalid stream id....\n";
00224 }
00225 state->numMsgsReq[sendCommand->getSid()]+= sendCommand->getNumMsgs();
00226 pushUlp();
00227 }
00228
00229 void SCTPAssociation::process_PRIMARY(SCTPEventCode& event, SCTPCommand *sctpCommand)
00230 {
00231 SCTPPathInfo *pinfo = check_and_cast<SCTPPathInfo *>(sctpCommand);
00232 state->setPrimaryPath(getPath(pinfo->getRemoteAddress()));
00233 }
00234
00235
00236 void SCTPAssociation::process_QUEUE_MSGS_LIMIT(const SCTPCommand* sctpCommand)
00237 {
00238 const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
00239 state->queueLimit = qinfo->getText();
00240 sctpEV3<<"state->queueLimit set to "<<state->queueLimit<<"\n";
00241 }
00242
00243 void SCTPAssociation::process_QUEUE_BYTES_LIMIT(const SCTPCommand* sctpCommand)
00244 {
00245 const SCTPInfo* qinfo = check_and_cast<const SCTPInfo*>(sctpCommand);
00246 state->sendQueueLimit = qinfo->getText();
00247 }
00248
00249 void SCTPAssociation::process_CLOSE(SCTPEventCode& event)
00250 {
00251 sctpEV3 << "SCTPAssociationEventProc:process_CLOSE; assoc=" << assocId << endl;
00252 switch(fsm->getState()) {
00253 case SCTP_S_ESTABLISHED:
00254 sendOnAllPaths(state->getPrimaryPath());
00255 sendShutdown();
00256 break;
00257 case SCTP_S_SHUTDOWN_RECEIVED:
00258 if (getOutstandingBytes() == 0) {
00259 sendShutdownAck(remoteAddr);
00260 }
00261 break;
00262 }
00263 }
00264
00265 void SCTPAssociation::process_ABORT(SCTPEventCode& event)
00266 {
00267 sctpEV3 << "SCTPAssociationEventProc:process_ABORT; assoc=" << assocId << endl;
00268 switch(fsm->getState()) {
00269 case SCTP_S_ESTABLISHED:
00270 sendOnAllPaths(state->getPrimaryPath());
00271 sendAbort();
00272 break;
00273 }
00274 }
00275
00276 void SCTPAssociation::process_STATUS(SCTPEventCode& event, SCTPCommand *sctpCommand, cPacket *msg)
00277 {
00278 SCTPStatusInfo *statusInfo = new SCTPStatusInfo();
00279 statusInfo->setState(fsm->getState());
00280 statusInfo->setStateName(stateName(fsm->getState()));
00281 statusInfo->setPathId(remoteAddr);
00282 statusInfo->setActive(getPath(remoteAddr)->activePath);
00283 msg->setControlInfo(statusInfo);
00284 sendToApp(msg);
00285 }