Commit 7093f948 authored by Anders Blomdell
Browse files

Use hashtoc and create a sha512backup as well

parent a9c85513
*~
__pycache__
md5backup
\ No newline at end of file
md5backup
sha512backup
md5backup: md5backup.py $(sort $(wildcard *.py)) Makefile
SOURCE_md5backup = config.py loghandler.py md5toc.py primary.py secondary.py
SOURCE_hashbackup = command.py config.py loghandler.py hashtoc.py \
primary.py secondary.py
all: md5backup sha512backup
md5backup: hashbackup.py $(SOURCE_hashbackup) Makefile
apa -o $@ $(filter %.py, $^)
sha512backup: hashbackup.py $(SOURCE_hashbackup) Makefile
apa -o $@ $(filter %.py, $^)
.PHONY: test
test: all
$(MAKE) -C test test
#!/usr/bin/python3
class Command:
    """Fluent builder for a command line represented as a list of arguments.

    Every builder method returns ``self`` so calls can be chained; iterating
    the object (e.g. ``list(cmd)``) yields the accumulated arguments.
    """

    def __init__(self, command):
        """Start a command line whose first element is *command*."""
        self.result = [command]

    def __iter__(self):
        """Iterate over the arguments collected so far."""
        return iter(self.result)

    def arg(self, arg):
        """Append *arg* unconditionally."""
        self.result.append(arg)
        return self

    def flag(self, flag, condition=True):
        """Append *flag* only when *condition* is truthy."""
        if condition:
            self.result.append(flag)
        return self

    def option(self, name, value=None, kind=str):
        """Append ``name`` and ``str(value)`` when *value* is a *kind*.

        Values of any other type (including the default ``None``) are
        skipped silently, which lets unset options be passed straight in.
        """
        if isinstance(value, kind):
            self.result.extend([name, str(value)])
        return self
......@@ -13,12 +13,12 @@ def checkDirectory(s, l, t):
(lineno(l, s), t[0]))
_subdirectory_ = (
Regex('[a-zA-Z0-9/]+') |
Regex('[._a-zA-Z0-9/]+') |
QuotedString('"') |
QuotedString("'")
).setParseAction(checkSubdirectory)
_directory_ = (
Regex('[a-zA-Z0-9/]+') |
Regex('[._a-zA-Z0-9/]+') |
QuotedString('"') |
QuotedString("'")
).setParseAction(checkDirectory)
......@@ -82,7 +82,17 @@ def parse(s):
if __name__ == '__main__':
    # Self-test: parse the configuration file named on the command line
    # twice and check that both parses produce the same result.
    import sys
    from pyparsing import ParseException

    with open(sys.argv[1]) as f:
        config = f.read()
    try:
        config1 = parse(config)
    except ParseException as e:
        # Show the input up to and including the offending line, then a
        # caret under the failing column before re-raising.
        lines = config.split('\n')
        print('\n'.join(lines[0:e.lineno]))
        # e.col is 1-based, so the caret needs col - 1 leading spaces.
        print('%s^ %s' % (' ' * (e.col - 1), e))
        raise
    config2 = parse(config)
    print(config1.dump())
    print(config1.asList() == config2.asList())
