Skip to content

Tables

Module for dealing with BigQuery tables.

BQTable

Represents a BigQuery table.

Source code in bquest/tables.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class BQTable:
    """
    Handle for a BigQuery test table that stands in for an original table.

    Stores the id of the original table, the fully qualified id of the test
    table and the client used for every BigQuery call.
    """

    def __init__(self, original_table_id: str, fq_test_table_id: str, bq_client: google.cloud.bigquery.Client) -> None:
        """Validates both ids and stores them together with the client.

        Args:
            original_table_id: original table id
            fq_test_table_id: fully qualified test table id
            bq_client: BigQuery client used for interacting with BigQuery

        Raises:
            ValueError: if both ids are equal, or if ``is_sql`` reports that
                ``fq_test_table_id`` contains sql syntax.
        """
        ids_collide = original_table_id == fq_test_table_id
        if ids_collide:
            raise ValueError("'original_table_id' and 'fq_test_table_id' can't be the same.")

        # The id is interpolated into a query string in to_df(), so it is vetted here.
        if is_sql(fq_test_table_id):
            raise ValueError("'test_table_id' contains sql syntax.")

        self._bq_client = bq_client
        self._original_table_id = original_table_id
        self._fq_test_table_id = fq_test_table_id

    @property
    def original_table_id(self) -> str:
        """Identifier of the original table (e.g. bquest.example_id)"""
        return self._original_table_id

    @property
    def fq_test_table_id(self) -> str:
        """Identifier of the table used for testing (e.g. bquest.example_id)"""
        return self._fq_test_table_id

    def remove_require_partition_filter(self, table_id: str) -> None:
        """Drops the partition filter requirement of a table.

        The table is only updated when its API representation contains the
        ``requirePartitionFilter`` key.

        Args:
            table_id: table id

        Returns:
            None - table settings updated in place
        """
        table = self._bq_client.get_table(table_id)
        if "requirePartitionFilter" not in table.to_api_repr():
            return

        table.require_partition_filter = False
        changed_fields = ["require_partition_filter"]
        self._bq_client.update_table(table, changed_fields)

    def to_df(self) -> pd.DataFrame:
        """Loads the complete test table into a dataframe.

        Returns:
            Loaded table as pandas dataframe
        """
        test_table = self._fq_test_table_id
        # NOTE(review): presumably needed because the unfiltered SELECT below
        # would be rejected while a partition filter is required - confirm.
        self.remove_require_partition_filter(test_table)

        query = f"SELECT * FROM `{test_table}`"  # noqa: S608, SQL injection prevented in init
        query_job = self._bq_client.query(query)
        return query_job.to_dataframe()

    def delete(self) -> None:
        """Deletes the test table from BigQuery."""
        table_ref = google.cloud.bigquery.table.TableReference.from_string(self._fq_test_table_id)
        self._bq_client.delete_table(table_ref)

fq_test_table_id: str property

Returns the table identifier used for testing (e.g. bquest.example_id)

original_table_id: str property

Returns the original table identifier (e.g. bquest.example_id)

__init__(original_table_id, fq_test_table_id, bq_client)

Parameters:

Name Type Description Default
original_table_id str

original table id

required
fq_test_table_id str

fully qualified test table id

required
bq_client Client

BigQuery client used for interacting with BigQuery

required
Source code in bquest/tables.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(self, original_table_id: str, fq_test_table_id: str, bq_client: google.cloud.bigquery.Client) -> None:
    """Validates both ids and stores them together with the client.

    Args:
        original_table_id: original table id
        fq_test_table_id: fully qualified test table id
        bq_client: BigQuery client used for interacting with BigQuery

    Raises:
        ValueError: if both ids are equal, or if ``is_sql`` reports that
            ``fq_test_table_id`` contains sql syntax.
    """
    ids_collide = original_table_id == fq_test_table_id
    if ids_collide:
        raise ValueError("'original_table_id' and 'fq_test_table_id' can't be the same.")

    # Reject ids that is_sql() flags before they are stored on the instance.
    if is_sql(fq_test_table_id):
        raise ValueError("'test_table_id' contains sql syntax.")

    self._bq_client = bq_client
    self._original_table_id = original_table_id
    self._fq_test_table_id = fq_test_table_id

delete()

Deletes the table

