Skip to content

Models

Pydantic models for koality configuration validation.

Config

Bases: BaseModel

Root configuration model for koality check execution.

Source code in src/koality/models.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
class Config(BaseModel):
    """Root configuration model for koality check execution.

    Before field validation, the global ``defaults`` and each bundle's
    ``defaults`` are merged into every check of that bundle
    (``propagate_defaults_to_checks``). After validation, two model-level
    validators check identifier-filter consistency across all checks and
    that every merged filter is complete.
    """

    name: str
    database_setup: str
    database_accessor: str
    defaults: _GlobalDefaults
    check_bundles: list[_CheckBundle]

    @model_validator(mode="before")
    @classmethod
    def propagate_defaults_to_checks(cls, data: dict) -> dict:
        """Merge defaults and check_bundle.defaults into each check before validation.

        Merge order (later overrides earlier):
        1. defaults
        2. bundle defaults
        3. check-specific values

        For the 'filters' dict, a deep merge is performed so that check-level
        filters override individual filter entries rather than replacing the whole dict.

        The input is never modified in place: a shallow copy of ``data`` (and of
        every rewritten bundle) is returned instead, so the caller's raw
        configuration can be reused.

        Args:
            data: Raw input handed to the model. Anything that is not a dict is
                returned untouched so that regular validation can reject it.

        Returns:
            A dict equal to ``data`` except that every dict-typed check inside
            every dict-typed bundle carries the merged values. Malformed parts
            (non-dict bundles or checks, a non-list ``checks`` entry) are passed
            through unchanged for field validation to report.
        """
        if not isinstance(data, dict):
            return data

        check_bundles = data.get("check_bundles", [])
        if not check_bundles:
            return data

        # A missing, null or otherwise malformed 'defaults' entry contributes
        # nothing to the merge. data["defaults"] itself is left as-is, so field
        # validation still reports the problem instead of a raw TypeError here.
        defaults = data.get("defaults")
        if not isinstance(defaults, dict):
            defaults = {}

        updated_bundles = []
        for bundle in check_bundles:
            if not isinstance(bundle, dict):
                updated_bundles.append(bundle)
                continue

            checks = bundle.get("checks", [])
            if not isinstance(checks, (list, tuple)):
                # e.g. ``checks: null`` - nothing to merge into; leave it for
                # field validation to reject.
                updated_bundles.append(bundle)
                continue

            bundle_defaults = bundle.get("defaults")
            if not isinstance(bundle_defaults, dict):
                bundle_defaults = {}

            merged_checks = []
            for check in checks:
                if not isinstance(check, dict):
                    merged_checks.append(check)
                    continue

                # Merge order: defaults -> check_bundle.defaults -> check
                merged = {**defaults, **bundle_defaults, **check}

                # Deep merge for 'filters' dict
                merged["filters"] = cls._merge_filters(
                    defaults.get("filters", {}),
                    bundle_defaults.get("filters", {}),
                    check.get("filters", {}),
                )
                merged_checks.append(merged)

            # Build a new bundle dict rather than assigning into the caller's.
            updated_bundles.append({**bundle, "checks": merged_checks})

        return {**data, "check_bundles": updated_bundles}

    @staticmethod
    def _merge_filters(*filter_dicts: dict) -> dict:
        """Deep merge multiple filter dicts.

        For each filter name, later values override earlier ones.
        Within a single filter, individual keys are merged (e.g., value overrides
        but column is inherited if not specified).

        A filter entry that is not a dict is treated as shorthand for
        ``{"value": entry}``.

        Args:
            *filter_dicts: Filter mappings in ascending priority. Falsy entries
                (``None``, ``{}``) are skipped.

        Returns:
            A new dict mapping filter name to a merged filter dict. The per-filter
            dicts are fresh objects and never alias the inputs.
        """
        result: dict = {}
        for filters in filter_dicts:
            if not filters:
                continue
            for name, config in filters.items():
                if name not in result:
                    # Copy on first sight: the shorthand branch below writes into
                    # result[name], which must not be the caller's own dict.
                    # Otherwise one check's shorthand value would be written
                    # into the shared defaults and leak into every later check.
                    result[name] = dict(config) if isinstance(config, dict) else {"value": config}
                elif isinstance(config, dict):
                    result[name] = {**result[name], **config}
                else:
                    # Shorthand: just a value
                    result[name]["value"] = config
        return result

    @model_validator(mode="after")
    def validate_identifier_consistency(self) -> Self:
        """Validate identifier filter consistency based on identifier_format.

        When identifier_format is 'filter_name' or 'column_name', all identifier
        filters across all checks must have the same filter name or column name
        respectively, since these are used as result column headers.
        Also checks that if identifier_format is 'filter_name' or 'column_name',
        every check has an identifier filter.

        Returns:
            The validated model itself.

        Raises:
            ValueError: If a check has no identifier filter, or if the identifier
                filters disagree on the name/column required by the format.
        """
        identifier_format = self.defaults.identifier_format
        if identifier_format == "identifier":
            # Nothing is used as a column header in this mode.
            return self

        filter_names: set[str] = set()
        column_names: set[str] = set()

        for bundle in self.check_bundles:
            for check in bundle.checks:
                has_identifier_filter = False
                for name, config in check.filters.items():
                    if config.type == "identifier":
                        has_identifier_filter = True
                        filter_names.add(name)
                        if config.column:
                            column_names.add(config.column)

                if not has_identifier_filter:
                    msg = (
                        f"Check '{check.check_type}' in bundle '{bundle.name}' is missing an identifier filter. "
                        f"When identifier_format is '{identifier_format}', every check must have a filter with "
                        f"type='identifier'."
                    )
                    raise ValueError(msg)

        if identifier_format == "filter_name" and len(filter_names) > 1:
            msg = (
                f"When identifier_format='filter_name', all identifier filters must have "
                f"the same filter name. Found different names: {sorted(filter_names)}"
            )
            raise ValueError(msg)

        if identifier_format == "column_name" and len(column_names) > 1:
            msg = (
                f"When identifier_format='column_name', all identifier filters must have "
                f"the same column name. Found different columns: {sorted(column_names)}"
            )
            raise ValueError(msg)

        return self

    @model_validator(mode="after")
    def validate_filter_values_complete(self) -> Self:
        """Validate that all filters in checks have column and value set.

        Filters in defaults can omit column/value (to be set at check level),
        but after merging, all filters must have both column and value except for
        identifier-type filters which are allowed to omit a value (used for naming).

        Returns:
            The validated model itself.

        Raises:
            ValueError: If a non-identifier filter has no column, or if a filter
                that requires a value has none while using the "=" operator.
        """
        for bundle in self.check_bundles:
            for check in bundle.checks:
                for name, config in check.filters.items():
                    is_identifier = config.type == "identifier"

                    # Non-identifier filters must define a column
                    if not is_identifier and config.column is None:
                        msg = (
                            f"Filter '{name}' in check '{check.check_type}' "
                            f"(bundle '{bundle.name}') is missing a column. "
                            f"Set column in defaults, bundle defaults, or the check itself."
                        )
                        raise ValueError(msg)

                    # A None value combined with a non-default operator is an
                    # explicit null comparison, not a forgotten value.
                    if config.value is not None or config.operator != "=":
                        continue

                    # For identifier filters, a missing value is allowed only when the
                    # global identifier_format is not 'identifier' (i.e., used solely for naming).
                    if is_identifier and self.defaults.identifier_format != "identifier":
                        continue

                    msg = (
                        f"Filter '{name}' in check '{check.check_type}' "
                        f"(bundle '{bundle.name}') is missing a value. "
                        f"Set value in defaults, bundle defaults, or the check itself."
                    )
                    raise ValueError(msg)
        return self

