# Source code for net.peer

# -*- coding: utf-8 -*-

# Public API of this module: only the ``Peer`` accessor is exported.
__all__ = [
    'Peer'
]

# std imports
import re
import sys
import copy
import json
import socket
import base64
import getpass
import traceback
import threading

# third party
import termcolor

# package imports
from .handler import PeerHandler
from .imports import socketserver, ConnectionRefusedError

# package imports
import net


# globals
# The peer instance for this python process. It is created lazily by
# ``Peer()`` and checked by ``_Peer.__init__`` to refuse a second peer
# (unless the peer is created with ``test=True``).
SINGLETON = None

# utilities
# Matches a decoded peer id of the form "<host>:<port> -> <group>".
ID_REGEX = re.compile(r"(?P<host>.+):(?P<port>\d+) -> (?P<group>.+)")

# threading
# Guards appends to the shared ``stale`` list in ``_Peer.protected_request``.
LOCK = threading.Lock()


# noinspection PyMissingConstructor
class _Peer(socketserver.ThreadingMixIn, socketserver.TCPServer, object):
    """
    Threaded TCP server through which this python instance talks to peers.

    ``object`` is added to the bases for python 2.7 support. The
    ``TCPServer`` constructor is not called from ``__init__`` directly; it
    is invoked by :meth:`scan_for_port` once a port that does not answer a
    ping has been found.

    Instances are normally obtained through ``net.Peer()``; creating a second
    instance while one is registered raises ``RuntimeError`` unless
    ``test=True`` is passed.
    """

    # class-level registries
    # connection tag -> callable
    CONNECTIONS = {}
    # event id -> {peer id -> [connection id, ...]}
    # NOTE: ``trigger_event`` rebinds this name on the instance after pruning.
    SUBSCRIPTIONS = {}
    # base64 flag -> handler callable
    FLAGS = {}

    @staticmethod
    def decode(byte_string):
        """
        Decode a byte string sent from a peer.

        The payload is expected to be base64 encoded ascii JSON. If any
        decoding step fails the failure is logged at debug level and the
        value reached so far (the input itself, or the base64 decoded text)
        is returned instead of raising.

        :param byte_string: base64 encoded JSON payload
        :return: the decoded JSON value, or the content of its ``raw`` key
            when the payload is a ``{'raw': ...}`` wrapper
        """
        try:
            byte_string = base64.b64decode(byte_string).decode('ascii')
            byte_string = json.loads(byte_string)
        # ``json.JSONDecodeError`` is intentionally not named here: it does
        # not exist on python 2.7 and is already covered by ``Exception``.
        except Exception as err:
            net.LOGGER.debug(byte_string)
            net.LOGGER.debug(err)
            net.LOGGER.debug(traceback.format_exc())

        # payloads that were not JSON objects to begin with are wrapped in a
        # ``raw`` key by ``encode``; unwrap them again here.
        if isinstance(byte_string, dict) and 'raw' in byte_string:
            return byte_string['raw']

        return byte_string

    @staticmethod
    def encode(obj):
        """
        Encode an object for delivery.

        Registered flags are passed through untouched. Dictionaries are
        encoded as they are; every other value is wrapped as
        ``{'raw': obj}`` first so that ``decode`` can unwrap it again.

        :param obj: JSON compatible types
        :return: base64 encoded ascii JSON (bytes), or ``obj`` itself when it
            is a registered flag
        """
        if not isinstance(obj, dict):
            try:
                # read the class registry directly; going through ``Peer()``
                # could start a new server as a side effect of encoding.
                if obj in _Peer.FLAGS:
                    return obj
            except TypeError:
                # unhashable values (lists, ...) can never be a flag
                pass
            obj = {'raw': obj}

        return base64.b64encode(json.dumps(obj).encode('ascii'))

    @staticmethod
    def ports():
        """
        All ports defined in the environment.

        :return: list of int, ``net.PORT_START`` up to but not including
            ``net.PORT_START + net.PORT_RANGE``
        """
        return list(range(net.PORT_START, net.PORT_START + net.PORT_RANGE))

    @staticmethod
    def ping(port, host=socket.gethostname()):
        """
        Ping a port and check if it is alive or open.

        :param port: required port to hit
        :param host: host address, defaults to this machine's host name
            (resolved once, when the class is defined)
        :return: bool, True when a TCP connection could be established
        """
        interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        interface.settimeout(0.25)
        try:
            interface.connect((host, port))
            return True
        except (ConnectionRefusedError, socket.timeout):
            # refused, or no answer within the timeout: nothing usable there
            return False
        finally:
            # always release the probe socket, it is never reused
            interface.close()

    @staticmethod
    def generate_id(port, host, group=None):
        """
        Generate a peers id.

        :param port: int
        :param host: str, falls back to this machine's host name when empty
        :param group: str
        :return: base64 of ``"<host>:<port> -> <group>"``
        """
        readable = '{host}:{port} -> {group}'.format(
            host=host if host else socket.gethostname(),
            port=port,
            group=str(group),
        )
        return base64.b64encode(readable.encode('ascii'))

    # noinspection PyShadowingBuiltins
    @staticmethod
    def decode_id(id):
        """
        Decode a peer id

        :param id: base64
        :return: dict {'group': str, 'host': str, 'port': int }
        """
        expr = ID_REGEX.match(base64.b64decode(id).decode('ascii')).groupdict()

        return {
            'group': expr['group'],
            'host': expr['host'],
            'port': int(expr['port'])
        }

    @staticmethod
    def build_connection_name(connection):
        """
        Build a connections full name based on the module/name of the
        function. This is then encoded in base64 for easier delivery between
        peers.

        :param connection: connection
        :return: base64 of ``"<module>.<name>"``
        """
        full_name = '{0}.{1}'.format(
            connection.__module__,
            connection.__name__
        )
        return base64.b64encode(full_name.encode('ascii'))

    def __init__(self, launch=True, test=False, group=None):
        """
        Bind the peer to a free port and optionally start serving.

        :param launch: start the serving thread right away
        :param test: skip the singleton check
        :param group: group name, defaults to ``net.GROUP``
        :raises RuntimeError: when a singleton peer already exists and
            ``test`` is False
        """
        # mask with singleton catch unless being tested
        if SINGLETON and not test:
            raise RuntimeError(
                "Can not create a new peer in without shutting down the previous"
                " one. Please use net.Peer() instead."
            )

        # find port; ``scan_for_port`` reads ``self.host`` and is also what
        # runs the TCPServer constructor, so the host has to be set first.
        self._host = net.HOST_IP
        self._port = self.scan_for_port()
        self._group = net.GROUP if not group else group
        self._is_hub = net.IS_HUB

        # handle threading
        self._thread = threading.Thread(target=self.serve_forever)
        self._thread.daemon = True

        # launch the peer
        if launch:
            self.launch()

    @property
    def hub(self):
        """
        Defines if this peer acts as the hub for communication through the
        network.

        :return: bool
        """
        return self._is_hub

    @property
    def group(self):
        """
        Group this peer is assigned to.

        :return: str
        """
        return self._group

    @property
    def port(self):
        """
        Port that the peer is running on.

        :return: int
        """
        return self._port

    @property
    def host(self):
        """
        Host that the peer is running on.

        :return: str
        """
        return self._host

    @property
    def id(self):
        """
        Get this peers id. It is built from the port, host and group of the
        peer and is base64 encoded for easier delivery.

        :return: base64
        """
        return self.generate_id(self.port, self.host, self.group)

    @property
    def friendly_id(self, peer_id=None):
        """
        Get the peers id in a friendly displayable way.

        ``peer_id`` is kept for backwards compatibility; as this is a
        property it is never supplied and this peers own id is used.

        :return: dict with the keys ``group``, ``host``, ``port``, ``hub``,
            ``executable`` and ``user``
        """
        if not peer_id:
            peer_id = self.id

        # decode and hand back
        friendly = self.decode_id(peer_id)
        friendly['hub'] = self._is_hub
        friendly['executable'] = sys.executable
        friendly['user'] = getpass.getuser()

        return friendly

    def register_connection(self, connection, tag=None):
        """
        Registers a connection with the global handler.
        Do not use this directly. Instead use the net.connect decorator.

        :func:`net.connect`

        :param tag: str
        :param connection: function
        :return: the encoded tag the connection was registered under
        """
        if not tag:
            tag = self.build_connection_name(connection)
        else:
            tag = self.encode(tag)

        # add the connection to the connection registry.
        if tag in self.CONNECTIONS:
            net.LOGGER.warning(
                "Redefining a connection handler. Be aware, this could cause "
                "unexpected results."
            )
        self.CONNECTIONS[tag] = connection

        return tag

    def register_subscriber(self, event, peer, connection):
        """
        Registers the peer and connection to the peers subscription system.
        This is for internal use only, use the ``net.subscribe`` decorator
        instead.

        :param event: event id
        :param peer: peer id
        :param connection: connection id
        :return: None
        """
        subscription = self.SUBSCRIPTIONS.setdefault(event, {})
        peer_connection = subscription.setdefault(peer, [])
        peer_connection.append(connection)

    def register_flag(self, flag, handler):
        """
        Registers a flag with the peer server. Flags are simple responses
        that can trigger error handling or logging. Do not use this
        directly. Instead use the net.flag decorator.

        @net.flag("SOME_ERROR")
        def your_next_function(peer, connection):
            raise SomeError(
                "This failed because {0} failed on the other peer.".format(
                    connection
                )
            )

        :param flag: payload
        :param handler: function
        :return: base64
        """
        flag = base64.b64encode(flag.encode('ascii'))

        if flag in self.FLAGS:
            net.LOGGER.warning(
                "Redefining a flag handler. Be aware, this could cause "
                "unexpected results."
            )

        self.FLAGS[flag] = handler

        return flag

    def process_flags(self, response):
        """
        Check a response and test if it should be processed as a flag.

        :param response: Anything
        :return: the handler registered for the flag, or None when the
            response is not a registered flag
        """
        try:
            if response in self.FLAGS:
                return self.FLAGS[response]
        except TypeError:
            # unhashable responses can not be a flag
            pass
        return None

    def get_flag(self, flag):
        """
        Get a flags id.

        :param flag: str
        :return: base64 encoded flag
        :raises Exception: when the flag has not been registered
        """
        encoded = base64.b64encode(flag.encode('ascii'))

        # validate the flag requested
        if encoded not in self.FLAGS:
            raise Exception("Invalid Flag requested.")

        return encoded

    def scan_for_port(self):
        """
        Scan for a free port to bind to. You can override the default port
        range and search range by setting the environment variables NET_PORT
        NET_PORT_RANGE.

        The first port that does not answer a ping and that the TCP server
        can be bound to is used. Binding happens here, through the
        ``TCPServer`` constructor.

        Port range:
            default 3010-3050

        :return: int
        :raises ValueError: when no port in the range could be bound
        """
        first_port = net.PORT_START
        last_port = first_port + net.PORT_RANGE

        net.LOGGER.debug(
            "Scanning {0} ports for open port...".format(last_port - first_port)
        )

        for port in range(first_port, last_port + 1):
            # something already accepts connections on this port
            if self.ping(port):
                continue

            try:
                super(_Peer, self).__init__((self.host, port), PeerHandler)
            except (OSError, ConnectionRefusedError):
                net.LOGGER.warning(
                    "Stale Port: {0}".format(termcolor.colored(port, "yellow"))
                )
                continue

            net.LOGGER.debug(
                "Found Port: {0}".format(termcolor.colored(port, "green"))
            )
            return port

        # report the range that was actually scanned
        raise ValueError(
            "No open port found between {0} - {1}".format(first_port, last_port)
        )

    def launch(self):
        """
        Launch the peer. This should only be used if Peer(launch=False).
        Otherwise this is executed at init.
        """
        self._thread.start()

    def request(self, peer, connection, args, kwargs):
        """
        Request an action and response from a peer.

        :param peer: base64 encoded peer id, or a ``(host, port)`` tuple
        :param connection: the target connection id to run
        :param args: positional arguments to pass to the target connection
            (must be json compatible)
        :param kwargs: keyword arguments to pass to the target connection
            (must be json compatible). A truthy ``peer`` entry is removed
            before sending; a truthy ``time_out`` entry is used as the
            socket timeout.
        :return: response from peer, or the result of a flag handler when the
            response is a registered flag and the handler returns something
        """
        # decode
        if not isinstance(peer, tuple):
            expr = self.decode_id(peer)
            peer = (expr['host'], expr['port'])

        # package up the request, by default delete the peer argument in the
        # kwargs.
        if kwargs.get("peer"):
            del kwargs['peer']

        if not isinstance(connection, str):
            connection = connection.decode('ascii')

        payload = {'connection': connection, 'args': args, 'kwargs': kwargs}

        # socket connection
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # set the time out on the function
            if kwargs.get('time_out'):
                sock.settimeout(kwargs.get('time_out'))

            # connection errors are raised to the caller without logging;
            # ``protected_request`` relies on seeing them.
            sock.connect(peer)

            try:
                # send request
                sock.sendall(self.encode(payload))

                # NOTE(review): a single recv caps the response at 1024
                # bytes and may return less than the peer sent. Reading
                # until the peer closes would fix that but depends on how
                # PeerHandler ends the exchange - confirm before changing.
                raw = sock.recv(1024)
            except Exception:
                # handle error logging, keep the original traceback
                net.LOGGER.error(traceback.format_exc())
                raise
        finally:
            # safely close the socket on every path, including a failed
            # connect
            sock.close()

        # handle flags
        processor = self.process_flags(raw)
        if processor:
            terminate = processor(
                connection,
                {
                    'host': peer[0],
                    'port': peer[1],
                }
            )

            # if flag returns anything, return it
            if terminate:
                return terminate

        # decode and return final response
        return self.decode(raw)

    def protected_request(self, peer, connection, args, kwargs, stale):
        """
        This allows for protected requests. Intended for threaded event
        calls.

        .. warning::

            INTERNAL USE ONLY
            Do not use this directly, it will only cause you pain.

        A refused connection adds ``peer`` to ``stale``; every other error is
        logged as a warning and swallowed. The response is discarded.

        :param peer: base64 encoded peer id
        :param connection: the target connection id to run
        :param args: positional arguments to pass to the target connection
            (must be json compatible)
        :param kwargs: keyword arguments to pass to the target connection
            (must be json compatible)
        :param stale: share resource for detecting old peers
        :return: None
        """
        try:
            self.request(peer, connection, args, kwargs)
        except ConnectionRefusedError:
            # thread handling; the list is shared between request threads
            with LOCK:
                stale.append(peer)
        except Exception:
            # ``request`` also accepts (host, port) tuples, which can not be
            # decoded; never let the logging itself raise out of here.
            described = peer if isinstance(peer, tuple) else self.decode_id(peer)
            net.LOGGER.warning(
                "An error has happened a remote peer. "
                "This was a protected request and will "
                "ignore the error response.\n\t"
                "Peer: {0}".format(described)
            )

    def trigger_event(self, event, *args, **kwargs):
        """
        Trigger an event: call every connection subscribed to it on every
        subscribed peer, then drop the peers that refused the connection
        from the subscription registry.

        This is for internal use only, use the ``net.event`` style helpers of
        the package instead of calling this directly.

        :param event: event id
        :param args: args to pass to the subscribed connections
        :param kwargs: args to pass to the subscribed connections
        :return: None
        """
        subscribers = self.SUBSCRIPTIONS.get(event)

        if not subscribers:
            net.LOGGER.info(
                "Invalid Event {0}.\n"
                "This event has no subscribers so it will be skipped.\n"
                "Active Events:\n{1}".format(
                    event,
                    '\n'.join(sorted(str(key) for key in self.SUBSCRIPTIONS))
                )
            )
            return

        # peers that refused the connection, filled by protected_request
        stale = []

        # thread spawning variables
        threads = []

        # loop over and multi thread the requests
        for peer, connections in subscribers.items():
            for connection in connections:

                # try to execute the subscription trigger on the subscribed
                # peer. For the purpose of protecting the event triggering
                # peer from remote errors, all connection errors and remote
                # runtime errors will be caught and logged. But nothing will
                # halt the running application. i.e. this peer.
                #
                # Stale peer subscriptions will be added to the stale list
                # and pruned. Since the list subscriptions are created per
                # client request, this peer will not know until a request is
                # made that the subscribed peer went offline.
                if net.THREAD_LIMIT == 0:
                    self.protected_request(peer, connection, args, kwargs, stale)
                    continue

                # if multi-threading is configured, distribute the workload
                # to the threads.
                thread = threading.Thread(
                    target=self.protected_request,
                    args=(peer, connection, args, kwargs, stale),
                    name="Network_Scanner_{0}_{1}".format(peer, connection)
                )
                thread.daemon = True
                threads.append(thread)
                thread.start()

                # if we hit the configured thread limit, wait until we free
                # up some threads
                if len(threads) == net.THREAD_LIMIT:
                    for running in threads:
                        running.join()

                    # reset the threads list and continue requesting
                    threads = []

        # safety catch incase there are still some working threads
        for running in threads:
            running.join()

        # create a working copy before we prune
        updated_subscriptions = copy.deepcopy(self.SUBSCRIPTIONS)

        # Clean out the stale peers that are no longer valid.
        for event_id in list(updated_subscriptions):
            event_data = updated_subscriptions[event_id]

            # clean out the offline or unreachable peers
            for stale_address in stale:
                event_data.pop(stale_address, None)

            # delete the event if pruning left it without subscribers
            if not event_data:
                del updated_subscriptions[event_id]

        # update the subscriptions registry
        self.SUBSCRIPTIONS = updated_subscriptions
# noinspection PyPep8Naming
def Peer(*args, **kwargs):
    """
    Running Peer server for this instance of python.

    The peer is created on first use with the given arguments; every later
    call hands back that same peer and ignores its arguments.

    :return: :class:`net.peer._Peer`
    """
    global SINGLETON

    # an existing peer wins, nothing to build
    if SINGLETON:
        return SINGLETON

    # first call: build the peer and remember it
    SINGLETON = _Peer(*args, **kwargs)
    return SINGLETON