虚拟路由实验

自组织和中控UDP模拟虚拟主机间路由

Posted by Nino Lau on October 30, 2018

本实验在小组成员的主机之间,根据选定的虚拟网络拓扑图建立虚拟连接;并且在建立的虚拟网络中选用不同的路由算法(LS 或 DV),生成网络中的路由信息,实现主机之间的数据传输。实验主要分为两部分,一部分是自组织的路由,另一部分是中心化的路由。


实验设计

数据结构定义

  • 主机地址:IP 子网掩码 端口
  • 数据包:源地址 目的地址 内容 包类型
  • OSPF协议的路由表表项:目的地址 下一跳地址
  • RIP协议的路由表表项:目的地址 下一跳地址 到目的地的跳数

自组织路由

数据传输

实验中,采用 UDP 进行数据传输。在每一台主机上,选用两个端口,$Port_{in}$ 和 $Port_{out}$ ,分别用于接收数据和发送数据。使用套接字,在主机之间进行数据传输。每台主机从输入端口接受收的数据,并进行相应的处理。

路由算法

实验中选用 OSPF 和 RIP 两种选路协议,分别对应于 LS 和 DV 的路由算法。

OSPF 协议

每一台主机都会维护一个自己的网络拓扑图,并定期地向网络内的所有其他主机广播自己的链路状态信息。每台主机都根据从 $Port_{in}$ 端口收到的链路状态信息,更新自己维护的网络拓扑图,通过 Dijkstra 最短路径算法生成自己的路由表。

RIP 协议

每一台主机都会定期地向网络内的相邻主机广播自己的距离向量信息(个体转发表)。每台主机都根据从 $Portin$ 端口收到的选路信息,更新个体转发表。主机更新本地路由的同时,都会向相邻的节点发送新的距离向量。转发表中使用跳数作为费用测度,即每一条链路的计为1跳,费用为1。此外,每一条路径的最大费用被限制为15,若大于15,则被视为该目的主机不可达。

发送数据

当主机要发送一个普通的数据包 M 或是从别的主机收到一个数据包 M 需要转发时,主机查询自己的路由表:若路由表中存在该表项,将其发送到该目的地址对应的下一跳主机;若路由表中无对应表项,则将其丢弃。

宕机检测

OSPF 协议

实验中,主机之间都定时地向其他所有主机发送链路状态信息。每台主机(e.g. A)都记录着与这台主机(e.g. B)上一次交换路由信息的时间。若在指定时间内,A 还未接收到 B 下一次的路由信息,则 A 就会判定 B 已经发生故障。这时,A 就会将 B 从自己维护的网络拓扑图中移除,然后通过 Dijkstra 最短路径算法重新生成自己的路由表。

RIP 协议

实验中,主机之间都定时地向相邻主机发送距离向量信息。每台主机(e.g. A)都记录着与这台主机(e.g. B)上一次交换路由信息的时间,若超过指定时间,A还未接收到B下一次的路由信息,则A就会判定B已经发生故障。这时,为避免路由环路,A 会采用 “毒性反转“(当一条路径信息变为无效之后,路由器并不立即将它从路由表中删除,而是用16,即不可达的度量值将它广播给相邻的主机)的方式将 B 故障的消息广播出去,让所有主机都得到 B 故障的消息并更新自己的路由信息。

中心化路由

数据传输

数据传输部分与自组织路由相同,采用 UDP 进行数据传输。在每一台主机上,选用两个端口,$Port_{in}$ 和 $Port_{out}$ ,分别用于接收数据和发送数据。每台主机通过 $Port_{out}$ 向控制主机 Server 发送信息。控制主机收集处理各节点发来的信息得到路由表,将路由表发送至每台主机的 $Port_{in}$ 端口。

路由算法

中心化路由同样选用 LS 和 DV 两种路由算法,对应于 OSPF 和 RIP 两种选路协议。

OSPF 协议

每台主机定期向控制主机 Server 发送自己的链路状态信息。控制主机 Server 收到每台主机发送来的信息后可以构建出整个网络的拓扑结构并不断更新,通过 Dijkstra 最短路径算法为每个发送来信息的主机计算出其路由表,并将路由表返回给相应的主机。

RIP 协议

每台主机定期向控制主机 Server 发送自己的路由表信息。接收到每台主机发送过来的路由表信息之后,根据 DV 算法更新发送信息主机相邻主机的路由表信息,将更新后的路由表信息发送给路由表改变的主机。

