You are here

FeedImportSQLHashes.php in Feed Import 8

File

feed_import_base/src/FeedImportSQLHashes.php
View source
<?php

namespace Drupal\feed_import_base;


/**
 * This class implements SQL hash storage
 */
class FeedImportSQLHashes extends FeedImportHashManager {

  /**
   * Feed machine name, written to the feed_machine_name column on insert.
   *
   * @var string
   */
  protected $feedName;

  /**
   * Entity name, used to filter lookups/updates and written on insert.
   *
   * @var string
   */
  protected $entity;

  /**
   * Positional row template for inserts (see setOptions() for the slots).
   *
   * @var array
   */
  protected $insertFormat;

  /**
   * Field template for updates; only the 'expire' value changes.
   *
   * @var array
   */
  protected $updateFormat;

  /**
   * Base select query (feed_group + entity filters). Cloned by get().
   */
  protected $select;

  /**
   * Reusable multi-row insert query.
   */
  protected $insert;

  /**
   * Reusable update query (feed_group + entity filters).
   */
  protected $update;

  /**
   * Maximum number of ids sent in a single UPDATE ... IN (...) query.
   *
   * @var int
   */
  protected $updateChunkSize = 500;

  /**
   * Number of queued rows that triggers an automatic insert commit.
   *
   * @var int
   */
  protected $insertChunkSize = 500;

  /**
   * Feed group used to scope lookups and updates.
   *
   * @var string
   */
  protected $group = '';

  /**
   * Number of rows queued on the insert query and not yet executed.
   *
   * @var int
   */
  protected $toInsert = 0;

  /**
   * Hash ids whose expire time must be refreshed.
   *
   * @var array
   */
  protected $updateIds = array();

  /**
   * Number of ids currently queued in $updateIds.
   *
   * @var int
   */
  protected $toUpdate = 0;

  /**
   * Hash ids that must be marked as protected.
   *
   * @var array
   */
  protected $protectIds = array();

  /**
   * Number of ids currently queued in $protectIds.
   *
   * @var int
   */
  protected $toProtect = 0;

  /**
   * {@inheritdoc}
   */
  public function __construct($entity_name, $feed_machine_name) {
    $this->entity = $entity_name;
    $this->feedName = $feed_machine_name;
  }

  /**
   * {@inheritdoc}
   */
  public function setOptions(array $options, $overwrite = FALSE) {
    // Recognised keys: 'ttl', 'update_chunk', 'insert_chunk' and 'group'.
    // $overwrite is not consulted by this implementation.
    if (isset($options['ttl']) && $options['ttl'] >= 0) {
      $this->ttl = (int) $options['ttl'];
    }

    // Cast before validating: a fractional value such as 0.5 is > 0 but would
    // become a chunk size of 0 after the cast, which never triggers a flush.
    if (isset($options['update_chunk'])) {
      $update_chunk = (int) $options['update_chunk'];
      if ($update_chunk > 0) {
        $this->updateChunkSize = $update_chunk;
      }
    }
    if (isset($options['insert_chunk'])) {
      $insert_chunk = (int) $options['insert_chunk'];
      if ($insert_chunk > 0) {
        $this->insertChunkSize = $insert_chunk;
      }
    }
    if (isset($options['group'])) {
      $this->group = $options['group'];
    }

    $connection = \Drupal::database();

    // Row template; positions match the field list of the insert query below.
    $this->insertFormat = array(
      // [0] feed_machine_name.
      $this->feedName,
      // [1] feed_group.
      $this->group,
      // [2] entity.
      $this->entity,
      // [3] entity_id, filled in by insert().
      0,
      // [4] hash, filled in by insert().
      NULL,
      // [5] expire, filled in by insert().
      0,
    );

    // Base select: get() clones it and adds the hash filter on the clone.
    $this->select = $connection->select('feed_import_hashes', 'f')
      ->fields('f', array('hash', 'id', 'entity_id', 'expire'))
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity);

    // Base update: _update() temporarily appends an id filter to it.
    $this->update = $connection->update('feed_import_hashes')
      ->condition('feed_group', $this->group)
      ->condition('entity', $this->entity);

    $this->updateFormat = array(
      'expire' => 0,
    );

