You are here

salesforce_push.module in Salesforce Suite 7.3

Push updates to Salesforce when a Drupal entity is updated.

File

modules/salesforce_push/salesforce_push.module
View source
<?php

/**
 * @file
 * Push updates to Salesforce when a Drupal entity is updated.
 */

/**
 * Name of the Drupal queue that holds pushes deferred for later processing.
 *
 * Items are added by salesforce_push_entity_crud() for mappings flagged as
 * asynchronous and are claimed again in salesforce_push_cron().
 */
define('SALESFORCE_PUSH_QUEUE', 'salesforce_push');

/**
 * Implements hook_entity_insert().
 *
 * Hands the newly inserted entity to the push handler with the
 * "Drupal create" sync trigger.
 */
function salesforce_push_entity_insert($entity, $type) {
  $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE;
  salesforce_push_entity_crud($type, $entity, $trigger);
}

/**
 * Implements hook_entity_update().
 *
 * Hands the updated entity to the push handler with the "Drupal update"
 * sync trigger.
 */
function salesforce_push_entity_update($entity, $type) {
  $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE;
  salesforce_push_entity_crud($type, $entity, $trigger);
}

/**
 * Implements hook_entity_delete().
 *
 * Hands the deleted entity to the push handler with the "Drupal delete"
 * sync trigger.
 */
function salesforce_push_entity_delete($entity, $type) {
  $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE;
  salesforce_push_entity_crud($type, $entity, $trigger);
}

/**
 * Push entities to Salesforce.
 *
 * Loads every mapping configured for the entity's type and bundle. Each
 * mapping that subscribes to the given trigger (and that no module vetoes) is
 * either queued for cron, when the mapping is flagged as asynchronous, or
 * pushed right away through the REST API.
 *
 * @param string $entity_type
 *   Type of Drupal entity.
 * @param object $entity
 *   The entity object.
 * @param int $sf_sync_trigger
 *   The trigger being responded to.
 *
 * @return false|null
 *   FALSE when the entity carries a truthy salesforce_pull flag and is
 *   therefore skipped; nothing otherwise.
 */
function salesforce_push_entity_crud($entity_type, $entity, $sf_sync_trigger) {
  // An entity that was just written by a Salesforce pull must not be pushed
  // straight back.
  if (!empty($entity->salesforce_pull)) {
    return FALSE;
  }

  $ids = entity_extract_ids($entity_type, $entity);
  $entity_id = $ids[0];
  $bundle = $ids[2];

  $conditions = array(
    'drupal_entity_type' => $entity_type,
    'drupal_bundle' => $bundle,
  );

  foreach (salesforce_mapping_load_multiple($conditions) as $mapping) {
    // Ignore mappings that do not react to this trigger.
    if (!($mapping->sync_triggers & $sf_sync_trigger)) {
      continue;
    }

    // Give other modules the chance to veto the sync for this mapping. Only
    // an explicit FALSE counts as a veto.
    $vetoed = FALSE;
    foreach (module_implements('salesforce_push_entity_allowed') as $module) {
      $verdict = module_invoke($module, 'salesforce_push_entity_allowed', $entity_type, $entity, $sf_sync_trigger, $mapping);
      if ($verdict === FALSE) {
        $vetoed = TRUE;
        break;
      }
    }
    if ($vetoed) {
      continue;
    }

    // Synchronous mappings are pushed immediately.
    if (empty($mapping->push_async)) {
      salesforce_push_sync_rest($entity_type, $entity, $mapping, $sf_sync_trigger);
      continue;
    }

    // Asynchronous mappings are parked in the queue until cron runs.
    $queue_data = array(
      'entity_type' => $entity_type,
      'entity_id' => $entity_id,
      'mapping' => $mapping,
      'trigger' => $sf_sync_trigger,
    );
    DrupalQueue::get(SALESFORCE_PUSH_QUEUE)->createItem($queue_data);
  }
}
/**
 * Implements hook_form_FORM_ID_alter() for salesforce_mapping_object_form.
 *
 * Makes the Salesforce ID element optional and adds a "Push" button whose
 * submit handler is salesforce_push_mapping_object_form_submit().
 */