宕机检测

OSPF 协议

实验中,主机之间都定时地向其他所有主机发送链路状态信息。每台主机(e.g. A)都记录着与这台主机(e.g. B)上一次交换路由信息的时间。若在指定时间内,A 还未接收到 B 下一次的路由信息,则 A 就会判定 B 已经发生故障。这时,A 就会将 B 从自己维护的网络拓扑图中移除,然后通过 Dijkstra 最短路径算法重新生成自己的路由表。

RIP 协议

实验中,主机之间都定时地向相邻主机发送距离向量信息。每台主机(e.g. A)都记录着与这台主机(e.g. B)上一次交换路由信息的时间,若超过指定时间,A还未接收到B下一次的路由信息,则A就会判定B已经发生故障。这时,为避免路由环路,A 会采用 “毒性反转“(当一条路径信息变为无效之后,路由器并不立即将它从路由表中删除,而是用16,即不可达的度量值将它广播给相邻的主机)的方式将 B 故障的消息广播出去,让所有主机都得到 B 故障的消息并更新自己的路由信息。


实验部署

自组织路由

自组织路由的实现文件如下。

Configuration/Nodes 中的是每台主机的配置信息,包括主机名,IP,子网掩码,端口号。主机通过读取该文件,获取其他主机的配置信息。A、B、C、D、E主机的接收端口 $Port_{in}$ 分别配置为30001、30002、30003、30004、30005,IP和子网掩码均为127.0.0.1、24。

Configuration/Topo 中的是网络拓扑图的信息,是一个5*5的矩阵。主机通过读取该文件,获取自己的链路状态信息,邻居,网络中的主机数量等信息。链路费用如下表所示:

  A B C D E
A 0 -1 80 60 20
B -1 0 -1 50 70
C 80 -1 0 60 -1
D 60 50 60 0 -1
E 20 70 -1 -1 0

Codes/DataStructure.py 包含了实验用到的数据结构:

  • 地址 Address 数据结构

  • class Address():
    	def __init__(self, ip, mask:int, port:int):
    		self.ip = ip;
    		self.mask = mask;
    		self.port = port;
      
    	def __eq__(self, another):
    		return self.ip == another.ip and self.mask == another.mask and self.port == another.port;
    
  • 数据包 Packet 数据结构

    class Packet():
    	def __init__(self, src:Address=None, dest:Address=None, payload=None, packetType:int=None):
    		self.src = src;
    		self.dest = dest;
    		self.payload = payload;
    		self.packetType = packetType;  	
      
    	def serialize(self):
    		return json.dumps(self, default=lambda obj: obj.__dict__);
      
    	def deserialize(self, serialization):
    		d = json.loads(serialization);
    		self.src = Address(d['src']['ip'], d['src']['mask'], d['src']['port']);
    		self.dest = Address(d['dest']['ip'], d['dest']['mask'], d['dest']['port']);
    		self.payload = d['payload'];
    		self.packetType = int(d['packetType']);
    
  • OSPF 协议的路由表条目 OSPF_ForwardingTableEntry 数据结构

    class OSPF_ForwardingTableEntry():
    	def __init__(self, dest:str=None, nextHop:str=None):
    		self.dest = dest;
    		self.nextHop = nextHop;
      
    	def __str__(self):
    		return "dest: " + str(self.dest) + ", next-hop: " + str(self.nextHop);
    
  • RIP 协议的路由表条目 RIP_RoutingTableEntry 数据结构

    class RIP_RoutingTableEntry():
    	def __init__(self, dest:str=None, nextHop:str=None, hopsToDest:int=0):
    		self.dest = dest;
    		self.nextHop = nextHop;
    		self.hopsToDest = hopsToDest;
      
    	def __str__(self):
    		return "dest: " + str(self.dest) + ", next-hop: " + str(self.nextHop) + ", hops-to-dest: " + str(self.hopsToDest);
    
  • 主机 Node 数据结构

    class Node():
    	'''
    	name: str  # eg: 'A', 'B'
    	address: Address
    	neighbors: dict  # eg: {'D': (<__main__.Address object at 0x7f3c575d7a90>, 60), 'E': (<__main__.Address object at 0x7f3c575d7ac8>, 20), 'C': (<__main__.Address object at 0x7f3c575d7a58>, 80)}
    	socket: socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    	'''
    	def __init__(self, name):
    	def printOutputMessageHeader(self):
    	def printOSPFForwardingTable(self):
    	def printRIPRoutingTable(self):
    	def send_a_normal_packet(self, dest:Address, payload, packetType):
    	def forward_a_normal_packet(self, recvPkt: Packet):
    

