View source
<?php
// Base menu path of the import pages: the Salesforce admin path with '/import'
// appended. The menu items registered below are keyed on this path.
define('SALESFORCE_PATH_ADMIN_IMPORT', SALESFORCE_PATH_ADMIN . '/import');
/**
 * Implementation of hook_menu().
 *
 * Registers four local tasks below SALESFORCE_PATH_ADMIN_IMPORT: the settings
 * form itself, the "create" (batch) and "manual" import forms, and an
 * "overview" default local task. All of them require the
 * 'administer salesforce' permission and name sf_import.admin.inc as their
 * include file.
 *
 * @return
 *   Array of menu items keyed by path.
 */
function sf_import_menu() {
  $permission = array('administer salesforce');
  $include = 'sf_import.admin.inc';

  // Form-backed tabs: path suffix => array(title, description, form id).
  $form_tabs = array(
    '' => array(
      'Import',
      'Configure settings for regular imports of data.',
      'sf_import_settings_form',
    ),
    '/create' => array(
      'Batch Import',
      'Create a one-time batch import of Salesforce data',
      'sf_import_create',
    ),
    '/manual' => array(
      'Manual Import',
      'Import Salesforce records manually.',
      'sf_import_manual',
    ),
  );

  $items = array();
  foreach ($form_tabs as $suffix => $tab) {
    list($title, $description, $form_id) = $tab;
    $items[SALESFORCE_PATH_ADMIN_IMPORT . $suffix] = array(
      'title' => $title,
      'description' => $description,
      'page callback' => 'drupal_get_form',
      'page arguments' => array($form_id),
      'access arguments' => $permission,
      'file' => $include,
      'type' => MENU_LOCAL_TASK,
    );
  }

  // Default tab; it declares no page callback of its own.
  $items[SALESFORCE_PATH_ADMIN_IMPORT . '/overview'] = array(
    'title' => 'Import Settings',
    'access arguments' => $permission,
    'file' => $include,
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'weight' => -10,
  );

  return $items;
}
/**
 * Implementation of hook_help().
 *
 * @param $path
 *   The path help is being requested for.
 * @param $arg
 *   Not used by this implementation.
 *
 * @return
 *   A translated paragraph of help markup when $path is
 *   'admin/settings/salesforce/import'; nothing (NULL) for any other path.
 */