    $this->insert = $connection->insert('feed_import_hashes')
      ->fields(array(
        'feed_machine_name',
        'feed_group',
        'entity',
        'entity_id',
        'hash',
        'expire',
      ));
  }

  /**
   * {@inheritdoc}
   */
  public function hash(&$uniq) {
    // Remember every generated hash so get() can look them all up at once.
    return $this->generatedHashes[] = md5($uniq);
  }

  /**
   * {@inheritdoc}
   */
  public function get() {
    if (!$this->generatedHashes) {
      return array();
    }

    // Query a clone of the base select. Popping the "last" condition off the
    // shared query removed the entity filter on the first call, because no
    // hash condition had been added yet.
    $query = clone $this->select;
    $hashes = $query
      ->condition('hash', $this->generatedHashes, 'IN')
      ->execute()
      ->fetchAllAssoc('hash');

    $this->generatedHashes = array();
    return $hashes;
  }

  /**
   * {@inheritdoc}
   */
  public function insert($id, $hash) {
    $this->insertFormat[3] = $id;
    $this->insertFormat[4] = $hash;
    // A ttl of 0 is stored as expire = 0.
    $this->insertFormat[5] = $this->ttl ? $this->ttl + time() : 0;
    $this->insert->values($this->insertFormat);

    // Flush automatically once a full chunk is queued.
    if (++$this->toInsert == $this->insertChunkSize) {
      $this->insertCommit();
    }
  }

  /**
   * {@inheritdoc}
   */
  public function insertCommit() {
    if ($this->toInsert) {
      $this->insert->execute();
      $this->toInsert = 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function update($id) {
    $this->updateIds[] = $id;
    if (++$this->toUpdate == $this->updateChunkSize) {
      // _update() empties $this->updateIds (passed by reference).
      $this->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function protect($id) {
    $this->protectIds[] = $id;
    if (++$this->toProtect == $this->updateChunkSize) {
      // _update() empties $this->protectIds (passed by reference).
      $this->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * {@inheritdoc}
   */
  public function updateCommit() {
    if ($this->toUpdate) {
      $this->_update($this->updateIds, $this->ttl ? $this->ttl + time() : 0);
      $this->toUpdate = 0;
    }
    if ($this->toProtect) {
      $this->_update($this->protectIds, static::MARK_PROTECTED);
      $this->toProtect = 0;
    }
  }

  /**
   * Sets the expire value for the given hash ids, in chunks.
   *
   * @param array $ids
   *   Hash ids to update. Passed by reference and emptied once all chunks
   *   have been executed.
   * @param int $expire
   *   Value written to the expire column.
   */
  protected function _update(array &$ids, $expire) {
    $this->updateFormat['expire'] = $expire;
    $this->update->fields($this->updateFormat);

    // Reference to the query's condition list so the temporary id filter can
    // be removed after each execution and the query object reused.
    $conditions =& $this->update->conditions();

    // array_chunk() yields a single chunk when the list already fits, and no
    // chunk at all (hence no query) when the list is empty.
    foreach (array_chunk($ids, $this->updateChunkSize) as $chunk) {
      $this->update->condition('id', $chunk, 'IN');
      try {
        $this->update->execute();
      }
      finally {
        // Always drop the id filter, even on failure, so it cannot leak into
        // the next update.
        array_pop($conditions);
      }
    }

    $ids = array();
  }

  /**
   * {@inheritdoc}
   */
  public static function delete(array $ids) {
    // An empty IN () list is not a valid condition; nothing to delete anyway.
    if (!$ids) {
      return;
    }
    \Drupal::database()->delete('feed_import_hashes')
      ->condition('id', $ids, 'IN')
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteByFeed($name) {
    \Drupal::database()->delete('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteByGroup($group) {
    \Drupal::database()->delete('feed_import_hashes')
      ->condition('feed_group', $group)
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function deleteEntities($ids, $entity_type) {
    // Accept a single id as well as a list, and always filter with IN.
    if (!is_array($ids)) {
      $ids = array($ids);
    }
    if (!$ids) {
      return;
    }
    \Drupal::database()->delete('feed_import_hashes')
      ->condition('entity', $entity_type)
      ->condition('entity_id', $ids, 'IN')
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function getExpired($name, $max = 0) {
    $query = \Drupal::database()->select('feed_import_hashes', 'f')
      ->fields('f', array('entity', 'id', 'entity_id'))
      ->condition('feed_machine_name', $name)
      // Expire values from just above the protected marker up to now.
      ->condition('expire', array(static::MARK_PROTECTED + 1, time()), 'BETWEEN');
    if ($max) {
      $query->range(0, $max);
    }
    $result = $query->execute();

    // Group as entity name => (hash id => entity id).
    // fetchAssoc() replaces fetch(PDO::FETCH_ASSOC): the unqualified PDO name
    // resolved to a non-existent class inside this namespace.
    $expired = array();
    while ($row = $result->fetchAssoc()) {
      $expired[$row['entity']][$row['id']] = $row['entity_id'];
    }
    return $expired;
  }

  /**
   * {@inheritdoc}
   */
  public static function rescheduleAll($name, $ttl) {
    // A falsy ttl is stored as-is; otherwise it is relative to now.
    if ($ttl) {
      $ttl += time();
    }
    \Drupal::database()->update('feed_import_hashes')
      ->condition('feed_machine_name', $name)
      // Only rows whose expire value is above the protected marker.
      ->condition('expire', static::MARK_PROTECTED, '>')
      ->fields(array(
        'expire' => $ttl,
      ))
      ->execute();
  }

  /**
   * {@inheritdoc}
   */
  public static function totalHashes($name = NULL) {
    $query = \Drupal::database()->select('feed_import_hashes', 'f')
      ->fields('f', array('feed_machine_name'));
    if ($name) {
      $query->condition('feed_machine_name', $name);
    }
    $query->addExpression('COUNT(*)', 'cnt');

    // Map of feed machine name => number of hashes.
    $totals = $query
      ->groupBy('feed_machine_name')
      ->execute()
      ->fetchAllKeyed();

    // For a single feed name return just its count (0 when it has no rows).
    if ($name && is_scalar($name)) {
      return isset($totals[$name]) ? $totals[$name] : 0;
    }
    return $totals;
  }

}

Classes

Name (sorted descending) | Description
FeedImportSQLHashes | This class implements SQL hash storage