Skip to content
Snippets Groups Projects
Commit e477afc0 authored by Anders Blomdell's avatar Anders Blomdell
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
*~
__pycache__
totalbackup
\ No newline at end of file
totalbackup: totalbackup.py $(sort $(wildcard *.py)) Makefile
apa -o $@ $(filter %.py, $^)
#!/usr/bin/python3
from pyparsing import Keyword, QuotedString, Suppress, Regex, OneOrMore, Group
from pyparsing import lineno
def checkSubdirectory(s, l, t):
if t[0].startswith('/'):
raise Exception('Subdirectory expected at line %d" %s' %
(lineno(l, s), t[0]))
def checkDirectory(s, l, t):
if not t[0].startswith('/'):
raise Exception('Directory expected at line %d" %s' %
(lineno(l, s), t[0]))
_subdirectory_ = (
Regex('[a-zA-Z0-9/]+') |
QuotedString('"') |
QuotedString("'")
).setParseAction(checkSubdirectory)
_directory_ = (
Regex('[a-zA-Z0-9/]+') |
QuotedString('"') |
QuotedString("'")
).setParseAction(checkDirectory)
_host_and_directory_ = (
Regex('[A-Za-z][A-Za-z0-9-.]*')('host') +
Suppress(':') +
_directory_('path')
)
_client_ = (
Regex('@?[A-Za-z][A-Za-z0-9-.]') | QuotedString('"') | QuotedString("'")
)
_options_ = (
Regex('[a-z][a-z_,]*') | QuotedString('"') | QuotedString("'")
)
_client_export_ = (
_client_('host') + _options_('options')
)
_export_entry_ = (
_subdirectory_('path') +
Suppress('[') +
Group(OneOrMore(Group(_client_export_)))('client') +
Suppress(']')
)
_primary_ = (
Suppress(Keyword('primary')) +
Suppress('{') +
Suppress(Keyword('attributes')) +
( QuotedString('"') | QuotedString("'") )('attributes') +
Group(_host_and_directory_)('mount') +
Suppress(Keyword('export')) +
Suppress('{') +
Group(OneOrMore(Group(_export_entry_)))('export') +
Suppress('}') +
Suppress('}')
)
_backup_entry_ = (
Group(_host_and_directory_)('mount') +
Suppress('[') +
Group(OneOrMore(_subdirectory_)).setResultsName('path') +
Suppress(']')
)
_backup_ = (
Suppress(Keyword('backup')) +
Suppress('{') +
Group(OneOrMore(Group(_backup_entry_)))('entry') +
Suppress('}')
)
_secondary_ = (
Suppress(Keyword('secondary')) +
Suppress('{') +
Group(OneOrMore(Group(_backup_)))('backup') +
Suppress('}')
)
_config_ = (
Group(_primary_)('primary') +
Group(_secondary_)('secondary')
)
def parse(s):
return _config_.parseString(s)
if __name__ == '__main__':
import sys
config1 = parse(open(sys.argv[1]).read())
config2 = parse(open(sys.argv[1]).read())
print(config1.dump())
print(config1.asList() == config2.asList())
primary {
attributes "rw,usrquota"
primary.host.name:/export
export {
sub/dir/1 [
"client1" "rw,insecure,sync"
"client2" "rw,no_root_squash,sync"
]
sub/dir/2 [
"client3" "rw,insecure,sync"
"client4" "rw,no_root_squash,sync"
]
}
}
secondary {
backup {
secondary-1.host.name:/var/tmp/1 [ sub/dir/1 sub/dir/2 ]
secondary-2.host.name:/var/tmp/2 [ sub/dir/1 sub/dir/2 ]
}
backup {
secondary-3.host.name:/var/tmp/3 [ sub/dir/1 sub/dir/2 ]
secondary-4.host.name:/var/tmp/4 [ sub/dir/1 sub/dir/2 ]
secondary-5.host.name:/var/tmp/5 [ sub/dir/1 sub/dir/2 ]
}
}
\ No newline at end of file
#!/usr/bin/python3
import threading
import asyncio
import os
import sys
LOG_MESSAGE = 0
LOG_ERROR = 1
LOG_WARNING = 2
LOG_DEBUG = 3
class LOG:
class singleton(object):
loop = asyncio.new_event_loop()
lock = threading.Lock()
def __init__(self):
t = threading.Thread(daemon=True, target=self.loop.run_forever)
t.start()
def __call__(self):
return self
singleton = singleton()
loop = singleton().loop
lock = singleton().lock
def __init__(self, level=None, prefix=None, parent=None):
if level != None:
self.level = level
elif parent:
self.level = parent.level
else:
level = LOG_WARNING
self.prefix = ''.join(filter(None, [parent and parent.prefix,
prefix]))
def MESSAGE(self, *args, level=LOG_MESSAGE):
if self.level >= level:
with self.lock:
buf = ' '.join(map(str, args))
while '\n' in buf:
line, buf = buf.split('\n', 1)
if self.prefix != None:
print(self.prefix, end='', file=sys.stderr)
print(line, file=sys.stderr)
if len(buf):
if self.prefix != None:
print(self.prefix, end='', file=sys.stderr)
print(buf, file=sys.stderr)
def ERROR(self, *args):
self.MESSAGE(*args, level=LOG_ERROR)
def WARNING(self, *args):
self.MESSAGE(*args, level=LOG_WARNING)
def DEBUG(self, *args):
self.MESSAGE(*args, level=LOG_DEBUG)
def makefile(self, level=LOG_WARNING, encoding=None):
loop = self.singleton().loop
class Reader:
def __init__(self, log, fd):
self.log = log
self.fd = fd
self.buf = bytearray()
self.mutex = threading.Lock()
def decode(b):
if encoding == None:
return b
else:
return b.decode(encoding)
self.decode = decode
def __call__(self, flush=False):
with self.mutex:
while True:
buf = os.read(self.fd, 10)
if len(buf) == 0: break
self.buf.extend(buf)
while True:
i = self.buf.find(b'\n')
if i < 0: break
self.log.MESSAGE(self.decode(self.buf[0:i]),
level=level)
self.buf = self.buf[i+1:]
if len(self.buf) > 0 and flush:
self.log.MESSAGE(self.decode(self.buf), level=level)
class MakeFile:
def __init__(self, log):
self.rpipe, self.wpipe = os.pipe()
self.reader = Reader(log, self.rpipe)
loop.add_reader(self.rpipe, self.reader)
def fileno(self):
return self.wpipe
def __del__(self):
loop.remove_reader(self.rpipe)
os.close(self.wpipe)
self.reader(flush=True)
return MakeFile(self)
import os
import subprocess
import atexit
class MD5TOC:
def __init__(self, fd):
self.more = True
def read_entry():
data = b''
while True:
if not b'\n' in data:
tmp = fd.read(4096)
if len(tmp) == 0:
raise Exception('Premature end of file')
data += tmp
continue
l,data = data.split(b'\n', 1)
if l.startswith(b'#fields:'):
self.labels = list(map(lambda s: s.strip().decode(),
l.split(b':')[1:]))
elif l.startswith(b'#endTOC'):
for k in self.labels:
setattr(self, k, None)
break
elif l.startswith(b'#'):
pass
else:
result = list(zip(self.labels,
l.split(b':', len(self.labels) - 1)))
for k,v in result:
setattr(self, k, v)
yield result
while True:
self.more = False
yield None
self.read_entry = read_entry()
self.next()
def next(self):
try:
return self.read_entry.__next__()
except AttributeError:
return self.read_entry.next()
def __repr__(self):
return 'MD5TOC(%s)' % ",".join(map(lambda k: "%s=%s" %
(k, getattr(self, k)),
self.labels))
#!/usr/bin/python3
import atexit
import config
import loghandler
import netifaces
import os
import socket
import subprocess
import sys
import threading
import time
import uuid
def cond_unlink(path, log):
try:
os.unlink(path)
log.DEBUG('removed %s' % path)
except FileNotFoundError:
pass
class AddrInfo:
def __init__(self, host):
self.addr = [ h[4][0] for h in
socket.getaddrinfo(host, 22,
type=socket.SocketKind.SOCK_STREAM) ]
names = set([ socket.gethostbyaddr(a)[0] for a in self.addr ])
if len(names) != 1:
raise Exception('%s resolves to more than one name',
self.addr, names)
self.name = names.pop()
def __repr__(self):
return 'AddrInfo(%s, %s)' % (self.name, self.addr)
class Server:
def __init__(self, options, config, entry, path, uuid, log):
self.options = options
self.config = config
self.entry = entry
self.path = path
self.log = log
self.uuid = uuid
self.socket_path = '/tmp/%s_server' % (self.uuid)
self.mutex = threading.Lock()
self.thread_md5 = None
self.thread_cpio = None
self.thread_server = threading.Thread(daemon=True, target=self.run)
self.thread_server.start()
def run(self):
atexit.register(cond_unlink, self.socket_path, self.log)
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(self.socket_path)
server.listen(2)
config_MD5_socket,_ = server.accept()
self.log.DEBUG('config+MD5', config_MD5_socket)
c = config_MD5_socket.makefile('r').read()
if config.parse(c).asList() != self.config.asList():
raise Exception('Configuration differs')
with self.mutex:
self.thread_md5 = threading.Thread(daemon=True,
target=self.send_MD5,
args=(config_MD5_socket,))
self.thread_md5.start()
cpio_socket,_ = server.accept()
self.log.DEBUG('CPIO', cpio_socket)
with self.mutex:
self.thread_md5 = threading.Thread(daemon=True,
target=self.run_cpio,
args=(cpio_socket,))
self.thread_md5.start()
cond_unlink(self.socket_path, self.log)
def pending(self):
with self.mutex:
return ((self.thread_md5 and self.thread_md5.thread.is_alive()) or
(self.thread_cpio and self.thread_cpio.thread.is_alive()) or
(self.thread_server.is_alive()))
def send_MD5(self, config_MD5):
self.log.DEBUG('send_MD5',
self.config.primary.mount.path,
self.path)
cmd = (
[ '/usr/bin/md5toc' ] +
( self.options.xattr and
[ '--xattr' ] or []) +
( self.options.xattr and self.options.max_age and
[ '--max-age', self.options.max_age ] or []) +
[ '.' ]
)
cwd = os.path.join(self.config.primary.mount.path, self.path)
stdout = config_MD5.makefile('wb')
try:
subprocess.check_call(cmd, cwd=cwd, stdout=stdout)
finally:
config_MD5.shutdown(socket.SHUT_RDWR)
config_MD5.close()
def run_cpio(self, cpio_socket):
self.log.DEBUG('START run_cpio',
self.config.primary.mount.path,
self.path)
cmd = [ '/bin/cpio', '-oc', '--quiet' ]
cwd = os.path.join(self.config.primary.mount.path, self.path)
stdin = cpio_socket.makefile('rb')
stdout = cpio_socket.makefile('wb')
try:
subprocess.check_call(cmd, cwd=cwd,
stdin=stdin,
stdout=stdout,
stderr=self.log.makefile(encoding='utf-8'))
self.log.DEBUG('OK run_cpio', cmd, cwd)
finally:
cpio_socket.shutdown(socket.SHUT_RDWR)
self.log.DEBUG('DONE run_cpio', cmd, cwd)
class Client:
def __init__(self, options, config, entry, log):
self.options = options
self.config = config
self.entry = entry
self.uuid = str(uuid.uuid4())
self.log = loghandler.LOG(parent=log,
prefix="%s client " % (self.uuid))
self.server_log = loghandler.LOG(parent=log,
prefix="%s server " % (self.uuid))
self.thread = threading.Thread(daemon=True, target=self.run)
self.thread.start()
def pending(self):
return self.thread.is_alive()
def run(self):
self.log.DEBUG('Running', self.entry.dump())
for path in self.entry.path:
readable = '%s:%s/%s -> %s:%s/%s' % (
self.config.primary.mount.host,
self.config.primary.mount.path,
path,
self.entry.mount.host,
self.entry.mount.path,
path)
self.log.MESSAGE('START %s' % (readable))
server = Server(options=self.options,
config=self.config,
entry=self.entry,
path=path,
uuid=self.uuid,
log=self.server_log)
socket_path = '/tmp/%s_client' % (self.uuid)
cmd = (
[ 'ssh', self.entry.mount.host ] +
( self.options.user and
[ '-l', self.options.user ] or []) +
( self.options.identity and
[ '-i', self.options.identity ] or []) +
[ '-R', '%s:%s' % (socket_path, server.socket_path) ] +
[ os.path.realpath(sys.argv[0]),
'--secondary', socket_path, self.entry.mount.path, path ] +
( self.options.debug and
[ '--debug' ] or []) +
( self.options.xattr and
[ '--xattr' ] or []) +
( self.options.xattr and self.options.max_age and
[ '--max-age', self.options.max_age ] or [])
)
self.log.DEBUG('CMD="%s"' % (' '.join(cmd)))
stdout = loghandler.LOG(parent=self.log, prefix='STDOUT ')
stderr = loghandler.LOG(parent=self.log, prefix='STDERR ')
subprocess.check_call(cmd,
stdout=stdout.makefile(encoding='utf-8'),
stderr=stderr.makefile(encoding='utf-8'))
time.sleep(1)
self.log.MESSAGE('DONE %s' % (readable))
time.sleep(1)
def do_backup(options, config):
def is_primary():
node = set()
for i in netifaces.interfaces():
for k,al in netifaces.ifaddresses(i).items():
if k in [netifaces.AF_INET, netifaces.AF_INET6]:
for a in al:
node.add(a['addr'])
primary = AddrInfo(config.primary.mount.host)
return any(node.intersection(primary.addr))
if not is_primary():
raise Exception('Host is not primary')
if options.debug:
log = loghandler.LOG(loghandler.LOG_DEBUG)
else:
log = loghandler.LOG(loghandler.LOG_WARNING)
for b in config.secondary.backup:
client = [ Client(options=options,
config=config,
entry=e,
log=log) for e in b.entry ]
while any([ c.pending() for c in client ]):
time.sleep(1)
#!/usr/bin/python3
import atexit
import md5toc
import os
import socket
import subprocess
import loghandler
import time
def cond_unlink(path, log):
try:
os.unlink(path)
log.DEBUG('removed %s' % path)
except FileNotFoundError:
pass
def cond_kill(p):
try:
p.kill()
except:
pass
class Status:
def __init__(self, log):
self.checked = 0
self.added = 0
self.deleted = 0
self.replaced = 0
self.unchanged = 0
self.metadata = 0
self.extract_OK = -1
def report():
log.MESSAGE('STATUS %d = +%d -%d =%d ?%d (%d)' % (
self.checked, self.added, self.deleted,
self.replaced, self.metadata,
self.extract_OK))
atexit.register(report)
class Backup:
def __init__(self, primary_cpio, mount, path, status, log):
self.primary_cpio = primary_cpio
self.primary_in = primary_cpio.makefile('wb')
self.primary_out = primary_cpio.makefile('rb')
self.mount = mount
self.path = path
self.status = status
self.log = log
self.dst_root = os.path.join(mount, path).encode('utf-8')
self.trash_root = os.path.join(mount, 'TRASH').encode('utf-8')
self.trash = os.path.join(self.trash_root,
str(int(time.time())).encode('utf-8'))
extract_cmd = [ '/bin/cpio',
'-idmu',
'--quiet',
'--no-absolute-filenames',
'--preserve-modification-time' ]
self.extract = subprocess.Popen(extract_cmd,
cwd=os.path.join(mount, path),
stdin=self.primary_out)
atexit.register(cond_kill, self.extract)
def close(self):
self.primary_in.flush()
self.primary_cpio.shutdown(socket.SHUT_WR)
self.status.extract_OK = self.extract.wait()
def check(self, src, dst):
if src.name != dst.name:
raise Exception('Names differ: %s, %s' % (src, dst))
if src.kind != dst.kind or src.md5 != dst.md5 or src.size != dst.size:
self.log.DEBUG('Replace...', src.name, dst.name,
src.md5, dst.md5, src.size, dst.size)
self.status.replaced += 1
self.delete(dst)
self.add(src)
elif os.path.exists(dst.name):
changed = False
if src.kind in [ 'F', 'D'] and src.mode != dst.mode:
self.log.DEBUG('MODE', dst.name, src.mode, dst.mode)
os.chmod(dst.name, int(src.mode, 8))
changed = True
if (src.kind in [ 'F', 'D'] and
src.uid != dst.uid or src.gid != dst.gid):
self.log.DEBUG('UID/GID', dst.name, src.uid, src.gid,
dst.uid, dst.gid)
os.lchown(dst.name, int(src.uid), int(src.gid))
changed = True
if src.kind == 'F' and src.mtime != dst.mtime:
self.log.DEBUG('MTIME', src.name, src.mtime, dst.mtime)
atime = os.stat(dst.name).st_atime
os.utime(dst.name, (int(atime), int(src.mtime)))
changed = True
if changed:
self.status.metadata += 1
else:
self.status.unchanged += 1
def make_room(self, size):
for p in sorted(os.listdir(self.trash_root)):
stat = os.statvfs(self.dst_root)
free = stat.f_frsize * stat.f_bavail
need = size + stat.f_frsize
if free > need:
break
self.log.MESSAGE("Need to free:",
need - free, (need, free), self.trash_root)
d = os.path.join(self.trash_root, p)
if os.path.isdir(d):
self.log.MESSAGE('Removing dir', d)
shutil.rmtree(d)
pass
else:
self.log.MESSAGE('Removing file', d)
os.unlink(d)
def add(self, src):
self.log.DEBUG('Add:', src.name)
if len(src.size) == 0:
size = 0
else:
size = int(src.size)
self.make_room(size)
parent = os.path.dirname(src.name)
while len(parent) != 0:
# Make sure directories get the correct modes
self.primary_in.write(parent + b'\n')
parent = os.path.dirname(parent)
self.primary_in.write(src.name + b'\n')
def delete(self, dst):
self.log.DEBUG('Delete:', dst.name)
dst_path = os.path.join(self.dst_root, dst.name)
if os.path.exists(dst_path):
trash_path = os.path.join(self.trash, dst.name)
trash_dir = os.path.dirname(trash_path)
if not os.path.exists(trash_dir):
os.makedirs(trash_dir, mode=0o700)
os.rename(dst_path, trash_path)
def do_backup(options, socket_path, mount, path):
if options.debug:
log = loghandler.LOG(loghandler.LOG_DEBUG)
else:
log = loghandler.LOG(loghandler.LOG_WARNING)
atexit.register(cond_unlink, socket_path, log)
status = Status(log)
config_path = '%s/TOTALBACKUP.config' % (mount)
if not os.path.exists(config_path):
raise Exception('"%s" does not exists' % (config_path))
# Connect to server config/md5toc socket
config_md5 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
config_md5.connect(socket_path)
# Send secondary config to primary
config_md5.makefile('w').write(open(config_path).read())
config_md5.shutdown(socket.SHUT_WR)
# Make ready to read primary TOC (src)
src = md5toc.MD5TOC(config_md5.makefile('rb'))
# Create secondary md5toc (dst)
cmd = (
[ '/usr/bin/md5toc' ] +
( options.xattr and [ '--xattr' ] or []) +
( options.xattr and options.max_age and [ '--max-age', options.max_age ]
or []) +
[ '.' ]
)
p = subprocess.Popen(cmd,
cwd=os.path.join(mount, path),
stdout=subprocess.PIPE)
atexit.register(cond_kill, p)
dst = md5toc.MD5TOC(p.stdout)
# Connect to server cpio socket
primary_cpio = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
primary_cpio.connect(socket_path)
backup = Backup(primary_cpio=primary_cpio,
mount=mount, path=path, status=status, log=log)
while True:
if src.name == None and dst.name == None:
# All done
break
status.checked += 1
if src.name == None:
status.deleted += 1
backup.delete(dst)
dst.next()
elif dst.name == None:
status.added += 1
backup.add(src)
src.next()
elif src.name == dst.name:
backup.check(src=src, dst=dst)
src.next()
dst.next()
elif src.name < dst.name:
status.added += 1
backup.add(src)
src.next()
elif src.name > dst.name:
status.deleted += 1
backup.delete(dst)
dst.next()
else:
raise Exception()
backup.close()
log.DEBUG('md5toc result', p.wait())
config_md5.shutdown(socket.SHUT_RD)
config_md5.close()
#!/usr/bin/python3
import argparse
import sys
import config
import primary
import secondary
if __name__ == '__main__':
argParser = argparse.ArgumentParser(usage="%(prog)s [options]")
group = argParser.add_mutually_exclusive_group(required=True)
group.add_argument('--primary', nargs='*',
metavar='CONFIG',
help='backup as specified in CONFIG')
group.add_argument('--secondary', nargs=3,
metavar=('SOCKET', 'MOUNT', 'PATH'),
help='backup from SOCKET to MOUNT/PATH2')
argParser.add_argument('--xattr', action='store_true',
help='let md5toc store MD5 in extended attribute')
argParser.add_argument('--max-age',
metavar='SECONDS',
help='maximum SECONDS since last MD5 calculation')
argParser.add_argument('--user',
metavar='USER',
help='ssh USER for secondary (-l)')
argParser.add_argument('--identity',
metavar='IDENTITY',
help='ssh IDENTITY for secondary (-i)')
argParser.add_argument('--debug', action='store_true',
help='debug actions')
options = argParser.parse_args(sys.argv[1:])
if options.primary:
done = set()
for path in options.primary:
if path in done:
continue
done.add(path)
primary.do_backup(options=options,
config=config.parse(open(path).read()))
if options.secondary:
secondary.do_backup(options=options,
socket_path=options.secondary[0],
mount=options.secondary[1],
path=options.secondary[2])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment