Commit 9246129d authored by shengjh's avatar shengjh 🇨🇳
Browse files

workloadsynizer pass appmesage to homatransportlayer. Then homatransportlayer...

workloadsynizer pass appmesage to homatransportlayer. Then homatransportlayer should build the real homa packet.
parent 06be6748
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
#include "Util.h"
#include "dctrans/transportlayer/homa/HomaPkt_m.h"
#include "dctrans/transportlayer/homa/HomaPktHelper.h"
#include "dctrans/transportlayer/homa/HomaTransport.h"
#include "dctrans/transportlayer/homa/HomaTransport_m.h"
Register_ResultFilter("homaMsgSize", HomaMsgSizeFilter);
Register_ResultFilter("homaPktBytes", HomaPktBytesFilter);
Register_ResultFilter("homaUnschedPktBytes", HomaUnschedPktBytesFilter);
Register_ResultFilter("homaGrantPktBytes", HomaGrantPktBytesFilter);
void
HomaMsgSizeFilter::receiveSignal(cResultFilter *prev, simtime_t_cref t,
cObject *object, cObject *details)
{
auto homaPkt = check_and_cast<Packet*>(object);
auto homaChunk = homaPkt->peekAtFront<HomaPkt>();//->getTag<Homa>()->getHomaTransport()
uint32_t msgSize;
PktType pktType = (PktType)homaChunk->getPktType();
switch (pktType) {
case PktType::REQUEST:
case PktType::UNSCHED_DATA:
msgSize = homaChunk->getUnschedFields().msgByteLen;
break;
case PktType::SCHED_DATA:
case PktType::GRANT: {
HomaTransport::InboundMessage* inbndMsg;
const HomaTransport::OutboundMessage* outMsg;
auto transport = homaChunk->getTag<Homa>()->getHomaTransport();
if ((inbndMsg = transport->rxScheduler.lookupInboundMesg(homaChunk))) {
msgSize = inbndMsg->getMsgSize();
} else {
outMsg = &( transport->sxController.getOutboundMsgMap()->at(homaChunk->getMsgId()) );
ASSERT((transport->getLocalAddr() == homaChunk->getDestAddr()
&& pktType == PktType::GRANT)
|| (transport->getLocalAddr() == homaPkt->getSrcAddr()
&& pktType == PktType::SCHED_DATA));
msgSize = outMsg->getMsgSize();
}
break;
}
default:
throw cRuntimeError("PktType %d is not known!", pktType);
}
fire(this, t , (double)msgSize, details);
}
void
HomaPktBytesFilter::receiveSignal(cResultFilter *prev, simtime_t_cref t,
cObject *object, cObject *details)
{
auto homaPkt = check_and_cast<Packet*>(object);
auto homaChunk = homaPkt->peekAtFront<HomaPkt>();
//HomaPkt* homaPkt = check_and_cast<HomaPkt*>(object);
fire(this, t,
(double)HomaPktHelper::getBytesOnWire( HomaPktHelper::getDataBytes(homaChunk), (PktType)homaChunk->getPktType() ),
details);
}
void
HomaUnschedPktBytesFilter::receiveSignal(cResultFilter *prev, simtime_t_cref t,
cObject *object, cObject *details)
{
//HomaPkt* homaPkt = check_and_cast<HomaPkt*>(object);
auto homaPkt = check_and_cast<Packet*>(object);
auto homaChunk = homaPkt->peekAtFront<HomaPkt>();
switch (homaChunk->getPktType()) {
case PktType::REQUEST:
case PktType::UNSCHED_DATA:
fire(this, t,
//(double)HomaPkt::getBytesOnWire(homaPkt->getDataBytes(), (PktType)homaPkt->getPktType()),
(double)HomaPktHelper::getBytesOnWire( HomaPktHelper::getDataBytes(homaChunk), (PktType)homaChunk->getPktType() ),
details);
return;
default:
return;
}
}
void
HomaGrantPktBytesFilter::receiveSignal(cResultFilter *prev, simtime_t_cref t,
cObject *object, cObject *details)
{
//HomaPkt* homaPkt = check_and_cast<HomaPkt*>(object);
auto homaPkt = check_and_cast<Packet*>(object);
auto homaChunk = homaPkt->peekAtFront<HomaPkt>();
switch (homaChunk->getPktType()) {
case PktType::GRANT:
fire(this, t,
//(double)HomaPkt::getBytesOnWire(homaPkt->getDataBytes(), (PktType)homaPkt->getPktType()),
(double)HomaPktHelper::getBytesOnWire( HomaPktHelper::getDataBytes(homaChunk), (PktType)homaChunk->getPktType() ),
details);
return;
default:
return;
}
}
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
#ifndef DCTRANS_COMMON_UTIL_H_
#define DCTRANS_COMMON_UTIL_H_
#include "DCTRANSDefs.h"
#include "Minimal.h"
/**
* Following filters are defined for collecting statistics from packets of type
* HomaPkt. They are used to collect stats at the HomaTransport instances.
*/
class SIM_API HomaMsgSizeFilter : public cObjectResultFilter
{
public:
HomaMsgSizeFilter() {}
virtual void receiveSignal(cResultFilter *prev, simtime_t_cref t, cObject *object, cObject *details);
};
class SIM_API HomaPktBytesFilter : public cObjectResultFilter
{
public:
HomaPktBytesFilter() {}
virtual void receiveSignal(cResultFilter *prev, simtime_t_cref t, cObject *object, cObject *details);
};
class SIM_API HomaUnschedPktBytesFilter : public cObjectResultFilter
{
public:
HomaUnschedPktBytesFilter() {}
virtual void receiveSignal(cResultFilter *prev, simtime_t_cref t, cObject *object, cObject *details);
};
class SIM_API HomaGrantPktBytesFilter : public cObjectResultFilter
{
public:
HomaGrantPktBytesFilter() {}
virtual void receiveSignal(cResultFilter *prev, simtime_t_cref t, cObject *object, cObject *details);
};
#endif /* DCTRANS_COMMON_UTIL_H_ */
......@@ -49,23 +49,35 @@ uint32_t HomaPktHelper::getHomaPktHeaderSize(PktType pktType) {
return size;
}
uint32_t HomaPktHelper::getDataBytes(PktType pktType)
uint32_t HomaPktHelper::getDataBytes(Ptr<const HomaPkt> homaChunk)
{
HomaPkt homaPkt = HomaPkt();
homaPkt.setPktType(pktType);
switch (this->getPktType()) {
switch (homaChunk->getPktType()) {
case PktType::REQUEST:
case PktType::UNSCHED_DATA:
return homaPkt.getUnschedFields().lastByte - homaPkt.getUnschedFields().firstByte + 1;
return homaChunk->getUnschedFields().lastByte - homaChunk->getUnschedFields().firstByte + 1;
case PktType::SCHED_DATA:
return homaPkt.getSchedDataFields().lastByte - homaPkt.getSchedDataFields().firstByte + 1;
return homaChunk->getSchedDataFields().lastByte - homaChunk->getSchedDataFields().firstByte + 1;
case PktType::GRANT:
return 0;
default:
throw cRuntimeError("PktType %d not defined", this->getPktType());
throw cRuntimeError("PktType %d not defined", homaChunk->getPktType());
}
return 0;
}
uint32_t getFirstByte(Ptr<const HomaPkt> homaChunk)
{
switch(homaChunk->getPktType()) {
case PktType::SCHED_DATA:
return homaChunk->getSchedDataFields().firstByte;
case PktType::UNSCHED_DATA:
case PktType::REQUEST:
return homaChunk->getUnschedFields().firstByte;
default:
return 0;
}
}
/**
*
* For a message of size numDataBytes comprised of packets of type homaPktType,
......
......@@ -8,16 +8,68 @@
#ifndef DCTRANS_TRANSPORTLAYER_HOMA_HOMAPKTHELPER_H_
#define DCTRANS_TRANSPORTLAYER_HOMA_HOMAPKTHELPER_H_
#include "HomaPkt_m.h"
#include "inet/common/packet/Packet.h"
#include "dctrans/common/DCTRANSDefs.h"
using namespace inet;
class HomaPktHelper {
public:
HomaPktHelper();
static uint32_t getHomaPktHeaderSize(PktType pktType);
static uint32_t getBytesOnWire(uint32_t numDataBytes, PktType homaPktType);
static uint32_t getDataBytes(PktType pktType);
static uint32_t getDataBytes(Ptr<const HomaPkt> homaChunk);
static uint32_t getFirstByte(Ptr<const HomaPkt> homaChunk);
~HomaPktHelper();
};
/**
* A utility predicate for creating PriorityQueues of HomaPkt instances
* based on priority numbers.
*/
class HomaPktSorter {
public:
HomaPktSorter(){}
/**
* Predicate functor operator () for comparison.
*
* \param pkt1
* first pkt for priority comparison
* \param pkt2
* second pkt for priority comparison
* \return
* true if pkt1 compared greater than pkt2.
*/
bool operator()(const Packet* pkt1, const Packet* pkt2)
{
auto lhs = pkt1->peekAtFront<HomaPkt>();
auto rhs = pkt2->peekAtFront<HomaPkt>();
if (lhs->getPriority() > rhs->getPriority() ||
(lhs->getPriority() == rhs->getPriority() &&
pkt1->getArrivalTime() > pkt2->getArrivalTime()) ||
(lhs->getPriority() == rhs->getPriority() &&
pkt1->getArrivalTime() == pkt2->getArrivalTime() &&
pkt1->getCreationTime() > pkt2->getCreationTime()) ||
(lhs->getPriority() == rhs->getPriority() &&
pkt1->getArrivalTime() == pkt2->getArrivalTime() &&
pkt1->getCreationTime() == pkt2->getCreationTime() &&
lhs->getSrcAddr() == rhs->getSrcAddr() &&
lhs->getMsgId() > rhs->getMsgId()) ||
(lhs->getPriority() == rhs->getPriority() &&
pkt1->getArrivalTime() == pkt2->getArrivalTime() &&
pkt1->getCreationTime() == pkt2->getCreationTime() &&
lhs->getSrcAddr() == rhs->getSrcAddr() &&
lhs->getMsgId() == rhs->getMsgId() &&
HomaPktHelper::getFirstByte(lhs)> HomaPktHelper::getFirstByte(rhs) ))
{
// The last two set of conditions are necessary for ordering pkts belong
// to same message before they are sent to network.
return true;
} else {
return false;
}
}
};
#endif /* DCTRANS_TRANSPORTLAYER_HOMA_HOMAPKTHELPER_H_ */
......@@ -20,9 +20,10 @@
#include "HomaPkt_m.h"
#include "HomaPktHelper.h"
#include "PriorityResolver.h"
#include "HomaPktHelper.h"
#include "inet/networklayer/common/FragmentationTag_m.h"
#include "HomaTag_m.h"
#include "inet/linklayer/common/UserPriorityTag_m.h"
#include "HomaTransport_m.h"
// Required by OMNeT++ for all simple modules.
Define_Module(HomaTransport);
......@@ -74,11 +75,11 @@ HomaTransport::HomaTransport()
, priorityStatsSignals()
, socket()
, localAddr()
, homaConfig(NULL)
, prioResolver(NULL)
, distEstimator(NULL)
, sendTimer(NULL)
, emitSignalTimer(NULL)
, homaConfig(nullptr)
, prioResolver(nullptr)
, distEstimator(nullptr)
, sendTimer(nullptr)
, emitSignalTimer(nullptr)
, nextEmitSignalTime(SIMTIME_ZERO)
, outstandingGrantBytes(0)
{
......@@ -272,7 +273,7 @@ void HomaTransport::handleMessage(cMessage *msg)
}
sxController.processSendMsgFromApp(outMsg);
} else if (msg->arrivedOn("udpIn")) {
handleRecvdPkt(check_and_cast<cPacket*>(msg));
handleRecvdPkt(check_and_cast<Packet*>(msg));
}
}
}
......@@ -285,22 +286,27 @@ void HomaTransport::handleMessage(cMessage *msg)
* \param pkt
* pointer to the packet just arrived from the network.
*/
void HomaTransport::handleRecvdPkt(cPacket* pkt)
void HomaTransport::handleRecvdPkt(Packet* pkt)
{
Enter_Method_Silent();
HomaPkt* rxPkt = check_and_cast<HomaPkt*>(pkt);
//HomaPkt* rxPkt = check_and_cast<HomaPkt*>(pkt);
auto rxChunk = pkt->peekAtFront<HomaPkt>();
// check and set the localAddr
if (localAddr == inet::L3Address()) {
localAddr = rxPkt->getDestAddr();
if (localAddr == L3Address()) {
localAddr = rxChunk->getDestAddr();
} else {
ASSERT(localAddr == rxPkt->getDestAddr());
ASSERT(localAddr == rxChunk->getDestAddr());
}
// update the owner transport for this packet
rxPkt->ownerTransport = this;
emit(priorityStatsSignals[rxPkt->getPriority()], rxPkt);
//rxPkt->ownerTransport = this;
//SJH: I don't know if this is ok.
pkt->findTagForUpdate<Homa>()->setHomaTransport(this) ;
uint32_t pktLenOnWire = HomaPkt::getBytesOnWire(rxPkt->getDataBytes(), (PktType)rxPkt->getPktType());
emit(priorityStatsSignals[rxChunk->getPriority()], pkt);
//uint32_t pktLenOnWire = HomaPkt::getBytesOnWire(rxPkt->getDataBytes(), (PktType)rxPkt->getPktType());
uint32_t pktLenOnWire = HomaPktHelper::getBytesOnWire( HomaPktHelper::getDataBytes(rxChunk), (PktType)rxChunk->getPktType() );
simtime_t pktDuration = SimTime(1e-9 * (pktLenOnWire * 8.0 / homaConfig->nicLinkSpeed));
// update active period stats
......@@ -572,7 +578,7 @@ HomaTransport::SendController::processReceivedGrant(HomaPkt* rxPkt)
void
HomaTransport::SendController::sendOrQueue(cMessage* msg)
{
HomaPkt* sxPkt = NULL;
HomaPkt* sxPkt = nullptr;
if (msg == transport->sendTimer) {
// Send timer fired and it's time to check if we can send a data packet.
ASSERT(msg->getKind() == SelfMsgKind::SEND);
......@@ -605,22 +611,21 @@ HomaTransport::SendController::sendOrQueue(cMessage* msg)
return;
}
// When this function is chighPrioMsgalled to send a grant packet.
sxPkt = dynamic_cast<HomaPkt*>(msg);
// When this function is called highPrioMsg to send a grant packet.
sxPkt = dynamic_cast<Packet*>(msg);
if (sxPkt) {
ASSERT(sxPkt->getPktType() == PktType::GRANT);
if (transport->sendTimer->isScheduled()) {
// NIC tx link is busy sending another packet
EV << "Grant timer is scheduled! Grant to " <<
sxPkt->getDestAddr().toIPv4().str() << ", mesgId: " <<
sxPkt->getMsgId() << " is queued!"<< endl;
sxPkt->getDestAddr().toIPv4().str() << ", mesgId: " << sxPkt->getMsgId() << " is queued!"<< endl;
outGrantQueue.push(sxPkt);
return;
} else {
ASSERT(outGrantQueue.empty());
EV << "Send grant to: " << sxPkt->getDestAddr().toIPv4().str()
<< ", mesgId: " << sxPkt->getMsgId() << ", prio: " <<
EV << "Send grant to: " << sxPkt->getDestAddr().toIpv4().str()
<< ", mesgId: " << sxPkt->getMsgId() << ", prio: " <<
sxPkt->getGrantFields().schedPrio<< endl;
sendPktAndScheduleNext(sxPkt);
return;
......@@ -661,11 +666,12 @@ HomaTransport::SendController::sendOrQueue(cMessage* msg)
* packet to be transmitted.
*/
void
HomaTransport::SendController::sendPktAndScheduleNext(HomaPkt* sxPkt)
HomaTransport::SendController::sendPktAndScheduleNext(Packet* sxPkt)
{
PktType pktType = (PktType)sxPkt->getPktType();
uint32_t numDataBytes = sxPkt->getDataBytes();
uint32_t bytesSentOnWire = HomaPkt::getBytesOnWire(numDataBytes, pktType);
auto chunk = sxPkt->peekAtFront<AppMessage>();
uint32_t numDataBytes = chunk->getDataBytes();
uint32_t bytesSentOnWire = HomaPktHelper::getBytesOnWire(numDataBytes, pktType);
simtime_t currentTime = simTime();
simtime_t sxPktDuration = SimTime(1e-9 * (bytesSentOnWire * 8.0 / homaConfig->nicLinkSpeed));
......@@ -810,7 +816,7 @@ HomaTransport::SendController::OutbndMsgSorter::operator()(
* an message id to uniquely identify this OutboundMessage from every other
* message in this transport.
*/
HomaTransport::OutboundMessage::OutboundMessage(AppMessage* outMsg,
HomaTransport::OutboundMessage::OutboundMessage(Ptr<const AppMessage> outMsg,
SendController* sxController, uint64_t msgId,
std::vector<uint16_t> reqUnschedDataVec)
: msgId(msgId)
......@@ -956,16 +962,17 @@ HomaTransport::OutboundMessage::prepareRequestAndUnsched()
Packet *packet = new Packet();
packet->addTag<FragmentationReq>()->setDontFragment(true);
const auto &unschedDataPart = makeShared<AppMessage>();
const auto &unschedDataPart = makeShared<HomaPkt>();
unschedDataPart->addTag<Homa>()->setHomaTransport(sxController->transport);//SJH: is it right to add tag to chunk?
unschedDataPart->setPktType(pktType);
EV_DEBUG << "Attaching UserPriorityReq" << EV_FIELD(packet) << EV_ENDL;
packet->addTagIfAbsent<UserPriorityReq>()->setUserPriority(userPriority);
// set homa fields
unschedDataPart->setDestAddr(this->destAddr);
unschedDataPart->setSrcAddr(this->srcAddr);
unschedDataPart->setMsgId(this->msgId);
unschedDataPart->setPriority(unschedPrioVec[i]);
EV_DEBUG << "Attaching UserPriorityReq" << EV_FIELD(packet) << EV_ENDL;
packet->addTagIfAbsent<UserPriorityReq>()->setUserPriority(unschedPrioVec[i]);
// fill up unschedFields
UnschedFields unschedFields;
......@@ -1023,8 +1030,7 @@ HomaTransport::OutboundMessage::prepareRequestAndUnsched()
* remaining scheduled bytes to be sent for this message.
*/
uint32_t
HomaTransport::OutboundMessage::prepareSchedPkt(uint32_t offset,
uint16_t numBytes, uint16_t schedPrio)
HomaTransport::OutboundMessage::prepareSchedPkt(uint32_t offset, uint16_t numBytes, uint16_t schedPrio)
{
ASSERT(this->bytesToSched > 0);
uint32_t bytesToSend = std::min((uint32_t)numBytes, this->bytesToSched);
......@@ -1066,11 +1072,12 @@ HomaTransport::OutboundMessage::prepareSchedPkt(uint32_t offset,
* for transmission.
*/
bool
HomaTransport::OutboundMessage::getTransmitReadyPkt(HomaPkt** outPkt)
HomaTransport::OutboundMessage::getTransmitReadyPkt(Packet** outPkt)
{
ASSERT(!txPkts.empty());
HomaPkt* head = txPkts.top();
PktType outPktType = (PktType)head->getPktType();
Packet* head = txPkts.top();
auto chunk = head->peekAtFront<AppMessage>();
PktType outPktType = (PktType)chunk->getPktType();
txPkts.pop();
if (outPktType == PktType::SCHED_DATA) {
txSchedPkts.erase(head);
......@@ -1234,10 +1241,10 @@ HomaTransport::ReceiveScheduler::~ReceiveScheduler()
* the message.)
*/
HomaTransport::InboundMessage*
HomaTransport::ReceiveScheduler::lookupInboundMesg(HomaPkt* rxPkt) const
HomaTransport::ReceiveScheduler::lookupInboundMesg(Ptr<const HomaPkt> rxPkt) const
{
// Get the SenderState collection for the sender of this pkt
inet::IPv4Address srcIp = rxPkt->getSrcAddr().toIPv4();
Ipv4Address srcIp = rxPkt->getSrcAddr().toIpv4();
auto iter = ipSendersMap.find(srcIp.getInt());
if (iter == ipSendersMap.end()) {
return NULL;
......
......@@ -17,19 +17,21 @@
#define DCTRANS_TRANSPORTLAYER_HOMA_HOMATRANSPORT_H_
#include <vector>
#include <queue>
#include <list>
#include <unordered_map>
#include <unordered_set>
#include "common/Minimal.h"
#include "dctrans/common/Minimal.h"
#include "dctrans/common/Util.h"
#include "dctrans/applications/AppMessage_m.h"
#include "dctrans/common/DCTRANSDefs.h"
#include "HomaPkt_m.h"
#include "HomaPktHelper.h"
#include "UnschedByteAllocator.h"
#include "WorkloadEstimator.h"
#include "common/Util.h"
#include "HomaConfigDepot.h"
#include "mocks/MockUdpSocket.h"
#include "inet/transportlayer/contract/udp/UDPSocket.h"
#include "inet/transportlayer/contract/udp/UdpSocket.h"
#include "inet/common/packet/Packet.h"
using namespace inet;
// forward decalration to enable mutual header include
......@@ -61,7 +63,7 @@ public:
class OutboundMessage {
public:
explicit OutboundMessage();
explicit OutboundMessage(AppMessage* outMsg,
explicit OutboundMessage(Ptr<const AppMessage> outMsg,
SendController* sxController, uint64_t msgId,
std::vector<uint16_t> reqUnschedDataVec);
explicit OutboundMessage(const OutboundMessage& outboundMsg);
......@@ -85,13 +87,13 @@ public:
typedef std::priority_queue<Packet*, std::vector<Packet*>, OutbndPktSorter> OutbndPktQueue;
// getters functions
const uint32_t& getMsgSize() { return msgSize; }
const uint32_t& getMsgSize() const { return msgSize; }
const uint64_t& getMsgId() { return msgId; }
const OutbndPktQueue& getTxPktQueue() {return txPkts;}
const std::unordered_set<HomaPkt*>& getTxSchedPkts() {return txSchedPkts;}
const uint32_t getBytesLeft() { return bytesLeft; }
const simtime_t getMsgCreationTime() { return msgCreationTime; }
bool getTransmitReadyPkt(HomaPkt** outPkt);
bool getTransmitReadyPkt(Packet** outPkt);
protected:
// Unique identification number assigned by in the construction time for
......@@ -152,8 +154,7 @@ public:
private:
void copy(const OutboundMessage &other);
std::vector<uint32_t> getUnschedPerPrio(
std::vector<uint32_t>& unschedPrioVec);
std::vector<uint32_t> getUnschedPerPrio(std::vector<uint32_t>& unschedPrioVec);
friend class SendController;
friend class PriorityResolver;
};
......@@ -173,10 +174,11 @@ public:
~SendController();
void initSendController(HomaConfigDepot* homaConfig,
PriorityResolver* prioResolver);
void processSendMsgFromApp(AppMessage* msg);
void processReceivedGrant(HomaPkt* rxPkt);
OutboundMsgMap* getOutboundMsgMap() {return &outboundMsgMap;}
void sendOrQueue(cMessage* msg = NULL);
void processSendMsgFromApp(Ptr<const AppMessage> sendMsg);
void processReceivedGrant(Packet* rxPkt);
// SJH:double const to deal with getTag problem because getTag always return const Ptr
const OutboundMsgMap* getOutboundMsgMap() const {return &outboundMsgMap;}
void sendOrQueue(cMessage* msg = nullptr);
void handlePktTransmitEnd();
public:
......@@ -239,8 +241,7 @@ public:
// Queue for keeping grants that receiver side of this host machine
// wants to send out.
std::priority_queue<HomaPkt*, std::vector<HomaPkt*>,
HomaPkt::HomaPktSorter> outGrantQueue;
std::priority_queue<Packet*, std::vector<Packet*>, HomaPktSorter> outGrantQueue;
// Transport that owns this SendController.
HomaTransport* transport;
......@@ -441,7 +442,7 @@ public:
public:
ReceiveScheduler(HomaTransport* transport);
~ReceiveScheduler();
InboundMessage* lookupInboundMesg(HomaPkt* rxPkt) const;
InboundMessage* lookupInboundMesg(Ptr<const HomaPkt> rxPkt) const;
class UnschedRateComputer {
public:
......@@ -465,11 +466,11 @@ public:
*/
class SenderState {
public:
SenderState(inet::IPv4Address srcAddr,
SenderState(Ipv4Address srcAddr,
ReceiveScheduler* rxScheduler, cMessage* grantTimer,
HomaConfigDepot* homaConfig);
~SenderState(){}
const inet::IPv4Address& getSenderAddr() {
const Ipv4Address& getSenderAddr() {
return senderAddr;
}
simtime_t getNextGrantTime(simtime_t currentTime,
......@@ -480,7 +481,7 @@ public:
protected:
HomaConfigDepot* homaConfig;
ReceiveScheduler* rxScheduler;
inet::IPv4Address senderAddr;
Ipv4Address senderAddr;
// Timer object for sending grants for this sender. Will be used
// to send timer paced grants for this sender if totalBytesInFlight
......@@ -791,7 +792,7 @@ public:
virtual void handleMessage(cMessage *msg);
virtual void finish();
const inet::L3Address& getLocalAddr() {return localAddr;}
void handleRecvdPkt(cPacket* ptk);
void handleRecvdPkt(Packet* ptk);
void processStart();