From 61cfbb556a1613838fb52470ac61cdbf58a20def Mon Sep 17 00:00:00 2001 From: rziemianek Date: Mon, 4 Nov 2024 12:36:42 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20Updated=20docstrings=20and=20ret?= =?UTF-8?q?urn=20statements?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/viadot/orchestration/prefect/utils.py | 72 +++++++++++------------ 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/src/viadot/orchestration/prefect/utils.py b/src/viadot/orchestration/prefect/utils.py index d31491cdc..276d3e881 100644 --- a/src/viadot/orchestration/prefect/utils.py +++ b/src/viadot/orchestration/prefect/utils.py @@ -40,9 +40,9 @@ def __init__( dynamic_date_format: str = "%Y%m%d", dynamic_date_timezone: str = "Europe/Warsaw", ): - """This class handles pendulum DateTimes or time-related patterns within - a provided text, replacing dynamic date marker with actual dates. + """This class processes time-related patterns in the provided text. + Replaces dynamic date markers with actual dates. The supported patterns include: - "today" - "yesterday" @@ -69,10 +69,10 @@ def __init__( the start and the end of a dynamic date pattern in a text. Defaults to ["<<", ">>"]. dynamic_date_format (str, optional): A date and time format string - defining the text representation of date. Defaults to "%Y%m%d". + defining the text representation of date. Defaults to "%Y%m%d". dynamic_date_timezone (str, optional): A string that sets the default - timezone used by all datetime functions. Defaults to "Europe/Warsaw". - """ # noqa: D205 + timezone used by all datetime functions. Defaults to "Europe/Warsaw". + """ self.singular_patterns = { "last_day_of_month": r"last_day_of_(\w+)_(\d{4})", "x_units_ago_full_date": r"(\d+)_(years?|months?|days?)_ago_full_date", @@ -106,15 +106,14 @@ def _generate_years( """ current_year = pendulum.now().year if last_years: - result = [str(current_year - i) for i in range(last_years)][ + return [str(current_year - i) for i in range(last_years)][ ::-1 ] # Reversed to ascending order - return result # noqa: RET504 if from_year and num_years: - result = [ + return [ str(int(from_year) + i) for i in range(int(num_years)) ] # Ascending order - return result # noqa: RET504 + return [] def _generate_months(self, last_months: int) -> list[str]: @@ -127,11 +126,11 @@ def _generate_months(self, last_months: int) -> list[str]: list: A list of dates representing the last X months in ascending order. """ current_date = pendulum.now() - result = [ + + return [ current_date.subtract(months=i).start_of("month").format("YMM") for i in range(last_months) ][::-1] # Reversed to ascending order - return result # noqa: RET504 def _generate_dates(self, last_days: int) -> list[str]: """Generate a list of dates for the last X days. @@ -143,10 +142,10 @@ def _generate_dates(self, last_days: int) -> list[str]: list: A list of dates in ascending order. """ current_date = pendulum.now(self.dynamic_date_timezone) - result = [ + + return [ current_date.subtract(days=i).format("YMMDD") for i in range(last_days) ][::-1] # Reversed to ascending order - return result # noqa: RET504 def _process_first_days( self, month_name: str, year: int, num_days: int @@ -166,10 +165,10 @@ def _process_first_days( pendulum.parse(month_name, strict=False).month, # type: ignore 1, # type: ignore ) - result = [ + + return [ start_date.add(days=i).format("YMMDD") for i in range(num_days) ] # Ascending order - return result # noqa: RET504 def _process_last_days( self, month_name: str, year: int, num_days: int @@ -190,10 +189,10 @@ def _process_last_days( 1, # type: ignore ) end_date = start_date.end_of("month") - result = [end_date.subtract(days=i).format("YMMDD") for i in range(num_days)][ + + return [end_date.subtract(days=i).format("YMMDD") for i in range(num_days)][ ::-1 ] # Reversed to ascending order - return result # noqa: RET504 def _process_last_day_of_month( self, year: str, month_name: str @@ -208,8 +207,8 @@ def _process_last_day_of_month( pendulum.DateTime: A date object containing the last day of the given month. """ month_num = pendulum.parse(month_name, strict=False).month # type: ignore - date = pendulum.datetime(int(year), month_num, 1).end_of("month") - return date # noqa: RET504 + + return pendulum.datetime(int(year), month_num, 1).end_of("month") def _process_x_years_ago(self, year: int) -> str: """Retrieve the year of a date X years from now. @@ -221,8 +220,8 @@ def _process_x_years_ago(self, year: int) -> str: str: A string containing the year of the specified time ago. """ current_date = pendulum.now() - result = current_date.subtract(years=year).format("Y") - return result # noqa: RET504 + + return current_date.subtract(years=year).format("Y") def _get_date_x_ago_full_date( self, number: int, unit: str @@ -236,17 +235,16 @@ def _get_date_x_ago_full_date( Returns: pendulum.DateTime: A date for X units ago from today. """ - full_date = { + return { "years": pendulum.now(self.dynamic_date_timezone).subtract(years=number), "months": pendulum.now(self.dynamic_date_timezone).subtract(months=number), "days": pendulum.now(self.dynamic_date_timezone).subtract(days=number), }.get(unit) - return full_date # noqa: RET504 - def _create_date_dict(self) -> dict[str, str]: """Create and return a key phrase: dynamic date value dictionary. - dictionary values "today", "yesterday" and "last_year_previous_month" are + + Dictionary values "today", "yesterday" and "last_year_previous_month" are formatted into the dynamic_date_format. The other values and their formatting: @@ -259,14 +257,15 @@ def _create_date_dict(self) -> dict[str, str]: Returns: dict[str, str]: A dictionary with key phrases as keys and dynamically created dates as values. - """ # noqa: D205 + """ today = pendulum.today(self.dynamic_date_timezone) yesterday = pendulum.yesterday(self.dynamic_date_timezone) last_month = today.subtract(months=1).month last_year = today.subtract(years=1) now_time = pendulum.now(self.dynamic_date_timezone) last_day_prev_month = today.subtract(months=1).end_of("month") - replacements = { + + return { "today": today.strftime(self.dynamic_date_format), "yesterday": yesterday.strftime(self.dynamic_date_format), "current_month": today.strftime("%m"), @@ -278,7 +277,6 @@ def _create_date_dict(self) -> dict[str, str]: self.dynamic_date_format ), } - return replacements # noqa: RET504 def _handle_singular_dates( self, dynamic_date_marker: str, match: list[tuple], key: str @@ -375,28 +373,26 @@ def _handle_data_ranges( elif key == "y_years_from_x": for number, start_year in match_found: - replacement = self._generate_years( + return self._generate_years( last_years=None, from_year=start_year, num_years=int(number), # type: ignore ) - return replacement # noqa: RET504 elif key == "first_x_days_from": for num_days, month_name, year in match_found: - replacement = self._process_first_days(month_name, year, int(num_days)) - return replacement # noqa: RET504 + return self._process_first_days(month_name, year, int(num_days)) elif key == "last_x_days_from": for num_days, month_name, year in match_found: - replacement = self._process_last_days(month_name, year, int(num_days)) - return replacement # noqa: RET504 + return self._process_last_days(month_name, year, int(num_days)) return dynamic_date_marker def _process_string(self, text: str) -> list[str] | str: - """Analyze and extract date ranges or singular dates from the given text - based on specific patterns or pendulum dates. + """Analyze and extract date ranges or singular dates from the given text. + + It bases on specific patterns or pendulum dates. Args: text (str): A string containing various time-related patterns to be analyzed. @@ -407,7 +403,7 @@ def _process_string(self, text: str) -> list[str] | str: returns list of extracted date ranges in ascending order. - If the input is a key phrase for a single date or a pendulum date, returns the input text with an accurate date. - """ # noqa: D205, W505 + """ start_symbol, end_symbol = self.dynamic_date_symbols start, end = re.escape(start_symbol), re.escape(end_symbol) pattern = rf"{start}.*?{end}" @@ -434,7 +430,7 @@ def _process_string(self, text: str) -> list[str] | str: if match_no_symbols in self.replacements: replacement = self.replacements[match_no_symbols] if not replacement: - replacement = eval(match_no_symbols) # noqa: S307 + replacement = eval(match_no_symbols) text = text.replace( match, (