View source
<?php
define('SALESFORCE_PULL_QUEUE', 'salesforce_pull');
define('SALESFORCE_MERGE_RECORD', 'salesforce_contact_merge_record');
function salesforce_pull_menu() {
$items = array();
$items['salesforce/webhook/pull'] = array(
'title' => 'Salesforce pull webhook callback',
'page callback' => 'salesforce_pull_webhook',
'access callback' => 'salesforce_pull_webhook_access',
'type' => MENU_CALLBACK,
);
return $items;
}
function salesforce_pull_webhook_access() {
if (variable_get('salesforce_pull_webhook_enable') && isset($_GET['key']) && $_GET['key'] === variable_get('salesforce_pull_webhook_key')) {
return user_access('access content');
}
return FALSE;
}
function salesforce_pull_webhook() {
if (salesforce_pull()) {
$code = '200';
$queues = salesforce_pull_cron_queue_info();
$info = $queues[SALESFORCE_PULL_QUEUE];
$callback = $info['worker callback'];
$end = time() + (isset($info['time']) ? $info['time'] : 15);
$queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);
while (time() < $end && ($item = $queue
->claimItem())) {
try {
call_user_func($callback, $item->data);
$queue
->deleteItem($item);
} catch (Exception $e) {
watchdog_exception('salesforce_pull', $e);
}
}
}
else {
$code = '403';
}
http_response_code($code);
drupal_exit();
}
function salesforce_pull_cron() {
salesforce_pull();
}
function salesforce_pull() {
$sfapi = salesforce_get_api();
if ($sfapi
->isAuthorized() && salesforce_pull_check_throttle()) {
salesforce_pull_get_updated_records();
salesforce_pull_get_deleted_records();
variable_set('salesforce_pull_last_sync', REQUEST_TIME);
return TRUE;
}
return FALSE;
}
function salesforce_pull_cron_queue_info() {
$queues[SALESFORCE_PULL_QUEUE] = array(
'worker callback' => 'salesforce_pull_process_updated_records',
'time' => 180,
);
return $queues;
}
function salesforce_pull_queue_info() {
$queues[SALESFORCE_PULL_QUEUE] = array(
'title' => t('Salesforce pull queue'),
);
return $queues;
}
function salesforce_pull_form_salesforce_settings_form_alter(&$form, &$form_state, $form_id) {
$form['salesforce_pull_throttle'] = array(
'#type' => 'textfield',
'#title' => t('Pull throttle (seconds)'),
'#description' => t('Number of seconds to wait between repeated salesforce pulls.<br>
Prevents the webserver from becoming overloaded in case of too many cron runs, or webhook usage.'),
'#default_value' => variable_get('salesforce_pull_throttle', 5),
'#element_validate' => array(
'element_validate_number',
),
);
$webhooks_enabled = variable_get('salesforce_pull_webhook_enable', FALSE);
$form['salesforce_pull_webhook_enable'] = array(
'#type' => 'checkbox',
'#title' => t('Enable webhooks'),
'#description' => t('Allow external applications to trigger Salesforce sync with a web request.'),
'#default_value' => $webhooks_enabled,
);
$webhook_key = variable_get('salesforce_pull_webhook_key', drupal_random_key());
$url = url('salesforce/webhook/pull', array(
'query' => array(
'key' => $webhook_key,
),
'absolute' => TRUE,
));
$form['salesforce_pull_webhook_key'] = array(
'#type' => 'textfield',
'#title' => t('Webhook key'),
'#description' => t('A secret key that is required in the webhook request url (@url).<br>
Leave blank to auto-generate a new random key.', array(
'@url' => $url,
)),
'#default_value' => $webhook_key,
'#states' => array(
'visible' => array(
':input[name="salesforce_pull_webhook_enable"]' => array(
'checked' => TRUE,
),
),
),
'#element_validate' => array(
'salesforce_pull_webhook_key_validate',
),
);
module_load_include('inc', 'salesforce_mapping', 'includes/salesforce_mapping.admin');
$form[SALESFORCE_MERGE_RECORD] = array(
'#type' => 'select',
'#title' => t('Salesforce Contact Merge Object'),
'#options' => _salesforce_mapping_get_salesforce_object_type_options($form_state),
'#default_value' => variable_get(SALESFORCE_MERGE_RECORD, NULL),
'#weight' => 4,
'#description' => t('Select a Salesforce object that describes merged Salesforce Contacts to automate merging. Not seeing an object? !settings_link', array(
'!settings_link' => l(t('Check your Salesforce object filters settings.'), 'admin/config/salesforce/settings', array(
'fragment' => 'sf_object_filters_anchor',
)),
)),
);
}
function salesforce_pull_webhook_key_validate($element, &$form_state, $form) {
if (empty($element['#value'])) {
form_set_value($form['salesforce_pull_webhook_key'], drupal_random_key(), $form_state);
}
}
function salesforce_pull_check_throttle() {
$pull_throttle = variable_get('salesforce_pull_throttle', 5);
$last_sync = variable_get('salesforce_pull_last_sync', 0);
if (REQUEST_TIME > $last_sync + $pull_throttle) {
return TRUE;
}
return FALSE;
}
function salesforce_pull_get_updated_records() {
$queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);
if ($queue
->numberOfItems() > variable_get('salesforce_pull_max_queue_size', 100000)) {
watchdog('salesforce_pull', 'Salesforce Pull aborted pulling updated records due to queue size being > %limit. Current queue size is %current', array(
'%limit' => variable_get('salesforce_pull_max_queue_size', 100000),
'%current' => $queue
->numberOfItems(),
), WATCHDOG_WARNING);
return;
}
$sfapi = salesforce_get_api();
foreach (salesforce_mapping_get_mapped_objects() as $type) {
$soql = salesforce_pull_get_pull_query($type);
if (empty($soql)) {
continue;
}
$results = $sfapi
->query($soql);
$version_path = parse_url($sfapi
->getApiEndPoint(), PHP_URL_PATH);
if (!isset($results['errorCode'])) {
foreach ($results['records'] as $result) {
$queue_item = array(
'update' => $result,
);
$queue
->createItem($queue_item);
}
$next_records_url = isset($results['nextRecordsUrl']) ? str_replace($version_path, '', $results['nextRecordsUrl']) : FALSE;
while ($next_records_url) {
$new_result = $sfapi
->apiCall($next_records_url);
if (!isset($new_result['errorCode'])) {
foreach ($new_result['records'] as $result) {
$queue_item = array(
'update' => $result,
);
$queue
->createItem($queue_item);
}
}
$next_records_url = isset($new_result['nextRecordsUrl']) ? str_replace($version_path, '', $new_result['nextRecordsUrl']) : FALSE;
}
variable_set('salesforce_pull_last_sync_' . $type, REQUEST_TIME);
}
else {
watchdog('Salesforce Pull', $results['errorCode'] . ':' . $results['message'], array(), WATCHDOG_ERROR);
}
}
}
function salesforce_pull_get_pull_query($type) {
$mapped_fields = array();
$mapped_record_types = array();
foreach (salesforce_mapping_load_multiple(array(
'salesforce_object_type' => $type,
)) as $mapping) {
if (!($mapping->sync_triggers & (SALESFORCE_MAPPING_SYNC_SF_CREATE | SALESFORCE_MAPPING_SYNC_SF_UPDATE))) {
continue;
}
$mapped_fields = array_merge($mapped_fields, $mapping
->getMappedFields(array(
SALESFORCE_MAPPING_DIRECTION_SYNC,
SALESFORCE_MAPPING_DIRECTION_SF_DRUPAL,
)));
$mapping_record_types = $mapping
->getMappedRecordTypes();
if (empty($mapping_record_types)) {
$mapped_record_types = FALSE;
}
elseif (is_array($mapped_record_types)) {
$mapped_record_types = array_merge($mapped_record_types, $mapping_record_types);
}
}
if (empty($mapped_fields)) {
return NULL;
}
$soql = new SalesforceSelectQuery($type);
$soql->fields = array_merge($mapped_fields, array(
'Id' => 'Id',
$mapping->pull_trigger_date => $mapping->pull_trigger_date,
));
$sf_last_sync = variable_get('salesforce_pull_last_sync_' . $type, NULL);
if ($sf_last_sync) {
$last_sync = gmdate('Y-m-d\\TH:i:s\\Z', $sf_last_sync);
$soql
->addCondition($mapping->pull_trigger_date, $last_sync, '>');
}
if (!empty($mapped_record_types)) {
$soql
->addCondition('RecordTypeId', $mapped_record_types, 'IN');
}
return $soql;
}
function salesforce_pull_process_updated_records($sf_object) {
if (isset($sf_object['delete'])) {
salesforce_pull_process_deleted_records($sf_object['delete']);
}
else {
if (isset($sf_object['merge'])) {
salesforce_pull_process_merged_records($sf_object['merge']);
}
else {
if (isset($sf_object['update'])) {
$sf_object = $sf_object['update'];
}
$mapping_conditions = array(
'salesforce_object_type' => $sf_object['attributes']['type'],
);
if (isset($sf_object['RecordTypeId']) && $sf_object['RecordTypeId'] != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE) {
$mapping_conditions['salesforce_record_type'] = $sf_object['RecordTypeId'];
}
$sf_mappings = salesforce_mapping_load_multiple($mapping_conditions);
$hold_exceptions = count($sf_mappings) > 1;
$exception = FALSE;
foreach ($sf_mappings as $sf_mapping) {
$mapping_object = salesforce_mapping_object_load_by_sfid($sf_object['Id']);
foreach (module_implements('salesforce_pull_allow_sf_object') as $module) {
if (module_invoke($module, 'salesforce_pull_allow_sf_object', $sf_object, $mapping_object, $sf_mapping) === FALSE) {
continue 2;
}
}
drupal_alter('salesforce_pull_mapping_object', $mapping_object, $sf_object, $sf_mapping);
$exists = $mapping_object ? TRUE : FALSE;
if ($exists && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_UPDATE) {
try {
$entity = entity_load_single($mapping_object->entity_type, $mapping_object->entity_id);
if ($entity === FALSE) {
$exists = FALSE;
$message = t('Unable to update %type entity %label from Salesforce object %sfobjectid. Entity does not exist. Mapping removed, continuing with Create instead Update.', array(
'%type' => $mapping_object->entity_type,
'%label' => $mapping_object->entity_id,
'%sfobjectid' => $sf_object['Id'],
));
watchdog('Salesforce Pull', $message, array(), WATCHDOG_NOTICE);
salesforce_set_message($message, 'status', FALSE);
entity_delete('salesforce_mapping_object', $mapping_object->salesforce_mapping_object_id);
}
else {
$entity->salesforce_pull = TRUE;
$entity_updated = isset($entity->updated) ? $entity->updated : $mapping_object->entity_updated;
$sf_object_updated = strtotime($sf_object[$sf_mapping->pull_trigger_date]);
if ($sf_object_updated > $entity_updated) {
$wrapper = entity_metadata_wrapper($sf_mapping->drupal_entity_type, $entity);
salesforce_pull_map_fields($sf_mapping->field_mappings, $wrapper, $sf_object);
module_invoke_all('salesforce_pull_entity_presave', $wrapper
->value(), $sf_object, $sf_mapping);
$wrapper
->save();
module_invoke_all('salesforce_pull_entity_update', $wrapper
->value(), $sf_object, $sf_mapping);
$mapping_object->last_sync_message = t('Retrieved updates from Salesforce');
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
$mapping_object->entity_updated = $mapping_object->last_sync = time();
watchdog('Salesforce Pull', 'Updated entity %label associated with Salesforce Object ID: %sfid', array(
'%label' => $wrapper
->label(),
'%sfid' => $sf_object['Id'],
));
}
}
} catch (Exception $e) {
$message = t('Failed to update entity %label %id from Salesforce object %sfobjectid. Error: @msg', array(
'%label' => $mapping_object->entity_type,
'%id' => $mapping_object->entity_id,
'%sfobjectid' => $sf_object['Id'],
'@msg' => $e
->getMessage(),
));
watchdog('Salesforce Pull', $message, array(), WATCHDOG_ERROR);
salesforce_set_message($message, 'error', FALSE);
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
$mapping_object->last_sync_message = t('Processing failed');
$mapping_object->last_sync = time();
if (!$hold_exceptions) {
throw $e;
}
if (empty($exception)) {
$exception = $e;
}
else {
$my_class = get_class($e);
$exception = new $my_class($e
->getMessage(), $e
->getCode(), $exception);
}
}
}
else {
$exists = FALSE;
}
if (!$exists && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_CREATE) {
try {
$entity_info = entity_get_info($sf_mapping->drupal_entity_type);
$values = array();
if (isset($entity_info['entity keys']['bundle']) && !empty($entity_info['entity keys']['bundle'])) {
$values[$entity_info['entity keys']['bundle']] = $sf_mapping->drupal_bundle;
if (isset($values['vocabulary_machine_name'])) {
$vocabulary = taxonomy_vocabulary_machine_name_load($values['vocabulary_machine_name']);
$values['vid'] = $vocabulary->vid;
}
}
else {
$values[$sf_mapping->drupal_bundle] = $sf_mapping->drupal_bundle;
}
$values['salesforce_pull'] = TRUE;
$entity = entity_create($sf_mapping->drupal_entity_type, $values);
$entity->salesforce_pull = TRUE;
$wrapper = entity_metadata_wrapper($sf_mapping->drupal_entity_type, $entity);
salesforce_pull_map_fields($sf_mapping->field_mappings, $wrapper, $sf_object);
module_invoke_all('salesforce_pull_entity_presave', $wrapper
->value(), $sf_object, $sf_mapping);
$wrapper
->save();
module_invoke_all('salesforce_pull_entity_insert', $wrapper
->value(), $sf_object, $sf_mapping);
$last_sync_message = t('Retrieved new record from Salesforce');
$last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
$entity_updated = time();
} catch (Exception $e) {
$message = $e
->getMessage() . ' ' . t('Processing failed for entity %label associated with Salesforce Object ID: %sfobjectid', array(
'%label' => $wrapper
->label(),
'%sfobjectid' => $sf_object['Id'],
));
watchdog('Salesforce Pull', $message, array(), WATCHDOG_ERROR);
salesforce_set_message(t('There were failures processing data from Salesforce. Please check the error logs.'), 'error', FALSE);
$last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
$last_sync_message = t('Processing failed for new record');
$entity_updated = NULL;
if (!$hold_exceptions) {
throw $e;
}
if (empty($exception)) {
$exception = $e;
}
else {
$my_class = get_class($e);
$exception = new $my_class($e
->getMessage(), $e
->getCode(), $exception);
}
}
list($entity_id) = entity_extract_ids($sf_mapping->drupal_entity_type, $entity);
if (!$entity_id) {
$sf_object_id = $sf_object['Id'];
throw new Exception("Failed to create Drupal entity when processing data from Salesforce object: {$sf_object_id}.");
}
$mapping_object = entity_create('salesforce_mapping_object', array(
'salesforce_id' => $sf_object['Id'],
'entity_type' => $sf_mapping->drupal_entity_type,
'entity_id' => $entity_id,
'entity_updated' => $entity_updated,
'last_sync' => time(),
'last_sync_message' => $last_sync_message,
'last_sync_status' => $last_sync_status,
));
watchdog('Salesforce Pull', 'Created entity %label associated with Salesforce Object ID: %sfid', array(
'%label' => $wrapper
->label(),
'%sfid' => $sf_object['Id'],
));
}
if ($mapping_object) {
$mapping_object->last_sync_action = 'pull';
$mapping_object
->save();
}
}
if (!empty($exception)) {
throw $exception;
}
}
}
}
function salesforce_pull_get_deleted_records() {
$sfapi = salesforce_get_api();
$client = $sfapi;
$use_soap = FALSE;
if (class_exists('SalesforceSoapPartner')) {
$client = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));
$use_soap = TRUE;
}
$merge_record_type = variable_get(SALESFORCE_MERGE_RECORD, FALSE);
if (!empty($merge_record_type)) {
$merged_results = salesforce_pull_check_merged_records($merge_record_type);
}
foreach (array_reverse(salesforce_mapping_get_mapped_objects()) as $type) {
$last_delete_sync = variable_get('salesforce_pull_delete_last_' . $type, REQUEST_TIME);
variable_set('salesforce_pull_delete_last_' . $type, REQUEST_TIME);
$version_path = parse_url($sfapi
->getApiEndPoint(), PHP_URL_PATH);
$queue = DrupalQueue::get(SALESFORCE_PULL_QUEUE);
$now = time();
$last_delete_sync = $last_delete_sync > REQUEST_TIME - 2505600 ? $last_delete_sync : REQUEST_TIME - 2505600;
$now = $now > $last_delete_sync + 60 ? $now : $now + 60;
$last_delete_sync_sf = gmdate('Y-m-d\\TH:i:s\\Z', $last_delete_sync);
$now_sf = gmdate('Y-m-d\\TH:i:s\\Z', $now);
if ($use_soap) {
$results = (object) $client
->trySoap('getDeleted', $type, $last_delete_sync_sf, $now_sf);
}
else {
$results = (object) $client
->getDeleted($type, $last_delete_sync_sf, $now_sf);
}
if (empty($results->deletedRecords)) {
continue;
}
if (!isset($results->errorCode)) {
foreach ($results->deletedRecords as $index => $result) {
$results->deletedRecords[$index] = (object) $result;
}
$merge_fields = array(
'old_contact_key' => 'Old_Contact__c',
'merged_contact_key' => 'Contact__c',
);
if (!empty($merge_record_type)) {
drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);
foreach ($results->deletedRecords as $result) {
foreach ($merged_results['records'] as $merged_result) {
if ($result->id == $merged_result[$merge_fields['old_contact_key']]) {
$queue_item = array(
'merge' => $merged_result,
);
$queue
->createItem($queue_item);
}
}
}
}
foreach ($results->deletedRecords as $result) {
$result = (array) $result;
$result['type'] = $type;
$queue_item = array(
'delete' => $result,
);
$queue
->createItem($queue_item);
}
$next_records_url = isset($results->nextRecordsUrl) ? str_replace($version_path, '', $results->nextRecordsUrl) : FALSE;
while ($next_records_url) {
$new_result = $sfapi
->apiCall($next_records_url);
if (!isset($new_result['errorCode'])) {
if (!empty($merge_record_type)) {
foreach ($results->deletedRecords as $result) {
foreach ($merged_results['records'] as $merged_result) {
if ($result->id == $merged_result[$merge_fields['old_contact_key']]) {
$queue_item = array(
'merge' => $merged_result,
);
$queue
->createItem($queue_item);
}
}
}
}
foreach ($new_result['records'] as $result) {
$queue_item = array(
'delete' => $result,
);
$queue
->createItem($queue_item);
}
}
$next_records_url = isset($new_result->nextRecordsUrl) ? str_replace($version_path, '', $new_result->nextRecordsUrl) : FALSE;
}
variable_set('salesforce_pull_last_sync_' . $type, REQUEST_TIME);
}
}
}
function salesforce_pull_process_merged_records($sf_object) {
$merge_fields = array(
'old_contact_key' => 'Old_Contact__c',
'merged_contact_key' => 'Contact__c',
);
drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);
$entity_to_delete = salesforce_mapping_object_load_by_sfid($sf_object[$merge_fields['old_contact_key']]);
$merged_entity = salesforce_mapping_object_load_by_sfid($sf_object[$merge_fields['merged_contact_key']]);
if ($entity_to_delete && $merged_entity) {
$wrapper = entity_metadata_wrapper($entity_to_delete->entity_type, $entity_to_delete);
$merged_entity->salesforce_pull = TRUE;
module_invoke_all('salesforce_pull_entity_merge', $entity_to_delete, $merged_entity);
watchdog('Salesforce Pull', 'Merged entity %label with ID: %id associated with Salesforce Object ID: %sfid to %label with ID: %newid associated with Salesforce Object ID: %newsfid', array(
'%label' => $wrapper
->label(),
'%id' => $entity_to_delete->entity_id,
'%sfid' => $sf_object[$merge_fields['old_contact_key']],
'%newid' => $merged_entity->entity_id,
'%newsfid' => $sf_object[$merge_fields['merged_contact_key']],
));
}
}
function salesforce_pull_process_deleted_records($sf_object) {
$mapping_object = salesforce_mapping_object_load_by_sfid($sf_object['id']);
if ($mapping_object) {
$deleted_entity = entity_load_single($mapping_object->entity_type, $mapping_object->entity_id);
if ($deleted_entity) {
$wrapper = entity_metadata_wrapper($mapping_object->entity_type, $deleted_entity);
$sf_mapping = reset(salesforce_mapping_load_multiple(array(
'salesforce_object_type' => $sf_object['type'],
'drupal_entity_type' => $wrapper
->type(),
'drupal_bundle' => $wrapper
->getBundle(),
)));
if (!empty($sf_mapping) && $sf_mapping->sync_triggers & SALESFORCE_MAPPING_SYNC_SF_DELETE) {
$deleted_entity->salesforce_pull = TRUE;
try {
$wrapper
->delete();
} catch (Exception $e) {
watchdog_exception('salesforce_pull', $e);
}
watchdog('Salesforce Pull', 'Deleted entity %label with ID: %id associated with Salesforce Object ID: %sfid', array(
'%label' => $wrapper
->label(),
'%id' => $mapping_object->entity_id,
'%sfid' => $sf_object['id'],
));
}
}
$mapping_object
->delete();
}
}
function salesforce_pull_check_merged_records($merge_record_type) {
$sfapi = salesforce_get_api();
$merge_fields = array(
'type' => $merge_record_type,
'merged_contact_key' => 'Contact__c',
'old_contact_key' => 'Old_Contact__c',
'modified_date' => 'LastModifiedDate',
);
drupal_alter('salesforce_pull_entity_merge_fields', $merge_fields);
$objInfo = $sfapi
->objects(array(
'name' => $merge_fields['type'],
));
if (empty($objInfo)) {
return array(
'records' => array(),
);
}
$last_delete_sync = variable_get('salesforce_pull_delete_last_' . $merge_fields['type'], REQUEST_TIME);
$soql = new SalesforceSelectQuery($merge_fields['type']);
$soql->fields = array(
'Id' => 'Id',
$merge_fields['merged_contact_key'] => $merge_fields['merged_contact_key'],
$merge_fields['old_contact_key'] => $merge_fields['old_contact_key'],
$merge_fields['modified_date'] => $merge_fields['modified_date'],
);
$sf_last_sync = variable_get('salesforce_pull_last_sync_' . $merge_fields['type'], NULL);
if ($sf_last_sync) {
$last_sync = gmdate('Y-m-d\\TH:i:s\\Z', $sf_last_sync);
$soql
->addCondition($last_delete_sync, $last_sync, '>');
}
$results = $sfapi
->query($soql);
$version_path = parse_url($sfapi
->getApiEndPoint(), PHP_URL_PATH);
if (!isset($results['errorCode'])) {
$merged_records = $results;
$next_records_url = isset($results['nextRecordsUrl']) ? str_replace($version_path, '', $results['nextRecordsUrl']) : FALSE;
while ($next_records_url) {
$new_result = $sfapi
->apiCall($next_records_url);
if (!isset($new_result['errorCode'])) {
foreach ($new_result['records'] as $result) {
$merged_records[] = $result;
}
}
$next_records_url = isset($new_result['nextRecordsUrl']) ? str_replace($version_path, '', $new_result['nextRecordsUrl']) : FALSE;
}
return $merged_records;
}
else {
return array(
'records' => array(),
);
}
}
function salesforce_pull_map_fields($field_maps, &$entity_wrapper, $sf_object) {
$types = salesforce_mapping_get_fieldmap_types();
foreach ($field_maps as $field_map) {
$fieldmap_type = $field_map['drupal_field']['fieldmap_type'];
if (!isset($types[$fieldmap_type]['pull_value_callback']) || !in_array($field_map['direction'], array(
'sync',
'sf_drupal',
)) || !function_exists($types[$fieldmap_type]['pull_value_callback'])) {
continue;
}
$drupal_fields_array = explode(':', $field_map['drupal_field']['fieldmap_value']);
$parent = $entity_wrapper;
foreach ($drupal_fields_array as $drupal_field) {
if ($parent instanceof EntityListWrapper) {
$child_wrapper = $parent
->get(0)->{$drupal_field};
}
else {
$child_wrapper = $parent->{$drupal_field};
}
$parent = $child_wrapper;
}
$fieldmap_type = salesforce_mapping_get_fieldmap_types($field_map['drupal_field']['fieldmap_type']);
$value = call_user_func($fieldmap_type['pull_value_callback'], $parent, $sf_object, $field_map);
drupal_alter('salesforce_pull_entity_value', $value, $field_map, $sf_object);
try {
if ($parent instanceof EntityListWrapper) {
if (empty($value)) {
$value = array();
}
elseif (!is_array($value)) {
$value = array(
$value,
);
}
}
if (empty($value) && count($drupal_fields_array) > 1 && empty($child_wrapper
->info()['parent']
->info()['required']) && !empty($child_wrapper
->info()['required'])) {
$child_wrapper
->info()['parent']
->set($value);
}
else {
$parent
->set($value);
}
} catch (Exception $e) {
$message = t('Exception during pull for @sfobj.@sffield @sfid to @dobj.@dprop @did with value @v: @e', array(
'@sfobj' => $sf_object['attributes']['type'],
'@sffield' => $field_map['salesforce_field']['name'],
'@sfid' => $sf_object['Id'],
'@dobj' => $entity_wrapper
->type(),
'@dprop' => $field_map['drupal_field']['fieldmap_value'],
'@did' => $entity_wrapper
->getIdentifier(),
'@v' => $value,
'@e' => $e
->getMessage(),
));
throw new Exception($message, $e
->getCode(), $e);
}
}
}