Source code in bquest/tables.py
76
77
78
def delete(self) -> None:
    """Deletes the test table from BigQuery."""
    # Build a table reference from the stored fully qualified id, then delete it.
    table_ref = google.cloud.bigquery.table.TableReference.from_string(self._fq_test_table_id)
    self._bq_client.delete_table(table_ref)

remove_require_partition_filter(table_id)

Drops the partition filter requirement of the table identified by table_id.

Returns:

Type Description
None

None - table settings updated in place

Source code in bquest/tables.py
50
51
52
53
54
55
56
57
58
59
60
61
62
def remove_require_partition_filter(self, table_id: str) -> None:
    """Drops the partition filter requirement of a table.

    The table is only updated when its API representation contains the
    ``requirePartitionFilter`` key.

    Args:
        table_id: table id

    Returns:
        None - table settings updated in place
    """
    table = self._bq_client.get_table(table_id)
    if "requirePartitionFilter" not in table.to_api_repr():
        # Key absent from the API representation: nothing to update.
        return

    table.require_partition_filter = False
    changed_fields = ["require_partition_filter"]
    self._bq_client.update_table(table, changed_fields)

to_df()

Loads the table into a dataframe

Returns:

Type Description
DataFrame

Loaded table as pandas dataframe

Source code in bquest/tables.py
64
65
66
67
68
69
70
71
72
73
74
def to_df(self) -> pd.DataFrame:
    """Loads the complete test table into a dataframe.

    Returns:
        Loaded table as pandas dataframe
    """
    test_table = self._fq_test_table_id
    # NOTE(review): presumably needed because the unfiltered SELECT below
    # would be rejected while a partition filter is required - confirm.
    self.remove_require_partition_filter(test_table)

    query = f"SELECT * FROM `{test_table}`"  # noqa: S608, SQL injection prevented in init
    query_job = self._bq_client.query(query)
    return query_job.to_dataframe()

BQTableDataframeDefinition

Bases: BQTableDefinition

Defines BigQuery tables based on a pandas dataframe.

Source code in bquest/tables.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class BQTableDataframeDefinition(BQTableDefinition):
    """
    Table definition whose content comes from a pandas dataframe.
    """

    def __init__(self, original_table_id: str, df: pd.DataFrame, project: str, dataset: str, location: str) -> None:
        """Stores the dataframe next to the common definition attributes.

        Args:
            original_table_id: table name
            df: pandas DataFrame that is loaded
            project: Google Cloud project id
            dataset: dataset name e.g. bquest
            location: location of dataset e.g. EU
        """
        super().__init__(original_table_id, project, dataset, location)
        self._df = df

    def load_to_bq(self, bq_client: google.cloud.bigquery.Client) -> BQTable:
        """Uploads the dataframe to BigQuery and returns a handle for the table.

        The upload goes through ``DataFrame.to_gbq`` and replaces an existing
        table of the same name. ``bq_client`` is not used for the upload
        itself; it is only handed to the returned ``BQTable``.

        Args:
            bq_client: A BigQuery client

        Returns:
            BQTable: A representative of the BigQuery table which was created.
        """
        # NOTE(review): DataFrame.to_gbq appears to be deprecated in recent
        # pandas releases - confirm against the pinned pandas version.
        destination = f"{self._dataset}.{self.table_name}"
        upload_options = {
            "location": self._location,
            "project_id": self._project,
            "if_exists": "replace",
        }
        self._df.to_gbq(destination, **upload_options)

        return BQTable(self._original_table_id, self.fq_table_id, bq_client)

__init__(original_table_id, df, project, dataset, location)

Parameters:

Name Type Description Default
original_table_id str

table name

required
df DataFrame

pandas DataFrame that is loaded

required
project str

Google Cloud project id

required
dataset str

dataset name e.g. bquest

required
location str

location of dataset e.g. EU

required
Source code in bquest/tables.py
136
137
138
139
140
141
142
143
144
145
146
147
def __init__(self, original_table_id: str, df: pd.DataFrame, project: str, dataset: str, location: str) -> None:
    """Stores the dataframe next to the common definition attributes.

    Args:
        original_table_id: table name
        df: pandas DataFrame that is loaded
        project: Google Cloud project id
        dataset: dataset name e.g. bquest
        location: location of dataset e.g. EU
    """
    # The base class keeps the ids, project, dataset and location.
    super().__init__(original_table_id, project, dataset, location)
    # Only the dataframe itself is specific to this definition type.
    self._df = df

