class SessionTakeOverAPIView(generics.GenericAPIView):
    """
    Let a human agent or the AI take over a chat session.

    A POST request validates the payload with ``SessionTakeOverSerializer``,
    rejects sessions that have already ended, updates the takeover fields of
    the session and, once the surrounding transaction has committed,
    broadcasts the takeover event to the session's chat group.
    """

    permission_classes = [BotUserHasRequiredPermissionForMethod]
    post_permission_required = ['session.reply_session']
    queryset = Session.objects.select_related('bot').all()
    serializer_class = SessionTakeOverSerializer

    # Session columns written by both kinds of takeover.
    _take_over_fields = (
        'currently_taken_over_by',
        'is_session_currently_taken_over',
        'is_ai_active',
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Channel-layer group of the session being handled; set in ``post``.
        self.room_group_name = None

    @staticmethod
    def _full_name(user):
        """Return the user's full name.

        Works whether ``get_full_name`` is a method (as on Django's stock
        user model) or a property on a custom user model, so the name is
        rendered instead of a bound-method representation.
        """
        name = user.get_full_name
        return name() if callable(name) else name

    @staticmethod
    def _chat_event(event_type, user_type, sender_icon, msg):
        """Build the chat-message payload broadcast to the session group."""
        return {
            "event_type": event_type,
            "user_type": user_type,
            "sender_icon": sender_icon,
            "chat_data": {
                "msg": msg,
                "recorded_audio_msg_link": None,
                "images_link": [],
                "files_link": [],
            },
        }

    def _broadcast_on_commit(self, payload):
        """Send ``payload`` to the session's chat group after the commit."""
        group_name = self.room_group_name
        transaction.on_commit(
            lambda: send_data_to_channel_layer(
                group_name,
                'send_chat_message',
                payload,
            )
        )

    def get_object(self):
        """
        Return the session whose ``session_id`` is given in the request data.

        Raises:
            Http404: if no such session exists.
        """
        # NOTE(review): unlike the stock GenericAPIView.get_object(), this
        # override does not call check_object_permissions(); confirm that
        # object-level permissions are not expected here.
        try:
            return self.get_queryset().get(
                session_id=self.request.data.get('session_id')
            )
        except Session.DoesNotExist:
            raise Http404

    def handle_human_take_over(self, session=None):
        """
        Hand the session over to the requesting user.

        Args:
            session: the session to update. When omitted it is loaded with
                ``get_object()``.

        Raises:
            ValidationError: if the session is already taken over.
        """
        request = self.request
        if session is None:
            session = self.get_object()

        # Refuse when someone already holds the session.
        if session.is_session_currently_taken_over:
            current_holder = self._full_name(session.currently_taken_over_by)
            raise ValidationError({
                "detail": f"Session is currently taken over by {current_holder}"
            })

        take_over_msg = (
            f"{session.bot.human_take_over_msg} {self._full_name(request.user)}"
        )
        response = self._chat_event(
            EventTypeChoicesChat.HUMAN_TAKEOVER,
            "client",
            request.user.profile_icon_url,
            take_over_msg,
        )

        # Record the requesting user as the holder and switch the AI off.
        session.currently_taken_over_by = request.user
        session.is_session_currently_taken_over = True
        session.is_ai_active = False
        session.save(update_fields=self._take_over_fields)

        self._broadcast_on_commit(response)

        # Enqueue the takeover log only after the commit, so the task never
        # runs against uncommitted data or for a rolled-back takeover.
        session_pk = session.id
        user_pk = request.user.id
        transaction.on_commit(
            lambda: create_session_take_over_info.delay(session_pk, user_pk)
        )

    def handle_ai_take_over(self, session=None):
        """
        Give control of the session back to the AI.

        Args:
            session: the session to update. When omitted it is loaded with
                ``get_object()``.

        Raises:
            ValidationError: if the AI is already active on the session.
        """
        if session is None:
            session = self.get_object()

        if session.is_ai_active:
            raise ValidationError({"detail": "AI is already in action"})

        response = self._chat_event(
            EventTypeChoicesChat.AI_TAKE_OVER,
            "ai",
            session.bot.bot_icon,
            session.bot.ai_take_over_msg,
        )

        # Clear the human holder and switch the AI back on.
        session.currently_taken_over_by = None
        session.is_session_currently_taken_over = False
        session.is_ai_active = True
        session.save(update_fields=self._take_over_fields)

        self._broadcast_on_commit(response)

    def post(self, request, *args, **kwargs):
        """
        Take over the session named in the request, as a human or as the AI.

        Raises:
            ValidationError: if the payload is invalid, the session has
                ended, or the takeover type is not recognised.
            Http404: if the session does not exist.
        """
        serializer = self.serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)
        take_over_type = serializer.validated_data.get('take_over_type')

        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(
            session_id=request.data.get('session_id')
        )

        # Load the session once and pass it to the handler instead of
        # querying for it a second time.
        session = self.get_object()
        if session.session_has_ended:
            raise ValidationError({"detail": "Session is not active."})

        with transaction.atomic():
            if take_over_type == EventTypeChoicesChat.HUMAN_TAKEOVER:
                self.handle_human_take_over(session)
            elif take_over_type == EventTypeChoicesChat.AI_TAKE_OVER:
                self.handle_ai_take_over(session)
            else:
                raise ValidationError({"detail": "Invalid takeover type."})

        return Response({"message": "Take over succeeded"})
