You are here

function sf_queue_handle_upserts in Salesforce Suite 7.2

Same name and namespace in other branches
  1. 6.2 sf_queue/sf_queue.module \sf_queue_handle_upserts()

Helper function to process queue updates and inserts (creates). The logic proceeds basically like this:

  • Get a list of object types that we're going to upsert by eliminating those queue item groups that do not meet the criteria (too many fails, threshold not met, etc.)
  • For each object type:
    • update up to 200 records at a time
    • then, create up to 200 records at a time
  • On each iteration of each major loop, break if we have exceeded our allotted processing time limit.
1 call to sf_queue_handle_upserts()
sf_queue_process_queue in sf_queue/sf_queue.module
@todo Please document this function.

File

sf_queue/sf_queue.module, line 286
sf_queue.module Implements export queue and administrativa for Salesforce API

Code

function sf_queue_handle_upserts($settings) {
  $ph_op = db_placeholders(array_filter($settings['cron_operations']), 'text');

  //$count_args = $settings['cron_operations'];
  $count_sql = 'SELECT sf_type, count(sf_type) as total
      FROM {salesforce_export_queue}
      WHERE sf_op IN ("create", "update")';
  $sql = "SELECT id, oid, fieldmap_name, drupal_type, sf_type, sfid, sf_data\n            FROM {salesforce_export_queue}\n            WHERE sf_op = '%s' AND sf_type = '%s'";
  $count_having = '';
  if ($settings['retry_num_attempts'] >= 0) {
    $count_sql .= ' AND attempts < %d';
    $sql .= ' AND attempts < %d';
    $count_args[] = $settings['retry_num_attempts'] + 1;
  }
  if ($settings['cron_min_threshold']) {
    $count_args[] = $settings['cron_min_threshold'];
    $count_having = ' HAVING total >= %d';
  }
  $count_sql .= " GROUP BY sf_type " . $count_having . " ORDER BY total DESC";
  error_log(print_r($count_sql, 1));

  // TODO Please convert this statement to the D7 database API syntax.
  $count_result = db_query($count_sql, $count_args);
  $sql .= ' ORDER BY created LIMIT ' . $settings['cron_num_objects'];
  $done = FALSE;
  while ($settings['start_time'] + $settings['limit'] > REQUEST_TIME && ($row = db_fetch_array($count_result))) {
    $sf_type = $row['sf_type'];
    foreach (array(
      'update',
      'create',
    ) as $op) {
      $op_done = FALSE;
      $args = array(
        $op,
        $sf_type,
        $settings['retry_num_attempts'] + 1,
      );
      while ($settings['start_time'] + $settings['limit'] > REQUEST_TIME && $op_done == FALSE) {
        $items = $ids = $items = $objects = array();

        // TODO Please convert this statement to the D7 database API syntax.
        $result = db_query($sql, $args);
        while ($queue_item = db_fetch_array($result)) {
          $ids[] = $queue_item['id'];
          $items[] = $queue_item;
          $object = unserialize($queue_item['sf_data']);
          if ($op == 'create') {
            unset($object->Id);
          }
          $objects[] = $object;
        }
        if (count($ids) > 0) {
          try {
            $sf = salesforce_api_connect();
            if (!$sf) {
              throw new Exception('Unable to connect to Salesforce');
            }
            $responses = $sf->client
              ->{$op}($objects, $sf_type);
            error_log(print_r($responses, 1));
            list($wins, $fails) = sf_queue_handle_responses($responses, $items);
          } catch (Exception $e) {
            sf_queue_handle_exception($e, $responses);
            continue;
          }
          error_log(print_r($wins, 1));
          error_log(print_r($fails, 1));
          if (count($fails) > 0) {
            watchdog('sf_queue', "Failed objects [<pre>" . print_r($objects, TRUE) . "</pre>]");
          }
          if (count($wins) == 0) {
            break;
          }

          // Instead of using salesforce_api_id_save to generate 2 SQL queries per
          // record, we collect them all to do 2 massive queries for all the
          // records at once.
          // @TODO: salesforce_api_id_save_multi
          $delete_sql = 'DELETE FROM {salesforce_object_map} WHERE oid IN (' . db_placeholders($wins) . ')';
          $insert_sql = 'INSERT INTO {salesforce_object_map} (drupal_type, oid, sfid, name) VALUES ';
          $row_ph = '("%s", %d, "%s", "%s")';
          $rows = '';
          $args = array();

          //, $drupal_type, $oid, $sfid, $name);
          $offset = 0;
          foreach ($wins as $sfid => $oid) {
            if (!empty($rows)) {
              $rows .= ', ';
            }
            $rows .= $row_ph;
            $drupal_type = $items[$offset]['drupal_type'];
            if (strpos($drupal_type, 'node_') === 0) {
              $drupal_type = 'node';
            }
            $args[] = $drupal_type;
            $args[] = $oid;
            $args[] = $sfid;
            $args[] = $items[$offset]['fieldmap_name'];
            $offset++;
          }
          $insert_sql .= ' ' . $rows;

          // TODO Please convert this statement to the D7 database API syntax.
          db_query($delete_sql, $wins);
          error_log(print_r($insert_sql, 1));
          error_log(print_r($args, 1));

          // TODO Please convert this statement to the D7 database API syntax.
          db_query($insert_sql, $args);
        }
        else {
          $op_done = TRUE;
          break;
        }
      }

      // while time limit for each operation
    }

    // foreach operation
  }

  // while sf_type loop
}