0

This is the debug shows when i run my script

2025-05-16 13:39:06,951 [INFO] [('106.209.213.128', 61576)] New connection
2025-05-16 13:39:06,991 [INFO] [('106.209.213.128', 61576)] Login: 868996035386572
2025-05-16 13:39:17,066 [INFO] [('106.209.197.170', 55354)] New connection
2025-05-16 13:39:17,104 [INFO] [('106.209.197.170', 55354)] Login: 868996035384833
2025-05-16 13:39:20,392 [INFO] [('106.209.207.165', 52347)] Location: 10.942661,75.906053
2025-05-16 13:39:20,739 [INFO] ✅ Upserted Firestore doc for IMEI 868996035386465
^C2025-05-16 13:39:40,157 [WARNING] [('106.209.199.249', 54242)] Connection error: [Errno 104] Connection reset by peer
2025-05-16 13:39:40,157 [INFO] [('106.209.199.249', 54242)] Connection closed
2025-05-16 13:39:40,709 [INFO] [('106.209.213.128', 61576)] Location: 10.977890,75.904240
2025-05-16 13:39:41,050 [INFO] ✅ Upserted Firestore doc for IMEI 868996035386572
2025-05-16 13:39:41,618 [INFO] [('106.209.204.124', 53850)] New connection
2025-05-16 13:39:42,633 [INFO] [('106.209.204.124', 53850)] Login: 868996035386465
^C2025-05-16 13:39:44,331 [INFO] [('106.209.197.170', 55354)] Connection closed
2025-05-16 13:39:44,331 [INFO] [('106.209.207.165', 52347)] Connection closed
2025-05-16 13:39:44,332 [INFO] [('106.209.204.124', 53850)] Connection closed
2025-05-16 13:39:44,332 [INFO] [('106.209.213.128', 61576)] Connection closed
2025-

i want to receive gps location packets from multiple GT06 devices in same port, i tried my script to run that but the connection between devices and my server will losts after receiving the location packet, what is the solution to keep the connection between them,

here is the code i am using now

import asyncio
import logging
import socket
import firebase_admin
from firebase_admin import credentials, firestore
import datetime
import pytz
import struct

# Constants
IDLE_TIMEOUT = 900         # seconds for application-level idle timeout
BACKLOG = 100              # server backlog for pending connections
TCP_KEEP_IDLE = 60         # seconds of idle before starting keepalive probes
TCP_KEEP_INTERVAL = 10     # seconds between keepalive probes
TCP_KEEP_PROBES = 3        # number of keepalive probes before declaring dead

# Initialize Firebase Admin SDK
cred = credentials.Certificate('serviceAccountKey.json')
firebase_admin.initialize_app(cred)

# Firestore Client
db = firestore.client()

active_connections = {}  # addr -> (writer, last_seen_timestamp)


def crc_itu(data: bytes) -> int:
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc >>= 1
                crc ^= 0x8408
            else:
                crc >>= 1
    crc ^= 0xFFFF
    return crc & 0xFFFF


def decode_imei(bcd: bytes) -> str:
    return ''.join(f"{(b >> 4) & 0x0F}{b & 0x0F}" for b in bcd).lstrip('0')


def decode_timestamp(data: bytes) -> datetime.datetime:
    if len(data) != 6:
        return datetime.datetime.now(pytz.utc)
    try:
        return datetime.datetime(
            2000 + data[0], data[1], data[2],
            data[3], data[4], data[5], tzinfo=pytz.utc
        )
    except ValueError:
        return datetime.datetime.now(pytz.utc)


async def save_to_firestore(imei: str, timestamp_obj: datetime.datetime,
                             lat: float, lon: float, speed: int, addr: tuple):
    """
    Use the IMEI as the document ID and perform a single upsert with merge=True to
    avoid round trips and reduce latency.
    """
    try:
        ist_timezone = pytz.timezone('Asia/Kolkata')
        timestamp_ist = timestamp_obj.astimezone(ist_timezone)
        live_time = timestamp_ist.strftime("%I:%M %p")

        # Directly use IMEI as doc ID, upsert with merge to avoid querying
        doc_ref = db.collection('Busdata').document(imei)
        data = {
            'client_ip': addr[0],
            'last_seen': firestore.SERVER_TIMESTAMP,
            'liveLocations': {
                'LiveTime': live_time,
                'latitude': lat,
                'longitude': lon,
                'timestamp': timestamp_ist.isoformat(),
                'speed': speed
            }
        }
        # Single RPC: set with merge (creates or updates)
        await asyncio.to_thread(lambda: doc_ref.set(data, merge=True))
        logging.info(f"✅ Upserted Firestore doc for IMEI {imei}")

    except Exception as e:
        logging.error(f"🔥 Firestore error: {e}", exc_info=True)


