Package killerbee :: Module kbutils
[hide private]
[frames] | no frames]

Source Code for Module killerbee.kbutils

  1  # Import USB support depending on version of pyUSB 
  2  try: 
  3      import usb.core 
  4      import usb.util 
  5      #import usb.backend.libusb01 
  6      #backend = usb.backend.libusb01.get_backend() 
  7      USBVER=1 
  8  except ImportError: 
  9      import usb 
 10      #print("Warning: You are using pyUSB 0.x, future deprecation planned.") 
 11      USBVER=0 
 12   
 13  import serial 
 14  import os, glob 
 15  import time 
 16  import random 
 17  from struct import pack 
 18   
 19  from config import *       #to get DEV_ENABLE_* variables 
 20   
 21  # Known devices by USB ID: 
 22  RZ_USB_VEND_ID      = 0x03EB 
 23  RZ_USB_PROD_ID      = 0x210A 
 24  ZN_USB_VEND_ID      = 0x04D8 
 25  ZN_USB_PROD_ID      = 0x000E 
 26  #FTDI_USB_VEND_ID      = 0x0403 
 27  #FTDI_USB_PROD_ID      = 0x6001 #this is also used by FDTI cables used to attach gps 
 28  FTDI_X_USB_VEND_ID  = 0x0403 
 29  FTDI_X_USB_PROD_ID  = 0x6015    #api-mote FTDI chip 
 30   
 31  usbVendorList  = [RZ_USB_VEND_ID, ZN_USB_VEND_ID] 
 32  usbProductList = [RZ_USB_PROD_ID, ZN_USB_PROD_ID] 
 33   
 34  # Global variables 
 35  gps_devstring = None 
 36   
