forked from device42/traceroute_tags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtraceroute.py
132 lines (113 loc) · 3.8 KB
/
traceroute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import platform
import random
import re
import socket
import struct
import subprocess
import time
# Names exported by a star-import of this module.
__all__ = ['Tracer']
class Tracer(object):
    """
    Traceroute towards a single destination host.

    On Linux the probes are sent from Python: an empty UDP datagram is sent
    with an increasing TTL and the ICMP reply is read from a raw socket.
    On every other platform the ``tracert`` command is executed and its
    output is parsed.
    """

    # A tracert hop line starts with the hop number.
    _HOP_LINE_RE = re.compile(r"^\s*\d+.*$")
    # Dotted-quad IPv4 address.
    _IPV4_RE = re.compile(r'[0-9]+(?:\.[0-9]+){3}')

    def __init__(self, dst, hops=30, timeout=100):
        """
        Initializes a new tracer object

        Args:
            dst (str): Destination host to probe
            hops (int): Max number of hops to probe
            timeout (int): Time to wait for each reply, in milliseconds
        """
        self.dst = dst
        self.hops = hops
        self.timeout = timeout
        self.ttl = 1
        # Pick up a random port in the range 33434-33534
        self.port = random.randint(33434, 33534)

    def run(self):
        """
        Run the tracer

        Returns:
            A ``(reached, last_ip)`` tuple. ``reached`` is True when a reply
            came from the destination address; ``last_ip`` is the address of
            the last hop that answered, or None when no hop answered.

        Raises:
            IOError
        """
        if platform.system().lower() == "linux":
            return self._run_udp_probes()
        # Every non-Linux platform is assumed to provide ``tracert`` (Windows).
        return self._run_tracert()

    def _run_udp_probes(self):
        """Probe hop by hop with UDP datagrams of increasing TTL."""
        try:
            dst_ip = socket.gethostbyname(self.dst)
        except socket.error as e:
            raise IOError('Unable to resolve {}: {}'.format(self.dst, e))

        text = 'traceroute to {} ({}), {} hops max'.format(
            self.dst,
            dst_ip,
            self.hops
        )
        print(text)

        # Start from the first hop even when run() is called more than once.
        self.ttl = 1
        last_ip = None
        while True:
            addr = None
            receiver = self.create_receiver()
            try:
                sender = self.create_sender()
                try:
                    # Send to the resolved address so that the probes and the
                    # comparison below always refer to the same IP.
                    sender.sendto(b'', (dst_ip, self.port))
                    try:
                        _data, addr = receiver.recvfrom(1024)
                    except socket.error as e:
                        # No reply for this hop (typically the receive
                        # timeout); report it and move on to the next TTL.
                        print(str(e))
                finally:
                    sender.close()
            finally:
                receiver.close()

            if addr:
                last_ip = addr[0]
                if last_ip == dst_ip:
                    return True, last_ip
            else:
                print('{:<4} *'.format(self.ttl))

            self.ttl += 1
            if self.ttl > self.hops:
                return False, last_ip

    def _run_tracert(self):
        """Run the ``tracert`` command and parse the hops it reports."""
        # tracert -d prints numeric addresses, so compare against the
        # resolved IP; fall back to the raw value if resolution fails.
        try:
            dst_ip = socket.gethostbyname(self.dst)
        except (socket.error, UnicodeError):
            dst_ip = self.dst

        # Argument list (no shell) so that dst cannot inject shell syntax.
        cmd = [
            'tracert', '-d',
            '-h', '%d' % self.hops,
            '-w', '%d' % self.timeout,
            '%s' % self.dst,
        ]
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                universal_newlines=True
            )
        except OSError as e:
            raise IOError('Unable to run tracert: {}'.format(e))
        # communicate() reads all output, waits for exit and closes the pipe.
        output, _ = proc.communicate()
        return self._parse_tracert_output(output, dst_ip)

    @classmethod
    def _parse_tracert_output(cls, output, dst_ip):
        """
        Extract the trace result from ``tracert -d`` output

        Args:
            output (str): Text printed by tracert
            dst_ip (str): Address that counts as the destination

        Returns:
            A ``(reached, last_ip)`` tuple, see ``run``
        """
        last_ip = None
        for line in output.splitlines():
            # Only hop lines (leading hop number) are considered, which
            # skips the header line that also mentions the destination.
            if cls._HOP_LINE_RE.search(line) is None:
                continue
            ip_addr = cls._IPV4_RE.findall(line)
            if not ip_addr:
                continue
            last_ip = ip_addr[0]
            if ip_addr[-1] == dst_ip:
                return True, last_ip
        return False, last_ip

    def create_receiver(self):
        """
        Creates a receiver socket

        The socket is a raw ICMP socket bound to ``self.port`` whose receive
        timeout is ``self.timeout`` milliseconds.

        Returns:
            A socket instance

        Raises:
            IOError
        """
        s = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_RAW,
            proto=socket.IPPROTO_ICMP
        )
        try:
            # SO_RCVTIMEO takes a (seconds, microseconds) pair.
            seconds, millis = divmod(int(self.timeout), 1000)
            timeout = struct.pack("ll", seconds, millis * 1000)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeout)
            try:
                s.bind(('', self.port))
            except socket.error as e:
                raise IOError('Unable to bind receiver socket: {}'.format(e))
        except Exception:
            # Do not leak the descriptor when configuration fails.
            s.close()
            raise
        return s

    def create_sender(self):
        """
        Creates a sender socket

        The socket is a UDP socket whose IP TTL is set to ``self.ttl``.

        Returns:
            A socket instance
        """
        s = socket.socket(
            family=socket.AF_INET,
            type=socket.SOCK_DGRAM,
            proto=socket.IPPROTO_UDP
        )
        try:
            s.setsockopt(socket.SOL_IP, socket.IP_TTL, self.ttl)
        except Exception:
            # Do not leak the descriptor when configuration fails.
            s.close()
            raise
        return s