View source
<?php
namespace Drupal\feed_import_base;
/**
 * Hash manager that keeps feed item hashes in the feed_import_hashes table.
 *
 * Inserts and updates are buffered and sent to the database in chunks; call
 * insertCommit() and updateCommit() to flush whatever is still buffered.
 *
 * setOptions() must be called before get(), insert(), update() or protect(),
 * because that is where the query objects used by those methods are built.
 *
 * Relies on members that are not declared in this class and are presumably
 * inherited from FeedImportHashManager: $ttl, $generatedHashes and the
 * MARK_PROTECTED constant.
 */
class FeedImportSQLHashes extends FeedImportHashManager {

  /**
   * Machine name of the feed, stored in the feed_machine_name column.
   */
  protected $feedName;

  /**
   * Entity name, stored in (and filtered by) the entity column.
   */
  protected $entity;

  /**
   * Row template for inserts, in the column order given to the insert query.
   *
   * Indexes 3 (entity_id), 4 (hash) and 5 (expire) are overwritten by
   * insert() for every row.
   */
  protected $insertFormat;

  /**
   * Field map ('expire' => value) applied by _update().
   */
  protected $updateFormat;

  /**
   * Base select query, filtered by feed group and entity.
   */
  protected $select;

  /**
   * Insert query that accumulates rows until insertCommit() runs.
   */
  protected $insert;

  /**
   * Base update query, filtered by feed group and entity.
   */
  protected $update;

  /**
   * Maximum number of ids sent in a single update query.
   */
  protected $updateChunkSize = 500;

  /**
   * Number of buffered rows that triggers an automatic insert.
   */
  protected $insertChunkSize = 500;

  /**
   * Feed group, stored in (and filtered by) the feed_group column.
   */
  protected $group = '';

  /**
   * Number of rows currently buffered in the insert query.
   */
  protected $toInsert = 0;

  /**
   * Hash row ids waiting to get a new expire time.
   */
  protected $updateIds = array();

  /**
   * Number of ids currently buffered in $updateIds.
   */
  protected $toUpdate = 0;

  /**
   * Hash row ids waiting to be marked as protected.
   */
  protected $protectIds = array();

  /**
   * Number of ids currently buffered in $protectIds.
   */
  protected $toProtect = 0;

  /**
   * Creates a hash manager for one entity type of one feed.
   *
   * @param string $entity_name
   *   Entity name used for the entity column.
   * @param string $feed_machine_name
   *   Feed machine name used for the feed_machine_name column.
   */
  public function __construct($entity_name, $feed_machine_name) {
    $this->entity = $entity_name;
    $this->feedName = $feed_machine_name;
  }