function salesforce_push_form_salesforce_mapping_object_form_alter(&$form, &$form_state) {
  $push_button = array(
    '#type' => 'submit',
    '#value' => t('Push'),
    '#submit' => array(
      'salesforce_push_mapping_object_form_submit',
    ),
  );

  $form['salesforce_id']['#required'] = FALSE;
  $form['actions']['push'] = $push_button;
}
/**
 * Form submission handler for the "Push" button on the mapping object form.
 *
 * Loads the entity identified by the submitted entity type and ID and pushes
 * it to Salesforce as if it had just been updated.
 *
 * @param array $form
 *   The form structure.
 * @param array $form_state
 *   The form state; 'values' must contain 'entity_type' and 'entity_id'.
 */
function salesforce_push_mapping_object_form_submit($form, &$form_state) {
  $values = $form_state['values'];
  $entity_type = $values['entity_type'];
  $entity_id = $values['entity_id'];
  $entity = entity_load_single($entity_type, $entity_id);

  // entity_load_single() yields FALSE when nothing could be loaded; pushing
  // that value on would fail inside entity_extract_ids().
  if (!$entity) {
    drupal_set_message(t('Unable to push: @type @id could not be loaded.', array(
      '@type' => $entity_type,
      '@id' => $entity_id,
    )), 'error');
    return;
  }

  $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE;
  salesforce_push_entity_crud($entity_type, $entity, $trigger);

  // @TODO we can do better than this:
  // drupal_set_message('Push request submitted. If your mappings are set to push asynchronously, you need to run cron.');
}

/**
 * Sync Drupal entities and Salesforce objects using the REST API.
 *
 * Depending on the trigger and on whether the entity is already linked to a
 * Salesforce record, this deletes, creates, upserts or updates the Salesforce
 * object, keeps the local mapping object in step, and reports the outcome
 * through the salesforce_push_success / salesforce_push_fail hooks.
 *
 * Returns early without doing anything when the API is not authorized.
 *
 * @param string $entity_type
 *   Type of Drupal entity.
 * @param object $entity
 *   The entity object.
 * @param object $mapping
 *   Salesforce mapping object.
 * @param int $sf_sync_trigger
 *   Trigger for this sync.
 */
