-
Notifications
You must be signed in to change notification settings - Fork 2
/
cli.py
554 lines (464 loc) · 19.3 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
#!/usr/bin/env python
# Copyright © 2023 Andrei Tatar <[email protected]>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Command line interface to btrsync. Run as main with '-h' for usage information."""
import re
import os
import sys
import fnmatch
import posixpath
import asyncio
import argparse
import urllib.parse
import dataclasses
from . import sync
from . import VERSION
class IncrSync(sync.BtrSync):
""":class:`btrsync.sync.BtrSync` class that skips non-incremental transfers."""
@staticmethod
def check(vol, parent):
return parent is not None
def humanbytes(n, sep=' '):
"""
Represent `n` bytes in human-readable form using IEC units (i.e., KiB, MiB, etc.).
:param n: the number of bytes to represent
:param sep: separator between numeric value and unit
:returns: human-readable representation as string
"""
THRESH = 1024
UNITS = (' B', 'KiB', 'MiB', 'GiB', 'TiB', 'EiB')
SCALE = [(2**(10*i), u) for i, u in enumerate(UNITS)]
def rv(q, u): return sep.join((f'{q:6.1f}', u))
for sz, unit in SCALE[:-1]:
r = n/sz
if abs(r) < THRESH:
return rv(r, unit)
return rv(n/SCALE[-1][0], SCALE[-1][1])
class CliProgress(sync.ProgressTransfer):
"""Transfer class that periodically reports progress on stdout."""
def __init__(self, *args, **kwargs):
super().__init__(prog_seq='|/-\\', *args, **kwargs)
async def report_progress(self, total, prev, seq):
print('\r', next(seq), humanbytes(total),
humanbytes((total - prev) / self.period) + '/sec', end='')
class BaseMatch:
"""
Match against a set of include and exclude globs.
If no include globs are given, match all not explicitly excluded.
"""
def __init__(self):
self.incl = []
self.excl = []
def include(self, *globs):
"""Add `globs` to the list of included globs."""
self.incl.extend(globs)
def exclude(self, *globs):
"""Add `globs` to the list of excluded globs."""
self.excl.extend(globs)
def base_match(self, path):
"""
Return the form of `path` to be matched against includes and excludes, or :const:`None` to exclude it outright.
By default return `path` unaltered; override to change behavior.
"""
return path
def match(self, path):
"""Return :const:`True` if `path` matches against :func:`base_match` and current includes and excludes, :const:`False` otherwise."""
rpath = self.base_match(path)
if rpath is None:
return False
if self.incl and not any(fnmatch.fnmatch(rpath, i) for i in self.incl):
return False
if any(fnmatch.fnmatch(rpath, x) for x in self.excl):
return False
return True
def stop(self, paths):
"""
Return :const:`True` if further processing should stop after handling `paths`, or :const:`False` if it should continue.
By default always return :const:`False`."""
return False
class SingleMatch(BaseMatch):
"""
Match a single path, accounting for include and exclude globs, and stop all processing afterwards.
:param path: the target path to match
"""
def __init__(self, path):
super().__init__()
self.path = path
def __repr__(self):
return f'SingleMatch({self.path})'
def base_match(self, path):
"""Match a single path and return it unaltered."""
return path if path == self.path else None
def stop(self, paths):
"""Return :const:`True` to stop immediately after processing the target path."""
assert(self.path in paths)
return True
class UnderGlob(BaseMatch):
"""
Match all paths that are below or matching a particular glob, further accounting for include and exclude globs.
:param glob: a shell-like glob matching the path prefix of targets
"""
def __init__(self, glob):
super().__init__()
if posixpath.isabs(glob):
raise ValueError('glob must specify a relative path')
if not glob.endswith('*'):
glob = posixpath.join(glob, '*')
rx = fnmatch.translate(glob)
assert(rx.endswith('.*)\\Z'))
rx = rx[:-len('.*)\\Z')] + '(.*))\\Z'
self.under = re.compile(rx)
self._glob = glob
self._re = rx
def __repr__(self):
return f'UnderGlob({self._glob})'
def base_match(self, path):
"""Match everything below the base glob, and return a path relative to it."""
m = self.under.match(path)
return m.groups()[0] if m is not None else None
@dataclasses.dataclass(frozen=True)
class SSHLoc:
"""Convenience class to parse, store, and validate SSH location parameters."""
user: str
host: str
port: str
SSHRE = re.compile('(?:([^@]*)@)?(.*)')
URLRE = re.compile('(?:([^@:]*)@)?(\[[A-Fa-f0-9:]+\]|[^:]*)(?::(.*))?')
@classmethod
def parse_ssh(cls, locstr):
"""Parse a SSH location from a ``user@hostname`` form."""
return cls(*cls.SSHRE.match(locstr).groups(), None)
@classmethod
def parse_url(cls, netloc):
"""Parse a SSH location from a URL netloc component."""
return cls(*cls.URLRE.match(netloc).groups())
def validate(self):
"""
Validate SSH parameters: `host` cannot be empty, and `user` or `port`, if specified, cannot be empty.
:returns: `self`
:raises ValueError: for invalid parameters
"""
if not self.host:
raise ValueError('SSH host cannot be empty')
if self.user is not None and not self.user:
raise ValueError('SSH user, if specified, cannot be empty')
if self.port is not None and not self.port:
raise ValueError('SSH port, if specified, cannot be empty')
return self
def asdict(self):
"""Return parameters of `self` in the form of a :class:`dict`."""
return dataclasses.asdict(self)
SSHLOC_RE = re.compile('^((?:[^/:@]*@)?\[[A-Fa-f0-9:]+\]|[^/:]*):(.*)')
URLSCHEME_RE = re.compile('^[A-Za-z][A-Za-z0-9+.-]*://(.*)')
def parse_root(locstr):
"""
Parse a location string into a protocol string and btrfs root options.
:param locstr: the location string to parse
:returns: a tuple ``(protocol, root_options, root_arguments)``:
the protocol to pass to :func:`btrsync.sync.default_root` to obtain a root factory,
the arguments to pass to the root factory to obtain a root class, and
the location to pass to the root class, respectively.
"""
sshmatch = SSHLOC_RE.match(locstr)
if sshmatch:
urlmatch = URLSCHEME_RE.match(locstr)
if urlmatch:
url = urllib.parse.urlparse(locstr)
if url.scheme == 'file':
return 'local', {}, urlmatch.groups()[0]
elif url.scheme == 'ssh':
return 'ssh', SSHLoc.parse_url(url.netloc).validate().asdict(), url.path
else:
return url.scheme, url
else:
host, path = sshmatch.groups()
return 'ssh', SSHLoc.parse_ssh(host).validate().asdict(), path
else:
return 'local', {}, locstr
async def dest_root(loc, rootopts={}, rootargs={}):
"""Process a destination location string, returning a tuple of ``(btrfs_root, receive_path)``."""
prot, largs, path = parse_root(loc)
root, recvpath = await sync.default_root(prot)(**largs, **rootopts).get_root(path, **rootargs)
return root, recvpath
async def dump_root(odir, inner_coro=None, rootopts={}, rootargs={}):
"""Return a ``(root, receive_path)`` pair for dumping a stream to a local file."""
if inner_coro is not None:
sroot, _ = await inner_coro
else:
sroot = None
return sync.default_root('file-dump')(**rootopts)(odir, subroot=sroot, **rootargs), '.'
async def src_root(loc, rootopts={}, rootargs={}):
"""Process a source location string, returning a tuple of ``(btrfs_root, matcher_instance)``."""
prot, largs, path = parse_root(loc)
if prot == 'local' and os.path.isfile(path):
prot = 'file'
rtype = sync.default_root(prot)(**largs, **rootopts)
if prot == 'file':
root = rtype(path, **rootargs)
matcher = BaseMatch()
elif await rtype.is_root(path):
if path.endswith('/'):
root = rtype(path, **rootargs)
matcher = UnderGlob('*')
else:
root, rpath = await rtype.get_root(posixpath.dirname(path), **rootargs)
matcher = SingleMatch(posixpath.join(rpath, posixpath.basename(path)))
else:
root, rglob = await rtype.get_root(posixpath.dirname(path), **rootargs)
matcher = UnderGlob(posixpath.join(rglob, posixpath.basename(path)))
return root, matcher
def format_transfer(volpaths, parent, destdir, *, verb=False):
"""
Format the paths that make up a transfer for display on the command line.
:param volpaths: sequence of paths to send
:param parent: parent to use for incremental send, or :const:`None` for full send
:param destdir: destination directory used for receive
:param verb: if :const:`True` include more details
:returns: formatted string
"""
vpaths = ',\n'.join(volpaths)
if verb:
return '\n'.join((
'',
vpaths,
'\t' + (f'incremental from {parent}' if parent is not None else 'full'),
f'\tinto {destdir}',
))
else:
return vpaths + '\t' + ('full' if parent is None else 'incr') + ' -> ' + destdir
class Confirm(sync.ProgressTransfer):
"""
Handle the UI aspects of confirming a sync via the command line.
:param src: the SOURCE command line argument currently being processed
"""
VERBOSE = False
def __init__(self, src, *args, **kwargs):
super().__init__(*args, **kwargs)
self._preview = []
self.src = src
async def transf(self, vols, par, src, dst):
"""Transfer function as expected by :meth:`btrsync.sync.BtrSync.sync` that only logs transfers."""
volpaths, parent, _ = self._sendpaths(vols, par)
recvpath = self._recvpath(volpaths)
self._preview.append(format_transfer(volpaths, parent, recvpath, verb=self.VERBOSE))
def head(self):
"""Called at the beginning to print a header."""
print('At source', self.src)
def preview(self):
"""Called after the dry run sync finishes to print a preview of the transfers to be performed."""
if not self._preview:
print('Nothing to do')
else:
print('About to sync the following subvolumes:')
print(*self._preview, sep='\n')
def confirm(self):
"""
Called after :meth:`preview` to give final confirmation.
:returns: ``'Y'`` to proceed with sync,
``'S'`` to skip the current source and continue with the next, and
``'N'`` to immediately abort
"""
r = 'S' if not self._preview else ''
while r not in ('Y', 'N', 'S'):
print('Proceed? [y/N/(s)kip]: ', end='', flush=True)
r = input().upper()
if not r:
r = 'N'
return r
async def do_btrsync(*, src_coros, dst_coro, incls, excls, auto,
confirm, syncer, syncopts, transfer, transopts):
"""
Perform btrsync from `srcs` to `dst`.
:param src_coros: sequence of coroutines that return (source_root, matcher) pairs
:param dst_coro: coroutine that returns a (destination_root, receive_path) pair
:param incls: list of globs matching subvolumes to include in the sync
:param excls: list of globs matching subvolumes to exclude from the sync
:param auto: if :const:`False` do a dry run, if :const:`None` ask for confirmation, if :const:`True` proceed without asking
:param confirm: :class:`.Confirm`-like class to handle user interaction for confirmation
:param syncer: :class:`btrsync.sync.BtrSync`-like class to use for sync
:param syncopts: keyword arguments to pass to `syncer`
:param transfer: :class:`btrsync.sync.Transfer`-like class to use for sync
:param transopts: keyword arguments to pass to `transfer`
"""
dtask = asyncio.create_task(dst_coro)
stasks = list(map(asyncio.create_task, src_coros))
try:
droot, recvpath = await dtask
trans = transfer(recvpath=recvpath, **transopts)
sources = await asyncio.gather(*stasks)
except:
for t in stasks:
t.cancel()
await asyncio.wait(stasks)
raise
for sroot, matcher in sources:
if incls is not None:
matcher.include(*incls)
matcher.exclude(*excls)
s = syncer(sroot, droot)
o = {
'target': lambda v: matcher.match(v['path']),
'stop': lambda vs: matcher.stop([v['path'] for v in vs])
}
o.update(syncopts)
# Confirmation
if auto is not True:
conf = confirm(sroot.name, recvpath=recvpath, **transopts)
conf.head()
if not await s.sync(conf, **o):
break
conf.preview()
if auto is False:
continue
cont = conf.confirm()
if cont == 'S':
continue
elif cont != 'Y':
break
# Go time
if not await s.sync(trans, **o):
raise RuntimeError()
def process_args(cliargs):
"""Process :mod:`argparse`-style output into arguments for :func:`.do_btrsync`."""
prog = cliargs.progress and not cliargs.quiet
class CliTransfer(CliProgress if prog else sync.Transfer):
"""Transfer class tailored to cli arguments."""
if cliargs.quiet < 2:
@staticmethod
def err(e, *args):
print('Error:', e, file=sys.stderr)
if not cliargs.quiet:
async def report(self, vols, par, src, dst):
volpaths, parent, _ = self._sendpaths(vols, par)
recvpath = self._recvpath(volpaths)
print(format_transfer(volpaths, parent, recvpath, verb=cliargs.verbose))
@staticmethod
async def report_done(vols, par, src, dst):
print(" - Done")
class CliConfirm(Confirm):
VERBOSE = cliargs.verbose
if cliargs.quiet < 2:
@staticmethod
def err(e, *args):
print('Error:', e, file=sys.stderr)
transopts = {'replicate_dirs': cliargs.replicate_dirs}
if prog:
transopts['period'] = cliargs.progress_period
srootopts = {'sudo': cliargs.sudo or cliargs.sudo_src}
drootopts = {'sudo': cliargs.sudo or cliargs.sudo_dest}
srootargs = {}
if cliargs.scope is not None:
srootargs['scope'] = cliargs.scope
drootargs = {'create_recvpath': cliargs.create_destpath or cliargs.replicate_dirs}
src_coros = [src_root(s, srootopts, srootargs) for s in cliargs.src]
dst_coro = dest_root(cliargs.dst, drootopts, drootargs)
if cliargs.output_dir or cliargs.output_pipe:
dumpargs = drootargs.copy()
if cliargs.output_pipe:
dumpargs['dump_pipe'] = cliargs.output_pipe
if cliargs.output_ext:
dumpargs['ext'] = cliargs.output_ext
dst_coro = dump_root(cliargs.output_dir, dst_coro, drootopts, dumpargs)
return {
'src_coros': src_coros,
'dst_coro': dst_coro,
'incls': cliargs.include,
'excls': cliargs.exclude,
'auto': cliargs.auto,
'confirm': CliConfirm,
'syncer': IncrSync if cliargs.incremental_only else sync.BtrSync,
'syncopts': {'batch': cliargs.batch, 'parallel': cliargs.parallel, 'transfer_existing': cliargs.existing},
'transfer': CliTransfer,
'transopts': transopts,
}
def cli_parser():
"""Return an :mod:`argparse`-like parser for btrsync's command-line options."""
PROG_VERSION = f'%(prog)s {VERSION}'
COPYRIGHT = '''Copyright © 2023 Andrei Tatar.
This is free software; see the source for copying conditions.
There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.'''
parser = argparse.ArgumentParser(prog='btrsync', description='Sync btrfs volumes')
parser.add_argument('src', metavar='SOURCE', nargs='+',
help='source location, may contain wildcards')
parser.add_argument('dst', metavar='DESTINATION',
help='destination location')
parser.add_argument('-x', '--exclude', action='append', metavar='GLOB', default=[],
help='exclude subvolumes matching GLOB')
parser.add_argument('-i', '--include', action='append', metavar='GLOB',
help='''explicitly include only subvolumes matching GLOB,
overriding the default behavior of including everything matching SOURCE
and not explicitly excluded''')
parser.add_argument('-f', '--existing', action='store_true',
help='transfer subvolumes even if they exist on the destination')
parser.add_argument('-I', '--incremental-only', action='store_true',
help='only perform incremental transfers, skip the rest')
parser.add_argument('-y', '--no-confirm', action='store_const', const=True, dest='auto',
help='do not ask for confirmation, perform transfers immediately')
parser.add_argument('-n', '--dry-run', action='store_const', const=False, dest='auto',
help='do not perform transfers, print what would have been done')
parser.add_argument('--interactive', action='store_const', const=None, dest='auto',
help='(default) ask for confirmation before performing transfers')
parser.add_argument('-v', '--verbose', action='store_true',
help='print more details')
parser.add_argument('-q', '--quiet', action='count', default=0,
help='supress printing to only errors, specify twice to supress all output except confirmation prompts')
parser.add_argument('-p', '--progress', action='store_true',
help='print progress during transfer')
parser.add_argument('-t', '--progress-period', metavar='SEC', type=float, default=1.0,
help='(requires --progress) print progress every SEC seconds (default: 1)')
parser.add_argument('-B', '--batch', action='store_true',
help='batch multiple subvolumes into a single transfer, as possible')
parser.add_argument('-P', '--parallel', action='store_true',
help='run independent transfers in parallel')
parser.add_argument('-o', '--output-dir', metavar='DIR',
help='dump the send streams into DIR instead of performing a receive at DESTINATION')
parser.add_argument('-O', '--output-pipe', metavar='PIPELINE',
help='''pass the send stream through PIPELINE before dumping to file;
if supplied without --output-dir the output of PIPELINE is sent to stdout''')
parser.add_argument('-e', '--output-ext', metavar='EXT',
help='(requires --output-dir) append EXT to dump filenames')
parser.add_argument('-c', '--create-destpath', action='store_true',
help='create the path specified in DESTINATION if it does not exist')
parser.add_argument('-r', '--replicate-dirs', action='store_true',
help='''(implies `-c') replicate the directory structure
containing subvolumes in SOURCEs over to DESTINATION;
paths are taken relative to the source subvolume root
and applied on top of DESTINATION''')
parser.add_argument('-s', '--sudo', action='store_true',
help="use `sudo' for commands, in both source and destination")
parser.add_argument('--sudo-src', action='store_true',
help="use `sudo' for commands executed in source")
parser.add_argument('--sudo-dest', action='store_true',
help="use `sudo' for commands executed in destination")
parser.add_argument('--scope', choices=('all', 'strict', 'isolated'),
help='''set the scope for subvolume discovery:
'all' considers all accessible subvolumes,
'strict' will only consider subvolumes directly contained by the source subvolume,
and 'isolated' completely ignores all other subvolumes even for internal calculations
(warning: 'isolated' may dumb down automatic incremental transfers)''')
vcopts = parser.add_argument_group('version and copyright')
vcopts.add_argument('-V', '--version', action='version', version=PROG_VERSION,
help='Print version')
vcopts.add_argument('--copyright', action='version', version=COPYRIGHT,
help='Print copyright information')
return parser
def cli_main(argv):
"""Parse command-line arguments from `argv` and run :func:`.do_btrsync`."""
args = cli_parser().parse_args(args=argv)
btrsync_args = process_args(args)
try:
asyncio.run(do_btrsync(**btrsync_args))
except BaseException as e:
if args.quiet < 2:
print(e, file=sys.stderr)
if not args.quiet:
print('Aborted')
return 1
else:
return 0
def main():
"""Call :func:`.cli_main` with :data:`sys.argv`."""
return cli_main(sys.argv[1:])
if __name__ == '__main__':
sys.exit(main())