You are here

salesforce_pull.module in Salesforce Suite 7.3

Pull updates from Salesforce when a Salesforce object is updated.

File

modules/salesforce_pull/salesforce_pull.module
View source
<?php

/**
 * @file
 * Pull updates from Salesforce when a Salesforce object is updated.
 */

/**
 * Name of the DrupalQueue that holds records waiting to be processed by pull.
 */
define('SALESFORCE_PULL_QUEUE', 'salesforce_pull');

/**
 * Name of the Drupal variable (and settings form element) that stores the
 * Salesforce object type describing merged contacts.
 */
define('SALESFORCE_MERGE_RECORD', 'salesforce_contact_merge_record');

/**
 * Implements hook_menu().
 *
 * Registers the callback-only path that triggers a Salesforce pull. Access is
 * decided by salesforce_pull_webhook_access().
 */
function salesforce_pull_menu() {
  return array(
    'salesforce/webhook/pull' => array(
      'title' => 'Salesforce pull webhook callback',
      'page callback' => 'salesforce_pull_webhook',
      'access callback' => 'salesforce_pull_webhook_access',
      'type' => MENU_CALLBACK,
    ),
  );
}

/**
 * Access callback for Salesforce webhooks.
 *
 * Access requires all of the following:
 * - webhooks are enabled (salesforce_pull_webhook_enable),
 * - a non-empty key is stored in salesforce_pull_webhook_key,
 * - the request carries a string "key" query parameter equal to that key,
 * - the current user has the 'access content' permission.
 *
 * @return bool
 *   TRUE if the request may trigger a pull, FALSE otherwise.
 */
function salesforce_pull_webhook_access() {
  if (!variable_get('salesforce_pull_webhook_enable')) {
    return FALSE;
  }

  // An unset or empty stored key must never be matchable (e.g. by "?key=").
  $expected_key = variable_get('salesforce_pull_webhook_key');
  if (!is_string($expected_key) || $expected_key === '') {
    return FALSE;
  }

  // Query parameters can be arrays ("?key[]=x"); only strings can match.
  if (!isset($_GET['key']) || !is_string($_GET['key'])) {
    return FALSE;
  }
  $supplied_key = $_GET['key'];

  // Compare the secret in constant time where the runtime supports it, so the
  // key cannot be probed through response timing.
  if (function_exists('hash_equals')) {
    $key_matches = hash_equals($expected_key, $supplied_key);
  }
  else {
    $key_matches = ($expected_key === $supplied_key);
  }
  if (!$key_matches) {
    return FALSE;
  }

  return user_access('access content');
}

/**
 * Page callback for the Salesforce pull webhook.
 *
 * Runs a pull and, when it ran, immediately works through the pull queue for
 * up to the queue's configured time budget. Responds with HTTP status 200 when
 * the pull ran, or 403 when it did not (Salesforce not authorized, throttle
 * threshold reached, etc.), then ends the request.
 */
function salesforce_pull_webhook() {
  // Assume failure until a pull actually happened.
  $code = '403';

  if (salesforce_pull()) {
    $code = '200';

    // The pull only filled the queue. Mimic the part of drupal_cron_run()
    // that drains a cron queue so the items are processed right away.
    $queue_definitions = salesforce_pull_cron_queue_info();
    $definition = $queue_definitions[SALESFORCE_PULL_QUEUE];
    $worker = $definition['worker callback'];
    $time_budget = isset($definition['time']) ? $definition['time'] : 15;
    $deadline = time() + $time_budget;
    $queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);

    while (time() < $deadline) {
      $item = $queue->claimItem();
      if (!$item) {
        break;
      }
      try {
        call_user_func($worker, $item->data);
        $queue->deleteItem($item);
      }
      catch (Exception $e) {
        // Log the failure and keep the item queued so it is retried later.
        watchdog_exception('salesforce_pull', $e);
      }
    }
  }

  http_response_code($code);
  drupal_exit();
}

/**
 * Implements hook_cron().
 *
 * Attempts a Salesforce pull on every cron run.
 */
function salesforce_pull_cron() {
  // Whether a pull actually ran is of no interest to cron, so the result of
  // salesforce_pull() is deliberately ignored.
  salesforce_pull();
}

/**
 * Runs the standard pull process shared by the webhook and cron.
 *
 * Queues updated and deleted records, then records the request time that the
 * throttle check compares against.
 *
 * @return bool
 *   TRUE if a pull was performed, FALSE if Salesforce is not authorized or
 *   the throttle does not allow a pull yet.
 */
function salesforce_pull() {
  $api = salesforce_get_api();

  // Nothing to do without authorization, or while the throttle is active.
  if (!$api->isAuthorized() || !salesforce_pull_check_throttle()) {
    return FALSE;
  }

  salesforce_pull_get_updated_records();
  salesforce_pull_get_deleted_records();

  // Remember when this pull ran for the next throttle check.
  variable_set('salesforce_pull_last_sync', REQUEST_TIME);

  return TRUE;
}

/**
 * Implements hook_cron_queue_info().
 *
 * Declares the worker and time budget for the pull queue.
 */
function salesforce_pull_cron_queue_info() {
  $pull_queue = array(
    'worker callback' => 'salesforce_pull_process_updated_records',
    // Generous time limit, since a pull may bring in a lot of data.
    'time' => 180,
  );

  return array(SALESFORCE_PULL_QUEUE => $pull_queue);
}

/**
 * Implements hook_queue_info().
 *
 * Describes the pull queue for third-party queue-monitoring tools (Drush,
 * Queue UI, etc.).
 */
function salesforce_pull_queue_info() {
  return array(
    SALESFORCE_PULL_QUEUE => array(
      'title' => t('Salesforce pull queue'),
    ),
  );
}