Codes/SendPacket.py 用于控制主机之间发送普通数据包。当运行该文件时,通过获取用户输入的源主机和目的主机,会通过端口 29999 向对应源主机发送一个命令数据包,要求源主机在收到该命令后,向指定的目的主机发送一个普通的数据包,内容为”Hello, I’m X.”。

src = input("Input the source of this packet(eg: 'A', 'B'..., 'E'): ");
dest =  input("Input the destination of this packet(eg: 'A', 'B'..., 'E'): ");

srcAddr = name_To_address(src);

commandSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM);  #  采用UDP
# commandSocket.bind(('127.0.0.1', 29999));

payload = dest;
sendpkt = Packet(Address('127.0.0.1', 24, 29999), srcAddr, payload, 2);
commandSocket.sendto(sendpkt.serialize().encode(), (srcAddr.ip, srcAddr.port));

Codes/RoutingUsingLS.py 用于实现 OSPF 的路由协议,主要实现3部分的功能:

  1. 通过端口 $Port_{out}$ 向其他所有主机周期性地广播自己的链路状态信息;

    # 向所有路由器周期性地广播节点node的链路状态信息
    def broadcast_link_state_periodcally(node: Node):
    	while True:
    		node.printOutputMessageHeader();
    		print('broadcasting link state... ');
       
    		for n in nodesInTopo:
    			if n != node.name:
    				n_addr = name_To_address(n);
    				sendpkt = Packet(node.address, n_addr, node.neighbors, 1);
    				node.sendSocket.sendto(sendpkt.serialize().encode(), (n_addr.ip, n_addr.port));
       
    		time.sleep(30);
       
    # 启动周期性地广播链路状态的线程
    def start_broadcast_link_state_periodcally_thread(node: Node):
    	t = threading.Thread(target=broadcast_link_state_periodcally, args=(node,), name='BroadcastLinkStateThread');
    	t.start();
    
  2. 监听端口 $Port_{in}$ 是否收到数据包,根据其中 packetType 字段进行处理。其中,packetType 为0表示普通数据包。主机通过判断检查其目的主机字段,决定是接收、转发还是丢弃;1表示 OSPF 的链路状态信息数据包,主机会以此更新该网络的拓扑结构图,并通过 Dijkstra 最短路径算法计算最短路径;2 表示该数据包是一条发送数据包的指令,主机会向指令中指定的目的主机发送一个”Hello, I’m x.”的报文;

    # 监听是否收到数据包
    def start_UDP_listener_thread(node: Node):
    	t = threading.Thread(target=handle_receiving_packet, args=(node,), name='UDPListenerThread');
    	t.start();
       
    # 处理收到的数据包
    def handle_receiving_packet(node: Node):
    	while True:
    		data, addr = node.receiveSocket.recvfrom(1024);
    		recvPkt = Packet();
    		recvPkt.deserialize(bytes.decode(data));
               
    		if recvPkt.packetType == 0:
    			handle_receiving_normal_packet(node, recvPkt);
    		elif recvPkt.packetType == 1:
    			handle_receiving_OSPF_link_state_packet(node, recvPkt);
    		elif recvPkt.packetType == 2:
    			handle_receiving_command_packet(node, recvPkt);
       
       
    def handle_receiving_command_packet(node: Node, recvPkt: Packet):
    def handle_receiving_normal_packet(node: Node, recvPkt: Packet):
    def handle_receiving_OSPF_link_state_packet(node: Node, recvPkt: Packet):
    def run_dijkstra_algorithm(node: Node):
    def construct_forwarding_table(node: Node, prev_step: dict):
    
  3. 周期性地检查其他所有节点是否宕机,若有节点宕机,则更新自己的链路状态信息,并重新运行 Dijkstra 最短路径算法,生成新的路由表。

    # 启动周期性宕机检测的线程
    def start_check_nodes_alive_periodcally_thread(node: Node):
    	t = threading.Thread(target=check_nodes_alive_periodcally, args=(node,), name='CheckNodesAliveThread');	
    	t.start();
       
    # 周期性地检查其他节点是否宕机
    def check_nodes_alive_periodcally(node: Node):
    	while True:
    		lock.acquire();
    		try:
    			aliveNodes = nodesAliveInTopo.copy();
    			for nodeName in aliveNodes:
    				if nodeName != node.name and time.time() - lastTimeRecvPktFromNode[nodeName] > 60:
    					nodesAliveInTopo.remove(nodeName);
    					node.printOutputMessageHeader();
    					print('periodcally check...,', nodeName, 'is down...');
    					run_dijkstra_algorithm(node);
    		finally:
    			lock.release();
       
    		time.sleep(20);
    

