0

The code is used in many projects and works but in one service sometimes an error occurs:

"InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction\n[SQL: SELECT tags.id, tags.name, tags.color, tags.channel_id, tags.creator_id, tags.tag_type_id, tags.created_at, tags.updated_at \nFROM tags \nowhere tags.channel_id = $1::BIGINT ORDER BY tags.created_at DESC]\n[parameters: (-1001943261480,)]\n(Background on this error at: https://sqlalche.me/e/20/rvf5)"

What in production causes this? Everything is OK in development.

IoC code of the container:

from typing import AsyncIterable

from aiokafka import AIOKafkaConsumer
from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import Config, get_config
from infrastructure.clients.api.invite_links import HelperBotCli
from infrastructure.db.sqlalchemy.setup import create_engine, create_session_pool
from infrastructure.message_brokers.kafka import KafkaMessageBroker
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork


class BaseProvider(Provider):
    @provide(scope=Scope.APP)
    def config(self) -> Config:
        return get_config()

    @provide(scope=Scope.APP)
    def sqlalchemy_engine(self, config: Config) -> AsyncEngine:
        return create_engine(config.pg_config)

    @provide(scope=Scope.APP)
    def session_pool(self, sqlalchemy_engine: AsyncEngine) -> sessionmaker:
        return create_session_pool(sqlalchemy_engine)

    @provide(scope=Scope.REQUEST)
    def sqlalchemy_uow(self, session_factory: sessionmaker) -> SQLAlchemyUnitOfWork:
        return SQLAlchemyUnitOfWork(_session=session_factory())

    @provide(scope=Scope.REQUEST)
    def helper_bot_client(self, config: Config) -> HelperBotCli:
        return HelperBotCli(
            base_url=config.helper_bot.HELPER_BOT_BASE_URL,
            create_url=config.helper_bot.ADD_LINK_ENDPOINT,
            delete_url=config.helper_bot.DELETE_LINK_ENDPOINT,
        )

    @provide(scope=Scope.APP)
    async def kafka_broker(self, config: Config) -> AsyncIterable[KafkaMessageBroker]:
        consumer = AIOKafkaConsumer(
            config.rp_config.CONSUME_TOPIC,
            config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC,
            bootstrap_servers=config.rp_config.RP_SERVER,
            group_id=config.rp_config.CONSUME_GROUP,
            enable_auto_commit=False,
            auto_offset_reset="earliest",
            sasl_mechanism=config.rp_config.SASL_MECHANISM,
            security_protocol=config.rp_config.SECURITY_PROTOCOL,
            sasl_plain_username=config.rp_config.RP_USER,
            sasl_plain_password=config.rp_config.RP_PASS,
            max_poll_records=config.rp_config.BATCH_SIZE,
        )
        broker = KafkaMessageBroker(consumer)
        await broker.start()
        yield broker
        await broker.close()

setup.py:

from typing import Callable, AsyncContextManager

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, AsyncEngine
from sqlalchemy.orm import sessionmaker

from config import PGConfig
from infrastructure.db.sqlalchemy.models import BaseModel


def create_engine(db: PGConfig, echo: bool = False) -> AsyncEngine:
    engine = create_async_engine(
        db.pg_database_url,
        echo=echo
    )
    return engine

async def create_tables(engine: AsyncEngine) -> None:
    async with engine.begin() as conn:
        await conn.run_sync(BaseModel.metadata.create_all)

def create_session_pool(engine: AsyncEngine) -> Callable[[], AsyncContextManager[AsyncSession]]:
    session_pool = sessionmaker(bind=engine, expire_on_commit=False, class_=AsyncSession)
    return session_pool

UoW code (removing await self line._session.begin() didn't help):

from dataclasses import dataclass
from types import TracebackType
from typing import Self

from sqlalchemy.ext.asyncio import AsyncSession

from infrastructure.repositories.invite_links.sqlalchemy import SQLAlchemyInviteLinksRepository
from infrastructure.repositories.tags.sqlalchemy import SQLAlchemyTagsRepository, SQLAlchemyTagsTypesRepository
from infrastructure.uow.base import BaseUnitOfWork


@dataclass
class SQLAlchemyUnitOfWork(BaseUnitOfWork):
    _session: AsyncSession = None

    async def __aenter__(self) -> Self:
        # await self._session.begin()
        return self

    async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None
    ) -> None:
        try:
            if exc_type is None:
                await self._session.commit()
            else:
                await self._session.rollback()
        finally:
            await self._session.close()

    @property
    def invite_links(self) -> SQLAlchemyInviteLinksRepository:
        self._session_check()
        return SQLAlchemyInviteLinksRepository(self._session)

    @property
    def tags(self) -> SQLAlchemyTagsRepository:
        self._session_check()
        return SQLAlchemyTagsRepository(self._session)

    @property
    def tags_types(self) -> SQLAlchemyTagsTypesRepository:
        self._session_check()
        return SQLAlchemyTagsTypesRepository(self._session)

    def _session_check(self) -> None:
        if not self._session:
            raise ValueError("Session is not initialized")

lifespan.py:

from contextlib import asynccontextmanager

import aiojobs
from fastapi import FastAPI
from sqlalchemy.orm import sessionmaker

from app.api.background import consume_in_background
from config import Config
from infrastructure.message_brokers.kafka import KafkaMessageBroker


