2

I've used Python before but only for Flask applications, but I've never used Celery before. After reading the docs and setting everything up (and it works as I've tested it with multiple workers) I'm trying to run an SQL query and for each row that gets returned from the query send it off to be processed by a Celery worker.

Below is a sample of the very basic code.

from celery import Celery
import MySQLdb

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
    cur = db.cursor()
    cur.execute("SELECT * FROM myTable")

    for row in cur.fetchall():
        print_query_result(row[0])

    db.close()

def print_query_result(result):
    print result

Basically it selects everything in the 'myTable' table and for each row returned it prints it out. If I call the code using just Python it works fine and prints all the data from the MySQL table. When I call it using the .delay() function to send it off to a worker to process it only sends it to the one worker and only outputs the top row in the database.

I've been trying to read up on subtasks but I'm not sure if I'm going in the right direction with that.

In short, I'm wanting this to happen, but I've no where to start with it. Has anyone got any ideas?

  • SQL query to select all rows in table
  • Send each row/result to a worker to process some code
  • Return code result back into a database
  • Pick up next item in queue (if any)

Thanks in advance.

EDIT 1:

I've updated my code to use SQLAlchemy instead, but the results are still returning like my old query which is fine.

from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        print i.domain
        print_query_result.s()

@app.task
def print_query_result():
    print "Received job"

print_domain.delay()

The worker when running the .py file returns:

[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None

As you can see, the worker gets 'result1' and 'result2' from the table I'm querying but then it doesn't seem to execute the command in the subtask which is just to print "Job received".

UPDATE: It looks like the subtask had to have a .delay() on the end of it as per the Celery docs so my code looks like this and successfully distributes the jobs across the workers now.

from celery import Celery
from models import DBDomains

app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
    query = DBDomains.query.all()
    for i in query:
        subtask = print_query_result.s(i.domain)
        subtask.delay()


@app.task
def print_query_result(domain):
    print domain

 print_domain.delay()
3
  • So you want a task that will make a query on a DB and for each row returned will queue another task? Or is it acceptable for the top level that does the query and dispatches the new tasks to be a regular function? Commented Aug 1, 2016 at 22:21
  • Yeah, basically it'll be a task that makes the query, and then for each result it'll spawn another task/queue item for a worker to process. The reason being I'll be dealing with thousands (if not tens of thousands) of rows of data that will be queried every 30 seconds or so, so my thoughts were the more workers that are running the quicker the data will be processed. It also leaves room for expansion with provisioning more workers the more data I have to deal with. I was also looking into threading but Celery seemed easier and you can scale up on remote workers. Commented Aug 1, 2016 at 22:26
  • isn't it a problem running that query multiple times on the different workers? why not run query once somewhere and then compute on the result on multiple workers? Commented Sep 1, 2021 at 23:46

1 Answer 1

2

Whenever you call a task from within a task, you have to use subtasks. Fortunately the syntax is easy.

from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0')


@app.task
def print_domain():
    for x in range(20):
        print_query_result.s(x)


@app.task
def print_query_result(result):
    print(result)

(Substitute for x in range(20) with your query results.) And if you're watching the celery output, you'll see the tasks created and distributed across the workers.

Sign up to request clarification or add additional context in comments.

6 Comments

Unfortunately that doesn't seem to work, I've updated my OP with the code I have now. It doesn't even look like the subtask is being called correctly even though results from the query are definitely returning. Thanks for the assistance so far.
I've updated my OP again as I seem to have found the issue. Thanks for your help and pointing me in the right direction!
Odd might be a difference in versions. The sample I posted worked correctly.
where is subtask in your answer? @MichaelBrown
print_domain is the primary task. It calls print_query_result as a subtask.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.