5

Hello, I'm trying to use multiprocessing in my Discord bot, with a master class that creates workers and retrieves Discord messages to put them in a queue. When I initialize my workers, I pass my Discord client as an argument, and I get this error:

AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

My code:

class Master:
    def __init__(self):
        # Vars init
        print('Starting Master...')
        self.client = discord.Client()
        self.manager = Manager()
        self.user_spam = self.manager.dict()
        self.user_mute_chat = self.manager.dict()
        self.forbidden_words_list = []
        self.spam_t = self.manager.Value('i', 0)
        self.user_t_spam_muted = self.manager.Value('i', 0)
        self.to_master = []
        self.request_queue = Queue()
        self.cpu = cpu_count()

        # Web socket handlers
        # self.client.event(self.on_member_update)
        self.client.event(self.on_ready)
        self.client.event(self.on_message)

        # Bot init
        self.forbidden_words_list = self.manager.list(get_file_content('res/forbidden_words.txt'))
        for line in get_file_content('res/settings.txt'):
            if 'spamtime=' in line:
                self.spam_t = int(line.replace('spamtime=', ''))
            if 'usertimemuted' in line:
                self.user_t_spam_muted = int(line.replace('usertimemuted=', ''))

        # Workers init
        print('Starting Workers...')
        for i in range(self.cpu):
            Worker(self.request_queue, self.client, self.user_spam, self.user_mute_chat, self.spam_t,
                   self.user_t_spam_muted, self.forbidden_words_list).start()

        # Discord init
        print('Connecting to discord...')
        self.client.run(TOKEN)

class Worker(Process):
    def __init__(self, queue, client, user_spam, user_mute_chat, spam_t, user_t_spam_muted, forbidden_words_list):
        super(Worker, self).__init__()
        # Vars init
        self.client = client
        self.message = ''
        self.message_id = discord.Message
        self.word_list = []
        self.username = discord.Member
        self.queue = queue
        self.user_spam = user_spam
        self.user_mute_chat = user_mute_chat
        self.spam_t = spam_t
        self.user_t_spam_muted = user_t_spam_muted
        self.forbidden_words_list = forbidden_words_list

    async def run(self):
        # do some initialization here

        # Work here
        for data in iter(self.queue.get, None):
            pass

Full traceback:

Traceback (most recent call last):
  File "C:/Users/PC/PycharmProjects/AIDiscord/AIDiscord.py", line 406, in <module>
    master = Master()
  File "C:/Users/PC/PycharmProjects/AIDiscord/AIDiscord.py", line 42, in __init__
    self.user_t_spam_muted, self.forbidden_words_list).start()
  File "C:\Users\PC\AppData\Local\Programs\Python\Python36-32\Lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\PC\AppData\Local\Programs\Python\Python36-32\Lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\PC\AppData\Local\Programs\Python\Python36-32\Lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\PC\AppData\Local\Programs\Python\Python36-32\Lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\PC\AppData\Local\Programs\Python\Python36-32\Lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0EDDAED0>

I searched a little more, and it happens when I start() the processes; it does not depend on the arguments I pass.

6
  • Show the full traceback. Commented May 27, 2018 at 10:05
  • Full traceback is added Commented May 27, 2018 at 11:05
  • There's some discussion here that looks relevant but I don't really understand what's going on myself. You might have run into a bug but since the issue was closed it seems not. async objects seem to be a problem. What if you remove the async from def run? Commented May 27, 2018 at 12:17
  • I can't remove async: my methods filter_chat_words() and spam_detection() are async because of discord.py. I tried removing it and I still get the same error. Commented May 27, 2018 at 12:21
  • I've spent around 6 hours on this problem without finding a solution... Commented May 27, 2018 at 12:22

2 Answers

4

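You need to create your asyncio loop and related objects in multiprocessing.Process.run instead of in __init__. Anything stored on the Process object in __init__ has to be pickled when the process is spawned (as on Windows), whereas run executes inside the child process.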

The pattern I use is something like:

import asyncio
import multiprocessing
import typing


class Process(multiprocessing.Process):

    def __init__(self,
                 group=None,
                 target=None,
                 name=None,
                 args=(),
                 kwargs=None):
        super().__init__(group, target, name, args, kwargs)
        self.loop: typing.Optional[asyncio.AbstractEventLoop] = None
        self.stopped: typing.Optional[asyncio.Event] = None

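    def run(self):
        # This runs in the child process, so objects created here are never pickled.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.stopped = asyncio.Event()
        self.loop.run_until_complete(self._run())
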
    async def _run(self):
        """My async stuff here"""

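As a rough illustration of how the pattern above could be applied to a queue-driven worker like the one in the question, here is a minimal sketch. The class name AsyncWorker, the handle coroutine, and the requests/results queue names are hypothetical and not part of discord.py or the original code; the None sentinel mirrors the iter(self.queue.get, None) loop from the question. Only the queues are stored in __init__, and the event loop is created in run().

```python
import asyncio
import multiprocessing


class AsyncWorker(multiprocessing.Process):
    """Hypothetical worker: __init__ stores only state that can be sent to a child."""

    def __init__(self, requests, results):
        super().__init__()
        self.requests = requests  # multiprocessing.Queue of incoming items
        self.results = results    # multiprocessing.Queue of processed items
        self.loop = None          # created later, inside run()

    def run(self):
        # Plain (non-async) method: multiprocessing calls it in the child process.
        self.loop = asyncio.new_event_loop()
        try:
            self.loop.run_until_complete(self._run())
        finally:
            self.loop.close()

    async def _run(self):
        while True:
            # Queue.get blocks, so run it in a thread to keep the loop responsive.
            item = await self.loop.run_in_executor(None, self.requests.get)
            if item is None:  # sentinel: stop this worker
                break
            self.results.put(await self.handle(item))

    async def handle(self, item):
        # Placeholder for real async work; here it just upper-cases the text.
        await asyncio.sleep(0)
        return str(item).upper()
```

To use it, create the two queues, call AsyncWorker(requests, results).start() under an if __name__ == "__main__": guard, and put None on the request queue once per worker to shut it down. Anything that cannot be pickled, such as a client object, would have to be created inside run() as well, or kept in the parent process.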

0

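The multiprocessing library has to pickle the Process object, including everything stored on it, in order to send it to the child process (the traceback shows this happening in popen_spawn_win32.py). Your Worker class is built around asyncio (the async run method, plus the discord.py client it stores), and that appears to be what causes this error.

Asyncio objects such as the event loop cannot be pickled in Python. Note also that multiprocessing calls Process.run as a plain function, so declaring it with async def means its body never actually runs.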
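
A quick way to check this is to try pickling an event loop directly. The sketch below uses only the standard library; is_picklable is a hypothetical helper written for this example, and the exact exception raised differs between Python versions, so it catches any exception rather than a specific one.

```python
import asyncio
import pickle


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        # The exception type varies (AttributeError, TypeError, PicklingError).
        return False
    return True


loop = asyncio.new_event_loop()
try:
    loop_ok = is_picklable(loop)  # an event loop holds sockets and selectors
finally:
    loop.close()

settings_ok = is_picklable({"spamtime": 5})  # plain data pickles fine

print("event loop picklable:", loop_ok)
print("plain dict picklable:", settings_ok)
```

The same helper can be pointed at each attribute of a Process subclass before calling start() to find which one is blocking the spawn.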
