You are here

function salesforce_push_cron in Salesforce Suite 7.3

Same name and namespace in other branches
  1. 8.4 modules/salesforce_push/salesforce_push.module \salesforce_push_cron()
  2. 8.3 modules/salesforce_push/salesforce_push.module \salesforce_push_cron()
  3. 5.0.x modules/salesforce_push/salesforce_push.module \salesforce_push_cron()

Implements hook_cron().

File

modules/salesforce_push/salesforce_push.module, line 316
Push updates to Salesforce when a Drupal entity is updated.

Code

function salesforce_push_cron() {
  $sfapi = salesforce_get_api();
  if (!$sfapi
    ->isAuthorized()) {
    return;
  }
  $queue = DrupalQueue::get(SALESFORCE_PUSH_QUEUE);
  $limit = variable_get('salesforce_push_limit', 50);
  $use_soap = module_exists('salesforce_soap') && variable_get('salesforce_soap_batch_enable', TRUE);
  $entity_ids = array();
  $delta = 0;
  for ($i = 0; $i < $limit; $i++) {
    $item = $queue
      ->claimItem();

    // We do this after the "for()" so that when we reach the limit, we don't
    // incidentally claim a queue license on an item we aren't going to process.
    if (!$item) {
      break;
    }
    $mapping = $item->data['mapping'];

    // Initialize array for the entity type if it isn't set yet.
    if (!isset($entity_ids[$item->data['entity_type']])) {
      $entity_ids[$item->data['entity_type']] = array();
    }
    $entity_type = $item->data['entity_type'];
    $entity_id = $item->data['entity_id'];

    // Duplicate entity in the queue.
    if (in_array($entity_id, $entity_ids[$item->data['entity_type']])) {
      $queue
        ->deleteItem($item);
      continue;
    }

    // Attempt to load our entity from the queue item.
    $entity = entity_load_single($entity_type, $entity_id);

    // If the entity fails to load, remove it from the queue. This can happen
    // if we're processing records asynchronously and it was deleted from SF
    // before cron ran.
    if ($entity === FALSE) {
      $queue
        ->deleteItem($item);
      continue;
    }

    // Add entity id to array of pushed entities to check for duplicates later.
    $entity_ids[$item->data['entity_type']][] = $entity_id;
    if (!$use_soap) {
      salesforce_push_sync_rest($entity_type, $entity, $mapping, $item->data['trigger']);
      $queue
        ->deleteItem($item);
      continue;
    }
    $mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id);

    // Allow other modules to define or alter the mapping object.
    drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);
    if ($item->data['trigger'] == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE && $mapping_object) {
      $delete_list[$delta] = $mapping_object->salesforce_id;
      continue;
    }
    $wrapper = entity_metadata_wrapper($item->data['entity_type'], $entity);
    $key_field = $key_value = NULL;
    $params = salesforce_push_map_params($mapping, $wrapper, $use_soap, !$mapping_object);
    $synced_entities[$delta] = array(
      'entity_wrapper' => $wrapper,
      'mapping_object' => $mapping_object,
      'queue_item' => $item,
      'mapping' => $mapping,
    );

    // Setup SF record type. CampaignMember objects get their type from
    // their Campaign.
    // @TODO: remove object-specific logic. Figure out how this works and implement generic support for recordtype inheritence, or objects that don't support recordtypes
    if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && empty($params['RecordTypeId']) && $mapping->salesforce_object_type != 'CampaignMember') {
      $params['RecordTypeId'] = $mapping->salesforce_record_type_default;
    }
    $sobject = new stdClass();
    $sobject->type = $mapping->salesforce_object_type;
    foreach ($params as $key => $value) {
      $sobject->fields[$key] = $value;
    }

    // If we have a SFID, this is an update.
    if ($mapping_object && $mapping_object->salesforce_id) {
      $sobject->Id = $mapping_object->salesforce_id;
      $update_list[$delta] = $sobject;
      continue;
    }

    // If we have a dedupe key, prefer upsert.
    if ($mapping->dedupe_key && !empty($params[$mapping->dedupe_key])) {
      $upsert_list[$mapping->dedupe_key][$delta] = $sobject;
    }
    else {

      // Otherwise create.
      $create_list[$delta] = $sobject;
    }

    // Remove item from queue.
    $queue
      ->deleteItem($item);
    $delta++;
  }
  if (!$use_soap) {
    return;
  }

  // Use soap API to batch process records.
  $soap = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));
  if (!empty($delete_list)) {
    $results = $soap - trySoap('delete', $delete_list);
    salesforce_push_process_soap_results('Delete', $results, $synced_entities);
  }
  if (!empty($create_list)) {
    $results = $soap
      ->trySoap('create', $create_list);
    salesforce_push_process_soap_results('Create', $results, $synced_entities);
  }
  if (!empty($update_list)) {
    $results = $soap
      ->trySoap('update', $update_list);
    salesforce_push_process_soap_results('Update', $results, $synced_entities);
  }
  if (!empty($upsert_list)) {
    foreach ($upsert_list as $key => $upsert_item) {
      $results = $soap
        ->trySoap('upsert', $key, $upsert_item);
      salesforce_push_process_soap_results('Upsert', $results, $synced_entities);
    }
  }
}