propagate_defaults_to_checks(data) classmethod

Merge defaults and check_bundle.defaults into each check before validation.

Merge order (later overrides earlier): 1. defaults 2. bundle defaults 3. check-specific values

For the 'filters' dict, a deep merge is performed so that check-level filters override individual filter entries rather than replacing the whole dict.

Source code in src/koality/models.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
@model_validator(mode="before")
@classmethod
def propagate_defaults_to_checks(cls, data: dict) -> dict:
    """Merge defaults and check_bundle.defaults into each check before validation.

    Merge order (later overrides earlier):
    1. defaults
    2. bundle defaults
    3. check-specific values

    For the 'filters' dict, a deep merge is performed so that check-level
    filters override individual filter entries rather than replacing the whole dict.

    The input is never modified in place: a shallow copy of ``data`` (and of
    every rewritten bundle) is returned instead, so the caller's raw
    configuration can be reused.

    Args:
        data: Raw input handed to the model. Anything that is not a dict is
            returned untouched so that regular validation can reject it.

    Returns:
        A dict equal to ``data`` except that every dict-typed check inside
        every dict-typed bundle carries the merged values. Malformed parts
        (non-dict bundles or checks, a non-list ``checks`` entry) are passed
        through unchanged for field validation to report.
    """
    if not isinstance(data, dict):
        return data

    check_bundles = data.get("check_bundles", [])
    if not check_bundles:
        return data

    # A missing, null or otherwise malformed 'defaults' entry contributes
    # nothing to the merge. data["defaults"] itself is left as-is, so field
    # validation still reports the problem instead of a raw TypeError here.
    defaults = data.get("defaults")
    if not isinstance(defaults, dict):
        defaults = {}

    updated_bundles = []
    for bundle in check_bundles:
        if not isinstance(bundle, dict):
            updated_bundles.append(bundle)
            continue

        checks = bundle.get("checks", [])
        if not isinstance(checks, (list, tuple)):
            # e.g. ``checks: null`` - nothing to merge into; leave it for
            # field validation to reject.
            updated_bundles.append(bundle)
            continue

        bundle_defaults = bundle.get("defaults")
        if not isinstance(bundle_defaults, dict):
            bundle_defaults = {}

        merged_checks = []
        for check in checks:
            if not isinstance(check, dict):
                merged_checks.append(check)
                continue

            # Merge order: defaults -> check_bundle.defaults -> check
            merged = {**defaults, **bundle_defaults, **check}

            # Deep merge for 'filters' dict
            merged["filters"] = cls._merge_filters(
                defaults.get("filters", {}),
                bundle_defaults.get("filters", {}),
                check.get("filters", {}),
            )
            merged_checks.append(merged)

        # Build a new bundle dict rather than assigning into the caller's.
        updated_bundles.append({**bundle, "checks": merged_checks})

    return {**data, "check_bundles": updated_bundles}

