Commit a2bfe18b authored by shengjh's avatar shengjh 🇨🇳
Browse files

correct txSchedPkts from HomaPkt to Packet.

parent 9246129d
......@@ -320,20 +320,19 @@ void HomaTransport::handleRecvdPkt(Packet* pkt)
rxScheduler.rcvdBytesPerActivePeriod += pktLenOnWire;
// handle data or grant packets appropriately
switch (rxPkt->getPktType()) {
switch (rxChunk->getPktType()) {
case PktType::REQUEST:
case PktType::UNSCHED_DATA:
case PktType::SCHED_DATA:
rxScheduler.processReceivedPkt(rxPkt);
rxScheduler.processReceivedPkt(pkt);
break;
case PktType::GRANT:
sxController.processReceivedGrant(rxPkt);
sxController.processReceivedGrant(pkt);
break;
default:
throw cRuntimeError("Received packet type(%d) is not valid.",
rxPkt->getPktType());
throw cRuntimeError("Received packet type(%d) is not valid.", rxChunk->getPktType());
}
// Check if this is the end of active period and we should dump stats for
......@@ -417,7 +416,7 @@ HomaTransport::SendController::~SendController()
{
delete unschedByteAllocator;
while (!outGrantQueue.empty()) {
HomaPkt* head = outGrantQueue.top();
auto head = outGrantQueue.top();
outGrantQueue.pop();
delete head;
}
......@@ -436,7 +435,7 @@ HomaTransport::SendController::handlePktTransmitEnd()
// chooses one for transmission over the others, the sender has imposed a
// delay in transmitting the other packets. Here we collect the statistics
// related those trasmssion delays.
uint32_t lastDestAddr = sentPkt.getDestAddr().toIPv4().getInt();
uint32_t lastDestAddr = sentPkt.getDestAddr().toIpv4().getInt();
for (auto it = rxAddrMsgMap.begin(); it != rxAddrMsgMap.end(); ++it) {
uint32_t rxAddr = it->first;
if (rxAddr != lastDestAddr) {
......@@ -444,8 +443,7 @@ HomaTransport::SendController::handlePktTransmitEnd()
uint64_t totalTrailingUnsched = 0;
simtime_t oldestSchedPkt = simTime();
simtime_t currentTime = simTime();
for (auto itt = allMsgToRxAddr.begin();
itt != allMsgToRxAddr.end(); ++itt) {
for (auto itt = allMsgToRxAddr.begin(); itt != allMsgToRxAddr.end(); ++itt) {
OutboundMessage* msg = *itt;
if (msg->msgSize > msg->bytesLeft) {
// Unsched delay at sender may waste rx bw only if msg has
......@@ -1036,7 +1034,9 @@ HomaTransport::OutboundMessage::prepareSchedPkt(uint32_t offset, uint16_t numByt
uint32_t bytesToSend = std::min((uint32_t)numBytes, this->bytesToSched);
// create a data pkt and push it txPkts queue for
HomaPkt* dataPkt = new HomaPkt(sxController->transport);
//HomaPkt* dataPkt = new HomaPkt(sxController->transport);
Packet *packet = new Packet();
auto &dataPkt = makeShared<HomaPkt>();
dataPkt->setPktType(PktType::SCHED_DATA);
dataPkt->setSrcAddr(this->srcAddr);
dataPkt->setDestAddr(this->destAddr);
......@@ -1047,8 +1047,9 @@ HomaTransport::OutboundMessage::prepareSchedPkt(uint32_t offset, uint16_t numByt
dataFields.lastByte = dataFields.firstByte + bytesToSend - 1;
dataPkt->setSchedDataFields(dataFields);
dataPkt->setByteLength(bytesToSend + dataPkt->headerSize());
txPkts.push(dataPkt);
txSchedPkts.insert(dataPkt);
packet->insertAtBack(dataPkt);
txPkts.push(packet);
txSchedPkts.insert(packet);
// update outbound messgae
this->bytesToSched -= bytesToSend;
......@@ -1266,9 +1267,14 @@ HomaTransport::ReceiveScheduler::lookupInboundMesg(Ptr<const HomaPkt> rxPkt) con
* Received data packet (ie. REQUEST, UNSCHED_DATA, or SCHED_DATA).
*/
void
HomaTransport::ReceiveScheduler::processReceivedPkt(HomaPkt* rxPkt)
HomaTransport::ReceiveScheduler::processReceivedPkt(Packet* rxPkt)
{
uint32_t pktLenOnWire = HomaPkt::getBytesOnWire(rxPkt->getDataBytes(), (PktType)rxPkt->getPktType());
auto chunk = rxPkt->peekAtFront<HomaPkt>();
//uint32_t pktLenOnWire = HomaPkt::getBytesOnWire(rxPkt->getDataBytes(), (PktType)rxPkt->getPktType());
uint32_t pktLenOnWire = HomaPktHelper::getBytesOnWire(
HomaPktHelper::getDataBytes(homaChunk),
(PktType)homaChunk->getPktType() );
simtime_t pktDuration = SimTime(1e-9 * (pktLenOnWire * 8.0 / homaConfig->nicLinkSpeed));
simtime_t timeNow = simTime();
......@@ -1289,8 +1295,7 @@ HomaTransport::ReceiveScheduler::processReceivedPkt(HomaPkt* rxPkt)
maxEarliestGrant = simTime();
break;
}
simtime_t earliestGrant =
std::get<2>(*topMesg->inflightGrants.begin());
simtime_t earliestGrant = std::get<2>(*topMesg->inflightGrants.begin());
maxEarliestGrant = std::max(maxEarliestGrant, earliestGrant);
}
......@@ -1347,7 +1352,7 @@ HomaTransport::ReceiveScheduler::processReceivedPkt(HomaPkt* rxPkt)
}
// Get the SenderState collection for the sender of this pkt
inet::Ipv4Address srcIp = rxPkt->getSrcAddr().toIpv4();
Ipv4Address srcIp = rxPkt->getSrcAddr().toIpv4();
auto iter = ipSendersMap.find(srcIp.getInt());
SenderState* s;
if (iter == ipSendersMap.end()) {
......
......@@ -63,8 +63,10 @@ public:
class OutboundMessage {
public:
explicit OutboundMessage();
explicit OutboundMessage(Ptr<const AppMessage> outMsg,
SendController* sxController, uint64_t msgId,
explicit OutboundMessage(
Ptr<const AppMessage> outMsg,
SendController* sxController,
uint64_t msgId,
std::vector<uint16_t> reqUnschedDataVec);
explicit OutboundMessage(const OutboundMessage& outboundMsg);
~OutboundMessage();
......@@ -90,7 +92,7 @@ public:
const uint32_t& getMsgSize() const { return msgSize; }
const uint64_t& getMsgId() { return msgId; }
const OutbndPktQueue& getTxPktQueue() {return txPkts;}
const std::unordered_set<HomaPkt*>& getTxSchedPkts() {return txSchedPkts;}
const std::unordered_set<Packet*>& getTxSchedPkts() {return txSchedPkts;}
const uint32_t getBytesLeft() { return bytesLeft; }
const simtime_t getMsgCreationTime() { return msgCreationTime; }
bool getTransmitReadyPkt(Packet** outPkt);
......@@ -144,7 +146,7 @@ public:
OutbndPktQueue txPkts;
// Set of all sched pkts ready for transmission
std::unordered_set<HomaPkt*> txSchedPkts;
std::unordered_set<Packet*> txSchedPkts;
// The SendController that manages the transmission of this msg.
SendController* sxController;
......@@ -731,7 +733,7 @@ public:
PROTECTED:
void initialize(HomaConfigDepot* homaConfig,
PriorityResolver* prioResolver);
void processReceivedPkt(HomaPkt* rxPkt);
void processReceivedPkt(Packet* rxPkt);
void processGrantTimers(cMessage* grantTimer);
inline uint64_t getInflightBytes()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment