I have been working on a small PoC: an I/O-bound application that should execute functions without being blocked. So far I have created something like this:

import time
import concurrent.futures

found_products = []

site_catalog = [
    "https://www.graffitishop.net/Sneakers",
    "https://www.graffitishop.net/T-shirts",
    "https://www.graffitishop.net/Sweatshirts",
    "https://www.graffitishop.net/Shirts"
]


def threading_feeds():
    # Create own thread for each URL as we want to run concurrent
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(monitor_feed, site_catalog)


def monitor_feed(link: str) -> None:
    old_payload = product_data(...)

    while True:
        new_payload = product_data(...)

        if old_payload != new_payload:
            for links in new_payload:
                if links not in found_products:
                    logger.info(f'Detected new link -> {found_link} | From -> {link}')
                    # Execute filtering function without blocking, how?
                    filtering(link=found_link)

        else:
            logger.info("Nothing new")
            time.sleep(60)
            continue


def filtering(found_link):
    # More code will be added in the future to handle logical code parts
    ...
    # Test
    time.sleep(60)

Problem: Whenever execution reaches the line filtering(link=found_link), the call to filtering(...) sleeps for 60 seconds (this is only a mock; in the future it will contain real logic). As a result, monitor_feed stops executing and waits until filtering() has finished.

My question: How can I execute filtering(...) and still keep looping in monitor_feed, without being blocked by the call to filtering(...)?

  • Remove the time.sleep() call from filtering() and in monitor_feed outdent the time.sleep() call, and remove the continue so the sleep is executed regardless of the if result. Commented May 3, 2021 at 10:25
  • @barny Well the idea in the future is to have more code inside filtering which will do some logical stuff. I used a time.sleep as example as we do not want to be blocked when we call the filtering() or am I missing something here? Commented May 3, 2021 at 10:28
  • Please explain this in your question as this is not obvious/explained at all. Commented May 3, 2021 at 10:30
  • @barny I have explained it at the very bottom :) You might have missed it? or do you mean that it is still not explained well? Commented May 3, 2021 at 10:31
  • The paragraph above says blocking execution is the correct behaviour. Commented May 3, 2021 at 10:31

1 Answer

This is your code with small modifications. The main problem was wrong variable names (easy to mix up because they are very similar).

To keep them apart I use the names executor1 and executor2. executor2 has to be created before while True because it has to exist for the whole time its threads are in use.
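Why the executor has to outlive the loop can be seen in a minimal sketch. Leaving a `with ThreadPoolExecutor()` block shuts the executor down and waits for its pending tasks, so an executor created inside the loop would block on every iteration. The names slow_task, executor_inside_loop and executor_outside_loop, and the short sleep, are assumptions made for illustration and are not part of the original code.

```python
import concurrent.futures
import time


def slow_task(seconds: float) -> None:
    # Hypothetical stand-in for filtering(): just sleeps
    time.sleep(seconds)


def executor_inside_loop(n: int, seconds: float) -> float:
    """Create a new executor on every iteration; return total loop time."""
    start = time.perf_counter()
    for _ in range(n):
        # Leaving the with-block waits until the submitted task is done,
        # so each iteration takes at least `seconds`
        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(slow_task, seconds)
    return time.perf_counter() - start


def executor_outside_loop(n: int, seconds: float) -> float:
    """Create the executor once; return only the time spent in the loop."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        start = time.perf_counter()
        for _ in range(n):
            # submit() returns immediately; the task runs in a worker thread
            executor.submit(slow_task, seconds)
        loop_time = time.perf_counter() - start
    return loop_time
```

Calling executor_inside_loop(3, 0.2) should take at least about 0.6 seconds, while the loop in executor_outside_loop(3, 0.2) finishes almost immediately; the waiting only happens once, when the outer with-block is left.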

If you have def filtering(filtered_link), then you have to use the same name, filtered_link, in submit(..., filtered_link=...).
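As a small sketch of that point: submit() passes positional and keyword arguments straight through to the function, so the keyword has to match the parameter name. Here filtering is reduced to a stand-in that returns a string, and submit_styles is a hypothetical helper; both are assumptions for illustration only.

```python
import concurrent.futures


def filtering(filtered_link: str) -> str:
    # Stand-in for the real filtering logic
    return f"filtered: {filtered_link}"


def submit_styles(link: str):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Keyword matches the parameter name: works
        by_keyword = executor.submit(filtering, filtered_link=link)
        # Positional argument: works as well
        by_position = executor.submit(filtering, link)
        # Keyword does not match any parameter: the call fails inside
        # the worker thread and the TypeError is stored on the future
        wrong_name = executor.submit(filtering, link=link)
        return by_keyword.result(), by_position.result(), wrong_name.exception()
```

Note that the mismatched call does not raise at the submit() line; the error only becomes visible when the future is asked for its result or exception.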

import concurrent.futures
import time

found_products = []

site_catalog = [
    "https://www.graffitishop.net/Sneakers",
    "https://www.graffitishop.net/T-shirts",
    "https://www.graffitishop.net/Sweatshirts",
    "https://www.graffitishop.net/Shirts"
]


def threading_feeds():
    print('[threading_feeds] running')
    # Create own thread for each URL as we want to run concurrent
    with concurrent.futures.ThreadPoolExecutor() as executor1:
        executor1.map(monitor_feed, site_catalog)


def monitor_feed(link: str) -> None:
    print('[monitor_feed] start')
    
    old_payload = ['old'] # product_data(...)

    # the executor has to exist for as long as the loop runs
    with concurrent.futures.ThreadPoolExecutor() as executor2:

        while True:
            print('[monitor_feed] run loop')

            new_payload = ['new1', 'new2', 'new3']  # product_data(...)
            
            if old_payload != new_payload:
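                for product_link in new_payload:
                    # Note: found_products is never updated in this demo,
                    # so the same links are submitted again on every pass
                    if product_link not in found_products:
                        print(f'Detected new link -> {product_link} | From -> {link}')

                        # submit() returns immediately; filtering runs in a worker thread
                        executor2.submit(filtering, filtered_link=product_link)
                        #executor2.submit(filtering, product_link)

                        print("Continue")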
                        
            time.sleep(2)
    
def filtering(filtered_link):
    # More code will be added in the future to handle logical code parts
    #...
    # Test
    print(f'[filtering]: start: {filtered_link}')
    time.sleep(60)
    print(f'[filtering]: end: {filtered_link}')


# --- start --

threading_feeds()
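One caveat with this approach, sketched here under assumptions: an exception raised inside a submitted function is stored on the returned Future and is not printed unless something asks for it. Attaching a done-callback is one possible way to surface such failures. The names report_failure, failing_filter, submit_with_reporting and the errors list are hypothetical and exist only for this illustration.

```python
import concurrent.futures

errors = []  # collected only so the sketch can be inspected afterwards


def report_failure(future: concurrent.futures.Future) -> None:
    # Runs once the future has finished; exception() is None on success
    exc = future.exception()
    if exc is not None:
        errors.append(exc)
        print(f'[filtering] failed: {exc!r}')


def failing_filter(filtered_link: str) -> None:
    # Hypothetical filtering function that always fails
    raise ValueError(f'cannot filter {filtered_link}')


def submit_with_reporting(link: str) -> None:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(failing_filter, filtered_link=link)
        # Without this callback the ValueError would go unnoticed
        future.add_done_callback(report_failure)
```

The same applies to executor.map() in threading_feeds: it returns an iterator, and an exception from monitor_feed is only raised when the corresponding result is retrieved from that iterator.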

3 Comments

Siema, and awesome! I was close, haha. The reason no error was displayed is probably that you need a try/except in both thread pools to be able to catch the errors. Thank you so much for all the help. Dziekuje 😁
Now you have to test it on real data to see if everything is OK. Good luck (PL: Powodzenia).
Yup, will do. I'm half Polish as well, but terrible at writing 😎 I will come back if I run into any problem; otherwise I will mark it as the answer!