/**
 * Implements hook_form_FORM_ID_alter().
 *
 * Adds the pull settings to the Salesforce settings form: the pull throttle,
 * the webhook toggle and secret key, and the selector for the Salesforce
 * object that describes merged contacts.
 */
function salesforce_pull_form_salesforce_settings_form_alter(&$form, &$form_state, $form_id) {
  // Minimum number of seconds between two pulls.
  $throttle_element = array(
    '#type' => 'textfield',
    '#title' => t('Pull throttle (seconds)'),
    '#description' => t('Number of seconds to wait between repeated salesforce pulls.<br>
      Prevents the webserver from becoming overloaded in case of too many cron runs, or webhook usage.'),
    '#default_value' => variable_get('salesforce_pull_throttle', 5),
    '#element_validate' => array(
      'element_validate_number',
    ),
  );
  $form['salesforce_pull_throttle'] = $throttle_element;

  // Webhook on/off switch.
  $form['salesforce_pull_webhook_enable'] = array(
    '#type' => 'checkbox',
    '#title' => t('Enable webhooks'),
    '#description' => t('Allow external applications to trigger Salesforce sync with a web request.'),
    '#default_value' => variable_get('salesforce_pull_webhook_enable', FALSE),
  );

  // Webhook secret. A random key is proposed while none has been stored yet.
  $current_key = variable_get('salesforce_pull_webhook_key', drupal_random_key());
  $url_options = array(
    'query' => array(
      'key' => $current_key,
    ),
    'absolute' => TRUE,
  );
  $webhook_url = url('salesforce/webhook/pull', $url_options);

  // The key field is only shown while the 'webhook_enable' checkbox is ticked.
  $visible_when_enabled = array(
    'visible' => array(
      ':input[name="salesforce_pull_webhook_enable"]' => array(
        'checked' => TRUE,
      ),
    ),
  );
  $form['salesforce_pull_webhook_key'] = array(
    '#type' => 'textfield',
    '#title' => t('Webhook key'),
    '#description' => t('A secret key that is required in the webhook request url (@url).<br>
      Leave blank to auto-generate a new random key.', array(
      '@url' => $webhook_url,
    )),
    '#default_value' => $current_key,
    '#states' => $visible_when_enabled,
    '#element_validate' => array(
      'salesforce_pull_webhook_key_validate',
    ),
  );

  // The object type options come from the mapping module's admin include.
  module_load_include('inc', 'salesforce_mapping', 'includes/salesforce_mapping.admin');
  $merge_element = array(
    '#type' => 'select',
    '#title' => t('Salesforce Contact Merge Object'),
    '#options' => _salesforce_mapping_get_salesforce_object_type_options($form_state),
    '#default_value' => variable_get(SALESFORCE_MERGE_RECORD, NULL),
    '#weight' => 4,
    '#description' => t('Select a Salesforce object that describes merged Salesforce Contacts to automate merging. Not seeing an object? !settings_link', array(
      '!settings_link' => l(t('Check your Salesforce object filters settings.'), 'admin/config/salesforce/settings', array(
        'fragment' => 'sf_object_filters_anchor',
      )),
    )),
  );
  $form[SALESFORCE_MERGE_RECORD] = $merge_element;
}

/**
 * Element validation callback for the webhook key.
 *
 * Replaces an empty submission with a freshly generated random key.
 */
function salesforce_pull_webhook_key_validate($element, &$form_state, $form) {
  // A key was supplied; keep it as submitted.
  if (!empty($element['#value'])) {
    return;
  }

  // No key was supplied, so generate one.
  form_set_value($form['salesforce_pull_webhook_key'], drupal_random_key(), $form_state);
}

/**
 * Decides whether a Salesforce pull may run now or must be throttled.
 *
 * A pull is allowed once more than salesforce_pull_throttle seconds (default
 * 5) have passed since the time stored in salesforce_pull_last_sync.
 *
 * @return bool
 *   TRUE if a pull may run, FALSE if the previous pull was too recent.
 */
function salesforce_pull_check_throttle() {
  $minimum_gap = variable_get('salesforce_pull_throttle', 5);
  $previous_pull = variable_get('salesforce_pull_last_sync', 0);
  $earliest_next_pull = $previous_pull + $minimum_gap;

  return REQUEST_TIME > $earliest_next_pull;
}

/**
 * Pull updated records from Salesforce and place them in the queue.
 *
 * For every mapped Salesforce object type, runs the SOQL query built by
 * salesforce_pull_get_pull_query(), follows "nextRecordsUrl" until all result
 * pages are fetched, and queues each record as an 'update' item.
 *
 * The per-type last-sync time is only advanced when every page was fetched
 * without an error. Advancing it after a failed page would mean the records
 * on the missing pages are never pulled. The price is that already-queued
 * records of that type are queued again on the next pull.
 *
 * Nothing is pulled while the queue holds more items than
 * salesforce_pull_max_queue_size (default 100000).
 */
