Background

I recently built event-tracking data analysis on top of BigQuery at work, so here is a write-up of the BigQuery query utility class I wrapped for it (there are few articles about BigQuery online).

BigQuery's concepts and features are covered in the official BigQuery documentation.

The official example involves quite a few steps.

Here is the query code from that example:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;

public class SimpleApp {
    public static void main(String... args) throws Exception {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        QueryJobConfiguration queryConfig =
                QueryJobConfiguration.newBuilder(
                        "SELECT commit, author, repo_name "
                                + "FROM `bigquery-public-data.github_repos.commits` "
                                + "WHERE subject like '%bigquery%' "
                                + "ORDER BY subject DESC LIMIT 10")
                        // Use standard SQL syntax for queries.
                        // See: https://cloud.google.com/bigquery/sql-reference/
                        .setUseLegacySql(false)
                        .build();

        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        // Wait for the query to complete.
        queryJob = queryJob.waitFor();

        // Check for errors
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            // You can also look at queryJob.getStatus().getExecutionErrors() for all
            // errors, not just the latest one.
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }

        // Get the results.
        TableResult result = queryJob.getQueryResults();

        // Print all pages of the results.
        for (FieldValueList row : result.iterateAll()) {
            // String type
            String commit = row.get("commit").getStringValue();
            // Record type
            FieldValueList author = row.get("author").getRecordValue();
            String name = author.get("name").getStringValue();
            String email = author.get("email").getStringValue();
            // String Repeated type
            String repoName = row.get("repo_name").getRecordValue().get(0).getStringValue();
            System.out.printf(
                    "Repo name: %s Author name: %s email: %s commit: %s\n",
                    repoName, name, email, commit);
        }
    }
}

Create a job, wait for the query, process the result set: writing all of this boilerplate for every query would be painful.

What I want from BigQuery is simple: I hand it a piece of SQL, and it hands me back the result.

Maven dependencies

The first step is to add the Maven dependencies, which are listed in the official BigQuery example:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>24.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
  </dependency>
</dependencies>

Setting up authentication

To run the client library you must set up authentication, meaning you need a service account key to connect to and operate on BigQuery. The solution in the official documentation is to set the environment variable GOOGLE_APPLICATION_CREDENTIALS, which provides the credentials to the application code.

This approach did not feel convenient for us. Our services are microservices deployed on K8s, and I did not see how to set an environment variable on the server there, so I do not know how to apply this scheme (if you know how, please tell me).

So instead I added the credentials JSON file to the project's resources and read the credentials as a stream.

@Value(value = "classpath:netpop-e792a-data-analytics.json")
private Resource dataAnalyticsResource;
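
The two options need not be mutually exclusive. As a minimal sketch (the class name `CredentialSourceSketch` and its methods are hypothetical, not part of the project or of any Google library), the application could prefer a key file named by GOOGLE_APPLICATION_CREDENTIALS when that variable happens to be set, and otherwise fall back to the JSON file bundled on the classpath:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper: chooses where the service account key is read from.
final class CredentialSourceSketch {

    private CredentialSourceSketch() {
    }

    // Uses the key file path if one is given, otherwise the classpath resource.
    static InputStream open(String keyFilePath, String classpathResource) throws IOException {
        if (keyFilePath != null && !keyFilePath.trim().isEmpty()) {
            return Files.newInputStream(Paths.get(keyFilePath));
        }
        InputStream in = CredentialSourceSketch.class.getClassLoader()
                .getResourceAsStream(classpathResource);
        if (in == null) {
            throw new IOException("Credentials not found on classpath: " + classpathResource);
        }
        return in;
    }

    // Convenience overload that reads the path from the environment variable.
    static InputStream open(String classpathResource) throws IOException {
        return open(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"), classpathResource);
    }
}
```

The returned stream can be passed to GoogleCredentials.fromStream in the same way as the Resource stream used in the configuration class below.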

Configuring the BigQuery bean

As the query example above shows, the central object is BigQuery, so register it as a bean in the IoC container.

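import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import java.io.IOException;

@Configuration
public class BigQueryConfiguration {

    // Load the authentication credentials
    @Value(value = "classpath:netpop-e792a-data-analytics.json")
    private Resource dataAnalyticsResource;

    // Configure the core bean
    @Bean
    BigQuery bigQuery() throws IOException {
        GoogleCredentials credentials =
                GoogleCredentials.fromStream(dataAnalyticsResource.getInputStream());
        return BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
    }

    // Spring resolves the BigQuery parameter from the container
    @Bean
    BigQueryHelper bigQueryHelper(BigQuery bigQuery) {
        return new BigQueryHelper(bigQuery);
    }
}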

Utility class


package groot.data.analysis.support;

import com.google.cloud.bigquery.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.util.*;

/**
 * @Classname BigQueryHelper
 * @Description
 * @Date 2021/9/2 17:43
 * @Created by wangchangjiu
 */
@Slf4j
public class BigQueryHelper {

    private BigQuery bigQuery;

    public BigQueryHelper() {
    }

    public BigQueryHelper(BigQuery bigQuery) {
        this.bigQuery = bigQuery;
    }


    /**
     * Query for a list. Fields of the return type must not be complex types.
     *
     * @param sql
     * @param returnType
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException {
        TableResult result = execute(sql);
        Map<String, Field> fieldMap = getStringFieldMap(result);
        List<T> results = new ArrayList<>();
        result.iterateAll().forEach(row -> {
            T returnObj;
            try {
                returnObj = returnType.getDeclaredConstructor().newInstance();
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                throw new RuntimeException("reflect create object error :", ex);
            }
            ReflectionUtils.doWithFields(returnType, field -> {
                Field bigQueryField = fieldMap.get(field.getName());
                if (bigQueryField != null) {
                    FieldValue fieldValue = row.get(bigQueryField.getName());
                    if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                        throw new UnsupportedOperationException("unsupported returnType field include complex types");
                    }
                    field.setAccessible(true);
                    ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                }
            });
            results.add(returnObj);
        });
        return results;
    }

    /**
     *  Build a map from field name to schema field
     * @param result
     * @return
     */
    private Map<String, Field> getStringFieldMap(TableResult result) {
        FieldList fieldList = result.getSchema().getFields();
        Map<String, Field> fieldMap = new HashMap<>(fieldList.size());
        for (int i = 0; i < fieldList.size(); i++) {
            Field field = fieldList.get(i);
            fieldMap.put(field.getName(), field);
        }
        return fieldMap;
    }

    /**
     *  Execute the SQL and return the result set
     * @param sql
     * @return
     * @throws InterruptedException
     */
    private TableResult execute(String sql) throws InterruptedException {
        Assert.notNull(sql, "SQL must not be null");
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();

        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

        // Wait for the query to complete.
        queryJob = queryJob.waitFor();

        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }
        // Get the results.
        return queryJob.getQueryResults();
    }


    /**
     *  Query for a list; implement the ResultSetExtractor interface to extract each row yourself
     * @param sql
     * @param rse
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException {
        TableResult tableResult = execute(sql);
        List<T> results = new ArrayList<>();
        tableResult.iterateAll().forEach(row -> results.add(rse.extractData(row)));
        return results;
    }

    /**
     *  Query for a single result
     * @param sql
     * @param returnType
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException {
        TableResult tableResult = execute(sql);
        Iterator<FieldValueList> rows = tableResult.iterateAll().iterator();
        if (rows.hasNext()) {
            // Only the first row is used
            FieldValueList fieldValues = rows.next();
            if (isBasicType(returnType)) {
                return (T) resultWrapper(fieldValues.get(0), returnType);
            } else {
                T returnObj;
                try {
                    returnObj = returnType.getDeclaredConstructor().newInstance();
                } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                    throw new RuntimeException("reflect create object error :", ex);
                }

                Map<String, Field> fieldMap = getStringFieldMap(tableResult);
                ReflectionUtils.doWithFields(returnType, field -> {
                    Field bigQueryField = fieldMap.get(field.getName());
                    if (bigQueryField != null) {
                        FieldValue fieldValue = fieldValues.get(bigQueryField.getName());
                        if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                            throw new UnsupportedOperationException("unsupported returnType field include complex types");
                        }
                        field.setAccessible(true);
                        ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                    }
                });
                return returnObj;
            }
        }
        return null;
    }

    /**
     *  Convert a result value to the target Java type
     * @param fieldValue
     * @param returnType
     * @return
     */
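    private Object resultWrapper(FieldValue fieldValue, Class<?> returnType) {
        // The typed getters throw on NULL columns, so return null early.
        // A primitive target field still cannot hold null.
        if (fieldValue.isNull()) {
            return null;
        }
        if (returnType == Boolean.class || returnType == boolean.class) {
            return fieldValue.getBooleanValue();
        } else if (returnType == Long.class || returnType == long.class) {
            return fieldValue.getLongValue();
        } else if (returnType == Integer.class || returnType == int.class) {
            // Fails loudly if the value does not fit in an int
            return Math.toIntExact(fieldValue.getLongValue());
        } else if (returnType == Double.class || returnType == double.class) {
            return fieldValue.getDoubleValue();
        } else if (returnType == BigDecimal.class) {
            return fieldValue.getNumericValue();
        } else if (returnType == String.class) {
            return fieldValue.getStringValue();
        }
        return fieldValue.getValue();
    }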

    /**
     *  Check whether the type is a simple (non-object) type
     * @param returnType
     * @param <T>
     * @return
     */
    private <T> boolean isBasicType(Class<T> returnType) {
        return returnType == String.class || returnType.isPrimitive()
                || returnType == Boolean.class || returnType == Byte.class
                || returnType == Integer.class || returnType == Long.class
                || returnType == Double.class || returnType == Short.class
                || returnType == Float.class || returnType == BigDecimal.class;
    }

}

The main methods it exposes are:

public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException
public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException
public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException
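
The ResultSetExtractor interface itself is not shown here. Judging only from the call rse.extractData(row) in the helper, it is presumably a one-method callback that turns one row into one object. The sketch below illustrates that pattern with a stand-in; `ExtractorSketch`, `RowExtractor`, and `extractAll` are hypothetical names, and the row type is left generic so the example runs without the BigQuery library (in the helper the row would be a FieldValueList).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExtractorSketch {

    // Assumed shape of the callback: called once per row, returns one mapped object.
    // R stands in for the row type; with the helper it would be FieldValueList.
    @FunctionalInterface
    interface RowExtractor<R, T> {
        T extractData(R row);
    }

    // Mirrors the loop in queryForList(String, ResultSetExtractor<T>).
    static <R, T> List<T> extractAll(Iterable<R> rows, RowExtractor<R, T> extractor) {
        List<T> results = new ArrayList<>();
        for (R row : rows) {
            results.add(extractor.extractData(row));
        }
        return results;
    }

    public static void main(String[] args) {
        // Stand-in rows: column name -> value.
        Map<String, Object> row = new HashMap<>();
        row.put("repo", "demo/repo");
        row.put("commits", 3L);
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);

        // The caller decides how a row becomes a result object.
        List<String> labels = extractAll(rows, r -> r.get("repo") + ":" + r.get("commits"));
        System.out.println(labels);
    }
}
```

Because the caller writes the extraction logic, this variant can handle nested or repeated columns that the reflection-based overloads reject.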

The main idea is to use reflection to create the target object and assign its fields. Return types that contain nested objects are not supported, because that is more complicated and I do not have that scenario yet. Other operations, such as paging, are not supported either.
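
The reflection idea can be tried in isolation. The following is a simplified sketch and not the helper itself: a plain Map stands in for a BigQuery row, the names `RowMapperSketch`, `CommitCount`, and `mapRows` are hypothetical, and only fields declared directly on the target class are filled (the helper uses Spring's ReflectionUtils.doWithFields, which also visits superclass fields).

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RowMapperSketch {

    // Hypothetical target type, used only for this illustration.
    static class CommitCount {
        private String repoName;
        private Long total;

        String getRepoName() {
            return repoName;
        }

        Long getTotal() {
            return total;
        }
    }

    // Creates one target object per row and copies columns into same-named fields.
    static <T> List<T> mapRows(List<Map<String, Object>> rows, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            T target;
            try {
                target = type.getDeclaredConstructor().newInstance();
                for (Field field : type.getDeclaredFields()) {
                    // Skip compiler-generated fields and fields with no matching column.
                    if (field.isSynthetic() || !row.containsKey(field.getName())) {
                        continue;
                    }
                    field.setAccessible(true);
                    field.set(target, row.get(field.getName()));
                }
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("Cannot map row to " + type.getName(), ex);
            }
            out.add(target);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("repoName", "demo/repo");
        row.put("total", 42L);
        row.put("ignored", "no field has this name");
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);

        CommitCount first = mapRows(rows, CommitCount.class).get(0);
        System.out.println(first.getRepoName() + " " + first.getTotal());
    }
}
```

Columns are matched to fields purely by name, so with this approach the SQL column aliases have to equal the Java field names exactly.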

Using the utility class

To use it, inject the BigQueryHelper bean wherever you need to run a query.

This is a simple BigQuery wrapper, and you can extend it to make it more complete.