def configure_socket_keepalive(sock, idle=TCP_KEEP_IDLE,
                               interval=TCP_KEEP_INTERVAL,
                               count=TCP_KEEP_PROBES):
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    if hasattr(socket, 'TCP_KEEPIDLE'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, 'TCP_KEEPINTVL'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, 'TCP_KEEPCNT'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info('peername')
    sock = writer.get_extra_info('socket')
    client_imei = None

    if sock:
        try:
            configure_socket_keepalive(sock)
        except Exception as e:
            logging.warning(f"[{addr}] Error setting keepalive: {e}")

    logging.info(f"[{addr}] New connection")
    active_connections[addr] = (writer, asyncio.get_event_loop().time())

    try:
        while True:
            try:
                start = await asyncio.wait_for(reader.readexactly(2), timeout=IDLE_TIMEOUT)
            except asyncio.TimeoutError:
                logging.info(f"[{addr}] Keep-alive timeout (>{IDLE_TIMEOUT}s)")
                break

            if start != b'\x78\x78':
                logging.warning(f"[{addr}] Invalid start bytes: {start.hex()}")
                writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
                await writer.drain()
                continue

            length_byte = await reader.readexactly(1)
            length = length_byte[0]

            if length < 10 or length > 255:
                logging.warning(f"[{addr}] Suspicious packet length: {length}")
                writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
                await writer.drain()
                continue

            packet = await reader.readexactly(length)
            stop = await reader.readexactly(2)

            if stop != b'\x0d\x0a':
                logging.warning(f"[{addr}] Invalid stop bytes: {stop.hex()}")
                writer.write(b'\x78\x78\x05\x01\xff\xff\x0d\x0a')
                await writer.drain()
                continue

            recv_crc = int.from_bytes(packet[-2:], 'little')  # Changed to little endian
            calc_crc = crc_itu(length_byte + packet[:-2])
            if calc_crc != recv_crc:
                logging.warning(f"[{addr}] CRC mismatch (calc: {calc_crc:04X}, recv: {recv_crc:04X})")
                continue

            proto = packet[0]
            serial = packet[-4:-2]
            data_content = packet[1:-4]
            active_connections[addr] = (writer, asyncio.get_event_loop().time())

            try:
                if proto == 0x01:  # Login
                    client_imei = decode_imei(data_content[:8])
                    logging.info(f"[{addr}] Login: {client_imei}")

                elif proto == 0x12 and client_imei:  # Location
                    if len(data_content) >= 16:
                        timestamp_obj = decode_timestamp(data_content[0:6])
                        lat = struct.unpack('>i', data_content[7:11])[0] / 1800000.0
                        lon = struct.unpack('>i', data_content[11:15])[0] / 1800000.0
                        speed = data_content[15]
                        logging.info(f"[{addr}] Location: {lat:.6f},{lon:.6f}")
                        await save_to_firestore(client_imei, timestamp_obj, lat, lon, speed, addr)

                elif proto == 0x13:  # Heartbeat
                    logging.debug(f"[{addr}] Heartbeat received")
                    pong = bytes([0x78,0x78,0x05,0x13]) + serial
                    crc_pong = crc_itu(pong[2:])
                    writer.write(pong + crc_pong.to_bytes(2,'big') + b'\x0d\x0a')
                    await writer.drain()

                # Send ACK
                ack = bytes([0x78,0x78,0x05,proto]) + serial
                crc_ack = crc_itu(ack[2:])
                writer.write(ack + crc_ack.to_bytes(2,'big') + b'\x0d\x0a')
                await writer.drain()

            except Exception as e:
                logging.error(f"[{addr}] Processing error: {e}", exc_info=True)

    except (ConnectionResetError, TimeoutError) as e:
        logging.warning(f"[{addr}] Connection error: {e}")

    finally:
        active_connections.pop(addr, None)
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            pass
        logging.info(f"[{addr}] Connection closed")


async def sweep_idle_connections():
    while True:
        now = asyncio.get_event_loop().time()
        for addr, (writer, last_seen) in list(active_connections.items()):
            if now - last_seen > IDLE_TIMEOUT:
                logging.info(f"Sweeper: closing idle {addr}")
                try:
                    writer.close()
                    await writer.wait_closed()
                except:
                    pass
                active_connections.pop(addr, None)
        await asyncio.sleep(IDLE_TIMEOUT / 2)


async def main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    server = await asyncio.start_server(
        handle_client,
        host='0.0.0.0',
        port=5023,
        reuse_port=True,
        backlog=BACKLOG,
    )

    logging.info(f"Server started on {server.sockets[0].getsockname()}")
    asyncio.create_task(sweep_idle_connections())
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Server shutdown requested")

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.