function salesforce_push_sync_rest($entity_type, $entity, $mapping, $sf_sync_trigger) {
  $sfapi = salesforce_get_api();

  // Not authorized, we need to bail this time around.
  if (!$sfapi->isAuthorized()) {
    return;
  }

  list($entity_id) = entity_extract_ids($entity_type, $entity);
  $entity_wrapper = entity_metadata_wrapper($entity_type, $entity);
  $mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id, TRUE);

  // Allow other modules to define or alter the mapping object.
  drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);

  // Context handed to the success/fail hooks. 'queue_item' is FALSE because
  // this function is not given the queue item, even when called from cron.
  $synced_entity = array(
    'entity_wrapper' => $entity_wrapper,
    'mapping_object' => $mapping_object,
    'queue_item' => FALSE,
    'mapping' => $mapping,
  );
  $op = '';

  // Delete SF object.
  if ($sf_sync_trigger == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE) {
    if ($mapping_object) {
      $op = 'Delete';
      $deleted = FALSE;
      try {
        $sfapi->objectDelete($mapping->salesforce_object_type, $mapping_object->salesforce_id);
        $deleted = TRUE;
      }
      catch (SalesforceException $e) {
        $exception_message = t('Salesforce Exception during @event attempting @op : @exception');
        $message_vars = array(
          '@event' => 'push',
          '@op' => $op,
          '@exception' => $e->getMessage(),
        );
        watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
        salesforce_set_message(format_string($exception_message, $message_vars), 'error');
        module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
      }

      // Only announce the deletion when Salesforce actually accepted it.
      if ($deleted) {
        salesforce_set_message(t('Salesforce object %sfid has been deleted.', array(
          '%sfid' => $mapping_object->salesforce_id,
        )));
      }

      // The local link is removed in both cases, as it always has been for
      // this trigger.
      $mapping_object->delete();

      // A failed delete has already fired the fail hook; do not follow it
      // with a contradictory success hook.
      if ($deleted) {
        module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
      }
    }

    // No mapped object or object was deleted.
    return;
  }

  // Generate parameter array from field mappings.
  $key_value = NULL;
  $params = salesforce_push_map_params($mapping, $entity_wrapper, FALSE, !$mapping_object);

  // Entity is not linked to an SF object.
  if (!$mapping_object) {

    // Setup SF record type. CampaignMember objects get their Campaign's type.
    if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && !array_key_exists('RecordTypeId', $params) && $mapping->salesforce_object_type != 'CampaignMember') {
      $params['RecordTypeId'] = $mapping->salesforce_record_type_default;
    }

    try {
      // An external key has been specified, attempt an upsert().
      if (!empty($mapping->dedupe_key) && !empty($params[$mapping->dedupe_key])) {
        $key_value = $params[$mapping->dedupe_key];

        // External key values with punctuation need to be escaped.
        $encoded_value = urlencode($key_value);

        // For at least 'email' fields, periods also need to be escaped:
        // https://developer.salesforce.com/forums?id=906F000000099xPIAQ
        $encoded_value = str_replace('.', '%2E', $encoded_value);
        $op = 'Upsert';
        $data = $sfapi->objectUpsert($mapping->salesforce_object_type, $mapping->dedupe_key, $encoded_value, $params);

        // Handle upsert responses.
        switch ($sfapi->response->code) {
          // On Upsert:update retrieve object.
          case '204':
            $sf_object = $sfapi->objectReadbyExternalId($mapping->salesforce_object_type, $mapping->dedupe_key, $encoded_value);
            $data['id'] = $sf_object['Id'];
            break;

          // Handle duplicate records.
          case '300':
            $data['errorCode'] = $sfapi->response->error . ' (' . $mapping->dedupe_key . ':' . $key_value . ')';
            break;
        }
      }
      else {
        $op = 'Create';
        $data = $sfapi->objectCreate($mapping->salesforce_object_type, $params);
      }
    }
    catch (SalesforceException $e) {
      $exception_message = t('Salesforce Exception during @event attempting @op : @exception');
      $message_vars = array(
        '@event' => 'push',
        '@op' => $op,
        '@exception' => $e->getMessage(),
      );
      watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
      salesforce_set_message(format_string($exception_message, $message_vars), 'error');
      module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
      return;
    }

    // Success.
    if (empty($data['errorCode'])) {

      // Create mapping object, saved below.
      $mapping_object = entity_create('salesforce_mapping_object', array(
        'entity_id' => $entity_id,
        'entity_type' => $entity_type,
        'salesforce_id' => $data['id'],
        'last_sync_message' => t('Mapping object created via !function.', array(
          '!function' => __FUNCTION__,
        )),
        'last_sync_status' => SALESFORCE_MAPPING_STATUS_SUCCESS,
      ));
      module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
    }
    else {
      // The duplicate-record (300) path above sets an error code but no
      // message, so the message has to be treated as optional.
      $failure_vars = array(
        '%label' => $entity_wrapper->label(),
        '@code' => $data['errorCode'],
        '@message' => isset($data['message']) ? $data['message'] : '',
      );
      salesforce_set_message(t('Failed to sync %label with Salesforce. @code: @message', $failure_vars), 'error');

      // Log the untranslated template together with its variables, which is
      // the form watchdog() expects, and mark it as an error.
      watchdog('salesforce_push', 'Failed to sync %label with Salesforce. @code: @message', $failure_vars, WATCHDOG_ERROR);
      module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
      return;
    }
  }
  else {

    // Handle the case of mapped objects last sync being more recent than
    // the entity's timestamp, which is set by salesforce_mapping.
    if ($mapping_object->last_sync > $mapping_object->entity_updated) {
      return;
    }

    // Update SF object.
    try {
      $op = 'Update';
      $sfapi->objectUpdate($mapping->salesforce_object_type, $mapping_object->salesforce_id, $params);
      $mapping_object->last_sync_message = t('Mapping object updated via !function.', array(
        '!function' => __FUNCTION__,
      ));
      $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
      salesforce_set_message(t('%name has been synchronized with Salesforce record %sfid.', array(
        '%name' => $entity_wrapper->label(),
        '%sfid' => $mapping_object->salesforce_id,
      )));
      module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
    }
    catch (SalesforceException $e) {
      $exception_message = t('Error syncing @type @id to Salesforce record @sfid. Error message: "@msg". @extra');
      $message_vars = array(
        '@type' => $mapping_object->entity_type,
        '@id' => $mapping_object->entity_id,
        '@sfid' => $mapping_object->salesforce_id,
        '@msg' => $e->getMessage(),
        '@extra' => '',
      );
      if (in_array($e->getMessage(), array(
        "invalid cross reference id",
        "Not Found",
      ))) {
        $message_vars['@extra'] = "Are you trying to sync with an object from a different Salesforce instance?";
      }
      watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
      salesforce_set_message(format_string($exception_message, $message_vars), 'error');

      // The failure is recorded on the mapping object, which is still saved
      // below with the error status.
      $mapping_object->last_sync_message = truncate_utf8($e->getMessage(), 255, FALSE, TRUE);
      $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
      module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
    }
  }

  $mapping_object->last_sync_action = 'push';
  $mapping_object->last_sync = REQUEST_TIME;
  $mapping_object->save();
}