load_to_bq(bq_client)

Loads this definition to a BigQuery table.

Parameters:

Name Type Description Default
bq_client Client

A BigQuery client

required

Returns:

Name Type Description
BQTable BQTable

A representative of the BigQuery table which was created.

Source code in bquest/tables.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def load_to_bq(self, bq_client: google.cloud.bigquery.Client) -> BQTable:
    """Uploads the dataframe to BigQuery and returns a handle for the table.

    The upload goes through ``DataFrame.to_gbq`` and replaces an existing
    table of the same name. ``bq_client`` is not used for the upload itself;
    it is only handed to the returned ``BQTable``.

    Args:
        bq_client: A BigQuery client

    Returns:
        BQTable: A representative of the BigQuery table which was created.
    """
    # NOTE(review): DataFrame.to_gbq appears to be deprecated in recent
    # pandas releases - confirm against the pinned pandas version.
    destination = f"{self._dataset}.{self.table_name}"
    upload_options = {
        "location": self._location,
        "project_id": self._project,
        "if_exists": "replace",
    }
    self._df.to_gbq(destination, **upload_options)

    return BQTable(self._original_table_id, self.fq_table_id, bq_client)

BQTableDefinition

Base class for BigQuery table definitions.

Source code in bquest/tables.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class BQTableDefinition:
    """
    Base class for BigQuery table definitions.

    A definition pairs the id of an original table with a freshly generated
    test table name and the project, dataset and location it belongs to.
    """

    # Characters of "<original_table_id>_<uuid>" that are replaced by "_"
    # when the test table name is generated.
    _TABLE_NAME_TRANSLATION = str.maketrans({char: "_" for char in "-.{}$"})

    def __init__(self, original_table_id: str, project: str, dataset: str, location: str) -> None:
        """Stores the arguments and generates the test table name.

        The test table name is ``<original_table_id>_<uuid1>`` with every
        ``-``, ``.``, ``{``, ``}`` and ``$`` replaced by ``_``.

        Args:
            original_table_id: id of the original table
            project: Google Cloud project id
            dataset: dataset name e.g. bquest
            location: location of dataset e.g. EU
        """
        self._original_table_id = original_table_id
        self._project = project
        self._dataset = dataset
        self._location = location
        # One translate pass instead of five chained replace() calls.
        self._test_table_id = f"{original_table_id}_{uuid.uuid1()}".translate(self._TABLE_NAME_TRANSLATION)

    @property
    def table_name(self) -> str:
        """Returns the generated test table name (without project and dataset)."""
        return self._test_table_id

    @property
    def dataset(self) -> str:
        """Returns the dataset name."""
        return self._dataset

    @property
    def project(self) -> str:
        """Returns the Google Cloud project id."""
        return self._project

    @property
    def fq_table_id(self) -> str:
        """
        Returns the fully qualified table name in the form
        {project_name}.{dataset}.{table_name}
        """
        return f"{self._project}.{self._dataset}.{self.table_name}"

    def load_to_bq(self, bq_client: google.cloud.bigquery.Client) -> BQTable:
        """Returns a table handle for this definition without loading any data.

        No BigQuery call is made here; the base definition carries no rows.

        Args:
            bq_client: BigQuery client handed to the returned ``BQTable``

        Returns:
            BQTable: handle built from the original id and the fully
                qualified test table id.
        """
        return BQTable(self._original_table_id, self.fq_table_id, bq_client)

fq_table_id: str property

Returns the fully qualified table name in the form {project_name}.{dataset}.{table_name}

__init__(original_table_id, project, dataset, location)

Parameters:

Name Type Description Default
original_table_id str

table name

required
project str

Google Cloud project id

required
dataset str

dataset name e.g. bquest

required
location str

location of dataset e.g. EU

required
Source code in bquest/tables.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __init__(self, original_table_id: str, project: str, dataset: str, location: str) -> None:
    """Stores the arguments and generates the test table name.

    The test table name is ``<original_table_id>_<uuid1>`` with every ``-``,
    ``.``, ``{``, ``}`` and ``$`` replaced by ``_``.

    Args:
        original_table_id: id of the original table
        project: Google Cloud project id
        dataset: dataset name e.g. bquest
        location: location of dataset e.g. EU
    """
    self._original_table_id = original_table_id
    self._project = project
    self._dataset = dataset
    self._location = location
    # One translate pass instead of five chained replace() calls.
    to_underscore = str.maketrans({char: "_" for char in "-.{}$"})
    self._test_table_id = f"{original_table_id}_{uuid.uuid1()}".translate(to_underscore)