function salesforce_pull_get_updated_records() {
  $queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);

  // Avoid overloading the processing queue and pass this time around if it's
  // over a configurable limit.
  $max_queue_size = variable_get('salesforce_pull_max_queue_size', 100000);
  $queue_size = $queue->numberOfItems();
  if ($queue_size > $max_queue_size) {
    watchdog('salesforce_pull', 'Salesforce Pull aborted pulling updated records due to queue size being > %limit. Current queue size is %current', array(
      '%limit' => $max_queue_size,
      '%current' => $queue_size,
    ), WATCHDOG_WARNING);
    return;
  }

  $sfapi = salesforce_get_api();
  foreach (salesforce_mapping_get_mapped_objects() as $type) {
    $soql = salesforce_pull_get_pull_query($type);
    if (empty($soql)) {
      continue;
    }

    $results = $sfapi->query($soql);

    // "nextRecordsUrl" includes the API version path, which apiCall() is
    // given without, so it is stripped before each follow-up request.
    $version_path = parse_url($sfapi->getApiEndPoint(), PHP_URL_PATH);

    $all_pages_fetched = TRUE;
    while (TRUE) {
      if (isset($results['errorCode'])) {
        // Pass the error through placeholders instead of using it as the
        // message string itself.
        watchdog('Salesforce Pull', '@code:@message', array(
          '@code' => $results['errorCode'],
          '@message' => isset($results['message']) ? $results['message'] : '',
        ), WATCHDOG_ERROR);
        $all_pages_fetched = FALSE;
        break;
      }

      // Write items to the queue.
      if (!empty($results['records'])) {
        foreach ($results['records'] as $record) {
          $queue->createItem(array(
            'update' => $record,
          ));
        }
      }

      // Handle requests larger than the batch limit (usually 2000).
      if (empty($results['nextRecordsUrl'])) {
        break;
      }
      $results = $sfapi->apiCall(str_replace($version_path, '', $results['nextRecordsUrl']));
    }

    if ($all_pages_fetched) {
      variable_set('salesforce_pull_last_sync_' . $type, REQUEST_TIME);
    }
  }
}

/**
 * Given a SObject type name, build an SOQL query to include all fields for all
 * SalesforceMappings mapped to that SObject.
 *
 * Only mappings with a Salesforce create or update trigger contribute to the
 * query. The pull trigger date field of every contributing mapping is
 * selected, because record processing reads the field of the mapping it is
 * applying. When a last sync time is stored for the type, results are limited
 * to records whose trigger date is later; if contributing mappings use
 * different trigger date fields, the field of the last contributing mapping
 * is used for that condition.
 *
 * @param string $type
 *   e.g. "Contact", "Account", etc.
 *
 * @return SalesforceSelectQuery or NULL if no mappings or no mapped fields
 *   were found.
 *
 * @see SalesforceMapping::getMappedFields
 * @see SalesforceMapping::getMappedRecordTypes
 */
function salesforce_pull_get_pull_query($type) {
  $mapped_fields = array();
  $mapped_record_types = array();
  $trigger_date_fields = array();
  $pull_trigger_date = NULL;

  // Iterate over each field mapping to determine our query parameters.
  foreach (salesforce_mapping_load_multiple(array(
    'salesforce_object_type' => $type,
  )) as $mapping) {

    // Check the sync settings for create or update trigger.
    if (!($mapping->sync_triggers & (SALESFORCE_MAPPING_SYNC_SF_CREATE | SALESFORCE_MAPPING_SYNC_SF_UPDATE))) {

      // Skip this mapping.
      continue;
    }
    $mapped_fields = array_merge($mapped_fields, $mapping->getMappedFields(array(
      SALESFORCE_MAPPING_DIRECTION_SYNC,
      SALESFORCE_MAPPING_DIRECTION_SF_DRUPAL,
    )));

    // Remember the trigger date field of mappings that actually pull. Taking
    // it from the loop variable after the loop would use whichever mapping
    // was iterated last, including one skipped above.
    $pull_trigger_date = $mapping->pull_trigger_date;
    $trigger_date_fields[$pull_trigger_date] = $pull_trigger_date;

    // If Record Type is specified, restrict query.
    $mapping_record_types = $mapping->getMappedRecordTypes();

    // If Record Type is not specified for a given mapping, ensure query is
    // unrestricted. FALSE is sticky: once set, later mappings cannot
    // re-introduce a restriction.
    if (empty($mapping_record_types)) {
      $mapped_record_types = FALSE;
    }
    elseif (is_array($mapped_record_types)) {
      $mapped_record_types = array_merge($mapped_record_types, $mapping_record_types);
    }
  }

  // There are no field mappings configured to pull data from Salesforce so
  // move on to the next mapped object. Prevents querying unmapped data.
  if (empty($mapped_fields)) {
    return NULL;
  }
  $soql = new SalesforceSelectQuery($type);

  // Convert field mappings to SOQL.
  $soql->fields = array_merge($mapped_fields, array(
    'Id' => 'Id',
  ), $trigger_date_fields);

  // If no lastupdate, get all records, else get records since last pull.
  $sf_last_sync = variable_get('salesforce_pull_last_sync_' . $type, NULL);
  if ($sf_last_sync) {
    $last_sync = gmdate('Y-m-d\\TH:i:s\\Z', $sf_last_sync);
    $soql->addCondition($pull_trigger_date, $last_sync, '>');
  }
  if (!empty($mapped_record_types)) {
    $soql->addCondition('RecordTypeId', $mapped_record_types, 'IN');
  }
  return $soql;
}

/**
 * Queue worker: processes one item from the pull queue.
 *
 * 'delete' and 'merge' items are handed to their dedicated processors. Any
 * other item is treated as a Salesforce record (optionally wrapped in an
 * 'update' key) and applied to Drupal once per matching mapping: an existing
 * mapped entity is updated when the Salesforce record is newer, otherwise a
 * new entity and mapping object are created, subject to the mapping's sync
 * triggers.
 *
 * @param array $sf_object
 *   The queue item data.
 *
 * @throws Exception
 *   With a single matching mapping, a failure is re-thrown immediately. With
 *   several mappings, failures are chained and thrown after all mappings were
 *   attempted. An entity that could not be created always throws immediately.
 */
