Database Layer
In this post, we will build the database layer, which the app uses to connect to the database and run queries.
Requirements:
- Singleton DB instance
- Migrations
- DB Pool (Safe opening and closing)
- Models for DB tables
environment variable
We need to add the DB URL to the .env file so that the app can read it and pass it to SQLAlchemy to connect to the DB. I am using the asyncpg driver to connect to PostgreSQL so that we can use FastAPI’s async features.
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/bookly_db
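To make the shape of that URL concrete, here is a minimal sketch that splits it into its parts using only the standard library. The helper name describe_db_url is made up for illustration; SQLAlchemy does its own URL parsing internally, so this is only a way to see which piece is the dialect, the driver, the credentials, and the database name.

```python
from urllib.parse import urlsplit

DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/bookly_db"


def describe_db_url(url: str) -> dict:
    """Split a SQLAlchemy-style URL into named parts (hypothetical helper)."""
    parts = urlsplit(url)
    # The scheme has the form "dialect+driver"; the driver part is optional.
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


info = describe_db_url(DATABASE_URL)
print(info["dialect"], info["driver"], info["host"], info["port"], info["database"])
```

The "+asyncpg" suffix is what selects the async driver; without it, SQLAlchemy would fall back to its default PostgreSQL driver.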
Next, an env config file, where I can centralize all the env variables and then use them everywhere in the app
from pydantic_settings import BaseSettings, SettingsConfigDict

class EnvSettings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        extra="ignore"
    )

    DATABASE_URL: str

env_config = EnvSettings()
We are using SettingsConfigDict from pydantic_settings to load the .env file. We declare the variables we want in our app and ignore any others present in the .env file (extra="ignore"). All the variables are then available through the env_config instance. This makes it a centralized env data store that we can import anywhere in the app.
As we have not given DATABASE_URL a default value, Pydantic treats it as mandatory. If it is not defined in the .env file (or in the environment), the app will crash and won’t start: the moment env_config = EnvSettings() executes, Pydantic raises a validation error for the missing variable. We will attach it later to the app start. By the way, Pydantic does the data validation in FastAPI based on type hints, loosely like the role TypeScript plays for JavaScript, except that Pydantic checks at runtime.
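The fail-fast idea can be sketched without Pydantic. The snippet below is a plain standard-library stand-in, not what pydantic_settings does internally; require_env and MissingSettingError are hypothetical names, and the environments are simulated dictionaries so the sketch does not depend on a real .env file.

```python
import os


class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent (hypothetical name)."""


def require_env(name: str, environ=None) -> str:
    # Hypothetical helper: return the value, or stop immediately with a clear message.
    env = os.environ if environ is None else environ
    value = env.get(name)
    if not value:
        raise MissingSettingError(f"Required environment variable {name!r} is not set")
    return value


# Simulated environments, so the sketch runs without touching the real one.
print(require_env("DATABASE_URL", {"DATABASE_URL": "postgresql+asyncpg://localhost/bookly_db"}))

try:
    require_env("DATABASE_URL", {})
except MissingSettingError as err:
    print(f"startup aborted: {err}")
```

Failing at startup like this is usually preferable to discovering the missing value on the first query.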
DB layer
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.exc import SQLAlchemyError
from src.core.configs.env_config import env_config

db_url = env_config.DATABASE_URL

async_engine = create_async_engine(
    url=db_url,
    pool_size=10,
    echo=True  # Set to False in production
)

async def init_db():
    try:
        async with async_engine.begin() as conn:
            statement = text("SELECT 'hello';")
            result = await conn.execute(statement)
            print(result.all())
    except SQLAlchemyError as db_error:
        # Specific guard: catches database issues (wrong password, network drop, etc.)
        print(f"DATABASE ERROR: Something went wrong with the query execution: {db_error}")
        raise
    except Exception as general_error:
        # Catch-all guard: catches any other unexpected Python bugs
        print(f"UNEXPECTED ERROR: A non-database error occurred: {general_error}")
        raise

async def close_db():
    try:
        await async_engine.dispose()
    except Exception as general_error:
        print(f"UNEXPECTED ERROR: An error occurred while closing the DB pool: {general_error}")
As we can see:
- created an async DB engine (async_engine) with the DB URL and a pool size
- created init_db(), which opens a connection from the engine and verifies it with a test query
- created close_db(), which disposes of the engine and its connection pool
Now, we need to call init_db() in the app’s lifespan: verify the connection at app startup and dispose of the engine when the app shuts down. This way we maintain the DB connections efficiently without any memory leaks.
app’s lifespan
We need to create a context that runs/wraps the entire lifespan of our app, so that we can attach the db initiation function at the start and db closing at the end
from fastapi import FastAPI
from contextlib import asynccontextmanager
from src.db.session import init_db, close_db

@asynccontextmanager
async def life_span(app: FastAPI):
    print("server is starting...")
    await init_db()
    yield
    print("server is stopped")
    await close_db()
The @asynccontextmanager decorator turns the function into an async context manager that can be attached to FastAPI’s lifespan.
app = FastAPI(
    title="Bookly",
    version=version,
    lifespan=life_span
)
In the main file, we attach this function through the lifespan parameter. Because src.db.session imports env_config at module level, the app fails as soon as that module is imported if a required env variable is missing, which is before the server starts serving requests. If the variables are present, startup calls init_db() to verify the connection; when execution reaches the “yield” keyword, life_span() pauses and the app is fully operational. When the server shuts down gracefully, life_span() resumes and executes the code after “yield”. A hard crash (for example, the process being killed) may skip that cleanup.
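The startup, pause, and shutdown order is easy to see in isolation. Here is a minimal sketch that uses only the standard library: init_db and close_db are stand-ins that record their calls, app=None replaces the FastAPI instance, and the try/finally is my addition (an assumption, not part of the code above) so that cleanup also runs if the wrapped block raises.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


async def init_db() -> None:
    events.append("init_db")      # stand-in for the real connection check


async def close_db() -> None:
    events.append("close_db")     # stand-in for disposing of the engine


@asynccontextmanager
async def life_span(app):
    await init_db()
    try:
        yield                     # the app would serve requests while paused here
    finally:
        await close_db()          # runs on normal exit and if the body raised


async def main() -> None:
    async with life_span(app=None):
        events.append("serving")


asyncio.run(main())
print(events)
```

The recorded order is init_db, then serving, then close_db, which mirrors what FastAPI does around the server’s lifetime.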
db models
After creating the DB engine and connection, let’s create models that represent the DB tables so that we can interact with the DB through them.
As I said in the previous post, I want services to be separate so that I can reuse them rather than having them tightly coupled to the controller
my_project/
└── src/
    └── features/
        ├── __init__.py
        └── books/
            └── models.py
We can have books, authors, publications, etc., and each will have its own models, schemas and services. I will have a “books” table in the DB, and the following is its model representation
from sqlmodel import SQLModel, Field
from datetime import datetime, timezone
from sqlalchemy import DateTime
import uuid
import sqlalchemy.dialects.postgresql as pg

class Book(SQLModel, table=True):
    __tablename__ = "books"

    id: int = Field(primary_key=True, nullable=False)
    title: str = Field(nullable=False)
    author: str = Field(nullable=False)
    uid: uuid.UUID = Field(
        default_factory=uuid.uuid4,
        nullable=False,
        sa_type=pg.UUID(as_uuid=True),
        unique=True,
        index=True
    )
    created_at: datetime = Field(
        sa_type=DateTime(timezone=True),  # Uses timezone-aware type
        default_factory=lambda: datetime.now(timezone.utc),  # Sets explicit UTC
        nullable=False
    )
    updated_at: datetime = Field(
        sa_type=DateTime(timezone=True),
        default_factory=lambda: datetime.now(timezone.utc),
        sa_column_kwargs={"onupdate": lambda: datetime.now(timezone.utc)},  # Updates on change
        nullable=False
    )

    def __repr__(self):
        return f"<Book {self.title}>"
A few things to explain in the above snippet:
- __tablename__ = "books"
  - this names the table “books” in the DB. I wanted to name the table explicitly, hence this statement. It’s not mandatory
- sa_type=pg.UUID(as_uuid=True):
  - This tells SQLAlchemy to use Postgres’ native uuid type
  - as_uuid=True translates between Python’s uuid.UUID and the pg uuid, both while reading (pg to Python) and while writing (Python to pg)
- sa_type=DateTime(timezone=True):
  - I wanted timezone-aware columns, so that all stored timestamps are in UTC and, when I retrieve them in the frontend, I can format them according to the user’s timezone
- default_factory=lambda: datetime.now(timezone.utc)
  - If I passed a plain value instead (for example default=datetime.now(timezone.utc)), the timestamp would be computed once when the app starts and the same timestamp would be reused for every row
  - the lambda makes it run each time a new Book object is created, so every row gets a fresh timestamp
  - timezone.utc makes it a UTC timestamp
- sa_column_kwargs={"onupdate": lambda: datetime.now(timezone.utc)}
  - this works like created_at, but a new value is generated whenever we run an update query on the row, hence the “onupdate”
- __repr__:
  - this helps while debugging
  - without a custom representation, printing a plain class instance shows only the class name and a memory address
  - __repr__ is short for “representation”; printing the object shows the string that we return from this function
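The difference between a fixed default and a default_factory, and the effect of __repr__, can be seen with a small standard-library sketch. It uses dataclasses as a stand-in for SQLModel’s Field (an assumption made so the example runs without a database); BookSketch and frozen_at are illustrative names that do not exist in the project.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

IMPORT_TIME = datetime.now(timezone.utc)   # evaluated once, when the module loads


@dataclass
class BookSketch:
    title: str
    # A plain default reuses the single value computed above for every instance.
    frozen_at: datetime = IMPORT_TIME
    # A default_factory is called again for each new instance.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    uid: uuid.UUID = field(default_factory=uuid.uuid4)

    def __repr__(self) -> str:
        # Custom representation, same idea as Book.__repr__ in the model.
        return f"<Book {self.title}>"


a = BookSketch("Dune")
b = BookSketch("Emma")
print(a, b)                          # uses __repr__
print(a.frozen_at is b.frozen_at)    # both share the one import-time value
print(a.uid != b.uid)                # each instance got its own uuid
print(a.created_at.tzinfo)           # timezone-aware, in UTC
```

The shared frozen_at value is exactly the bug the lambda avoids in created_at and updated_at.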
This finishes our models creation. Now, we need to generate migration files and apply all the migrations to our DB so that we can execute all the queries.
migrations
I will be using alembic for this. After installing alembic and running its init command, we will have a separate folder named “alembic” in the project root folder
my_project/
├── alembic/
│   ├── versions/
│   ├── env.py
│   └── script.py.mako
└── alembic.ini
Most of the configuration goes in env.py. We could set the URL in alembic.ini as well, but then it would be hardcoded, and I do not want to push my DB URL to the repo, so I preferred doing it programmatically.
import os
from alembic import context  # already present in the generated env.py
from dotenv import load_dotenv
from sqlmodel import SQLModel

import src.db.models  # imports every model so it registers on SQLModel.metadata

# Load the .env file
load_dotenv(os.path.join(os.path.dirname(__file__), "../.env"))

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

db_url = os.getenv("DATABASE_URL")
if not db_url:
    raise ValueError(
        "CRITICAL ERROR: 'DATABASE_URL' environment variable is missing or empty. "
        "Please check that your .env file exists in the root folder and contains this variable."
    )

# Set the db url
config.set_main_option('sqlalchemy.url', db_url)

target_metadata = SQLModel.metadata
Explanation:
- we need to load .env so that we can get the DB URL, which is what the load_dotenv(...) call does
- config = context.config gets the alembic configuration present in the alembic.ini file
- before proceeding, we check whether the env variable is available; if not, the if not db_url block stops the execution and raises an error
- config.set_main_option('sqlalchemy.url', db_url) sets the DB URL that the migrations will run against
- alembic needs the models’ metadata so that it can track changes, which is what target_metadata = SQLModel.metadata provides
- one more thing: we need to import all the models so that they are registered on that metadata, which is what import src.db.models does
from src.features.books.models import Book

# This is optional; we can remove it and the code still works
__all__ = ["Book"]
As you can see, we import the models of each feature in this file so that alembic can pick them all up at once from a single import and keep track of migrations and changes. For now there is only one model, but the list will grow as we add new ones.
After all this setup, I found a small issue. When I generated migrations and looked at the generated file, VS Code showed an error saying that one of the variables is unknown.
op.create_table('books',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('title', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('author', sqlmodel.sql.sqltypes.AutoString(), nullable=False),
    sa.Column('uid', sa.UUID(), nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
    sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
    sa.PrimaryKeyConstraint('id')
)
This is the generated file, in which I was getting the error at line #3 for the variable “sqlmodel”. The file refers to sqlmodel.sql.sqltypes.AutoString but never imports sqlmodel, hence the issue. Every migration file is rendered from the template in script.py.mako, so that is where the fix belongs
from alembic import op
import sqlalchemy as sa
import sqlmodel  # --> This is the new addition
${imports if imports else ""}
The import sqlmodel line is not present in the template after installation, so we need to add it. This will fix the issue for every migration generated from then on. Then, we can proceed to migrate using the following commands:
- alembic revision --autogenerate -m "create books table"
  - generates the migration file
- alembic upgrade head
  - applies the generated migrations so that they are reflected in the DB
- alembic check
  - This is not a required step in the process, but it is useful for checking whether we have changed our models. It compares the current models with the tables present in the DB and reports any mismatch
NOTE: Alembic’s autogenerate is not perfect, as I saw in the docs. Some migration files might be generated incorrectly. For example, renaming a column might be generated as dropping the column and creating a new one rather than a single alter command, which would lose the data in that column. So it’s always better to review the generated migration files and make sure they are correct before applying them.