-
Notifications
You must be signed in to change notification settings - Fork 0
/
jemf
executable file
·1660 lines (1400 loc) · 49.9 KB
/
jemf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
#
# jemf: an encrypted mini-filesystem.
#
# (command-line password manager)
#
# Copyright (c) Zev Weiss <[email protected]>
#
import sys
import argparse
from getpass import getuser as lib_getuser, getpass as lib_getpass
import subprocess
import json
import os
import os.path
import errno
import secrets
import socket
import time
import signal
import curses
import tempfile
from cmd import Cmd as CommandInterpreter
import shlex
import types
import functools
import base64
import hashlib
import atexit
import re
import struct
import traceback
from typing import NoReturn, Any, Optional, List, Dict, Tuple, Callable, Union
from typing import Text, Sequence, ItemsView, cast
from types import FrameType
try:
import readline
import string
readline.set_completer_delims(string.whitespace + os.sep)
except ImportError:
pass
default_fspaths = [os.path.expanduser(p) for p in ["~/.config/jemf", "~/.jemf"]]
gpgcmd = "gpg2"
CURRENT_FORMAT_VERSION = 3
# python3's os.pipe() sets O_CLOEXEC, which we don't want
def pipe() -> Tuple[int, int]:
return os.pipe2(0)
class UserError(Exception):
pass
class UsageError(UserError):
pass
class InternalError(Exception):
pass
class ProtocolError(Exception):
pass
class CorruptFS(Exception):
pass
class FormatVersionMismatch(Exception):
pass
class PathLookupError(UserError):
pass
class FileNotFound(PathLookupError):
pass
class NotADirectory(PathLookupError):
pass
def errprint_tty(msg: str) -> None:
if msg != '' and msg[-1] != '\n':
msg += '\n'
print(msg, end='', file=sys.stderr)
zenity_args = ["zenity", "--title", "jemf"]
def errprint_gui(msg: str) -> None:
errprogs = [zenity_args + ["--error", "--text"], ["gxmessage"], ["xmessage"]]
for ep in errprogs:
try:
subprocess.call(ep + [msg])
return
except:
pass
print("(None of %s succeeded)" % [ep[0] for ep in errprogs], file=sys.stderr)
errprint_tty(msg)
def error(msg: str) -> NoReturn:
raise UserError("Error: %s" % msg)
def internal_error(msg: str) -> NoReturn:
raise InternalError("Internal error: %s" % msg)
def getpass_gui(prompt: str) -> str:
progs = [["ssh-askpass"], ["gnome-ssh-askpass"],
zenity_args + ["--entry", "--hide-text", "--text"]]
for p in progs:
try:
proc = subprocess.Popen(p + [prompt], stdout=subprocess.PIPE)
except OSError as ose:
continue
output = proc.communicate()[0].decode("utf-8")
if proc.wait() == 0:
assert output[-1] == '\n'
return output[:-1]
error("None of %s succeeded" % [p[0] for p in progs])
def getpass_tty(prompt: str) -> str:
return lib_getpass(prompt)
getpass_interactive = getpass_tty
errprint = errprint_tty
def report_internal_error(err: InternalError) -> None:
errprint("Internal error! Please report a bug with the following stack trace:")
errprint(''.join(traceback.format_exception(err)))
def confirm_gui(prompt: str) -> bool:
xmsg_flags = ["-buttons", "No:1,Yes:0", "-default", "Yes"]
progs = [zenity_args + ["--question", "--text"],
["gxmessage"] + xmsg_flags,
["xmessage"] + xmsg_flags]
for p in progs:
try:
ret = subprocess.call(p + [prompt])
return ret == 0
except:
pass
error("None of %s succeeded" % [p[0] for p in progs])
def getpass(prompt: str) -> str:
if os.getenv("__JEMF_TEST__") == '1':
tmp = os.getenv("__JEMF_TEST_PASSWORD__")
assert tmp is not None
return tmp
return getpass_interactive(prompt)
def confirmed_getpass(label: str) -> str:
pass1 = getpass("Enter %s: " % label)
pass2 = getpass("Confirm %s: " % label)
if pass1 != pass2:
error("input mismatch")
else:
return pass1
def find_fs() -> Optional[str]:
for p in default_fspaths:
if os.path.exists(p):
return p
return None
class JemfJSONEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, FSObj):
return obj.to_json_obj()
else:
return json.JSONEncoder.default(self, obj)
def pretty_json_dump(d: Any) -> str:
return json.dumps(d, sort_keys=True, indent=4, separators=(',', ': '), cls=JemfJSONEncoder)
def check_tty_print_interlock(force: bool) -> None:
if not force and sys.stdout.isatty():
error("won't print to terminal without '-f'")
def run_gpg(args: List[str], passwd: str, stdin_data: Optional[bytes]=None) -> bytes:
pw_rfd, pw_wfd = pipe()
pw_w = os.fdopen(pw_wfd, "w")
cmdargs = [gpgcmd, "--quiet", "--batch", "--passphrase-fd", str(pw_rfd)] + args
try:
# close write end of passphrase pipe in child process
proc = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=False,
preexec_fn=pw_w.close)
except OSError as ose:
if ose.errno == errno.ENOENT:
raise UserError("Error attempting to execute %s: %s"
% (gpgcmd, ose.strerror))
else:
raise
# parent doesn't use read end
os.close(pw_rfd)
# send passphrase to gpg child
pw_w.write(passwd)
# probably redundant, since we're just about to close it, but why not...
pw_w.flush()
pw_w.close()
stdout_data, stderr_data = proc.communicate(stdin_data)
retval = proc.wait()
if retval != 0:
msg = stderr_data.decode("utf-8")
# make a common message from gpg a bit more "user friendly"
if "decryption failed: Bad session key" in msg:
msg = "Error: incorrect password."
errprint(msg)
raise subprocess.CalledProcessError(retval, cmdargs)
return stdout_data
def gpg_decrypt(path: str, password: str) -> bytes:
return run_gpg(["-d", "--no-mdc-warning", path], password)
def gpg_encrypt(path: str, password: str, data: bytes) -> bytes:
return run_gpg(["-c", "--yes", "-o", path], password, data)
def update_fsfile(path: str, password: str, plaintext: bytes) -> None:
# If filename is a symlink, do the new file write & rename in
# the directory in which the pointed-to file resides
filename = os.path.realpath(path)
dirpath = os.path.dirname(filename)
tmpfd, tmppath = tempfile.mkstemp(prefix=os.path.basename(filename), dir=dirpath)
output = gpg_encrypt(tmppath, password, plaintext)
if output != b'':
os.unlink(tmppath)
internal_error("Unexpected output from gpg: %r\n" % output)
os.fsync(tmpfd)
os.close(tmpfd)
os.rename(tmppath, path)
# fsync the containing directory to persist the rename
dirfd = os.open(dirpath, os.O_RDONLY)
os.fsync(dirfd)
os.close(dirfd)
hostname = None
def get_hostname() -> str:
global hostname
if hostname is None:
hostname = socket.gethostbyaddr(socket.gethostname())[0]
return hostname
multi_slash_pat = re.compile('//+')
trailing_slash_pat = re.compile('/+$')
def path_components(path: str) -> List[str]:
# collapse redundant slashes and remove trailing slashes
np = re.sub(multi_slash_pat, '/', path)
np = re.sub(trailing_slash_pat, '', np)
while len(np) > 0 and np[0] == os.sep:
np = np[1:]
return np.split(os.sep) if len(np) > 0 else []
default_constraints = "L12:C2:m2:N2:P2"
gen_constraints_desc = """
CONSTRAINTS is a colon-separated list of data constraints of the form TV.
T is a type indicator: L for length, C for capital letters, m for
lower-case letters, N for numerals (digits), and P for punctuation
characters.
V is the numeric value of of the constraint: for L (length) this
specifies the length of the generated data; for all others it provides
a minimum number of characters of that type, though as a special case
V=0 means the character type will be omitted.
The default constraint is %s, specifying a twelve-character
entry with at least two capital letters, at least two lower-case
letters, at least two digits, and at least two punctuation characters.
The user-provided constraint list (which may be empty) can selectively
override these constraints.
Stupid-site example: L8:C2:m2:N2:P0 for an 8-character purely
alphanumeric (but mixed-case) entry.""" % default_constraints
char_classes = {
'C': "ABCDEFGHIJKLMNOPQRSTUVWXYZ", # uppercase
'm': "abcdefghijklmnopqrstuvwxyz", # lowercase
'N': "0123456789", # digits
'P': "!@#$%^&*:._-=+", # punctuation
}
def constraints_to_dict(cstr: str) -> Dict[str, int]:
d = {}
for c in cstr.split(":"):
if c != "":
if c[0] not in char_classes and c[0] != 'L':
error("invalid constraint type: '%c'" % c[0])
d[c[0]] = int(c[1:])
return d
def generate_data(constraints: str, punctuation_chars: Optional[str]=None) -> str:
classes = char_classes.copy()
if punctuation_chars is not None:
classes['P'] = punctuation_chars
cdict = constraints_to_dict(default_constraints)
cdict.update(constraints_to_dict(constraints))
s, padchars = "", ""
for c, chars in classes.items():
for _ in range(0, cdict[c]):
s += secrets.choice(chars)
if cdict[c] > 0:
padchars += chars
# fill remaining space from pad character sets
for _ in range(0, cdict['L']-len(s)):
s += secrets.choice(padchars)
l = list(s)
secrets.SystemRandom().shuffle(l)
return ''.join(l[:cdict['L']])
def isdot(p: str) -> bool:
return p.startswith('.') and p not in [".", ".."]
def recursive_list(obj: 'FSObj', pfx: str, nodirs: bool=False, skip_dots: bool=True) -> List[str]:
l = []
skip = skip_dots and isdot(os.path.basename(pfx))
if (not nodirs or not isinstance(obj, Directory)) and not skip:
l.append(pfx)
if isinstance(obj, Directory) and not skip:
for n, o in obj.items():
l += recursive_list(o, os.path.join(pfx, n), nodirs=nodirs,
skip_dots=skip_dots)
return l
def extract(jd: Dict[str, Any], key: str, expected: type) -> Any:
if key not in jd:
raise CorruptFS("missing %s" % key)
tmp = jd.pop(key)
if not isinstance(tmp, expected):
raise CorruptFS("invalid %s (%s)" % (key, type(tmp).__name__))
return tmp
class FSObj(object):
# to be overridden with something meaningful by derived bclasses
type_id: Optional[str] = None
def __init__(self, init_metadata: bool=True) -> None:
self.mtime: Optional[float] = None
self.mtzname: Optional[str] = None
self.mhost: Optional[str] = None
if init_metadata:
self.touch()
def touch(self) -> None:
self.mtime = time.time()
self.mtzname = time.tzname[time.daylight]
self.mhost = get_hostname()
def _fill_from_json_obj(self, jd: Dict[str, Any]) -> None:
self.mtime = extract(jd, "mtime", float)
self.mtzname = extract(jd, "mtzname", str)
self.mhost = extract(jd, "mhost", str)
def to_json_obj(self) -> Dict[str, Any]:
return dict(mtime=self.mtime, mtzname=self.mtzname, mhost=self.mhost,
type=self.type_id)
class File(FSObj):
type_id = 'f'
def __init__(self, data: Optional[str], init_metadata: bool=True) -> None:
super().__init__(init_metadata)
self.data = data
def _fill_from_json_obj(self, jd: Dict[str, Any]) -> None:
super()._fill_from_json_obj(jd)
self.data = extract(jd, "data", str)
if len(jd) > 0:
raise CorruptFS("extra keys in file object: %s"
% ", ".join(jd.keys()))
@classmethod
def from_json_obj(cls, jd: Dict[str, Any]) -> 'File':
if not isinstance(jd, dict):
raise CorruptFS("invalid file object (%s)" % type(jd).__name__)
f = File(None, init_metadata=False)
f._fill_from_json_obj(jd)
return f
def to_json_obj(self) -> Dict[str, Any]:
d = super().to_json_obj()
d["data"] = self.data
return d
class Directory(FSObj):
type_id = 'd'
def __init__(self, parent: Optional['Directory'], init_metadata: bool=True) -> None:
super().__init__(init_metadata)
if parent is None:
parent = self
else:
assert type(parent) is Directory
self._specials = {os.path.curdir: self, os.path.pardir: parent}
self.dentries: Dict[str, FSObj] = {}
def _fill_from_json_obj(self, jd: Dict[str, Any]) -> None:
for name, item in extract(jd, "entries", dict).items():
t = extract(item, "type", str)
obj: FSObj
if t == 'f':
obj = File.from_json_obj(item)
elif t == 'd':
obj = Directory.from_json_obj(item, parent=self)
elif t == 'l':
obj = Symlink.from_json_obj(item)
else:
raise CorruptFS("invalid object type: %s" % t)
self[name] = obj
# do this last so as to clobber metadata updates done
# automatically while adding dirents
super()._fill_from_json_obj(jd)
if len(jd) > 0:
raise CorruptFS("extra keys in directory object: %s"
% ", ".join(jd.keys()))
@classmethod
def from_json_obj(cls, jd: Dict[str, Any], parent: Optional['Directory']=None) -> 'Directory':
if not isinstance(jd, dict):
raise CorruptFS("invalid directory object (%s)" % type(jd).__name__)
d = Directory(parent, init_metadata=False)
d._fill_from_json_obj(jd)
return d
def to_json_obj(self) -> Dict[str, Any]:
d = super().to_json_obj()
d["entries"] = self.dentries
return d
def __getitem__(self, k: str) -> FSObj:
if k in self._specials:
return self._specials[k]
else:
return self.dentries.__getitem__(k)
def __contains__(self, k: str) -> bool:
return k in self._specials or self.dentries.__contains__(k)
def __setitem__(self, k: str, v: FSObj) -> None:
assert k not in self._specials
self.touch()
self.dentries.__setitem__(k, v)
def __delitem__(self, k: str) -> None:
assert k not in self._specials
self.touch()
self.dentries.__delitem__(k)
def set_parent(self, p: 'Directory') -> None:
self._specials[os.path.pardir] = p
def items(self) -> ItemsView[str, FSObj]:
return self.dentries.items()
class Symlink(FSObj):
type_id = 'l'
def __init__(self, target: Optional[str], init_metadata: bool=True) -> None:
super().__init__(init_metadata)
self.target = target
def _fill_from_json_obj(self, jd: Dict[str, Any]) -> None:
super()._fill_from_json_obj(jd)
self.target = extract(jd, "target", str)
if len(jd) > 0:
raise CorruptFS("extra keys in symlink object: %s"
% ", ".join(jd.keys()))
@classmethod
def from_json_obj(cls, jd: Dict[str, Any]) -> 'Symlink':
if not isinstance(jd, dict):
raise CorruptFS("invalid symlink object (%s)" % type(jd).__name__)
l = Symlink(None, init_metadata=False)
l._fill_from_json_obj(jd)
return l
def to_json_obj(self) -> Dict[str, Any]:
d = super().to_json_obj()
d["target"] = self.target
return d
def is_nonempty_dir(obj: FSObj) -> bool:
return isinstance(obj, Directory) and len(obj.dentries) > 0
ansi_red = "\x1b[31m"
ansi_rst = "\x1b[m"
def clear_terminal() -> None:
curses.setupterm()
e3 = curses.tigetstr("E3")
if e3:
curses.putp(e3)
clr = curses.tigetstr("clear")
if clr:
curses.putp(clr)
sys.stdout.flush()
Args = argparse.Namespace
class JemfShell(CommandInterpreter):
# Time out after 3 minutes waiting for input (can be
# overridden via JEMF_TMOUT environment variable)
input_timeout = 180
def __init__(self, jemf: 'Jemf', args: Args):
self.fs = jemf
self.args = args
self.setprompt()
tmout = os.getenv("JEMF_TMOUT")
if tmout:
try:
self.input_timeout = int(tmout)
except ValueError:
errprint("JEMF_TMOUT value not a valid base-10 integer, retaining default (%d)"
% self.input_timeout)
CommandInterpreter.__init__(self)
def setprompt(self) -> None:
if sys.stdin.isatty():
ro = "[ro]" if self.fs.read_only else ""
try:
cwd = Jemf.get_dirname(self.fs.cwd)
except InternalError as ex:
report_internal_error(ex)
cwd = '???'
self.prompt = "%sjemf[%s:%s]%s> %s" % (ansi_red, self.fs.filename, cwd, ro, ansi_rst)
else:
self.prompt = ""
def do_EOF(self, args: List[str]) -> bool:
"""EOF (^D): exit shell session."""
if sys.stdin.isatty():
sys.stdout.write('\n')
return True
def set_input_timeout_alarm(self) -> None:
if self.input_timeout <= 0:
return
def alrm_handler(signum: int, frame: Optional[FrameType]) -> None:
clear_terminal()
errprint("Input timeout exceeded; exiting.\n")
sys.exit(1)
signal.signal(signal.SIGALRM, alrm_handler)
signal.alarm(self.input_timeout)
def preloop(self) -> None:
self.set_input_timeout_alarm()
def postcmd(self, stop: bool, line: str) -> bool:
if not stop:
self.setprompt()
self.set_input_timeout_alarm()
return stop
# This is defined to override default "repeat previous
# command" behavior on getting an empty line.
def emptyline(self) -> bool:
return False
def default(self, line: str) -> None:
errmsg = "Unrecognized command: %s" % shlex.split(line)[0]
if self.args.exit_on_failure:
raise UserError(errmsg)
else:
errprint("%s\n(see 'help')" % errmsg)
def completedefault(self, orig_text: str, line: str, beg: int, end: int) -> List[str]: # type: ignore[override]
try:
if shlex.split(line[:end] + ".")[-1] != ".":
text = shlex.split(line[:end])[-1]
else:
text = orig_text
except ValueError:
text = orig_text
ppath, pfx = os.path.split(text)
try:
pdir = self.fs.lookup_path(ppath)
except PathLookupError:
return []
if not isinstance(pdir, Directory):
return []
base = orig_text[:-len(pfx)]
l = []
for name in pdir.dentries:
if name.startswith(pfx):
suffix = os.sep if isinstance(pdir.dentries[name], Directory) else " "
l.append(base + name + suffix)
return l
@classmethod
def _add_cmd_handler(cls, cmd: 'JemfCommand') -> None:
def cmdfun(self: 'JemfShell', argstr: str) -> None:
try:
args = cmd.parser.parse_args(shlex.split(argstr))
except (ValueError, UsageError) as exc:
if exc.args[0]:
errprint(exc.args[0])
return
except HelpFlagPassed:
return
# merge in original self.args if not overridden
for k,v in self.args.__dict__.items():
if k not in args:
setattr(args, k, v)
try:
cmd(self.fs, args)
except (InternalError, UserError) as err:
if self.args.exit_on_failure:
raise
if isinstance(err, UserError):
if err.args[0]:
errprint(err.args[0])
elif isinstance(err, InternalError):
report_internal_error(err)
cmdfun.__doc__ = cmd.parser.format_help()
setattr(cls, "do_%s" % cmd.name, cmdfun)
# For throwing when a -h/--help flag is passed, so that a caller N levels up can
# either skip executing the command (in shell mode) or just exit cleanly (in
# batch mode).
class HelpFlagPassed(Exception):
pass
# Custom help action that doesn't unconditionally exit (which we don't want in
# shell mode).
class JemfHelpAction(argparse.Action):
def __call__(self, parser: argparse.ArgumentParser, namespace: Args,
values: Union[Text, Sequence[Any], None], option_string: Optional[str]=None) -> NoReturn:
parser.print_help()
print("") # add an extra line so 'foo --help' output matches 'help foo' (cmd.do_help() adds one)
raise HelpFlagPassed()
def mk_raise_usage_error(usage: str) -> Callable[[str], NoReturn]:
usg = usage.strip()
def func(msg: str) -> NoReturn:
raise UsageError("%s\n%s" % (usg, msg.strip()))
return func
def mk_path_arg(desc: str, opt: bool=False, multiple_ok: bool=False) -> Tuple[Tuple[str], Dict[str, str]]:
d = dict(metavar="PATH", help=desc)
if opt and multiple_ok:
d["nargs"] = '*'
elif multiple_ok:
d["nargs"] = '+'
elif opt:
d["nargs"] = '?'
return (("path%s" % ("s" if multiple_ok else ""),), d)
JemfCommandFn = Callable[['Jemf', Args], None]
JemfCommandArgs = List[Tuple[Union[Tuple[str], Tuple[str, str]], Dict[str, Any]]]
JemfCommandDecorator = Callable[[JemfCommandFn], 'JemfCommand']
class JemfCommand(object):
_all_commands: List['JemfCommand'] = []
def __init__(self, cmdfn: JemfCommandFn, cmd_args: Optional[JemfCommandArgs]=None,
parser_kwargs: Optional[Dict[str, Any]]=None, read_only: bool=False,
shell_only: bool=False, batch_only: bool=False) -> None:
self.name = cmdfn.__name__
self.desc = cmdfn.__doc__
self.cmdfn = cmdfn
self.cmd_args = cmd_args if cmd_args is not None else []
self.parser_kwargs = parser_kwargs if parser_kwargs is not None else {}
self.read_only = read_only
self.shell_only = shell_only
self.batch_only = batch_only
JemfCommand._all_commands.append(self)
def __call__(self, origself: 'Jemf', args: Args) -> None:
return self.cmdfn(origself, args)
def add_to(self, subparsers: 'argparse._SubParsersAction[argparse.ArgumentParser]') -> None:
self.parser = subparsers.add_parser(self.name, description=self.desc,
help=self.desc, add_help=False, **self.parser_kwargs)
self.parser.set_defaults(action=self)
for a in self.cmd_args:
self.parser.add_argument(*a[0], **a[1])
# Custom help argument that doesn't unconditionally exit
# (so that we can avoid exiting when --help is passed to
# a command in shell mode).
self.parser.add_argument('-h', "--help", action=JemfHelpAction, default=argparse.SUPPRESS,
nargs=0, help="show this help message")
def create_parser(self) -> None:
tmp = argparse.ArgumentParser().add_subparsers()
self.add_to(tmp)
# decorator for commands that modify the FS
@staticmethod
def writer(cmd_args: JemfCommandArgs, **kwargs: Any) -> JemfCommandDecorator:
def wr_dec(fn: JemfCommandFn) -> JemfCommand:
@functools.wraps(fn)
def new_fn(self: 'Jemf', args: Args) -> None:
# attempts to execute @writer commands should be
# caught and rejected well before reaching this point
assert not self.read_only, "@writer command executed on read-only FS"
fn(self, args)
self._write_out()
return JemfCommand(new_fn, cmd_args=cmd_args, read_only=False, **kwargs)
return wr_dec
# decorator for read-only commands
@staticmethod
def reader(cmd_args: JemfCommandArgs, **kwargs: Any) -> JemfCommandDecorator:
def rd_dec(fn: JemfCommandFn) -> JemfCommand:
return JemfCommand(fn, cmd_args=cmd_args, read_only=True, **kwargs)
return rd_dec
SOCKBUF_SIZE = 1024
class Jemf(object):
reader = JemfCommand.reader
writer = JemfCommand.writer
commands = JemfCommand._all_commands
def __init__(self, filename: str, password: Optional[str], read_only: bool) -> None:
self.filename = filename
self.password = password
self.read_only = read_only
self.pipe_to: Optional[str] = None
self.data = Directory(None)
self.metadata: Dict[str, Any] = {}
self.cwd = self.data
self.socket: Optional[socket.socket] = None
self.shell_mode = False
def _to_json(self) -> str:
d: Dict[str, Any] = {}
d["data"] = self.data
d["metadata"] = self.metadata
return pretty_json_dump(d)
def _from_json(self, s: str) -> None:
jd = json.loads(s)
self.metadata = extract(jd, "metadata", dict)
fileversion = self.metadata.get("format_version", 1)
if fileversion < CURRENT_FORMAT_VERSION:
raise FormatVersionMismatch("data is in v%d format, we need v%d"
% (fileversion, CURRENT_FORMAT_VERSION))
data = extract(jd, "data", dict)
t = extract(data, "type", str)
if t != 'd':
raise CorruptFS("root entry must be a directory, got '%s'" % t)
self.data = Directory.from_json_obj(data)
if len(jd):
raise CorruptFS("extra keys in top level: %s"
% ", ".join(jd.keys()))
self.cwd = self.data
@classmethod
def load_from_path(cls, filename: str, password: str, read_only: bool) -> 'Jemf':
fs = cls(filename, password, read_only)
s = gpg_decrypt(filename, password)
try:
fs._from_json(s.decode("utf-8"))
except KeyError:
raise CorruptFS("data/metadata not found")
return fs
# return a (success-boolean, data) tuple
def recv_response(self) -> Tuple[bool, Optional[str]]:
buf = b''
assert self.socket is not None
while len(buf) == 0 or buf[-1] != b'\n'[0]:
t = self.socket.recv(SOCKBUF_SIZE)
if len(t) == 0:
raise ProtocolError("invalid response from server")
else:
buf += t
parts = buf[:-1].decode("utf-8").split(' ', 1)
data = None if len(parts) == 1 else parts[1]
if parts[0] == "OK":
status = True
elif parts[0] == "ERROR":
status = False
else:
raise ProtocolError("invalid response from server: %r" % buf)
return (status, data)
@classmethod
def from_server(cls, socket: socket.socket, filename: str, read_only: bool) -> 'Jemf':
fs = cls(filename, None, read_only)
fs.socket = socket
fs.socket.sendall(b"%s %s\n" % (READ_REQ, base64.b64encode(filename.encode("utf-8"))))
ok, data = fs.recv_response()
if ok:
assert data is not None
fs._from_json(base64.b64decode(data).decode("utf-8"))
return fs
else:
raise ProtocolError(data)
def output_secret(self, s: str, force: bool) -> None:
if self.pipe_to is None:
check_tty_print_interlock(force)
print(s, end='\n' if sys.stdout.isatty() else '')
else:
proc = subprocess.Popen(self.pipe_to, shell=True, stdin=subprocess.PIPE)
proc.communicate(s.encode("utf-8"))
status = proc.wait()
if status != 0:
errprint("['%s' exited with status %d]" % (self.pipe_to, status))
else:
# might stderr be more appropriate here?
print("[sent output to '%s']" % self.pipe_to)
def _write_out(self) -> None:
self.metadata["last_modification_time"] = time.time()
self.metadata["last_modification_tzname"] = time.tzname[time.daylight]
self.metadata["last_modification_host"] = get_hostname()
jemf_buf = self._to_json().encode("utf-8") + b'\n'
if self.socket is not None:
assert self.password is None
self.socket.sendall(b"%s %s %s\n" % (WRITE_REQ,
base64.b64encode(self.filename.encode("utf-8")),
base64.b64encode(jemf_buf)))
ok, data = self.recv_response()
if ok:
return
else:
raise ProtocolError(data)
else:
assert self.password is not None
update_fsfile(self.filename, self.password, jemf_buf)
def _lookup_comps(self, root: Directory, comps: List[str]) -> Union[File, Directory]:
fsdir: FSObj = root
trail = os.sep
for c in comps:
if not isinstance(fsdir, Directory):
raise NotADirectory("'%s' is not a directory" % trail)
prevdir = fsdir
try:
fsdir = fsdir[c]
except KeyError:
raise FileNotFound("'%s' does not exist in '%s'" % (c, trail))
# TODO: ELOOP detection?
if isinstance(fsdir, Symlink):
assert fsdir.target is not None # mypy
fsdir = self.lookup_path(fsdir.target, override_cwd=prevdir)
trail = os.path.join(trail, c)
assert isinstance(fsdir, (File, Directory))
return fsdir
def lookup_path(self, path: str, override_cwd: Optional[Directory]=None) -> Union[File, Directory]:
if len(path) > 0 and path[0] == os.sep:
root = self.data
else:
root = override_cwd if override_cwd else self.cwd
return self._lookup_comps(root, path_components(path))
def lookup_for_edit(self, path: str, should_exist: Optional[bool], override_cwd: Optional[Directory]=None) -> Tuple[Directory, str, Optional[FSObj]]:
"""Lookup 'path', returning a 3-tuple of the parent directory, the
last component of 'path', and the object itself (or None
if it doesn't exist).
"""
if len(path) > 0 and path[0] == os.sep:
root = self.data
else:
root = override_cwd if override_cwd else self.cwd
comps = path_components(path)
if len(comps) > 0:
parent = self._lookup_comps(root, comps[:-1])
tgtcomp = comps[-1]
else:
parent = root
tgtcomp = os.path.curdir
if not isinstance(parent, Directory):
error("'%s' is not a directory" % os.path.join(*comps))
if should_exist is not None:
if not should_exist and tgtcomp in parent:
error("'%s' already exists" % path)
elif should_exist and tgtcomp not in parent:
error("'%s' does not exist" % path)
existing = parent[tgtcomp] if tgtcomp in parent else None
return (parent, tgtcomp, existing)
# dumb brute force...could introduce some extra machinery to make this
# more efficient, but it's probably good enough.
@staticmethod
def get_dirname(obj: Directory) -> str:
parent = obj[os.path.pardir]
assert isinstance(parent, Directory)
if parent is obj:
return os.sep
for k, v in parent.items():
if v is obj:
return os.path.join(Jemf.get_dirname(parent), k)
internal_error("unable to determine directory path")
# some common flags
generate_arg = (("-g", "--generate"), dict(metavar="CONSTRAINTS", type=str,
help="auto-generate entry data"))
data_arg = (("-D", "--data"), dict(metavar="DATA", type=str,
help="use argument as data instead of prompting"))
punctchars_arg = (("-P", "--punctuation-chars"), dict(metavar="CHARS", type=str,
help="specify punctuation characters"
" allowed in generated data"))
force_print_arg = (("-f", "--force"), dict(help="disable anti-tty-print interlock",
action="store_true"))
force_overwrite_arg = (("-f", "--force"), dict(help="overwrite an existing file",
action="store_true"))
@writer([force_overwrite_arg], batch_only=True)
def mkfs(self, args: Args) -> None:
"""initialize a new jemf FS"""
self.data = Directory(None)
self.metadata = { "format_version": CURRENT_FORMAT_VERSION }
@writer([])
def chpass(self, args: Args) -> None:
"""change the master password of a jemf FS"""
if self.password is None:
error("can't change password via background-process socket")
# Only verify that the user knows the current password
# in shell mode (in batch mode they'll have just
# entered it).
if self.shell_mode:
if getpass("Enter current password for %s: " % self.filename) != self.password:
error("password incorrect")
newpass = confirmed_getpass("new password for %s" % self.filename)
self.password = newpass
@reader([mk_path_arg("directory to change to (default: root)", opt=True)],
shell_only=True)
def cd(self, args: Args) -> None:
"""change working directory"""
if args.path is not None:
obj = self.lookup_path(args.path)
if not isinstance(obj, Directory):
error("'%s' is not a directory" % args.path)
else:
obj = self.data
self.cwd = obj
@reader([], shell_only=True)
def pwd(self, args: Args) -> None:
"""print working directory"""
print(Jemf.get_dirname(self.cwd))
@writer([mk_path_arg("the directory to create")])
def mkdir(self, args: Args) -> None:
"""create a new directory"""
parent, newname, _ = self.lookup_for_edit(args.path, should_exist=False)
d = Directory(parent)
parent[newname] = d
@writer([generate_arg, data_arg, punctchars_arg, mk_path_arg("the file to create")],
parser_kwargs=dict(epilog=gen_constraints_desc,
formatter_class=argparse.RawDescriptionHelpFormatter))
def create(self, args: Args) -> None:
"""create a new file"""
parent, newname, existing = self.lookup_for_edit(args.path, should_exist=None)
if existing is not None:
if not isinstance(existing, Symlink):
error("'%s' already exists" % args.path)
assert existing.target is not None # mypy