Source code for cryptoassets.core.service.main

"""Cryptoassets helper service is a standalone process managing cryptoasset backend connections and transaction updates.

Manages asynchronous tasks for sending and receiving cryptocurrency over various APIs. This includes

* Broadcasting transactions to the cryptocurrency network asynchronously

* Handle incoming transactions and write them to the database, calls your application via event handlers

* Updates confirmation counts of open transactions

"""

import sys
import datetime
import logging
import time
import signal

import pkg_resources
from apscheduler.schedulers.background import BackgroundScheduler

from ..app import CryptoAssetsApp
from ..app import Subsystem
from ..app import ALL_SUBSYSTEMS
from ..configure import Configurator
from ..backend.base import IncomingTransactionRunnable
from ..coin.registry import Coin

from ..tools import confirmationupdate
from ..tools import receivescan
from ..tools import broadcast

from ..utils import danglingthreads

from . import status
from . import defaultlogging


#: Must be instiated after the logging configure is passed in
logger = None


[docs]def splash_version(): """Log out cryptoassets.core package version.""" version = pkg_resources.require("cryptoassets.core")[0].version logger.info("cryptoassets.core version %s", version)
[docs]class Service: """Main cryptoassets helper service. This class runs *cryptoassets helper service* process itself and various command line utilities (*initialize-database*, etc.) We uses `Advanced Python Scheduler <http://apscheduler.readthedocs.org/>`_ to run timed jobs (broadcasts, confirmatino updates). Status server (:py:mod:`cryptoassets.core.service.status`) can be started for inspecting our backend connections are running well. """ def __init__(self, config, subsystems=[Subsystem.database, Subsystem.backend], daemon=False, logging=True): """ :param config: cryptoassets configuration dictionary :param subsystems: List of subsystems needed to initialize for this process :param daemon: Run as a service """ self.app = CryptoAssetsApp(subsystems) #: Status server instance self.status_server = None #: coin name -> IncomingTransactionRunnable self.incoming_transaction_runnables = {} self.running = False self.last_broadcast = None self.receive_scan_thread = None #: How often we check out for outgoing transactions self.broadcast_period = 30 # List of active running threads self.threads = [] self.daemon = daemon self.config(config, logging_=logging) self.setup()
[docs] def config(self, config, logging_): """Load configuration from Python dict. Initialize logging system if necessary. """ self.configurator = Configurator(self.app, self) self.configurator.load_from_dict(config) if logging_: self.setup_logging(config) # Now logging is up'n'running and we can finally create logger for this Python module global logger logger = logging.getLogger(__name__) splash_version()
[docs] def setup(self): """Start background threads and such.""" if Subsystem.broadcast in self.app.subsystems: self.setup_jobs() if Subsystem.database in self.app.subsystems: self.setup_session() if Subsystem.incoming_transactions in self.app.subsystems: self.setup_incoming_notifications() # XXX: We are aliasing here, because configurator can only touch app object. Need to figure out something cleaner. self.status_server = self.app.status_server
def setup_logging(self, config): if not self.daemon or not config.get("service", {}).get("logging"): # Setup console logging if we run as a batch command or service config lacks logging defaultlogging.setup_stdout_logging()
[docs] def setup_session(self): """Setup database sessions and conflict resolution.""" self.app.setup_session()
[docs] def initialize_db(self): """ """ logger.info("Creating database tables for %s", self.app.engine.url) self.app.setup_session() self.app.create_tables()
def setup_jobs(self): logger.debug("Setting up broadcast scheduled job") self.scheduler = BackgroundScheduler() self.broadcast_job = self.scheduler.add_job(self.poll_broadcast, 'interval', seconds=self.broadcast_period) self.open_transaction_job = self.scheduler.add_job(self.poll_network_transaction_confirmations, 'interval', minutes=1)
[docs] def start_status_server(self): """Start the status server on HTTP. The server is previously set up by ``configure`` module.We need just to pass the status report generator of this service to it before starting it up. """ if self.status_server: report_generator = status.StatusReportGenerator(self, self.app.conflict_resolver) logger.info("Starting status server %s with report generators %s", self.status_server, report_generator) self.status_server.start(report_generator) self.threads.append(self.status_server)
[docs] def setup_incoming_notifications(self): """Start incoming transaction handlers. """ assert self.app.conflict_resolver for name, coin, in self.app.coins.all(): assert type(name) == str assert isinstance(coin, Coin) backend = coin.backend runnable = backend.setup_incoming_transactions(self.app.conflict_resolver, self.app.event_handler_registry) if runnable: logger.info("Setting up incoming transaction notifications for %s using %s", coin, runnable.__class__) assert isinstance(runnable, IncomingTransactionRunnable) if runnable: self.incoming_transaction_runnables[name] = runnable self.threads.append(runnable)
[docs] def setup_sigterm(self): """Capture SIGTERM and shutdown on it.""" old_sigint = None def term_handler(signum, frame): logger.info("Received SIGTERM") self.running = False def keyboard_handler(signum, frame): logger.info("Received SIGINT") self.running = False # Reove keyboard handler, so that CTRL+C twice does hard kill signal.signal(signal.SIGINT, old_sigint) # Set the signal handler and a 5-second alarm signal.signal(signal.SIGTERM, term_handler) old_sigint = signal.signal(signal.SIGINT, keyboard_handler)
[docs] def poll_broadcast(self): """"A scheduled task to broadcast any new transactions to the bitcoin network. Each wallet is broadcasted in its own transaction. """ self.last_broadcast = datetime.datetime.utcnow() for name, coin in self.app.coins.all(): wallet_class = coin.wallet_model @self.app.conflict_resolver.managed_transaction def create_broadcasters(session): return [broadcast.Broadcaster(wallet, self.app.conflict_resolver, coin.backend) for wallet in session.query(wallet_class).all()] broadcasters = create_broadcasters() for broadcaster in broadcasters: broadcaster.do_broadcasts()
[docs] def poll_network_transaction_confirmations(self): """Scan incoming open transactions. :return: Number of rescans attempted """ rescans = 0 for name, coin in self.app.coins.all(): if coin.backend.require_tracking_incoming_confirmations(): max_confirmation_count = coin.max_confirmation_count tx_updater = coin.backend.create_transaction_updater(self.app.conflict_resolver, self.app.event_handler_registry) confirmationupdate.update_confirmations(tx_updater, max_confirmation_count) rescans += 1 return rescans
[docs] def scan_received(self): """Scan through all received transactions, see if we missed some through walletnotify.""" receivescan.scan(self.app.coins, self.app.conflict_resolver, self.app.event_handler_registry)
def start_startup_receive_scan(self): self.receive_scan_thread = receivescan.BackgroundScanThread(self.app.coins, self.app.conflict_resolver, self.app.event_handler_registry) self.receive_scan_thread.start() self.threads.append(self.receive_scan_thread)
[docs] def start(self): """Start cryptoassets helper service. Keep running until we get SIGTERM or CTRL+C. :return: Process exit code """ logger.info("Starting cryptoassets helper service") self.running = True self.scheduler.start() for coin, runnable in self.incoming_transaction_runnables.items(): logger.info("Starting incoming transaction notifications for %s", coin) runnable.start() self.start_status_server() self.start_startup_receive_scan() self.setup_sigterm() if self.daemon: # Leave cryptoassets helper service running return self.run_thread_monitor() else: # Testing from unit tests return
[docs] def run_thread_monitor(self): """Run thread monitor until terminated by SIGTERM.""" self.running = True while self.running: if not self.check_threads(): logger.fatal("Shutting down due to failed thread") self.shutdown(unclean=True) return 2 time.sleep(3.0) self.shutdown() return 0
[docs] def check_threads(self): """Check all the critical threads are running and do shutdown if any of the threads has died unexpectly. :return: True if all threads stil alive """ for thread in self.threads: if not thread.is_alive(): # Assume all of our threads have thead.running attribute set False when they terminate their main loop normally if getattr(thread, "running", False) is True: logger.error("Thread abnormally terminated %s", thread) return False return True
[docs] def shutdown(self, unclean=False): """Shutdown the service process. :param unclean: True if we terminate due to exception """ logger.info("Attempting shutdown of cryptoassets helper service, unclean %s", unclean) self.running = False for runnable in self.incoming_transaction_runnables.values(): runnable.stop() if self.scheduler.running: self.scheduler.shutdown() logger.info("Attempting of shutdown status server") if self.app.status_server: self.app.status_server.stop() self.app.status_server = None logger.debug("Checking for dangling threads") danglingthreads.check_dangling_threads() logger.debug("Quit") # setuptools entry points
def parse_config_argv(): if len(sys.argv) < 2: sys.exit("Usage: {} <configfile.config.yaml>".format(sys.argv[0])) config = Configurator.prepare_yaml_file(sys.argv[1]) return config def initializedb(): config = parse_config_argv() service = Service(config, (Subsystem.database,)) service.initialize_db() def scan_received(): config = parse_config_argv() service = Service(config, (Subsystem.database, Subsystem.backend, Subsystem.event_handler_registry)) service.scan_received() def helper(): config = parse_config_argv() Configurator.setup_startup(config) service = Service(config, ALL_SUBSYSTEMS, daemon=True) exit_code = service.start() sys.exit(exit_code)