
Create spatial APIs with FastAPI

Posted on January 6, 2025


When working with geospatial data in a FastAPI application, one of the most common tasks is storing and retrieving geographic features, such as points, in a database. With PostGIS and GeoAlchemy2, you can store and query this data efficiently.

In this blog post, we’ll walk through how to set up a FastAPI app that stores point records in a database, retrieves them all, and returns them in an easy-to-use format.

GitHub code: https://github.com/Rotten-Grapes-Pvt-Ltd/create-spatial-fastapi

Prerequisites

Before we get started, make sure you have the following set up:

  1. PostgreSQL with PostGIS extension: This is necessary for storing geospatial data.
  2. GeoAlchemy2: This library provides integration between SQLAlchemy and PostGIS.
  3. FastAPI: To build the web application.
  4. SQLAlchemy: To handle database sessions and queries.

Each step in this post explains the logic first and then shows the code.

Create FastAPI app

We’ll start by creating a virtual environment and a FastAPI app.

  • Create a virtual environment and activate it
python3 -m venv env
source env/bin/activate
  • Install packages
pip install "fastapi[standard]" 
  • Create an app folder to hold the code; every Python file in this post lives inside it
  • Write boilerplate code in app/main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"Hello": "World"}
  • Run the FastAPI app
cd app
uvicorn main:app --reload 
 

Add database settings

  • Install packages
pip install sqlalchemy geoalchemy2 pydantic-settings asyncpg greenlet

Now we’ll add the files that configure the database connection.

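  • Start by creating a .env file in the app folder (env_file = ".env" is resolved relative to the directory you start the app from) and add the database URL, built from the following details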

username = postgres
password = postgres
host = localhost
port = 5432
db = fastpg

DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/fastpg
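
If the password contains characters such as @ or /, pasting it into the URL unescaped would break URL parsing. The sketch below assembles the same URL from its parts and percent-encodes the credentials. Note that build_database_url is a hypothetical helper written for illustration; it is not part of SQLAlchemy or of this project.

```python
from urllib.parse import quote


def build_database_url(user: str, password: str, host: str, port: int, db: str,
                       driver: str = "postgresql+asyncpg") -> str:
    """Assemble a SQLAlchemy-style database URL (hypothetical helper)."""
    # safe="" forces every reserved character, including "/" and "@", to be encoded
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"{driver}://{user_enc}:{password_enc}@{host}:{port}/{db}"


# With the details listed above this reproduces the DATABASE_URL value
print(build_database_url("postgres", "postgres", "localhost", 5432, "fastpg"))
```

For the plain postgres/postgres credentials used here nothing needs escaping, so the value can also be written by hand as shown.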
  • create app/config.py to load .env 
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    DATABASE_URL: str

    
    class Config:
        env_file = ".env"
        case_sensitive = True # Ensure environment variables are case-sensitive
        extra = "ignore" # Ignore extra env vars not defined in model

settings = Settings(_env_file=".env") # Pass env file explicitly
  • create app/database.py 
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from config import settings

DATABASE_URL = settings.DATABASE_URL

engine = create_async_engine(DATABASE_URL, future=True, echo=True)
SessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

Base = declarative_base()

async def get_db():
    async with SessionLocal() as session:
        yield session
  • create app/models.py 
from sqlalchemy import Column, Integer, String
from geoalchemy2 import Geometry
from database import Base

class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String, nullable=True)
    geom = Column(Geometry("POINT", srid=4326))  # WGS 84 (lon/lat), the SRID the API writes
  • create app/schemas.py 
from pydantic import BaseModel
from typing import Optional

class ItemBase(BaseModel):
    name: str
    description: Optional[str] = None  # may be omitted; the column is nullable
    geom: str  # WKT (Well-Known Text)

class ItemCreate(ItemBase):
    pass

class Item(ItemBase):
    id: int

    class Config:
        from_attributes = True
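
Because geom is typed as a plain str, any text passes schema validation, and a malformed value would typically fail only once it reaches PostGIS. Here is a minimal sketch of an earlier check, assuming the API only expects 2D points in SRID 4326. The name parse_wkt_point is a hypothetical helper; it could, for example, be called from a Pydantic validator on ItemBase.

```python
import re

# A plain or scientific-notation number, e.g. 73.76, -19, .5, 1e-3
_NUMBER = r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
# Matches "POINT(x y)" with optional whitespace, case-insensitively
_POINT_RE = re.compile(rf"^\s*POINT\s*\(\s*({_NUMBER})\s+({_NUMBER})\s*\)\s*$", re.IGNORECASE)


def parse_wkt_point(wkt: str) -> tuple[float, float]:
    """Return (lon, lat) from a 2D WKT point, or raise ValueError (hypothetical helper)."""
    match = _POINT_RE.match(wkt)
    if match is None:
        raise ValueError(f"not a 2D WKT point: {wkt!r}")
    lon, lat = float(match.group(1)), float(match.group(2))
    # Range check assumes SRID 4326, where x is longitude and y is latitude
    if not (-180.0 <= lon <= 180.0 and -90.0 <= lat <= 90.0):
        raise ValueError(f"coordinates out of range for SRID 4326: {lon}, {lat}")
    return lon, lat


print(parse_wkt_point("POINT(73.76311482372093 19.973550453884766)"))
```

Rejecting bad input at the schema layer lets the API answer with a validation error instead of surfacing a database exception.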
  • Edit main.py to create the database tables when the app starts
from fastapi import FastAPI
from contextlib import asynccontextmanager

from database import engine, Base

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield

app = FastAPI(lifespan=lifespan)

@app.get("/")
def read_root():
    return {"Hello": "World"}

Set up migrations

Migrations allow us to keep track of changes to the database schema.

  • Set up Alembic
pip install alembic
alembic init migrations

Doing this creates a new file, alembic.ini, and a new folder, migrations.

  • Set sqlalchemy.url in alembic.ini
sqlalchemy.url = postgresql+asyncpg://postgres:postgres@localhost:5432/fastpg
  • Replace the contents of app/migrations/env.py with the following
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from models import Base
from geoalchemy2.admin.dialects.common import _check_spatial_type
from geoalchemy2 import Geometry, Geography, Raster


# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

def render_item(obj_type, obj, autogen_context):
    """Apply custom rendering for selected items."""
    if obj_type == 'type' and isinstance(obj, (Geometry, Geography, Raster)):
        import_name = obj.__class__.__name__
        autogen_context.imports.add(f"from geoalchemy2 import {import_name}")
        return "%r" % obj

    # default rendering for other objects
    return False

def include_object(object, name, type_, reflected, compare_to):
    # Skip the spatial indexes that GeoAlchemy2 manages itself for geometry columns
    if type_ == "index":
        if len(object.expressions) == 1:
            try:
                col = object.expressions[0]
                if (
                    _check_spatial_type(col.type, (Geometry, Geography, Raster))
                    and col.type.spatial_index
                ):
                    return False
            except AttributeError:
                pass
    # Exclude 'spatial_ref_sys' from migrations
    if type_ == "table" and name == "spatial_ref_sys":
        return False
     
    return True

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        render_item=render_item,
        include_object=include_object,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    """
    This function is used to execute migrations within an async context.
    """
    context.configure(connection=connection, 
                      target_metadata=target_metadata,
                      render_item=render_item,
                      include_object=include_object,)

    with context.begin_transaction():
        context.run_migrations()

async def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)


if context.is_offline_mode():
    run_migrations_offline()
else:
    import asyncio
    asyncio.run(run_migrations_online())
  • Edit app/migrations/script.py.mako
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
import geoalchemy2 # add geoalchemy to the migration file
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}
  • Autogenerate the migration and upgrade the database to head
alembic revision --autogenerate -m "init"
alembic upgrade head

  • Check your database to confirm that the alembic_version and items tables were created

As an exercise, try adding one more model and its matching schema, then generate a new migration.

Create APIs

Next, we’ll create the CRUD functions and the API routes that call them.

  • Install packages
pip install "geoalchemy2[shapely]"
  • Create app/crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from models import Item
from schemas import ItemCreate
from geoalchemy2 import WKTElement
from geoalchemy2.shape import to_shape

async def create_item(db: AsyncSession, item: ItemCreate):
    # Wrap the incoming WKT string so it is stored with SRID 4326 (WGS 84)
    db_item = Item(
        name=item.name,
        description=item.description,
        geom=WKTElement(item.geom, srid=4326),
    )
    db.add(db_item)
    await db.commit()
    await db.refresh(db_item)
    return {"status": 201, "message": "Added successfully"}

async def get_items(db: AsyncSession):
    result = await db.execute(select(Item))
    items = result.scalars().all()
    # Build plain dicts rather than overwriting obj.geom, so the ORM objects stay unmodified
    return [
        {
            "id": obj.id,
            "name": obj.name,
            "description": obj.description,
            # geom is loaded as a WKBElement; convert it to a WKT string
            "geom": to_shape(obj.geom).wkt,
        }
        for obj in items
    ]
  • Create app/routers/items.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from schemas import Item, ItemCreate
from crud import create_item, get_items

router = APIRouter()

@router.post("/")
async def create_new_item(item: ItemCreate, db: AsyncSession = Depends(get_db)):
    res = await create_item(db, item)
    return res

@router.get("/")
async def read_items(db: AsyncSession = Depends(get_db)):
    return await get_items(db)
  • Update main.py 
from fastapi import FastAPI
from contextlib import asynccontextmanager
from routers import items # add router

from database import engine, Base


@asynccontextmanager
async def lifespan(app: FastAPI):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield

app = FastAPI(lifespan=lifespan)

app.include_router(items.router, prefix="/items", tags=["items"]) # include router

@app.get("/")
def read_root():
    return {"Hello": "World"}

Try APIs

  • Send a POST request whose body contains a WKT geometry
curl -X 'POST' \
  'http://localhost:8000/items/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Rotten Grapes",
  "description": "Open source GIS company",
  "geom": "POINT(73.76311482372093 19.973550453884766)"
}'
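
The same request can be prepared from Python. The sketch below uses only the standard library and builds the request without sending it. Here build_item_request is a hypothetical helper, and the coordinates are simplified sample values. It also makes the WKT axis order explicit: x (longitude) comes before y (latitude).

```python
import json
import urllib.request


def build_item_request(name: str, description: str, lon: float, lat: float,
                       url: str = "http://localhost:8000/items/") -> urllib.request.Request:
    """Prepare (but do not send) the POST request for a new item (hypothetical helper)."""
    payload = {
        "name": name,
        "description": description,
        # WKT puts x before y, so longitude comes first
        "geom": f"POINT({lon} {lat})",
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


request = build_item_request("Rotten Grapes", "Open source GIS company", 73.5, 19.25)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
# To send it while the server is running: urllib.request.urlopen(request)
```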
  • GET all items
curl -X 'GET' \
  'http://127.0.0.1:8000/items/' \
  -H 'accept: application/json'
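
The GET response returns each geometry as a WKT string, while many web mapping clients expect GeoJSON. As a rough sketch, the response could be converted on the client side as follows. It assumes each item is a dict with id, name, description and a WKT point in geom; items_to_feature_collection is a hypothetical helper and handles 2D points only.

```python
import json
import re

# Accepts both "POINT(x y)" and "POINT (x y)"
_POINT_RE = re.compile(r"^\s*POINT\s*\(\s*(\S+)\s+(\S+)\s*\)\s*$", re.IGNORECASE)


def items_to_feature_collection(items: list[dict]) -> dict:
    """Convert item dicts with a WKT point in "geom" to a GeoJSON FeatureCollection."""
    features = []
    for item in items:
        match = _POINT_RE.match(item["geom"])
        if match is None:
            raise ValueError(f"unsupported geometry: {item['geom']!r}")
        lon, lat = float(match.group(1)), float(match.group(2))
        # Everything except the geometry becomes a GeoJSON property
        properties = {key: value for key, value in item.items() if key != "geom"}
        features.append({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": properties,
        })
    return {"type": "FeatureCollection", "features": features}


# Sample data shaped like the GET /items/ response (values are illustrative)
sample = [{"id": 1, "name": "Rotten Grapes",
           "description": "Open source GIS company", "geom": "POINT (73.5 19.25)"}]
print(json.dumps(items_to_feature_collection(sample), indent=2))
```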
 

 

 