Utilities

Introduction

Here is a collection of helper classes.

Transaction conflict resolver

ConflictResolver is a helper class which provides a serialized transaction conflict resolution mechanism for your SQLAlchemy application.

Preface

Transaction conflict resolution is a way to deal with concurrency and race condition issues within a multiuser application. It resolves race conditions when two users, or two threads, simultaneously perform an action affecting the same data set.

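There are two basic ways of concurrency control: locking up front (the pessimistic approach) and relying on database transaction serialization with conflict recovery (the optimistic approach).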

For complex systems, locking may pose scalability and complexity issues. More fine-grained locking is required, placing cognitive load on the software developer, who must carefully think through and manage all locks up front to prevent race conditions and deadlocks. Thus, locking may be an error-prone approach in real-world application development (TBD needs better sources).

Relying on database transaction serialization is easier from the development perspective. If you use serialized transactions, you know there will never be database race conditions. In the worst case, the user gets an error saying there was a concurrency error. But transaction serialization creates another problem: your application must be aware of potential transaction conflicts and must be able to recover from them when they occur.

Please note that when the system is under high load and the rate of concurrency issues is high, both approaches lead to degraded performance. In the pessimistic approach, clients wait for locks, never get them and eventually time out. In the optimistic approach, a high transaction conflict rate may exceed the rate at which the system can successfully replay transactions. Long-running transactions are also an issue in both approaches, so batch processing is encouraged to use a limited batch size for each transaction if possible.
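As a rough illustration of the limited batch size advice, a batch job might split its work into small chunks and run each chunk in its own short transaction. This is only a sketch: iter_batches and the batch size of 3 are hypothetical and not part of cryptoassets.core.

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield lists of at most batch_size items (hypothetical helper)."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Each batch would be processed in its own short transaction
batches = list(iter_batches(range(7), batch_size=3))
print(batches)
```

Smaller batches hold rows for a shorter time, so a conflict only forces a replay of one small batch instead of the whole job.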

Benefits and design goals

cryptoassets.core.utils.conflictresolver.ConflictResolver is a helper class to manage serialized transaction conflicts in your code and resolve them in an idiomatic Python manner. The design goals include:

  • Race condition free codebase, because there is no need for application level locking
  • Easy and Pythonic to use
  • Simple
  • Fine-grained control over the transaction life cycle
  • Works with SQLAlchemy

All of these should contribute toward a cleaner, more robust and bug-free application codebase.

The work was inspired by the ZODB transaction package, which provides an abstract two-phase commit protocol for Python. The transaction package contains more features and works across databases, but it also has a more complex codebase and lacks the decorator approach provided by ConflictResolver. ConflictResolver, in contrast, works directly with SQLAlchemy sessions, making it more straightforward to use in SQLAlchemy-only applications.

Transaction retries

At the core of the transaction serialization approach is recovery from a transaction conflict. Suppose two users edit the same item on a website and press save simultaneously, leading to a transaction conflict in the database. Without a recovery mechanism, one of the users sees the save succeed and the other gets an internal error page. The core principle here is that we consider a transaction conflict a rare event under normal system load, i.e. it is rare that users press save simultaneously. But it is still a very bad user experience to serve an error page to one of the users, especially if the system itself knows how it could recover from the situation without needing intervention from the user.

The ConflictResolver approach to recovery is to:

  • Run transaction-sensitive code within a marked Python code block
  • If the code block raises an exception which we identify to be a transaction conflict error from the database, just reset the situation and replay the code block
  • Repeat this X times and give up if it seems like our transaction is never going through (because of too high system load or a misdesigned long-running transaction blocking all writes)

Marked Python code blocks are created using Python function decorators. This is not an optimal approach in terms of code cleanliness, and a Python with block would be preferred. However, a with block cannot run its body in a loop, which is a prerequisite for transaction retries. Combined with Python closures, though, the boilerplate of the decorator approach is quite minimal.
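To make the replay idea more concrete, here is a minimal, self-contained sketch of a retrying decorator. It is not the ConflictResolver implementation: FakeConflictError, retry_on_conflict and flaky_update are hypothetical names, and no database or session handling is involved.

```python
import functools


class FakeConflictError(Exception):
    """Stand-in for a database serialization failure (hypothetical)."""


def retry_on_conflict(retries):
    """Build a decorator which replays the function on FakeConflictError."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except FakeConflictError:
                    attempts += 1
                    if attempts > retries:
                        # Give up: let the caller see the conflict
                        raise
        return wrapper
    return decorator


calls = []


@retry_on_conflict(retries=3)
def flaky_update():
    calls.append(1)
    if len(calls) < 3:
        # Simulate a conflict on the first two attempts
        raise FakeConflictError()
    return "committed"


result = flaky_update()
print(result, len(calls))
```

The loop lives inside the wrapper closure, which is exactly what a with block cannot express. The decorated function only contains the code to replay.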

Example

Here is a simple example of how to use ConflictResolver:

from cryptoassets.core.utils.conflictresolver import ConflictResolver
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql:///unittest-conflict-resolution',
    isolation_level='SERIALIZABLE')

# Create new session for SQLAlchemy engine
def create_session():
    Session = sessionmaker()
    Session.configure(bind=engine)
    return Session()

conflict_resolver = ConflictResolver(create_session, retries=3)

# Create a decorated function which can try to re-run itself in the case of conflict
@conflict_resolver.managed_transaction
def top_up_balance(session, amount):

    # Many threads could modify this account simultaneously,
    # as incrementing the value in application code is
    # not atomic. Account is your application's SQLAlchemy model.
    acc = session.query(Account).get(1)
    acc.balance += amount

# Execute the conflict sensitive code inside a transaction aware code block
top_up_balance(100)

Rules and limitations

The rules:

  • You must not blindly swallow all exceptions (generic Python Exception) within managed_transaction. An example of how to handle exceptions if generic exception catching is needed:

    # Create a decorated function which can try to re-run itself in the case of conflict
    @conflict_resolver.managed_transaction
    def myfunc(session):
    
        try:
            my_code()
        except Exception as e:
            if ConflictResolver.is_retryable_exception(e):
                # This must be passed to the function decorator, so it can attempt retry
                raise
            # Otherwise the exception is all yours
    
  • Use special read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees e.g. for displaying the total balance.

  • Never do external actions, like sending emails, inside managed_transaction. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.

  • Managed transaction code blocks should be as small and fast as possible to avoid transaction conflict congestion. Avoid long-running transactions by splitting a big transaction into smaller worker batches.
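One way to follow the rule about external actions is to let the transaction block only return a description of what should happen, and to perform the action after the block has gone through. The sketch below simulates a single replay without any database; send_email, run_with_one_replay and build_receipt are hypothetical names made up for this illustration.

```python
sent_emails = []


def send_email(message):
    """Hypothetical mail sender; here it only records the message."""
    sent_emails.append(message)


def run_with_one_replay(func, *args):
    """Simulate a conflict: run the block, discard the result, run it again."""
    func(*args)          # first attempt, imagined to be rolled back
    return func(*args)   # replayed attempt, imagined to commit


def build_receipt(account_id, amount):
    """Stand-in for a managed transaction body: no external actions here."""
    return "Account {} topped up by {}".format(account_id, amount)


# The external action happens once, after the transaction has gone through
receipt = run_with_one_replay(build_receipt, 1, 100)
send_email(receipt)
print(sent_emails)
```

Even though the block ran twice, only one email is recorded, because sending is outside the replayed code.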

Compatibility

ConflictResolver should be compatible with all SQL databases providing the Serializable isolation level. However, because Python SQL drivers and SQLAlchemy do not standardize how SQL execution communicates a transaction conflict back to the application, the exception mapping code might need to be updated to handle your database driver.

API documentation

See ConflictResolver API documentation below.

cryptoassets.core.utils.conflictresolver.DATABASE_COFLICT_ERRORS = [(<class 'sqlalchemy.orm.exc.StaleDataError'>, None)]

Tuples of (Exception class, test function). Behavior copied from the _retryable_errors definitions in zope.sqlalchemy.
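The following sketch shows how a list of (Exception class, test function) tuples could be evaluated. It is an assumption about the general pattern, not the actual is_retryable_exception code: the exception classes are local stand-ins, DriverError and its code attribute are hypothetical, and the SQLSTATE value 40001 (serialization failure) is used only as an example of a driver-specific test.

```python
class StaleDataError(Exception):
    """Local stand-in for sqlalchemy.orm.exc.StaleDataError."""


class DriverError(Exception):
    """Hypothetical driver exception carrying a SQLSTATE-like code."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code


# (exception class, test function) pairs; None means "always retryable"
CONFLICT_ERRORS = [
    (StaleDataError, None),
    (DriverError, lambda e: e.code == "40001"),
]


def looks_like_conflict(e):
    """Return True if the exception matches any entry in CONFLICT_ERRORS."""
    for exc_class, test in CONFLICT_ERRORS:
        if isinstance(e, exc_class) and (test is None or test(e)):
            return True
    return False


print(looks_like_conflict(DriverError("40001")))
print(looks_like_conflict(ValueError("unrelated")))
```

Supporting a new database driver would then mean adding one more tuple to the list.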

class cryptoassets.core.utils.conflictresolver.ConflictResolver(session_factory, retries)

Helper class to resolve transaction conflicts in a graceful manner.

Parameters:
  • session_factory – callback() which will give us a new SQLAlchemy session object for each transaction and retry
  • retries – The number of attempts we make to re-run the transaction in the case of a transaction conflict.

classmethod is_retryable_exception(e)

Does the exception look like a database conflict error?

Check for database driver specific cases.

Parameters: e – Python Exception instance

managed_transaction(func)

Function decorator for SQL Serialized transaction conflict resolution through retries.

The managed_transaction decorator will retry running the decorated function. Retries are attempted until ConflictResolver.retries is exceeded, in which case the original SQL exception is allowed to fall through.

Please obey the rules and limitations of transaction retries in the decorated functions.

managed_non_retryable_transaction(func)

Provide managed_transaction decorator API compatibility without retrying.

Decorate your transaction handling functions with this method if you absolutely must not run the code twice for a transaction retry and a user error is the desirable outcome.

transaction()

Get a transaction context manager instance using the conflict resolver session.

This approach does not support conflict resolution, because Python context managers don’t support looping. Instead, it will let any exception fall through. ConflictResolver.transaction is only useful to access the configured SQLAlchemy session in an easy manner.

  • Useful for unit testing
  • Useful for shell sessions

Transaction handling

  • Transaction is committed if the context manager exits successfully
  • Transaction is rolled back on an exception
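The commit and rollback behavior listed above can be sketched with contextlib. This is an illustration of the pattern only, under the assumption that the session exposes commit() and rollback(). FakeSession is a hypothetical stand-in, and the function is not the real ConflictResolver.transaction.

```python
from contextlib import contextmanager


class FakeSession:
    """Minimal stand-in for a SQLAlchemy session (hypothetical)."""
    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


@contextmanager
def transaction(session_factory):
    session = session_factory()
    try:
        yield session
    except Exception:
        # Any exception rolls back and is then passed to the caller
        session.rollback()
        raise
    else:
        session.commit()


ok_session = FakeSession()
with transaction(lambda: ok_session):
    pass

failed_session = FakeSession()
try:
    with transaction(lambda: failed_session):
        raise RuntimeError("boom")
except RuntimeError:
    pass

print(ok_session.log, failed_session.log)
```

Note that there is no loop anywhere: the body of the with block runs exactly once, which is why this form cannot retry.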

Example:

conflict_resolver = ConflictResolver(create_session, retries=3)
with conflict_resolver.transaction() as session:
    account = session.query(Account).get(1)
    account.balance += 1

exception cryptoassets.core.utils.conflictresolver.CannotResolveDatabaseConflict

The managed_transaction decorator has given up trying to resolve the conflict.

We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in an error-free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing.

The conflict resolver unit tests cover different transaction conflict outcomes and their resolution. If you are unsure whether your Python database driver can handle transaction conflicts, they are a good smoke test to find out.

Automatic enumeration classes

class cryptoassets.core.utils.enum.AutoNumber

Enum pattern with automatic numbering of values.

https://docs.python.org/3/library/enum.html#autonumber
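For reference, the AutoNumber recipe from the Python enum documentation looks roughly like the sketch below. The class is redefined locally so the example is self-contained; whether cryptoassets.core uses exactly this code is an assumption, and TransactionState with its members is a hypothetical subclass.

```python
from enum import Enum


class AutoNumber(Enum):
    """Recipe from the Python enum documentation, redefined locally."""
    def __new__(cls):
        # Number members 1, 2, 3... in definition order
        value = len(cls.__members__) + 1
        obj = object.__new__(cls)
        obj._value_ = value
        return obj


class TransactionState(AutoNumber):
    # Hypothetical members; the empty tuple means "no explicit value"
    pending = ()
    broadcasted = ()
    confirmed = ()


print([(state.name, state.value) for state in TransactionState])
```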

Python dictionary deep merge

cryptoassets.core.utils.dictutil.merge_dict(a, b)

Merges b into a and returns the merged result.

NOTE: tuples and arbitrary objects are not handled as it is totally ambiguous what should happen

Courtesy of http://stackoverflow.com/a/15836901/315168
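To show what a deep merge means in practice, here is a simplified sketch. It is not the merge_dict implementation: deep_merge is a hypothetical function which only recurses into nested dictionaries and overwrites everything else, so details such as list handling may differ from the real helper.

```python
def deep_merge(a, b):
    """Merge b into a recursively and return a (illustrative sketch)."""
    for key, value in b.items():
        if isinstance(value, dict) and isinstance(a.get(key), dict):
            # Both sides are dictionaries: merge them key by key
            deep_merge(a[key], value)
        else:
            # Otherwise the value from b wins
            a[key] = value
    return a


defaults = {"database": {"url": "sqlite://", "echo": False}, "retries": 3}
overrides = {"database": {"echo": True}}
merged = deep_merge(defaults, overrides)
print(merged)
```

Unlike dict.update(), the nested "url" key survives, because only the overlapping leaf value is replaced. Note that the sketch modifies a in place.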

HTTP event listener decorator

Convenience decorator to open an HTTP event listener for the configured cryptoassets service.

Opens a new HTTP server running in a background thread. Whenever the cryptoassets helper service posts a new event, it will be received by this HTTP server, which then executes the event in your application context.

This can be used only once per application, so you need to dispatch listened events to your own event handling functions in one singleton handler.

The callback receives two arguments, event_name (string) and data (dict). Data payload depends on the event type.
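Because there can be only one listener, the single callback usually forwards events to more specific handlers. A minimal sketch of such a dispatcher, assuming a plain dictionary as the routing table, is shown here. EVENT_HANDLERS, on_txupdate and handle_event are hypothetical names, and no HTTP server is started.

```python
received = []


def on_txupdate(data):
    """Hypothetical handler for txupdate events."""
    received.append(("txupdate", data["txid"]))


# Hypothetical dispatch table: event name -> handler function
EVENT_HANDLERS = {
    "txupdate": on_txupdate,
}


def handle_event(event_name, data):
    """Single entry point which forwards events to specific handlers."""
    handler = EVENT_HANDLERS.get(event_name)
    if handler is None:
        # Unknown events are ignored in this sketch
        return False
    handler(data)
    return True


handled = handle_event("txupdate", {"txid": "foobar", "confirmations": 2})
ignored = handle_event("unknown", {})
print(handled, ignored, received)
```

A function with the same (event_name, data) signature as handle_event is what the decorator would wrap.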

Example:

app = CryptoAssetsApp()

# This will load the configuration file for the cryptoassets framework
configurer = Configurator(app)
configurer.load_yaml_file("cryptoassets-settings.yaml")

@simple_http_event_listener(configurer.config)
def my_event_callback(event_name, data):
    if event_name == "txupdate":
        print("Got transaction update {}".format(data))

cryptoassets.core.utils.httpeventlistener.simple_http_event_listener(config, daemon=True)

Function decorator which makes the target function receive events from the cryptoassets helper service over an HTTP event callback.

You can also call this manually from the command line for testing:

curl --data 'event_name=txupdate&data={"transaction_type":"broadcast","address":"x","confirmations":2,"txid":"foobar"}' http://127.0.0.1:10000

Parameters:
  • config – cryptoassets.core app configuration as Python dict. We’ll extract the information which port and IP to listen to on HTTP server from there.
  • func – The event handling callback function, callback(event_name, data_dict).
  • daemon – Should the server be started as a daemon thread (does not prevent Python application quitting unless explicitly stopped)
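The same test call as the curl command shown earlier could be prepared from Python with the standard library. The sketch assumes, based on that curl command, that the listener expects form fields named event_name and data, with data carrying JSON. The address is the one from the curl example and may differ in your configuration. The request is only constructed, not sent.

```python
import json
from urllib.parse import urlencode, parse_qs
from urllib.request import Request

payload = {
    "transaction_type": "broadcast",
    "address": "x",
    "confirmations": 2,
    "txid": "foobar",
}

# Same form fields as in the curl call: event_name and JSON-encoded data
body = urlencode({"event_name": "txupdate", "data": json.dumps(payload)}).encode("utf-8")

# The request is only constructed here; pass it to urllib.request.urlopen()
# when a listener is actually running on this address
request = Request("http://127.0.0.1:10000", data=body, method="POST")

# Decode the body again to show what the server side would see
decoded = parse_qs(body.decode("utf-8"))
print(decoded["event_name"], json.loads(decoded["data"][0]))
```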

Ngrok automatic HTTP endpoint tunneling

Expose local HTTP ports to the world using ngrok service.

Today many API services provide webhooks calling back your website or system over HTTP. This enables simple third party interprocess communications for websites. However, unless you are running in production, you often find yourself in a situation where it is not possible to get an Internet-exposed HTTP endpoint on a publicly accessible IP address. These situations may include your home desktop, a public Wi-Fi access point or continuous integration services. Thus, developing or testing against webhook APIs becomes painful for contemporary nomad developers.

ngrok (source: https://github.com/inconshreveable/ngrok) is a pay-what-you-want service to create HTTP tunnels through third party relays. What makes ngrok attractive is that registration is dead simple with GitHub credentials and upfront payments are not required. ngrok is also open source, so you can run your own relay for sensitive traffic.

In this blog post, I present a Python solution for programmatically creating ngrok tunnels on demand. This is especially useful for webhook unit tests, as you have zero-configuration tunnels available anywhere you run your code. ngrok is spawned as a controlled subprocess for a given URL. Then, you can tell your webhook service provider to use this URL to make calls back to your unit tests.

One could use ngrok completely login free. In this case you lose the ability to name your HTTP endpoints. I have found it practical to have control over the endpoint URLs, as this makes debugging much easier.

For real-life usage, you can check the cryptoassets.core project, where I came up with the ngrok method. ngrok successfully tunneled me out from the drone.io CI service and from my laptop.

Installation

Installing ngrok on OSX from Homebrew:

brew install ngrok

Installing ngrok for Ubuntu:

apt-get install -y unzip
cd /tmp
wget -O ngrok.zip "https://api.equinox.io/1/Applications/ap_pJSFC5wQYkAyI0FIVwKYs9h1hW/Updates/Asset/ngrok.zip?os=linux&arch=386&channel=stable"
unzip ngrok
mv ngrok /usr/local/bin

Official ngrok download, self-contained zips.

Sign up for the ngrok service and grab your auth token.

Export the auth token as an environment variable in your shell; don’t store it in a version control system:

export NGROK_AUTH_TOKEN=xxx
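On the Python side, the exported token can be read from the environment. The helper below is a small hypothetical sketch (get_ngrok_auth_token is not part of cryptoassets.core) which returns None when the variable is missing, so that tests depending on ngrok could be skipped instead of crashing.

```python
import os


def get_ngrok_auth_token(environ=os.environ):
    """Return the ngrok auth token or None if it has not been exported."""
    token = environ.get("NGROK_AUTH_TOKEN", "").strip()
    return token or None


# Passing a plain dict makes the helper easy to try out
print(get_ngrok_auth_token({"NGROK_AUTH_TOKEN": "xxx"}))
print(get_ngrok_auth_token({}))
```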

Ngrok tunnel code

Below is the Python 3 code for the NgrokTunnel class. See the full source code here.

Example code

Here is a short pseudo example from the cryptoassets.core block.io webhook handler unit tests. See the full unit test code here:

import os
import unittest

class BlockWebhookTestCase(CoinTestRoot, unittest.TestCase):

    def setUp(self):

        self.ngrok = None

        self.backend.walletnotify_config["class"] = "cryptoassets.core.backend.blockiowebhook.BlockIoWebhookNotifyHandler"

        # We need ngrok tunnel for webhook notifications
        auth_token = os.environ["NGROK_AUTH_TOKEN"]
        self.ngrok = NgrokTunnel(21211, auth_token)

        # Pass dynamically generated tunnel URL to backend config
        tunnel_url = self.ngrok.start()
        self.backend.walletnotify_config["url"] = tunnel_url
        self.backend.walletnotify_config["port"] = 21211

        # Start the web server
        self.incoming_transactions_runnable = self.backend.setup_incoming_transactions(self.app.conflict_resolver, self.app.event_handler_registry)

        self.incoming_transactions_runnable.start()

    def tearDown(self):

        # Stop webserver
        incoming_transactions_runnable = getattr(self, "incoming_transactions_runnable", None)
        if incoming_transactions_runnable:
            incoming_transactions_runnable.stop()

        # Stop tunnelling
        if self.ngrok:
            self.ngrok.stop()
            self.ngrok = None

Other

Please see the unit tests for the NgrokTunnel class itself.