#!/usr/bin/python
"""Back up MySQL databases with ``mysqldump | gzip``.

The main thread (``producer``) walks the databases returned by ``get_dbs``
and queues every one that ``db_backup_pre`` approves.  A single worker thread
(``consumer``) dumps each queued database and records the outcome on the
matching ``db.Backup`` row.
"""
import os
import sys
import time
import threading
import subprocess
from Queue import Queue, Empty

from mitsql import db
from mitsql.util import new_cursor, get_dbs, db_backup_pre, db_backup_mkdir

MYSQLDUMP_ARGS = [
    '--compact',
    '--add-drop-table',
    '--add-locks',
    '--create-options',
    '--disable-keys',
    '--dump-date',
    '--extended-insert',
    '--quick',
    '--no-autocommit',
    '--quote-names',
    '--routines',
    '--single-transaction',
    '--triggers',
    '--force',
]

# Set to True by the main thread once the producer is done (or has failed);
# the consumer exits the next time it finds the queue empty after that.
finished = False
# Work items are (dbname, dump_path) tuples.
queue = Queue()


def _dump_database(dbname, dump_path):
    """Pipe ``mysqldump dbname`` through ``gzip`` into *dump_path*.

    Returns a ``(returncode, stderr_output)`` tuple describing the mysqldump
    process.  Exceptions raised by ``open`` or ``subprocess.Popen`` (file
    cannot be created, program cannot be started) propagate to the caller.
    """
    args = ['mysqldump', dbname]
    args.extend(MYSQLDUMP_ARGS)
    with open(dump_path, 'wb') as out:
        dump = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        try:
            try:
                gz = subprocess.Popen(['gzip'], stdin=dump.stdout, stdout=out)
            finally:
                # Drop our copy of the pipe so mysqldump receives SIGPIPE if
                # gzip goes away (or never started) instead of blocking.
                dump.stdout.close()
            # Drain stderr *before* waiting: waiting first can deadlock once
            # mysqldump has filled the stderr pipe.
            err = dump.stderr.read()
            gz.wait()
        finally:
            dump.stderr.close()
            # Reap mysqldump; this is also what makes returncode meaningful.
            dump.wait()
    return dump.returncode, err


def consumer():
    """Dump queued databases until the producer has finished.

    Takes ``(dbname, dump_path)`` items off ``queue``, runs the dump and
    stores the path, date and error status on the ``db.Backup`` row for that
    database (creating the row if ``get_by`` finds none).  Returns once the
    queue has been empty for 3 seconds and ``finished`` is set, or on an
    exit request.
    """
    while True:
        try:
            item = queue.get(timeout=3)
            print("Consuming %s" % (item,))
            dbname, dump_path = item

            log = db.Backup.get_by(db=dbname)
            if not log:
                log = db.Backup(db=dbname)
            log.dump_path = dump_path
            log.dump_date = db.func.now()
            db_backup_mkdir(dump_path)

            errnum = None
            err = ''
            try:
                errnum, err = _dump_database(dbname, dump_path)
            except Exception as e:
                # Best effort: a failed dump is recorded, it does not stop
                # the worker.  errnum stays None as no exit status exists.
                print("Error on %s" % (item,))
                print(e)
                err = str(e) or repr(e)

            if errnum and not err:
                # Non-zero exit (or death by signal) with nothing on stderr
                # must not be recorded as a clean dump.
                err = 'mysqldump exited with status %d' % errnum

            if err:
                log.dump_errnum = errnum
                log.dump_errstr = err
            else:
                log.dump_errnum = None
                log.dump_errstr = None
            log.save_or_update()
            db.session.flush()
        except (KeyboardInterrupt, SystemExit):
            print("Got exit request")
            break
        except Empty:
            print("No queue.")
            if finished:
                print("Done!")
                break


t_consumer = threading.Thread(target=consumer)
t_consumer.start()


def producer():
    """Queue every database that is due for a backup.

    For each name from ``get_dbs`` the ``db.Backup`` row is looked up (or
    created).  Databases skipped less than an hour ago are left alone.
    Otherwise ``db_backup_pre`` is consulted: a true first element means the
    second element is the dump path and the database is queued; a false one
    means the second element is stored as the skip reason.
    """
    c = new_cursor('mysqldump')
    for dbname in get_dbs(c):
        log = db.Backup.get_by(db=dbname)
        if not log:
            log = db.Backup(db=dbname)
        elif log.skip_date and getattr(log.skip_date, 'timetuple', None):
            # NOTE(review): assumes skip_date is a datetime-like value in
            # local time (time.mktime interprets it that way) -- confirm.
            skipped_at = time.mktime(log.skip_date.timetuple())
            if skipped_at + 3600 > time.time():
                # never recheck a db skipped in the past hour
                continue

        d = db_backup_pre(c, dbname)
        if d[0]:
            queue.put((dbname, d[1]))
            log.skip_reason = None
            log.skip_date = None
        else:
            log.skip_reason = d[1]
            log.skip_date = db.func.now()
        log.save_or_update()
        # NOTE(review): db.session is only flushed by the consumer, after
        # each dump; confirm that is enough to persist skip records.


try:
    producer()
except KeyboardInterrupt:
    sys.exit(1)
finally:
    # Always release the consumer, even when the producer failed.
    finished = True