/**
 * Implements hook_queue_info().
 *
 * Declares the push queue so that third-party queue-monitoring tools (Drush,
 * Queue UI, etc.) can list it.
 */
function salesforce_push_queue_info() {
  return array(
    SALESFORCE_PUSH_QUEUE => array(
      'title' => t('Salesforce push queue'),
    ),
  );
}

/**
 * Implements hook_cron().
 *
 * Claims up to 'salesforce_push_limit' items (default 50) from the push queue.
 * Without SOAP batching each item is pushed immediately through the REST API.
 * With SOAP batching the items are sorted into delete, create, update and
 * upsert lists that are sent in bulk once the queue has been drained.
 *
 * Returns early without touching the queue when the API is not authorized.
 */
function salesforce_push_cron() {
  $sfapi = salesforce_get_api();
  if (!$sfapi->isAuthorized()) {
    return;
  }

  $queue = DrupalQueue::get(SALESFORCE_PUSH_QUEUE);
  $limit = variable_get('salesforce_push_limit', 50);
  $use_soap = module_exists('salesforce_soap') && variable_get('salesforce_soap_batch_enable', TRUE);

  // IDs already handled in this run, keyed by entity type.
  $entity_ids = array();

  // SOAP batches. Every list and $synced_entities share the same $delta keys
  // so a batch entry can be traced back to its entity.
  $synced_entities = array();
  $delete_list = array();
  $create_list = array();
  $update_list = array();
  $upsert_list = array();
  $delta = 0;

  for ($i = 0; $i < $limit; $i++) {
    $item = $queue->claimItem();

    // The claim happens inside the loop so that, once the limit is reached,
    // no item is claimed that will not be processed.
    if (!$item) {
      break;
    }

    $mapping = $item->data['mapping'];
    $entity_type = $item->data['entity_type'];
    $entity_id = $item->data['entity_id'];
    $trigger = $item->data['trigger'];

    // Initialize array for the entity type if it isn't set yet.
    if (!isset($entity_ids[$entity_type])) {
      $entity_ids[$entity_type] = array();
    }

    // Duplicate entity in the queue.
    if (in_array($entity_id, $entity_ids[$entity_type])) {
      $queue->deleteItem($item);
      continue;
    }

    // Attempt to load our entity from the queue item.
    $entity = entity_load_single($entity_type, $entity_id);

    // If the entity fails to load, remove it from the queue. This can happen
    // if we're processing records asynchronously and it was deleted from SF
    // before cron ran.
    if ($entity === FALSE) {
      $queue->deleteItem($item);
      continue;
    }

    // Add entity id to array of pushed entities to check for duplicates later.
    $entity_ids[$entity_type][] = $entity_id;

    if (!$use_soap) {
      salesforce_push_sync_rest($entity_type, $entity, $mapping, $trigger);
      $queue->deleteItem($item);
      continue;
    }

    $mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id);

    // Allow other modules to define or alter the mapping object.
    drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);

    // Everything needed for the batch has been read from the item, so it is
    // removed from the queue for every operation. Previously update and delete
    // items skipped this step and stayed claimed in the queue.
    $queue->deleteItem($item);

    $wrapper = entity_metadata_wrapper($entity_type, $entity);
    $synced_entity = array(
      'entity_wrapper' => $wrapper,
      'mapping_object' => $mapping_object,
      'queue_item' => $item,
      'mapping' => $mapping,
    );

    if ($trigger == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE) {
      // Without a mapping object there is nothing to delete in Salesforce,
      // and a delete must never fall through to create/upsert.
      if ($mapping_object) {
        $synced_entities[$delta] = $synced_entity;
        $delete_list[$delta] = $mapping_object->salesforce_id;
        $delta++;
      }
      continue;
    }

    $params = salesforce_push_map_params($mapping, $wrapper, $use_soap, !$mapping_object);
    $synced_entities[$delta] = $synced_entity;

    // Setup SF record type. CampaignMember objects get their type from
    // their Campaign.
    // @TODO: remove object-specific logic. Figure out how this works and
    // implement generic support for recordtype inheritance, or objects that
    // don't support recordtypes.
    if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && empty($params['RecordTypeId']) && $mapping->salesforce_object_type != 'CampaignMember') {
      $params['RecordTypeId'] = $mapping->salesforce_record_type_default;
    }

    $sobject = new stdClass();
    $sobject->type = $mapping->salesforce_object_type;
    foreach ($params as $key => $value) {
      $sobject->fields[$key] = $value;
    }

    if ($mapping_object && $mapping_object->salesforce_id) {
      // If we have a SFID, this is an update.
      $sobject->Id = $mapping_object->salesforce_id;
      $update_list[$delta] = $sobject;
    }
    elseif (!empty($mapping->dedupe_key) && !empty($params[$mapping->dedupe_key])) {
      // If we have a dedupe key, prefer upsert.
      $upsert_list[$mapping->dedupe_key][$delta] = $sobject;
    }
    else {
      // Otherwise create.
      $create_list[$delta] = $sobject;
    }

    // Advance for every batched item so entries never overwrite each other.
    $delta++;
  }

  if (!$use_soap) {
    return;
  }

  // Use soap API to batch process records.
  // NOTE(review): salesforce_push_process_soap_results() looks results up in
  // $synced_entities by result key. This presumes trySoap() returns results
  // keyed like the list it was given - confirm against SalesforceSoapPartner.
  $soap = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));

  if (!empty($delete_list)) {
    $results = $soap->trySoap('delete', $delete_list);
    salesforce_push_process_soap_results('Delete', $results, $synced_entities);
  }
  if (!empty($create_list)) {
    $results = $soap->trySoap('create', $create_list);
    salesforce_push_process_soap_results('Create', $results, $synced_entities);
  }
  if (!empty($update_list)) {
    $results = $soap->trySoap('update', $update_list);
    salesforce_push_process_soap_results('Update', $results, $synced_entities);
  }
  if (!empty($upsert_list)) {
    // Upserts are grouped per external key field, one SOAP call per field.
    foreach ($upsert_list as $key => $upsert_item) {
      $results = $soap->trySoap('upsert', $key, $upsert_item);
      salesforce_push_process_soap_results('Upsert', $results, $synced_entities);
    }
  }
}