function salesforce_pull_process_updated_records($sf_object) {
  // Deletions and merges have their own processors.
  if (isset($sf_object['delete'])) {
    salesforce_pull_process_deleted_records($sf_object['delete']);
    return;
  }
  if (isset($sf_object['merge'])) {
    salesforce_pull_process_merged_records($sf_object['merge']);
    return;
  }

  if (isset($sf_object['update'])) {
    $sf_object = $sf_object['update'];
  }

  // Get Mapping.
  $mapping_conditions = array(
    'salesforce_object_type' => $sf_object['attributes']['type'],
  );
  if (isset($sf_object['RecordTypeId']) && $sf_object['RecordTypeId'] != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE) {
    $mapping_conditions['salesforce_record_type'] = $sf_object['RecordTypeId'];
  }
  $sf_mappings = salesforce_mapping_load_multiple($mapping_conditions);

  // With several mappings, one failure must not stop the others: exceptions
  // are collected and thrown once every mapping has been attempted.
  $hold_exceptions = count($sf_mappings) > 1;
  $exception = FALSE;

  foreach ($sf_mappings as $sf_mapping) {

    // Mapping object exists?
    $mapping_object = salesforce_mapping_object_load_by_sfid($sf_object['Id']);

    // Trigger a hook to allow other modules to prevent this object from
    // creating or updating an entity.
    foreach (module_implements('salesforce_pull_allow_sf_object') as $module) {
      if (module_invoke($module, 'salesforce_pull_allow_sf_object', $sf_object, $mapping_object, $sf_mapping) === FALSE) {
        continue 2;
      }
    }

    // Allow other modules to define or alter the mapping object.
    drupal_alter('salesforce_pull_mapping_object', $mapping_object, $sf_object, $sf_mapping);
    $exists = $mapping_object ? TRUE : FALSE;
    if ($exists && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_UPDATE) {
      try {
        $entity = entity_load_single($mapping_object->entity_type, $mapping_object->entity_id);
        if ($entity === FALSE) {
          $exists = FALSE;
          $message = t('Unable to update %type entity %label from Salesforce object %sfobjectid. Entity does not exist. Mapping removed, continuing with Create instead Update.', array(
            '%type' => $mapping_object->entity_type,
            '%label' => $mapping_object->entity_id,
            '%sfobjectid' => $sf_object['Id'],
          ));
          watchdog('Salesforce Pull', $message, array(), WATCHDOG_NOTICE);
          salesforce_set_message($message, 'status', FALSE);
          entity_delete('salesforce_mapping_object', $mapping_object->salesforce_mapping_object_id);
        }
        else {

          // Flag this entity as having been processed. This does not persist,
          // but is used by salesforce_push to avoid duplicate processing.
          $entity->salesforce_pull = TRUE;
          $entity_updated = isset($entity->updated) ? $entity->updated : $mapping_object->entity_updated;
          $sf_object_updated = strtotime($sf_object[$sf_mapping->pull_trigger_date]);

          // Only overwrite Drupal data with strictly newer Salesforce data.
          if ($sf_object_updated > $entity_updated) {
            $wrapper = entity_metadata_wrapper($sf_mapping->drupal_entity_type, $entity);

            // Set fields values on the Drupal entity.
            salesforce_pull_map_fields($sf_mapping->field_mappings, $wrapper, $sf_object);

            // Allow modules to react just prior to entity save.
            module_invoke_all('salesforce_pull_entity_presave', $wrapper->value(), $sf_object, $sf_mapping);

            // Update entity.
            $wrapper->save();

            // Allow modules to react to entity update.
            module_invoke_all('salesforce_pull_entity_update', $wrapper->value(), $sf_object, $sf_mapping);

            // Update mapping object.
            $mapping_object->last_sync_message = t('Retrieved updates from Salesforce');
            $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
            $mapping_object->entity_updated = $mapping_object->last_sync = time();
            watchdog('Salesforce Pull', 'Updated entity %label associated with Salesforce Object ID: %sfid', array(
              '%label' => $wrapper->label(),
              '%sfid' => $sf_object['Id'],
            ));
          }
        }
      }
      catch (Exception $e) {
        $message = t('Failed to update entity %label %id from Salesforce object %sfobjectid. Error: @msg', array(
          '%label' => $mapping_object->entity_type,
          '%id' => $mapping_object->entity_id,
          '%sfobjectid' => $sf_object['Id'],
          '@msg' => $e->getMessage(),
        ));
        watchdog('Salesforce Pull', $message, array(), WATCHDOG_ERROR);
        salesforce_set_message($message, 'error', FALSE);
        $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
        $mapping_object->last_sync_message = t('Processing failed');
        $mapping_object->last_sync = time();
        if (!$hold_exceptions) {
          throw $e;
        }

        // Chain this failure onto any earlier one so none is lost.
        if (empty($exception)) {
          $exception = $e;
        }
        else {
          $my_class = get_class($e);
          $exception = new $my_class($e->getMessage(), $e->getCode(), $exception);
        }
      }
    }
    else {
      // NOTE(review): this branch is also reached when a mapping object
      // exists but the mapping lacks the Salesforce update trigger, in which
      // case a mapping with the create trigger creates another entity below.
      // Confirm that is intended.
      $exists = FALSE;
    }
    if (!$exists && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_CREATE) {
      // Start from a clean slate so the failure handling below never sees an
      // entity or wrapper left over from the update branch or from a
      // previously processed mapping.
      $entity = NULL;
      $wrapper = NULL;
      try {

        // Create entity from mapping object and field maps.
        $entity_info = entity_get_info($sf_mapping->drupal_entity_type);

        // Define values to pass to entity_create().
        $values = array();
        if (isset($entity_info['entity keys']['bundle']) && !empty($entity_info['entity keys']['bundle'])) {
          $values[$entity_info['entity keys']['bundle']] = $sf_mapping->drupal_bundle;

          // Because creating term via entities actually needs vid and won't be
          // fixed in Entity API (https://www.drupal.org/node/1409256).
          if (isset($values['vocabulary_machine_name'])) {
            $vocabulary = taxonomy_vocabulary_machine_name_load($values['vocabulary_machine_name']);
            $values['vid'] = $vocabulary->vid;
          }
        }
        else {

          // Not all entities will have bundle defined under entity keys,
          // e.g. the User entity.
          $values[$sf_mapping->drupal_bundle] = $sf_mapping->drupal_bundle;
        }

        // See note above about flag.
        $values['salesforce_pull'] = TRUE;

        // Create entity.
        $entity = entity_create($sf_mapping->drupal_entity_type, $values);

        // Flag this entity as having been processed. This does not persist,
        // but is used by salesforce_push to avoid duplicate processing.
        $entity->salesforce_pull = TRUE;
        $wrapper = entity_metadata_wrapper($sf_mapping->drupal_entity_type, $entity);
        salesforce_pull_map_fields($sf_mapping->field_mappings, $wrapper, $sf_object);

        // Allow modules to react just prior to entity save.
        module_invoke_all('salesforce_pull_entity_presave', $wrapper->value(), $sf_object, $sf_mapping);
        $wrapper->save();

        // Allow modules to react to entity creation.
        module_invoke_all('salesforce_pull_entity_insert', $wrapper->value(), $sf_object, $sf_mapping);

        // Update mapping object.
        $last_sync_message = t('Retrieved new record from Salesforce');
        $last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
        $entity_updated = time();
      }
      catch (Exception $e) {
        // The failure may have happened before a wrapper existed; fall back
        // to the entity type rather than failing inside the error handler.
        $message = $e->getMessage() . ' ' . t('Processing failed for entity %label associated with Salesforce Object ID: %sfobjectid', array(
          '%label' => $wrapper ? $wrapper->label() : $sf_mapping->drupal_entity_type,
          '%sfobjectid' => $sf_object['Id'],
        ));
        watchdog('Salesforce Pull', $message, array(), WATCHDOG_ERROR);
        salesforce_set_message(t('There were failures processing data from Salesforce. Please check the error logs.'), 'error', FALSE);
        $last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
        $last_sync_message = t('Processing failed for new record');
        $entity_updated = NULL;
        if (!$hold_exceptions) {
          throw $e;
        }

        // Chain this failure onto any earlier one so none is lost.
        if (empty($exception)) {
          $exception = $e;
        }
        else {
          $my_class = get_class($e);
          $exception = new $my_class($e->getMessage(), $e->getCode(), $exception);
        }
      }

      // If no id exists, the insert failed and we cannot create a mapping
      // object. We are left with no choice but to throw an exception.
      $entity_id = NULL;
      if (is_object($entity)) {
        list($entity_id) = entity_extract_ids($sf_mapping->drupal_entity_type, $entity);
      }
      if (!$entity_id) {
        $sf_object_id = $sf_object['Id'];
        throw new Exception("Failed to create Drupal entity when processing data from Salesforce object: {$sf_object_id}.");
      }

      // Create mapping object.
      $mapping_object = entity_create('salesforce_mapping_object', array(
        'salesforce_id' => $sf_object['Id'],
        'entity_type' => $sf_mapping->drupal_entity_type,
        'entity_id' => $entity_id,
        'entity_updated' => $entity_updated,
        'last_sync' => time(),
        'last_sync_message' => $last_sync_message,
        'last_sync_status' => $last_sync_status,
      ));
      watchdog('Salesforce Pull', 'Created entity %label associated with Salesforce Object ID: %sfid', array(
        '%label' => $wrapper->label(),
        '%sfid' => $sf_object['Id'],
      ));
    }

    // Save our mapped objects.
    if ($mapping_object) {
      $mapping_object->last_sync_action = 'pull';
      $mapping_object->save();
    }
  }

  if (!empty($exception)) {
    throw $exception;
  }
}