validate_filter_values_complete()

Validate that all filters in checks have column and value set.

Filters in defaults can omit column/value (to be set at check level), but after merging, all filters must have both column and value except for identifier-type filters which are allowed to omit a value (used for naming).

Source code in src/koality/models.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
@model_validator(mode="after")
def validate_filter_values_complete(self) -> Self:
    """Ensure every merged filter of every check is fully specified.

    Defaults may declare filters without a column or value, expecting a
    bundle or check to fill them in. Once merging is done, each filter must
    have a column (identifier filters excepted) and a value. An identifier
    filter may omit its value when it only serves for naming, i.e. when the
    global identifier_format is not 'identifier'.

    Returns:
        The validated model itself.

    Raises:
        ValueError: On the first filter found without a required column or value.
    """
    for bundle in self.check_bundles:
        for check in bundle.checks:
            for name, config in check.filters.items():
                is_identifier = config.type == "identifier"

                # Only identifier filters may go without a column.
                if config.column is None and not is_identifier:
                    msg = (
                        f"Filter '{name}' in check '{check.check_type}' "
                        f"(bundle '{bundle.name}') is missing a column. "
                        f"Set column in defaults, bundle defaults, or the check itself."
                    )
                    raise ValueError(msg)

                # A value is present, or None is paired with a non-default
                # operator (an explicit null comparison): nothing is missing.
                if config.value is not None or config.operator != "=":
                    continue

                # Naming-only identifier filter: a value is not required.
                if is_identifier and self.defaults.identifier_format != "identifier":
                    continue

                msg = (
                    f"Filter '{name}' in check '{check.check_type}' "
                    f"(bundle '{bundle.name}') is missing a value. "
                    f"Set value in defaults, bundle defaults, or the check itself."
                )
                raise ValueError(msg)
    return self

