Source code for rudra.data_store.bigquery.base

"""Implementation Bigquery builder base."""
import os
import time
import tempfile
from google.cloud import bigquery

from rudra import logger
from rudra.data_store.aws import AmazonS3


_POLLING_DELAY = 1  # sec


class BigqueryBuilder:
    """BigqueryBuilder class implementation."""

    def __init__(self, query_job_config=None):
        """Initialize the BigqueryBuilder object."""
        logger.info('Storing BigQuery Auth Credentials')
        key_file_contents = self._generate_bq_credentials()
        tfile = tempfile.NamedTemporaryFile(mode='w+', delete=True)
        tfile.write(key_file_contents)
        tfile.flush()
        tfile.seek(0)
        self.credential_path = tfile.name
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.credential_path

        if isinstance(query_job_config, bigquery.job.QueryJobConfig):
            self.query_job_config = query_job_config
        else:
            self.query_job_config = bigquery.job.QueryJobConfig()

        self.client = None
        if self.credential_path:
            self.client = bigquery.Client(
                default_query_job_config=self.query_job_config)
        else:
            raise ValueError("Please provide a valid credential_path")
        tfile.close()

    def _generate_bq_credentials(self):
        """Create BigQuery Auth Credentials."""
        logger.info("Creating BigQuery Auth Credentials")
        gcp_type = os.getenv("GCP_TYPE", "")
        gcp_project_id = os.getenv("GCP_PROJECT_ID", "")
        gcp_private_key_id = os.getenv("GCP_PRIVATE_KEY_ID", "")
        gcp_private_key = os.getenv("GCP_PRIVATE_KEY", "")
        gcp_client_email = os.getenv("GCP_CLIENT_EMAIL", "")
        gcp_client_id = os.getenv("GCP_CLIENT_ID", "")
        gcp_auth_uri = os.getenv("GCP_AUTH_URI", "")
        gcp_token_uri = os.getenv("GCP_TOKEN_URI", "")
        gcp_auth_provider_cert_url = os.getenv(
            "GCP_AUTH_PROVIDER_X509_CERT_URL", "")
        gcp_client_url = os.getenv("GCP_CLIENT_X509_CERT_URL", "")
        key_file_contents = """
        {{
            "type": "{type}",
            "project_id": "{project_id}",
            "private_key_id": "{private_key_id}",
            "private_key": "{private_key}",
            "client_email": "{client_email}",
            "client_id": "{client_id}",
            "auth_uri": "{auth_uri}",
            "token_uri": "{token_uri}",
            "auth_provider_x509_cert_url": "{auth_provider_cert_url}",
            "client_x509_cert_url": "{client_url}"
        }}
        """.format(type=gcp_type,
                   project_id=gcp_project_id,
                   private_key_id=gcp_private_key_id,
                   private_key=gcp_private_key,
                   client_email=gcp_client_email,
                   client_id=gcp_client_id,
                   auth_uri=gcp_auth_uri,
                   token_uri=gcp_token_uri,
                   auth_provider_cert_url=gcp_auth_provider_cert_url,
                   client_url=gcp_client_url)
        return key_file_contents

    def _run_query(self, job_config=None):
        # ``self.query`` is expected to be set by a subclass before running.
        if self.client and self.query:
            self.job_query_obj = self.client.query(
                self.query, job_config=job_config)
            while not self.job_query_obj.done():
                time.sleep(0.1)
            return self.job_query_obj.job_id
        else:
            raise ValueError("Client or query is not initialized")
    def run_query_sync(self):
        """Run the BigQuery query synchronously."""
        return self._run_query()
    def run_query_async(self):
        """Run the BigQuery query asynchronously as a batch job."""
        job_config = bigquery.QueryJobConfig()
        job_config.priority = bigquery.QueryPriority.BATCH
        return self._run_query(job_config=job_config)
    def get_status(self, job_id):
        """Get the job status of an async query."""
        response = self.client.get_job(job_id)
        return response.state
    def get_result(self, job_id=None, job_query_obj=None):
        """Get the result of the job."""
        if job_id is None:
            job_query_obj = job_query_obj or self.job_query_obj
            for row in job_query_obj.result():
                yield ({k: v for k, v in row.items()})
        else:
            job_obj = self.client.get_job(job_id)
            while job_obj.state == 'PENDING':
                job_obj = self.client.get_job(job_id)
                logger.info("Job State for Job Id:{} is {}".format(
                    job_id, job_obj.state))
                time.sleep(_POLLING_DELAY)
            yield from self.get_result(job_query_obj=job_obj)
    def __iter__(self):
        """Iterate over the query result."""
        yield from self.get_result()
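
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module). It assumes the
# GCP_* environment variables read above are set, and that a subclass assigns
# ``self.query`` before running; ``ExampleQuery`` and its SQL are hypothetical
# names invented for this example.
class ExampleQuery(BigqueryBuilder):
    """Hypothetical subclass that supplies the query to run."""

    def __init__(self):
        super().__init__()
        # _run_query reads self.query; the base class never sets it.
        self.query = 'SELECT 1 AS example_column'


def _example_usage():
    builder = ExampleQuery()
    # Asynchronous path: submit a BATCH-priority job, then poll its state.
    job_id = builder.run_query_async()
    while builder.get_status(job_id) == 'PENDING':
        time.sleep(_POLLING_DELAY)
    # Rows are yielded as plain dicts.
    for row in builder.get_result(job_id=job_id):
        print(row)
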
class DataProcessing:
    """Process the BigQuery data."""

    def __init__(self, s3_client=None):
        """Initialize the DataProcessing object."""
        self.s3_client = s3_client
    def update_s3_bucket(self, data, bucket_name, filename='collated.json'):
        """Upload data to the S3 bucket, merging it into the existing JSON file."""
        if self.s3_client is None:
            # Create an S3 client if one does not exist.
            self.s3_client = AmazonS3(
                bucket_name=bucket_name,
                aws_access_key_id=os.getenv('AWS_S3_ACCESS_KEY_ID'),
                aws_secret_access_key=os.getenv('AWS_S3_SECRET_ACCESS_KEY')
            )
        # Connect with the newly created or existing S3 client.
        self.s3_client.connect()
        if not self.s3_client.is_connected():
            raise ValueError("Unable to connect to s3.")
        json_data = dict()

        if self.s3_client.object_exists(filename):
            logger.info("{} exists, updating it.".format(filename))
            json_data = self.s3_client.read_json_file(filename)
            if not json_data:
                raise ValueError("Unable to get the json data path:{}/{}"
                                 .format(bucket_name, filename))

        json_data.update(data)
        self.s3_client.write_json_file(filename, json_data)
        logger.info("Updated file successfully!")
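
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module). It assumes the
# AWS_S3_ACCESS_KEY_ID / AWS_S3_SECRET_ACCESS_KEY environment variables are
# set; the bucket name and payload below are hypothetical.
def _example_s3_update():
    processor = DataProcessing()  # S3 client is created lazily from env vars
    # Merges the payload into s3://example-bucket/collated.json, creating
    # the file if it does not exist yet.
    processor.update_s3_bucket(data={'example-package': {'count': 1}},
                               bucket_name='example-bucket')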