0

I can't find an example implementation of a logging interceptor for python that measures the time it takes to process a request.

I want to measure how long it takes to process a request on the server side. I tried to write my own logging interceptor without success.

It seems to me that continuation doesnt wait for the request to be completed.

Here is my .proto file

syntax = "proto3";

package auth;

service SampleService {
  rpc Hello(HelloRequest) returns (HelloResponse);
}

message HelloRequest{
  string Name = 1;
}

message HelloResponse{
  string Greeting = 1;
}

grpc.py

import asyncio
from typing import Callable, Awaitable

import grpc

from pkg.py.gen import service_pb2
from pkg.py.gen import service_pb2_grpc
from ..logger import get_logger
from ..services import Service


class SampleService(service_pb2_grpc.SampleServiceServicer):
    def __init__(self) -> None:
        super().__init__()

    async def Hello(self, request: service_pb2.HelloRequest, _) -> service_pb2.HelloResponse:
        await asyncio.sleep(1)
        return service_pb2.HelloResponse(Greeting=Service.hello(request.Name))


class LoggingServerInterceptor(grpc.aio.ServerInterceptor):
    async def intercept_service(
            self,
            continuation: Callable[
                [grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]
            ],
            handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        ...

app.py

from abc import ABC, abstractmethod
from concurrent import futures

import grpc.aio

from pkg.py.gen import service_pb2_grpc
from ..config import Config
from ..logger import get_logger
from ..transport import SampleService, LoggingServerInterceptor


class AbstractApp(ABC):
    @abstractmethod
    async def run(self) -> None:
        ...

    @abstractmethod
    async def stop(self) -> None:
        ...


class App(AbstractApp):
    def __init__(self):
        self.server = grpc.aio.server(
            futures.ThreadPoolExecutor(max_workers=10),
            interceptors=[LoggingServerInterceptor()]
        )
        service_pb2_grpc.add_SampleServiceServicer_to_server(
            SampleService(), self.server
        )
        address = f"{Config().HOST}:{Config().PORT}"
        logger = get_logger()
        logger.info(f"Listening on {address}")
        self.server.add_insecure_port(address)

    async def run(self) -> None:
        await self.server.start()
        await self.server.wait_for_termination()

    async def stop(self) -> None:
        logger = get_logger()
        logger.info("Shutting down in progress")

main.py

import asyncio
import sys
from pathlib import Path

root = Path(__file__).resolve().parents[2].__str__()  # noqa: E402
sys.path.append(root)  # noqa: E402

from internal.config import Config
from internal.logger import get_logger, setup_logger
from internal.app.app import App


def main() -> None:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    app = App()
    logger = get_logger()

    loop.create_task(app.run())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        logger.info("Gracefully shutting down")
        loop.run_until_complete(app.stop())
        loop.close()
        logger.info("Shut down completed")


if __name__ == '__main__':
    main()

1 Answer 1

0

Found that lib.

import time
from typing import Callable, Any

import grpc
from grpc_interceptor.server import AsyncServerInterceptor


class RequestLoggingInterceptor(AsyncServerInterceptor):
    async def intercept(
            self,
            method: Callable,
            request: Any,
            context: grpc.ServicerContext,
            method_name: str,
    ) -> Any:
        start_time = time.monotonic()

        try:
            return await method(request, context)
        finally:
            request_time = time.monotonic() - start_time
            get_logger().info(
                f"Request {method_name} - duration {request_time}s"
            )
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.