Commit 9295c5e0 authored by shengjh's avatar shengjh 🇨🇳
Browse files

Work on function prepareRequestAndUnsched():change the unschedPkt to payload unschedPkt data part.

parent 274c0a83
......@@ -3,6 +3,7 @@
*_m.cc
*_sm.h
*_sm.cc
dctransport
/.settings
.vscode
......
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
package dctrans.simulations;
import inet.node.inet.Router;
import inet.networklayer.configurator.ipv4.Ipv4NetworkConfigurator;
import dctrans.node.HomaHost;
import ned.DatarateChannel;
network dcn
{
parameters:
xml topologyConfig = xmldoc("config.xml");
// This models NIC intrinsic delay
double edgeLinkDelay @unit(us) = default(0.0us);
// models the fabrciLinkDealy plus the intrinsic switching delay
double fabricLinkDelay @unit(us) = default(0.0us);
// Enables the cut-through switching wherever possible
//bool isFabricCutThrough = default(false);
// NOTE: always greater and equal to nicLinkSpeed
int fabricLinkSpeed @unit(Gbps);
// models the software trun around time, in the hosts, between the time
// ethFrame is received until the notification is received in software
// or when the pkt is send in software until it's received at the NIC.
//double hostSwTurnAroundTime @unit(us);
// when non-zero, it models the intrinsic fixed switching delay
// on the output ports of the routers
double switchFixDelay @unit(us);
// models NIC send think time in host nics.
double hostNicSxThinkTime @unit(us);
// must be greater than or equal to 1.0 (1.0 means full bisection bw)
double overSubFactor = default(1.0);
double realLoadFactor;
int nicLinkSpeed @unit(Gbps);
int numTors;
int numServersPerTor;
int numAggSwitches = (numServersPerTor * nicLinkSpeed) / (overSubFactor * fabricLinkSpeed);
int numHosts = numServersPerTor * numTors;
types:
channel FabricChannel extends ned.DatarateChannel
{
datarate = fabricLinkSpeed;
}
channel EdgeChannel extends DatarateChannel
{
datarate = nicLinkSpeed;
}
submodules:
aggRouter[numAggSwitches]: Router {
@display("i=abstract/router2;p=257,61");
**.promiscuous = true;
}
tor[numTors]: Router {
@display("i=abstract/router;p=257,157");
**.promiscuous = true;
}
host[numHosts]: HomaHost {
@display("i=device/server2;p=257,245");
hostConfig = xmldoc("config.xml", "/topologyConfig/hostConfig[@id=$MODULE_INDEX]");
//appConfig = xmldoc("config.xml", "/topologyConfig/hostConfig[@id=$PARENTMODULE_INDEX]/appConfig");
nicLinkSpeed = nicLinkSpeed;
fabricLinkSpeed = fabricLinkSpeed;
edgeLinkDelay = edgeLinkDelay;
fabricLinkDelay = fabricLinkDelay;
//app[*].hostSwTurnAroundTime = hostSwTurnAroundTime;
hostNicSxThinkTime = hostNicSxThinkTime;
switchFixDelay = switchFixDelay;
}
configurator: Ipv4NetworkConfigurator {
parameters:
config = xmldoc("config.xml", "/topologyConfig/IPv4Configurator/config");
@display("p=26,25");
}
connections allowunconnected:
for i=0..numTors-1, for j=0..numServersPerTor-1 {
tor[i].ethg$o++ --> EdgeChannel { delay = (edgeLinkDelay+switchFixDelay); } --> host[i * numServersPerTor + j].ethg$i++;
tor[i].ethg$i++ <-- EdgeChannel { delay = (edgeLinkDelay+hostNicSxThinkTime); } <-- host[i * numServersPerTor + j].ethg$o++;
}
for i=0..numAggSwitches-1, for j=0..numTors-1 {
aggRouter[i].ethg++ <--> FabricChannel { delay = (fabricLinkDelay+switchFixDelay); } <--> tor[j].ethg++;
}
}
[General]
network=dcn
*.host*.eth[0].typename = "LayeredEthernetInterface"
*.*.eth[*].macLayer.queue.typename = "EthernetPriorityQueue"
*.*.eth[*].macLayer.queue.buffer.typename = ""
*.host*.eth[0].macLayer.queue.numQueues = 8
*.host*.eth[0].macLayer.queue.queue[*].typename = "DropTailQueue"
*.*.ethernet.typename = "EthernetLayer"
*.*.ieee8021q.typename = "Ieee8021qLayer"
\ No newline at end of file
......@@ -561,8 +561,8 @@ WorkloadSynthesizer::processRcvdMsg(Packet* pk)
double
WorkloadSynthesizer::idealMsgEndToEndDelay(Ptr<const AppMessage> rcvdMsg)
{
int totalBytesTranmitted = 0;
inet::L3Address srcAddr = rcvdMsg->getSrcAddr();
int totalBytesTranmitted = 0;
ASSERT(srcAddr.getType() == inet::L3Address::AddressType::IPv4);
inet::L3Address destAddr = rcvdMsg->getDestAddr();
ASSERT(destAddr.getType() == inet::L3Address::AddressType::IPv4);
......
......@@ -12,8 +12,8 @@
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
package dctrans.node.homa;
import dctrans.node.base.LinkLayerNodeBase;
package dctrans.node;
import inet.node.base.LinkLayerNodeBase;
import inet.networklayer.contract.IRoutingTable;
import inet.networklayer.common.InterfaceTable;
import inet.networklayer.contract.INetworkLayer;
......@@ -30,43 +30,44 @@ import dctrans.transportlayer.ITransportScheme;
module HomaHost extends LinkLayerNodeBase
{
@display("i=device/pc2");
xml hostConfig;
int nicLinkSpeed @unit(Gbps);
int fabricLinkSpeed @unit(Gbps);
int numTrafficGeneratorApp = default(0);
// num of TCP apps. Specify the app types in INI file with
// tcpApp[0..1].typename="TCPEchoApp" syntax
//int numTcpApps = default(0);
// num of UDP apps. Specify the app types in INI file with
// udpApp[0..1].typename="UDPVideoStreamCli" syntax
//int numUdpApps = default(0);
bool hasTransportScheme = default(numTrafficGeneratorApp>0);
//bool hasTcp = default(numTcpApps>0);
//bool hasUdp = default((numUdpApps>0) || (numTrafficGeneratorApp>0));
// tcp implementation (e.g. ~TCP, ~TCP_lwIP, ~TCP_NSC) or ~TCPSpoof
//string tcpType = default(firstAvailable("TCP", "TCP_lwIP", "TCP_NSC", "TCP_None"));
//string udpType = default(firstAvailable("UDP","UDP_None"));
string transportSchemeType = default(firstAvailable("HomaTransport", "TransportSchemeNone"));
ipv4.arp.proxyArpInterfaces = default(""); // proxy arp is disabled on hosts by default
double edgeLinkDelay @unit(us);
double fabricLinkDelay @unit(us);
double hostSwTurnAroundTime @unit(us);
double hostNicSxThinkTime @unit(us);
double switchFixDelay @unit(us);
bool forwarding = default(true);
bool multicastForwarding = default(false);
*.forwarding = forwarding;
*.multicastForwarding = multicastForwarding;
bool hasUdp = default(firstAvailableOrEmpty("Udp") != "");
bool hasIpv4 = default(true);
parameters:
@display("i=device/pc2");
xml hostConfig;
int nicLinkSpeed @unit(Gbps);
int fabricLinkSpeed @unit(Gbps);
int numTrafficGeneratorApp = default(0);
// num of TCP apps. Specify the app types in INI file with
// tcpApp[0..1].typename="TCPEchoApp" syntax
//int numTcpApps = default(0);
// num of UDP apps. Specify the app types in INI file with
// udpApp[0..1].typename="UDPVideoStreamCli" syntax
//int numUdpApps = default(0);
bool hasTransportScheme = default(numTrafficGeneratorApp>0);
//bool hasTcp = default(numTcpApps>0);
//bool hasUdp = default((numUdpApps>0) || (numTrafficGeneratorApp>0));
// tcp implementation (e.g. ~TCP, ~TCP_lwIP, ~TCP_NSC) or ~TCPSpoof
//string tcpType = default(firstAvailable("TCP", "TCP_lwIP", "TCP_NSC", "TCP_None"));
//string udpType = default(firstAvailable("UDP","UDP_None"));
string transportSchemeType = default(firstAvailable("HomaTransport", "TransportSchemeNone"));
ipv4.arp.proxyArpInterfaces = default(""); // proxy arp is disabled on hosts by default
double edgeLinkDelay @unit(us);
double fabricLinkDelay @unit(us);
double hostSwTurnAroundTime @unit(us);
double hostNicSxThinkTime @unit(us);
double switchFixDelay @unit(us);
//copy from LinkLayerNodeBase.ned
bool forwarding = default(true);
bool multicastForwarding = default(false);
*.forwarding = forwarding;
*.multicastForwarding = multicastForwarding;
bool hasUdp = default(firstAvailableOrEmpty("Udp") != "");
bool hasIpv4 = default(true);
submodules:
......
//
// Copyright (C) 2020 OpenSim Ltd.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//
package dctrans.node.base;
import inet.node.base.NodeBase;
import inet.common.MessageDispatcher;
import inet.common.packet.recorder.PcapRecorder;
import inet.linklayer.contract.IEthernetInterface;
import inet.linklayer.contract.ILoopbackInterface;
import inet.linklayer.contract.IPppInterface;
import inet.linklayer.contract.ITunnelInterface;
import inet.linklayer.contract.IVirtualInterface;
import inet.linklayer.contract.IWirelessInterface;
import inet.linklayer.ethernet.contract.IEthernetLayer;
import inet.linklayer.ieee8021q.IIeee8021qLayer;
import inet.networklayer.common.InterfaceTable;
module LinkLayerNodeBase extends NodeBase
{
parameters:
bool recordPcap = default(false);
int numPcapRecorders = default(recordPcap ? 1 : 0);
int numLoInterfaces = default(1);
int numWlanInterfaces = default(0);
int numEthInterfaces = default(0); // minimum number of ethernet interfaces
int numPppInterfaces = default(0); // minimum number of PPP interfaces
int numTunInterfaces = default(0);
int numVirtInterfaces = default(0);
pcapRecorder[*].pcapFile = default("results/" + fullPath() + ".pcap");
mobility.typename = default(numWlanInterfaces > 0 ? "StationaryMobility" : "");
*.interfaceTableModule = default(absPath(".interfaceTable"));
wlan[*].radio.antenna.mobilityModule = default("^.^.^.mobility");
@figure[linkLayer](type=rectangle; pos=250,456; size=1000,130; fillColor=#0000ff; lineColor=#808080; cornerRadius=5; fillOpacity=0.1);
@figure[linkLayer.title](type=text; pos=1245,461; anchor=ne; text="link layer");
@figure[interfaceLayer](type=rectangle; pos=250,606; size=1000,210; fillColor=#00ffff; lineColor=#808080; cornerRadius=5; fillOpacity=0.1);
@figure[interfaceLayer.title](type=text; pos=1245,611; anchor=ne; text="interface layer");
gates:
input radioIn[numWlanInterfaces] @directIn;
inout pppg[numPppInterfaces] @labels(PppFrame-conn) @allowUnconnected;
inout ethg[numEthInterfaces] @labels(EtherFrame-conn) @allowUnconnected;
submodules:
pcapRecorder[numPcapRecorders]: PcapRecorder {
parameters:
@display("p=125,640;is=s");
}
interfaceTable: InterfaceTable {
parameters:
@display("p=125,240;is=s");
}
li: MessageDispatcher {
parameters:
@display("p=750,596;b=1000,5,,,,1");
}
ethernet: <default("")> like IEthernetLayer if typename != "" {
parameters:
@display("p=375,526");
}
ieee8021q: <default("")> like IIeee8021qLayer if typename != "" {
parameters:
@display("p=525,526");
}
lo[numLoInterfaces]: <default("LoopbackInterface")> like ILoopbackInterface {
parameters:
@display("p=750,676,row,150");
}
// TODO move wlan interfaces after eth interfaces, but it changes IP address assignment and breaks examples/inet/configurator/complex.ini
wlan[numWlanInterfaces]: <default("Ieee80211Interface")> like IWirelessInterface {
parameters:
@display("p=375,766,row,150;q=queue");
}
ppp[sizeof(pppg)]: <default("PppInterface")> like IPppInterface {
parameters:
@display("p=300,676,row,150;q=txQueue");
}
eth[sizeof(ethg)]: <default("EthernetInterface")> like IEthernetInterface {
parameters:
@display("p=900,676,row,150;q=txQueue");
}
tun[numTunInterfaces]: <default("TunInterface")> like ITunnelInterface {
parameters:
@display("p=975,766,row,150;q=txQueue");
}
virt[numVirtInterfaces]: <default("VirtualInterface")> like IVirtualInterface {
parameters:
@display("p=975,766,row,150;q=txQueue");
}
connections allowunconnected:
ieee8021q.lowerLayerOut --> li.in++ if exists(ieee8021q);
li.out++ --> ieee8021q.lowerLayerIn if exists(ieee8021q);
ethernet.lowerLayerOut --> li.in++ if exists(ethernet);
li.out++ --> ethernet.lowerLayerIn if exists(ethernet);
for i=0..sizeof(radioIn)-1 {
radioIn[i] --> { @display("m=s"); } --> wlan[i].radioIn;
}
for i=0..sizeof(ethg)-1 {
ethg[i] <--> { @display("m=s"); } <--> eth[i].phys;
}
for i=0..sizeof(pppg)-1 {
pppg[i] <--> { @display("m=s"); } <--> ppp[i].phys;
}
for i=0..numLoInterfaces-1 {
li.out++ --> lo[i].upperLayerIn;
lo[i].upperLayerOut --> li.in++;
}
for i=0..sizeof(radioIn)-1 {
wlan[i].upperLayerOut --> li.in++;
wlan[i].upperLayerIn <-- li.out++;
}
for i=0..sizeof(ethg)-1 {
eth[i].upperLayerOut --> li.in++;
eth[i].upperLayerIn <-- li.out++;
}
for i=0..sizeof(pppg)-1 {
ppp[i].upperLayerOut --> li.in++;
ppp[i].upperLayerIn <-- li.out++;
}
for i=0..numTunInterfaces-1 {
tun[i].upperLayerOut --> li.in++;
tun[i].upperLayerIn <-- li.out++;
}
for i=0..numVirtInterfaces-1 {
virt[i].upperLayerOut --> li.in++;
virt[i].upperLayerIn <-- li.out++;
}
}
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see http://www.gnu.org/licenses/.
//
#include "HomaTransport.h"
#include <algorithm>
#include <random>
#include <cmath>
#include "HomaPkt_m.h"
#include "HomaPktHelper.h"
#include "PriorityResolver.h"
#include "HomaPktHelper.h"
#include "inet/networklayer/common/FragmentationTag_m.h"
#include "HomaTransportTag_m.h"
// Required by OMNeT++ for all simple modules.
Define_Module(HomaTransport);
/**
* Registering all statistics collection signals.
*/
simsignal_t HomaTransport::msgsLeftToSendSignal =
registerSignal("msgsLeftToSend");
simsignal_t HomaTransport::stabilitySignal =
registerSignal("stabilitySignal");
simsignal_t HomaTransport::bytesLeftToSendSignal =
registerSignal("bytesLeftToSend");
simsignal_t HomaTransport::bytesNeedGrantSignal =
registerSignal("bytesNeedGrant");
simsignal_t HomaTransport::outstandingGrantBytesSignal =
registerSignal("outstandingGrantBytes");
simsignal_t HomaTransport::totalOutstandingBytesSignal =
registerSignal("totalOutstandingBytes");
simsignal_t HomaTransport::rxActiveTimeSignal =
registerSignal("rxActiveTime");
simsignal_t HomaTransport::rxActiveBytesSignal =
registerSignal("rxActiveBytes");
simsignal_t HomaTransport::oversubTimeSignal =
registerSignal("oversubscriptionTime");
simsignal_t HomaTransport::oversubBytesSignal =
registerSignal("oversubscriptionBytes");
simsignal_t HomaTransport::higherRxSelfWasteSignal =
registerSignal("higherRxSelfWasteTime");
simsignal_t HomaTransport::lowerRxSelfWasteSignal =
registerSignal("lowerRxSelfWasteTime");
simsignal_t HomaTransport::sxActiveTimeSignal =
registerSignal("sxActiveTime");
simsignal_t HomaTransport::sxActiveBytesSignal =
registerSignal("sxActiveBytes");
simsignal_t HomaTransport::sxSchedPktDelaySignal =
registerSignal("sxSchedPktDelay");
simsignal_t HomaTransport::sxUnschedPktDelaySignal =
registerSignal("sxUnschedPktDelay");
simsignal_t HomaTransport::activeSchedsSignal =
registerSignal("activeScheds");
/**
* Contstructor for the HomaTransport.
*/
HomaTransport::HomaTransport()
: sxController(this)
, rxScheduler(this)
, trackRTTs(this)
, priorityStatsSignals()
, socket()
, localAddr()
, homaConfig(NULL)
, prioResolver(NULL)
, distEstimator(NULL)
, sendTimer(NULL)
, emitSignalTimer(NULL)
, nextEmitSignalTime(SIMTIME_ZERO)
, outstandingGrantBytes(0)
{
//ev << "Test HomaTransport constructor called" << endl;
//ev << "HomaTransport size in bytes: " << sizeof(HomaTransport) << endl;
}
/**
* Destructor of HomaTransport.
*/
HomaTransport::~HomaTransport()
{
//std::cout << "destructor called" << std::endl;
delete prioResolver;
delete distEstimator;
delete homaConfig;
}
/**
* This functions is used to subscribe HomaTransport's vector signals
* and stats to the core omnet++ simulator. Vector signals are usually defined
* for the set of signals that have a common signal name pattern with one part
* of the name being wildcarded.
*/
void HomaTransport::registerTemplatedStats()
{
// Registering signals and stats for priority usage percentages.
uint32_t numPrio = homaConfig->allPrio;
for (size_t i = 0; i < numPrio; i++) {
char prioSignalName[50];
sprintf(prioSignalName, "homaPktPrio%luSignal", i);
simsignal_t pktPrioSignal = registerSignal(prioSignalName);
priorityStatsSignals.push_back(pktPrioSignal);
char pktPrioStatsName[50];
sprintf(pktPrioStatsName, "homaPktPrio%luSignal", i);
cProperty *statisticTemplate = getProperties()->get("statisticTemplate", "pktPrioStats");
getEnvir()->addResultRecorders(this, pktPrioSignal, pktPrioStatsName, statisticTemplate);
}
}
/**
* This method is a virtual method defined for every simple module in omnet++
* simulator. OMNeT++ core simulator automatically calles this function at the
* early stage of the simulator, after simulation objects are constructed and as
* a part of the steps for the simlation setup.
*/
void HomaTransport::initialize()
{
//std::cout << "HomaTransport::initialize() invoked." << std::endl;
// Read in config parameters for HomaTransport from config files and store
// the parameters in a depot container.
homaConfig = new HomaConfigDepot(this);
//std::cout << homaConfig->destPort << std::endl;
//std::cout << "transport ptr " << static_cast<void*>(homaConfig) <<
// std::endl;
// If grantMaxBytes is given too large in the config file, we should correct
// for it.
// SJH
// HomaPkt dataPkt = HomaPkt();
// dataPkt.setPktType(PktType::SCHED_DATA);
// uint32_t maxDataBytes = MAX_ETHERNET_PAYLOAD_BYTES -
// IP_HEADER_SIZE - UDP_HEADER_SIZE - dataPkt.headerSize();
uint32_t maxDataBytes = MAX_ETHERNET_PAYLOAD_BYTES - IP_HEADER_SIZE - UDP_HEADER_SIZE
- HomaPktHelper::getHomaPktHeaderSize(PktType::SCHED_DATA);
if (homaConfig->grantMaxBytes > maxDataBytes) {
homaConfig->grantMaxBytes = maxDataBytes;
}
// Setting up the timer objects associated with this transport.
sendTimer = new cMessage("SendTimer");
emitSignalTimer = new cMessage("SignalEmitionTimer");
nextEmitSignalTime = SIMTIME_ZERO;
registerTemplatedStats();
distEstimator = new WorkloadEstimator(homaConfig);
prioResolver = new PriorityResolver(homaConfig, distEstimator);
rxScheduler.initialize(homaConfig, prioResolver);
sxController.initSendController(homaConfig, prioResolver);
outstandingGrantBytes = 0;
// set sendTimer state to START and schedule the timer at the simulation
// start-time. When this timer fires, the rest of initialization steps will
// be done ( ie. those step that are needed to be done after simulation
// setup is complete.)
sendTimer->setKind(SelfMsgKind::START);
scheduleAt(simTime(), sendTimer);
//std::cout << "transport ptr " << static_cast<void*>(homaConfig) <<
// std::endl;
}
/**
* This method performs initialization steps that are needed to be done after
* the simulation object is created. It's only invoked when timer state is set
* to START.
*/
void HomaTransport::processStart()
{
//std::cout << "processStart() init called" << std::endl;
// initialized udp socket
socket.setOutputGate(gate("udpOut"));
socket.bind(homaConfig->localPort);
sendTimer->setKind(SelfMsgKind::SEND);
emitSignalTimer->setKind(SelfMsgKind::EMITTER);
scheduleAt(simTime(), emitSignalTimer);
}
/**
* This method is the single place to emit stability signal to the
* GlobalSignalListener. At the events of a new message arrival from
* application, transmission completion of a message at sender, or
* emitSignalTimer firing, this method is called and decides if it's time to
* emit stability signal to the GlobalSignalListener. It further setup next
* signal emission if necessary.
*/
void HomaTransport::testAndEmitStabilitySignal()
{
if (!sxController.outboundMsgMap.size()) {
if (emitSignalTimer->isScheduled())
cancelEvent(emitSignalTimer);
return;
}
if (emitSignalTimer->isScheduled()) {
return;
}
simtime_t currentTime = simTime();
if (currentTime == nextEmitSignalTime) {
emit(stabilitySignal, sxController.outboundMsgMap.size());
nextEmitSignalTime = currentTime + homaConfig->signalEmitPeriod;