当运行 OSPF 时,需要输入本主机的主机名,然后,这三个功能会以三个线程的方式一起运行:

if __name__ == "__main__":
	...
	start_broadcast_link_state_periodcally_thread(a);
	start_UDP_listener_thread(a);
	start_check_nodes_alive_periodcally_thread(a);

Codes/RoutingUsingDV.py 用于实现 RIP 的路由协议,主要实现3部分的功能:

  1. 通过端口 $Port_{out}$ 向相邻路由器周期性(30s)地交换距离向量;

    # 启动周期性地交换距离向量的线程
    def start_send_distance_vector_periodcally_thread(node: Node):
    	t = threading.Thread(target=send_distance_vector_periodcally, args=(node,), name='SendDistanceVectorThread');
    	t.start();
           
    # 相邻路由器周期性(30s)地交换距离向量
    def send_distance_vector_periodcally(node: Node):
    	while True:
    		node.printOutputMessageHeader();
    		print('sending DV periodcally... ');
       		
    		lock.acquire();
    		try:
    			for n in node.neighbors:
    				if n != node.name:
    					n_addr = name_To_address(n);
    					sendpkt = Packet(node.address, n_addr, node.RIP_routingTable, 1);
    					node.sendSocket.sendto(sendpkt.serialize().encode(), (n_addr.ip, n_addr.port));
    		finally:
    			lock.release();
       
    		time.sleep(30);
    
  2. 监听端口 $Portin$ 是否收到数据包,并根据数据包中的 packetType 字段,处理收到的数据包。其中,packetType 为0表示普通数据包,主机通过判断检查其目的主机字段,决定是接收,转发还是丢弃;1表示 RIP 响应报文数据包,主机会合并自己的 RIP 表和收到的 RIP 表,更新本地路由后,向相邻的主机发送新的距离向量;2表示该数据包是一条发送数据包的指令,主机会向指令中指定的目的主机发送一个”Hello, I’m x.”的报文;

    # 监听是否收到数据包
    def start_UDP_listener_thread(node: Node):
    	t = threading.Thread(target=handle_receiving_packet, args=(node,), name='UDPListenerThread');
    	t.start();
       
    # 处理收到的数据包
    def handle_receiving_packet(node: Node):
    	while True:
    		data, addr = node.receiveSocket.recvfrom(1024);  # 接收数据
    		recvPkt = Packet();
    		recvPkt.deserialize(bytes.decode(data));
       
    		if recvPkt.packetType == 0:
    			handle_receiving_normal_packet(node, recvPkt);
    		elif recvPkt.packetType == 1:
    			handle_receiving_RIP_distance_vector_packet(node, recvPkt);
    		elif recvPkt.packetType == 2:
    			handle_receiving_command_packet(node, recvPkt);
       
    def handle_receiving_normal_packet(node: Node, recvPkt: Packet):
    def handle_receiving_RIP_distance_vector_packet(node: Node, recvPkt: Packet):
    def handle_receiving_command_packet(node: Node, recvPkt: Packet):
    
  3. 周期性地检查相邻节点是否宕机,若有节点宕机,则采用 “毒性反转” 的方式解决路由环路,当一条路径信息变为无效之后,路由器并不立即将它从路由表中删除,而是用16,即不可达的度量值将它广播给相邻的主机。

    # 启动周期性宕机检测的线程
    def start_check_neighbor_nodes_alive_periodcally_thread(node: Node):
    	t = threading.Thread(target=check_neighbor_nodes_alive_periodcally, args=(node,), name='CheckNeighborNodesAliveThread');	
    	t.start();
       
    # 周期性地检查其他节点是否宕机
    def check_neighbor_nodes_alive_periodcally(node: Node):
    	while True:
    		lock.acquire();
    		try:
    			aliveNodes = node.aliveNeighbors.copy();
    			for nodeName in aliveNodes:
    				if time.time() - lastTimeRecvPktFromNode[nodeName] > 60:
       
    					node.printOutputMessageHeader();
    					print('periodcally check...,', nodeName, 'is down...');
    					node.aliveNeighbors.remove(nodeName);
                           
    					# 采用 "毒性反转" 解决路由环路
    					for i in range(len(node.RIP_routingTable)):
    						if node.RIP_routingTable[i].dest == nodeName or node.RIP_routingTable[i].nextHop == nodeName:
    							node.RIP_routingTable[i].hopsToDest = 16;
       
    					# 向相邻的的alive的节点发送 distance vector
    					send_distance_vector_once(node);
    					node.printOutputMessageHeader();
    					print('RIP routing table after updating... ');
    					node.printRIPRoutingTable();
    		finally:
    			lock.release();
       
    		time.sleep(30);
       
    