37 -class KBCapabilities:
38 ''' 39 Class to store and report on the capabilities of a specific KillerBee device. 40 ''' 41 NONE = 0x00 #: Capabilities Flag: No Capabilities 42 SNIFF = 0x01 #: Capabilities Flag: Can Sniff 43 SETCHAN = 0x02 #: Capabilities Flag: Can Set the Channel 44 INJECT = 0x03 #: Capabilities Flag: Can Inject Frames 45 PHYJAM = 0x04 #: Capabilities Flag: Can Jam PHY Layer 46 SELFACK = 0x05 #: Capabilities Flag: Can ACK Frames Automatically 47 PHYJAM_REFLEX = 0x06 #: Capabilities Flag: Can Jam PHY Layer Reflexively 48 SET_SYNC = 0x07 #: Capabilities Flag: Can set the register controlling 802.15.4 sync byte 49 FREQ_2400 = 0x08 #: Capabilities Flag: Can preform 2.4 GHz sniffing (ch 11-26) 50 FREQ_900 = 0x09 #: Capabilities Flag: Can preform 900 MHz sniffing (ch 1-10)
51 - def __init__(self):
52 self._capabilities = { 53 self.NONE : False, 54 self.SNIFF : False, 55 self.SETCHAN : False, 56 self.INJECT : False, 57 self.PHYJAM : False, 58 self.SELFACK: False, 59 self.PHYJAM_REFLEX: False, 60 self.SET_SYNC: False, 61 self.FREQ_2400: False, 62 self.FREQ_900: False }
63 - def check(self, capab):
64 if capab in self._capabilities: 65 return self._capabilities[capab] 66 else: 67 return False
68 - def getlist(self):
69 return self._capabilities
70 - def setcapab(self, capab, value):
71 self._capabilities[capab] = value
72 - def require(self, capab):
73 if self.check(capab) != True: 74 raise Exception('Selected hardware does not support required capability (%d).' % capab)
75 - def is_valid_channel(self, channel):
76 ''' 77 Based on sniffer capabilities, return if this is an OK channel number. 78 @rtype: Boolean 79 ''' 80 if (channel >= 11 or channel <= 26) and self.check(self.FREQ_2400): 81 return True 82 elif (channel >= 1 or channel <= 10) and self.check(self.FREQ_900): 83 return True 84 return False
85
86 -class findFromList(object):
87 ''' 88 Custom matching function for pyUSB 1.x. 89 Used by usb.core.find's custom_match parameter. 90 '''
91 - def __init__(self, vendors_, products_):
92 '''Takes a list of vendor IDs and product IDs.''' 93 self._vendors = vendors_ 94 self._products = products_
95 - def __call__(self, device):
96 ''' 97 Returns True if the device being searched 98 is in these lists. 99 ''' 100 if (device.idVendor in self._vendors) and \ 101 (device.idProduct in self._products): 102 return True 103 return False
104
105 -class findFromListAndBusDevId(findFromList):
106 ''' 107 Custom matching function for pyUSB 1.x. 108 Used by usb.core.find's custom_match parameter. 109 '''
110 - def __init__(self, busNum_, devNum_, vendors_, products_):
111 '''Takes a list of vendor IDs and product IDs.''' 112 findFromList.__init__(self, vendors_, products_) 113 self._busNum = busNum_ 114 self._devNum = devNum_
115 - def __call__(self, device):
116 ''' 117 Returns True if the device being searched 118 is in these lists. 119 ''' 120 if findFromList.__call__(self, device) and \ 121 (self._busNum == None or device.bus == self._busNum) and \ 122 (self._devNum == None or device.address == self._devNum) : 123 return True 124 return False
125
126 -def devlist_usb_v1x(vendor=None, product=None):
127 ''' 128 Private function. Do not call from tools/scripts/etc. 129 ''' 130 devlist = [] 131 if vendor == None: vendor = usbVendorList 132 else: vendor = [vendor] 133 if product == None: product = usbProductList 134 else: product = [product] 135 devs = usb.core.find(find_all=True, custom_match=findFromList(vendor, product)) #backend=backend, 136 try: 137 for dev in devs: 138 # Note, can use "{0:03d}:{1:03d}" to get the old format, 139 # but have decided to move to the new, shorter format. 140 devlist.append(["{0}:{1}".format(dev.bus, dev.address), \ 141 usb.util.get_string(dev, 50, dev.iProduct), \ 142 usb.util.get_string(dev, 50, dev.iSerialNumber)]) 143 except usb.core.USBError as e: 144 if e.errno == 13: #usb.core.USBError: [Errno 13] Access denied (insufficient permissions) 145 raise Exception("Unable to open device. " + 146 "Ensure the device is free and plugged-in. You may need sudo.") 147 else: 148 raise e 149 150 return devlist
151
152 -def devlist_usb_v0x(vendor=None, product=None):
153 ''' 154 Private function. Do not call from tools/scripts/etc. 155 ''' 156 devlist = [] 157 busses = usb.busses() 158 for bus in busses: 159 devices = bus.devices 160 for dev in devices: 161 if ((vendor==None and dev.idVendor in usbVendorList) or dev.idVendor==vendor) \ 162 and ((product==None and dev.idProduct in usbProductList) or dev.idProduct==product): 163 devlist.append([''.join([bus.dirname + ":" + dev.filename]), \ 164 dev.open().getString(dev.iProduct, 50), \ 165 dev.open().getString(dev.iSerialNumber, 12)]) 166 return devlist
167
168 -def isIpAddr(ip):
169 '''Return True if the given string is a valid IPv4 or IPv6 address.''' 170 import socket 171 def is_valid_ipv4_address(address): 172 try: socket.inet_pton(socket.AF_INET, address) 173 except AttributeError: # no inet_pton here, sorry 174 try: socket.inet_aton(address) 175 except socket.error: return False 176 return (address.count('.') == 3) 177 except socket.error: return False 178 return True
179 def is_valid_ipv6_address(address): 180 try: socket.inet_pton(socket.AF_INET6, address) 181 except socket.error: return False 182 return True 183 return ( is_valid_ipv6_address(ip) or is_valid_ipv4_address(ip) ) 184
185 -def devlist(vendor=None, product=None, gps=None, include=None):
186 ''' 187 Return device information for all present devices, 188 filtering if requested by vendor and/or product IDs on USB devices, and 189 running device fingerprint functions on serial devices. 190 @type gps: String 191 @param gps: Optional serial device identifier for an attached GPS 192 unit. If provided, or if global variable has previously been set, 193 KillerBee skips that device in device enumeration process. 194 @type include: List of Strings 195 @param include: Optional list of device handles to be appended to the 196 normally found devices. This is useful for providing IP addresses for 197 remote scanners. 198 @rtype: List 199 @return: List of device information present. 200 For USB devices, get [busdir:devfilename, productString, serialNumber] 201 For serial devices, get [serialFileName, deviceDescription, ""] 202 ''' 203 global usbVendorList, usbProductList, gps_devstring 204 if gps is not None and gps_devstring is None: 205 gps_devstring = gps 206 devlist = [] 207 208 if USBVER == 0: 209 devlist = devlist_usb_v0x(vendor, product) 210 elif USBVER == 1: 211 devlist = devlist_usb_v1x(vendor, product) 212 213 for serialdev in get_serial_ports(include=include): 214 if serialdev == gps_devstring: 215 print("kbutils.devlist is skipping ignored/GPS device string {0}".format(serialdev)) #TODO remove debugging print 216 continue 217 elif (DEV_ENABLE_ZIGDUINO and iszigduino(serialdev)): 218 devlist.append([serialdev, "Zigduino", ""]) 219 elif (DEV_ENABLE_FREAKDUINO and isfreakduino(serialdev)): 220 #TODO maybe move support for freakduino into goodfetccspi subtype==? 221 devlist.append([serialdev, "Dartmouth Freakduino", ""]) 222 else: 223 gfccspi,subtype = isgoodfetccspi(serialdev) 224 if gfccspi and subtype == 0: 225 devlist.append([serialdev, "GoodFET TelosB/Tmote", ""]) 226 elif gfccspi and subtype == 1: 227 devlist.append([serialdev, "GoodFET Api-Mote v1", ""]) 228 elif gfccspi and subtype == 2: 229 devlist.append([serialdev, "GoodFET Api-Mote v2", ""]) 230 elif gfccspi: 231 print("kbutils.devlist has an unknown type of GoodFET CCSPI device ({0}).".format(serialdev)) 232 233 if include is not None: 234 # Ugly nested load, so we don't load this class when unneeded! 235 import dev_wislab #use isWislab, getFirmwareVersion 236 for ipaddr in filter(isIpAddr, include): 237 if dev_wislab.isWislab(ipaddr): 238 devlist.append([ipaddr, "Wislab Sniffer v{0}".format(dev_wislab.getFirmwareVersion(ipaddr)), dev_wislab.getMacAddr(ipaddr)]) 239 #NOTE: Enumerations of other IP connected sniffers go here. 240 else: 241 print("kbutils.devlist has an unknown type of IP sniffer device ({0}).".format(ipaddr)) 242 243 return devlist
244
245 -def get_serial_devs(seriallist):
246 global DEV_ENABLE_FREAKDUINO, DEV_ENABLE_ZIGDUINO
247 #TODO Continue moving code from line 163:181 here, yielding results 248
249 -def isSerialDeviceString(s):
250 return ( ( s.count('/') + s.count('tty') ) > 0 )
251
252 -def get_serial_ports(include=None):
253 ''' 254 Private function. Do not call from tools/scripts/etc. 255 This should return a list of device paths for serial devices that we are 256 interested in, aka USB serial devices using FTDI chips such as the TelosB, 257 ApiMote, etc. This should handle returning a list of devices regardless of 258 the *nix it is running on. Support for more *nix and winnt needed. 259 260 @type include: List of Strings, or None 261 @param include: A list of device strings, of which any which appear to be 262 serial device handles will be added to the set of serial ports returned 263 by the normal search. This may be useful if we're not including some 264 oddly named serial port which you have a KillerBee device on. Optional. 265 ''' 266 seriallist = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/tty.usbserial*") #TODO make cross platform globing/winnt 267 if include is not None: 268 seriallist = list( set(seriallist).union(set(filter(isSerialDeviceString, include))) ) 269 return seriallist
270
271 -def isgoodfetccspi(serialdev):
272 ''' 273 Determine if a given serial device is running the GoodFET firmware with the CCSPI application. 274 This should either be a TelosB/Tmote Sky GOODFET or an Api-Mote design. 275 @type serialdev: String 276 @param serialdev: Path to a serial device, ex /dev/ttyUSB0. 277 @rtype: Tuple 278 @returns: Tuple with the fist element==True if it is some goodfetccspi device. The second element 279 is the subtype, and is 0 for telosb devices and 1 for apimote devices. 280 ''' 281 #TODO reduce code, perhaps into loop iterating over board configs 282 from GoodFETCCSPI import GoodFETCCSPI 283 os.environ["platform"] = "" 284 # First try tmote detection 285 os.environ["board"] = "telosb" #set enviroment variable for GoodFET code to use 286 gf = GoodFETCCSPI() 287 try: 288 gf.serInit(port=serialdev, attemptlimit=2) 289 except serial.serialutil.SerialException as e: 290 raise KBInterfaceError("Serial issue in kbutils.isgoodfetccspi: %s." % e) 291 if gf.connected == 1: 292 #print "TelosB/Tmote attempts: found %s on %s" % (gf.identstr(), serialdev) 293 # now check if ccspi app is installed 294 out = gf.writecmd(gf.CCSPIAPP, 0, 0, None) 295 gf.serClose() 296 if (gf.app == gf.CCSPIAPP) and (gf.verb == 0): 297 return True, 0 298 # Try apimote v2 detection 299 os.environ["board"] = "apimote2" #set enviroment variable for GoodFET code to use 300 gf = GoodFETCCSPI() 301 try: 302 gf.serInit(port=serialdev, attemptlimit=2) 303 #gf.setup() 304 except serial.serialutil.SerialException as e: 305 raise KBInterfaceError("Serial issue in kbutils.isgoodfetccspi: %s." % e) 306 if gf.connected == 1: 307 #print "ApiMotev2+ attempts: found %s on %s" % (gf.identstr(), serialdev) 308 # now check if ccspi app is installed 309 out = gf.writecmd(gf.CCSPIAPP, 0, 0, None) 310 gf.serClose() 311 if (gf.app == gf.CCSPIAPP) and (gf.verb == 0): 312 return True, 2 313 # Then try apimote v1 detection 314 os.environ["board"] = "apimote1" #set enviroment variable for GoodFET code to use 315 gf = GoodFETCCSPI() 316 try: 317 #TODO note that in ApiMote v1, this connect appears to be tricky sometimes 318 # thus attempt limit is raised to 4 for now 319 # manually verify the hardware is working by using direct GoodFET client commands, such as: 320 # export board=apimote1; ./goodfet.ccspi info; ./goodfet.ccspi spectrum 321 gf.serInit(port=serialdev, attemptlimit=4) 322 #gf.setup() 323 #print "Found %s on %s" % (gf.identstr(), serialdev) 324 except serial.serialutil.SerialException as e: 325 raise KBInterfaceError("Serial issue in kbutils.isgoodfetccspi: %s." % e) 326 if gf.connected == 1: 327 #print "ApiMotev1 attempts: found %s on %s" % (gf.identstr(), serialdev) 328 # now check if ccspi app is installed 329 out = gf.writecmd(gf.CCSPIAPP, 0, 0, None) 330 gf.serClose() 331 if (gf.app == gf.CCSPIAPP) and (gf.verb == 0): 332 return True, 1 333 # Nothing found 334 return False, None
335
336 -def iszigduino(serialdev):
337 ''' 338 Determine if a given serial device is running the GoodFET firmware with the atmel_radio application. 339 This should be a Zigduino (only tested on hardware r1 currently). 340 @type serialdev: String 341 @param serialdev: Path to a serial device, ex /dev/ttyUSB0. 342 @rtype: Boolean 343 @returns: Boolean with the fist element==True if it is a goodfet atmel128 device. 344 ''' 345 # TODO why does this only work every-other time zbid is invoked? 346 from GoodFETatmel128 import GoodFETatmel128rfa1 347 os.environ["platform"] = "zigduino" 348 gf = GoodFETatmel128rfa1() 349 try: 350 gf.serInit(port=serialdev, attemptlimit=2) 351 except serial.serialutil.SerialException as e: 352 raise KBInterfaceError("Serial issue in kbutils.iszigduino: %s." % e) 353 if gf.connected == 1: 354 out = gf.writecmd(gf.ATMELRADIOAPP, 0x10, 0, None) 355 gf.serClose() 356 if (gf.app == gf.ATMELRADIOAPP) and (gf.verb == 0x10): #check if ATMELRADIOAPP exists 357 return True 358 return False
359
360 -def isfreakduino(serialdev):
361 ''' 362 Determine if a given serial device is a Freakduino attached with the right sketch loaded. 363 @type serialdev: String 364 @param serialdev: Path to a serial device, ex /dev/ttyUSB0. 365 @rtype: Boolean 366 ''' 367 s = serial.Serial(port=serialdev, baudrate=57600, timeout=1, bytesize=8, parity='N', stopbits=1, xonxoff=0) 368 time.sleep(1.5) 369 s.write('SC!V\r') 370 time.sleep(1.5) 371 #readline should take an eol argument, per: 372 # http://pyserial.sourceforge.net/pyserial_api.html#serial.FileLike.readline 373 # However, many got an "TypeError: readline() takes no keyword arguments" due to a pySerial error 374 # So we have replaced it with a bruteforce method. Old: s.readline(eol='&') 375 for i in range(100): 376 if (s.read() == '&'): break 377 if s.read(3) == 'C!V': version = s.read() 378 else: version = None 379 s.close() 380 return (version is not None)
381
382 -def search_usb(device):
383 ''' 384 Takes either None, specifying that any USB device in the 385 global vendor and product lists are acceptable, or takes 386 a string that identifies a device in the format 387 <BusNumber>:<DeviceNumber>, and returns the pyUSB objects 388 for bus and device that correspond to the identifier string. 389 ''' 390 if device == None: 391 busNum = None 392 devNum = None 393 else: 394 if ':' not in device: 395 raise KBInterfaceError("USB device format expects <BusNumber>:<DeviceNumber>, but got {0} instead.".format(device)) 396 busNum, devNum = map(int, device.split(':', 1)) 397 if USBVER == 0: 398 busses = usb.busses() 399 for bus in busses: 400 dev = search_usb_bus_v0x(bus, busNum, devNum) 401 if dev != None: 402 return (bus, dev) 403 return None #Note, can't expect a tuple returned 404 elif USBVER == 1: 405 return usb.core.find(custom_match=findFromListAndBusDevId(busNum, devNum, usbVendorList, usbProductList)) #backend=backend, 406 else: 407 raise Exception("USB version expected to be 0.x or 1.x.")
408
409 -def search_usb_bus_v0x(bus, busNum, devNum):
410 '''Helper function for USB enumeration in pyUSB 0.x enviroments.''' 411 devices = bus.devices 412 for dev in devices: 413 if (dev.idVendor in usbVendorList) and (dev.idProduct in usbProductList): 414 #Populate the capability information for this device later, when driver is initialized 415 if devNum == None: 416 return dev 417 elif busNum == int(bus.dirname) and devNum == int(dev.filename): 418 #print "Choose device", bus.dirname, dev.filename, "to initialize KillerBee instance on." 419 return dev 420 return None
421
422 -def hexdump(src, length=16):
423 ''' 424 Creates a tcpdump-style hex dump string output. 425 @type src: String 426 @param src: Input string to convert to hexdump output. 427 @type length: Int 428 @param length: Optional length of data for a single row of output, def=16 429 @rtype: String 430 ''' 431 FILTER = ''.join([(len(repr(chr(x))) == 3) and chr(x) or '.' for x in range(256)]) 432 result = [] 433 for i in xrange(0, len(src), length): 434 chars = src[i:i+length] 435 hex = ' '.join(["%02x" % ord(x) for x in chars]) 436 printable = ''.join(["%s" % ((ord(x) <= 127 and FILTER[ord(x)]) or '.') for x in chars]) 437 result.append("%04x: %-*s %s\n" % (i, length*3, hex, printable)) 438 return ''.join(result)
439
440 -def randbytes(size):
441 ''' 442 Returns a random string of size bytes. Not cryptographically safe. 443 @type size: Int 444 @param size: Length of random data to return. 445 @rtype: String 446 ''' 447 return ''.join(chr(random.randrange(0,256)) for i in xrange(size))
448
449 -def randmac(length=8):
450 ''' 451 Returns a random MAC address using a list valid OUI's from ZigBee device 452 manufacturers. Data is returned in air-format byte order (LSB first). 453 @type length: String 454 @param length: Optional length of MAC address, def=8. 455 Minimum address return length is 3 bytes for the valid OUI. 456 @rtype: String 457 @returns: A randomized MAC address in a little-endian byte string. 458 ''' 459 # Valid OUI prefixes for MAC addresses 460 prefixes = [ "\x00\x0d\x6f", # Ember 461 "\x00\x12\x4b", # TI 462 "\x00\x04\xa3", # Microchip 463 "\x00\x04\x25", # Atmel 464 "\x00\x11\x7d", # ZMD 465 "\x00\x13\xa2", # MaxStream 466 "\x00\x30\x66", # Cirronet 467 "\x00\x0b\x57", # Silicon Laboratories 468 "\x00\x04\x9f", # Freescale Semiconductor 469 "\x00\x21\xed", # Telegesis 470 "\x00\xa0\x50" # Cypress 471 ] 472 473 prefix = random.choice(prefixes) 474 suffix = randbytes(length-3) 475 # Reverse the address for use in a packet 476 return ''.join([prefix, suffix])[::-1]
477
478 -def makeFCS(data):
479 ''' 480 Do a CRC-CCITT Kermit 16bit on the data given 481 Implemented using pseudocode from: June 1986, Kermit Protocol Manual 482 See also: http://regregex.bbcmicro.net/crc-catalogue.htm#crc.cat.kermit 483 484 @return: a CRC that is the FCS for the frame, as two hex bytes in 485 little-endian order. 486 ''' 487 crc = 0 488 for i in xrange(len(data)): 489 c = ord(data[i]) 490 #if (A PARITY BIT EXISTS): c = c & 127 #Mask off any parity bit 491 q = (crc ^ c) & 15 #Do low-order 4 bits 492 crc = (crc // 16) ^ (q * 4225) 493 q = (crc ^ (c // 16)) & 15 #And high 4 bits 494 crc = (crc // 16) ^ (q * 4225) 495 return pack('<H', crc) #return as bytes in little endian order
496
497 -class KBException(Exception):
498 '''Base class for all KillerBee specific exceptions.''' 499 pass
500
501 -class KBInterfaceError(KBException):
502 ''' 503 Custom exception for KillerBee having issues communicating 504 with an interface, such as opening a port, syncing with the firmware, etc. 505 ''' 506 pass
507