#!/usr/bin/python
"""Dump MySQL databases to gzip-compressed files, one at a time.

The main thread runs ``producer()``, which walks the databases returned by
``get_dbs`` and queues a ``(db, dump_path)`` pair for each one that
``db_backup_pre`` accepts.  A single worker thread runs ``consumer()``,
which pipes ``mysqldump`` through ``gzip`` for every queued entry and
stores the outcome on the matching ``sql.db.Backup`` row.
"""

import os
import sys
import time
import sql.db
from sql.util import new_cursor, get_dbs, db_backup_pre, db_backup_mkdir
from Queue import Queue, Empty
import threading
import subprocess

# Options appended to every mysqldump invocation.
MYSQLDUMP_ARGS = [
    '--compact',
    '--add-drop-table',
    '--add-locks',
    '--create-options',
    '--disable-keys',
    '--dump-date',
    '--extended-insert',
    '--quick',
    '--no-autocommit',
    '--quote-names',
    '--routines',
    '--single-transaction',
    '--triggers',
    '--force',
]

# Seconds the consumer waits on an empty queue before re-checking `finished`.
QUEUE_POLL_SECONDS = 3

# A database skipped less than this many seconds ago is not re-examined.
SKIP_RECHECK_SECONDS = 3600

# Set to True by the main thread once producer() has returned or raised;
# the consumer exits the next time it finds the queue empty.
finished = False
queue = Queue()


def _run_dump(db_name, dump_path):
    """Pipe ``mysqldump db_name`` through ``gzip`` into ``dump_path``.

    Returns a ``(returncode, stderr_output)`` tuple describing the
    mysqldump process.
    """
    args = ['mysqldump', db_name]
    args.extend(MYSQLDUMP_ARGS)
    dump = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        # gzip inherits its own copy of the descriptor, so the parent's
        # handle can be closed as soon as the child has been spawned.
        with open(dump_path, 'wb') as out:
            gz = subprocess.Popen(['gzip'], stdin=dump.stdout, stdout=out)
    finally:
        # Drop the parent's copy of the read end so that mysqldump gets
        # SIGPIPE (instead of blocking forever) if gzip exits early.
        dump.stdout.close()

    # Drain stderr *before* waiting: mysqldump's stdout is consumed by gzip,
    # so reading stderr to EOF here cannot deadlock, whereas waiting first
    # could hang once the stderr pipe buffer fills up.
    err = dump.stderr.read()
    dump.stderr.close()

    # wait() is what populates the return code; without it Popen.returncode
    # stays None.
    returncode = dump.wait()
    gz.wait()
    # NOTE(review): gzip's own exit status is still not recorded anywhere.
    return returncode, err


def _backup_one(db_name, dump_path):
    """Dump one database and record the result on its Backup row."""
    # Same bytes as the former `print name + ':',` ... `print 'Done'` pair.
    sys.stdout.write(db_name + ': ')

    log = sql.db.Backup.get_by(db=db_name)
    if not log:
        log = sql.db.Backup(db=db_name)
    log.dump_path = dump_path
    log.dump_date = sql.db.func.now()

    db_backup_mkdir(dump_path)
    returncode, err = _run_dump(db_name, dump_path)

    # Treat either stderr output or a non-zero exit status as a failure.
    if err or returncode:
        log.dump_errnum = returncode
        log.dump_errstr = err
    else:
        log.dump_errnum = None
        log.dump_errstr = None

    log.save_or_update()
    sql.db.session.flush()
    sys.stdout.write('Done\n')


def consumer():
    """Worker loop: back up queued databases until the producer is done.

    Takes ``(db, dump_path)`` pairs off the module-level ``queue``.  When
    the queue stays empty for ``QUEUE_POLL_SECONDS`` the loop ends if
    ``finished`` has been set, and polls again otherwise.  Exceptions other
    than the ones handled below propagate and end the thread.
    """
    while True:
        try:
            db_name, dump_path = queue.get(timeout=QUEUE_POLL_SECONDS)
            _backup_one(db_name, dump_path)
        except (KeyboardInterrupt, SystemExit):
            break
        except Empty:
            if finished:
                break


t_consumer = threading.Thread(target=consumer)
t_consumer.start()


def producer():
    """Queue every database that passes the pre-backup check.

    For each database from ``get_dbs`` the matching ``sql.db.Backup`` row is
    loaded (or created).  If ``db_backup_pre`` returns a true first element,
    its second element is queued as the dump path and any earlier skip
    information is cleared; otherwise the second element is stored as the
    skip reason together with the current time.
    """
    cursor = new_cursor('mysqldump')
    for db in get_dbs(cursor):
        log = sql.db.Backup.get_by(db=db)
        if not log:
            log = sql.db.Backup(db=db)
        elif log.skip_date:
            skipped_at = time.mktime(log.skip_date.timetuple())
            if skipped_at + SKIP_RECHECK_SECONDS > time.time():
                # never recheck a db skipped in the past hour
                continue

        check = db_backup_pre(cursor, db)
        if check[0]:
            queue.put((db, check[1]))
            log.skip_reason = None
            log.skip_date = None
        else:
            log.skip_reason = check[1]
            log.skip_date = sql.db.func.now()
        log.save_or_update()
        # The session is not flushed here (the flush call was disabled in
        # the original); consumer() flushes sql.db.session after each dump.


try:
    producer()
except KeyboardInterrupt:
    sys.exit(1)
finally:
    # Always let the consumer know that nothing more will be queued.
    finished = True