You are here

sf_import.module in Salesforce Suite 7.2

Same filename and directory in other branches
  1. 6.2 sf_import/sf_import.module

File

sf_import/sf_import.module
View source
<?php

// Router path of the import administration pages: SALESFORCE_PATH_ADMIN
// (defined outside this file) with '/import' appended. sf_import_menu() uses
// it as the base key for every menu item it registers.
define('SALESFORCE_PATH_ADMIN_IMPORT', SALESFORCE_PATH_ADMIN . '/import');

/**
 * Implements hook_menu().
 *
 * Declares three router items below SALESFORCE_PATH_ADMIN_IMPORT: the import
 * settings form, the one-time batch import form, and an "overview" default
 * local task. All of them require the 'administer salesforce' permission and
 * reference sf_import.admin.inc.
 */
function sf_import_menu() {
  $admin_access = array(
    'administer salesforce',
  );
  $admin_file = 'sf_import.admin.inc';

  // Settings form for the recurring (cron-driven) imports.
  $settings_page = array(
    'title' => 'Import',
    'description' => 'Configure settings for regular imports of data.',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'sf_import_settings_form',
    ),
    'access arguments' => $admin_access,
    'file' => $admin_file,
    'type' => MENU_LOCAL_TASK,
  );

  // Form that starts a one-off batch import.
  $batch_page = array(
    'title' => 'Batch Import',
    'description' => 'Create a one-time batch import of Salesforce data',
    'page callback' => 'drupal_get_form',
    'page arguments' => array(
      'sf_import_create',
    ),
    'access arguments' => $admin_access,
    'file' => $admin_file,
    'type' => MENU_LOCAL_TASK,
  );

  // Default tab; it declares no page callback of its own.
  $overview_tab = array(
    'title' => 'Import Settings',
    'access arguments' => $admin_access,
    'file' => $admin_file,
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'weight' => -10,
  );

  return array(
    SALESFORCE_PATH_ADMIN_IMPORT => $settings_page,
    SALESFORCE_PATH_ADMIN_IMPORT . '/create' => $batch_page,
    SALESFORCE_PATH_ADMIN_IMPORT . '/overview' => $overview_tab,
  );
}

/**
 * Implements hook_help().
 *
 * Returns a translated help paragraph for the import settings path and
 * nothing (NULL) for every other path.
 */
