Below is the file 'scan.py' from this revision. You can also download the file.
#!/usr/bin/python
"""Scan driver: run nmap and hand every reported host to the loaded plugins.

The module builds an nmap command line from the ``config`` module, feeds the
hosts nmap reports to ``scan_host`` (via ``goatpy.gen2consume``), and lets the
configured output plugins write the results.  Progress is reported through
the module-level ``notification`` object, which is only installed when the
file is run as a script.
"""

from goatpy.nmapwrapper import nmap
from goatpy.gen2consume import gen2consume
import getopt
import ConfigParser
import threading
import select
import popen2
import fcntl
import time
import traceback
import sets
import sys
import os
import config


class TextNotify:
    """Progress reporter that prints plain-text status lines to stdout."""

    def start_scan(self):
        print("Starting scan.")

    def complete_scan(self):
        print("Scan complete.")

    def start_host(self, thread_id, host):
        print("[%2d] starting to scan: %-15s (T:%s U:%s)" % (
            thread_id, host.addresses[0][0],
            str(host.tcp_ports), str(host.udp_ports)))

    def complete_host(self, thread_id, host):
        print("[%2d] %-15s: done with this host" % (
            thread_id, host.addresses[0][0]))

    def start_plugin(self, thread_id, plugin, host):
        print("[%2d] %-15s: run plugin %s" % (
            thread_id, host.addresses[0][0], plugin.export['name']))

    def complete_plugin(self, thread_id, host):
        pass

    def start_writing_results(self, thread_id, host):
        pass

    def complete_writing_results(self, thread_id, host):
        pass

    def notice(self, str):
        # The parameter name shadows the builtin but is part of the public
        # signature, so it is kept.  The one-element tuple makes the format
        # safe even if a caller passes a tuple as the message.
        print("%s" % (str,))


# Active notifier; installed by main().  It is None when the module is
# merely imported, so library users must assign one before scanning.
notification = None

# urgency
urgency_info = 0
urgency_notice = 1
urgency_warning = 2
urgency_alert = 3
urgency_critical = 4

urgency_to_string = {
    urgency_info: 'Info',
    urgency_notice: 'Notice',
    urgency_warning: 'Warning',
    urgency_alert: 'Alert',
    urgency_critical: 'CRITICAL',
}

# plugins should provide a host_callback method that takes args:
#   scanner   : an instance of Scanner (as below)
#   host      : Host() instance
#   tcp_ports : open, interesting ports for this plugin
#   udp_ports : open, interesting ports for this plugin
# they should return a list of ScannerResponse objects
# NOTE(review): scan_host actually passes the PluginLibrary as the first
# argument, not a Scanner -- confirm which one plugins expect.


