Commit a3e71b7b authored by shengjh's avatar shengjh 🇨🇳
Browse files

correct processReceivedGrant processSendMsgFromApp input param to Packet*

parent a2bfe18b
......@@ -271,7 +271,7 @@ void HomaTransport::handleMessage(cMessage *msg)
} else {
ASSERT(localAddr == outMsg->getSrcAddr());
}
sxController.processSendMsgFromApp(outMsg);
sxController.processSendMsgFromApp(pk);
} else if (msg->arrivedOn("udpIn")) {
handleRecvdPkt(check_and_cast<Packet*>(msg));
}
......@@ -452,8 +452,7 @@ HomaTransport::SendController::handlePktTransmitEnd()
}
auto& schedPkts = msg->getTxSchedPkts();
// Find oldest sched pkt creation time
for (auto ittt = schedPkts.begin(); ittt != schedPkts.end();
++ittt) {
for (auto ittt = schedPkts.begin(); ittt != schedPkts.end(); ++ittt) {
simtime_t pktCreationTime = (*ittt)->getCreationTime();
oldestSchedPkt = (pktCreationTime < oldestSchedPkt
? pktCreationTime : oldestSchedPkt);
......@@ -463,8 +462,9 @@ HomaTransport::SendController::handlePktTransmitEnd()
simtime_t transmitDelay;
if (totalTrailingUnsched > 0) {
simtime_t totalUnschedTxTimeLeft = SimTime(1e-9 * (
HomaPkt::getBytesOnWire(totalTrailingUnsched,
PktType::UNSCHED_DATA) * 8.0 / homaConfig->nicLinkSpeed));
// HomaPkt::getBytesOnWire(totalTrailingUnsched, PktType::UNSCHED_DATA) * 8.0 / homaConfig->nicLinkSpeed
HomaPktHelper::getBytesOnWire(totalTrailingUnsched, PktType::UNSCHED_DATA)/ homaConfig->nicLinkSpeed
));
transmitDelay = sentPktDuration < totalUnschedTxTimeLeft
? sentPktDuration : totalUnschedTxTimeLeft;
transport->emit(sxUnschedPktDelaySignal, transmitDelay);
......@@ -487,13 +487,14 @@ HomaTransport::SendController::handlePktTransmitEnd()
* \param sendMsg
* Application messsage that needs to be transmitted by the transport.
*/
void HomaTransport::SendController::processSendMsgFromApp(Ptr<const AppMessage> sendMsg)
void HomaTransport::SendController::processSendMsgFromApp(Packet* outPkt)
{
transport->emit(msgsLeftToSendSignal, outboundMsgMap.size());
transport->emit(bytesLeftToSendSignal, bytesLeftToSend);
transport->emit(bytesNeedGrantSignal, bytesNeedGrant);
auto sendMsg = outPkt->peekAtFront<AppMessage>();
uint32_t destAddr = sendMsg->getDestAddr().toIpv4().getInt();
uint32_t msgSize = sendMsg->getByteLength();
uint32_t msgSize = B(sendMsg->getChunkLength()).get();
std::vector<uint16_t> reqUnschedDataVec = unschedByteAllocator->getReqUnschedDataPkts(destAddr, msgSize);
outboundMsgMap.emplace(std::piecewise_construct,
......@@ -515,7 +516,7 @@ void HomaTransport::SendController::processSendMsgFromApp(Ptr<const AppMessage>
ASSERT(insResult.second); // check insertion took place
bytesNeedGrant += outboundMsg->bytesToSched;
msgId++;
delete sendMsg;
delete outPkt;
sendOrQueue();
}
......@@ -527,26 +528,29 @@ void HomaTransport::SendController::processSendMsgFromApp(Ptr<const AppMessage>
* receiverd grant packet from the network.
*/
void
HomaTransport::SendController::processReceivedGrant(HomaPkt* rxPkt)
HomaTransport::SendController::processReceivedGrant(Packet* rxPkt)
{
EV << "Received a grant from " << rxPkt->getSrcAddr().str()
<< " for msgId " << rxPkt->getMsgId() << " for "
<< rxPkt->getGrantFields().grantBytes << " Bytes." << endl;
sumGrantsInGap += HomaPkt::getBytesOnWire(rxPkt->getDataBytes(),
(PktType)rxPkt->getPktType());
OutboundMessage* outboundMsg = &(outboundMsgMap.at(rxPkt->getMsgId()));
ASSERT(rxPkt->getPktType() == PktType::GRANT &&
rxPkt->getDestAddr() == outboundMsg->srcAddr &&
rxPkt->getSrcAddr() == outboundMsg->destAddr);
auto rxChunk = rxPkt->peekAtFront<HomaPkt>();
// EV << "Received a grant from "
// //<< rxChunk->getSrcAddr().str()
// << " for msgId " << rxChunk->getMsgId() << " for "
// << rxChunk->getGrantFields().grantBytes << " Bytes." << endl;
sumGrantsInGap +=
//HomaPkt::getBytesOnWire(rxPkt->getDataBytes(),(PktType)rxPkt->getPktType());
HomaPktHelper::getBytesOnWire(HomaPktHelper::getDataBytes(rxChunk), (PktType)rxChunk->getPktType());
OutboundMessage* outboundMsg = &(outboundMsgMap.at(rxChunk->getMsgId()));
ASSERT(rxChunk->getPktType() == PktType::GRANT &&
rxChunk->getDestAddr() == outboundMsg->srcAddr &&
rxChunk->getSrcAddr() == outboundMsg->destAddr);
size_t numRemoved = outbndMsgSet.erase(outboundMsg);
ASSERT(numRemoved <= 1); // At most one item removed
int bytesToSchedOld = outboundMsg->bytesToSched;
int bytesToSchedNew = outboundMsg->prepareSchedPkt(
rxPkt->getGrantFields().offset, rxPkt->getGrantFields().grantBytes,
rxPkt->getGrantFields().schedPrio);
rxChunk->getGrantFields().offset, rxChunk->getGrantFields().grantBytes,
rxChunk->getGrantFields().schedPrio);
int numBytesSent = bytesToSchedOld - bytesToSchedNew;
bytesNeedGrant -= numBytesSent;
ASSERT(numBytesSent > 0 && bytesLeftToSend >= 0 && bytesNeedGrant >= 0);
......@@ -576,7 +580,7 @@ HomaTransport::SendController::processReceivedGrant(HomaPkt* rxPkt)
void
HomaTransport::SendController::sendOrQueue(cMessage* msg)
{
HomaPkt* sxPkt = nullptr;
Packet* sxPkt = nullptr;
if (msg == transport->sendTimer) {
// Send timer fired and it's time to check if we can send a data packet.
ASSERT(msg->getKind() == SelfMsgKind::SEND);
......@@ -666,6 +670,7 @@ HomaTransport::SendController::sendOrQueue(cMessage* msg)
void
HomaTransport::SendController::sendPktAndScheduleNext(Packet* sxPkt)
{
PktType pktType = (PktType)sxPkt->getPktType();
auto chunk = sxPkt->peekAtFront<AppMessage>();
uint32_t numDataBytes = chunk->getDataBytes();
......
......@@ -176,7 +176,7 @@ public:
~SendController();
void initSendController(HomaConfigDepot* homaConfig,
PriorityResolver* prioResolver);
void processSendMsgFromApp(Ptr<const AppMessage> sendMsg);
void processSendMsgFromApp(Packet* sendMsg);
void processReceivedGrant(Packet* rxPkt);
// SJH:double const to deal with getTag problem because getTag always return const Ptr
const OutboundMsgMap* getOutboundMsgMap() const {return &outboundMsgMap;}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment