Source code for cryptoassets.core.utils.httpeventlistener

"""Convenience decorator to open HTTP event listever for configured cryptoassets service.

Opens a new HTTP server running a background thread. Whenever cryptoassets helper service posts a new event, it will be received by this HTTP server which then executes the event in your application context.

This can be used only once per application, so you need to dispatch listened events to your own event handling funcions in one singleton handler.

The callback receives two arguments, ``event_name`` (string) and ``data`` (dict). Data payload depends on the event type.

Example::

    app = CryptoAssetsApp()

    # This will load the configuration file for the cryptoassets framework
    configurer = Configurator(app)
    configurer.load_yaml_file("cryptoassets-settings.yaml")

    @simple_http_event_listener(configurer.config)
    def my_event_callback(event_name, data):
        if event_name == "txupdate":
            print("Got transaction update {}".format(data))

"""

import json
import threading
import atexit
import urllib
import logging
from http.server import HTTPServer
from http.server import BaseHTTPRequestHandler

from cgi import parse_header
from cgi import parse_multipart
from urllib.parse import parse_qs


logger = logging.getLogger(__name__)


class CryptoassetsServiceRequestHandler(BaseHTTPRequestHandler):
    """Very crude HTTP POST processor.

    Extra txid from the POST request.
    """

    def log_request(self, code=None, size=None):
        logger.debug("HTTP %d", code)

    def do_POST(self):

        try:
            # http://stackoverflow.com/a/12731208/315168
            # Extract and print the contents of the POST
            length = int(self.headers['Content-Length'])
            post_data = parse_qs(self.rfile.read(length).decode('utf-8'))

            if "data" not in post_data:
                raise RuntimeError("Incoming POST did not contain data field: {}".format(post_data))

            event_name = post_data["event_name"][0]
            data = post_data["data"][0]

            logger.debug("Handling incoming event %s", event_name)

            data = json.loads(data)
            self.server.func(event_name, data)

            self.send_response(200, "OK")
            self.end_headers()
            return ""
        except Exception as e:
            logger.error("Error handling incoming event")
            logger.exception(e)
            self.send_response(500, "Internal server error")
            self.end_headers()
            raise e


class EventCaptureHTTPServer(HTTPServer):
    """HTTP Server responsing to event HTTP POST notifications."""


class SimpleHTTPEventListenerThread(threading.Thread):

    def __init__(self, ip, port, func):
        """
        :param func: The event handling callback function

        :param ip: IP address / host to bind

        :param port: Port to bind
        """

        self.func = func

        #: HTTP server instance we are running
        self.httpd = None

        server_address = (ip, port)

        try:
            self.httpd = EventCaptureHTTPServer(server_address, CryptoassetsServiceRequestHandler)

            # XXX: More explicitly pass this around?
            self.httpd.func = func
        except OSError as e:
            raise RuntimeError("Could not start cryptoassets server HTTP event listener at {}:{}".format(ip, port)) from e

        self.running = False

        threading.Thread.__init__(self)

    def run(self):
        self.running = True
        self.httpd.serve_forever()

    def stop(self):
        self.running = False
        if self.httpd:
            self.httpd.shutdown()


[docs]def simple_http_event_listener(config, daemon=True): """Function decorator to make the target function to retrieve events from cryptoassets helper service over HTTP event callback. You can also call this manually from command line from testing:: curl --data 'event_name=txupdate&data={"transaction_type":"broadcast","address":"x","confirmations":2,"txid":"foobar"}' http://127.0.0.1:10000 :param config: *cryptoassets.core* app configuration as Python dict. We'll extract the information which port and IP to listen to on HTTP server from there. :param func: The event handling callback function, ``callback(event_name, data_dict)``. :param daemon: Should the server be started as a daemon thread (does not prevent Python application quitting unless explictly stopped) """ def actual_decorator(func): assert type(config) == dict # Exract status server address from the configuration notify_config = config.get("events") if not notify_config: raise RuntimeError("Could not get the configuration for cryptoassets service process events") # Get first HTTP event handling entty from the config and grab it's IP and URL there host = port = None for data in notify_config.values(): if data["class"] == "cryptoassets.core.event.http.HTTPEventHandler": url = urllib.parse.urlparse(data["url"]) port = url.port host = url.hostname assert url.path in ("/", ""), "Simple HTTP Event listener doesn't support URLs with paths, your path was {}".format(url.path) break else: raise RuntimeError("Could not find cryptoassets.core.event.http.HTTPEventHandler configuration in cryptoassets config") server = SimpleHTTPEventListenerThread(host, port, func) server.daemon = daemon server.start() def handle_cleanup(): server.stop() atexit.register(handle_cleanup) func.http_server = server return func return actual_decorator
__all__ = [simple_http_event_listener]