def set_nonblocking(fd):
    """Add O_NDELAY to the status flags of file descriptor ``fd``."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NDELAY)


def scan_host(mythread, host, library):
    """Run every interested plugin against ``host`` and write the results.

    mythread -- the calling worker; its ``id`` is used for notifications and
                its ``status_string`` is updated so Scanner.scan can report
                what the thread is doing.
    host     -- host record exposing ``tcp_ports``, ``udp_ports`` and
                ``addresses``.
    library  -- the PluginLibrary that maps ports to plugins and holds the
                output plugins.

    A plugin that raises does not abort the host: its traceback is recorded
    and handed to the output plugins through ``write_errors``.
    """
    # if the magic UDP port is open, then ignore any open UDP ports
    if config.magic_udp_port in host.udp_ports:
        host.udp_ports = []

    plugins = library.plugins_for_ports(host.tcp_ports, host.udp_ports)
    notification.start_host(mythread.id, host)

    errors = []
    results = []
    for plugin in plugins:
        wanted_tcp, wanted_udp = plugins[plugin]
        tcp_ports = [port for port in host.tcp_ports if port in wanted_tcp]
        udp_ports = [port for port in host.udp_ports if port in wanted_udp]
        try:
            notification.start_plugin(mythread.id, plugin, host)
            mythread.status_string = "in plugin %s" % (plugin.export['name'])
            result = plugin.host_callback(library, host, tcp_ports, udp_ports)
            mythread.status_string = "completed plugin %s" % (
                plugin.export['name'])
            notification.complete_plugin(mythread.id, host)
            results += result
        except:
            # Deliberately catch-all: this is the plugin isolation boundary,
            # and whatever a plugin raises is recorded rather than allowed to
            # kill the worker.
            exc_type, exc_value, exc_tb = sys.exc_info()
            try:
                err_data = '\n'.join(
                    traceback.format_exception(exc_type, exc_value, exc_tb))
            finally:
                # Holding the traceback in a local creates a reference cycle
                # with this frame; drop it as soon as it has been formatted.
                del exc_tb
            errors.append((plugin.export['name'], err_data))

    mythread.status_string = "writing results"
    notification.start_writing_results(mythread.id, host)
    for plugin in library.output_plugins:
        plugin.write_results(library, host, library.scanner_names, results)
        plugin.write_errors(library, host, errors)
    notification.complete_writing_results(mythread.id, host)
    mythread.status_string = None
    notification.complete_host(mythread.id, host)

    # clean up memory explicitly, as otherwise Python might not garbage
    # collect quickly enough for us.
    del results
    del errors


# (I hate Unix) : setting a SIGCHLD handler will start me getting
# lots of exceptions whenever I do blah.read(), select(), etc.. which
# makes the code of this program so _massively_ ugly that I'll just
# call wait occasionally and it'll probably even out
#
# You can't just call wait() in a thread that does a SIGCHLD because
# Unix won't let you. Damnit.
class WaitThread(threading.Thread):
    """Daemon thread that repeatedly calls os.wait() to collect children."""

    def __init__(self):
        # Call the base initialiser directly.  Iterating over
        # self.__class__.__bases__ recursed forever as soon as WaitThread
        # itself was subclassed.
        threading.Thread.__init__(self)
        self.setDaemon(True)

    def run(self):
        while 1:
            try:
                procs = os.wait()
                notification.notice(
                    "Processes have terminated: %s" % (str(procs)))
            except:
                # Best effort by design: any failure of the wait or of the
                # notification is ignored and the loop simply tries again.
                pass
            # for some reason, sometimes this fails unless
            # this check is here. A bit mysterious.
            # NOTE(review): probably module globals being reset to None
            # while this daemon thread is still running at interpreter
            # shutdown -- confirm.
            if time is not None:
                time.sleep(1)


class PluginLibrary:
    """Loads the configured plugins and indexes scanner plugins by port."""

    def __init__(self, config):
        """Import every plugin named in ``config.plugins``.

        Modules whose ``export['type']`` is "scanner" are linked to the TCP
        and UDP ports they export; "output" modules are initialised and
        collected in ``output_plugins``.  Other types are ignored.
        """
        self.config = config
        self.urgency_to_string = urgency_to_string
        self.output_plugins = []
        self.tcp_to_plugin = {}
        self.udp_to_plugin = {}

        # NOTE(review): orig_path is the same list object that insert()
        # mutates, so the "restore" at the end is a no-op and plugin_path
        # stays on sys.path for the life of the process.  Left unchanged
        # because plugins may rely on importing from that directory later --
        # confirm before making the restore effective.
        orig_path = sys.path
        sys.path.insert(0, self.config.plugin_path)

        self.scanner_names = []
        for plugin in self.config.plugins:
            mod = self.do_load_plugin(plugin)
            plugin_type = mod.export['type']
            if plugin_type == "scanner":
                self.link_ports(mod, mod.export['tcp_ports'],
                                self.tcp_to_plugin)
                self.link_ports(mod, mod.export['udp_ports'],
                                self.udp_to_plugin)
                self.scanner_names.append(mod.export['name'])
            elif plugin_type == "output":
                mod.initialise(self)
                self.output_plugins.append(mod)

        if orig_path:
            sys.path = orig_path

    def finalise_plugins(self):
        """Call ``finalise(self)`` on every output plugin, in load order."""
        for plugin in self.output_plugins:
            plugin.finalise(self)

    def link_ports(self, mod, port_list, hash):
        """Append ``mod`` to ``hash[port]`` for each port in ``port_list``.

        An empty or None ``port_list`` leaves ``hash`` untouched.
        """
        if not port_list:
            return
        for port in port_list:
            hash.setdefault(port, []).append(mod)

    def get_ports(self):
        """Return ``(tcp_ports, udp_ports)`` as two fresh lists of keys."""
        return (list(self.tcp_to_plugin.keys()),
                list(self.udp_to_plugin.keys()))

    def do_load_plugin(self, plugin_name):
        """Import and return the module called ``plugin_name``."""
        # The non-empty fromlist makes __import__ return the named module
        # itself rather than the top-level package for dotted names.
        mod = __import__('%s' % plugin_name, globals(), locals(), [''])
        return mod

    def plugins_for_ports(self, tcp_ports, udp_ports):
        """Map each interested plugin to the ports that concern it.

        Returns a dict ``{module: (tcp_list, udp_list)}`` containing only
        modules registered for at least one of the given ports.
        """
        rv = {}
        for port in tcp_ports:
            for module in self.tcp_to_plugin.get(port, []):
                rv.setdefault(module, ([], []))[0].append(port)
        for port in udp_ports:
            for module in self.udp_to_plugin.get(port, []):
                rv.setdefault(module, ([], []))[1].append(port)
        return rv


class ScannerResponse:
    """A single finding; all fields start as None and are set by the caller."""

    def __init__(self):
        self.address = None
        self.plugin_name = None
        self.tcp_ports = None
        self.udp_ports = None
        self.urgency = None
        self.short_mesg = None
        self.long_mesg = None
        self.advice = None

    def text_summary(self):
        """Return a one-line summary, or None if urgency or plugin is unset.

        Raises KeyError if ``urgency`` is not a key of ``urgency_to_string``.
        """
        if self.urgency is None or self.plugin_name is None:
            return None
        ports = []
        if self.tcp_ports:
            ports.append("T:" + ','.join([str(t) for t in self.tcp_ports]))
        if self.udp_ports:
            ports.append("U:" + ','.join([str(t) for t in self.udp_ports]))
        return "%-10s %-10s %-10s %s" % (
            ','.join(ports), self.plugin_name,
            urgency_to_string[self.urgency], self.short_mesg)


class Scanner:
    """Ties the configuration, the plugin library and the nmap run together."""

    def __init__(self, config_path):
        # NOTE(review): config_path is stored but never read; settings always
        # come from the imported ``config`` module, so the -c option has no
        # effect.  Confirm the intended config file handling.
        self._config_path = config_path
        self.config = config
        self.library = PluginLibrary(self.config)

    def scan(self):
        """Run nmap over the configured networks and process every host.

        Returns early, after a notice, when no plugin requested any port.
        Otherwise waits up to ``config.thread_wait_time`` seconds for
        lingering threads before finalising the output plugins.
        """
        nmap_command = (self.config.nmap_command + ' '
                        + self.config.nmap_options)

        ports = []
        tcp_ports, udp_ports = self.library.get_ports()
        if tcp_ports:
            ports.append("T:" + ','.join([str(t) for t in tcp_ports]))
        if udp_ports:
            # Probe the magic UDP port whenever UDP is scanned; scan_host
            # uses it to decide whether a host's UDP results are discarded.
            udp_ports = list(udp_ports) + [config.magic_udp_port]
            ports.append("U:" + ','.join([str(t) for t in udp_ports]))
        if not ports:
            notification.notice(
                "Nothing to do; no ports requested by plugins.")
            return

        nmap_command += ' -p ' + ','.join(ports)
        nmap_command += ' -oX -'
        networks = ' '.join(self.config.scan_networks)
        nmap_command += ' ' + networks

        notification.start_scan()
        notification.notice("Scanner command is: %s" % (nmap_command))
        gt = gen2consume(nmap(nmap_command), self.config.max_threads,
                         lambda thread, host: scan_host(thread, host,
                                                        self.library))
        gt.join()

        # Presumably the two threads left when idle are the main thread and
        # the WaitThread started by main().  "<=" also ends the wait when
        # fewer are alive (e.g. no WaitThread), where "== 2" would have
        # slept through the whole countdown.
        idle_thread_count = 2
        countdown = self.config.thread_wait_time
        while countdown > 0:
            running = [t for t in threading.enumerate() if t.isAlive()]
            # Read status_string exactly once per thread: a worker may reset
            # it to None at any moment, and a None must never reach join().
            statuses = [getattr(t, "status_string", None) for t in running]
            reasons = [s for s in statuses if s is not None]
            cnt = len(running)
            notification.notice(
                "[%2d secs] Waiting for %d threads (%s)" % (
                    countdown, cnt, ','.join(reasons)))
            if cnt <= idle_thread_count:
                break
            countdown -= 1
            time.sleep(1)

        notification.notice("Finalising output plugins.")
        self.library.finalise_plugins()
        notification.notice("Done!")
        notification.complete_scan()


def main():
    """Command-line entry point.

    Options:
      -c FILE  configuration file path (default 'scanner.cfg')
      -g       request graphical notifications
      -x       accepted for backward compatibility; has no effect
    """
    global notification

    wt = WaitThread()
    wt.start()

    # "-g" was tested for below but missing from the option string, so it
    # used to abort with GetoptError; "-x" is still accepted as before.
    matched, remain = getopt.getopt(sys.argv[1:], "c:gx")
    output_mode = "text"
    config_file = 'scanner.cfg'
    for opt, value in matched:
        if opt == '-c':
            config_file = value
        elif opt == '-g':
            output_mode = "graphical"

    # GraphicalNotify is not defined in this file; use it only if something
    # has provided it, otherwise fall back to text output.
    graphical_cls = globals().get('GraphicalNotify')
    if output_mode == "graphical" and graphical_cls is not None:
        notification = graphical_cls()
    else:
        if output_mode == "graphical":
            sys.stderr.write(
                "Graphical output is not available; using text output.\n")
        notification = TextNotify()

    scanner = Scanner(config_file)
    scanner.scan()
    sys.exit(0)


if __name__ == '__main__':
    main()