This is an API that updates my Session model.
Now, here is the consumer:
class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that serves the chat of one session."""

    async def connect(self):
        """Set up the connection for the session named in the URL route.

        The socket is closed without being accepted when no session that has
        not ended matches the room name. Otherwise the consumer joins the
        session's group, accepts the socket, runs the connection validation
        and sends the intro message if the session has no messages.
        """
        route_kwargs = self.scope["url_route"]["kwargs"]
        self.room_name = route_kwargs["room_name"]
        self.room_group_name = constants.CHAT_SESSION_ROOM_GROUP_NAME.format(
            session_id=self.room_name
        )

        lookup = {"session_id": self.room_name, "session_has_ended": False}
        active_session = await self.__get_session(lookup)
        if not active_session:
            logger.info(f"Session [{self.room_name}] expired or does not exist")
            # Reject the connection.
            await self.close()
            return

        self.session = active_session
        await self.__set_attributes()
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

        # Stop here when the connection conditions are not met.
        connection_is_valid = await self.__validate_connection()
        if not connection_is_valid:
            await self.__send_validation_failed_message()
            return

        # ``has_messages`` is read through the sync-to-async bridge.
        read_has_messages = database_sync_to_async(
            lambda: active_session.has_messages
        )
        if await read_has_messages():
            return
        await self.__send_intro_message()

    async def __set_attributes(self):
        """Populate consumer attributes from the session and the scope."""
        current = self.session
        self.org = current.org
        self.bot = current.bot
        self.platform = self.bot.platform
        self.language = self.bot.language

        base_url = settings.AI_BASE_URL
        self.ai_response_url = (
            base_url + f'/api/v1.0/ai/conversation/{self.org.id}/{self.bot.id}'
        )
        self.goal = 'you are an intelligent customer service agent'

        # Missing customer details fall back to an empty string.
        self.customer_name = current.user_name or ""
        self.customer_email = current.user_email or ""
        self.user_type = None

        # Anonymous scope users are stored as ``None``.
        scope_user = self.scope["user"]
        self.user = None if scope_user.is_anonymous else scope_user

    @database_sync_to_async
    def __get_session(self, filter_kwargs: dict):
        """Return the session matching ``filter_kwargs`` with related rows joined.

        Any exception raised by the lookup is logged and ``None`` is returned
        instead.
        """
        related = ("bot", "org", "bot__language", "bot__platform")
        try:
            return Session.objects.select_related(*related).get(**filter_kwargs)
        except Exception as err:
            logger.error(f"Error fetching session: {err}")
            return None
Now, when I am connected to my consumer and I call the API shown above, it doesn't actually update the database; but when I am not connected to the consumer, the API works.
More details: if I update those fields via the Django admin while connected to the consumer, the update works fine, but it doesn't work with the API I created.
The most unexpected scenario is this: if I remove `self.org` from the consumer, the API works, so something seems to be related to the organization model that I am fetching.
If I remove the part of the API where I send a message to the channel layer, the update works.
I also tried using a background task, but that didn't work either.