function sf_import_help($path, $arg) {
  // Loose comparison on purpose: it mirrors the matching rules of a switch.
  if ($path != 'admin/config/salesforce/import') {
    return;
  }

  // The literal below is the translation source string; its line breaks and
  // leading whitespace are part of the string and must not be reformatted.
  $help_text = t('Select the fieldmaps you would like to use for regular
      imports of data from Salesforce to Drupal. <strong>This feature requires
      a properly configured cron job.</strong> Once fieldmaps have been selected,
       additional information regarding pending imports is available, along with
       the ability to manually trigger imports. <br />You can also run a one-time
        batch import of data from Salesforce to Drupal by visiting the "Batch Import" page.');

  return '<p>' . $help_text . '</p>';
}

/**
 * Implements hook_cron().
 *
 * When a Salesforce connection can be established, runs up to two steps, each
 * controlled by its own variable (both default to TRUE): queueing changed
 * records and then processing the queue.
 */
function sf_import_cron() {
  // Bail out early when no Salesforce connection is available.
  if (!salesforce_api_connect()) {
    return;
  }

  // Step 1: collect records changed in Salesforce since the last run.
  if (variable_get('sf_import_cron_import', TRUE)) {
    sf_import_import_records();
  }

  // Step 2: insert/update the Drupal side for everything that is queued.
  if (variable_get('sf_import_cron_process', TRUE)) {
    _sf_import_process_records();
  }
}

/**
 * Pulls the Ids of recently modified Salesforce records for a fieldmap via SOQL.
 *
 * The query selects Id and LastModifiedDate from the fieldmap's Salesforce
 * object for rows modified after $start. If a non-empty extra condition is
 * stored in the 'sf_import_<map name>_soql' variable, it is appended with AND.
 *
 * @param object $map
 *   A Salesforce fieldmap object, @see salesforce_api_fieldmap_load().
 *   Its ->salesforce and ->name properties are read.
 * @param int $start
 *   Unix timestamp; only records with a later LastModifiedDate are queried.
 * @param int $end
 *   Not used by this function. It is accepted so the function can be called
 *   with the same arguments as salesforce_api_get_updated().
 *
 * @return object|false
 *   FALSE if the map has no Salesforce object or the query returned nothing.
 *   Otherwise an object with:
 *   - ids: array of Salesforce Ids whose LastModifiedDate is newer than the
 *     last_import value stored in {salesforce_object_map} (or which have no
 *     stored value).
 *   - latestDateCovered: the LastModifiedDate value of the first result row,
 *     which is the most recently modified one because of the DESC ordering.
 */
function _sf_import_get_soql_records($map, $start, $end) {

  // Ensure that there is a Salesforce object set in the fieldmap.
  if (!isset($map->salesforce)) {
    return FALSE;
  }

  $date_time = gmdate(DATE_ATOM, $start);
  $soql = 'SELECT Id, LastModifiedDate FROM ' . $map->salesforce . ' WHERE LastModifiedDate > ' . $date_time;

  // Only append the custom condition when there actually is one. A stored
  // empty string (as opposed to an unset variable) used to yield the invalid
  // query "... AND  ORDER BY ...".
  $soql_where = trim((string) variable_get('sf_import_' . $map->name . '_soql', NULL));
  if ($soql_where !== '') {
    $soql .= ' AND ' . $soql_where;
  }
  $soql .= ' ORDER BY LastModifiedDate DESC';

  // Run the query
  $results = salesforce_api_query($soql, array(
    'queryAll' => FALSE,
    'queryMore' => TRUE,
    'limit' => 2000,
  ));
  salesforce_api_log(SALESFORCE_LOG_ALL, 'Query to retreive updated records: %soql. !count result(s) returned: <pre>%ret</pre>', array(
    '%soql' => print_r($soql, TRUE),
    '!count' => count($results),
    '%ret' => print_r($results, TRUE),
  ));
  if (empty($results)) {
    return FALSE;
  }

  $records = array();
  foreach ($results as $object) {

    // Keep the record only if the last_import value stored in
    // {salesforce_object_map} is older than the object's LastModifiedDate.
    // fetchField() gives FALSE for unmapped records, which compares as older.
    $last_import = db_query("SELECT last_import FROM {salesforce_object_map} WHERE sfid = :sfid", array(
      ':sfid' => $object->Id,
    ))
      ->fetchField();
    if ($last_import < strtotime($object->LastModifiedDate)) {
      $records[] = $object->Id;
    }
  }

  $response = new stdClass();
  $response->ids = $records;
  $response->latestDateCovered = $results[0]->LastModifiedDate;
  return $response;
}

/**
 * Queues Salesforce records that changed since the last import.
 *
 * Called on cron run. For every active fieldmap it asks Salesforce for
 * changed record Ids, using either salesforce_api_get_updated() or the custom
 * SOQL query (_sf_import_get_soql_records()) depending on the
 * 'sf_import_<map name>_update_method' variable, and writes the Ids that are
 * not queued yet to {sf_import_queue}.
 *
 * Side effects: updates the 'sf_import_queue_last_import' and
 * 'sf_import_queue_import_count' variables.
 *
 * @return array|false
 *   FALSE when there are no active fieldmaps or nothing was found. Otherwise
 *   an array with one entry per returned Id, each entry being
 *   array(sfid, fieldmap name, REQUEST_TIME).
 */
function sf_import_import_records() {
  $fieldmaps = variable_get('sf_import_fieldmaps', salesforce_api_salesforce_fieldmap_load_all());

  // A value of integer 0 marks a fieldmap as not selected.
  $active_fieldmaps = array();
  foreach ($fieldmaps as $map_key => $map_value) {
    if ($map_value !== 0) {
      $active_fieldmaps[$map_key] = $map_value;
    }
  }
  if (!$active_fieldmaps) {
    return FALSE;
  }

  // Determine the start of the import window ONCE, before anything is written.
  // Computing it per fieldmap (as was done before) meant every fieldmap after
  // the first saw the queue rows and the last-import variable that had just
  // been stamped with REQUEST_TIME, so its window started at "now" and
  // SOQL-based maps could never find changes.
  // NOTE(review): the query takes the OLDEST queue time (ascending order),
  // while an earlier comment here said "newest" - confirm which is intended.
  $base_start = db_query_range("SELECT time FROM {sf_import_queue} ORDER BY time", 0, 1)
    ->fetchField();
  if (!$base_start) {

    // Empty queue: fall back to the last import time, or one hour ago.
    $base_start = variable_get('sf_import_queue_last_import', REQUEST_TIME - 3600);
  }
  $end = REQUEST_TIME;
  $records = array();
  $last_import_stamped = FALSE;

  // Get updated items for each fieldmap and store them in sf_import_queue.
  foreach (array_keys($active_fieldmaps) as $name) {
    $map = salesforce_api_salesforce_fieldmap_load($name);

    // Skip to next fieldmap if the fieldmap couldn't be loaded
    // (such as when it doesn't exist in the database).
    if (!is_object($map)) {
      continue;
    }
    $start = $base_start;
    $import_method = variable_get('sf_import_' . $map->name . '_update_method', 'get_updated');

    // If the last time we checked for updated records was within the last
    // hour, then push the $start value back an hour.
    // This helps ensure that we don't skip over any updated records.
    // @todo Make this an admin configurable option.
    if ($end - $start < 3600 && $import_method == 'get_updated') {
      $start = $start - 3600;
    }

    // Record the time of this import; once per run is enough.
    if (!$last_import_stamped) {
      variable_set('sf_import_queue_last_import', REQUEST_TIME);
      $last_import_stamped = TRUE;
    }

    $import_function = $import_method == 'get_updated' ? 'salesforce_api_get_updated' : '_sf_import_get_soql_records';
    $updates = $import_function($map, $start, $end);
    if (!$updates) {
      continue;
    }

    // If there is only a single updated record, convert it to an array.
    $update_sfids = $updates->ids;
    if (!is_array($update_sfids)) {
      $update_sfids = array(
        $update_sfids,
      );
    }

    foreach ($update_sfids as $sfid) {
      $exists = db_query("SELECT sfid FROM {sf_import_queue} WHERE sfid = :sfid", array(
        ':sfid' => $sfid,
      ))
        ->fetchField();
      if (!$exists) {

        // Build a fresh row object for every insert. Previously properties
        // were assigned to an undefined variable, which raises a warning on
        // PHP 7 and a fatal Error on PHP 8, and the same object was reused
        // across drupal_write_record() calls.
        $queue_item = new stdClass();
        $queue_item->time = REQUEST_TIME;
        $queue_item->sfid = $sfid;
        $queue_item->fieldmap = $map->name;

        // @todo: Replace the deprecated drupal_write_record() with db_update().
        drupal_write_record('sf_import_queue', $queue_item);
      }
      $records[] = array(
        $sfid,
        $map->name,
        REQUEST_TIME,
      );
    }
  }

  variable_set('sf_import_queue_import_count', count($records));
  return $records ? $records : FALSE;
}

/**
 * Processes items in the sf_import_queue table.
 *
 * For each queued row the import function matching the fieldmap's Drupal
 * type is called: sf_entity_import() when entity_get_info() knows the type,
 * otherwise sf_<type>_import(). The row is removed from the queue afterwards,
 * whether or not an import function could be called.
 *
 * Side effect: stores the number of imported records in the
 * 'sf_import_queue_processed_count' variable.
 *
 * @return array|false
 *   FALSE when nothing was imported. Otherwise an array with one entry per
 *   imported row, each entry being array(sfid, import function result,
 *   fieldmap name).
 */
function _sf_import_process_records() {
  $fieldmaps = salesforce_api_salesforce_fieldmap_load_all();
  $records = array();
  $result = db_query("SELECT sfid, fieldmap FROM {sf_import_queue}");
  foreach ($result as $queue_item) {
    $fieldmap = $queue_item->fieldmap;

    // Only import when the fieldmap still exists and names a Drupal type.
    // Without this guard a stale row produced an undefined-index notice and a
    // NULL type; entity_get_info(NULL) then returned the info for ALL entity
    // types (truthy), so the row was wrongly handed to sf_entity_import().
    if (!empty($fieldmaps[$fieldmap]->drupal_entity)) {
      $type = $fieldmaps[$fieldmap]->drupal_entity;

      // Check to see if the fieldmap matches a Drupal entity.
      // If so, use the entity import function.
      if (entity_get_info($type)) {
        $type = 'entity';
      }
      $function = 'sf_' . $type . '_import';
      $drupal_id = salesforce_api_get_id_with_sfid($queue_item->sfid, $fieldmap);
      if (function_exists($function)) {

        // @todo: Ensure that this sets the proper linkage.
        $oid = $function($queue_item->sfid, $fieldmap, $drupal_id);
        $records[] = array(
          $queue_item->sfid,
          $oid,
          $fieldmap,
        );
      }
    }

    // The row leaves the queue in every case, including stale fieldmaps,
    // which could never be processed anyway.
    db_delete('sf_import_queue')
      ->condition('sfid', $queue_item->sfid)
      ->execute();
  }

  variable_set('sf_import_queue_processed_count', count($records));
  return $records ? $records : FALSE;
}

Functions

Namesort descending Description
sf_import_cron Implements hook_cron().
sf_import_help Implements hook_help().
sf_import_import_records This function is called on cron run. It is responsible for calling functions to import records using the getUpdated() method or a custom SOQL query, depending on what the user selected in admin settings for sf_import.
sf_import_menu Implements hook_menu().
_sf_import_get_soql_records For the fieldmap provided, attempt to pull updated Salesforce IDs using the SOQL query defined for the map.
_sf_import_process_records Processes items in the sf_import_queue table.

Constants