/**
 * Process SOAP API batch results.
 *
 * Create, update or delete mapped object entities and log results. Each
 * result is matched to its entry in $synced_entities by array key, and the
 * salesforce_push_success or salesforce_push_fail hook is invoked for it.
 *
 * @param string $op
 *   Operation performed.
 * @param array $results
 *   Array of result objects provided by Salesforce. Anything that cannot be
 *   iterated is ignored.
 * @param array $synced_entities
 *   Entities that were synced with Salesforce, keyed like $results.
 */
function salesforce_push_process_soap_results($op, $results, $synced_entities) {
  // A failed SOAP call may not hand back a list; there is nothing to process.
  if (!is_array($results) && !($results instanceof Traversable)) {
    return;
  }

  $is_delete = drupal_strtolower($op) == 'delete';

  foreach ($results as $key => $result) {
    // Without the matching entity there is no way to maintain a mapping
    // object, so log and move on instead of failing on a missing wrapper.
    if (!isset($synced_entities[$key])) {
      watchdog('salesforce_push', '%op: no synced entity found for Salesforce result @key.', array(
        '%op' => $op,
        '@key' => $key,
      ), WATCHDOG_WARNING);
      continue;
    }

    $synced_entity = $synced_entities[$key];
    $mapping_object = empty($synced_entity['mapping_object']) ? FALSE : $synced_entity['mapping_object'];

    if ($result->success) {
      if ($is_delete) {
        if ($mapping_object) {
          $mapping_object->delete();
        }
        module_invoke_all('salesforce_push_success', $op, $result, $synced_entity);

        // Keep going: the remaining results in the batch still need to be
        // processed, and a delete must never create a mapping object.
        continue;
      }

      if (!$mapping_object) {

        // Create mapping object, saved below.
        $wrapper = $synced_entity['entity_wrapper'];
        list($entity_id) = entity_extract_ids($wrapper->type(), $wrapper->value());
        $mapping_object = entity_create('salesforce_mapping_object', array(
          'entity_id' => $entity_id,
          'entity_type' => $wrapper->type(),
          'salesforce_id' => $result->id,
          'last_sync_message' => t('Mapping object created via !function.', array(
            '!function' => __FUNCTION__,
          )),
        ));
      }
      else {
        $mapping_object->last_sync_message = t('Mapping object updated via !function.', array(
          '!function' => __FUNCTION__,
        ));
      }

      $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
      $mapping_object->last_sync = REQUEST_TIME;
      $mapping_object->last_sync_action = 'push';
      $mapping_object->save();

      watchdog('salesforce_push', '%op: Salesforce object %id', array(
        '%id' => $result->id,
        '%op' => $op,
      ));
      module_invoke_all('salesforce_push_success', $op, $result, $synced_entity);
    }
    else {

      // Otherwise, the item is considered failed.
      // A failed result may lack an id, and a single error may arrive as one
      // object rather than a list; normalise both before logging.
      $result_id = isset($result->id) ? $result->id : '';
      $errors = isset($result->errors) ? $result->errors : array();
      if (!is_array($errors)) {
        $errors = array($errors);
      }

      $error_messages = array();
      foreach ($errors as $error) {
        watchdog('salesforce_push', '%op error for Salesforce object %id. @code: @message', array(
          '%id' => $result_id,
          '@code' => $error->statusCode,
          '@message' => $error->message,
          '%op' => $op,
        ), WATCHDOG_ERROR);
        $error_messages[] = $error->message;
      }

      // Record the failure on an existing mapping object; nothing is created
      // for entities that were never linked.
      if ($mapping_object) {
        $mapping_object->last_sync = REQUEST_TIME;
        $mapping_object->last_sync_action = 'push';
        $mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
        $mapping_object->last_sync_message = truncate_utf8(t('Push error via %function. Message: @message', array(
          '%function' => __FUNCTION__,
          '@message' => implode(' | ', $error_messages),
        )), 255, FALSE, TRUE);
        $mapping_object->save();
      }
      module_invoke_all('salesforce_push_fail', $op, $result, $synced_entity);
    }
  }
}

