0
class SessionTakeOverAPIView(generics.GenericAPIView):
    """
    This API view allows a human or AI to take over a chat session.
    The view handles session takeover validation, updates the session state, 
    and broadcasts relevant events to the chat group.
    
    A POST request is used to trigger either a human or AI takeover of a session.

    Authentication is required to access this view.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.room_group_name = None

    permission_classes = [BotUserHasRequiredPermissionForMethod]
    post_permission_required = ['session.reply_session']
    queryset = Session.objects.select_related('bot').all()
    serializer_class = SessionTakeOverSerializer

    def get_object(self):
        """
        Retrieves the session object based on the session_id provided in the request data.
        Raises a 404 error if the session is not found.
        """
        try:
            return super().get_queryset().get(session_id=self.request.data.get('session_id'))
        except Session.DoesNotExist:
            raise Http404  # Return 404 error if session not found


    def handle_human_take_over(self):
        """
        Handles the logic when a human takes over the chat session. It performs the following:
        - Validates if the session is already taken over by another human.
        - Updates the session to reflect the current user as the human taking over.
        - Sends a message to the chat group about the takeover.
        - Creates a log entry for the takeover asynchronously.
        """
        request = self.request
        session: Session = self.get_object()

        # Check if the session is already taken over by another human
        if session.is_session_currently_taken_over:
            raise ValidationError({
                "detail" :f"Session is currently taken over by {session.currently_taken_over_by.get_full_name}"})

        # prepare the takeover message for the bot
        take_over_msg = f"{session.bot.human_take_over_msg} {request.user.get_full_name}"

        # Prepare the response message to send to the group
        response = {
            "event_type": EventTypeChoicesChat.HUMAN_TAKEOVER,
            "user_type": "client",
            "sender_icon": request.user.profile_icon_url,

            "chat_data": {
                "msg": take_over_msg,
                "recorded_audio_msg_link": None,
                "images_link": [],
                "files_link": []
            }
        }

        # Update the session state to reflect the human takeover
        session.currently_taken_over_by = request.user
        session.is_session_currently_taken_over = True
        session.is_ai_active = False  # AI is no longer in control

        session.save(
            update_fields=[
                'currently_taken_over_by',
                'is_session_currently_taken_over',
                'is_ai_active'
            ]
        )

        transaction.on_commit(
            lambda: send_data_to_channel_layer(
                self.room_group_name, 
                'send_chat_message', 
                response
            )
        )

        # Create an entry to log the session takeover action asynchronously
        create_session_take_over_info.delay(session.id, request.user.id)

    def handle_ai_take_over(self):
        """
        Handles the logic when an AI takes over the chat session. It performs the following:
        - Validates if AI is already active in the session.
        - Updates the session to reflect AI control and sends a message to the chat group.
        """
        session: Session = self.get_object()

        # Check if AI is already active in the session
        if session.is_ai_active:
            raise ValidationError({"detail": "AI is already in action"})

        # Prepare the AI takeover message from the bot
        take_over_msg = session.bot.ai_take_over_msg

        # Prepare the response message to send to the chat room group
        response = {
            "event_type": EventTypeChoicesChat.AI_TAKE_OVER,
            "user_type": "ai",
            "sender_icon": session.bot.bot_icon,

            "chat_data": {
                "msg": take_over_msg,
                "recorded_audio_msg_link": None,
                "images_link": [],
                "files_link": []
            }
        }

        # Update the session state to reflect AI takeover
        session.currently_taken_over_by = None  # No human is in control
        session.is_session_currently_taken_over = False
        session.is_ai_active = True  # AI is now in control
        
        session.save(
            update_fields=[
                'currently_taken_over_by',
                'is_session_currently_taken_over',
                'is_ai_active'
            ]
        )

        transaction.on_commit(
            lambda: send_data_to_channel_layer(
                self.room_group_name, 
                'send_chat_message', 
                response
            )
        )


    def post(self, request, *args, **kwargs):
        """
        Handles the POST request to take over the session (either by human or AI).
        It validates the session ID and checks if the session is active before 
        allowing a human or AI to take over.
        """

        serializer = self.serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Determine the type of takeover (human or AI) based on request data
        take_over_type = serializer.validated_data.get('take_over_type')
        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(
            session_id=request.data.get('session_id')
        )  

        session: Session = self.get_object()
        if session.session_has_ended:
            raise ValidationError({"detail" :"Session is not active."})

        with transaction.atomic():
            if take_over_type == EventTypeChoicesChat.HUMAN_TAKEOVER:
                self.handle_human_take_over()  # Handle human takeover
            elif take_over_type == EventTypeChoicesChat.AI_TAKE_OVER:
                self.handle_ai_take_over()  # Handle AI takeover
            else:
                raise ValidationError({"detail": "Invalid takeover type."})

        return Response({"message": "Take over succeeded"})

this is an api to update my session model.

now here is the consumer

class ChatConsumer(AsyncWebsocketConsumer):
    """ WebSocket consumer for handling chat messages in a specific session."""

    async def connect(self):
        """Handle WebSocket connection initialization."""
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(session_id=self.room_name)

        session = await self.__get_session({"session_id": self.room_name, "session_has_ended": False})
        if not session:
            logger.info(f"Session [{self.room_name}] expired or does not exist")
            # close the connection
            await self.close()
            return

        self.session = session

        await self.__set_attributes()
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

        # Verify connection conditions
        if not await self.__validate_connection():
            await self.__send_validation_failed_message()
            return

        # Send intro message if no messages exist
        if not await database_sync_to_async(lambda: session.has_messages)():
            await self.__send_intro_message()


    async def __set_attributes(self):
        self.org = self.session.org
        self.bot = self.session.bot
        self.platform = self.bot.platform
        self.language = self.bot.language
        self.ai_response_url = settings.AI_BASE_URL + f'/api/v1.0/ai/conversation/{self.org.id}/{self.bot.id}'
        self.goal = 'you are an intelligent customer service agent'
        self.customer_name = self.session.user_name if self.session.user_name else ""
        self.customer_email = self.session.user_email if self.session.user_email else ""
        self.user_type = None
        self.user = self.scope["user"] if not self.scope["user"].is_anonymous else None
    @database_sync_to_async
    def __get_session(self, filter_kwargs: dict):
        """Fetch a session with related data."""
        try:
            session = Session.objects.select_related(
                "bot",
                "org",
                "bot__language", 
                "bot__platform"
            ).get(**filter_kwargs)
            return session
        except Exception as err:
            logger.error(f"Error fetching session: {err}")
            return None

now when i am connected to my consumer then if i try to use the api that i have given above doesn't actually updates the database but if i am not connected to my consumer then the api works

more details is if i try to update those fields via django admin while connected to the consumer the update works fine but it doesn't work with the api i created

and the most unexpected scenario is if i remove `self.org` from the consumer then the api works, something is related to the organization model that i am trying to fetch

if i remove the part where i am sending a message to the channel layer via the api then the update works

i tried background task but didn't work as well

2
  • Can you give more details about how your API server and websocket server are running? Are they running out of the same service? (i.e. one Django app holds both the API and the websocket server) If they are, it is likely the only a single worker is available to manage both the API and websockets, and once a websocket client has connected to the websocket server, the single worker is tied up and unable to receive incoming API requests Commented Jun 2 at 20:41
  • I am using gunicorn for wsgi and daphne for asgi and differenciated the proxy pass by usign nginx. in the consumer class there is a method named send_message and i used a Session save operation there and when i removed this operation now everything works fine Commented Jun 10 at 6:06

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.