/**
 * Get deleted records from Salesforce and place them in the queue.
 *
 * For every mapped Salesforce object type, asks Salesforce which records were
 * deleted since the last check and queues a 'delete' item for each. When a
 * contact-merge object is configured, deleted records that appear as the old
 * contact of a merge record are additionally queued as 'merge' items, ahead
 * of the delete items of the same page.
 *
 * Every queued delete item is an array carrying the record data plus a 'type'
 * key, which salesforce_pull_process_deleted_records() reads.
 */
function salesforce_pull_get_deleted_records() {

  // Calculate which client to use. We default to the REST client but also
  // support SOAP if enabled. Note that deletions can only be queried via REST
  // with an API version >= 29.0.
  $sfapi = salesforce_get_api();
  $client = $sfapi;
  $use_soap = FALSE;
  if (class_exists('SalesforceSoapPartner')) {
    $client = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));
    $use_soap = TRUE;
  }

  // Before we delete any records, we must check for merged records.
  $merge_record_type = variable_get(SALESFORCE_MERGE_RECORD, FALSE);
  $merged_records = array();
  if (!empty($merge_record_type)) {
    $merged_results = salesforce_pull_check_merged_records($merge_record_type);
    if (!empty($merged_results['records'])) {
      $merged_records = $merged_results['records'];
    }
  }

  $queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);

  // Load all unique SF record types that we have mappings for.
  foreach (array_reverse(salesforce_mapping_get_mapped_objects()) as $type) {
    $last_delete_sync = variable_get('salesforce_pull_delete_last_' . $type, REQUEST_TIME);
    variable_set('salesforce_pull_delete_last_' . $type, REQUEST_TIME);
    $now = time();

    // getDeleted() constraint: startDate cannot be more than 30 days ago
    // (using an incompatible date may lead to exceptions). 2505600 seconds is
    // 29 days, which leaves a safety margin.
    $last_delete_sync = $last_delete_sync > REQUEST_TIME - 2505600 ? $last_delete_sync : REQUEST_TIME - 2505600;

    // getDeleted() constraint: endDate must be at least one minute greater
    // than startDate.
    $now = $now > $last_delete_sync + 60 ? $now : $now + 60;
    $last_delete_sync_sf = gmdate('Y-m-d\\TH:i:s\\Z', $last_delete_sync);
    $now_sf = gmdate('Y-m-d\\TH:i:s\\Z', $now);

    // SOAP getDeleted() returns an object while the REST getDeleted() returns
    // an array, so cast all checked values to an object to avoid additional
    // conditional logic.
    if ($use_soap) {
      $results = (object) $client->trySoap('getDeleted', $type, $last_delete_sync_sf, $now_sf);
    }
    else {
      $results = (object) $client->getDeleted($type, $last_delete_sync_sf, $now_sf);
    }
    if (empty($results->deletedRecords) || isset($results->errorCode)) {
      continue;
    }

    // Handle merges from Salesforce.
    $merge_fields = array(
      'old_contact_key' => 'Old_Contact__c',
      'merged_contact_key' => 'Contact__c',
    );
    if (!empty($merge_record_type)) {
      drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);
    }
    $old_contact_key = $merge_fields['old_contact_key'];

    // From here on every response and record is handled as an array, whichever
    // client produced it.
    $response = (array) $results;
    $records = $response['deletedRecords'];
    while (TRUE) {
      $page = array();
      foreach ($records as $record) {
        $page[] = (array) $record;
      }

      // Queue merges first so they are processed before the matching deletes.
      if (!empty($merge_record_type)) {
        foreach ($page as $record) {
          foreach ($merged_records as $merged_record) {
            if (isset($record['id'], $merged_record[$old_contact_key]) && $record['id'] == $merged_record[$old_contact_key]) {
              $queue->createItem(array(
                'merge' => $merged_record,
              ));
            }
          }
        }
      }

      // Write items to the queue.
      foreach ($page as $record) {
        $record['type'] = $type;
        $queue->createItem(array(
          'delete' => $record,
        ));
      }

      // Handle requests larger than the batch limit (usually 2000).
      if (empty($response['nextRecordsUrl'])) {
        break;
      }
      $version_path = parse_url($sfapi->getApiEndPoint(), PHP_URL_PATH);
      $response = (array) $sfapi->apiCall(str_replace($version_path, '', $response['nextRecordsUrl']));
      if (isset($response['errorCode'])) {
        break;
      }

      // NOTE(review): the key that follow-up pages use for their records is
      // not visible from this file; both candidates are accepted. Confirm
      // against the Salesforce API response.
      if (!empty($response['deletedRecords'])) {
        $records = $response['deletedRecords'];
      }
      elseif (!empty($response['records'])) {
        $records = $response['records'];
      }
      else {
        $records = array();
      }
    }

    // NOTE(review): this is the variable the *update* pull uses as its
    // last-sync time for the type. Writing it here is kept for backward
    // compatibility but looks like a copy-paste of the update code; confirm
    // it is intended.
    variable_set('salesforce_pull_last_sync_' . $type, REQUEST_TIME);
  }
}