当运行RIP时,需要输入本主机的主机名,然后,这三个功能会以三个线程的方式一起运行:

if __name__ == "__main__":
	...
	start_broadcast_link_state_periodcally_thread(a);
	start_UDP_listener_thread(a);
	start_check_nodes_alive_periodcally_thread(a);

中心化路由

中心化路由的实现文件如下。

中心化路由修改了 Codes/RoutingUsingLS.py 以及 Codes/RoutingUsingDV.py 的实现,并添加了Controller 模块,其余文件以及节点信息和拓扑结构与自组织路由相同。

Codes/RoutingUsingLSCentralized.py 实现了采用 LS 算法的中心化路由的主机部分,主要实现2部分的功能:

  1. 通过端口 $Port_{out}$ 向控制主机周期性地发送自己的链路状态信息;

    # 向控制主机周期性地发送节点node的链路状态信息
    def send_link_state_periodcally(node: Node):
        while True:
       
            n_addr = name_To_address('M');
            sendpkt = Packet(node.address, n_addr, node.neighbors, 1);
            node.sendSocket.sendto(sendpkt.serialize().encode(), (n_addr.ip, n_addr.port));
       
            time.sleep(30);
       
    # 启动周期性地发送链路状态的线程
    def start_broadcast_link_state_periodcally_thread(node: Node):
        t = threading.Thread(target=send_link_state_periodcally, args=(node,), name='SendcastLinkStateThread');
        t.start();
    
  2. 监听端口 $Port_{in}$ 是否收到数据包,并根据数据包中的 packetType 字段,处理收到的数据包。其中,packetType为0表示普通数据包,主机通过判断检查其目的主机字段,决定是接收、转发还是丢弃;1表示控制主机发送来的路由表,主机根据其更新自己的路由信息;2表示该数据包是一条发送数据包的指令,主机会向指令中指定的目的主机发送一个”Hello, I’m x.”的报文;

    # 监听是否收到数据包
    def start_UDP_listener_thread(node: Node):
        t = threading.Thread(target=handle_receiving_packet, args=(node,), name='UDPListenerThread');
        t.start();
       
    # 处理收到的数据包
    def handle_receiving_packet(node: Node):
        while True:
            data, addr = node.receiveSocket.recvfrom(1024);  
       
            recvPkt = Packet();
            recvPkt.deserialize(bytes.decode(data));
       
            if recvPkt.packetType == 0:  
                handle_receiving_normal_packet(node, recvPkt);
            elif recvPkt.packetType == 1:
                handle_receiving_forwarding_table(node, recvPkt);
            elif recvPkt.packetType == 2:
                handle_receiving_command_packet(node, recvPkt);
       
    def handle_receiving_forwarding_table(node: Node, recvPkt: Packet):
    def handle_receiving_normal_packet(node: Node, recvPkt: Packet):
    def handle_receiving_command_packet(node: Node, recvPkt: Packet):
    

当运行该文件时,需要输入本主机的主机名,程序根据主机名获取到自己的节点信息,然后上述两个功能会以两个线程的形式同时工作:

if __name__ == "__main__":
    start_UDP_listener_thread(a);
    start_broadcast_link_state_periodcally_thread(a);

