function salesforce_push_cron in Salesforce Suite 7.3
Same name and namespace in other branches
- 8.4 modules/salesforce_push/salesforce_push.module \salesforce_push_cron()
- 8.3 modules/salesforce_push/salesforce_push.module \salesforce_push_cron()
- 5.0.x modules/salesforce_push/salesforce_push.module \salesforce_push_cron()
Implements hook_cron().
File
- modules/
salesforce_push/ salesforce_push.module, line 316 - Push updates to Salesforce when a Drupal entity is updated.
Code
function salesforce_push_cron() {
$sfapi = salesforce_get_api();
if (!$sfapi
->isAuthorized()) {
return;
}
$queue = DrupalQueue::get(SALESFORCE_PUSH_QUEUE);
$limit = variable_get('salesforce_push_limit', 50);
$use_soap = module_exists('salesforce_soap') && variable_get('salesforce_soap_batch_enable', TRUE);
$entity_ids = array();
$delta = 0;
for ($i = 0; $i < $limit; $i++) {
$item = $queue
->claimItem();
// We do this after the "for()" so that when we reach the limit, we don't
// incidentally claim a queue license on an item we aren't going to process.
if (!$item) {
break;
}
$mapping = $item->data['mapping'];
// Initialize array for the entity type if it isn't set yet.
if (!isset($entity_ids[$item->data['entity_type']])) {
$entity_ids[$item->data['entity_type']] = array();
}
$entity_type = $item->data['entity_type'];
$entity_id = $item->data['entity_id'];
// Duplicate entity in the queue.
if (in_array($entity_id, $entity_ids[$item->data['entity_type']])) {
$queue
->deleteItem($item);
continue;
}
// Attempt to load our entity from the queue item.
$entity = entity_load_single($entity_type, $entity_id);
// If the entity fails to load, remove it from the queue. This can happen
// if we're processing records asynchronously and it was deleted from SF
// before cron ran.
if ($entity === FALSE) {
$queue
->deleteItem($item);
continue;
}
// Add entity id to array of pushed entities to check for duplicates later.
$entity_ids[$item->data['entity_type']][] = $entity_id;
if (!$use_soap) {
salesforce_push_sync_rest($entity_type, $entity, $mapping, $item->data['trigger']);
$queue
->deleteItem($item);
continue;
}
$mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id);
// Allow other modules to define or alter the mapping object.
drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);
if ($item->data['trigger'] == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE && $mapping_object) {
$delete_list[$delta] = $mapping_object->salesforce_id;
continue;
}
$wrapper = entity_metadata_wrapper($item->data['entity_type'], $entity);
$key_field = $key_value = NULL;
$params = salesforce_push_map_params($mapping, $wrapper, $use_soap, !$mapping_object);
$synced_entities[$delta] = array(
'entity_wrapper' => $wrapper,
'mapping_object' => $mapping_object,
'queue_item' => $item,
'mapping' => $mapping,
);
// Setup SF record type. CampaignMember objects get their type from
// their Campaign.
// @TODO: remove object-specific logic. Figure out how this works and implement generic support for recordtype inheritence, or objects that don't support recordtypes
if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && empty($params['RecordTypeId']) && $mapping->salesforce_object_type != 'CampaignMember') {
$params['RecordTypeId'] = $mapping->salesforce_record_type_default;
}
$sobject = new stdClass();
$sobject->type = $mapping->salesforce_object_type;
foreach ($params as $key => $value) {
$sobject->fields[$key] = $value;
}
// If we have a SFID, this is an update.
if ($mapping_object && $mapping_object->salesforce_id) {
$sobject->Id = $mapping_object->salesforce_id;
$update_list[$delta] = $sobject;
continue;
}
// If we have a dedupe key, prefer upsert.
if ($mapping->dedupe_key && !empty($params[$mapping->dedupe_key])) {
$upsert_list[$mapping->dedupe_key][$delta] = $sobject;
}
else {
// Otherwise create.
$create_list[$delta] = $sobject;
}
// Remove item from queue.
$queue
->deleteItem($item);
$delta++;
}
if (!$use_soap) {
return;
}
// Use soap API to batch process records.
$soap = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));
if (!empty($delete_list)) {
$results = $soap - trySoap('delete', $delete_list);
salesforce_push_process_soap_results('Delete', $results, $synced_entities);
}
if (!empty($create_list)) {
$results = $soap
->trySoap('create', $create_list);
salesforce_push_process_soap_results('Create', $results, $synced_entities);
}
if (!empty($update_list)) {
$results = $soap
->trySoap('update', $update_list);
salesforce_push_process_soap_results('Update', $results, $synced_entities);
}
if (!empty($upsert_list)) {
foreach ($upsert_list as $key => $upsert_item) {
$results = $soap
->trySoap('upsert', $key, $upsert_item);
salesforce_push_process_soap_results('Upsert', $results, $synced_entities);
}
}
}