/**
 * Process merged records from Salesforce.
 *
 * Looks up the mapping objects of the old (merged-away) and the surviving
 * Salesforce contact and, when both exist, invokes
 * hook_salesforce_pull_entity_merge() with them and logs the merge. Nothing
 * is deleted here; removal of the old record is handled by the 'delete' item
 * queued for it.
 *
 * @param array $sf_object
 *   The Salesforce merge record. The keys holding the old and the surviving
 *   contact id default to 'Old_Contact__c' and 'Contact__c' and can be changed
 *   through hook_salesforce_pull_entity_merge_fields_alter().
 */
function salesforce_pull_process_merged_records($sf_object) {

  // Handle merges from Salesforce.
  $merge_fields = array(
    'old_contact_key' => 'Old_Contact__c',
    'merged_contact_key' => 'Contact__c',
  );
  drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);
  $old_sfid = $sf_object[$merge_fields['old_contact_key']];
  $new_sfid = $sf_object[$merge_fields['merged_contact_key']];

  // Despite their names, both of these are mapping objects, not entities.
  $entity_to_delete = salesforce_mapping_object_load_by_sfid($old_sfid);
  $merged_entity = salesforce_mapping_object_load_by_sfid($new_sfid);
  if (!$entity_to_delete || !$merged_entity) {
    return;
  }

  // Resolve the label from the Drupal entity behind the old mapping object.
  // This happens before the hook runs, since an implementation may remove
  // that entity.
  $label = '';
  $old_entity = entity_load_single($entity_to_delete->entity_type, $entity_to_delete->entity_id);
  if ($old_entity) {
    $wrapper = entity_metadata_wrapper($entity_to_delete->entity_type, $old_entity);
    $label = $wrapper->label();
  }

  // Prevent processing by salesforce_push. We hate circular logic.
  $merged_entity->salesforce_pull = TRUE;

  module_invoke_all('salesforce_pull_entity_merge', $entity_to_delete, $merged_entity);
  watchdog('Salesforce Pull', 'Merged entity %label with ID: %id associated with Salesforce Object ID: %sfid to %label with ID: %newid associated with Salesforce Object ID: %newsfid', array(
    '%label' => $label,
    '%id' => $entity_to_delete->entity_id,
    '%sfid' => $old_sfid,
    '%newid' => $merged_entity->entity_id,
    '%newsfid' => $new_sfid,
  ));
}

