<?php
namespace Drupal\weather\Service;
use Drupal\Core\Batch\BatchBuilder;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\weather\Entity\WeatherPlaceInterface;
/**
 * Builds and runs the batch that (re-)imports weather places.
 *
 * The places are read from the tab-separated file files/weather_data.csv
 * inside the weather module and stored as weather_place entities.
 */
class DataService {
  use StringTranslationTrait;

  /**
   * The entity type manager passed to the constructor.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * Storage handler of the weather_place entity type.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $placeStorage;

  /**
   * Builder used to assemble the import batch.
   *
   * @var \Drupal\Core\Batch\BatchBuilder
   */
  protected $batchBuilder;

  /**
   * Constructs the service and resolves the weather_place storage.
   *
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   The entity type manager.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager) {
    $this->entityTypeManager = $entity_type_manager;
    $this->placeStorage = $entity_type_manager
      ->getStorage('weather_place');
    $this->batchBuilder = new BatchBuilder();
  }

  /**
   * Registers a batch that imports the weather places from the CSV file.
   *
   * Places whose status is STATUS_ORIGINAL are queued for deletion and are
   * re-created from the file. Places with any other status are left alone and
   * the CSV rows carrying their id are skipped.
   *
   * @throws \RuntimeException
   *   When the weather data file cannot be opened.
   */
  public function weatherDataInstallation() {
    // Start from an empty builder so that calling this method more than once
    // on the same service instance does not queue earlier operations again.
    $this->batchBuilder = new BatchBuilder();

    // The import must see every place regardless of the current user, so
    // access checking is switched off explicitly on both queries.
    $deleteIds = $this->placeStorage
      ->getQuery()
      ->accessCheck(FALSE)
      ->condition('status', WeatherPlaceInterface::STATUS_ORIGINAL)
      ->execute();

    $changed_geoids = $this->placeStorage
      ->getQuery()
      ->accessCheck(FALSE)
      ->condition('status', WeatherPlaceInterface::STATUS_ORIGINAL, '<>')
      ->execute();
    // Flip once so each CSV row needs a key lookup instead of a linear scan.
    $keep = array_flip($changed_geoids);

    // NOTE(review): drupal_get_path() is deprecated in recent Drupal cores;
    // confirm the supported core range before replacing it.
    $path = drupal_get_path('module', 'weather') . '/files/weather_data.csv';
    $file = fopen($path, 'r');
    if ($file === FALSE) {
      // Without this guard fgetcsv() would be called with an invalid handle.
      throw new \RuntimeException(sprintf('Unable to open the weather data file "%s".', $path));
    }

    $items = [];
    try {
      while (($line = fgetcsv($file, 0, "\t")) !== FALSE) {
        // processBatch() reads columns 0..5; blank lines come back as [NULL]
        // and truncated rows are skipped as well.
        if (count($line) < 6) {
          continue;
        }
        if (!isset($keep[$line[0]])) {
          $items[] = $line;
        }
      }
    }
    finally {
      fclose($file);
    }

    $op = 'Importing';
    if ($deleteIds) {
      $op = 'Re-importing';
      $this->batchBuilder
        ->addOperation([
          self::class,
          'processBatch',
        ], [
          array_values($deleteIds),
          'remove',
        ]);
    }
    $this->batchBuilder
      ->addOperation([
        self::class,
        'processBatch',
      ], [
        $items,
        'add',
      ]);
    $this->batchBuilder
      ->setTitle($this->t('@operation weather places', [
        '@operation' => $op,
      ]))
      ->setInitMessage($this->t('Weather places import is starting.'))
      ->setProgressMessage($this->t('Processed @current out of @total.'))
      ->setErrorMessage($this->t('Weather places import has encountered an error.'))
      ->setFinishCallback([
        self::class,
        'finishBatch',
      ]);
    batch_set($this->batchBuilder->toArray());
  }

  /**
   * Batch operation callback: creates or deletes up to 50 places per call.
   *
   * @param array $items
   *   For 'add': CSV rows in the column order geoid, latitude, longitude,
   *   country, name, link. For 'remove': weather_place entity ids.
   * @param string $operation
   *   Either 'add' or 'remove'. Any other value only advances the progress.
   * @param array $context
   *   The batch context; 'sandbox' keeps the progress between calls.
   */
  public static function processBatch($items, $operation, &$context) {
    $placeStorage = \Drupal::entityTypeManager()
      ->getStorage('weather_place');

    if (empty($context['sandbox'])) {
      $context['sandbox'] = [
        'progress' => 0,
        'max' => count($items),
      ];
    }
    $limit = 50;

    // Continue where the previous call stopped.
    $range = array_slice($items, $context['sandbox']['progress'], $limit);
    foreach ($range as $item) {
      $id = $operation === 'add' ? $item[0] : $item;
      if ($operation === 'add') {
        $placeStorage
          ->create([
            'geoid' => $item[0],
            'latitude' => $item[1],
            'longitude' => $item[2],
            'country' => $item[3],
            'name' => $item[4],
            'link' => trim($item[5]),
            'status' => WeatherPlaceInterface::STATUS_ORIGINAL,
          ])
          ->save();
      }
      elseif ($operation === 'remove') {
        // The entity may have disappeared since the ids were collected.
        $place = $placeStorage->load($item);
        if ($place !== NULL) {
          $place->delete();
        }
      }
      $context['results'][] = $id;
      $context['sandbox']['progress']++;
      // Built after the increment so that the counter includes this item.
      $context['message'] = t('@op @current out of @total weather places. @details', [
        '@op' => $operation === 'add' ? 'Imported' : 'Deleted',
        '@current' => $context['sandbox']['progress'],
        '@total' => $context['sandbox']['max'],
        '@details' => $id,
      ]);
    }

    // 'finished' is only written while items remain; it is left untouched
    // once all items (or an empty list) have been processed.
    if ($context['sandbox']['progress'] < $context['sandbox']['max']) {
      $context['finished'] = $context['sandbox']['progress'] / $context['sandbox']['max'];
    }
  }

  /**
   * Batch finish callback: reports the outcome through the messenger.
   *
   * @param bool $success
   *   Whether the batch finished without errors.
   * @param array $results
   *   The ids collected by processBatch().
   * @param array $operations
   *   The operations that were left unprocessed; the first one is reported.
   */
  public static function finishBatch($success, $results, $operations) {
    $messenger = \Drupal::messenger();
    if ($success) {
      $messenger->addMessage(t('@count items processed.', [
        '@count' => count($results),
      ]));
      return;
    }

    // Each operation is a [callable, arguments] pair.
    $error_operation = reset($operations);
    $callback = '';
    $arguments = [];
    if (is_array($error_operation)) {
      $callable = $error_operation[0] ?? '';
      $arguments = $error_operation[1] ?? [];
      // The callables registered by this class are [class, method] arrays,
      // which cannot be used as a placeholder value directly.
      $callback = is_array($callable) ? implode('::', $callable) : $callable;
    }
    $messenger->addError(t('An error occurred while processing @operation with arguments : @args', [
      '@operation' => $callback,
      '@args' => print_r($arguments, TRUE),
    ]));
  }

}