function sf_queue_handle_upserts in Salesforce Suite 6.2
Same name and namespace in other branches
- 7.2 sf_queue/sf_queue.module \sf_queue_handle_upserts()
Helper function to process queue updates and inserts (creates). The logic proceeds basically like this:
- Get a list of object types that we're going to upsert by eliminating those queue item groups that do not meet the criteria (too many fails, threshold not met, etc.)
- For each object type:
- update up to 200 records at a time
- then, create up to 200 records at a time
- On each iteration of each major loop, break if we have exceeded our allotted processing time limit.
1 call to sf_queue_handle_upserts()
- sf_queue_process_queue in sf_queue/
sf_queue.module
File
- sf_queue/
sf_queue.module, line 267 - sf_queue.module Implements export queue and administrativa for SalesForce API
Code
function sf_queue_handle_upserts($settings) {
$ph_op = db_placeholders(array_filter($settings['cron_operations']), 'text');
//$count_args = $settings['cron_operations'];
$count_sql = 'SELECT sf_type, count(sf_type) as total
FROM {salesforce_export_queue}
WHERE sf_op IN ("create", "update")';
$sql = "SELECT id, oid, fieldmap_name, drupal_type, sf_type, sfid, sf_data\n FROM {salesforce_export_queue}\n WHERE sf_op = '%s' AND sf_type = '%s'";
$count_having = '';
if ($settings['retry_num_attempts'] >= 0) {
$count_sql .= ' AND attempts < %d';
$sql .= ' AND attempts < %d';
$count_args[] = $settings['retry_num_attempts'] + 1;
}
if ($settings['cron_min_threshold']) {
$count_args[] = $settings['cron_min_threshold'];
$count_having = ' HAVING total >= %d';
}
$count_sql .= " GROUP BY sf_type " . $count_having . " ORDER BY total DESC";
error_log(print_r($count_sql, 1));
$count_result = db_query($count_sql, $count_args);
$sql .= ' ORDER BY created LIMIT ' . $settings['cron_num_objects'];
$done = FALSE;
while ($settings['start_time'] + $settings['limit'] > time() && ($row = db_fetch_array($count_result))) {
$sf_type = $row['sf_type'];
foreach (array(
'update',
'create',
) as $op) {
$op_done = FALSE;
$args = array(
$op,
$sf_type,
$settings['retry_num_attempts'] + 1,
);
while ($settings['start_time'] + $settings['limit'] > time() && $op_done == FALSE) {
$items = $ids = $items = $objects = array();
$result = db_query($sql, $args);
while ($queue_item = db_fetch_array($result)) {
$ids[] = $queue_item['id'];
$items[] = $queue_item;
$object = unserialize($queue_item['sf_data']);
if ($op == 'create') {
unset($object->Id);
}
$objects[] = $object;
}
if (count($ids) > 0) {
try {
$sf = salesforce_api_connect();
$responses = $sf->client
->{$op}($objects, $sf_type);
error_log(print_r($responses, 1));
list($wins, $fails) = sf_queue_handle_responses($responses, $items);
} catch (Exception $e) {
sf_queue_handle_exception($e, $responses);
continue;
}
error_log(print_r($wins, 1));
error_log(print_r($fails, 1));
if (count($fails) > 0) {
watchdog('sf_queue', "Failed objects [<pre>" . print_r($objects, TRUE) . "</pre>]");
}
if (count($wins) == 0) {
break;
}
// Instead of using salesforce_api_id_save to generate 2 SQL queries per
// record, we collect them all to do 2 massive queries for all the
// records at once.
// @TODO: salesforce_api_id_save_multi
$delete_sql = 'DELETE FROM {salesforce_object_map} WHERE oid IN (' . db_placeholders($wins) . ')';
$insert_sql = 'INSERT INTO {salesforce_object_map} (drupal_type, oid, sfid, name, last_export) VALUES ';
$row_ph = '("%s", %d, "%s", "%s", %d)';
$rows = '';
$args = array();
//, $drupal_type, $oid, $sfid, $name);
$offset = 0;
foreach ($wins as $sfid => $oid) {
if (!empty($rows)) {
$rows .= ', ';
}
$rows .= $row_ph;
$drupal_type = $items[$offset]['drupal_type'];
if (strpos($drupal_type, 'node_') === 0) {
$drupal_type = 'node';
}
$args[] = $drupal_type;
$args[] = $oid;
$args[] = $sfid;
$args[] = $items[$offset]['fieldmap_name'];
$args[] = time();
$offset++;
}
$insert_sql .= ' ' . $rows;
db_query($delete_sql, $wins);
error_log(print_r($insert_sql, 1));
error_log(print_r($args, 1));
db_query($insert_sql, $args);
}
else {
$op_done = TRUE;
break;
}
}
// while time limit for each operation
}
// foreach operation
}
// while sf_type loop
}