/**
 * Process deleted records from Salesforce.
 *
 * Removes the mapping object linked to the deleted Salesforce record. The
 * mapped Drupal entity is deleted as well, but only when it still exists and
 * its mapping has the SALESFORCE_MAPPING_SYNC_SF_DELETE trigger set.
 *
 * @param array $sf_object
 *   The deleted record; 'id' (Salesforce id) and 'type' (Salesforce object
 *   type) are read.
 */
function salesforce_pull_process_deleted_records($sf_object) {
  $mapping_object = salesforce_mapping_object_load_by_sfid($sf_object['id']);

  // Without a mapping object the record is unknown to Drupal.
  if (!$mapping_object) {
    return;
  }

  // Load and wrap the Drupal entity linked to the SF object.
  $deleted_entity = entity_load_single($mapping_object->entity_type, $mapping_object->entity_id);

  // Check if entity exists before processing delete.
  if ($deleted_entity) {
    $wrapper = entity_metadata_wrapper($mapping_object->entity_type, $deleted_entity);

    // Load the mapping that manages these mapped objects. reset() takes its
    // argument by reference, so the result needs a variable of its own.
    $sf_mappings = salesforce_mapping_load_multiple(array(
      'salesforce_object_type' => $sf_object['type'],
      'drupal_entity_type' => $wrapper->type(),
      'drupal_bundle' => $wrapper->getBundle(),
    ));
    $sf_mapping = !empty($sf_mappings) ? reset($sf_mappings) : FALSE;

    // Only delete the mapped Drupal entity if the
    // SALESFORCE_MAPPING_SYNC_SF_DELETE flag is set.
    if (!empty($sf_mapping) && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_DELETE) {

      // Prevent processing by salesforce_push. We hate circular logic.
      $deleted_entity->salesforce_pull = TRUE;

      // Delete the mapped Drupal entity. Catch any exceptions as
      // we want to ensure the mapping object is deleted even in
      // case of an error. Success is only logged when the delete went
      // through.
      try {
        $wrapper->delete();
        watchdog('Salesforce Pull', 'Deleted entity %label with ID: %id associated with Salesforce Object ID: %sfid', array(
          '%label' => $wrapper->label(),
          '%id' => $mapping_object->entity_id,
          '%sfid' => $sf_object['id'],
        ));
      }
      catch (Exception $e) {
        watchdog_exception('salesforce_pull', $e);
      }
    }
  }

  // Delete mapping object.
  $mapping_object->delete();
}

/**
 * Get the merge-history records modified since the last pull.
 *
 * Queries the Salesforce object holding the merge history and follows
 * 'nextRecordsUrl' until every page of the result has been collected. The
 * object type and field names used in the query can be changed through
 * hook_salesforce_pull_entity_merge_fields_alter().
 *
 * @param string $merge_record_type
 *   The Salesforce record type containing the merge history.
 *
 * @return array
 *   The query result, with the merge records of all pages under the 'records'
 *   key. An array with an empty 'records' list is returned when the object
 *   lookup for the merge record type comes back empty or when the initial
 *   query reports an error.
 */
function salesforce_pull_check_merged_records($merge_record_type) {
  $sfapi = salesforce_get_api();
  $merge_fields = array(
    'type' => $merge_record_type,
    'merged_contact_key' => 'Contact__c',
    'old_contact_key' => 'Old_Contact__c',
    'modified_date' => 'LastModifiedDate',
  );
  drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);

  // Bail out early when the object lookup returns nothing for this type.
  $object_info = $sfapi->objects(array(
    'name' => $merge_fields['type'],
  ));
  if (empty($object_info)) {
    return array(
      'records' => array(),
    );
  }

  $soql = new SalesforceSelectQuery($merge_fields['type']);

  // Convert field mappings to SOQL.
  $soql->fields = array(
    'Id' => 'Id',
    $merge_fields['merged_contact_key'] => $merge_fields['merged_contact_key'],
    $merge_fields['old_contact_key'] => $merge_fields['old_contact_key'],
    $merge_fields['modified_date'] => $merge_fields['modified_date'],
  );

  // If no lastupdate, get all records, else get records since last pull.
  $sf_last_sync = variable_get('salesforce_pull_last_sync_' . $merge_fields['type'], NULL);
  if ($sf_last_sync) {
    $last_sync = gmdate('Y-m-d\\TH:i:s\\Z', $sf_last_sync);

    // The condition has to name the modification date field; a timestamp in
    // the field position does not produce a valid SOQL WHERE clause.
    $soql->addCondition($merge_fields['modified_date'], $last_sync, '>');
  }

  // Execute query.
  $results = $sfapi->query($soql);
  $version_path = parse_url($sfapi->getApiEndPoint(), PHP_URL_PATH);

  if (isset($results['errorCode'])) {
    // Return a properly structured null result.
    return array(
      'records' => array(),
    );
  }

  $merged_records = $results;

  // Handle requests larger than the batch limit (usually 2000). The version
  // path is stripped from the URL before it is handed to apiCall().
  $next_records_url = isset($results['nextRecordsUrl']) ? str_replace($version_path, '', $results['nextRecordsUrl']) : FALSE;
  while ($next_records_url) {
    $new_result = $sfapi->apiCall($next_records_url);
    if (!isset($new_result['errorCode'])) {

      // Append to the 'records' list so that records of follow-up pages end
      // up next to the records of the first page.
      foreach ($new_result['records'] as $result) {
        $merged_records['records'][] = $result;
      }
    }

    // A failed page carries no 'nextRecordsUrl', which ends the loop.
    $next_records_url = isset($new_result['nextRecordsUrl']) ? str_replace($version_path, '', $new_result['nextRecordsUrl']) : FALSE;
  }

  return $merged_records;
}