Codes/ControllerUsingLS.py 实现了采用 LS 算法的中心化路由的控制主机部分,主要完成了2个功能:

  1. 监听端口 $Port_{in}$ 是否收到数据包,并根据数据包中的 packetType 字段,处理收到的数据包。其中,1表示 OSPF 的链路状态信息数据包,控制主机会以此更新该网络的拓扑结构图,并通过 Dijkstra 最短路径算法计算最短路径,生成该节点的路由表,并最后将路由表返回给发送链路信息的主机;若收到0或1的数据包则将其丢弃;

    def handle_receiving_packet(node: Node):
        while True:
            data, addr = node.receiveSocket.recvfrom(1024);
       
            recvPkt = Packet();
            recvPkt.deserialize(bytes.decode(data));
       
            if recvPkt.packetType == 1:  
                handle_receiving_OSPF_link_state_packet(node, recvPkt);
       
    def handle_receiving_OSPF_link_state_packet(node: Node, recvPkt: Packet):
    def construct_forwarding_table(client: dict, prev_step: dict):
    def return_forwarding_table(node: Node, client: dict):
        targetAddr = client['address'];
        targetForwardingTabel = forwardingTables[client['name']];
        sendpkt = Packet(node.address, targetAddr,targetForwardingTabel , 1);
        node.sendSocket.sendto(sendpkt.serialize().encode(), (targetAddr.ip, targetAddr.port));
    
  2. 周期性地检查所有节点是否宕机,若有节点宕机,则更新网络的拓扑结构,当收到新的请求时将以最新的拓扑结构运行 Dijkstra 最短路径算法,生成新的路由表。

    # 周期性地检查其他节点是否宕机
    def check_nodes_alive_periodcally(node: Node):
        while True:
            lock.acquire();
            try:
       
                aliveNodes = nodesAliveInTopo.copy();
       
                for nodeName in aliveNodes:
                    if nodeName != node.name and time.time() - lastTimeRecvPktFromNode[nodeName] > 60:
                    	nodesAliveInTopo.remove(nodeName);
                        node.printOutputMessageHeader();
                        print('periodcally check...,', nodeName, 'is down...');
            finally:
                lock.release();
       
            time.sleep(20);
       
    # 启动周期性宕机检测的线程
    def start_check_nodes_alive_periodcally_thread(node: Node):
    	t = threading.Thread(target=check_nodes_alive_periodcally, args=(node,), name='CheckNodesAliveThread');
    	t.start();
    

当运行此文件时,控制主机便会在本机上开始运作,以上两个功能会以两个线程的方式一起运行:

if __name__ == "__main__":
    ...
    start_UDP_listener_thread(m);
    start_check_nodes_alive_periodcally_thread(m);

Codes/RoutingUsingDVCentralized.py 实现了采用 DV 算法的中心化路由的主机部分,主要实现了2部分功能:

  1. 通过 $Port_{out}$ 端口向控制主机周期性地发送自己的路由表信息;

    # 相邻路由器周期性地交换距离向量
    def send_distance_vector_periodcally(node: Node):
    	while True:
    		# node.printOutputMessageHeader();
    		# print('sending DV periodcally... ');
       		
    		lock.acquire();
    		try:
    			n_addr = name_To_address('M');
    			sendpkt = Packet(node.address, n_addr, node.RIP_routingTable, 1);
    			node.socket.sendto(sendpkt.serialize().encode(), (n_addr.ip, n_addr.port));
    		finally:
    			lock.release();
       
    		time.sleep(30);
       
    # 启动周期性地交换距离向量的线程
    def start_send_distance_vector_periodcally_thread(node: Node):
    	t = threading.Thread(target=send_distance_vector_periodcally, args=(node,), name='SendDistanceVectorThread');
    	t.start();
    
  2. 监听 $Port_{in}$ 端口是否接收到数据包。并根据数据包中的 packetType 字段,处理收到的数据包。其中, packetType 为0表示普通数据包,主机通过判断检查其目的主机字段,决定是接收、转发还是丢弃;1表示控制主机发送来的路由表,主机根据其更新自己的路由信息;2表示该数据包是一条发送数据包的指令,主机会向指令中指定的目的主机发送一个”Hello, I’m x.”的报文。

    # 监听是否收到数据包
    def start_UDP_listener_thread(node: Node):
    	t = threading.Thread(target=handle_receiving_packet, args=(node,), name='UDPListenerThread');
    	t.start();
       
    # 处理收到的数据包
    def handle_receiving_packet(node: Node):
    	while True:
    		data, addr = node.socket.recvfrom(1024);
    		recvPkt = Packet();
    		recvPkt.deserialize(bytes.decode(data));
       
    		if recvPkt.packetType == 0:
    			handle_receiving_normal_packet(node, recvPkt);
    		elif recvPkt.packetType == 1:
    			handle_receiving_RIP_distance_vector_packet(node, recvPkt);
    		elif recvPkt.packetType == 2:
    			handle_receiving_command_packet(node, recvPkt);
       
    def handle_receiving_normal_packet(node: Node, recvPkt: Packet):
    def handle_receiving_RIP_distance_vector_packet(node: Node, recvPkt: Packet):
    def handle_receiving_command_packet(node: Node, recvPkt: Packet):
    

