View source
<?php
define('SALESFORCE_PUSH_QUEUE', 'salesforce_push');
function salesforce_push_entity_insert($entity, $type) {
salesforce_push_entity_crud($type, $entity, SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE);
}
function salesforce_push_entity_update($entity, $type) {
salesforce_push_entity_crud($type, $entity, SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE);
}
function salesforce_push_entity_delete($entity, $type) {
salesforce_push_entity_crud($type, $entity, SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE);
}
function salesforce_push_entity_crud($entity_type, $entity, $sf_sync_trigger) {
if (isset($entity->salesforce_pull) && $entity->salesforce_pull) {
return FALSE;
}
list($entity_id, , $bundle) = entity_extract_ids($entity_type, $entity);
$mappings = salesforce_mapping_load_multiple(array(
'drupal_entity_type' => $entity_type,
'drupal_bundle' => $bundle,
));
foreach ($mappings as $mapping) {
if ($mapping->sync_triggers & $sf_sync_trigger) {
foreach (module_implements('salesforce_push_entity_allowed') as $module) {
if (module_invoke($module, 'salesforce_push_entity_allowed', $entity_type, $entity, $sf_sync_trigger, $mapping) === FALSE) {
continue 2;
}
}
if (isset($mapping->push_async) && $mapping->push_async) {
$queue = DrupalQueue::get(SALESFORCE_PUSH_QUEUE);
$queue
->createItem(array(
'entity_type' => $entity_type,
'entity_id' => $entity_id,
'mapping' => $mapping,
'trigger' => $sf_sync_trigger,
));
continue;
}
salesforce_push_sync_rest($entity_type, $entity, $mapping, $sf_sync_trigger);
}
}
}
function salesforce_push_form_salesforce_mapping_object_form_alter(&$form, &$form_state) {
$form['salesforce_id']['#required'] = FALSE;
$form['actions']['push'] = array(
'#type' => 'submit',
'#value' => t('Push'),
'#submit' => array(
'salesforce_push_mapping_object_form_submit',
),
);
}
function salesforce_push_mapping_object_form_submit($form, &$form_state) {
$values = $form_state['values'];
$entity_type = $values['entity_type'];
$entity_id = $values['entity_id'];
$entity = entity_load_single($entity_type, $entity_id);
$trigger = SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE;
salesforce_push_entity_crud($entity_type, $entity, $trigger);
}
function salesforce_push_sync_rest($entity_type, $entity, $mapping, $sf_sync_trigger) {
$sfapi = salesforce_get_api();
if (!$sfapi
->isAuthorized()) {
return;
}
list($entity_id) = entity_extract_ids($entity_type, $entity);
$entity_wrapper = entity_metadata_wrapper($entity_type, $entity);
$mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id, TRUE);
drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);
$synced_entity = array(
'entity_wrapper' => $entity_wrapper,
'mapping_object' => $mapping_object,
'queue_item' => FALSE,
'mapping' => $mapping,
);
$op = '';
if ($sf_sync_trigger == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE) {
if ($mapping_object) {
$op = 'Delete';
try {
$sfapi
->objectDelete($mapping->salesforce_object_type, $mapping_object->salesforce_id);
} catch (SalesforceException $e) {
$exception_message = t('Salesforce Exception during @event attempting @op : @exception');
$message_vars = array(
'@event' => 'push',
'@op' => $op,
'@exception' => $e
->getMessage(),
);
watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
salesforce_set_message(format_string($exception_message, $message_vars), 'error');
module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
}
salesforce_set_message(t('Salesforce object %sfid has been deleted.', array(
'%sfid' => $mapping_object->salesforce_id,
)));
$mapping_object
->delete();
module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
}
return;
}
$key_field = $key_value = NULL;
$params = salesforce_push_map_params($mapping, $entity_wrapper, FALSE, !$mapping_object);
if (!$mapping_object) {
if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && !array_key_exists('RecordTypeId', $params) && $mapping->salesforce_object_type != 'CampaignMember') {
$params['RecordTypeId'] = $mapping->salesforce_record_type_default;
}
try {
if (!empty($mapping->dedupe_key) && !empty($params[$mapping->dedupe_key])) {
$key_value = $params[$mapping->dedupe_key];
$encoded_value = urlencode($key_value);
$encoded_value = str_replace('.', '%2E', $encoded_value);
$op = 'Upsert';
$data = $sfapi
->objectUpsert($mapping->salesforce_object_type, $mapping->dedupe_key, $encoded_value, $params);
switch ($sfapi->response->code) {
case '204':
$sf_object = $sfapi
->objectReadbyExternalId($mapping->salesforce_object_type, $mapping->dedupe_key, $encoded_value);
$data['id'] = $sf_object['Id'];
break;
case '300':
$data['errorCode'] = $sfapi->response->error . ' (' . $mapping->dedupe_key . ':' . $key_value . ')';
break;
}
}
else {
$op = 'Create';
$data = $sfapi
->objectCreate($mapping->salesforce_object_type, $params);
}
} catch (SalesforceException $e) {
$exception_message = t('Salesforce Exception during @event attempting @op : @exception');
$message_vars = array(
'@event' => 'push',
'@op' => $op,
'@exception' => $e
->getMessage(),
);
watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
salesforce_set_message(format_string($exception_message, $message_vars), 'error');
module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
return;
}
if (empty($data['errorCode'])) {
$mapping_object = entity_create('salesforce_mapping_object', array(
'entity_id' => $entity_id,
'entity_type' => $entity_type,
'salesforce_id' => $data['id'],
'last_sync_message' => t('Mapping object created via !function.', array(
'!function' => __FUNCTION__,
)),
'last_sync_status' => SALESFORCE_MAPPING_STATUS_SUCCESS,
));
module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
}
else {
$message = t('Failed to sync %label with Salesforce. @code: @message', array(
'%label' => $entity_wrapper
->label(),
'@code' => $data['errorCode'],
'@message' => $data['message'],
));
salesforce_set_message($message, 'error');
watchdog('salesforce_push', $message);
module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
return;
}
}
else {
if ($mapping_object->last_sync > $mapping_object->entity_updated) {
return;
}
try {
$op = 'Update';
$sfapi
->objectUpdate($mapping->salesforce_object_type, $mapping_object->salesforce_id, $params);
$mapping_object->last_sync_message = t('Mapping object updated via !function.', array(
'!function' => __FUNCTION__,
));
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
salesforce_set_message(t('%name has been synchronized with Salesforce record %sfid.', array(
'%name' => $entity_wrapper
->label(),
'%sfid' => $mapping_object->salesforce_id,
)));
module_invoke_all('salesforce_push_success', $op, $sfapi->response, $synced_entity);
} catch (SalesforceException $e) {
$exception_message = t('Error syncing @type @id to Salesforce record @sfid. Error message: "@msg". @extra');
$message_vars = array(
'@type' => $mapping_object->entity_type,
'@id' => $mapping_object->entity_id,
'@sfid' => $mapping_object->salesforce_id,
'@msg' => $e
->getMessage(),
'@extra' => '',
);
if (in_array($e
->getMessage(), array(
"invalid cross reference id",
"Not Found",
))) {
$message_vars['@extra'] = "Are you trying to sync with an object from a different Salesforce instance?";
}
watchdog_exception('salesforce_push', $e, $exception_message, $message_vars);
salesforce_set_message(format_string($exception_message, $message_vars), 'error');
$mapping_object->last_sync_message = truncate_utf8($e
->getMessage(), 255, FALSE, TRUE);
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
module_invoke_all('salesforce_push_fail', $op, $sfapi->response, $synced_entity);
}
}
$mapping_object->last_sync_action = 'push';
$mapping_object->last_sync = REQUEST_TIME;
$mapping_object
->save();
}
function salesforce_push_queue_info() {
$queues[SALESFORCE_PUSH_QUEUE] = array(
'title' => t('Salesforce push queue'),
);
return $queues;
}
function salesforce_push_cron() {
$sfapi = salesforce_get_api();
if (!$sfapi
->isAuthorized()) {
return;
}
$queue = DrupalQueue::get(SALESFORCE_PUSH_QUEUE);
$limit = variable_get('salesforce_push_limit', 50);
$use_soap = module_exists('salesforce_soap') && variable_get('salesforce_soap_batch_enable', TRUE);
$entity_ids = array();
$delta = 0;
for ($i = 0; $i < $limit; $i++) {
$item = $queue
->claimItem();
if (!$item) {
break;
}
$mapping = $item->data['mapping'];
if (!isset($entity_ids[$item->data['entity_type']])) {
$entity_ids[$item->data['entity_type']] = array();
}
$entity_type = $item->data['entity_type'];
$entity_id = $item->data['entity_id'];
if (in_array($entity_id, $entity_ids[$item->data['entity_type']])) {
$queue
->deleteItem($item);
continue;
}
$entity = entity_load_single($entity_type, $entity_id);
if ($entity === FALSE) {
$queue
->deleteItem($item);
continue;
}
$entity_ids[$item->data['entity_type']][] = $entity_id;
if (!$use_soap) {
salesforce_push_sync_rest($entity_type, $entity, $mapping, $item->data['trigger']);
$queue
->deleteItem($item);
continue;
}
$mapping_object = salesforce_mapping_object_load_by_drupal($entity_type, $entity_id);
drupal_alter('salesforce_push_mapping_object', $mapping_object, $entity, $mapping);
if ($item->data['trigger'] == SALESFORCE_MAPPING_SYNC_DRUPAL_DELETE && $mapping_object) {
$delete_list[$delta] = $mapping_object->salesforce_id;
continue;
}
$wrapper = entity_metadata_wrapper($item->data['entity_type'], $entity);
$key_field = $key_value = NULL;
$params = salesforce_push_map_params($mapping, $wrapper, $use_soap, !$mapping_object);
$synced_entities[$delta] = array(
'entity_wrapper' => $wrapper,
'mapping_object' => $mapping_object,
'queue_item' => $item,
'mapping' => $mapping,
);
if ($mapping->salesforce_record_type_default != SALESFORCE_MAPPING_DEFAULT_RECORD_TYPE && empty($params['RecordTypeId']) && $mapping->salesforce_object_type != 'CampaignMember') {
$params['RecordTypeId'] = $mapping->salesforce_record_type_default;
}
$sobject = new stdClass();
$sobject->type = $mapping->salesforce_object_type;
foreach ($params as $key => $value) {
$sobject->fields[$key] = $value;
}
if ($mapping_object && $mapping_object->salesforce_id) {
$sobject->Id = $mapping_object->salesforce_id;
$update_list[$delta] = $sobject;
continue;
}
if ($mapping->dedupe_key && !empty($params[$mapping->dedupe_key])) {
$upsert_list[$mapping->dedupe_key][$delta] = $sobject;
}
else {
$create_list[$delta] = $sobject;
}
$queue
->deleteItem($item);
$delta++;
}
if (!$use_soap) {
return;
}
$soap = new SalesforceSoapPartner($sfapi, variable_get('salesforce_partner_wsdl', libraries_get_path('salesforce') . '/soapclient/partner.wsdl.xml'));
if (!empty($delete_list)) {
$results = $soap - trySoap('delete', $delete_list);
salesforce_push_process_soap_results('Delete', $results, $synced_entities);
}
if (!empty($create_list)) {
$results = $soap
->trySoap('create', $create_list);
salesforce_push_process_soap_results('Create', $results, $synced_entities);
}
if (!empty($update_list)) {
$results = $soap
->trySoap('update', $update_list);
salesforce_push_process_soap_results('Update', $results, $synced_entities);
}
if (!empty($upsert_list)) {
foreach ($upsert_list as $key => $upsert_item) {
$results = $soap
->trySoap('upsert', $key, $upsert_item);
salesforce_push_process_soap_results('Upsert', $results, $synced_entities);
}
}
}
function salesforce_push_process_soap_results($op, $results, $synced_entities) {
foreach ($results as $key => $result) {
$synced_entity = $synced_entities[$key];
$mapping_object = empty($synced_entity['mapping_object']) ? FALSE : $synced_entity['mapping_object'];
if ($result->success) {
if (drupal_strtolower($op) == 'delete' && $mapping_object) {
$mapping_object
->delete();
return;
}
if (!$mapping_object) {
$wrapper = $synced_entity['entity_wrapper'];
list($entity_id) = entity_extract_ids($wrapper
->type(), $wrapper
->value());
$mapping_object = entity_create('salesforce_mapping_object', array(
'entity_id' => $entity_id,
'entity_type' => $wrapper
->type(),
'salesforce_id' => $result->id,
'last_sync_message' => t('Mapping object created via !function.', array(
'!function' => __FUNCTION__,
)),
));
}
else {
$mapping_object->last_sync_message = t('Mapping object updated via !function.', array(
'!function' => __FUNCTION__,
));
}
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_SUCCESS;
$mapping_object->last_sync = REQUEST_TIME;
$mapping_object->last_sync_action = 'push';
$mapping_object
->save();
watchdog('salesforce_push', '%op: Salesforce object %id', array(
'%id' => $result->id,
'%op' => $op,
));
module_invoke_all('salesforce_push_success', $op, $result, $synced_entity);
}
else {
$error_messages = array();
foreach ($result->errors as $error) {
watchdog('salesforce_push', '%op error for Salesforce object %id. @code: @message', array(
'%id' => $result->id,
'@code' => $error->statusCode,
'@message' => $error->message,
'%op' => $op,
), WATCHDOG_ERROR);
$error_messages[] = $error->message;
}
if ($mapping_object) {
$mapping_object->last_sync = REQUEST_TIME;
$mapping_object->last_sync_action = 'push';
$mapping_object->last_sync_status = SALESFORCE_MAPPING_STATUS_ERROR;
$mapping_object->last_sync_message = truncate_utf8(t('Push error via %function. Message: @message', array(
'%function' => __FUNCTION__,
'@message' => implode(' | ', $error_messages),
)), 255, FALSE, TRUE);
$mapping_object
->save();
}
module_invoke_all('salesforce_push_fail', $op, $result, $synced_entity);
}
}
}
function salesforce_push_map_params($mapping, $entity_wrapper, $use_soap = FALSE, $is_new = TRUE) {
foreach ($mapping->field_mappings as $fieldmap) {
if (!in_array($fieldmap['direction'], array(
SALESFORCE_MAPPING_DIRECTION_DRUPAL_SF,
SALESFORCE_MAPPING_DIRECTION_SYNC,
))) {
continue;
}
if (!$is_new && !$fieldmap['salesforce_field']['updateable']) {
continue;
}
$fieldmap_type = salesforce_mapping_get_fieldmap_types($fieldmap['drupal_field']['fieldmap_type']);
$value = call_user_func($fieldmap_type['push_value_callback'], $fieldmap, $entity_wrapper);
$params[$fieldmap['salesforce_field']['name']] = $value;
}
drupal_alter('salesforce_push_params', $params, $mapping, $entity_wrapper);
return $params;
}
function salesforce_push_action_info() {
return array(
'salesforce_push_action' => array(
'type' => 'entity',
'label' => t('Push entity to Salesforce'),
'configurable' => FALSE,
'triggers' => array(
'any',
),
),
);
}
function salesforce_push_action(&$entity, $context) {
$trigger = !empty($entity->is_new) && $entity->is_new ? SALESFORCE_MAPPING_SYNC_DRUPAL_CREATE : SALESFORCE_MAPPING_SYNC_DRUPAL_UPDATE;
salesforce_push_entity_crud($context['entity_type'], $entity, $trigger);
}