@asynccontextmanager
async def lifespan(app: FastAPI) -> FastAPI:
    container = app.state.dishka_container
    scheduler = aiojobs.Scheduler()
    broker = await container.get(KafkaMessageBroker)
    config = await container.get(Config)
    if config.app.is_dev or config.app.is_production:
        session_factory = await container.get(sessionmaker)
        job = await scheduler.spawn(consume_in_background(broker, session_factory, config))
    yield
    if config.app.is_dev or config.app.is_production:
        await job.close()
    await scheduler.close()
    await container.close()

background.py:

import logging

from sqlalchemy.orm import sessionmaker

from config import Config
from domain.entities.invite_links import InviteLink
from infrastructure.message_brokers.base import BaseMessageBroker
from infrastructure.repositories.filters.invite_links import InviteLinksFilter
from infrastructure.uow.sqlalchemy import SQLAlchemyUnitOfWork


async def consume_in_background(
        broker: BaseMessageBroker,
        session_factory: sessionmaker,
        config: Config,
) -> None:
    logger = logging.getLogger("LinkConsumer")
    async for message_dict in broker.start_consuming():
        logger.info(f"MSG: {message_dict}")
        message = message_dict["value"]
        topic = message_dict["topic"]
        link = message.get("invite_link")
        channel_id = message.get("channel_id")
        try:
            if topic == config.rp_config.CONSUME_TOPIC:
                if not link:
                    continue
                link = InviteLink(
                    channel_id=channel_id,
                    **link
                )
                uow = SQLAlchemyUnitOfWork(_session=session_factory())
                async with uow as unit_of_work:
                    if not await unit_of_work.invite_links.is_exists(link.id, link.channel_id):
                        await unit_of_work.invite_links.create_link(link)
            elif topic == config.rp_config.CONSUME_MERGE_ACCOUNTS_TOPIC:
                uow = SQLAlchemyUnitOfWork(_session=session_factory())
                async with uow as unit_of_work:
                    links = await unit_of_work.invite_links.get_all_links(
                        links_filter=InviteLinksFilter(
                            page_size=-1,
                            creator_id=message["old_account_id"],
                        )
                    )
                    links = links[0]
                    for link in links:
                        link.creator_id = message["new_account_id"]
                        await unit_of_work.invite_links.update_link(link)
            await broker.commit()
        except Exception as e:
            logger.error(f"Error while consuming message: {e}", exc_info=True)
            # break

Code that seems suspicious to me:

@dataclass
class ReplaceTagsUseCase(BaseUseCase):
    uow: BaseUnitOfWork

    async def execute(self, command: ReplacingTagsCommand) -> InviteLink:
        LIMIT = 300
        if len(command.links) > LIMIT:
            raise InviteLinkTagsReplaceException(limit=LIMIT)

        async with self.uow as uow:
            # Создание карты ссылок и списка ID тегов для поиска
            links_map = {link.id: link for link in command.links}
            links_ids = list(links_map.keys())
            tag_ids = {tag_id for link in command.links for tag_id in link.tags}

            # Извлечение всех нужных ссылок и тегов одним запросом
            links, _ = await uow.invite_links.get_all_links(
                InviteLinksFilter(
                    links_ids=links_ids,
                    channel_id=command.channel_id,
                    page_size=LIMIT
                )
            )
            if not links:
                return []

            all_tags, _ = await uow.tags.get_all_tags(
                TagsFilter(
                    channel_id=command.channel_id,
                    tags_ids=tag_ids
                )
            )

            # Создание карты тегов для быстрого доступа
            tag_map = {tag.id: tag for tag in all_tags}

            tasks = []
            for link in links:
                tags = [tag_map[tag_id] for tag_id in links_map[link.id].tags if tag_id in tag_map]
                link.replace_tags(tags)
                tasks.append(asyncio.create_task(uow.invite_links.update_link(link)))

            await asyncio.gather(*tasks)

        return links

1 Answer 1

0

Uncomment await self._session.begin() in your __aenter__ method. This explicitly starts a transaction making the UoW's behavior predictable, preventing implicit transaction starts later.

The asyncio.gather pattern is incompatible with your UoW and session management. Since all update_link calls rely on the same session they must be executed sequentially within the UoW's transaction.

You should modify the for loop to iterate and await each update sequentially, ensuring operations happen within the same transaction.

@dataclass
class ReplaceTagsUseCase(BaseUseCase):
    uow: BaseUnitOfWork

    async def execute(self, command: ReplacingTagsCommand) -> list[InviteLink]:
        LIMIT = 300
        if len(command.links) > LIMIT:
            raise InviteLinkTagsReplaceException(limit=LIMIT)

        async with self.uow as uow:
            links_map = {link.id: link for link in command.links}
            links_ids = list(links_map.keys())
            tag_ids = {tag_id for link in command.links for tag_id in link.tags}

            links, _ = await uow.invite_links.get_all_links(
                InviteLinksFilter(
                    links_ids=links_ids,
                    channel_id=command.channel_id,
                    page_size=LIMIT
                )
            )
            if not links:
                return []

            all_tags, _ = await uow.tags.get_all_tags(
                TagsFilter(
                    channel_id=command.channel_id,
                    tags_ids=tag_ids
                )
            )

            tag_map = {tag.id: tag for tag in all_tags}

            for link in links:
                tags = [tag_map[tag_id] for tag_id in links_map[link.id].tags if tag_id in tag_map]
                link.replace_tags(tags)
                await uow.invite_links.update_link(link)

        return links
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.