validate_identifier_consistency()

Validate identifier filter consistency based on identifier_format.

When identifier_format is 'filter_name' or 'column_name', all identifier filters across all checks must have the same filter name or column name respectively, since these are used as result column headers. Also checks that if identifier_format is 'filter_name' or 'column_name', every check has an identifier filter.

Source code in src/koality/models.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
@model_validator(mode="after")
def validate_identifier_consistency(self) -> Self:
    """Check that identifier filters agree with the configured identifier_format.

    With identifier_format set to 'filter_name' or 'column_name', the filter
    name or column name of the identifier filter is used as a result column
    header, so it has to be the same across all checks. In those modes every
    check must also define an identifier filter. Nothing is checked when the
    format is 'identifier'.

    Returns:
        The validated model itself.

    Raises:
        ValueError: If a check lacks an identifier filter, or if more than one
            distinct filter name / column name is found for the chosen format.
    """
    fmt = self.defaults.identifier_format
    if fmt == "identifier":
        return self

    seen_names: set[str] = set()
    seen_columns: set[str] = set()

    for bundle in self.check_bundles:
        for check in bundle.checks:
            identifier_filters = [
                (filter_name, filter_config)
                for filter_name, filter_config in check.filters.items()
                if filter_config.type == "identifier"
            ]

            if not identifier_filters:
                msg = (
                    f"Check '{check.check_type}' in bundle '{bundle.name}' is missing an identifier filter. "
                    f"When identifier_format is '{fmt}', every check must have a filter with "
                    f"type='identifier'."
                )
                raise ValueError(msg)

            for filter_name, filter_config in identifier_filters:
                seen_names.add(filter_name)
                # Filters without a (truthy) column contribute no column name.
                if filter_config.column:
                    seen_columns.add(filter_config.column)

    if fmt == "filter_name" and len(seen_names) > 1:
        msg = (
            f"When identifier_format='filter_name', all identifier filters must have "
            f"the same filter name. Found different names: {sorted(seen_names)}"
        )
        raise ValueError(msg)

    if fmt == "column_name" and len(seen_columns) > 1:
        msg = (
            f"When identifier_format='column_name', all identifier filters must have "
            f"the same column name. Found different columns: {sorted(seen_columns)}"
        )
        raise ValueError(msg)

    return self

DatabaseProvider dataclass

Data class representing a DuckDB database provider connection.

Source code in src/koality/models.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@dataclass
class DatabaseProvider:
    """Data class representing a DuckDB database provider connection.

    A plain record without behavior of its own. No field has a default, so
    all ten values must be supplied, in declaration order when passed
    positionally.

    NOTE(review): the field names look like the columns returned by DuckDB's
    ``duckdb_databases()`` table function - confirm against the code that
    builds these objects before relying on that.
    """

    database_name: str
    database_oid: int
    path: str
    comment: str | None  # nullable
    tags: dict
    internal: bool
    type: str
    readonly: bool
    encrypted: bool
    cipher: str | None  # nullable

FilterConfig

Bases: BaseModel

Configuration for a single filter.

Attributes:

Name Type Description
column str | None

The database column name to filter on.

value FilterValue

The filter value (can be any type, will be converted to string in SQL). For IN/NOT IN operators, use a list of values. For date filters, supports relative dates like "today", "yesterday", and offsets like "yesterday-2" or "today+1".

operator FilterOperator

SQL comparison operator. Defaults to "=" (equality).

type FilterType

Filter type: "date" for date filters (used for rolling checks), "identifier" for identifier filters (e.g., shop_id), "other" for regular filters. Only one "date"-type filter and one "identifier"-type filter are allowed per configuration. When type="date", the value is automatically parsed as a date.

parse_as_date bool

If True, the value will be parsed as a date even for type="other". Useful for filters that need date parsing but aren't the primary date filter.

Example

filters:
  partition_date:
    column: BQ_PARTITIONTIME
    value: yesterday-2  # 2 days before yesterday
    type: date
  shop_id:
    column: shopId
    value: EC0601
    type: identifier
  created_at:
    column: created_date
    value: today+1  # tomorrow
    parse_as_date: true  # parses date but doesn't count as the "date" filter
  revenue:
    column: total_revenue
    value: 1000
    operator: ">="