/**
 * Map field values.
 *
 * For every field map that pulls from Salesforce ('sync' or 'sf_drupal'
 * direction) the value is computed by the field map type's
 * 'pull_value_callback', passed through
 * hook_salesforce_pull_entity_value_alter() and set on the property the field
 * map points to. Field maps without a usable pull callback are skipped.
 *
 * @param array $field_maps
 *   Array of field maps.
 * @param object $entity_wrapper
 *   Entity wrapper object.
 * @param object $sf_object
 *   sObject of the Salesforce record.
 *
 * @throws Exception
 *   When computing the target or setting the value fails. The message names
 *   the Salesforce field and the Drupal property involved, and the original
 *   exception is attached as the previous exception.
 */
function salesforce_pull_map_fields($field_maps, &$entity_wrapper, $sf_object) {
  $types = salesforce_mapping_get_fieldmap_types();
  foreach ($field_maps as $field_map) {
    $fieldmap_type = $field_map['drupal_field']['fieldmap_type'];

    // Skip field maps that do not pull from Salesforce or whose type has no
    // callable pull callback.
    if (!isset($types[$fieldmap_type]['pull_value_callback']) || !in_array($field_map['direction'], array(
      'sync',
      'sf_drupal',
    )) || !function_exists($types[$fieldmap_type]['pull_value_callback'])) {
      continue;
    }

    // Walk down the colon separated property path (e.g. "field:property") to
    // the wrapper of the property that receives the value.
    $drupal_fields_array = explode(':', $field_map['drupal_field']['fieldmap_value']);
    $parent = $entity_wrapper;
    foreach ($drupal_fields_array as $drupal_field) {
      if ($parent instanceof EntityListWrapper) {
        // For list wrappers continue with the first item of the list.
        $child_wrapper = $parent->get(0)->{$drupal_field};
      }
      else {
        $child_wrapper = $parent->{$drupal_field};
      }
      $parent = $child_wrapper;
    }

    $fieldmap_type_info = salesforce_mapping_get_fieldmap_types($fieldmap_type);
    $value = call_user_func($fieldmap_type_info['pull_value_callback'], $parent, $sf_object, $field_map);

    // Allow this value to be altered before assigning to the entity.
    drupal_alter('salesforce_pull_entity_value', $value, $field_map, $sf_object);

    try {
      // List wrappers are always given an array.
      if ($parent instanceof EntityListWrapper) {
        if (empty($value)) {
          $value = array();
        }
        elseif (!is_array($value)) {
          $value = array(
            $value,
          );
        }
      }

      // When a field with multiple properties (like a link field) has a
      // property that is required (url in the link field example) we cannot
      // set the property/child value to null. We can however set the parent
      // to null when it is not required. In case a field linked to a
      // property gets cleared in Salesforce we assume it's fine to clear the
      // parent of the property in Drupal.
      $clear_parent = FALSE;
      if (empty($value) && count($drupal_fields_array) > 1) {
        $child_info = $child_wrapper->info();
        $parent_info = $child_info['parent']->info();
        $clear_parent = empty($parent_info['required']) && !empty($child_info['required']);
      }

      if ($clear_parent) {
        $child_info['parent']->set($value);
      }
      else {
        $parent->set($value);
      }
    }
    catch (Exception $e) {
      // The value may be an array or an object, which cannot be used as a
      // placeholder value directly; render those as text for the message.
      $printable_value = (is_scalar($value) || is_null($value)) ? $value : print_r($value, TRUE);
      $message = t('Exception during pull for @sfobj.@sffield @sfid to @dobj.@dprop @did with value @v: @e', array(
        '@sfobj' => $sf_object['attributes']['type'],
        '@sffield' => $field_map['salesforce_field']['name'],
        '@sfid' => $sf_object['Id'],
        '@dobj' => $entity_wrapper->type(),
        '@dprop' => $field_map['drupal_field']['fieldmap_value'],
        '@did' => $entity_wrapper->getIdentifier(),
        '@v' => $printable_value,
        '@e' => $e->getMessage(),
      ));

      // Exception codes must be integers, but some exceptions (for example
      // PDOException) report string codes.
      throw new Exception($message, (int) $e->getCode(), $e);
    }
  }
}

Functions

Namesort descending Description
salesforce_pull Callback for the standard pull process used by webhooks and cron.
salesforce_pull_check_merged_records Get all the merged records since the last delete.
salesforce_pull_check_throttle Determines if the Salesforce pull should be allowed, or throttled.
salesforce_pull_cron Implements hook_cron().
salesforce_pull_cron_queue_info Implements hook_cron_queue_info().
salesforce_pull_form_salesforce_settings_form_alter Implements hook_form_FORM_ID_alter().
salesforce_pull_get_deleted_records Get deleted records from salesforce.
salesforce_pull_get_pull_query Given a SObject type name, build an SOQL query to include all fields for all SalesforceMappings mapped to that SObject.
salesforce_pull_get_updated_records Pull updated records from Salesforce and place them in the queue.
salesforce_pull_map_fields Map field values.
salesforce_pull_menu Implements hook_menu().
salesforce_pull_process_deleted_records Process deleted records from salesforce.
salesforce_pull_process_merged_records Process merged records from salesforce.
salesforce_pull_process_updated_records Process records in the queue.
salesforce_pull_queue_info Implements hook_queue_info().
salesforce_pull_webhook Webhook callback for salesforce pull. Returns status of 200 for successful attempt or 403 for a failed pull attempt (SF not authorized, threshold reached, etc.).
salesforce_pull_webhook_access Access callback for Salesforce webhooks.
salesforce_pull_webhook_key_validate Element validation callback for the webhook key.

Constants

Namesort descending Description
SALESFORCE_MERGE_RECORD
SALESFORCE_PULL_QUEUE Define pull queue name.