# -*- coding: utf-8 -*-
__all__ = [
'Peer'
]
# std imports
import re
import sys
import copy
import json
import socket
import base64
import getpass
import traceback
import threading
# third party
import termcolor
# package imports
from .handler import PeerHandler
from .imports import socketserver, ConnectionRefusedError
# package imports
import net
# globals
SINGLETON = None
# utilities
ID_REGEX = re.compile(r"(?P<host>.+):(?P<port>\d+) -> (?P<group>.+)")
# threading
LOCK = threading.Lock()
# noinspection PyMissingConstructor
[docs]class _Peer(socketserver.ThreadingMixIn, socketserver.TCPServer, object):
# adding to inheritance object for 2.7 support
# store
CONNECTIONS = {}
SUBSCRIPTIONS = {}
FLAGS = {}
[docs] @staticmethod
def decode(byte_string):
"""
Decode a byte string sent from a peer.
:param byte_string: base64
:return: str
"""
try:
byte_string = base64.b64decode(byte_string).decode('ascii')
byte_string = json.loads(byte_string)
except (Exception, json.JSONDecodeError) as err:
net.LOGGER.debug(byte_string)
net.LOGGER.debug(err)
net.LOGGER.debug(traceback.format_exc())
# if the connection returns data that is not prepackaged as a JSON object, return
# the raw response as it originally was returned.
if isinstance(byte_string, dict) and 'raw' in byte_string:
return byte_string['raw']
return byte_string
[docs] @staticmethod
def encode(obj):
"""
Encode an object for delivery.
:param obj: JSON compatible types
:return: str
"""
if not isinstance(obj, dict):
try:
if obj in Peer().FLAGS:
return obj
except TypeError:
pass
obj = {'raw': obj}
# tag with the peer
return base64.b64encode(json.dumps(obj).encode('ascii'))
@staticmethod
def ports():
"""
Generator; All ports defined in the environment.
:return: int
"""
return [port for port in range(net.PORT_START, net.PORT_START + net.PORT_RANGE)]
@staticmethod
def ping(port, host=socket.gethostname()):
"""
Ping a port and check if it is alive or open.
:param port: required port to hit
:param host: host address default is 'localhost'
:return: bool
"""
interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
interface.settimeout(0.25)
try:
interface.connect((host, port))
return True
except ConnectionRefusedError:
return False
[docs] @staticmethod
def generate_id(port, host, group=None):
"""
Generate a peers id.
:param port: int
:param host: str
:param group: str
:return: base64
"""
return base64.b64encode(
'{host}:{port} -> {group}'.format(
host=socket.gethostname() if not host else host,
port=port,
group=str(group),
).encode('ascii')
)
# noinspection PyShadowingBuiltins
[docs] @staticmethod
def decode_id(id):
"""
Decode a peer id
:param id: base64
:return: dict {'group': str, 'host': str, 'port': int }
"""
expr = ID_REGEX.match(base64.b64decode(id).decode('ascii')).groupdict()
return {
'group': expr['group'],
'host': expr['host'],
'port': int(expr['port'])
}
@staticmethod
def build_connection_name(connection):
"""
Build a connections full name based on the module/name of the function.
This is then encoded in base64 for easier delivery between peers.
:param connection: connection
:return: base64
"""
return base64.b64encode(
'{0}.{1}'.format(
connection.__module__,
connection.__name__
).encode('ascii')
)
def __init__(self, launch=True, test=False, group=None):
# mask with singleton catch unless being tested
if SINGLETON and not test:
raise RuntimeError(
"Can not create a new peer in without shutting down the previous"
" one. Please use net.Peer() instead."
)
# find port
self._host = net.HOST_IP
self._port = self.scan_for_port()
self._group = net.GROUP if not group else group
self._is_hub = net.IS_HUB
# handle threading
self._thread = threading.Thread(target=self.serve_forever)
self._thread.daemon = True
# launch the peer
if launch:
self.launch()
@property
def hub(self):
"""
Defines if this peer acts as the hub for communication through the
network.
:return: bool
"""
return self._is_hub
@property
def group(self):
"""
Group this peer is assigned to.
:return: str
"""
return self._group
@property
def port(self):
"""
Port that the peer is running on.
:return: int
"""
return self._port
@property
def host(self):
"""
Host that the peer is running on.
:return: str
"""
return self._host
@property
def id(self):
"""
Get this peers id. This is tethered to the port and the executable path
the peer was launched with. This is base64 encoded for easier delivery.
:return: base64
"""
return self.generate_id(self.port, self.host, self.group)
@property
def friendly_id(self, peer_id=None):
"""
Get the peers id in a friendly displayable way.
:return: str
"""
if not peer_id:
peer_id = self.id
# decode and hand back
friendly = self.decode_id(peer_id)
friendly['hub'] = self._is_hub
friendly['executable'] = sys.executable
friendly['user'] = getpass.getuser()
return friendly
def register_connection(self, connection, tag=None):
"""
Registers a connection with the global handler.
Do not use this directly. Instead use the net.connect decorator.
:func:`net.connect`
:param tag: str
:param connection: function
:return: str
"""
if not tag:
tag = self.build_connection_name(connection)
else:
tag = self.encode(tag)
# add the connection to the connection registry.
if tag in self.CONNECTIONS:
net.LOGGER.warning(
"Redefining a connection handler. Be aware, this could cause "
"unexpected results."
)
self.CONNECTIONS[tag] = connection
return tag
def register_subscriber(self, event, peer, connection):
"""
Registers the peer and connection to the peers subscription system. This
is for internal use only, use the ``net.subscribe`` decorator instead.
:param event: event id
:param peer: peer id
:param connection: connection id
:return: None
"""
subscription = self.SUBSCRIPTIONS.setdefault(event, {})
peer_connection = subscription.setdefault(peer, [])
peer_connection.append(connection)
def register_flag(self, flag, handler):
"""
Registers a flag with the peer server. Flags are simple responses that
can trigger error handling or logging. Do not use this directly. Instead
use the net.flag decorator.
@net.flag("SOME_ERROR")
def your_next_function(peer, connection):
raise SomeError(
"This failed because {0} failed on the other peer.".format(
connection
)
)
:param flag: payload
:param handler: function
:return: base64
"""
flag = base64.b64encode(flag.encode('ascii'))
if flag in self.FLAGS:
net.LOGGER.warning(
"Redefining a flag handler. Be aware, this could cause "
"unexpected results."
)
self.FLAGS[flag] = handler
return flag
def process_flags(self, response):
"""
Check a response and test if it should be processed as a flag.
:param response: Anything
:return: response from the registered process
"""
# handle flags
try:
if response in self.FLAGS:
return self.FLAGS[response]
except TypeError:
pass
[docs] def get_flag(self, flag):
"""
Get a flags id.
:param flag: str
:return: str
"""
encoded = base64.b64encode(flag.encode('ascii'))
# validate the flag requested
if encoded not in self.FLAGS:
raise Exception("Invalid Flag requested.")
return encoded
def scan_for_port(self):
"""
Scan for a free port to bind to. You can override the default port range and search range by setting the
environment variables NET_PORT NET_PORT_RANGE.
Port range:
default 3010-3050
:return: int
"""
# cast as int and default to 3010 and 40
port = net.PORT_START
port_range = port + net.PORT_RANGE
net.LOGGER.debug("Scanning {0} ports for open port...".format(port_range - port))
while port <= port_range:
# ping the local host ports
if not self.ping(port):
try:
super(_Peer, self).__init__((self.host, port), PeerHandler)
net.LOGGER.debug("Found Port: {0}".format(termcolor.colored(port, "green")))
break
except (OSError, ConnectionRefusedError):
net.LOGGER.warning("Stale Port: {0}".format(termcolor.colored(port, "yellow")))
port += 1
# throw error if there is no open port
if port > port_range:
raise ValueError("No open port found between {0} - {1}".format(port, port_range))
# return found port
return port
def launch(self):
"""
Launch the peer. This should only be used if Peer(launch=False).
Otherwise this is executed at init.
"""
self._thread.start()
def request(self, peer, connection, args, kwargs):
"""
Request an action and response from a peer.
:param peer: base64 encoded peer id
:param connection: the target connection id to run
:param args: positional arguments to pass to the target connection (must be json compatible)
:param kwargs: keyword arguments to pass to the target connection (must be json compatible)
:return: response from peer
"""
# decode
if not isinstance(peer, tuple):
expr = self.decode_id(peer)
peer = (expr['host'], expr['port'])
# package up the request, by default delete the peer argument in the kwargs.
if kwargs.get("peer"):
del kwargs['peer']
if not isinstance(connection, str):
connection = connection.decode('ascii')
payload = {'connection': connection, 'args': args, 'kwargs': kwargs}
# socket connection
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set the time out on the function
if kwargs.get('time_out'):
sock.settimeout(kwargs.get('time_out'))
# connect
sock.connect(peer)
try:
# send request
sock.sendall(self.encode(payload))
# sock
raw = sock.recv(1024)
# safely close the socket
sock.close()
except Exception as err:
# safely close the socket
sock.close()
# handle error logging
net.LOGGER.error(traceback.format_exc())
raise err
# handle flags
processor = self.process_flags(raw)
if processor:
terminate = processor(
connection,
{
'host': peer[0],
'port': peer[1],
}
)
# if flag returns anything, return it
if terminate:
return terminate
# decode and return final response
return self.decode(raw)
def protected_request(self, peer, connection, args, kwargs, stale):
"""
This allows for protected requests. Intended for threaded event calls.
.. warning::
INTERNAL USE ONLY
Do not use this directly, it will only cause you pain.
:param peer: base64 encoded peer id
:param connection: the target connection id to run
:param args: positional arguments to pass to the target connection (must be json compatible)
:param kwargs: keyword arguments to pass to the target connection (must be json compatible)
:param stale: share resource for detecting old peers
:return: response from peer
"""
try:
self.request(peer, connection, args, kwargs)
except Exception as e:
if isinstance(e, ConnectionRefusedError):
# thread handling
LOCK.acquire()
stale.append(peer)
LOCK.release()
else:
net.LOGGER.warning(
"An error has happened a remote peer. "
"This was a protected request and will "
"ignore the error response.\n\t"
"Peer: {0}".format(self.decode_id(peer))
)
def trigger_event(self, event, *args, **kwargs):
"""
Registers the peer and connection to the peers subscription system. This
is for internal use only, use the ``net.subscribe`` decorator instead.
:param event: event id
:param args: args to pass to the subscribed connections
:param kwargs: args to pass to the subscribed connections
:return: None
"""
event = self.SUBSCRIPTIONS.get(event)
if not event:
net.LOGGER.info(
"Invalid Event {0}.\n"
"This event has no subscribers so it will be skipped."
"Active Events:\n" + '\n'.join(
sorted(self.SUBSCRIPTIONS.keys())
)
)
return
# loop over the peers
stale = []
# thread spawning variables
threads = []
# loop over and multi thread the requests
for peer, connections in event.items():
for connection in connections:
# try to execute the subscription trigger on the subscribed peer
# For the purpose of protecting the event triggering peer from
# remote errors, all connection errors and remote runtime errors
# will be caught and logged. But nothing will halt the running
# application. i.e. this peer.
#
# Stale peer subscriptions will be added to the stale list and
# pruned. Since the list subscriptions are created per client
# request, this peer will not know until a request is made that
# the subscribed peer went offline.
if net.THREAD_LIMIT == 0:
self.protected_request(peer, connection, args, kwargs, stale)
continue
# if multi-threading is configured, distribute the workload to
# the threads.
thread = threading.Thread(
target=self.protected_request,
args=(peer, connection, args, kwargs, stale)
)
thread.setName(
"Network_Scanner_{0}_{1}".format(peer, connection)
)
thread.daemon = True
threads.append(thread)
thread.start()
# if we hit the configured thread limit, wait until we free up
# some threads
if len(threads) == net.THREAD_LIMIT:
for thread in threads:
thread.join()
# reset the threads list and continue requesting
threads = []
# safety catch incase there are still some working threads
for thread in threads:
thread.join()
# create a working copy before we prune
updated_subscriptions = copy.deepcopy(self.SUBSCRIPTIONS)
# Clean out the stale peers that are no longer valid.
for event, event_data in self.SUBSCRIPTIONS.items():
# clean out the offline or unreachable peers
for stale_address in stale:
if stale_address in event_data:
del updated_subscriptions[event][stale_address]
# delete the event if it is empty
if not event_data.keys():
del updated_subscriptions[event]
# update the subscriptions registry
self.SUBSCRIPTIONS = updated_subscriptions
# noinspection PyPep8Naming
[docs]def Peer(*args, **kwargs):
"""
Running Peer server for this instance of python.
:return: :class:`net.peer._Peer`
"""
global SINGLETON
# handle singleton behavior
if not SINGLETON:
SINGLETON = _Peer(*args, **kwargs)
return SINGLETON