#!/usr/bin/python3
import argparse
import sys
import config
import primary
import secondary
import re
import os
def main():
    """Command-line entry point for the ``<HASH>backup`` programs.

    Exactly one of ``--primary`` or ``--secondary`` must be given. The hash
    algorithm name is taken from the program name: everything before
    ``backup`` in ``basename(sys.argv[0])`` (``sha512backup`` -> ``sha512``).

    Exits with status 1 when the mode selection is invalid or when no hash
    name can be derived from the program name.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--debug', action='store_true',
                        help='debug actions')
    parser.add_argument('--xattr', action='store_true',
                        help='let hashtoc store HASH in extended attribute')
    parser.add_argument('--max-age',
                        metavar='SECONDS',
                        help='maximum SECONDS since last HASH calculation')
    parser.add_argument('--jobs',
                        metavar='CPU',
                        help='number of CPUs for HASH calculation')
    parser.add_argument('--lookahead',
                        metavar='SIZE',
                        help='lookahead buffer size for HASH calculation')

    group_primary = parser.add_argument_group('primary')
    group_primary.add_argument('--primary', nargs='+',
                               metavar='CONFIG',
                               help='backup as specified in CONFIG')
    group_primary.add_argument('--identity',
                               metavar='IDENTITY',
                               help='ssh IDENTITY for connecting (-i)')
    group_primary.add_argument('--user',
                               metavar='USER',
                               help='ssh USER for connecting (-l)')

    group_secondary = parser.add_argument_group('secondary')
    group_secondary.add_argument('--secondary', nargs=3,
                                 metavar=('SOCKET', 'MOUNT', 'PATH'),
                                 help='backup from SOCKET to MOUNT/PATH2')

    options = parser.parse_args(sys.argv[1:])

    # Reject both "neither given" and "both given".
    if (options.primary is None) == (options.secondary is None):
        parser.print_help()
        print('Either --primary or --secondary should be specified')
        sys.exit(1)

    m = re.match('^(.*)backup', os.path.basename(sys.argv[0]))
    if not m:
        # Without a match there is no hash name to hand to the backends;
        # fail here with a clear message instead of crashing later on an
        # unbound local.
        print("Cannot derive hash name from program name '%s' "
              "(expected <HASH>backup)" % sys.argv[0], file=sys.stderr)
        sys.exit(1)
    hash_name = m.group(1)

    if options.primary:
        done = set()
        for path in options.primary:
            # Run each configuration file only once, in the order given.
            if path in done:
                continue
            done.add(path)
            # Read and close the file before the backup starts.
            with open(path) as f:
                text = f.read()
            primary.do_backup(hash_name=hash_name,
                              options=options,
                              config=config.parse(text))
    elif options.secondary:
        secondary.do_backup(hash_name=hash_name,
                            options=options,
                            socket_path=options.secondary[0],
                            mount=options.secondary[1],
                            path=options.secondary[2])


if __name__ == '__main__':
    main()
#!/usr/bin/python3
import subprocess
import os
import collections
import re
import sys
def readline_toc(f):
    """Yield the records of a TOC stream as ``bytes``, header included.

    *f* is a binary file-like object. The stream must start with a
    ``#fields:`` header inside the first 4096-byte read; the run of
    NUL/CR/LF bytes that follows the header is taken as the record
    terminator for the rest of the stream.

    This is a generator, so errors surface on iteration:
    raises ``Exception`` if the header is missing, and ``EOFError`` once the
    stream is exhausted (consumers are expected to stop before that).
    """
    buf = f.read(4096)
    # Raw bytes literal: in a non-raw literal "\0x00" is NUL followed by the
    # characters 'x', '0', '0', which made the class reject those characters.
    m = re.match(rb'^#fields:[ ]*[^\x00\n\r]+([\x00\r\n]+)', buf)
    if not m:
        raise Exception("Not a valid TOC file '%s'" % f)
    terminator = m.group(1)
    while True:
        pos = buf.find(terminator)
        if pos == -1:
            # No complete record buffered yet; pull in more data.
            tmp = f.read(4096)
            if len(tmp) == 0:
                raise EOFError()
            buf += tmp
            continue
        yield buf[0:pos]
        buf = buf[pos + len(terminator):]
class HashTOC:
    """Cursor over the entries of a TOC stream.

    The current entry's fields are exposed as attributes on the object
    (named after the ``#fields`` header, after applying *rename*); call
    ``next()`` to advance. Once ``#endTOC`` has been read, every field of
    every subsequent entry is ``None``.
    """

    def __init__(self, stream, rename=None):
        """Read the header from *stream* and position on the first entry.

        *rename* optionally maps field names from the header to the
        attribute names to use instead.
        """
        # Avoid a shared mutable default argument.
        renames = {} if rename is None else rename

        def readtoc(f):
            for l in readline_toc(f):
                if not l.startswith(b'#'):
                    # Data record: split into at most N fields so that ':'
                    # characters in the last field are preserved.
                    data = l.split(b':', N - 1)
                    yield tocEntry(*data)
                    continue
                tmp = [s.strip().decode('utf8') for s in l.split(b':')]
                if tmp[0].startswith('#fields'):
                    fields = [renames.get(name) or name for name in tmp[1:]]
                    # Diagnostic: echo the resolved field names to stderr.
                    print(fields, file=sys.stderr)
                    tocEntry = collections.namedtuple('TOCEntry', fields)
                    N = len(tmp) - 1
                elif tmp[0].startswith('#endTOC'):
                    # End marker: hand out all-None entries forever.
                    while True:
                        yield tocEntry(*[None for e in tocEntry._fields])
                else:
                    # Any other '#' line is ignored.
                    pass
            raise Exception("Incorrect termination of TOC file '%s'" % f)

        self.reader = readtoc(stream)
        self.next()

    def next(self):
        """Advance to the next entry and copy its fields onto ``self``."""
        data = next(self.reader)
        self.fields = data._fields
        for k, v in zip(data._fields, data):
            setattr(self, k, v)
#!/usr/bin/python3
import argparse
import sys
import config
import primary
import secondary
if __name__ == '__main__':
    # Command-line entry point: run either as primary (driven by one or
    # more CONFIG files) or as secondary (SOCKET, MOUNT, PATH).
    argParser = argparse.ArgumentParser(usage="%(prog)s [options]")
    group = argParser.add_mutually_exclusive_group(required=True)
    group.add_argument('--primary', nargs='*',
                       metavar='CONFIG',
                       help='backup as specified in CONFIG')
    group.add_argument('--secondary', nargs=3,
                       metavar=('SOCKET', 'MOUNT', 'PATH'),
                       help='backup from SOCKET to MOUNT/PATH2')
    argParser.add_argument('--xattr', action='store_true',
                           help='let md5toc store MD5 in extended attribute')
    argParser.add_argument('--max-age',
                           metavar='SECONDS',
                           help='maximum SECONDS since last MD5 calculation')
    argParser.add_argument('--user',
                           metavar='USER',
                           help='ssh USER for secondary (-l)')
    argParser.add_argument('--identity',
                           metavar='IDENTITY',
                           help='ssh IDENTITY for secondary (-i)')
    argParser.add_argument('--debug', action='store_true',
                           help='debug actions')
    options = argParser.parse_args(sys.argv[1:])

    # NOTE: `--primary` with no CONFIG yields an empty list, so nothing
    # is backed up in that case.
    if options.primary:
        done = set()
        for path in options.primary:
            # Run each configuration file only once, in the order given.
            if path in done:
                continue
            done.add(path)
            # Read and close the file before the backup starts.
            with open(path) as f:
                text = f.read()
            primary.do_backup(options=options,
                              config=config.parse(text))
    if options.secondary:
        secondary.do_backup(options=options,
                            socket_path=options.secondary[0],
                            mount=options.secondary[1],
                            path=options.secondary[2])
import os
import subprocess
import atexit
class MD5TOC:
    """Cursor over the entries of a newline-separated TOC stream.

    The field names come from the ``#fields:`` header line; the current
    entry's values (as ``bytes``) are exposed as attributes with those
    names. After ``#endTOC`` all fields are ``None`` and ``more`` becomes
    ``False`` on the following ``next()``.
    """

    def __init__(self, fd):
        """Read from binary file-like *fd* up to the first entry."""
        self.more = True

        def read_entry():
            data = b''
            while True:
                if b'\n' not in data:
                    # No complete line buffered yet; pull in more data.
                    tmp = fd.read(4096)
                    if len(tmp) == 0:
                        raise Exception('Premature end of file')
                    data += tmp
                    continue
                l, data = data.split(b'\n', 1)
                if l.startswith(b'#fields:'):
                    self.labels = [s.strip().decode()
                                   for s in l.split(b':')[1:]]
                elif l.startswith(b'#endTOC'):
                    for k in self.labels:
                        setattr(self, k, None)
                    break
                elif l.startswith(b'#'):
                    # Any other '#' line is ignored.
                    pass
                else:
                    # Limit the split so ':' inside the last field survives.
                    result = list(zip(self.labels,
                                      l.split(b':', len(self.labels) - 1)))
                    for k, v in result:
                        setattr(self, k, v)
                    yield result
            # Past the end marker: report exhaustion forever.
            while True:
                self.more = False
                yield None

        self.read_entry = read_entry()
        self.next()

    def next(self):
        """Advance to the next entry.

        Returns the entry as a list of ``(label, value)`` pairs, or ``None``
        once the end marker has been passed.
        """
        # The builtin next() avoids catching AttributeError as a fallback,
        # which used to hide AttributeErrors raised inside the generator.
        return next(self.read_entry)

    def __repr__(self):
        return 'MD5TOC(%s)' % ",".join(
            "%s=%s" % (k, getattr(self, k)) for k in self.labels)
#!/usr/bin/python3
import atexit
import command
import config
import loghandler
import netifaces
......@@ -18,6 +19,7 @@ def cond_unlink(path, log):
log.DEBUG('removed %s' % path)
except FileNotFoundError:
pass
pass
class AddrInfo:
......@@ -36,7 +38,8 @@ class AddrInfo:
class Server:
def __init__(self, options, config, entry, path, uuid, log):
def __init__(self, hash_name, options, config, entry, path, uuid, log):
self.hash_name = hash_name
self.options = options
self.config = config
self.entry = entry
......@@ -45,7 +48,7 @@ class Server:
self.uuid = uuid
self.socket_path = '/tmp/%s_server' % (self.uuid)
self.mutex = threading.Lock()
self.thread_md5 = None
self.thread_hash = None
self.thread_star = None
self.thread_server = threading.Thread(daemon=True, target=self.run)
self.thread_server.start()
......@@ -55,59 +58,59 @@ class Server:
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(self.socket_path)
server.listen(2)
config_MD5_socket,_ = server.accept()
self.log.DEBUG('config+MD5', config_MD5_socket)
c = config_MD5_socket.makefile('r').read()
config_HASH_socket,_ = server.accept()
self.log.DEBUG('config+HASH', config_HASH_socket)
c = config_HASH_socket.makefile('r').read()
if config.parse(c).asList() != self.config.asList():
raise Exception('Configuration differs')
with self.mutex:
self.thread_md5 = threading.Thread(daemon=True,
target=self.send_MD5,
args=(config_MD5_socket,))
self.thread_md5.start()
self.thread_hash = threading.Thread(daemon=True,
target=self.send_HASH,
args=(config_HASH_socket,))
self.thread_hash.start()
star_socket,_ = server.accept()
self.log.DEBUG('CPIO', star_socket)
with self.mutex:
self.thread_md5 = threading.Thread(daemon=True,
self.thread_hash = threading.Thread(daemon=True,
target=self.run_star,
args=(star_socket,))
self.thread_md5.start()
self.thread_hash.start()
cond_unlink(self.socket_path, self.log)
def pending(self):
    """Return True while any of the server's threads is still alive.

    Checks ``thread_hash``, ``thread_star`` (either may still be ``None``)
    and ``thread_server`` while holding ``self.mutex``.
    """
    with self.mutex:
        workers = (self.thread_hash, self.thread_star, self.thread_server)
        # threading.Thread has no `.thread` attribute; ask the Thread
        # object itself whether it is alive.
        return any(t is not None and t.is_alive() for t in workers)
def send_MD5(self, config_MD5):
self.log.DEBUG('send_MD5',
def send_HASH(self, config_HASH):
self.log.DEBUG('send_HASH',
self.config.primary.mount.path,
self.path)
cmd = (
[ '/usr/bin/md5toc' ] +
( self.options.xattr and
[ '--xattr' ] or []) +
( self.options.xattr and self.options.max_age and
[ '--max-age', self.options.max_age ] or []) +
[ '.' ]
)
cmd = list( command.Command('/usr/bin/hashtoc')
.flag('--%s' % self.hash_name)
.flag('--zero-terminated')
.flag('--xattr', self.options.xattr)
.option('--max-age', self.options.max_age)
.option('--jobs', self.options.jobs)
.option('--lookahead', self.options.lookahead)
.arg('.') )
cwd = os.path.join(self.config.primary.mount.path, self.path)
stdout = config_MD5.makefile('wb')
stdout = config_HASH.makefile('wb')
try:
subprocess.check_call(cmd, cwd=cwd, stdout=stdout)
finally:
config_MD5.shutdown(socket.SHUT_RDWR)
config_MD5.close()
config_HASH.shutdown(socket.SHUT_RDWR)
config_HASH.close()
def run_star(self, star_socket):
self.log.DEBUG('START run_star',
self.config.primary.mount.path,
self.path)
cmd = [ '/bin/star', '-c', '-acl', '-Hexustar', '-dump', '-list=-',
'-no-statistics' ]
cmd = [ '/bin/star', '-c', '-acl', '-Hexustar', '-dump',
'-read0', '-list=-', '-no-statistics' ]
cwd = os.path.join(self.config.primary.mount.path, self.path)
stdin = star_socket.makefile('rb')
stdout = star_socket.makefile('wb')
......@@ -123,7 +126,8 @@ class Server:
class Client:
def __init__(self, options, config, entry, log):
def __init__(self, hash_name, options, config, entry, log):
self.hash_name = hash_name
self.options = options
self.config = config
self.entry = entry
......@@ -149,40 +153,48 @@ class Client:
self.entry.mount.path,
path)
self.log.MESSAGE('START %s' % (readable))
server = Server(options=self.options,
server = Server(hash_name=self.hash_name,
options=self.options,
config=self.config,
entry=self.entry,
path=path,
uuid=self.uuid,
log=self.server_log)
socket_path = '/tmp/%s_client' % (self.uuid)
cmd = (
[ 'ssh', '-n', self.entry.mount.host ] +
( self.options.user and
[ '-l', self.options.user ] or []) +
( self.options.identity and
[ '-i', self.options.identity ] or []) +
[ '-R', '%s:%s' % (socket_path, server.socket_path) ] +
[ os.path.realpath(sys.argv[0]),
'--secondary', socket_path, self.entry.mount.path, path ] +
( self.options.debug and
[ '--debug' ] or []) +
( self.options.xattr and
[ '--xattr' ] or []) +
( self.options.xattr and self.options.max_age and
[ '--max-age', self.options.max_age ] or [])
)
cmd = list( command.Command('/bin/ssh')
.flag('-y')
.flag('-n')
.arg(self.entry.mount.host)
.option('-l', self.options.user)
.option('-i', self.options.identity)
.option('-R', '%s:%s' % (socket_path,
server.socket_path))
.arg(os.path.realpath(sys.argv[0]))
.flag('--secondary')
.arg(socket_path)
.arg(self.entry.mount.path)
.arg(path)
.flag('--debug', self.options.debug)
.flag('--xattr', self.options.xattr)
.option('--max-age', self. options.max_age)
.option('--jobs', self. options.jobs)
.option('--lookahead', self. options.lookahead) )
self.log.DEBUG('CMD="%s"' % (' '.join(cmd)))
stdout = loghandler.LOG(parent=self.log, prefix='STDOUT ')
stderr = loghandler.LOG(parent=self.log, prefix='STDERR ')
subprocess.check_call(cmd,
stdout=stdout.makefile(encoding='utf-8'),
stderr=stderr.makefile(encoding='utf-8'))
try:
subprocess.check_call(cmd,
stdout=stdout.makefile(encoding='utf-8'),
stderr=stderr.makefile(encoding='utf-8'))
except:
self.log.MESSAGE('DIED %s' % (readable))
exit(1)
time.sleep(1)
self.log.MESSAGE('DONE %s' % (readable))
time.sleep(1)
def do_backup(options, config):
def do_backup(hash_name, options, config):
def is_primary():
node = set()
for i in netifaces.interfaces():
......@@ -202,7 +214,8 @@ def do_backup(options, config):
log = loghandler.LOG(loghandler.LOG_WARNING)
for b in config.secondary.backup:
client = [ Client(options=options,
client = [ Client(hash_name=hash_name,
options=options,
config=config,
entry=e,
log=log) for e in b.entry ]
......
#!/usr/bin/python3
import atexit
import md5toc
import command
import hashtoc
import loghandler
import os
import socket
import subprocess
import loghandler
import time
def cond_unlink(path, log):
......@@ -58,7 +59,7 @@ class Backup:
stdin=self.primary_out)
atexit.register(cond_kill, self.extract)
# Make sure that the generated star archive is not empty
self.primary_in.write(b'.\n')
self.primary_in.write(b'.\0')
def close(self):
self.primary_in.flush()
......@@ -69,9 +70,9 @@ class Backup:
if src.name != dst.name:
raise Exception('Names differ: %s, %s' % (src, dst))
dst_path = os.path.join(self.dst_root, dst.name)
if src.kind != dst.kind or src.md5 != dst.md5 or src.size != dst.size:
if src.kind != dst.kind or src.sum != dst.sum or src.size != dst.size:
self.log.DEBUG('Replace...', src.name, dst.name,
src.md5, dst.md5, src.size, dst.size)
src.sum, dst.sum, src.size, dst.size)
self.status.replaced += 1
self.delete(dst)
self.add(src)
......@@ -125,9 +126,9 @@ class Backup:
parent = os.path.dirname(src.name)
while len(parent) != 0:
# Make sure directories get the correct modes
self.primary_in.write(parent + b'\n')
self.primary_in.write(parent + b'\0')
parent = os.path.dirname(parent)
self.primary_in.write(src.name + b'\n')
self.primary_in.write(src.name + b'\0')
def delete(self, dst):
self.log.DEBUG('Delete:', dst.name)
......@@ -140,7 +141,7 @@ class Backup:
os.rename(dst_path, trash_path)
def do_backup(options, socket_path, mount, path):
def do_backup(hash_name, options, socket_path, mount, path):
if options.debug:
log = loghandler.LOG(loghandler.LOG_DEBUG)
else:
......@@ -153,28 +154,28 @@ def do_backup(options, socket_path, mount, path):
if not os.path.exists(config_path):
raise Exception('"%s" does not exists' % (config_path))
# Connect to server config/md5toc socket
config_md5 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
config_md5.connect(socket_path)
# Connect to server config/hashtoc socket
config_hash = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
config_hash.connect(socket_path)
# Send secondary config to primary
config_md5.makefile('w').write(open(config_path).read())
config_md5.shutdown(socket.SHUT_WR)
config_hash.makefile('w').write(open(config_path).read())
config_hash.shutdown(socket.SHUT_WR)
# Make ready to read primary TOC (src)