  /**
   * Applies options and (re)builds the select, update and insert queries.
   *
   * Recognized keys: 'ttl' (>= 0), 'update_chunk' (> 0), 'insert_chunk' (> 0)
   * and 'group'. Keys that are missing or out of range leave the current
   * value untouched.
   *
   * Note that the insert query is replaced, so rows buffered by insert() and
   * not yet committed are dropped; call insertCommit() first if needed.
   *
   * @param array $options
   *   Options to apply.
   * @param bool $overwrite
   *   Not used by this implementation.
   */
  public function setOptions(array $options, $overwrite = FALSE) {
    if (isset($options['ttl']) && $options['ttl'] >= 0) {
      $this->ttl = (int) $options['ttl'];
    }
    if (isset($options['update_chunk']) && $options['update_chunk'] > 0) {
      $this->updateChunkSize = (int) $options['update_chunk'];
    }
    if (isset($options['insert_chunk']) && $options['insert_chunk'] > 0) {
      $this->insertChunkSize = (int) $options['insert_chunk'];
    }
    if (isset($options['group'])) {
      $this->group = $options['group'];
    }

    // Same order as the field list of the insert query below.
    $this->insertFormat = array(
      $this->feedName,
      $this->group,
      $this->entity,
      0,
      NULL,
      0,
    );

    $this->select = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
        'hash',
        'id',
        'entity_id',
        'expire',
      ))
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity);

    $this->update = db_update('feed_import_hashes')
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity);

    $this->updateFormat = array(
      'expire' => 0,
    );

    $this->insert = db_insert('feed_import_hashes')
      ->fields(array(
        'feed_machine_name',
        'feed_group',
        'entity',
        'entity_id',
        'hash',
        'expire',
      ));
  }

  /**
   * Creates the md5 hash of a value and remembers it for the next get().
   *
   * @param mixed $uniq
   *   Value to hash (passed by reference, not modified here).
   *
   * @return string
   *   The md5 hash.
   */
  public function hash(&$uniq) {
    return $this->generatedHashes[] = md5($uniq);
  }

  /**
   * Loads the stored rows matching the hashes generated since the last call.
   *
   * The list of generated hashes is emptied afterwards.
   *
   * @return array
   *   Rows (hash, id, entity_id, expire) keyed by hash, or an empty array when
   *   no hash was generated.
   */
  public function get() {
    if (!$this->generatedHashes) {
      return array();
    }
    // Work on a copy so the base query keeps exactly its feed_group and
    // entity conditions. Popping the last condition off the shared query
    // before adding the hash filter removed the entity condition on the
    // first call, which made later lookups match any entity type.
    $query = clone $this->select;
    $hashes = $query
      ->condition('hash', $this->generatedHashes, 'IN')
      ->execute()
      ->fetchAllAssoc('hash');
    $this->generatedHashes = array();
    return $hashes;
  }

  /**
   * Buffers a new hash row and inserts the buffer once a chunk is full.
   *
   * @param mixed $id
   *   Value for the entity_id column.
   * @param string $hash
   *   Value for the hash column.
   */
  public function insert($id, $hash) {
    $this->insertFormat[3] = $id;
    $this->insertFormat[4] = $hash;
    // A ttl of 0 is stored as expire 0 instead of "now".
    $this->insertFormat[5] = $this->ttl ? $this->ttl + time() : 0;
    $this->insert->values($this->insertFormat);
    // >= rather than == so a chunk size lowered by a later setOptions() call
    // still triggers a flush.
    if (++$this->toInsert >= $this->insertChunkSize) {
      $this->insertCommit();
    }
  }

  /**
   * Inserts all buffered rows, if any.
   */
  public function insertCommit() {
    if ($this->toInsert) {
      $this->insert->execute();
      $this->toInsert = 0;
    }
  }

  /**
   * Schedules a hash row for an expire refresh.
   *
   * The buffered ids are written once a full chunk has been collected.
   *
   * @param mixed $id
   *   Id of the hash row (id column).
   */
  public function update($id) {
    $this->updateIds[] = $id;
    if (++$this->toUpdate >= $this->updateChunkSize) {
      $this->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
  }

  /**
   * Schedules a hash row to be marked with static::MARK_PROTECTED.
   *
   * The buffered ids are written once a full chunk has been collected.
   *
   * @param mixed $id
   *   Id of the hash row (id column).
   */
  public function protect($id) {
    $this->protectIds[] = $id;
    if (++$this->toProtect >= $this->updateChunkSize) {
      $this->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * Writes all buffered expire refreshes and protect marks.
   */
  public function updateCommit() {
    if ($this->toUpdate) {
      $this->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
    if ($this->toProtect) {
      $this->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * Sets the expire column for the given hash row ids, one chunk per query.
   *
   * @param array $ids
   *   Hash row ids; emptied (by reference) once the queries have run.
   * @param int $expire
   *   New value for the expire column.
   */
  protected function _update(array &$ids, $expire) {
    $this->updateFormat['expire'] = $expire;
    $this->update->fields($this->updateFormat);
    // Reference to the query's condition list, used to take the temporary
    // id condition off again after each run so the base query is reusable.
    $conditions =& $this->update->conditions();
    foreach (array_chunk($ids, $this->updateChunkSize) as $chunk) {
      $this->update
        ->condition('id', $chunk, 'IN')
        ->execute();
      array_pop($conditions);
    }
    $ids = array();
  }

  /**
   * Deletes hash rows by id.
   *
   * @param array $ids
   *   Hash row ids. Nothing is deleted when the array is empty.
   */
  public static function delete(array $ids) {
    // An empty list would produce an invalid "IN ()" condition.
    if (!$ids) {
      return;
    }
    db_delete('feed_import_hashes')
      ->condition('id', $ids, 'IN')
      ->execute();
  }

  /**
   * Deletes all hash rows of a feed.
   *
   * @param string $name
   *   Feed machine name.
   */
  public static function deleteByFeed($name) {
    db_delete('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      ->execute();
  }

  /**
   * Deletes all hash rows of a feed group.
   *
   * @param string $group
   *   Feed group.
   */
  public static function deleteByGroup($group) {
    db_delete('feed_import_hashes')
      ->condition('feed_group', $group)
      ->execute();
  }

  /**
   * Deletes the hash rows that point to the given entities.
   *
   * @param mixed $ids
   *   One entity id or an array of entity ids. Nothing is deleted when no id
   *   is given.
   * @param string $entity_type
   *   Value to match against the entity column.
   */
  public static function deleteEntities($ids, $entity_type) {
    // Normalize to a list so the IN operator can always be stated explicitly.
    $ids = (array) $ids;
    if (!$ids) {
      return;
    }
    db_delete('feed_import_hashes')
      ->condition('entity', $entity_type)
      ->condition('entity_id', $ids, 'IN')
      ->execute();
  }

  /**
   * Gets the expired hash rows of a feed.
   *
   * A row counts as expired when its expire value lies between
   * static::MARK_PROTECTED + 1 and the current time (both inclusive).
   *
   * @param string $name
   *   Feed machine name.
   * @param int $max
   *   Maximum number of rows to return; 0 means no limit.
   *
   * @return array
   *   Entity ids keyed by entity name, then by hash row id.
   */
  public static function getExpired($name, $max = 0) {
    $q = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
        'entity',
        'id',
        'entity_id',
      ))
      ->condition('feed_machine_name', $name)
      ->condition('expire', array(
        static::MARK_PROTECTED + 1,
        time(),
      ), 'BETWEEN');
    if ($max) {
      $q->range(0, $max);
    }
    $result = $q->execute();
    $ret = array();
    // The leading backslash is required: inside this namespace a bare "PDO"
    // would resolve to Drupal\feed_import_base\PDO.
    while ($r = $result->fetch(\PDO::FETCH_ASSOC)) {
      $ret[$r['entity']][$r['id']] = $r['entity_id'];
    }
    return $ret;
  }

  /**
   * Sets a new expire time on all unprotected hash rows of a feed.
   *
   * Only rows with expire greater than static::MARK_PROTECTED are changed.
   *
   * @param string $name
   *   Feed machine name.
   * @param int $ttl
   *   Seconds from now until expiry; a falsy value is stored as is.
   */
  public static function rescheduleAll($name, $ttl) {
    if ($ttl) {
      $ttl += time();
    }
    db_update('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      ->condition('expire', static::MARK_PROTECTED, '>')
      ->fields(array(
        'expire' => $ttl,
      ))
      ->execute();
  }

  /**
   * Counts the stored hash rows per feed.
   *
   * @param mixed $name
   *   A feed machine name, an array of feed machine names, or NULL for all
   *   feeds.
   *
   * @return mixed
   *   For a scalar $name the count for that feed (0 when it has no rows),
   *   otherwise an array of counts keyed by feed machine name.
   */
  public static function totalHashes($name = NULL) {
    $q = db_select('feed_import_hashes', 'f')
      ->fields('f', array(
        'feed_machine_name',
      ));
    if ($name) {
      // State the operator explicitly so a list of names is matched with IN.
      $q->condition('feed_machine_name', $name, is_array($name) ? 'IN' : '=');
    }
    $q->addExpression('COUNT(*)', 'cnt');
    $totals = $q
      ->groupBy('feed_machine_name')
      ->execute()
      ->fetchAllKeyed();
    if ($name && is_scalar($name)) {
      return isset($totals[$name]) ? $totals[$name] : 0;
    }
    return $totals;
  }

}