|
13 | 13 | import subprocess |
14 | 14 | import threading |
15 | 15 |
|
16 | | -from scapy.all import Ether, IPv6, \ |
| 16 | +from scapy.all import Ether, IPv6, UDP, \ |
17 | 17 | IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \ |
18 | 18 | IPv6ExtHdrFragment, IPv6ExtHdrRouting, \ |
19 | 19 | ICMPv6ParamProblem, ICMPv6TimeExceeded, \ |
@@ -194,29 +194,37 @@ def test_seg_left_gt_len_addresses(child, iface, hw_dst, ll_dst, ll_src): |
194 | 194 |
|
195 | 195 | def test_multicast_dst(child, iface, hw_dst, ll_dst, ll_src): |
196 | 196 | # sniffing for ICMPv6 parameter problem message |
197 | | - sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem)) |
| 197 | + sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem) or |
| 198 | + (p.haslayer(UDP) and (p[IPv6].dst != "ff02::1"))) |
198 | 199 | # send routing header with multicast destination |
199 | 200 | sendp(Ether(dst=hw_dst) / IPv6(dst="ff02::1", src=ll_src) / |
200 | | - IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]), |
201 | | - iface=iface, verbose=0) |
| 201 | + IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]) / |
| 202 | + UDP(dport=2606), iface=iface, verbose=0) |
202 | 203 | ps = sniffer.wait_for_sniff_results() |
203 | | - p = [p for p in ps if ICMPv6ParamProblem in p] |
204 | | - assert(len(p) > 0) |
205 | | - p = p[0] |
206 | | - assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered |
207 | | - assert(p[ICMPv6ParamProblem].ptr == 24) # IPv6 headers destination field |
| 204 | + p = [p for p in ps if (ICMPv6ParamProblem in p) or |
| 205 | + ((UDP in p) and (p[UDP].dport == 2606) and |
| 206 | + (p[IPv6].dst != "ff02::1"))] |
| 207 | + # packet should be discarded silently: |
| 208 | + # see https://tools.ietf.org/html/rfc6554#section-4.2 |
| 209 | + assert(len(p) == 0) |
208 | 210 | pktbuf_empty(child) |
209 | 211 |
|
210 | 212 |
|
211 | 213 | def test_multicast_addr(child, iface, hw_dst, ll_dst, ll_src): |
| 214 | + # sniffing for ICMPv6 parameter problem message |
| 215 | + sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem) or |
| 216 | + (p.haslayer(UDP) and (p[IPv6].dst != ll_dst))) |
212 | 217 | # Send routing header with multicast address in its destinations |
213 | | - p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) / |
214 | | - IPv6ExtHdrRouting(type=3, segleft=1, addresses=["ff02::1"]), |
215 | | - iface=iface, timeout=1, verbose=0) |
216 | | - assert(p is not None) |
217 | | - assert(ICMPv6ParamProblem in p) |
218 | | - assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered |
219 | | - assert(p[ICMPv6ParamProblem].ptr == 48) # first address in routing header |
| 218 | + sendp(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) / |
| 219 | + IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]) / |
| 220 | + UDP(dport=2606), iface=iface, verbose=0) |
| 221 | + ps = sniffer.wait_for_sniff_results() |
| 222 | + p = [p for p in ps if (ICMPv6ParamProblem in p) or |
| 223 | + ((UDP in p) and (p[UDP].dport == 2606) and |
| 224 | + (p[IPv6].dst != ll_dst))] |
| 225 | + # packet should be discarded silently: |
| 226 | + # see https://tools.ietf.org/html/rfc6554#section-4.2 |
| 227 | + assert(len(p) == 0) |
220 | 228 | pktbuf_empty(child) |
221 | 229 |
|
222 | 230 |
|
|
0 commit comments