Source code for rudra.data_store.bigquery.maven_bigquery

"""Maven bigquery implementation."""
from collections import Counter
import os
import time

from rudra.data_store.bigquery.base import BigqueryBuilder, DataProcessing
from rudra.utils.mercator import SimpleMercator
from rudra import logger


class MavenBigQuery(BigqueryBuilder):
    """BigQuery builder preconfigured to fetch Maven manifests.

    The stored query reads the public GitHub dataset and returns, as the
    ``content`` column, the contents of every file whose path ends in
    ``pom.xml`` and whose repository language entry matches ``java``
    (case-insensitively).
    """

    def __init__(self, *args, **kwargs):
        """Configure the query job and store the pom.xml lookup query.

        All positional and keyword arguments are forwarded unchanged to
        ``BigqueryBuilder.__init__``.
        """
        super().__init__(*args, **kwargs)

        # Standard SQL, cached results allowed, timeout_ms of 60000.
        job_options = (
            ('use_legacy_sql', False),
            ('use_query_cache', True),
            ('timeout_ms', 60000),
        )
        for option_name, option_value in job_options:
            setattr(self.query_job_config, option_name, option_value)

        # Adjacent literals are concatenated into one query string.
        self.query = (
            " SELECT con.content AS content "
            "FROM `bigquery-public-data.github_repos.contents` AS con "
            "INNER JOIN (SELECT files.id AS id "
            "FROM `bigquery-public-data.github_repos.languages` AS langs "
            "INNER JOIN `bigquery-public-data.github_repos.files` AS files "
            "ON files.repo_name = langs.repo_name "
            "WHERE REGEXP_CONTAINS(TO_JSON_STRING(language), r'(?i)java') "
            "AND files.path LIKE '%pom.xml' ) AS L "
            "ON con.id = L.id; "
        )
class MavenBQDataProcessing(DataProcessing):
    """Implementation data processing for maven bigquery.

    Runs the Maven BigQuery query, extracts the dependency set of every
    returned manifest, counts how often each distinct set occurs and hands
    the tally to ``update_s3_bucket``.
    """

    def __init__(self, big_query_instance=None, s3_client=None,
                 file_name='collated.json'):
        """Initialize the BigQueryDataProcessing object.

        Args:
            big_query_instance: query builder to run; a new
                ``MavenBigQuery`` is created when this is falsy.
            s3_client: client passed to ``DataProcessing.__init__``. Its
                ``bucket_name`` is used when the client is truthy,
                otherwise 'developer-analytics-audit-report' is used.
            file_name: last path component of the stored object; the full
                name is ``<DEPLOYMENT_PREFIX or 'dev'>/big-query-data/<file_name>``.
        """
        super().__init__(s3_client)
        self.big_query_instance = big_query_instance or MavenBigQuery()
        self.big_query_content = list()
        # Maps a comma-joined, sorted package list to its occurrence count.
        self.counter = Counter()
        # self.s3_client is presumably set by DataProcessing.__init__ -
        # verify against the base class.
        self.bucket_name = self.s3_client.bucket_name \
            if self.s3_client else 'developer-analytics-audit-report'
        self.filename = '{}/big-query-data/{}'.format(
            os.getenv('DEPLOYMENT_PREFIX', 'dev'), file_name)

    def process(self):
        """Process Maven Bigquery response data.

        Runs the query synchronously, tallies the dependency set of each
        result row in ``self.counter`` and finally calls
        ``update_s3_bucket`` with ``{'maven': {<package list>: <count>}}``.
        """
        start = time.monotonic()
        logger.info("Running Bigquery for maven synchronously")
        self.big_query_instance.run_query_sync()

        for _processed, content in enumerate(
                self.big_query_instance.get_result(), start=1):
            logger.info("processing bigquery result. {}".format(_processed))
            # De-duplicate and sort so that equal dependency sets always
            # produce the same counter key.
            packages = sorted(
                set(self.construct_packages(content.get('content'))))
            if packages:
                pkg_string = ', '.join(packages)
                logger.info("PACKAGES: {}".format(pkg_string))
                self.counter.update([pkg_string])

        logger.info("Processed All the manifests in time: {}".format(
            time.monotonic() - start))
        logger.info("updating file content")
        self.update_s3_bucket(data={'maven': dict(self.counter.most_common())},
                              bucket_name=self.bucket_name,
                              filename=self.filename)
        logger.info("Succefully Processed the MavenBigQuery")

    def construct_packages(self, content):
        """Construct package list.

        Args:
            content: manifest text handed to ``SimpleMercator``.

        Returns:
            list of ``'<group_id>:<artifact_id>'`` strings, one for each
            dependency whose scope is allowed and whose group and artifact
            ids are both present and non-blank. Parsing is best effort: on
            any exception the error is logged and the entries collected so
            far are returned.
        """
        result = list()
        # NOTE(review): Maven's documented scope is 'runtime', not 'run' -
        # confirm whether 'run' is intentional before changing it.
        allowed_scopes = ['compile', 'run', 'provided']
        try:
            mercator_ins = SimpleMercator(content)
            for dep in mercator_ins.get_dependencies():
                scope = str(dep.scope)
                # Check for None before str(): str(None) is the truthy
                # string 'None' and would slip through as a bogus id.
                if dep.artifact_id is None or dep.group_id is None:
                    continue
                aid = str(dep.artifact_id).strip()
                gid = str(dep.group_id).strip()
                # Test after stripping so blank ids are rejected too.
                if scope in allowed_scopes and aid and gid:
                    result.append('{g}:{a}'.format(g=gid, a=aid))
        except Exception as _exc:
            # Deliberately broad: one unparsable manifest must not abort
            # the whole run.
            logger.warning("IGNORE THIS ERROR {}".format(_exc))
            logger.warning("CONTENT: {}".format(content))
        return result