I am currently working on implementing a toy store server using Python, employing socket connections and a custom thread pool for handling concurrent client requests. However, I'm facing some challenges with managing concurrent requests efficiently.

Server Component:

The server listens for incoming client requests over sockets. It takes the name of a toy as an argument and returns the item's dollar price if it is in stock. If the item is not found, it returns -1; if the item is found but out of stock, it returns 0.

A custom thread pool is implemented to handle multiple client connections concurrently.

Client Component:

The client connects to the server using a socket connection and issues random toy requests.

I've implemented a custom thread pool to manage client requests, but I'm unsure if it's handling concurrent requests optimally. The server does not receive any further requests after the first client request.

Server code:

import socket
import json
from threading import Thread, Lock
from collections import deque

class Server():
    def __init__(self, host, port, num_threads):
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()
        self.request_queue = deque([])
        self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket binded to port", port)
        self.s.listen(5)
        print("socket is listening")

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def add_request(self, req):
        data = req.recv(4096)
        data = json.loads(data.decode('utf-8'))
        cost = self.get_item(data['query'])
        self.request_queue.append([req, cost])

    def serve_request(self):
        while True:
            if self.request_queue:
                req, cost = self.request_queue.popleft()
                print("cost: ", cost)
                req.send(str(cost).encode('utf-8'))

    def run(self):
        req, addr = self.s.accept()
        for thread in self.thread_pool:
            thread.start()
        while True:
            self.add_request(req)
            print("request_queue: ", self.request_queue)

host = "127.0.0.1"
port = 12345
server = Server(host, port, 100)
server.run()

Client code:

import socket
import json
import random
def main():
    host = "127.0.0.1"
    port = 12345
    s = socket.socket()
    s.connect((host, port))
    while True:
        toys = ["tux", "whale"]
        choice = random.choice(toys)
        message = {"query": str(choice)}
        serialzed_message = json.dumps(message)
        print("requesting: ", choice)
        s.send(serialzed_message.encode('utf-8'))
        data = s.recv(4096)
        print("Server replied: {}".format(str(data.decode('utf-8'))))

if __name__ == "__main__":
    main()

4 Comments

  • In Server.run, req, addr = self.s.accept() must be inside the while loop so that the server keeps accepting new connections. Commented Feb 24, 2024 at 18:18
  • Hey! Thanks, I’ll try doing this once I get back. Out rn Commented Feb 24, 2024 at 18:57
  • @MichaelButscher that didn’t work :( Commented Feb 24, 2024 at 22:58
  • Please show (add to the question) code that faithfully attempts to use sockets. The code currently in the post does not fit that purpose: as @MichaelButscher stated, to process several connections to a server, accept() must be called in a loop. If you don't know how to use sockets at a basic level, there are plenty of examples on the net. You can find one even in the Python documentation. Commented Feb 25, 2024 at 12:56

2 Answers

Let's look at the client first. With the server as designed, each request needs its own connection: once the server has processed a request and returned a response, it "forgets" that client's socket. The client should therefore close its socket after each reply and open a new one for the next request.

Client Code

import socket
import json
import random

def main():
    host = "127.0.0.1"
    port = 12345
    # Just run 5 requests and then terminate:
    for _ in range(5):
        s = socket.socket()
        s.connect((host, port))
        with s:
            toys = ["tux", "whale"]
            choice = random.choice(toys)
            message = {"query": choice}
            serialized_message = json.dumps(message)
            print("requesting: ", choice)
            # sendall() keeps sending until the whole message has been written
            s.sendall(serialized_message.encode('utf-8'))
            data = s.recv(4096)
            print("Server replied: {}".format(str(data.decode('utf-8'))))

if __name__ == "__main__":
    main()

As far as the server goes, it is not handling requests correctly because you are only issuing req, addr = self.s.accept() once. Function run should be:

    def run(self):
        for thread in self.thread_pool:
            thread.start()
        while True:
            req, addr = self.s.accept()
            # Do not close req here: a worker thread still has to send the
            # reply, so the socket must stay open until that send is done.
            self.add_request(req)
            print("request_queue: ", self.request_queue)

Your request-handling threads also spin in a busy loop, burning CPU while they poll for new requests. A threading.Condition instance would let them sleep until the queue is non-empty. Far simpler, though, is to use a ready-made thread pool (multiprocessing.pool.ThreadPool). Note that I have also allowed the server to be terminated by hitting the Enter key.

Server Code

import socket
import json
from threading import Lock
from multiprocessing.pool import ThreadPool

class Server():
    def __init__(self, host, port, num_threads):
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()
        self.thread_pool = ThreadPool(num_threads)

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

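    def serve_request(self, client_socket):
        # Close the client socket when done, even if the request is malformed
        with client_socket:
            data = client_socket.recv(4096)
            data = json.loads(data.decode('utf-8'))
            cost = self.get_item(data['query'])
            print("cost: ", cost)
            # sendall() keeps sending until the whole reply has been written
            client_socket.sendall(str(cost).encode('utf-8'))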

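    def server(self):
        # Note: host and port here are the module-level globals defined below,
        # not the constructor arguments (which __init__ does not store).
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket bound to port", port)
        self.s.listen(5)
        print("socket is listening")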

        with self.s as server_socket:
            while True:
                client_socket, _ = server_socket.accept()
                self.thread_pool.apply_async(self.serve_request, args=(client_socket,))
                print('request queued')

    def run(self):
        # Run actual server in the pool so that we can wait for Enter key:
        self.thread_pool.apply_async(self.server)
        input('Hit Enter to terminate the server...\n\n')
        # Kill all outstanding requests
        print('Terminating...')
        self.thread_pool.terminate()


host = "127.0.0.1"
port = 12345
server = Server(host, port, 20)
server.run()
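
To see the pool and the lock working together without any sockets, here is a minimal sketch. The Inventory class, the stock of 3 and the request list are assumptions made for illustration; only ThreadPool.apply_async, the get() method of the returned result object, and threading.Lock come from the standard library.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock


class Inventory:
    """Hypothetical stand-in for the Server's item table (no sockets)."""

    def __init__(self):
        self.items = {"tux": {"qty": 3, "cost": 25.99}}
        self.lock = Lock()

    def get_item(self, item):
        # Same rules as Server.get_item: -1 unknown, 0 out of stock, else price.
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]["qty"] == 0:
                return 0
            self.items[item]["qty"] -= 1
            return self.items[item]["cost"]


def run_requests(inventory, names, num_threads=4):
    """Submit one task per name and collect the replies in submission order."""
    with ThreadPool(num_threads) as pool:
        pending = [pool.apply_async(inventory.get_item, args=(name,))
                   for name in names]
        # get() blocks until the corresponding task has finished.
        return [result.get() for result in pending]


inventory = Inventory()
replies = run_requests(inventory, ["tux"] * 5 + ["robot"])
print(sorted(replies))
```

Which of the five "tux" requests find the shelf empty depends on thread scheduling, but the lock guarantees that exactly three of them are served before the quantity reaches zero.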

Update

If you do not want to use a multithreading pool, then your original code should be modified to use a Condition instance so that no CPU cycles are wasted checking an empty queue:

Server Code

import socket
import json
from threading import Thread, Lock, Condition
from collections import deque

class Server():
    def __init__(self, host, port, num_threads):
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()
        self.request_queue = deque()
        self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
        self.running = True
        self.request_queue_condition = Condition()

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def serve_request(self):
        while True:
            with self.request_queue_condition:
                # Wait for at least one request on queue or we are no longer running:
                self.request_queue_condition.wait_for(
                    lambda: not self.running or self.request_queue
                )
                if not self.running:
                    return
                client_socket = self.request_queue.popleft()

            data = client_socket.recv(4096)
            data = json.loads(data.decode('utf-8'))
            cost = self.get_item(data['query'])
            print("cost: ", cost)
            with client_socket:
                client_socket.send(str(cost).encode('utf-8'))

    def server(self):
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket binded to port", port)
        self.s.listen(5)
        print("socket is listening")

        with self.s as server_socket:
            while True:
                client_socket, _ = server_socket.accept()
                if not self.running:
                    return
                with self.request_queue_condition:
                    self.request_queue.append(client_socket)
                    self.request_queue_condition.notify(1) # Wake up one thread
                print('request queued')

    def run(self):
        for thread in self.thread_pool:
            thread.start()

        # Run the accept loop in a daemon thread so that the main thread can wait for the Enter key:
        Thread(target=self.server, daemon=True).start()
        input('Hit Enter to terminate the server...\n\n')
        # Tell the workers to stop; each one finishes the request it is handling, if any
        print('Terminating...')
        with self.request_queue_condition:
            self.running = False
            self.request_queue_condition.notify_all()

host = "127.0.0.1"
port = 12345
server = Server(host, port, 20)
server.run()
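
The standard library's queue.Queue packages the same wait-and-notify logic: its get() blocks without spinning until an item is available. Below is a minimal sketch of that variant with no sockets. The handle callback, the None sentinel used for shutdown, and the worker count are assumptions for illustration and are not part of the server code above.

```python
import queue
import threading

SENTINEL = None  # hypothetical shutdown marker, one is queued per worker


def worker(tasks, results, handle):
    while True:
        item = tasks.get()          # blocks (no busy loop) until an item arrives
        if item is SENTINEL:
            return
        results.put(handle(item))


def process_all(items, handle, num_threads=4):
    """Run handle(item) for every item on a small pool of worker threads."""
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results, handle))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(SENTINEL)         # queued after the real work, so nothing is dropped
    for t in threads:
        t.join()
    # Results arrive in completion order, not submission order.
    return [results.get() for _ in range(len(items))]


prices = {"tux": 25.99, "whale": 19.99}
replies = process_all(["tux", "whale", "robot"],
                      lambda name: prices.get(name, -1))
print(sorted(replies))
```

The trade-off is control versus brevity: an explicit Condition lets you combine several wake-up reasons (new request, shutdown flag) in one predicate, while Queue covers the common producer/consumer case with less code.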

Client Code

For a better test, this client issues its requests from multiple threads:

import socket
import json
import random
from multiprocessing.pool import ThreadPool

def make_request():
    host = "127.0.0.1"
    port = 12345

    s = socket.socket()
    s.connect((host, port))
    with s:
        toys = ["tux", "whale"]
        choice = random.choice(toys)
        message = {"query": choice}
        serialized_message = json.dumps(message)
        print("requesting: ", choice)
        # sendall() keeps sending until the whole message has been written
        s.sendall(serialized_message.encode('utf-8'))
        data = s.recv(4096)
        print("Server replied: {}".format(str(data.decode('utf-8'))))

def main():
    pool = ThreadPool(5)
    for _ in range(5):
        pool.apply_async(make_request)
    # Wait for all tasks to complete:
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Prints:

requesting:  tux
requesting:  whale
requesting:  whale
requesting:  whale
requesting:  tux
Server replied: 25.99
Server replied: 19.99
Server replied: 25.99
Server replied: 19.99
Server replied: 19.99

2 Comments

Thanks a lot!! This helped me :D Also, can you suggest some readings on Lock, Mutex and Semaphore? That would be great.
Other than reading the Python Manual, a useful resource is the 3rd edition of the Python Cookbook (Recipes for Mastering Python 3), which can be downloaded free in PDF form from many websites. Chapter 12 talks about Concurrency and Chapter 11 discusses Network and Web Programming.

In standard CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound work. What threads are good for is I/O-bound work, such as waiting on a socket for requests.

Start a thread for each client connection, and let that thread handle all the requests from that client. There is no need for a queue or even a thread pool unless you expect dozens of clients.

Another thing to point out is that TCP is a byte-oriented streaming protocol. A recv(n) call can return anywhere from 1 to n bytes (or 0 bytes once the peer has closed the socket), and those bytes could be a partial "message", or one or more messages run together, possibly ending in a partial message. A protocol is needed to ensure that a complete message is received.
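
A minimal sketch of what reading a complete message of known size involves when using raw recv calls. The recv_exact helper is a hypothetical name, not a socket API, and socket.socketpair() stands in for a real client/server connection so the example runs on its own.

```python
import socket


def recv_exact(sock, n):
    """Read exactly n bytes from sock, or raise if the peer closes early."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(remaining)   # may return fewer bytes than requested
        if not chunk:                  # b'' means the peer closed the connection
            raise ConnectionError(f"socket closed with {remaining} bytes missing")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


# Demonstration on a connected local socket pair (no server needed).
a, b = socket.socketpair()
with a, b:
    a.sendall(b'{"query": "tux"}')     # 16 bytes
    a.sendall(b'{"query": "whale"}')   # 18 bytes, may arrive glued to the first
    first = recv_exact(b, 16)
    second = recv_exact(b, 18)
print(first, second)
```

The loop only works because the reader knows each message's size in advance; a single recv(4096) here could have returned both messages at once, or only part of the first.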

An easy protocol is to send the length of the message followed by the message itself. I've chosen to send the length of the message as decimal ASCII text followed by a newline, then the UTF-8-encoded JSON message. socket.makefile is used to buffer the data stream via a file-like object, where .readline() and .read(n) can be used to read the length and data of the message.

Note that I made the response a JSON message as well.

Server:

import socket
import json
import threading

class Server:
    def __init__(self, host, port):
        self.items = {'tux': {'qty': 100,
                              'cost': 25.99},
                      'whale': {'qty': 100,
                                'cost': 19.99}}
        self.s = socket.socket()
        self.lock = threading.Lock()
        self.s.bind((host, port))
        self.s.listen()

    def get_item(self, item):
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def handle_request(self, addr, infile, outfile):
        try:
            header = infile.readline()  # I/O-bound.  Will wait here for request.
            if not header:
                return False
            length = int(header)
            data = json.loads(infile.read(length))  # Waits for complete message.
            item = data['query']
            print(f'{addr}: query {item!r}')
            cost = self.get_item(item)
            print(f'{addr}: cost {cost}')
            response = json.dumps({'cost': cost}).encode()
            outfile.write(f'{len(response)}\n'.encode())
            outfile.write(response)
            outfile.flush()   # file-like wrapper is buffered, make sure
                              # complete message is sent.
            return True
        except ConnectionResetError:
            return False

    def handle_client(self, client, addr):
        '''Threaded client handler.'''
        print(f'{addr}: connected')
        # Parenthesized multi-item with statements require Python 3.10+.
        with (client,
              client.makefile('rb') as infile,
              client.makefile('wb') as outfile):
            while self.handle_request(addr, infile, outfile):
                pass
        print(f'{addr}: disconnected')

    def run(self):
        print('server running')
        while True:
            client, addr = self.s.accept()
            threading.Thread(target=self.handle_client, args=(client, addr), daemon=True).start()

server = Server('', 12345)
server.run()

Client:

import socket
import json
import random
import time

with socket.socket() as s:
    s.connect(('localhost', 12345))
    try:
        with s.makefile('rb') as infile, s.makefile('wb') as outfile:
            while True:
                toys = ['tux', 'whale']
                choice = random.choice(toys)
                print('requesting: ', choice)
                request = json.dumps({'query': choice}).encode()
                outfile.write(f'{len(request)}\n'.encode())
                outfile.write(request)
                outfile.flush()   # file-like wrapper is buffered, make sure
                                  # complete message is sent.
                header = infile.readline()
                if not header:
                    break
                length = int(header)
                data = json.loads(infile.read(length))
                print(f'Server replied: {data["cost"]}')
                time.sleep(.1)  # Slow things down to see what's going on
    except ConnectionResetError:
        print('Server stopped')
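
The framing used by this server and client can be exercised without a running server. The sketch below factors it into two helpers; send_msg and recv_msg are hypothetical names introduced here, and socket.socketpair() is an assumption that replaces the real connection so the example is self-contained.

```python
import json
import socket


def send_msg(outfile, obj):
    """Write one length-prefixed JSON message to a binary file-like object."""
    payload = json.dumps(obj).encode()
    outfile.write(f"{len(payload)}\n".encode())  # decimal length, then newline
    outfile.write(payload)
    outfile.flush()                              # wrapper is buffered


def recv_msg(infile):
    """Read one message; return None if the peer closed the stream."""
    header = infile.readline()
    if not header:
        return None
    return json.loads(infile.read(int(header)))


a, b = socket.socketpair()
with a, b, a.makefile("wb") as out, b.makefile("rb") as inp:
    send_msg(out, {"query": "tux"})
    send_msg(out, {"query": "whale"})   # back-to-back messages stay separate
    received = [recv_msg(inp), recv_msg(inp)]
print(received)
```

Both messages are written before either is read, yet each recv_msg call returns exactly one of them, which is the property a bare recv(4096) cannot guarantee.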