/**
 * Map Drupal values to a Salesforce object.
 *
 * Walks the mapping's field mappings, keeps those that flow towards
 * Salesforce, and asks each field map type's push callback for the value.
 * Other modules may adjust the result through hook_salesforce_push_params_alter().
 *
 * @param object $mapping
 *   Mapping object.
 * @param object $entity_wrapper
 *   Entity wrapper object.
 * @param bool $use_soap
 *   Flag to enforce use of the SOAP API. Not read by this function; kept for
 *   callers that pass it.
 * @param bool $is_new
 *   Indicates whether a mapping object for this entity already exists.
 *
 * @return array
 *   Associative array of key value pairs, keyed by Salesforce field name.
 *   Empty when no field mapping qualifies.
 */
function salesforce_push_map_params($mapping, $entity_wrapper, $use_soap = FALSE, $is_new = TRUE) {
  // Start from an empty array so callers always receive an array, even when
  // every field mapping is skipped.
  $params = array();

  $push_directions = array(
    SALESFORCE_MAPPING_DIRECTION_DRUPAL_SF,
    SALESFORCE_MAPPING_DIRECTION_SYNC,
  );

  foreach ($mapping->field_mappings as $fieldmap) {

    // Skip fields that aren't being pushed to Salesforce.
    if (!in_array($fieldmap['direction'], $push_directions)) {
      continue;
    }

    // Skip fields that aren't updateable when a mapped object already exists.
    if (!$is_new && !$fieldmap['salesforce_field']['updateable']) {
      continue;
    }

    $fieldmap_type = salesforce_mapping_get_fieldmap_types($fieldmap['drupal_field']['fieldmap_type']);
    $value = call_user_func($fieldmap_type['push_value_callback'], $fieldmap, $entity_wrapper);
    $params[$fieldmap['salesforce_field']['name']] = $value;
  }

  drupal_alter('salesforce_push_params', $params, $mapping, $entity_wrapper);
  return $params;
}

