Source code for cryptoassets.core.tools.broadcast

"""Broadcast outgoing transactions.

Broadcaster is responsible for the following things

* Check that there hasn't been any interrupted broadcats before

* Make sure there can be one and only one attempt to broadcast at any moment - so we don't have double broadcast problems

* Scan database for outgoing external transactions

* Merge and allocate these transactions to outgoing broadcasts

* If there are any unbroadcasted broadcasts, mark them scheduled for broadcast and attempt to broadcast them
"""

import datetime
import logging
from collections import Counter

logger = logging.getLogger(__name__)


def _now():
    return datetime.datetime.utcnow()


[docs]class Broadcaster: """Create and send transactions to the cryptoasset networks.""" def __init__(self, wallet, conflict_resolver, backend): assert wallet.id, "We can operate only on persisted wallets" self.wallet_model = wallet.__class__ self.wallet_id = wallet.id self.conflict_resolver = conflict_resolver self.backend = backend
[docs] def get_wallet(self, session): """Get a wallet instance within db transaction.""" Wallet = self.wallet_model return Wallet.get_by_id(session, self.wallet_id)
[docs] def get_broadcast(self, session, broadcast_id): """Get a wallet instance within db transaction.""" assert type(broadcast_id) == int NetworkTransaction = self.wallet_model.coin_description.NetworkTransaction ntx = session.query(NetworkTransaction).get(broadcast_id) assert ntx.transaction_type == "broadcast" return ntx
[docs] def collect_for_broadcast(self): """ :return: Number of outgoing transactions collected for a broadcast """ @self.conflict_resolver.managed_transaction def build_broadcast(session): wallet = self.get_wallet(session) # Get all outgoing pending transactions which are not yet part of any broadcast NetworkTransaction = wallet.coin_description.NetworkTransaction txs = wallet.get_pending_outgoing_transactions() # TODO: If any priority / mixing rules, they should be applied here if txs.count() > 0: count = txs.count() broadcast = NetworkTransaction() broadcast.transaction_type = "broadcast" broadcast.state = "pending" broadcast.opened_at = None broadcast.closed_at = None session.add(broadcast) session.flush() txs.update({"network_transaction_id": broadcast.id}) logger.info("Collected %d outgoing transaction for broadcast %d", count, broadcast.id) else: logger.debug("Did not find outgoing transactions for broadcast") count = 0 return count return build_broadcast()
[docs] def check_interrupted_broadcasts(self): """Check that there aren't any broadcasts which where opened, but never closed. :return: List Open broadcast ids or empty list if all good """ @self.conflict_resolver.managed_transaction def get_open_broadcasts(session): wallet = self.get_wallet(session) Broadcast = wallet.Broadcast bs = session.query(Broadcast).filter(Broadcast.opened_at != None, Broadcast.closed_at == None) # noqa return [b.id for b in bs] return get_open_broadcasts()
[docs] def send_broadcasts(self): """Pick up any unbroadcasted broadcasts and attempt to send them. Carefully do broadcasts within managed transactions, so that if something goes wrong we have a clear audit trail where it failed. Then one can manually check the blockchain if our transaction got there and close the broadcast by hand. :return: tuple (broadcasted network transaction count, total charged network fees) """ @self.conflict_resolver.managed_transaction def get_ready_broadcasts(session): wallet = self.get_wallet(session) NetworkTransaction = wallet.coin_description.NetworkTransaction return session.query(NetworkTransaction).filter(NetworkTransaction.transaction_type == "broadcast", NetworkTransaction.opened_at == None, NetworkTransaction.closed_at == None) # noqa @self.conflict_resolver.managed_non_retryable_transaction def mark_for_sending(session, broadcast_id): """Mark we are going to send this broadcast and get backend data needed for to build the network transaction. :yield: (address, amount) tuples how much to send to each address """ b = self.get_broadcast(session, broadcast_id) assert b.opened_at is None b.opened_at = _now() session.add(b) outputs = Counter() for tx in b.transactions: assert tx.state == "pending" assert tx.receiving_account is None assert tx.amount > 0 assert tx.address assert tx.address.address outputs[tx.address.address] += tx.amount return outputs @self.conflict_resolver.managed_non_retryable_transaction def mark_sending_done(session, broadcast_id, txid): b = self.get_broadcast(session, broadcast_id) assert b.closed_at is None b.txid = txid b.closed_at = _now() b.state = "broadcasted" session.add(b) # TODO: See if we can write update() more neatly tx_ids = [tx.id for tx in b.transactions] Transaction = b.coin_description.Transaction session.query(Transaction).filter(Transaction.id.in_(tx_ids)).update(dict(state="broadcasted", processed_at=_now()), synchronize_session=False) @self.conflict_resolver.managed_transaction def charge_fees(session, broadcast_id, fee): wallet = self.get_wallet(session) broadcast = self.get_broadcast(session, broadcast_id) return wallet.charge_network_fees(broadcast, fee) ready_broadcasts = get_ready_broadcasts() count = ready_broadcasts.count() if count == 0: logger.debug("No broadcasts ready for sending to network") else: logger.info("%d broadcasts prepared for sending", count) broadcasted_count = 0 total_fees = 0 for b in ready_broadcasts: # Note: This is something we must NOT attempt to retry logger.info("Opening broadcast %d for sending", b.id) outgoing = mark_for_sending(b.id) try: txid, fee = self.backend.send(outgoing, "Outgoing broadcast {}".format(b.id)) assert txid broadcasted_count += 1 except Exception as e: # Transaction broadcast died and we don't know why. We are pretty much dead in this situation, as we don't know if it is safe to try to re-broadcast the transaction or not. logger.error("Failed to broadcast external transaction %s", e) logger.exception(e) #: TODO: Throw emergency event here? continue logger.info("Closing broadcast %d as done, it got txid %s", b.id, txid) mark_sending_done(b.id, txid) if fee: charge_fees(b.id, fee) total_fees += fee return broadcasted_count, total_fees
[docs] def do_broadcasts(self): """Collect new outgoing transactions for a broadcast and send out all existing and new outgoing transactions.""" self.collect_for_broadcast() return self.send_broadcasts()