
I have an email account set up that triggers a Python script whenever it receives an email. The script runs through several functions, which takes about 30 seconds, and writes an entry into a MySQL database.

Everything runs smoothly until a second email arrives less than 30 seconds after the first. The second email is processed correctly, but the first one produces a corrupted entry in the database.

I'm looking to hold the email data,

msg = email.message_from_file(sys.stdin)

in a queue if the script has not finished processing the prior email.

I'm using Python 2.5. Can anyone recommend a package/script that would accomplish this?

  • Did you read about the Queue module yet? And the multiprocessing package? And the Celery project? After reading all of those, could you rephrase the question to be more specific? Commented Jan 21, 2012 at 0:25
  • I will read through those. Thanks for giving me a starting point. Commented Jan 21, 2012 at 0:29
  • 1
    Did you write this Python script or are you just using it? Why in the world would it "corrupt" anything? When you trigger the Python scripts they are in separate processes, and using a DB from concurrent processes is typical practice and shouldn't corrupt data. So what is your code doing besides what you described? You may be better off fixing the bugs then creating a queue to work around them. Commented Jan 21, 2012 at 0:55
  • @DerekLitz Would you be able to give any feedback on the code? I am having a lot of difficulty troubleshooting because I cannot recreate the "corrupted" message. pastebin.com/JKjCuv8j To give you a sample of the output I get: æ ¼æµ´ã¹¬æ ¼æ…¥ã¹¤â€ â€ â€ â€ ã° Commented Jan 22, 2012 at 19:52
  • Nothing is jumping out at me, except no db.commit(), which may or may not be a problem (probably not since you are seeing stuff being saved to the DB). See python.org/dev/peps/pep-0249 for more info. Anyways, a golden rule for developers is "It's not a bug if you can't reproduce it." I'd work on getting those steps down if this is a high priority fix, so you have some way to confirm it's actually fixed when you make changes. It could very well be garbage in -> garbage out... but this is just speculation without ANY reproducibility. Commented Jan 22, 2012 at 21:06

3 Answers


I find this a simple way to avoid running a cron job while the previous run is still in progress.

fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 

This will raise an IOError that I then handle by having the process kill itself.

See http://docs.python.org/library/fcntl.html#fcntl.lockf for more info.
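A minimal sketch of that non-blocking variant (the lock file name here is just an illustration, and the snippet runs on both Python 2 and 3):

```python
import fcntl
import sys

# Any writable path works as the lock file; 'cron.lock' is illustrative
fd = open('cron.lock', 'w')
try:
    # LOCK_NB makes lockf fail immediately instead of blocking,
    # raising IOError if another process already holds the lock
    fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
    # A previous run is still going; let this one kill itself
    sys.exit(0)
# ... process the e-mail and write to MySQL here ...
```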

Anyway, you can easily use the same idea to allow only a single job to run at a time. It really isn't the same as a queue (any waiting process could acquire the lock, in any order), but it achieves what you want.

import fcntl
import time

fd = open('lock_file', 'w')
# Blocks here until whichever process holds the lock releases it
fcntl.lockf(fd, fcntl.LOCK_EX)
# optionally write the pid to another file so you have an indicator
# of the currently running process
print 'Hello'
time.sleep(1)

You could also just use http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes, which does exactly what you want.
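For instance, a single worker process draining a multiprocessing.Queue handles messages strictly one at a time, no matter how fast they arrive. A rough sketch (the worker body and the enqueued strings are placeholders, written so it runs on both Python 2 and 3):

```python
from multiprocessing import Process, Queue

def worker(q):
    # One worker drains the queue, so messages are processed serially
    while True:
        raw_message = q.get()
        if raw_message is None:  # sentinel: shut the worker down
            break
        # the 30-second processing / MySQL insert would go here
        print('handled %d bytes' % len(raw_message))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    q.put('raw email one')  # each triggered script just enqueues and exits
    q.put('raw email two')
    q.put(None)
    p.join()
```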


While Celery is a very fine piece of software, using it in this scenario is akin to driving a nail with a sledgehammer. At a conceptual level, you are looking for a job queue (which is what Celery provides), but the e-mail inbox you are using to trigger the script is itself a capable job queue.

The more direct solution is to have the Python worker script poll the mail server itself (using the built-in poplib, for example), retrieve all new mail every few seconds, and then process the new e-mails one at a time. This serializes the work your script is doing, thereby preventing two copies from running at once.

For example, you would wrap your existing script in a function like this (from the documentation linked above):

import getpass, poplib
from time import sleep

while True:
    M = poplib.POP3('localhost')
    M.user(getpass.getuser())
    M.pass_(getpass.getpass())
    numMessages = len(M.list()[1])
    for i in range(numMessages):
        raw_email = '\n'.join(M.retr(i + 1)[1])
        # This is what your script normally does:
        do_work_for_message(raw_email)
        # Mark the message for deletion so it isn't fetched again
        M.dele(i + 1)
    # quit() commits the deletions and closes the connection
    M.quit()
    sleep(5)


I would look into http://celeryproject.org/

I'm fairly certain that will meet your needs exactly.
