Utilities¶
Introduction¶
Here is collection of helper classes.
Transaction conflict resolver¶
ConflictResolver is a helper class to provide serialized transaction conflict resolution mechanism in your SQLAlchemy application.
Preface¶
Transaction conflict resolution is a way to deal with concurrency and race condition issues within multiuser application. It is a way to resolve race conditions when two users, or two threads, are performing an action affecting the same data set simultaneously.
There are two basic ways of concurrency control
- Up-front locking: You use interprocess / interserver locks to signal you are about to access and modify resources. If there are concurrent access the actors accessing the resource wait for the lock before taking action. This is pessimistic concurrency control mechanism.
- Transaction serialization: Database detects concurrent access from different clients (a.k.a serialization anomaly) and do not let concurrent modifications to take place. Instead, only one transaction is let through and other conflicting transactions are rolled back. The strongest level of transaction isolation is achieved using SQL Serializable isolation level. This is optimistic concurrency control mechanism.
For complex systems, locking may pose scalability and complexity issues. More fine grained locking is required, placing cognitive load on the software developer to carefully think and manage all locks upfront to prevent race conditions and deadlocks. Thus, locking may be error prone approach in real world application development (TBD needs better sources).
Relying on database transaction serialization is easier from the development perspective. If you use serialized transactions you know there will never be database race conditions. In the worst case there is an user error saying there was concurrency error. But transaction serialization creates another problem: your application must be aware of potential transaction conflicts and in the case of transaction conflict it must be able to recover from them.
Please note that when system is under high load and having high concurrent issue rate, both approaches will lead to degraded performance. In pessimistic approach, clients are waiting for locks, never getting them and eventually timing out. In optimistic approach high transaction conflict rate may exceed the rate the system can successfully replay transactions. Long running transaction are also an issue in both approaches, thus batch processing is encouraged to use limited batch size for each transaction if possible.
Benefits and design goals¶
cryptoassets.core.utils.conflictresolver.ConflictResolver
is a helper class to manage serialized transaction conflicts in your code and resolve them in idiomatic Python manner. The design goals include
- Race condition free codebase because there is no need for application level locking
- Easy, Pythonic, to use
- Simple
- Have fine-grained control over transaction life cycle
- Works with SQLAlchemy
These all should contribute toward cleaner, more robust and bug free, application codebase.
The work was inspired by ZODB transaction package which provides abstract two-phase commit protocol for Python. transaction package contains more features, works across databases, but also has more complex codebase and lacks decorator approach provided by ConflictResolver. Whereas ConflictResolver works directly with SQLAlchemy sessions, making it more straightforward to use in SQLAlchemy-only applications.
Transaction retries¶
In the core of transaction serialization approach is recovery from the transaction conflict. If you do not have any recovery mechanism, when two users edit the same item on a website and press save simultaneously, leading to a transaction conflict in the database, one of the user gets save succeed the other gets an internal error page. The core principle here is that we consider transaction conflict a rare event under normal system load conditions i.e. it is rare users press the save simultaneously. But it still very bad user experience to serve an error page for one of the users, especially if the system itself knows how it could recovery from the situation - without needing intervention from the user.
ConflictResolver approach to recovery is to
- Run a transaction sensitive code within a marked Python code block
- If the code block raises an exception which we identify to be a transaction conflict error from the database, just reset the situation and replay the code block
- Repeat this X times and give up if it seems like our transaction is never going through (because of too high system load or misdesigned long running transaction blocking all writes)
Marked Python code blocks are created using Python function decorators. This is not optimal approach in the sense of code cleanness and Python with
block would be preferred. However, Python with
lacks ability to run loops which is prerequisite for transaction retries. However combined with Python closures, the boilerplate is quite minimal.
Example¶
Here is a simple example how to use ConflictResolver:
from cryptoassets.core.utils.conflictresolver import ConflictResolver
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql:///unittest-conflict-resolution',
isolation_level='SERIALIZABLE')
# Create new session for SQLAlchemy engine
def create_session():
Session = sessionmaker()
Session.configure(bind=engine)
return Session()
conflict_resolver = ConflictResolver(create_session, retries=3)
# Create a decorated function which can try to re-run itself in the case of conflict
@conflict_resolver.managed_transaction
def top_up_balance(session, amount):
# Many threads could modify this account simultanously,
# as incrementing the value in application code is
# not atomic
acc = session.query(Account).get(1)
acc.balance += amount
# Execute the conflict sensitive code inside a transaction aware code block
top_up_balance(100)
Rules and limitations¶
The rules:
You must not blindly swallow all exceptions (generic Python
Exception
) withinmanaged_transactions
. Example how to handle exceptions if generic exception catching is needed:# Create a decorated function which can try to re-run itself in the case of conflict @conflict_resolver.managed_transaction def myfunc(session): try: my_code() except Exception as e: if ConflictResolver.is_retryable_exception(e): # This must be passed to the function decorator, so it can attempt retry raise # Otherwise the exception is all yours
Use special read-only database sessions if you know you do not need to modify the database and you need weaker transaction guarantees e.g. for displaying the total balance.
Never do external actions, like sending emails, inside
managed_transaction
. If the database transaction is replayed, the code is run twice and you end up sending the same email twice.Managed transaction code block should be as small and fast as possible to avoid transaction conflict congestion. Avoid long-running transactions by splitting up big transaction to smaller worker batches.
Compatibility¶
ConflictResolver should be compatible with all SQL databases providing Serializable isolation level. However, because Python SQL drivers and SQLAlchemy do not standardize the way how SQL execution communicates the transaction conflict back to the application, the exception mapping code might need to be updated to handle your database driver.
API documentation¶
See ConflictResolver API documentation below.
-
cryptoassets.core.utils.conflictresolver.
DATABASE_COFLICT_ERRORS
= [(<class 'sqlalchemy.orm.exc.StaleDataError'>, None)]¶ Tuples of (Exception class, test function). Behavior copied from _retryable_errors definitions copied from zope.sqlalchemy
-
class
cryptoassets.core.utils.conflictresolver.
ConflictResolver
(session_factory, retries)[source]¶ Helper class to resolve transaction conflicts in graceful manner.
Parameters: - session_factory – callback() which will give us a new SQLAlchemy session object for each transaction and retry
- retries – The number of attempst we try to re-run the transaction in the case of transaction conflict.
-
classmethod
is_retryable_exception
(e)[source]¶ Does the exception look like a database conflict error?
Check for database driver specific cases.
Parameters: e – Python Exception instance
-
managed_transaction
(func)[source]¶ Function decorator for SQL Serialized transaction conflict resolution through retries.
managed_transaction
decorator will retry to run the decorator function. Retries are attempted untilConflictResolver.retries
is exceeded, in the case the original SQL exception is let to fall through.Please obey the rules and limitations of transaction retries in the decorated functions.
-
managed_non_retryable_transaction
(func)[source]¶ Provide
managed_transactions
decorator API compatibility without retrying.Decorate your transaction handling functions with this method if you absolute must not run the code twice for transaction retry and the user error is desirable outcome.
-
transaction
()[source]¶ Get a transaction contextmanager instance using the conflict resolver session.
This approach does not support conflict resolution, because Python context managers don’t support looping. Instead, it will let any exception fall through.
ConflictResolver.transaction
is only useful to access the configured SQLAlchemy session in easy manner.- Useful for unit testing
- Useful for shell sessions
Transaction handling
- Transaction is committed if the context manager exists succesfully
- Transaction is rolled back on an exception
Example:
conflict_resolver = ConflictResolver(create_session, retries=3) with conflict_resolver.transaction() as session: account = session.query(Account).get(1) account.balance += 1
-
exception
cryptoassets.core.utils.conflictresolver.
CannotResolveDatabaseConflict
[source]¶ The managed_transaction decorator has given up trying to resolve the conflict.
We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in error free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing.
Conflict resolver unit tests provide tests for different transaction conflict resolution outcomes and their resolution. If you are unsure Python database driver can handle transaction conflicts, this is a good smoke test to find out.
Automatic enumeration classes¶
Python dictionary deep merge¶
-
cryptoassets.core.utils.dictutil.
merge_dict
(a, b)[source]¶ merges b into a and return merged result.
NOTE: tuples and arbitrary objects are not handled as it is totally ambiguous what should happen
Courtesy of http://stackoverflow.com/a/15836901/315168
HTTP event listener decorator¶
Convenience decorator to open HTTP event listever for configured cryptoassets service.
Opens a new HTTP server running a background thread. Whenever cryptoassets helper service posts a new event, it will be received by this HTTP server which then executes the event in your application context.
This can be used only once per application, so you need to dispatch listened events to your own event handling funcions in one singleton handler.
The callback receives two arguments, event_name
(string) and data
(dict). Data payload depends on the event type.
Example:
app = CryptoAssetsApp()
# This will load the configuration file for the cryptoassets framework
configurer = Configurator(app)
configurer.load_yaml_file("cryptoassets-settings.yaml")
@simple_http_event_listener(configurer.config)
def my_event_callback(event_name, data):
if event_name == "txupdate":
print("Got transaction update {}".format(data))
-
cryptoassets.core.utils.httpeventlistener.
simple_http_event_listener
(config, daemon=True)[source]¶ Function decorator to make the target function to retrieve events from cryptoassets helper service over HTTP event callback.
You can also call this manually from command line from testing:
curl --data 'event_name=txupdate&data={"transaction_type":"broadcast","address":"x","confirmations":2,"txid":"foobar"}' http://127.0.0.1:10000
Parameters: - config – cryptoassets.core app configuration as Python dict. We’ll extract the information which port and IP to listen to on HTTP server from there.
- func – The event handling callback function,
callback(event_name, data_dict)
. - daemon – Should the server be started as a daemon thread (does not prevent Python application quitting unless explictly stopped)
Ngrok automatic HTTP endpoint tunneling¶
Expose local HTTP ports to the world using ngrok service.
Today many API services provide webhooks calling back your website or system over HTTP. This enables simple third party interprocess communications for websites. However unless you are running in production, you often find yourself in a situation where it is not possible to get an Internet exposed HTTP endpoint over publicly accessible IP address. These situations may include your home desktop, public WI-FI access point or continuous integration services. Thus, developing or testing against webhook APIs become painful for contemporary nomad developers.
ngrok (source <https://github.com/inconshreveable/ngrok>_) is a pay-what-you-want service to create HTTP tunnels through third party relays. What makes ngrok attractice is that the registration is dead simple with Github credentials and upfront payments are not required. ngrok is also open source, so you can run your own relay for sensitive traffic.
In this blog post, I present a Python solution how to programmatically create ngrok tunnels on-demand. This is especially useful for webhook unit tests, as you have zero configuration tunnels available anywhere where you run your code. ngrok is spawned as a controlled subprocess for a given URL. Then, you can tell your webhook service provider to use this URL to make calls back to your unit tests.
One could use ngrok completely login free. In this case you lose the ability to name your HTTP endpoints. I have found it practical to have control over the endpoint URLs, as this makes debugging much more easier.
For real-life usage, you can check cryptoassets.core project where I came up with ngrok method. ngrok succesfully tunneled me out from drone.io CI service and my laptop.
Installation¶
Installing ngrok on OSX from Homebrew:
brew install ngrok
Installing ngrok for Ubuntu:
apt-get install -y unzip
cd /tmp
wget -O ngrok.zip "https://api.equinox.io/1/Applications/ap_pJSFC5wQYkAyI0FIVwKYs9h1hW/Updates/Asset/ngrok.zip?os=linux&arch=386&channel=stable"
unzip ngrok
mv ngrok /usr/local/bin
Official ngrok download, self-contained zips.
Sign up for the ngrok service and grab your auth token.
Export auth token as an environment variable in your shell, don’t store it in version control system:
export NGROK_AUTH_TOKEN=xxx
Ngrok tunnel code¶
Below is Python 3 code for NgrokTunnel
class. See the full source code here.
Example code¶
Here is a short pseudo example from cryptoassets.core block.io webhook handler unit tests. See the full unit test code here.:
class BlockWebhookTestCase(CoinTestRoot, unittest.TestCase):
def setUp(self):
self.ngrok = None
self.backend.walletnotify_config["class"] = "cryptoassets.core.backend.blockiowebhook.BlockIoWebhookNotifyHandler"
# We need ngrok tunnel for webhook notifications
auth_token = os.environ["NGROK_AUTH_TOKEN"]
self.ngrok = NgrokTunnel(21211, auth_token)
# Pass dynamically generated tunnel URL to backend config
tunnel_url = self.ngrok.start()
self.backend.walletnotify_config["url"] = tunnel_url
self.backend.walletnotify_config["port"] = 21211
# Start the web server
self.incoming_transactions_runnable = self.backend.setup_incoming_transactions(self.app.conflict_resolver, self.app.event_handler_registry)
self.incoming_transactions_runnable.start()
def teardown(self):
# Stop webserver
incoming_transactions_runnable = getattr(self, "incoming_transactions_runnable", None)
if incoming_transactions_runnable:
incoming_transactions_runnable.stop()
# Stop tunnelling
if self.ngrok:
self.ngrok.stop()
self.ngrok = None
Other¶
Please see the unit tests for NgrokTunnel
class itself.