/**
 * Implements hook_action_info().
 *
 * Registers a single non-configurable entity action that pushes the entity
 * to Salesforce and is available for any trigger.
 */
function salesforce_push_action_info() {
  $actions = array();
  $actions['salesforce_push_action'] = array(
    'type' => 'entity',
    'label' => t('Push entity to Salesforce'),
    'configurable' => FALSE,
    'triggers' => array('any'),
  );
  return $actions;
}

/**
 * Action callback: push entity to Salesforce.
 *
 * Uses the "Drupal create" trigger when the entity carries a truthy is_new
 * flag and the "Drupal update" trigger otherwise.
 *
 * @param object $entity
 *   The entity the action is applied to.
 * @param array $context
 *   Action context; 'entity_type' names the entity's type.
 */
function salesforce_push_action(&$entity, $context) {
  if (empty($entity->is_new)) {
    $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE;
  }
  else {
    $trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE;
  }
  salesforce_push_entity_crud($context['entity_type'], $entity, $trigger);
}

Functions

Name (sorted descending) | Description
salesforce_push_action Push entity to Salesforce.
salesforce_push_action_info Implements hook_action_info().
salesforce_push_cron Implements hook_cron().
salesforce_push_entity_crud Push entities to Salesforce.
salesforce_push_entity_delete Implements hook_entity_delete().
salesforce_push_entity_insert Implements hook_entity_insert().
salesforce_push_entity_update Implements hook_entity_update().
salesforce_push_form_salesforce_mapping_object_form_alter
salesforce_push_mapping_object_form_submit
salesforce_push_map_params Map Drupal values to a Salesforce object.
salesforce_push_process_soap_results Process SOAP API batch results.
salesforce_push_queue_info Implements hook_queue_info().
salesforce_push_sync_rest Sync Drupal entities and Salesforce objects using the REST API.

Constants

Name (sorted descending) | Description
SALESFORCE_PUSH_QUEUE Name of the Drupal queue that holds pushes deferred until cron runs.