BQTableDefinitionBuilder

Helper class for building BQTableDefinitions

Source code in bquest/tables.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class BQTableDefinitionBuilder:
    """Factory for table definitions that share one project, dataset and location."""

    def __init__(self, project: str, dataset: str = "bquest", location: str = "EU"):
        """Remembers the target that every built definition is created for.

        Args:
            project: Google Cloud project
            dataset: BigQuery dataset e.g. bquest
            location: location of dataset e.g. EU
        """
        self._project, self._dataset, self._location = project, dataset, location

    def from_json(
        self,
        name: str,
        rows: List[Dict[str, Any]],
        schema: Optional[List[google.cloud.bigquery.SchemaField]] = None,
    ) -> BQTableJsonDefinition:
        """Builds a definition from json-like rows and an optional schema.

        Args:
            name: original table id
            rows: json-like rows
            schema: schema of the data, or None

        Returns:
            Definition bound to this builder's project, dataset and location.
        """
        return BQTableJsonDefinition(
            original_table_id=name,
            rows=rows,
            schema=schema,
            project=self._project,
            dataset=self._dataset,
            location=self._location,
        )

    def from_df(self, name: str, df: pd.DataFrame) -> BQTableDataframeDefinition:
        """Builds a definition from a pandas dataframe.

        Args:
            name: original table id
            df: pandas DataFrame that is loaded

        Returns:
            Definition bound to this builder's project, dataset and location.
        """
        return BQTableDataframeDefinition(
            original_table_id=name,
            df=df,
            project=self._project,
            dataset=self._dataset,
            location=self._location,
        )

    def create_empty(self, name: str) -> BQTableDefinition:
        """Builds a base definition that carries no data.

        Args:
            name: original table id

        Returns:
            Definition bound to this builder's project, dataset and location.
        """
        return BQTableDefinition(
            original_table_id=name,
            project=self._project,
            dataset=self._dataset,
            location=self._location,
        )

__init__(project, dataset='bquest', location='EU')

Parameters:

Name Type Description Default
project str

Google Cloud project

required
dataset str

BigQuery dataset e.g. bquest

'bquest'
location str

location of dataset e.g. EU

'EU'
Source code in bquest/tables.py
241
242
243
244
245
246
247
248
249
250
251
def __init__(self, project: str, dataset: str = "bquest", location: str = "EU"):
    """Remembers the target that every built definition is created for.

    Args:
        project: Google Cloud project
        dataset: BigQuery dataset e.g. bquest
        location: location of dataset e.g. EU
    """
    self._project, self._dataset, self._location = project, dataset, location

BQTableJsonDefinition

Bases: BQTableDefinition

Defines BigQuery tables based on a JSON format.

