Package com.google.api.ads.adwords.awreporting.processors.onfile

Source Code of com.google.api.ads.adwords.awreporting.processors.onfile.RunnableProcessorOnFile

// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.api.ads.adwords.awreporting.processors.onfile;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;

import au.com.bytecode.opencsv.CSVReader;
import au.com.bytecode.opencsv.bean.MappingStrategy;

import com.google.api.ads.adwords.awreporting.model.csv.AwReportCsvReader;
import com.google.api.ads.adwords.awreporting.model.entities.Report;
import com.google.api.ads.adwords.awreporting.model.persistence.EntityPersister;
import com.google.api.ads.adwords.awreporting.model.util.CsvParserIterator;
import com.google.api.ads.adwords.awreporting.model.util.ModifiedCsvToBean;
import com.google.api.ads.adwords.awreporting.util.AdWordsSessionBuilderSynchronizer;
import com.google.api.ads.adwords.lib.jaxb.v201406.ReportDefinitionDateRangeType;
import com.google.common.collect.Lists;

/**
* This {@link Runnable} implements the core logic to download the report file
* from the AdWords API.
*
* The {@link Collection}s passed to this runner are considered to be synchronized and thread safe.
* This class has no blocking logic when adding elements to the collections.
*
* Also the {@link AdWordsSessionBuilderSynchronizer} is kept by the client class, and should
* handle all the concurrent threads.
*
* Parse the rows in the CSV file for the report type, and persists the beans into the data base.
*
* @author gustavomoreira@google.com (Gustavo Moreira)
* @author jtoledo@google.com (Julian Toledo)
*
* @param <R> type of sub Report.
*/
public class RunnableProcessorOnFile<R extends Report> implements Runnable {

  private static final Logger LOGGER = Logger.getLogger(RunnableProcessorOnFile.class);

  private CountDownLatch latch;

  private File file;
  private ModifiedCsvToBean<R> csvToBean;
  private MappingStrategy<R> mappingStrategy;
  private ReportDefinitionDateRangeType dateRangeType;
  private String dateStart;
  private String dateEnd;
  private String mccAccountId;
  private EntityPersister entityPersister;
  private int reportRowsSetSize;

  private Exception error = null;

  /**
   * C'tor.
   *
   * @param file the CSV file.
   * @param csvToBean the {@code CsvToBean}
   * @param mappingStrategy
   */
  public RunnableProcessorOnFile(File file, ModifiedCsvToBean<R> csvToBean,
      MappingStrategy<R> mappingStrategy, ReportDefinitionDateRangeType dateRangeType,
      String dateStart, String dateEnd, String mccAccountId, EntityPersister entityPersister,
      Integer reportRowsSetSize) {
    this.file = file;
    this.csvToBean = csvToBean;
    this.mappingStrategy = mappingStrategy;
    this.dateRangeType = dateRangeType;
    this.dateStart = dateStart;
    this.dateEnd = dateEnd;
    this.mccAccountId = mccAccountId;
    this.entityPersister = entityPersister;
    this.reportRowsSetSize = reportRowsSetSize;
  }

  /**
   * Executes the API call to download the report that was given when this {@code Runnable} was
   * created.
   *
   *  The download blocks this thread until it is finished, and also does the file copying.
   *
   *  There is also a retry logic implemented by this method, where the times retried depends on the
   * value given in the constructor.
   *
   * @see java.lang.Runnable#run()
   */
  @Override
  public void run() {

    try {
      CSVReader csvReader = this.createCsvReader(file);

      LOGGER.debug("Starting parse of report rows...");
      CsvParserIterator<R> reportRowsList = csvToBean.lazyParse(mappingStrategy, csvReader);
      LOGGER.debug("... success.");

      LOGGER.debug("Starting report persistence...");
      List<R> reportBuffer = Lists.newArrayList();
      while (reportRowsList.hasNext()) {

        R report = reportRowsList.next();

        // Getting Account Id from File Name for reports that do not have Client Customer Id
        if (report.getAccountId() == null && file.getName().contains("-")
            && file.getName().split("-") != null && file.getName().split("-").length > 2
            && file.getName().split("-")[2].matches("\\d*")) {
          report.setAccountId(Long.parseLong(file.getName().split("-")[2]));
        }

        report.setTopAccountId(Long.parseLong(this.mccAccountId.replaceAll("-", "")));
        report.setDateRangeType(dateRangeType.value());
        report.setDateStart(dateStart);
        report.setDateEnd(dateEnd);
        report.setId();

        reportBuffer.add(report);

        if (reportBuffer.size() >= this.reportRowsSetSize) {
          this.entityPersister.persistReportEntities(reportBuffer);
          reportBuffer.clear();
        }
      }
      if (reportBuffer.size() > 0) {
        this.entityPersister.persistReportEntities(reportBuffer);
      }
      LOGGER.debug("... success.");
      csvReader.close();

    } catch (Exception e) {
      error = new Exception("Error processing file: " + file.getAbsolutePath(), e);
      LOGGER.error("Error processing file: " + file.getAbsolutePath());
      e.printStackTrace();
    } finally {
      if (this.latch != null) {
        this.latch.countDown();
      }
    }
  }

  /**
   * Creates the proper {@link CSVReader} to parse the AW reports.
   *
   * @param file the CSV file.
   * @return the {@code CSVReader}
   * @throws UnsupportedEncodingException should not happen.
   * @throws FileNotFoundException in case the file has been deleted before the reading.
   */
  private CSVReader createCsvReader(File file)
      throws UnsupportedEncodingException, FileNotFoundException {

    LOGGER.debug("Creating AwReportCsvReader for file: " + file.getAbsolutePath() + ".gunzip");
    return new AwReportCsvReader(
        new InputStreamReader(new FileInputStream(file.getAbsolutePath() + ".gunzip"), "UTF-8"),
        ',', '\"', 1);
  }

  /**
   * @param latch the latch to set
   */
  public void setLatch(CountDownLatch latch) {
    this.latch = latch;
  }

  public Exception getError() {
    return error;
  }

  /**
   * @param entityPersister
   *            the entityPersister to set
   */
  public void setPersister(EntityPersister entityPersister) {
    this.entityPersister = entityPersister;
  }
}
TOP

Related Classes of com.google.api.ads.adwords.awreporting.processors.onfile.RunnableProcessorOnFile

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.