Source code in src/koality/models.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class FilterConfig(BaseModel):
    """Settings describing one filter.

    Attributes:
        column: Name of the database column the filter applies to.
        value: Value to compare against (any type; converted to a string in SQL).
            Pass a list when the operator is IN or NOT IN.
            Date filters accept relative dates such as "today" or "yesterday",
            optionally with an offset like "yesterday-2" or "today+1".
        operator: SQL comparison operator; "=" (equality) unless specified.
        type: Kind of filter: "date" for date filters (used for rolling checks),
            "identifier" for identifier filters (e.g., shop_id), and "other"
            for regular filters. Only one "date" and one "identifier" type
            filter is allowed per configuration.
            With type="date" the value is automatically parsed as a date.
        parse_as_date: When True, the value is parsed as a date even though
            type="other". Intended for filters that need date parsing without
            being the primary date filter.

    Example:
        filters:
          partition_date:
            column: BQ_PARTITIONTIME
            value: yesterday-2  # 2 days before yesterday
            type: date
          shop_id:
            column: shopId
            value: EC0601
            type: identifier
          created_at:
            column: created_date
            value: today+1  # tomorrow
            parse_as_date: true  # parses date but doesn't count as the "date" filter
          revenue:
            column: total_revenue
            value: 1000
            operator: ">="

    """

    column: str | None = None
    value: FilterValue = None
    operator: FilterOperator = "="
    type: FilterType = "other"
    parse_as_date: bool = False

    @model_validator(mode="after")
    def validate_operator_value_combination(self) -> Self:
        """Reject operator/value pairings that cannot be expressed.

        A None value with the default "=" operator is accepted as-is: it marks
        a partial filter declared in defaults that is completed later.

        Returns:
            The validated model itself.

        Raises:
            ValueError: If None is combined with an operator other than "=" or
                "!=", a list with an operator other than IN/NOT IN, or a
                non-list value with IN/NOT IN.
        """
        current_value = self.value
        membership_operators = ("IN", "NOT IN")

        if current_value is None:
            # "=" is the partial-filter case; "!=" is the only other operator
            # permitted together with None.
            if self.operator not in ("=", "!="):
                msg = f"None/null values can only be used with = or != operators, got: {self.operator}"
                raise ValueError(msg)
            return self

        if isinstance(current_value, list):
            if self.operator not in membership_operators:
                msg = f"List values can only be used with IN/NOT IN operators, got: {self.operator}"
                raise ValueError(msg)
            return self

        # Scalar value: membership operators make no sense here.
        if self.operator in membership_operators:
            msg = f"IN/NOT IN operators require a list value, got: {type(current_value).__name__}"
            raise ValueError(msg)
        return self

validate_operator_value_combination()

Validate that operator and value type are compatible.

Skips validation when value is None with default operator, as this indicates a partial filter in defaults that will be completed later.

Source code in src/koality/models.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@model_validator(mode="after")
def validate_operator_value_combination(self) -> Self:
    """Reject operator/value pairings that cannot be expressed.

    A None value with the default "=" operator is accepted as-is: it marks
    a partial filter declared in defaults that is completed later.

    Returns:
        The validated model itself.

    Raises:
        ValueError: If None is combined with an operator other than "=" or
            "!=", a list with an operator other than IN/NOT IN, or a
            non-list value with IN/NOT IN.
    """
    current_value = self.value
    membership_operators = ("IN", "NOT IN")

    if current_value is None:
        # "=" is the partial-filter case; "!=" is the only other operator
        # permitted together with None.
        if self.operator not in ("=", "!="):
            msg = f"None/null values can only be used with = or != operators, got: {self.operator}"
            raise ValueError(msg)
        return self

    if isinstance(current_value, list):
        if self.operator not in membership_operators:
            msg = f"List values can only be used with IN/NOT IN operators, got: {self.operator}"
            raise ValueError(msg)
        return self

    # Scalar value: membership operators make no sense here.
    if self.operator in membership_operators:
        msg = f"IN/NOT IN operators require a list value, got: {type(current_value).__name__}"
        raise ValueError(msg)
    return self