This is the debug shows when i run my script
2025-05-16 13:39:06,951 [INFO] [('106.209.213.128', 61576)] New connection
2025-05-16 13:39:06,991 [INFO] [('106.209.213.128', 61576)] Login: 868996035386572
2025-05-16 13:39:17,066 [INFO] [('106.209.197.170', 55354)] New connection
2025-05-16 13:39:17,104 [INFO] [('106.209.197.170', 55354)] Login: 868996035384833
2025-05-16 13:39:20,392 [INFO] [('106.209.207.165', 52347)] Location: 10.942661,75.906053
2025-05-16 13:39:20,739 [INFO] ✅ Upserted Firestore doc for IMEI 868996035386465
^C2025-05-16 13:39:40,157 [WARNING] [('106.209.199.249', 54242)] Connection error: [Errno 104] Connection reset by peer
2025-05-16 13:39:40,157 [INFO] [('106.209.199.249', 54242)] Connection closed
2025-05-16 13:39:40,709 [INFO] [('106.209.213.128', 61576)] Location: 10.977890,75.904240
2025-05-16 13:39:41,050 [INFO] ✅ Upserted Firestore doc for IMEI 868996035386572
2025-05-16 13:39:41,618 [INFO] [('106.209.204.124', 53850)] New connection
2025-05-16 13:39:42,633 [INFO] [('106.209.204.124', 53850)] Login: 868996035386465
^C2025-05-16 13:39:44,331 [INFO] [('106.209.197.170', 55354)] Connection closed
2025-05-16 13:39:44,331 [INFO] [('106.209.207.165', 52347)] Connection closed
2025-05-16 13:39:44,332 [INFO] [('106.209.204.124', 53850)] Connection closed
2025-05-16 13:39:44,332 [INFO] [('106.209.213.128', 61576)] Connection closed
2025-
i want to receive gps location packets from multiple GT06 devices in same port, i tried my script to run that but the connection between devices and my server will losts after receiving the location packet, what is the solution to keep the connection between them,
here is the code i am using now
import asyncio
import logging
import socket
import firebase_admin
from firebase_admin import credentials, firestore
import datetime
import pytz
import struct
# Constants
IDLE_TIMEOUT = 900 # seconds for application-level idle timeout
BACKLOG = 100 # server backlog for pending connections
TCP_KEEP_IDLE = 60 # seconds of idle before starting keepalive probes
TCP_KEEP_INTERVAL = 10 # seconds between keepalive probes
TCP_KEEP_PROBES = 3 # number of keepalive probes before declaring dead
# Initialize Firebase Admin SDK
cred = credentials.Certificate('serviceAccountKey.json')
firebase_admin.initialize_app(cred)
# Firestore Client
db = firestore.client()
active_connections = {} # addr -> (writer, last_seen_timestamp)
def crc_itu(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= 0x8408
else:
crc >>= 1
crc ^= 0xFFFF
return crc & 0xFFFF
def decode_imei(bcd: bytes) -> str:
return ''.join(f"{(b >> 4) & 0x0F}{b & 0x0F}" for b in bcd).lstrip('0')
def decode_timestamp(data: bytes) -> datetime.datetime:
if len(data) != 6:
return datetime.datetime.now(pytz.utc)
try:
return datetime.datetime(
2000 + data[0], data[1], data[2],
data[3], data[4], data[5], tzinfo=pytz.utc
)
except ValueError:
return datetime.datetime.now(pytz.utc)
async def save_to_firestore(imei: str, timestamp_obj: datetime.datetime,
lat: float, lon: float, speed: int, addr: tuple):
"""
Use the IMEI as the document ID and perform a single upsert with merge=True to
avoid round trips and reduce latency.
"""
try:
ist_timezone = pytz.timezone('Asia/Kolkata')
timestamp_ist = timestamp_obj.astimezone(ist_timezone)
live_time = timestamp_ist.strftime("%I:%M %p")
# Directly use IMEI as doc ID, upsert with merge to avoid querying
doc_ref = db.collection('Busdata').document(imei)
data = {
'client_ip': addr[0],
'last_seen': firestore.SERVER_TIMESTAMP,
'liveLocations': {
'LiveTime': live_time,
'latitude': lat,
'longitude': lon,
'timestamp': timestamp_ist.isoformat(),
'speed': speed
}
}
# Single RPC: set with merge (creates or updates)
await asyncio.to_thread(lambda: doc_ref.set(data, merge=True))
logging.info(f"✅ Upserted Firestore doc for IMEI {imei}")
except Exception as e:
logging.error(f"🔥 Firestore error: {e}", exc_info=True)
def configure_socket_keepalive(sock, idle=TCP_KEEP_IDLE,
interval=TCP_KEEP_INTERVAL,
count=TCP_KEEP_PROBES):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
if hasattr(socket, 'TCP_KEEPINTVL'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, 'TCP_KEEPCNT'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
addr = writer.get_extra_info('peername')
sock = writer.get_extra_info('socket')
client_imei = None
if sock:
try:
configure_socket_keepalive(sock)
except Exception as e:
logging.warning(f"[{addr}] Error setting keepalive: {e}")
logging.info(f"[{addr}] New connection")
active_connections[addr] = (writer, asyncio.get_event_loop().time())
try:
while True:
try:
start = await asyncio.wait_for(reader.readexactly(2), timeout=IDLE_TIMEOUT)
except asyncio.TimeoutError:
logging.info(f"[{addr}] Keep-alive timeout (>{IDLE_TIMEOUT}s)")
break
if start != b'\x78\x78':
logging.warning(f"[{addr}] Invalid start bytes: {start.hex()}")
writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
await writer.drain()
continue
length_byte = await reader.readexactly(1)
length = length_byte[0]
if length < 10 or length > 255:
logging.warning(f"[{addr}] Suspicious packet length: {length}")
writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
await writer.drain()
continue
packet = await reader.readexactly(length)
stop = await reader.readexactly(2)
if stop != b'\x0d\x0a':
logging.warning(f"[{addr}] Invalid stop bytes: {stop.hex()}")
writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
await writer.drain()
continue
recv_crc = int.from_bytes(packet[-2:], 'little') # Changed to little endian
calc_crc = crc_itu(length_byte + packet[:-2])
if calc_crc != recv_crc:
logging.warning(f"[{addr}] CRC mismatch (calc: {calc_crc:04X}, recv: {recv_crc:04X})")
continue
proto = packet[0]
serial = packet[-4:-2]
data_content = packet[1:-4]
active_connections[addr] = (writer, asyncio.get_event_loop().time())
try:
if proto == 0x01: # Login
client_imei = decode_imei(data_content[:8])
logging.info(f"[{addr}] Login: {client_imei}")
elif proto == 0x12 and client_imei: # Location
if len(data_content) >= 16:
timestamp_obj = decode_timestamp(data_content[0:6])
lat = struct.unpack('>i', data_content[7:11])[0] / 1800000.0
lon = struct.unpack('>i', data_content[11:15])[0] / 1800000.0
speed = data_content[15]
logging.info(f"[{addr}] Location: {lat:.6f},{lon:.6f}")
await save_to_firestore(client_imei, timestamp_obj, lat, lon, speed, addr)
elif proto == 0x13: # Heartbeat
logging.debug(f"[{addr}] Heartbeat received")
pong = bytes([0x78,0x78,0x05,0x13]) + serial
crc_pong = crc_itu(pong[2:])
writer.write(pong + crc_pong.to_bytes(2,'big') + b'\x0d\x0a')
await writer.drain()
# Send ACK
ack = bytes([0x78,0x78,0x05,proto]) + serial
crc_ack = crc_itu(ack[2:])
writer.write(ack + crc_ack.to_bytes(2,'big') + b'\x0d\x0a')
await writer.drain()
except Exception as e:
logging.error(f"[{addr}] Processing error: {e}", exc_info=True)
except (ConnectionResetError, TimeoutError) as e:
logging.warning(f"[{addr}] Connection error: {e}")
finally:
active_connections.pop(addr, None)
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
logging.info(f"[{addr}] Connection closed")
async def sweep_idle_connections():
while True:
now = asyncio.get_event_loop().time()
for addr, (writer, last_seen) in list(active_connections.items()):
if now - last_seen > IDLE_TIMEOUT:
logging.info(f"Sweeper: closing idle {addr}")
try:
writer.close()
await writer.wait_closed()
except:
pass
active_connections.pop(addr, None)
await asyncio.sleep(IDLE_TIMEOUT / 2)
async def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
server = await asyncio.start_server(
handle_client,
host='0.0.0.0',
port=5023,
reuse_port=True,
backlog=BACKLOG,
)
logging.info(f"Server started on {server.sockets[0].getsockname()}")
asyncio.create_task(sweep_idle_connections())
async with server:
await server.serve_forever()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Server shutdown requested")