lricardo.space

Asynchronous SQLAlchemy in FastAPI

Update (February 22th, 2024): SQLAlchemy updated its documentation, clearly stating that The QueuePool class is not compatible with asyncio.

FastAPI is a very popular Python web framework used to create web applications. It’s essentially a fork from Starlette with a set of plugins and utilities on top.

FastAPI and Starlette foundation is asyncio, a Python built-in library to write concurrent code. It is very interesting and it borrows some concepts from NodeJS: single-threaded, non-blocking IO, async/await, etc.

This article explains how to do it!

Engine initialization

SQLAlchemy 2.0 provides an async mode.

To use SQLAlchemy, we must create an engine, an object that provides the source for the database connectivity and behaviour.

To do that, we must use create_async_engine(). There a bunch of different configuration options but the ones to look at are related with the connection pool - an important mechanism that stores and manages reusable database connections without the need of repeatedly opening and closing them, thus having a significant impact on effiecieny and performance of the application and the database.

Connection Pool initialization

TL;DR: These are the most important configurations to be aware of:

  • poolclass: a Pool subclass which will be used by SQLAlchemy to create a connection pool. We should use AsyncAdaptedQueuePool, which is the async version of QueuePool.
  • pool_size: minimum number of connections that will be kept (the total might be pool_size + max_overflow))
  • max_overflow: the maximum overflow size of the pool

Although not strictly necessary, there might some use-cases when the default values might need to be configured:

  • pool_recycle: this setting causes the pool to recycle connections after the given number of seconds has passed
  • pool_timeout: number of seconds to wait before giving up on getting a connection from the pool

Below, an example of an engine that is asynchronous, uses an async pool and will have up to 30 connections.

engine = create_async_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    pool_class=AsyncAdaptedQueuePool,
    pool_size=20,
    max_overflow=10
)  

Session initialization

In what regards session initalization, there isn’t much to do or be aware of but two small things.

Use async_sessionmaker

Instead of using sessionmaker, the best is to use the analogous async_sessionmaker:

async_session = async_sessionmaker(engine)  

Using the session

We should be absolutely aware that the a session is meant to be scoped to only a request. Also, Session / AsyncSession instances are not thread-safe and not meant to be shared among concurrent coroutines, so, do not use app.state to share this instances of these classes!!!

The way I recommend to do it, consists in creating a module, e.g.: sqlalchemy.py , declare the engine, async_session and finally define the following async generator:

async def get_session() -> AsyncSession:  
    async with async_session() as session:  
        yield session

Now, using Depends, inject it!

Although this is a basic method, I think this specific set of configurations forms a good starting point. Hope that this helps!