当运行该文件时,需要输入本主机的主机名,程序根据主机名获取到自己的节点信息,然后上述两个功能会以两个线程的形式同时工作。

if __name__ == "__main__":
	...
	start_UDP_listener_thread(a);
	start_send_distance_vector_periodcally_thread(a);

Codes/ControllerUsingDV.py 实现了采用 DV 算法的中心化路由的控制主机部分,主要完成了2个功能:

  1. 监听 $Portin$ 端口是否接收到数据包,并根据数据包中的 packetType 字段,处理收到的数据包。其中,1表示 OSPF 的链路状态信息数据包。控制主机会根据收到的路由表信息,替换当前存储的目标主机的路由表信息,并且使用 DV 算法更新目标主机相邻主机的路由表信息。之后将更新的路由表信息发送给相对应的主机(只发送有路由表更新的对应主机)。收到类型为0或2的数据包则丢弃;

    # 监听是否收到数据包
    def start_UDP_listener_thread(node: Node):
    	t = threading.Thread(target=handle_receiving_packet, args=(node,), name='UDPListenerThread');
    	t.start();
       
    # 处理收到的数据包
    def handle_receiving_packet(node: Node):
    	while True:
    		data, addr = node.socket.recvfrom(1024);
    		recvPkt = Packet();
    		recvPkt.deserialize(bytes.decode(data));
       
    		if recvPkt.packetType == 1: 
    			handle_receiving_RIP_distance_vector_packet(node, recvPkt);
       
       
    def handle_receiving_normal_packet(node: Node, recvPkt: Packet):
    def handle_receiving_RIP_distance_vector_packet(node: Node, recvPkt: Packet):
    def handle_receiving_command_packet(node: Node, recvPkt: Packet):
    
  2. 周期性地检查所有节点是否宕机,若有节点宕机,则更新网络的拓扑结构,并且使用“毒性反转”更新故障主机相邻主机的路由表信息,然后将跟新后的路由表信息发送给对应的相邻主机。

    # 启动周期性宕机检测的线程
    def start_check_neighbor_nodes_alive_periodcally_thread(node: Node):
    	t = threading.Thread(target=check_neighbor_nodes_alive_periodcally, args=(node,), name='CheckNeighborNodesAliveThread');	
    	t.start();
       
    # 周期性地检查其他节点是否宕机
    def check_neighbor_nodes_alive_periodcally(node: Node):
    	while True:
    		lock.acquire();
    		try:
    			aliveNodes = nodesAliveInTopo.copy();
                   
    			for nodeName in aliveNodes:
    				if time.time() - lastTimeRecvPktFromNode[nodeName] > 60:
    					node.printOutputMessageHeader();
    					print('periodcally check...,', nodeName, 'is down...');
    					nodesAliveInTopo.remove(nodeName);
    					# 采用 "毒性反转" 解决路由环路
    					for v in allNeighbor[nodeName]:
    						if v != nodeName and (v in nodesAliveInTopo):
    							for i in range(len(rip_table[v])):
    								if rip_table[v][i].dest == nodeName or rip_table[v][i].nextHop == nodeName:
    									rip_table[v][i].hopsToDest = 16;
    					send_distance_vector_once(nodeName);
    					node.printOutputMessageHeader();
    					print('RIP routing table after updating... ');
       
    		finally:
    			lock.release();
    		time.sleep(30);
    

当运行此文件时,控制主机便会在本机上开始运作,以上两个功能会以两个线程的方式一起运行:

if __name__ == "__main__":
	...
	start_UDP_listener_thread(m);
	start_check_neighbor_nodes_alive_periodcally_thread(m);

实验结果

自组织路由

网络拓扑图

