1

How can I turn this sync sqlalchemy query logic to async sqlalchemy.

def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
    existing_job = db.query(Job).filter(Job.id == id)
    if not existing_job.first():
        return 0
    job.__dict__.update(owner_id=owner_id)  #update dictionary with new key value of owner_id
    existing_job.update(job.__dict__)
    db.commit()
    return 1

I tried to convert this logic into async sqlalchemy but no luck.Here is the async version of the above code:

async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
        job_query = await self.db_session.get(db_table, id)
        if not job_query:
            return 0
        updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
        await self.db_session.execute(updated_job)
        return 1

it produces this error:

AttributeError: 'NoneType' object has no attribute 'items'

Scope:

Making a simple Job Board API using FastAPI and Async SQLAlchemy and I am struggling through update feature of the API in function async def update_job_by_id which I mention above first checking the ID of the job if ID is True then it will export the PydanticModel to dict object to update the PydanticModel with owner_id and finally updating the Model and update the job, but unfortunately when I try to update the PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch") I get None instead of updated dict.

models/jobs.py

from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship

from db.base_class import Base



class Job(Base):
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    company_name = Column(String, nullable=False)
    company_url = Column(String)
    location = Column(String, nullable=False)
    description = Column(String)
    date_posted = Column(Date)
    is_active = Column(Boolean, default=True)
    owner_id = Column(Integer, ForeignKey('user.id'))
    owner = relationship("User", back_populates="jobs")

schemas/jobs.py

from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime


class JobBase(BaseModel):
    title: Optional[str] = None
    company_name: Optional[str] = None
    company_url: Optional[str] = None
    location: Optional[str] = "remote"
    description: Optional[str] = None
    date_posted: Optional[date] = datetime.now().date()

class JobCreate(JobBase):
    title: str
    company_name: str
    location: str
    description: str

class ShowJob(JobBase):
    title: str
    company_name: str
    company_url: Optional[str]
    location: str
    date_posted: date
    description: str

    class Config():
        orm_mode = True

routes/route_jobs.py

from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends

from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db

router = APIRouter()

@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
    owner_id = 1
    return await jobs.create_new_job(Job, owner_id)

@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
    job_id = await job_board.retrieve_job(id_job, id=id)
    if not job_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return job_id

@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
    return await all_jobs.get_all_jobs()

@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
    current_user = 1
    response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
    if not response:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Job with id {id} does not exist")
    return {"response": "Successfully updated the Job."}

db/repository/job_board_dal.py

from typing import List


from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select

from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher



class job_board():
    def __init__(self, db_session: Session):
        self.db_session = db_session

    async def register_user(self, user: UserCreate):
        new_user = User(username=user.username,
        email=user.email,
        hashed_password=Hasher.get_password_hash(user.password),
        is_active = False,
        is_superuser=False
        )
        self.db_session.add(new_user)
        await self.db_session.flush()
        return new_user

    
    async def create_new_job(self, job: JobCreate, owner_id: int):
        new_job = Job(**job.dict(), owner_id = owner_id)
        self.db_session.add(new_job)
        await self.db_session.flush()
        return new_job

    async def retrieve_job(self, id:int):
        item = await self.db_session.get(Job, id)
        return item

    async def get_all_jobs(self) -> List[Job]:
        query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
        return query.scalars().all()

    async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
        _job = await self.db_session.execute(select(Job).where(Job.id==id))
        (result, ) = _job.one()
        job_dict = job.dict()
        print(job_dict.update({"owner_id": owner_id}))
        
        #print(job_update)
        if not result:
            return 0
        job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
        return await self.db_session.execute(job_update)

If anyone can point out what am I missing here it would be much appreciated.

1

1 Answer 1

1

Sqlalchemy didn't support async operations until version 1.4, see this.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.