Skip to content

Utils

Utils for big expectations

execute_query(query, duckdb_client, database_provider)

Execute a query, using bigquery_query() if the accessor is a BigQuery database.

This handles the limitation where BigQuery's Storage Read API cannot read views. When a BigQuery accessor is detected, the query is wrapped in bigquery_query() which uses the Jobs API instead.

Note: bigquery_query() only works for SELECT queries. Write operations (statements starting with INSERT, CREATE, UPDATE, DELETE, DROP or ALTER) are sent to BigQuery through bigquery_execute() instead.

Source code in src/koality/utils.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def execute_query(
    query: str,
    duckdb_client: duckdb.DuckDBPyConnection,
    database_provider: DatabaseProvider | None,
) -> duckdb.DuckDBPyRelation:
    """
    Execute a query, routing it through the BigQuery extension when required.

    This handles the limitation where BigQuery's Storage Read API cannot read views.
    When the database provider is of type "bigquery", read queries are wrapped in
    bigquery_query(), which uses the Jobs API instead.

    Note: bigquery_query() only works for SELECT queries. Statements that start with
    INSERT, CREATE, UPDATE, DELETE, DROP or ALTER are treated as write operations and
    are wrapped in bigquery_execute() instead.

    Args:
        query: The SQL statement to run.
        duckdb_client: The DuckDB connection used to execute the statement.
        database_provider: The attached database provider, or None. Only its
            ``type`` and ``path`` attributes are read here.

    Returns:
        Whatever ``duckdb_client.query`` returns for the (possibly wrapped) statement.
    """
    if database_provider:
        if database_provider.type == "bigquery":
            # Classify the statement by its leading keyword (case-insensitive).
            leading = query.strip().upper()
            is_write_operation = leading.startswith(("INSERT", "CREATE", "UPDATE", "DELETE", "DROP", "ALTER"))

            # The query is embedded in a DuckDB string literal. DuckDB escapes a
            # single quote by doubling it; a backslash is NOT an escape character
            # in a standard string literal and would end the literal early.
            escaped_query = query.replace("'", "''")
            # path -> google cloud project
            project = str(database_provider.path).replace("'", "''")

            if is_write_operation:
                # Use bigquery_execute for write operations
                wrapped_query = f"CALL bigquery_execute('{project}', '{escaped_query}')"  # noqa: S608
            else:
                # Use bigquery_query for read operations (supports views)
                wrapped_query = f"SELECT * FROM bigquery_query('{project}', '{escaped_query}')"  # noqa: S608

            return duckdb_client.query(wrapped_query)
        log.info(f"Database is of type '{database_provider.type}'. Using standard query execution.")

    return duckdb_client.query(query)

parse_date(date, offset_days=0)

Parses a date string, which can be a relative term like "today", "yesterday", or "tomorrow", an actual ISO-format date, or a relative date like "today-2".

Parameters:

Name Type Description Default
date str

The date string to be parsed.

required
offset_days int

The number of days to be added/subtracted.

0
Source code in src/koality/utils.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def parse_date(date: str, offset_days: int = 0) -> str:
    """
    Parse a date expression and return it as an ISO-formatted date string.

    The expression can be a relative term ("today", "yesterday", "tomorrow"),
    a relative date like "today-2" or "today+3", or an actual ISO-format date.
    Matching is case-insensitive and surrounding whitespace is ignored.

    Args:
        date: The date string to be parsed.
        offset_days: The number of days to be added/subtracted.

    Returns:
        The resulting date in ISO format (YYYY-MM-DD).

    Raises:
        ValueError: If the input is neither a supported relative expression
            nor a valid ISO-format date.
    """
    text = str(date).strip().lower()

    # Day shift implied by each named relative term.
    named_shifts = {"today": 0, "yesterday": -1, "tomorrow": 1}

    if text in named_shifts:
        shift = named_shifts[text]
    elif relative := re.fullmatch(r"today\s*([+-])\s*([0-9]+)", text):
        # fullmatch (not search) so that junk around the expression is rejected
        # instead of being silently interpreted as a relative date.
        shift = int(relative[1] + relative[2])
    else:
        # Absolute date: fromisoformat raises ValueError for anything unparsable.
        parsed = dt.datetime.fromisoformat(text)
        return (parsed + dt.timedelta(days=offset_days)).date().isoformat()

    return (dt.date.today() + dt.timedelta(days=offset_days + shift)).isoformat()

to_set(value)

Converts the input value to a set. The special case of a single string is also covered. Duplicates are removed; note that the returned set itself is unordered.

It will convert input as follows: - 1 -> {1} - True -> {True} - "toys" / '"toys"' -> {"toys"} - ("toys") / '("toys")' -> {"toys"} - ("toys", "shirt") / '("toys", "shirt")' -> {"shirt", "toys"} - ["toys"] -> {"toys"} - {"toys"} -> {"toys"}

Source code in src/koality/utils.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def to_set(value: Any) -> set:
    """
    Convert the input value to a set.

    Strings are first interpreted as Python literals where possible, so the
    textual form of a tuple/list/set is turned into the corresponding
    collection. A string that is not a valid literal is kept as a single
    plain string. Duplicates are removed; the returned set is unordered.

    It will convert input as follows:
    - 1 -> {1}
    - True -> {True}
    - "toys" / '"toys"' -> {"toys"}
    - ("toys") / '("toys")' -> {"toys"}
    - ("toys", "shirt") / '("toys", "shirt")' -> {"shirt", "toys"}
    - ["toys"] -> {"toys"}
    - {"toys"} -> {"toys"}
    - "kids toys" -> {"kids toys"}

    Args:
        value: A scalar, a string (optionally holding a Python literal), or
            an iterable of hashable items.

    Returns:
        A set with the parsed value(s). If the input already is a set, that
        same set object is returned.
    """
    if isinstance(value, str):
        try:
            value = literal_eval(value)
        except (ValueError, SyntaxError, TypeError):
            # Not a Python literal (e.g. a bare word or free text such as
            # "kids toys"): keep the original string untouched.
            pass

    # Scalars and text-like values become a single-element set.
    if isinstance(value, (str, bytes)) or not isinstance(value, Iterable):
        return {value}
    if isinstance(value, set):
        return value
    return set(value)