OSPF 协议

状态广播

分别输入主机名建立五个主机,五个主机开始运行,并周期性(30s)地广播链路状态信息。五个主机接收 OSPF 数据包,并更新自己的路由表。

可见,在A主机的信息记录中,五个主机依次出现在 alive nodes 序列。

数据传输

根据路由表选择最短路径,转发数据包。A向B发送一个数据包,A发送报文”Hello, I’m A”给B,下一跳是E;

E收到A发的报文,转发到目的主机B,下一跳是B;

B接收到A发的报文。

A向B发送报文时,可以有多条路径,例如:A->E->B,A->D->B,A->C->D->B等,但主机A确从中选择了最短的路径A->E->B,可见我们路由的优化是有效的。

宕机检测

将主机C关闭,当其他主机在指定时间范围内(60s)都没有收到其广播的 OSPF 报文,其他主机就会判定C已经 down 掉了,并更新自己的路由表。

RIP 协议

状态广播

输入主机名,主机开始运行,并周期性(30s)地广播链路状态信息。接收 RIP 数据包,并更新自己的路由表。

可见,在A主机的信息记录中,五个主机依次出现在 alive nodes 序列。

数据传输

根据路由表选择最短路径,转发数据包。D向E发送一个数据包,D发送报文”Hello, I’m D”给E,下一跳是A;

A收到D发的报文,转发到目的主机E,下一跳是E;

E接收到D发的报文。

D向E发送报文时,有多条路径,例如:D->B->E,D->A->E, D->C->A->E等等,但主机D从中选择了最短的路径D->A->E。

宕机检测

将主机A关闭,当其他相邻主机在指定时间范围内(60s)都没有收到其广播的RIP报文,相邻主机就会判定A已经 down 掉了,并更新自己的路由表。主机C创建时的原始路由表:

注意到C的RIP转发表中与主机A有关的条目的跳数都被置为16。这时,随着分布式的距离向量算法的不断收敛,C的路由表也在不断更新。C到E的路径,从A宕机前的C->A->E,代价为2跳变为A宕机后的C->D->B->E,代价为3跳。A宕机之后的路由表:

中心化路由

网络拓扑图

OSPF 协议

状态广播

根据路由表选择最短路径,转发数据包。启动控制主机,接收其他主机发送的链路信息,更新自己的网络拓扑结构。根据路由表选择最短路径,转发数据包。启动网络节点主机输入主机名,主机开始运行,并周期性地向控制主机发送链路状态信息。

主机向控制主机发送了链路信息,并等待控制主机返回路由表。控制主机收到链路信息并更新网络拓扑结构,计算出每个节点的路由表并将其返回。主机每30秒向控制主机发送一次链路信息,同时也每30秒得到一次更新过后的路由表。

数据传输

根据路由表选择最短路径,转发数据包。选择发送数据包的节点和目的节点,如D向E发送一个数据包。D根据路由表查到下一跳为A,向A发送数据包;

A收到数据包,根据路由表下一跳为E,向E转发数据包;

E最终收到D发送的数据包。数据包的传递路径为D→A→E,这也是D到E的最短路径。

宕机检测

将主机A关闭,当控制主机在指定时间范围内(60s)都没有收到其发送的链路信息,控制主机就会判定A已经 down 掉了,并更新网络拓扑结构。

控制主机检测到A已经宕机,在为其它节点生成新的路由表时将不再包含有关A的路由。

RIP 协议

状态广播

启动控制主机,此时控制主机开始接受其他主机的信息,当接收到信息时,更新已有的路由表信息。启动网络中的主机(共5台)。控制主机接收到信息后,不断更新拓扑结构。

控制主机同时将计算得到的结果发送给相对应的主机,主机收到信息更新自己的路由表信息。

数据传输

根据路由表选择最短路径,转发数据包。选择发送数据包的节点和目的节点,如E向A发送一个数据包。E向A发送数据包;

A最终收到E发送的数据包。

宕机检测

将主机B关掉,在一段时间(60s)后,控制主机没有收到其路由信息,判断主机B已经 down 掉,更新网络拓扑图。

B没有出现在拓扑图中,并且,对于主机E,起初会将达到B和经过B的跳数设为16,表示不可达。之后,尝试寻找跳数更小的路径。


实验演示

为了更好的阐述我们的实验执行情况,我们把我们的实验 demo 传到了 Youtube 上