function sf_import_help($path, $arg) {
  // Loose comparison, matching the semantics of a switch/case on $path.
  if ($path == 'admin/settings/salesforce/import') {
    // The line breaks inside the literal are part of the translatable string.
    $help_text = t('Select the Fieldmaps you would like to use for regular
imports of data from Salesforce to Drupal. <strong>This feature requires
a properly configured cron job.</strong> Once fieldmaps have been selected,
additional information regarding pending imports is available, along with
the ability to manually trigger imports. <br />You can also run a one-time
batch import of data from Salesforce to Drupal by visiting the "Batch Import" page.');
    return '<p>' . $help_text . '</p>';
  }
}
/**
 * Implementation of hook_cron().
 *
 * When a Salesforce connection can be established, queues changed records
 * and/or processes the queue. Each step is controlled by its own variable
 * ('sf_import_cron_import', 'sf_import_cron_process'), both defaulting to 1.
 */
function sf_import_cron() {
  // Without a connection neither step is attempted.
  if (!salesforce_api_connect()) {
    return;
  }

  // Step 1: look for changed Salesforce records and queue them.
  $queue_enabled = variable_get('sf_import_cron_import', 1);
  if ($queue_enabled) {
    sf_import_import_records();
  }

  // Step 2: import whatever is sitting in the queue.
  $process_enabled = variable_get('sf_import_cron_process', 1);
  if ($process_enabled) {
    _sf_import_process_records();
  }
}
/**
 * Finds Salesforce records changed since $start by running a SOQL query.
 *
 * Selects Id and LastModifiedDate from the fieldmap's Salesforce object type
 * for records modified after $start, optionally narrowed by the extra WHERE
 * condition stored in the 'sf_import_{fieldmap name}_soql' variable. A record
 * is reported only when its LastModifiedDate is later than the last_import
 * value stored for its sfid in {salesforce_object_map}.
 *
 * @param $map
 *   Fieldmap object. ->salesforce is the object type that is queried and
 *   ->name is used to build the name of the custom-condition variable.
 * @param $start
 *   Unix timestamp; lower bound (exclusive) for LastModifiedDate.
 * @param $end
 *   Unused. sf_import_import_records() calls this function and
 *   salesforce_api_get_updated() with the same three arguments, so the
 *   parameter is kept.
 *
 * @return
 *   FALSE when the fieldmap has no Salesforce object type or the query yields
 *   nothing. Otherwise an object with:
 *   - ids: array of Salesforce ids that need importing (may be empty).
 *   - latestDateCovered: LastModifiedDate of the first (most recent) result.
 */
function _sf_import_get_soql_records($map, $start, $end) {
  if (!$map->salesforce) {
    return FALSE;
  }

  $soql = 'SELECT Id, LastModifiedDate FROM ' . $map->salesforce
    . ' WHERE LastModifiedDate > ' . gmdate(DATE_ATOM, $start);

  // Only append the custom condition when there really is one. A variable
  // saved as an empty string used to yield "... AND  ORDER BY ...", which is
  // not valid SOQL.
  $soql_where = trim((string) variable_get('sf_import_' . $map->name . '_soql', NULL));
  if ($soql_where !== '') {
    $soql .= ' AND ' . $soql_where;
  }

  // Newest first, so the first row carries the latest date covered.
  $soql .= ' ORDER BY LastModifiedDate DESC';

  $results = salesforce_api_query($soql, array(
    'queryAll' => FALSE,
    'queryMore' => TRUE,
    'limit' => 2000,
  ));
  salesforce_api_log(SALESFORCE_LOG_ALL, 'Query to retreive updated records: %soql. !count result(s) returned: <pre>%ret</pre>', array(
    '%soql' => print_r($soql, TRUE),
    // count() on a non-array (e.g. FALSE) is misleading on old PHP versions
    // and a TypeError on PHP 8.
    '!count' => is_array($results) ? count($results) : 0,
    '%ret' => print_r($results, TRUE),
  ));
  if (empty($results)) {
    return FALSE;
  }

  $records = array();
  foreach ($results as $object) {
    $last_import = db_result(db_query("SELECT last_import FROM {salesforce_object_map} WHERE sfid = '%s'", $object->Id));
    // Skip records whose stored last_import is not older than the
    // Salesforce modification time.
    if ($last_import < strtotime($object->LastModifiedDate)) {
      $records[] = $object->Id;
    }
  }

  $response = new stdClass();
  $response->ids = $records;
  $response->latestDateCovered = $results[0]->LastModifiedDate;
  return $response;
}
/**
 * Queues Salesforce records that changed since the last check.
 *
 * For every active fieldmap in the 'sf_import_fieldmaps' variable (entries
 * whose value is the integer 0 are skipped), asks Salesforce for changed
 * record ids and writes a row into {sf_import_queue} for each id that is not
 * queued yet. The lookup function depends on the
 * 'sf_import_{fieldmap name}_update_method' variable:
 * salesforce_api_get_updated() for 'get_updated' (the default), otherwise
 * _sf_import_get_soql_records().
 *
 * Also stores the current time in 'sf_import_queue_last_import' and the number
 * of ids found in 'sf_import_queue_import_count'.
 *
 * @return
 *   FALSE when there are no active fieldmaps or no changed ids were found.
 *   Otherwise an array with one entry per id found (whether or not it was
 *   already queued): array(sfid, fieldmap name, timestamp).
 */
function sf_import_import_records() {
  $fieldmaps = variable_get('sf_import_fieldmaps', salesforce_api_salesforce_field_map_load_all());
  $active_fieldmaps = array();
  foreach ($fieldmaps as $map_key => $map_value) {
    if ($map_value !== 0) {
      $active_fieldmaps[$map_key] = $map_value;
    }
  }
  if (!$active_fieldmaps) {
    return FALSE;
  }

  $records = array();
  foreach ($active_fieldmaps as $map) {
    $map = salesforce_api_salesforce_field_map_load($map);
    // A fieldmap that can no longer be loaded cannot be queried.
    if (empty($map)) {
      continue;
    }

    // Start from the newest queue entry, falling back to the stored time of
    // the last run (or one hour ago).
    // NOTE(review): this is re-read for every fieldmap, so rows queued for an
    // earlier fieldmap in this same run move the window for later ones —
    // confirm that is intended.
    $sql = "SELECT time FROM {sf_import_queue} ORDER BY time DESC LIMIT 1";
    $start = db_result(db_query($sql));
    if (!$start) {
      $start = variable_get('sf_import_queue_last_import', time() - 3600);
    }
    $end = time();

    $import_method = variable_get('sf_import_' . $map->name . '_update_method', 'get_updated');
    // For get_updated, a window shorter than an hour is pushed back by an
    // hour (presumably a minimum-window requirement of that call — confirm).
    if ($end - $start < 3600 && $import_method == 'get_updated') {
      $start = $start - 3600;
    }
    variable_set('sf_import_queue_last_import', time());

    $import_function = $import_method == 'get_updated' ? 'salesforce_api_get_updated' : '_sf_import_get_soql_records';
    $updates = $import_function($map, $start, $end);
    // No response, or a response without any ids: nothing to queue. Without
    // this check a missing ids property would be queued as a NULL sfid.
    if (!$updates || empty($updates->ids)) {
      continue;
    }

    // A single id may come back as a scalar instead of an array.
    $update_sfids = is_array($updates->ids) ? $updates->ids : array($updates->ids);
    foreach ($update_sfids as $sfid) {
      $sql = "SELECT sfid FROM {sf_import_queue} WHERE sfid = '%s'";
      $exists = db_result(db_query($sql, $sfid));
      if (!$exists) {
        // Fresh object per row: assigning properties to an undefined variable
        // is an error on PHP 8, and reusing one instance would carry state
        // from one drupal_write_record() call into the next.
        $object = new stdClass();
        $object->time = time();
        $object->sfid = $sfid;
        $object->fieldmap = $map->name;
        drupal_write_record('sf_import_queue', $object);
      }
      $records[] = array(
        $sfid,
        $map->name,
        time(),
      );
    }
  }

  variable_set('sf_import_queue_import_count', count($records));
  return $records ? $records : FALSE;
}
/**
 * Imports every record waiting in {sf_import_queue} and empties the queue.
 *
 * For each queued row the Drupal type of its fieldmap selects the import
 * function sf_{type}_import() (any type starting with 'node_' uses
 * sf_node_import()). When that function exists it is called with the
 * Salesforce id, the fieldmap name and the Drupal id already linked to that
 * Salesforce id. The row is removed from the queue whether or not an import
 * function was found.
 *
 * Stores the number of imported records in 'sf_import_queue_processed_count'.
 *
 * @return
 *   FALSE when nothing was imported. Otherwise an array with one entry per
 *   imported record: array(sfid, return value of the import function,
 *   fieldmap name).
 */
function _sf_import_process_records() {
  $fieldmaps = salesforce_api_salesforce_field_map_load_all();
  $records = array();
  // Sfids already handled in this run; the DELETE below removes every row of
  // an sfid, so each one is processed once.
  $handled = array();

  // Run the SELECT once and walk its result. Re-issuing the query on every
  // iteration only terminated because each row was deleted afterwards, and
  // would loop forever if a DELETE ever failed.
  $result = db_query("SELECT sfid, fieldmap FROM {sf_import_queue}");
  while ($sfids = db_fetch_object($result)) {
    if (isset($handled[$sfids->sfid])) {
      continue;
    }
    $handled[$sfids->sfid] = TRUE;

    // A queue row may name a fieldmap that no longer exists; it then has no
    // type, no import function, and is simply dropped from the queue.
    $fieldmap = $sfids->fieldmap;
    $type = isset($fieldmaps[$fieldmap]->drupal) ? $fieldmaps[$fieldmap]->drupal : '';
    if (strpos($type, 'node_') === 0) {
      $type = 'node';
    }

    $function = 'sf_' . $type . '_import';
    if (function_exists($function)) {
      $drupal_id = salesforce_api_get_id_with_sfid($sfids->sfid, $type);
      $oid = $function($sfids->sfid, $sfids->fieldmap, $drupal_id);
      $records[] = array(
        $sfids->sfid,
        $oid,
        $sfids->fieldmap,
      );
    }
    db_query("DELETE FROM {sf_import_queue} WHERE sfid = '%s'", $sfids->sfid);
  }

  variable_set('sf_import_queue_processed_count', count($records));
  return $records ? $records : FALSE;
}