From b360ae5d1f6278430d0ca5b31c943d32f5216c01 Mon Sep 17 00:00:00 2001 From: dyf Date: Thu, 10 Nov 2022 23:23:08 -0800 Subject: [PATCH 1/3] output dir is still not working right --- argschema/fields/files.py | 110 +++++++++++++++++++---------- argschema/schemas.py | 88 +++++++++++++++--------- test/fields/test_files.py | 141 +++++++++++++------------------------- 3 files changed, 173 insertions(+), 166 deletions(-) diff --git a/argschema/fields/files.py b/argschema/fields/files.py index 1172d1b8..22a54fe3 100644 --- a/argschema/fields/files.py +++ b/argschema/fields/files.py @@ -1,6 +1,5 @@ '''marshmallow fields related to validating input and output file paths''' import os -import marshmallow as mm import tempfile import errno import sys @@ -38,21 +37,21 @@ def validate_outpath(path): except Exception as e: if isinstance(e, OSError): if e.errno == errno.ENOENT: - raise mm.ValidationError( + raise ValueError( "%s is not in a directory that exists" % path) elif e.errno == errno.EACCES: - raise mm.ValidationError( + raise ValueError( "%s does not appear you can write to path" % path) else: - raise mm.ValidationError( + raise ValueError( "Unknown OSError: {}".format(e.message)) else: - raise mm.ValidationError( + raise ValueError( "Unknown Exception: {}".format(e.message)) -class OutputFile(mm.fields.Str): - """OutputFile :class:`marshmallow.fields.Str` subclass which is a path to a +class OutputFile(str): + """OutputFile :class:`str` subclass which is a path to a file location that can be written to by the current user (presently tested by opening a temporary file to that location) @@ -65,7 +64,12 @@ class OutputFile(mm.fields.Str): """ - def _validate(self, value): + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, value): """ Parameters @@ -86,15 +90,17 @@ def _validate(self, value): try: path = os.path.dirname(value) except Exception as e: # pragma: no cover - raise mm.ValidationError( + raise ValueError( "%s cannot be os.path.dirname-ed" % value) # pragma: no cover validate_outpath(path) + return cls(value) + class OutputDirModeException(Exception): pass -class OutputDir(mm.fields.Str): - """OutputDir is a :class:`marshmallow.fields.Str` subclass which is a path to +class OutputDir(str): + """OutputDir is a :class:`str` subclass which is a path to a location where this module will write files. Validation will check that the directory exists and create the directory if it is not present, and will fail validation if the directory cannot be created or cannot be @@ -104,86 +110,116 @@ class OutputDir(mm.fields.Str): ========== mode: str mode to create directory - *args: - smae as passed to marshmallow.fields.Str - **kwargs: - same as passed to marshmallow.fields.Str """ - def __init__(self, mode=None, *args, **kwargs): - self.mode = mode - if (self.mode is not None) & (sys.platform == "win32"): + def __new__(cls, value='', mode=None): + obj = str.__new__(cls, value) + obj.mode = mode + + if (mode is not None) & (sys.platform == "win32"): raise OutputDirModeException( "Setting mode of OutputDir supported only on posix systems") - super(OutputDir, self).__init__(*args, **kwargs) - def _validate(self, value): + return obj + + def __class_getitem__(cls, mode): + return lambda value: cls(value, mode=mode) + + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, value): + # this gets called before pydantic attempts to typecast + if not isinstance(value, cls): + value = cls(value) + + mode = value.mode + value = str(value) + if not os.path.isdir(value): try: os.makedirs(value) - if self.mode is not None: - os.chmod(value, self.mode) + if mode is not None: + os.chmod(value, mode) except OSError as e: if e.errno == errno.EEXIST: pass else: - raise mm.ValidationError( + raise ValueError( "{} is not a directory and you cannot create it".format( value) ) - if self.mode is not None: + if mode is not None: try: - assert((os.stat(value).st_mode & 0o777) == self.mode) + assert((os.stat(value).st_mode & 0o777) == mode) except AssertionError: - raise mm.ValidationError( + raise ValueError( "{} does not have the mode ({}) that was specified ".format( - value, self.mode) + value, mode) ) except os.error: - raise mm.ValidationError( + raise ValueError( "cannot get os.stat of {}".format(value) ) + # use outputfile to test that a file in this location is a valid path validate_outpath(value) + return value + def validate_input_path(value): if not os.path.isfile(value): - raise mm.ValidationError("%s is not a file" % value) + raise ValueError("%s is not a file" % value) else: try: with open(value) as f: pass except Exception as value: - raise mm.ValidationError("%s is not readable" % value) + raise ValueError("%s is not readable" % value) -class InputDir(mm.fields.Str): +class InputDir(str): """InputDir is :class:`marshmallow.fields.Str` subclass which is a path to a a directory that exists and that the user can access (presently checked with os.access) """ - def _validate(self, value): + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, value): if not os.path.isdir(value): - raise mm.ValidationError("%s is not a directory") + raise ValueError("%s is not a directory") if sys.platform == "win32": try: x = list(os.scandir(value)) except PermissionError: - raise mm.ValidationError( + raise ValueError( "%s is not a readable directory" % value) else: if not os.access(value, os.R_OK): - raise mm.ValidationError( + raise ValueError( "%s is not a readable directory" % value) + return cls(value) + -class InputFile(mm.fields.Str): +class InputFile(str): """InputDile is a :class:`marshmallow.fields.Str` subclass which is a path to a file location which can be read by the user (presently passes os.path.isfile and os.access = R_OK) """ - def _validate(self, value): + @classmethod + def __get_validators__(cls): + yield cls.validate + + @classmethod + def validate(cls, value): validate_input_path(value) + return cls(value) diff --git a/argschema/schemas.py b/argschema/schemas.py index 52dac07c..96684e69 100644 --- a/argschema/schemas.py +++ b/argschema/schemas.py @@ -1,44 +1,64 @@ -import marshmallow as mm -from .fields import LogLevel, InputFile, OutputFile +from pydantic import BaseModel, Field +from pydantic.main import ModelMetaclass +from typing import get_origin +from enum import Enum +import logging +import argparse +from .fields import InputFile, OutputFile +class LogLevel(Enum): + DEBUG = logging.DEBUG + INFO = logging.INFO + WARNING = logging.WARNING + ERROR = logging.ERROR + CRITICAL = logging.CRITICAL -class DefaultSchema(mm.Schema): - """mm.Schema class with support for making fields default to - values defined by that field's arguments. - """ +class ArgSchema(BaseModel): + input_json: InputFile = Field('input.json', description='zee inputs') + output_json: OutputFile = Field('output.json', description='zee outputs') + log_level: LogLevel = Field(logging.ERROR, description='zee log level') - @mm.pre_load - def make_object(self, in_data, **kwargs): - """marshmallow.pre_load decorated function for applying defaults on deserialation + @classmethod + def from_args(cls, a): + arg_data = vars(a) + with open(arg_data['input_json'],'r') as f: + input_data = json.load(f) - Parameters - ---------- - in_data : + input_data['input_json'] = arg_data['input_json'] + input_data['output_json'] = arg_data['output_json'] + input_data['log_level'] = arg_data['log_level'] + + return populate_schema_from_data(cls, input_data, arg_data) + @classmethod + def argument_parser(cls, *args, **kwargs): + parser = argparse.ArgumentParser(*args, **kwargs) + add_arguments_from_schema(parser, cls) + return parser - Returns - ------- - dict - a dictionary with default values applied +def populate_schema_from_data(schema, input_data, arg_data): + xdata = {} - """ - for name, field in self.fields.items(): - if name not in in_data: - if field.default is not mm.missing: - in_data[name] = field.default - return in_data + for field_name, field in schema.__fields__.items(): + if isinstance(field.outer_type_, ModelMetaclass): + sub_input_data = input_data[field_name] + sub_arg_data = { k.replace(f'{field_name}.',''):v for k,v in arg_data.items() if k.startswith(field_name)} + xdata[field_name] = populate_schema_from_data(field.type_, sub_input_data, sub_arg_data) + else: + arg_value = arg_data.get(field_name, None) + xdata[field_name] = input_data[field_name] if arg_value is None else arg_value + + return schema(**xdata) +def add_arguments_from_schema(parser, schema, parent_prefix=''): + for field_name, field in schema.__fields__.items(): + fn = field_name.replace('_','-') + + if isinstance(field.outer_type_, ModelMetaclass): + add_arguments_from_schema(parser, field.outer_type_, parent_prefix=f'{parent_prefix}{fn}.') + elif get_origin(field.outer_type_) is list: + parser.add_argument(f'--{parent_prefix}{fn}', nargs='+', type=field.type_, default=field.default, help=field.field_info.description) + else: + parser.add_argument(f'--{parent_prefix}{fn}', type=field.type_, default=field.default, help=field.field_info.description) -class ArgSchema(DefaultSchema): - """The base marshmallow schema used by ArgSchemaParser to identify - input_json and output_json files and the log_level - """ - input_json = InputFile( - description="file path of input json file") - - output_json = OutputFile( - description="file path to output json file") - log_level = LogLevel( - default='ERROR', - description="set the logging level of the module") diff --git a/test/fields/test_files.py b/test/fields/test_files.py index 29419eca..7d374d5c 100644 --- a/test/fields/test_files.py +++ b/test/fields/test_files.py @@ -1,8 +1,8 @@ import pytest -from argschema import ArgSchemaParser, ArgSchema +from argschema import ArgSchema from argschema.fields import InputFile, OutputFile, InputDir, OutputDir from argschema.fields.files import OutputDirModeException -import marshmallow as mm +from pydantic import Field import os import sys if sys.platform == "win32": @@ -12,8 +12,7 @@ # OUTPUT FILE TESTS class BasicOutputFile(ArgSchema): - output_file = OutputFile(required=True, - description='a simple output file') + output_file: OutputFile = Field(..., description='a simple output file') output_file_example = { @@ -38,63 +37,50 @@ def test_outputfile_no_write(tmpdir): outdir.chmod(0o444) outfile = outdir.join('test') - with pytest.raises(mm.ValidationError): - ArgSchemaParser(input_data={'output_file': str(outfile)}, - schema_type=BasicOutputFile, args=[]) + with pytest.raises(ValueError): + BasicOutputFile(output_file=str(outfile)) if sys.platform != "win32": outdir.chmod(0o666) def test_outputfile_not_a_path(): - with pytest.raises(mm.ValidationError): - ArgSchemaParser(input_data={'output_file': 10}, - schema_type=BasicOutputFile, args=[]) - + with pytest.raises(ValueError): + BasicOutputFile(output_file=10) + def test_enoent_outputfile_failed(): - with pytest.raises(mm.ValidationError): - ArgSchemaParser( - input_data=enoent_outfile_example, - schema_type=BasicOutputFile, args=[]) + with pytest.raises(ValueError): + BasicOutputFile(**enoent_outfile_example) def test_output_file_relative(): - ArgSchemaParser( - input_data=output_file_example, schema_type=BasicOutputFile, args=[]) + BasicOutputFile(**output_file_example) def test_output_path(tmpdir): file_ = tmpdir.join('testoutput.json') - args = ['--output_json', str(file_)] - ArgSchemaParser(args=args) + ArgSchema(output_json=str(file_)) def test_output_path_cannot_write(): - with pytest.raises(mm.ValidationError): - file_ = '/etc/notok/notalocation.json' - args = ['--output_json', str(file_)] - ArgSchemaParser(args=args) + with pytest.raises(ValueError): + ArgSchema(output_json='/etc/notok/notalocation.json') def test_output_path_noapath(): - with pytest.raises(mm.ValidationError): - file_ = '@/afa\\//' - args = ['--output_json', str(file_)] - ArgSchemaParser(args=args) + with pytest.raises(ValueError): + ArgSchema(output_json='@/afa\\//') class BasicOutputDir(ArgSchema): - output_dir = OutputDir(required=True, description="basic output dir") + output_dir: OutputDir = Field(..., description="basic output dir") def test_output_dir_basic(tmpdir): outdir = tmpdir.mkdir('mytmp') - output_dir_example = { - 'output_dir': str(outdir) - } - ArgSchemaParser(schema_type=BasicOutputDir, - input_data=output_dir_example, - args=[]) + + BasicOutputDir(output_dir=str(outdir)) + def test_output_dir_bad_permission(tmpdir): outdir = tmpdir.mkdir('no_read') @@ -107,49 +93,31 @@ def test_output_dir_bad_permission(tmpdir): win32security.SetFileSecurity (str(outdir), win32security.DACL_SECURITY_INFORMATION, sd) else: outdir.chmod(0o222) - output_dir_example = { - 'output_dir': outdir - } - with pytest.raises(mm.ValidationError): - ArgSchemaParser(schema_type=BasicOutputDir, - input_data=output_dir_example, - args=[]) + + with pytest.raises(ValueError): + BasicOutputDir(output_dir=outdir) def test_output_dir_bad_location(): - output_dir_example = { - 'output_dir': '///\\\//\/' - } - with pytest.raises(mm.ValidationError): - ArgSchemaParser(schema_type=BasicOutputDir, - input_data=output_dir_example, - args=[]) + with pytest.raises(ValueError): + BasicOutputDir(output_dir='///\\\//\/') if sys.platform != "win32": class ModeOutputDirSchema(ArgSchema): - output_dir = OutputDir(required=True, - description="775 output directory", - mode=0o775) + output_dir = OutputDir(mode=0o775) @pytest.mark.skipif(sys.platform != "win32", reason="no general support for chmod octal in windows") def test_windows_outdir_mode_fail(): with pytest.raises(OutputDirModeException): - output_dir = OutputDir(required=True, - description="775 output directory", - mode=0o775) + output_dir = OutputDir(mode=0o775) @pytest.mark.skipif(sys.platform == "win32", reason="no general support for chmod octal in windows") def test_mode_output_osdir(tmpdir): outdir = tmpdir.join('mytmp') - output_dir_example = { - 'output_dir': str(outdir) - } - mod = ArgSchemaParser(schema_type=ModeOutputDirSchema, - input_data=output_dir_example, - args=[]) - assert((os.stat(mod.args['output_dir']).st_mode & 0o777) == 0o775) + mod = ModeOutputDirSchema(output_dir=str(outdir)) + assert((os.stat(mod.output_dir).st_mode & 0o777) == 0o775) @pytest.mark.skipif(sys.platform == "win32", reason="no general support for chmod octal in windows") @@ -160,17 +128,14 @@ def test_failed_mode(tmpdir): output_dir_example = { 'output_dir': str(outdir) } - with pytest.raises(mm.ValidationError): - ArgSchemaParser(schema_type=ModeOutputDirSchema, - input_data=output_dir_example, - args=[]) + with pytest.raises(ValueError): + ModeOutputDirSchema(output_dir=str(outdir)) # INPUT FILE TESTS class BasicInputFile(ArgSchema): - input_file = InputFile(required=True, - description='a simple file') + input_file: InputFile = Field(..., description='a simple file') input_file_example = { @@ -181,15 +146,15 @@ class BasicInputFile(ArgSchema): def test_relative_file_input(): with open(input_file_example['input_file'], 'w') as fp: fp.write("test") - ArgSchemaParser( - input_data=input_file_example, schema_type=BasicInputFile, args=[]) + + BasicInputFile(**input_file_example) + os.remove(input_file_example['input_file']) def test_relative_file_input_failed(): - with pytest.raises(mm.ValidationError): - ArgSchemaParser( - input_data=input_file_example, schema_type=BasicInputFile, args=[]) + with pytest.raises(ValueError): + BasicInputFile(**input_file_example) def test_access_inputfile_failed(): @@ -210,33 +175,22 @@ def test_access_inputfile_failed(): else: os.chmod(input_file_example['input_file'], 0o222) - with pytest.raises(mm.ValidationError): - ArgSchemaParser( - input_data=input_file_example, schema_type=BasicInputFile, args=[]) + with pytest.raises(ValueError): + BasicInputFile(**input_file_example) os.remove(input_file_example['input_file']) # INPUTDIR TESTS class BasicInputDir(ArgSchema): - input_dir = InputDir(required=True, - description='a simple file') + input_dir: InputDir = Field(description='a simple file') def test_basic_inputdir(tmpdir): - input_data = { - 'input_dir': str(tmpdir) - } - ArgSchemaParser(input_data=input_data, - schema_type=BasicInputDir, args=[]) - + BasicInputDir(input_dir=str(tmpdir)) def test_bad_inputdir(): - input_data = { - 'input_dir': 'not_a_dir' - } - with pytest.raises(mm.ValidationError): - ArgSchemaParser(input_data=input_data, - schema_type=BasicInputDir, args=[]) + with pytest.raises(ValueError): + BasicInputDir(input_dir='not_a_dir') @pytest.mark.skipif(sys.platform == "win32", reason="can't get working after migrating from appveyor to" @@ -256,9 +210,6 @@ def test_inputdir_no_access(tmpdir): win32security.DACL_SECURITY_INFORMATION, sd) else: input_dir.chmod(0o222) - input_data = { - 'input_dir': str(input_dir) - } - with pytest.raises(mm.ValidationError): - ArgSchemaParser(input_data=input_data, - schema_type=BasicInputDir, args=[]) + + with pytest.raises(ValueError): + BasicInputDir(input_dir=str(input_dir)) \ No newline at end of file From dea46ff78bc498cfe7a5ea29d6bdee112d2d1ca1 Mon Sep 17 00:00:00 2001 From: dyf Date: Thu, 10 Nov 2022 23:30:20 -0800 Subject: [PATCH 2/3] tweaking test --- test/fields/test_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fields/test_files.py b/test/fields/test_files.py index 7d374d5c..3742b8ea 100644 --- a/test/fields/test_files.py +++ b/test/fields/test_files.py @@ -104,7 +104,7 @@ def test_output_dir_bad_location(): if sys.platform != "win32": class ModeOutputDirSchema(ArgSchema): - output_dir = OutputDir(mode=0o775) + output_dir = OutputDir[0o775] @pytest.mark.skipif(sys.platform != "win32", reason="no general support for chmod octal in windows") From 8266d98217d7e3df3ca2a92addf57b2599668238 Mon Sep 17 00:00:00 2001 From: dyf Date: Fri, 11 Nov 2022 10:05:01 -0800 Subject: [PATCH 3/3] removing InputFile, InputDir --- argschema/__init__.py | 2 +- argschema/fields/__init__.py | 4 +- argschema/fields/files.py | 85 ++---------------------------------- argschema/schemas.py | 6 +-- test/fields/test_files.py | 65 +++++++-------------------- 5 files changed, 24 insertions(+), 138 deletions(-) diff --git a/argschema/__init__.py b/argschema/__init__.py index f28aafde..e83394b4 100644 --- a/argschema/__init__.py +++ b/argschema/__init__.py @@ -1,5 +1,5 @@ '''argschema: flexible definition, validation and setting of parameters''' -from .fields import InputFile, InputDir, OutputFile, OptionList # noQA:F401 +from .fields import OutputFile, OptionList # noQA:F401 from .schemas import ArgSchema # noQA:F401 from .argschema_parser import ArgSchemaParser # noQA:F401 from .deprecated import JsonModule, ModuleParameters # noQA:F401 diff --git a/argschema/fields/__init__.py b/argschema/fields/__init__.py index c752233c..d2963995 100644 --- a/argschema/fields/__init__.py +++ b/argschema/fields/__init__.py @@ -1,13 +1,13 @@ '''sub-module for custom marshmallow fields of general utility''' from marshmallow.fields import * # noQA:F401 from marshmallow.fields import __all__ as __mmall__ # noQA:F401 -from .files import OutputFile, InputDir, InputFile, OutputDir # noQA:F401 +from .files import OutputFile, OutputDir # noQA:F401 from .numpyarrays import NumpyArray # noQA:F401 from .deprecated import OptionList # noQA:F401 from .loglevel import LogLevel # noQA:F401 from .slice import Slice # noQA:F401 -__all__ = __mmall__ + ['OutputFile', 'InputDir', 'InputFile', 'OutputDir', +__all__ = __mmall__ + ['OutputFile','OutputDir', 'NumpyArray', 'OptionList', 'LogLevel', 'Slice'] # Python 2 subpackage (not module) * imports break if items in __all__ diff --git a/argschema/fields/files.py b/argschema/fields/files.py index 22a54fe3..aa878cef 100644 --- a/argschema/fields/files.py +++ b/argschema/fields/files.py @@ -31,6 +31,7 @@ def __exit__(self, *args): def validate_outpath(path): try: with NamedTemporaryFile(mode='w', dir=path) as tfile: + print(tfile) tfile.write('0') tfile.close() @@ -92,6 +93,7 @@ def validate(cls, value): except Exception as e: # pragma: no cover raise ValueError( "%s cannot be os.path.dirname-ed" % value) # pragma: no cover + validate_outpath(path) return cls(value) @@ -105,44 +107,18 @@ class OutputDir(str): the directory exists and create the directory if it is not present, and will fail validation if the directory cannot be created or cannot be written to. - - Parameters - ========== - mode: str - mode to create directory """ - def __new__(cls, value='', mode=None): - obj = str.__new__(cls, value) - obj.mode = mode - - if (mode is not None) & (sys.platform == "win32"): - raise OutputDirModeException( - "Setting mode of OutputDir supported only on posix systems") - - return obj - - def __class_getitem__(cls, mode): - return lambda value: cls(value, mode=mode) @classmethod def __get_validators__(cls): yield cls.validate @classmethod - def validate(cls, value): - # this gets called before pydantic attempts to typecast - if not isinstance(value, cls): - value = cls(value) - - mode = value.mode - value = str(value) - + def validate(cls, value): if not os.path.isdir(value): try: os.makedirs(value) - if mode is not None: - os.chmod(value, mode) except OSError as e: if e.errno == errno.EEXIST: pass @@ -151,18 +127,6 @@ def validate(cls, value): "{} is not a directory and you cannot create it".format( value) ) - if mode is not None: - try: - assert((os.stat(value).st_mode & 0o777) == mode) - except AssertionError: - raise ValueError( - "{} does not have the mode ({}) that was specified ".format( - value, mode) - ) - except os.error: - raise ValueError( - "cannot get os.stat of {}".format(value) - ) # use outputfile to test that a file in this location is a valid path validate_outpath(value) @@ -180,46 +144,3 @@ def validate_input_path(value): except Exception as value: raise ValueError("%s is not readable" % value) -class InputDir(str): - """InputDir is :class:`marshmallow.fields.Str` subclass which is a path to a - a directory that exists and that the user can access - (presently checked with os.access) - """ - - @classmethod - def __get_validators__(cls): - yield cls.validate - - @classmethod - def validate(cls, value): - if not os.path.isdir(value): - raise ValueError("%s is not a directory") - - if sys.platform == "win32": - try: - x = list(os.scandir(value)) - except PermissionError: - raise ValueError( - "%s is not a readable directory" % value) - else: - if not os.access(value, os.R_OK): - raise ValueError( - "%s is not a readable directory" % value) - - return cls(value) - - -class InputFile(str): - """InputDile is a :class:`marshmallow.fields.Str` subclass which is a path to a - file location which can be read by the user - (presently passes os.path.isfile and os.access = R_OK) - """ - - @classmethod - def __get_validators__(cls): - yield cls.validate - - @classmethod - def validate(cls, value): - validate_input_path(value) - return cls(value) diff --git a/argschema/schemas.py b/argschema/schemas.py index 96684e69..fc27dcec 100644 --- a/argschema/schemas.py +++ b/argschema/schemas.py @@ -1,10 +1,10 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, FilePath from pydantic.main import ModelMetaclass from typing import get_origin from enum import Enum import logging import argparse -from .fields import InputFile, OutputFile +from .fields import OutputFile class LogLevel(Enum): DEBUG = logging.DEBUG @@ -14,7 +14,7 @@ class LogLevel(Enum): CRITICAL = logging.CRITICAL class ArgSchema(BaseModel): - input_json: InputFile = Field('input.json', description='zee inputs') + input_json: FilePath = Field('input.json', description='zee inputs') output_json: OutputFile = Field('output.json', description='zee outputs') log_level: LogLevel = Field(logging.ERROR, description='zee log level') diff --git a/test/fields/test_files.py b/test/fields/test_files.py index 3742b8ea..00ebe80e 100644 --- a/test/fields/test_files.py +++ b/test/fields/test_files.py @@ -1,14 +1,13 @@ import pytest from argschema import ArgSchema -from argschema.fields import InputFile, OutputFile, InputDir, OutputDir +from argschema.fields import OutputFile, OutputDir from argschema.fields.files import OutputDirModeException -from pydantic import Field +from pydantic import Field, FilePath, DirectoryPath import os import sys -if sys.platform == "win32": - import win32security - import ntsecuritycon as con - +from pathlib import Path +from stat import S_IREAD, S_IRGRP, S_IROTH +import stat # OUTPUT FILE TESTS class BasicOutputFile(ArgSchema): @@ -25,22 +24,17 @@ class BasicOutputFile(ArgSchema): def test_outputfile_no_write(tmpdir): - outdir = tmpdir.mkdir('cannot_write_here') - if sys.platform == "win32": - sd = win32security.GetFileSecurity(str(outdir), win32security.DACL_SECURITY_INFORMATION) - everyone, domain, type = win32security.LookupAccountName ("", "Everyone") - dacl = win32security.ACL () - dacl.AddAccessAllowedAce (win32security.ACL_REVISION, con.FILE_GENERIC_READ, everyone) - sd.SetSecurityDescriptorDacl (1, dacl, 0) - win32security.SetFileSecurity (str(outdir), win32security.DACL_SECURITY_INFORMATION, sd) - else: - outdir.chmod(0o444) - outfile = outdir.join('test') + # this doesn't work in windows + + outdir = Path(tmpdir / 'cannot_write_here') + outdir.mkdir(mode=S_IREAD) + + outfile = outdir / 'test' with pytest.raises(ValueError): BasicOutputFile(output_file=str(outfile)) - if sys.platform != "win32": - outdir.chmod(0o666) + + outdir.chmod(0o666) def test_outputfile_not_a_path(): @@ -102,40 +96,11 @@ def test_output_dir_bad_location(): with pytest.raises(ValueError): BasicOutputDir(output_dir='///\\\//\/') -if sys.platform != "win32": - class ModeOutputDirSchema(ArgSchema): - output_dir = OutputDir[0o775] - - -@pytest.mark.skipif(sys.platform != "win32", reason="no general support for chmod octal in windows") -def test_windows_outdir_mode_fail(): - with pytest.raises(OutputDirModeException): - output_dir = OutputDir(mode=0o775) - - -@pytest.mark.skipif(sys.platform == "win32", reason="no general support for chmod octal in windows") -def test_mode_output_osdir(tmpdir): - outdir = tmpdir.join('mytmp') - mod = ModeOutputDirSchema(output_dir=str(outdir)) - assert((os.stat(mod.output_dir).st_mode & 0o777) == 0o775) - - -@pytest.mark.skipif(sys.platform == "win32", reason="no general support for chmod octal in windows") -def test_failed_mode(tmpdir): - outdir = tmpdir.join('mytmp_failed') - os.makedirs(str(outdir)) - os.chmod(str(outdir), 0o777) - output_dir_example = { - 'output_dir': str(outdir) - } - with pytest.raises(ValueError): - ModeOutputDirSchema(output_dir=str(outdir)) - # INPUT FILE TESTS class BasicInputFile(ArgSchema): - input_file: InputFile = Field(..., description='a simple file') + input_file: FilePath = Field(..., description='a simple file') input_file_example = { @@ -182,7 +147,7 @@ def test_access_inputfile_failed(): # INPUTDIR TESTS class BasicInputDir(ArgSchema): - input_dir: InputDir = Field(description='a simple file') + input_dir: DirectoryPath = Field(description='a simple file') def test_basic_inputdir(tmpdir):