You are here

public function AcquiaPurgeService::process in Acquia Purge 7

Process as many items from the queue as the runtime capacity allows.

Return value

bool Returns TRUE when it processed items, FALSE when the capacity limit has been reached or when the queue is empty and there's nothing left to do.

File

lib/AcquiaPurgeService.php, line 401
Contains AcquiaPurgeService.

Class

AcquiaPurgeService
The Acquia Purge service.

Code

public function process() {

  // Do not even attempt to process when the total counter is zero.
  if ($this
    ->queue()
    ->total()
    ->get() === 0) {
    return FALSE;
  }

  // How much can we safely process during this request?
  $maxitems = $this
    ->capacity()
    ->queueClaimsLimit();
  if ($maxitems < 1) {
    return FALSE;
  }

  // Ask the diagnostic subsystem to identify ACQUIA_PURGE_SEVLEVEL_ERROR
  // level severities, which mandate processing to stop and log the problem.
  if ($err = $this
    ->diagnostics()
    ->isSystemBlocked()) {
    $this
      ->diagnostics()
      ->log($err);
    return FALSE;
  }

  // Claim a number of items we can maximally process during request lifetime.
  if (!($queue_items = $this
    ->queue()
    ->claimItemMultiple($maxitems))) {
    $this
      ->state()
      ->wipe();
    return FALSE;
  }

  // Setup the $succeeded and $failed lists and fill $invalidations. Make sure
  // that already processed paths are immediately dismissed.
  $succeeded = $failed = $invalidations = array();
  foreach ($queue_items as $i => $item) {
    if ($this
      ->deduplicate($item
      ->getPath(), 'purged')) {
      $succeeded[] = $item;
      unset($queue_items[$i]);
      continue;
    }
    foreach ($this
      ->hostingInfo()
      ->getSchemes() as $s) {
      foreach ($this
        ->hostingInfo()
        ->getDomains() as $d) {
        $invalidations[] = $item
          ->getInvalidation($s, $d, $this->basePath);
      }
    }
  }

  // Iterate each executor and feed the invalidations to all of them.
  foreach ($this
    ->executors() as $executor) {
    $executor_id = $executor
      ->getId();
    foreach ($invalidations as $i) {
      $i
        ->setStatusContext($executor_id);
    }
    $executor
      ->invalidate($invalidations);
  }

  // Put each and every invalidation back into general context.
  foreach ($invalidations as $i) {
    $i
      ->setStatusContext(NULL);
    if ($i
      ->getStatusBoolean() === TRUE) {
      $this
        ->history($i
        ->getUri());
    }
  }

  // In reality, $invalidation::setStatus() has kept the statuses on the queue
  // item, so we can now use ::getStatusBoolean() to delete/release items.
  foreach ($queue_items as $item) {
    if ($item
      ->getStatusBoolean() === TRUE) {
      $this
        ->deduplicate($item
        ->getPath(), 'purged');
      $succeeded[] = $item;
    }
    else {
      $failed[] = $item;
    }
  }
  $this
    ->queue()
    ->deleteItemMultiple($succeeded);
  $this
    ->queue()
    ->releaseItemMultiple($failed);

  // Adjust the remaining capacity downwards for future ::process() calls.
  $this
    ->capacity()
    ->queueClaimsSubtract(count($succeeded) + count($failed));

  // When the bottom of the queue has been reached, reset all state data.
  if ($this
    ->queue()
    ->numberOfItems() === 0) {
    $this
      ->state()
      ->wipe();
  }

  // Invoke deprecated hook_acquia_purge_purge_(failure|success) hooks!
  if (count($failed)) {
    foreach (module_implements('acquia_purge_purge_failure') as $module) {
      $function = $module . '_acquia_purge_purge_failure';
      _acquia_purge_deprecated('hook_acquia_purge_executors()', $function);
      $function($this
        ->queueItemPaths($failed));
    }
  }
  if (count($succeeded)) {
    foreach (module_implements('acquia_purge_purge_success') as $module) {
      $function = $module . '_acquia_purge_purge_success';
      _acquia_purge_deprecated('hook_acquia_purge_executors()', $function);
      $function($this
        ->queueItemPaths($succeeded));
    }
  }
  return TRUE;
}