Source code in bquest/tables.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class BQTableJsonDefinition(BQTableDefinition):
    """
    Defines BigQuery tables based on a JSON format.

    The rows are serialized once, at construction time, into an in-memory
    newline-delimited JSON stream which ``load_to_bq`` uploads.
    """

    def __init__(
        self,
        original_table_id: str,
        rows: List[Dict[str, Any]],
        schema: Optional[List[google.cloud.bigquery.SchemaField]],
        project: str,
        dataset: str,
        location: str,
    ) -> None:
        """Serializes the rows and stores them together with the schema.

        Args:
            original_table_id: table name
            rows: json-like rows
            schema: schema of the data; a falsy value enables autodetection
            project: Google Cloud project
            dataset: dataset name e.g. bquest
            location: location of dataset e.g. EU
        """
        super().__init__(original_table_id, project, dataset, location)
        self._rows_json_sources = self._convert_rows_to_bq_json_format(rows)
        self._schema = schema

    @staticmethod
    def _convert_rows_to_bq_json_format(rows: List[Dict[str, Any]]) -> BytesIO:
        """Serializes the rows as newline-delimited JSON into a byte stream.

        Args:
            rows: json-like rows, one dict per table row

        Returns:
            Stream holding one JSON document per line.
        """
        # json.dumps escapes non-ASCII characters by default, so encoding
        # the result as ASCII cannot fail.
        rows_as_json = [json.dumps(row) for row in rows]
        return BytesIO(bytes("\n".join(rows_as_json), "ascii"))

    def _create_bq_load_config(self) -> google.cloud.bigquery.job.LoadJobConfig:
        """Builds the load job configuration for newline-delimited JSON.

        Returns:
            Config using the stored schema when one is set and schema
            autodetection otherwise.
        """
        load_config = google.cloud.bigquery.job.LoadJobConfig()
        load_config.source_format = google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON
        if self._schema:
            load_config.schema = self._schema
            load_config.autodetect = False
        else:
            load_config.autodetect = True
        return load_config

    def load_to_bq(self, bq_client: google.cloud.bigquery.Client) -> BQTable:
        """Loads this definition to a BigQuery table.

        The call can be repeated or retried: the row stream is rewound
        before every upload and the table content is replaced rather than
        appended to.

        Arguments:
            bq_client: BigQuery client for interacting with BigQuery

        Returns:
            BQTable: A representative of the BigQuery table which was created.

        Raises:
            BadRequest: if the load job fails with a bad request; the message
                carries the job's full error list.
        """
        load_config = self._create_bq_load_config()
        # Replace instead of append so a repeated load cannot duplicate rows.
        load_config.write_disposition = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE

        # A previous upload leaves the stream at EOF; without rewinding, a
        # second call would upload no rows at all.
        self._rows_json_sources.seek(0)

        job = bq_client.load_table_from_file(
            self._rows_json_sources,
            google.cloud.bigquery.table.TableReference.from_string(self.fq_table_id),
            location=self._location,
            job_config=load_config,
        )
        try:
            job.result()
        except BadRequest as e:
            # same error but with full error msg
            raise BadRequest(str(job.errors)) from e  # type: ignore

        return BQTable(self._original_table_id, self.fq_table_id, bq_client)

__init__(original_table_id, rows, schema, project, dataset, location)

Parameters:

Name Type Description Default
original_table_id str

table name

required
rows List[Dict[str, Any]]

json-like rows

required
schema Optional[List[SchemaField]]

schema of the data

required
project str

Google Cloud project

required
dataset str

dataset name e.g. bquest

required
location str

location of dataset e.g. EU

required
Source code in bquest/tables.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def __init__(
    self,
    original_table_id: str,
    rows: List[Dict[str, Any]],
    schema: Optional[List[google.cloud.bigquery.SchemaField]],
    project: str,
    dataset: str,
    location: str,
) -> None:
    """Serializes the rows and stores them together with the schema.

    Args:
        original_table_id: table name
        rows: json-like rows
        schema: schema of the data
        project: Google Cloud project
        dataset: dataset name e.g. bquest
        location: location of dataset e.g. EU
    """
    super().__init__(original_table_id, project, dataset, location)
    self._schema = schema
    # The rows are converted once, here, into the stream that is uploaded later.
    json_stream = self._convert_rows_to_bq_json_format(rows)
    self._rows_json_sources = json_stream

load_to_bq(bq_client)

Loads this definition to a BigQuery table.

Parameters:

Name Type Description Default
bq_client Client

BigQuery client for interacting with BigQuery

required

Returns:

Name Type Description
BQTable BQTable

A representative of the BigQuery table which was created.

Source code in bquest/tables.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def load_to_bq(self, bq_client: google.cloud.bigquery.Client) -> BQTable:
    """Loads this definition to a BigQuery table.

    The call can be repeated or retried: the row stream is rewound before
    every upload and the table content is replaced rather than appended to.

    Arguments:
        bq_client: BigQuery client for interacting with BigQuery

    Returns:
        BQTable: A representative of the BigQuery table which was created.

    Raises:
        BadRequest: if the load job fails with a bad request; the message
            carries the job's full error list.
    """
    load_config = self._create_bq_load_config()
    # Replace instead of append so a repeated load cannot duplicate rows.
    load_config.write_disposition = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE

    # A previous upload leaves the stream at EOF; without rewinding, a
    # second call would upload no rows at all.
    self._rows_json_sources.seek(0)

    job = bq_client.load_table_from_file(
        self._rows_json_sources,
        google.cloud.bigquery.table.TableReference.from_string(self.fq_table_id),
        location=self._location,
        job_config=load_config,
    )
    try:
        job.result()
    except BadRequest as e:
        # same error but with full error msg
        raise BadRequest(str(job.errors)) from e  # type: ignore

    return BQTable(self._original_table_id, self.fq_table_id, bq_client)