How can I turn this sync sqlalchemy query logic to async sqlalchemy.
def update_job_by_id(id:int, job: JobCreate,db: Session,owner_id):
existing_job = db.query(Job).filter(Job.id == id)
if not existing_job.first():
return 0
job.__dict__.update(owner_id=owner_id) #update dictionary with new key value of owner_id
existing_job.update(job.__dict__)
db.commit()
return 1
I tried to convert this logic into async sqlalchemy but no luck.Here is the async version of the above code:
async def update_job_by_id(self, id: int, job: PydanticValidationModel, owner_id):
job_query = await self.db_session.get(db_table, id)
if not job_query:
return 0
updated_job = update(db_table).where(db_table.id == id).values(job.__dict__.update(owner_id = owner_id)).execution_options(synchronize_session="fetch")
await self.db_session.execute(updated_job)
return 1
it produces this error:
AttributeError: 'NoneType' object has no attribute 'items'
Scope:
Making a simple Job Board API using FastAPI and Async SQLAlchemy and I am struggling through update feature of the API in function async def update_job_by_id which I mention above first checking the ID of the job if ID is True then it will export the PydanticModel to dict object to update the PydanticModel with owner_id and finally updating the Model and update the job, but unfortunately when I try to update the PydanticModel update(Job).values(**job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch") I get None instead of updated dict.
models/jobs.py
from sqlalchemy import Column, Integer, String, Boolean, Date, ForeignKey
from sqlalchemy.orm import relationship
from db.base_class import Base
class Job(Base):
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
company_name = Column(String, nullable=False)
company_url = Column(String)
location = Column(String, nullable=False)
description = Column(String)
date_posted = Column(Date)
is_active = Column(Boolean, default=True)
owner_id = Column(Integer, ForeignKey('user.id'))
owner = relationship("User", back_populates="jobs")
schemas/jobs.py
from typing import Optional
from pydantic import BaseModel
from datetime import date, datetime
class JobBase(BaseModel):
title: Optional[str] = None
company_name: Optional[str] = None
company_url: Optional[str] = None
location: Optional[str] = "remote"
description: Optional[str] = None
date_posted: Optional[date] = datetime.now().date()
class JobCreate(JobBase):
title: str
company_name: str
location: str
description: str
class ShowJob(JobBase):
title: str
company_name: str
company_url: Optional[str]
location: str
date_posted: date
description: str
class Config():
orm_mode = True
routes/route_jobs.py
from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi import Depends
from db.repository.job_board_dal import job_board
from schemas.jobs import JobCreate, ShowJob
from db.repository.job_board_dal import Job
from depends import get_db
router = APIRouter()
@router.post("/create-job",response_model=ShowJob)
async def create_user(Job: JobCreate, jobs: Job = Depends(get_db)):
owner_id = 1
return await jobs.create_new_job(Job, owner_id)
@router.get("/get/{id}", response_model=ShowJob)
async def retrieve_job_by_id(id:int, id_job: job_board = Depends(get_db)):
job_id = await job_board.retrieve_job(id_job, id=id)
if not job_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return job_id
@router.get("/all", response_model=List[ShowJob])
async def retrieve_all_jobs(all_jobs: job_board = Depends(get_db)):
return await all_jobs.get_all_jobs()
@router.put("/update/{id}")
async def update_job(id: int, job: JobCreate, job_update: job_board = Depends(get_db)):
current_user = 1
response = await job_update.update_job_by_id(id = id, job = job, owner_id = current_user)
if not response:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Job with id {id} does not exist")
return {"response": "Successfully updated the Job."}
db/repository/job_board_dal.py
from typing import List
from sqlalchemy import update
from sqlalchemy.engine import result
from sqlalchemy.orm import Session
from sqlalchemy.future import select
from schemas.users import UserCreate
from schemas.jobs import JobCreate
from db.models.users import User
from db.models.jobs import Job
from core.hashing import Hasher
class job_board():
def __init__(self, db_session: Session):
self.db_session = db_session
async def register_user(self, user: UserCreate):
new_user = User(username=user.username,
email=user.email,
hashed_password=Hasher.get_password_hash(user.password),
is_active = False,
is_superuser=False
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user
async def create_new_job(self, job: JobCreate, owner_id: int):
new_job = Job(**job.dict(), owner_id = owner_id)
self.db_session.add(new_job)
await self.db_session.flush()
return new_job
async def retrieve_job(self, id:int):
item = await self.db_session.get(Job, id)
return item
async def get_all_jobs(self) -> List[Job]:
query = await self.db_session.execute(select(Job).order_by(Job.is_active == True))
return query.scalars().all()
async def update_job_by_id(self, id: int, job: JobCreate, owner_id):
_job = await self.db_session.execute(select(Job).where(Job.id==id))
(result, ) = _job.one()
job_dict = job.dict()
print(job_dict.update({"owner_id": owner_id}))
#print(job_update)
if not result:
return 0
job_update = update(Job).values(job_dict.update({"owner_id": owner_id})).where(Job.id == id).execution_options(synchronize_session="fetch")
return await self.db_session.execute(job_update)
If anyone can point out what am I missing here it would be much appreciated.