You are here

mongodb_field_storage.module in MongoDB 7

Implementation of the field storage API for MongoDB.

File

mongodb_field_storage/mongodb_field_storage.module
View source
<?php

/**
 * @file
 * Implementation of the field storage API for MongoDB.
 */

/**
 * Implements hook_field_storage_info().
 */
function mongodb_field_storage_field_storage_info() {
  return array(
    'mongodb_field_storage' => array(
      'label' => t('MongoDB field storage'),
      'description' => t('Stores nodes and fields in a MongoDB database.'),
    ),
  );
}

/**
 * Implements hook_field_storage_details().
 */
function mongodb_field_storage_field_storage_details($field) {

  // We probably want to implement that at one point, but this is informative
  // only.
}

/**
 * Implements hook_field_storage_create_field().
 */
function mongodb_field_storage_field_storage_create_field($field) {

  // Nothing to do?
}

/**
 * Implements hook_field_storage_update_field().
 */
function mongodb_field_storage_field_storage_update_field($field, $prior_field, $has_data) {

  // Nothing to do?
}

/**
 * Implements hook_field_storage_delete_field().
 */
function mongodb_field_storage_field_storage_delete_field($field) {

  // Nothing to do?
}

/**
 * Implements hook_field_storage_load().
 */
function mongodb_field_storage_field_storage_load($entity_type, $entities, $age, $fields, $options) {
  if (module_exists('mongodb_migrate')) {
    mongodb_migrate_load_helper($entity_type, $entities, $age, $fields, $options);
  }
  $load_current = $age == FIELD_LOAD_CURRENT;

  // Fetch information about the fields and prepare the keys.
  $keys = array();
  $saved_fields = array();
  foreach ($fields as $field_id => $ids) {
    $field = field_info_field_by_id($field_id);
    $saved_fields[$field_id] = $field;

    // Add this fields' id to the list.
    foreach ($ids as $id) {
      $keys[$id] = TRUE;
    }
  }

  // Execute the query and save the fields to the parent object.
  $collection = mongodb_collection($load_current ? 'fields_current' : 'fields_revision', $entity_type);
  $result = $collection
    ->find(array(
    '_id' => array(
      '$in' => array_keys($keys),
    ),
  ));
  foreach ($result as $row) {
    if ($entity_type == 'node' && isset($row['nid'])) {
      $entity_id = $row['nid'];
    }
    elseif ($entity_type == 'field_collection_item' && isset($row['item_id'])) {
      $entity_id = $row['item_id'];
    }
    else {
      $entity_id = $row['_id'];
    }
    foreach ($saved_fields as $field_id => $field) {
      $field_name = $field['field_name'];
      $field_values = array();
      if (!empty($row[$field_name])) {

        // Restore the field structure.
        if ($field['cardinality'] == 1) {
          $language = isset($row[$field_name]['_language']) ? $row[$field_name]['_language'] : LANGUAGE_NONE;
          unset($row[$field_name]['_language']);
          $field_values[$language][0] = $row[$field_name];
        }
        else {
          foreach ($row[$field_name] as $delta => $column_values) {
            $language = isset($column_values['_language']) ? $column_values['_language'] : LANGUAGE_NONE;
            unset($column_values['_language']);
            $field_values[$language][$delta] = $column_values;
          }
        }
      }
      $entities[$entity_id]->{$field_name} = $field_values;
    }
  }
}

/**
 * Implements hook_field_storage_write().
 */
function mongodb_field_storage_field_storage_write($entity_type, $entity, $op, $fields, $entity_write = FALSE) {
  if (strpos($entity_type, "rules") === 0) {
    return;
  }
  $wrote =& drupal_static(__FUNCTION__, array());
  list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity);

  // There is always a field_attach_insert/update call followed by an
  // entity_insert/update. field_attach does not call storage_write needlessly
  // so we need to make sure if the entity was not saved during field_attach,
  // it will be during entity write. The static is set during attach write if
  // one happened and always unset on entity write.
  if ($entity_write) {
    if (isset($wrote[$entity_type][$entity_id])) {
      unset($wrote[$entity_type][$entity_id]);
      return;
    }
  }
  else {
    $wrote[$entity_type][$entity_id] = TRUE;
  }

  // Create a new object.
  $new_entity = new stdClass();
  $new_entity->_id = intval($entity_id);
  $new_entity->_type = $entity_type;
  $new_entity->_bundle = $bundle;
  if (isset($revision_id)) {
    $new_entity->_revision_id = intval($revision_id);
  }

  // Add the base table's fields to the new object.
  $entity_info = entity_get_info($entity_type);

  // Although it's rare not to define a base table, test entities for sure
  // don't.
  if (isset($entity_info['base table'])) {
    $table_schema = drupal_get_schema($entity_info['base table']);
    foreach ($table_schema['fields'] as $field_name => $column_definition) {
      if (isset($entity->{$field_name})) {
        $new_entity->{$field_name} = _mongodb_field_storage_value($column_definition['type'], $entity->{$field_name});
      }
    }
  }

  // Add the fieldapi fields to the new object.
  foreach ($fields as $field_id) {
    $field = field_info_field_by_id($field_id);
    $field_name = $field['field_name'];
    $field_values = array();
    if (isset($entity->{$field_name})) {
      foreach ($entity->{$field_name} as $language => $values) {

        // According to field.test, we should not save anything for NULL.
        if (!empty($values[0])) {
          if ($field['cardinality'] == 1) {
            foreach ($values[0] as $column_name => $column_value) {
              if (isset($field['columns'][$column_name])) {
                $field_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_value);
              }
              if ($language != LANGUAGE_NONE) {
                $field_values['_language'] = $language;
              }
            }
          }
          else {

            // Collapse deltas.
            $values = array_values($values);
            if ($field['cardinality'] > 1 && count($values) > $field['cardinality']) {
              throw new MongodbStorageException(t('Invalid delta for @field_name, not saving @entity_type @entity_id', array(
                '@field_name' => $field_name,
                '@entity_type' => $entity_type,
                '@entity_id' => $entity_id,
              )));
            }
            foreach ($values as $column_values) {
              $store_values = array();
              foreach ($column_values as $column_name => $column_value) {
                if (isset($field['columns'][$column_name])) {
                  $store_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_values[$column_name]);
                }
              }

              // Collapse the field structure.
              if ($language != LANGUAGE_NONE) {
                $store_values['_language'] = $language;
              }
              $field_values[] = $store_values;
            }
          }
        }
      }
    }
    $new_entity->{$field_name} = empty($field_values) ? NULL : $field_values;
  }

  // Save the object.
  $collection = mongodb_collection('fields_current', $entity_type);
  $collection
    ->save($new_entity, mongodb_default_write_options());
  $slaves = variable_get('mongodb_slave');
  if ($slaves) {
    $return = $collection->db
      ->runCommand(array(
      'getlasterror' => 1,
      'w' => $slaves,
      'wtimeout' => 3000,
    ));
    if ($return['wtimeout']) {
      header('HTTP/1.0 500 Database write failed');
      exit;
    }
  }
  if (isset($revision_id)) {
    $new_entity->_id = (int) $revision_id;
    mongodb_collection('fields_revision', $entity_type)
      ->save($new_entity);
  }
  module_invoke('mongodb_migrate', 'write_helper', $entity_type, $entity_id);
}

/**
 * Setting the type of field value.
 *
 * @param string $type
 *   Type of field.
 * @param string $value
 *   Vaule of field.
 *
 * @return string
 *   Returns the value.
 */
function _mongodb_field_storage_value($type, $value) {
  switch ($type) {
    case 'int':
    case 'serial':
      return is_array($value) ? array_map('intval', $value) : intval($value);
    case 'float':
      return is_array($value) ? array_map('floatval', $value) : floatval($value);
    default:
      return $value;
  }
}

/**
 * Implements hook_field_storage_delete().
 *
 * This function deletes data for all fields for an entity from the database.
 */
function mongodb_field_storage_field_storage_delete($entity_type, $entity, $fields) {
  list($entity_id, , ) = entity_extract_ids($entity_type, $entity);
  mongodb_collection('fields_current', $entity_type)
    ->remove(array(
    '_id' => (int) $entity_id,
  ), mongodb_default_write_options());
  $entity_info = entity_get_info($entity_type);
  mongodb_collection('fields_revision', $entity_type)
    ->remove(array(
    $entity_info['entity keys']['id'] => (int) $entity_id,
  ), mongodb_default_write_options());
}

/**
 * Implements hook_field_storage_delete_revision().
 *
 * This function actually deletes the data from the database.
 */
function mongodb_field_storage_field_storage_delete_revision($entity_type, $entity, $fields) {
  list(, $revision_id, ) = entity_extract_ids($entity_type, $entity);
  mongodb_collection('fields_revision', $entity_type)
    ->remove(array(
    '_id' => (int) $revision_id,
  ), mongodb_default_write_options());
}

/**
 * Implements hook_field_storage_delete_instance().
 *
 * This function simply marks for deletion all data associated with the field.
 */
function mongodb_field_storage_field_storage_delete_instance($instance) {

  // @todo: figure out what to do.
}

// phpcs:ignore
class MongodbStorageException extends Exception {

}

/**
 * Implements hook_entity_query_alter().
 *
 * Alter the entity info.
 */
function mongodb_field_storage_entity_query_alter($query) {
  if (isset($query->fields[0]) && $query->fields[0]['storage']['type'] == 'mongodb_field_storage') {
    $query->executeCallback = 'mongodb_field_storage_query';
  }
}

/**
 * Implements hook_field_storage_query().
 *
 * Execute an EntityFieldQuery.
 */
function mongodb_field_storage_query($query) {
  $find = array();
  if (isset($query->entityConditions['entity_type'])) {
    $entity_type = $query->entityConditions['entity_type']['value'];
    unset($query->entityConditions['entity_type']);
  }
  else {

    // It looks like we are performing a 'has_data' query as we aren't being
    // passed any entity conditions - we try to be sure of this by rebuilding
    // a has data query and comparing that to our query.
    if (count($query->fields) == 1) {
      $field = $query->fields[0];
      $field_has_data_query = new EntityFieldQuery();
      $column = $values = NULL;
      $count = TRUE;

      // We need to ascertain if the query we're being passed is a 'has data'
      // query that is concerned with listing the allowed values, eg list
      // module's _list_values_in_use function.
      if (!empty($query->fieldConditions[0]['column'])) {

        // We are using a field condition, not just a general has data so we
        // need our has_data query to mimic this by adding a column to the field
        // condition.
        $column = $query->fieldConditions[0]['column'];
        $values = $query->fieldConditions[0]['value'];
        $count = $query->count;
      }
      $field_has_data_query
        ->fieldCondition($field, $column, $values)
        ->range(0, 1);
      if ($count) {
        $field_has_data_query
          ->count();
      }
      $field_has_data_query->executeCallback = 'mongodb_field_storage_query';
      $success = TRUE;
      foreach (get_object_vars($field_has_data_query) as $key => $value) {

        // We rebuild the has_data query and then compare with the received
        // query structure but by this point the query is altered so it's
        // always failing.
        if ($key != 'altered' && $value != $query->{$key}) {
          $success = FALSE;
        }
      }
      if ($success) {
        if (!is_array($field['bundles'])) {

          // This field not yet attached to any bundles.
          return FALSE;
        }
        if (is_array($field['bundles'])) {
          foreach ($field['bundles'] as $entity_type => $data) {
            foreach ($data as $bundle_name) {
              if (mongodb_collection('fields_current', $entity_type)
                ->find([
                $field['field_name'] => [
                  '$exists' => TRUE,
                ],
                'type' => $bundle_name,
              ])
                ->limit(1)
                ->count(TRUE)) {
                return TRUE;
              }
            }
          }
        }
        return FALSE;
      }
    }

    // Throw new EntityFieldQueryException('No entity_type').
  }
  $entity_map = array(
    'entity_id' => '_id',
    'revision_id' => '_revision_id',
    'bundle' => '_bundle',
  );
  $type_map = array(
    'entity_id' => 'int',
    'revision_id' => 'int',
    'bundle' => 'text',
  );
  foreach ($query->entityConditions as $field => $data) {
    $value = $data['value'];
    $find[$entity_map[$field]] = _mongodb_field_storage_query_value($value, $data['operator'], $type_map[$field]);
  }
  foreach ($query->fieldConditions as $index => $data) {
    $fieldName = $data['field']['field_name'] . '.' . $data['column'];
    $type = $data['field']['columns'][$data['column']]['type'];
    if (isset($find[$fieldName])) {
      $find[$fieldName] = array_merge($find[$fieldName], _mongodb_field_storage_query_value($data['value'], $data['operator'], $type));
    }
    else {
      $find[$data['field']['field_name'] . '.' . $data['column']] = _mongodb_field_storage_query_value($data['value'], $data['operator'], $type);
    }
    unset($query->fields[$index]);
  }

  /*
    foreach ($query->fields as $field) {
    $find[$field['field_name']]['$exists'] = TRUE;
    }
  */
  if ($query->propertyConditions) {
    $entity_info = entity_get_info($entity_type);
    $table_schema = isset($entity_info['base table']) ? drupal_get_schema($entity_info['base table']) : array();
    foreach ($query->propertyConditions as $data) {
      $type = isset($table_schema['fields'][$data['column']]['type']) ? $table_schema['fields'][$data['column']]['type'] : NULL;
      $find[$data['column']] = _mongodb_field_storage_query_value($data['value'], $data['operator'], $type);
    }
  }

  // Unset propertyConditions to please execute().
  unset($query->propertyConditions);
  if ($query->age == FIELD_LOAD_CURRENT) {
    if (isset($entity_type)) {
      $collection = mongodb_collection('fields_current', $entity_type);
      $id_key = '_id';
    }
  }
  else {
    $collection = mongodb_collection('fields_revision', $entity_type);
    $id_key = '_revision_id';
  }
  if (isset($entity_type)) {
    $collection = mongodb_collection($query->age == FIELD_LOAD_CURRENT ? 'fields_current' : 'fields_revision', $entity_type);
    if ($query->count && !$query->range) {
      return $collection
        ->count($find);
    }
    $cursor = $collection
      ->find($find);
    if (isset($query->hint)) {
      $cursor
        ->hint($query->hint);
    }
    if ($query->range) {
      $cursor
        ->skip($query->range['start'])
        ->limit($query->range['length']);
    }
    if ($query->count) {
      return $cursor
        ->count(TRUE);
    }
    else {
      $sort = array();
      foreach ($query->order as $order) {
        switch ($order['type']) {
          case 'field':
            $key = $order['specifier']['field']['field_name'] . '.' . $order['specifier']['column'];
            break;
          case 'property':
            $key = $order['specifier'];
            break;
          case 'entity':
            $key = $entity_map[$order['specifier']];
            break;
        }
        $sort[$key] = $order['direction'] == 'ASC' ? 1 : -1;
      }
      if ($sort) {
        $cursor
          ->sort($sort);
      }
    }
  }
  $return = array();
  if (isset($cursor)) {
    foreach ($cursor as $row) {
      $row += array(
        '_revision_id' => NULL,
      );
      $entity = entity_create_stub_entity($entity_type, array(
        $row['_id'],
        $row['_revision_id'],
        $row['_bundle'],
      ));
      $return[$entity_type][$row[$id_key]] = $entity;
    }
  }
  return $return;
}

/**
 * Build a MongoDB query selector based on EntityQuery syntax.
 *
 * @param mixed $value
 *   The value to which to apply the operator.
 * @param string $operator
 *   The query operator.
 * @param string $type
 *   The field type.
 *
 * @return array|array[]|\MongoRegex|string|string[]
 *   The query selector.
 *
 * @throws \EntityFieldQueryException
 * @throws \MongoException
 */
function _mongodb_field_storage_query_value($value, $operator, $type) {
  if (!isset($operator)) {
    $operator = is_array($value) ? 'IN' : '=';
  }
  $value = _mongodb_field_storage_value($type, $value);
  switch ($operator) {
    case '=':
      return $value;
    case 'IN':
      return array(
        '$in' => array_values($value),
      );
    case 'NOT IN':
      return array(
        '$nin' => array_values($value),
      );
    case 'ALL':
      return array(
        '$all' => $value,
      );
    case '<':
      return array(
        '$lt' => $value,
      );
    case '>':
      return array(
        '$gt' => $value,
      );
    case '<=':
      return array(
        '$lte' => $value,
      );
    case '>=':
      return array(
        '$gte' => $value,
      );
    case '!=':
    case '<>':
      return array(
        '$ne' => $value,
      );
    case 'STARTS_WITH':
      return new MongoRegex('/^' . preg_quote($value) . '/');
    case 'CONTAINS':
      return new MongoRegex('/' . preg_quote($value) . '/');
    case 'BETWEEN':
      return array(
        '$gte' => $value[0],
        '$lte' => $value[1],
      );
    default:
      throw new EntityFieldQueryException("{$operator} not implemented");
  }
}

/**
 * Implements hook_field_attach_rename_bundle().
 */
function mongodb_field_storage_field_attach_rename_bundle($entity_type, $bundle_old, $bundle_new) {
  mongodb_collection('fields_current', $entity_type)
    ->update(array(
    '_bundle' => $bundle_old,
  ), array(
    '_bundle' => $bundle_new,
  ), array(
    'multiple' => TRUE,
  ), mongodb_default_write_options());
  mongodb_collection('fields_revision', $entity_type)
    ->update(array(
    '_bundle' => $bundle_old,
  ), array(
    '_bundle' => $bundle_new,
  ), array(
    'multiple' => TRUE,
  ), mongodb_default_write_options());
}

/**
 * Implements hook_entity_insert().
 */
function mongodb_field_storage_entity_insert($entity, $entity_type) {
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, array(), TRUE);
}

/**
 * Implements hook_entity_update().
 */
function mongodb_field_storage_entity_update($entity, $entity_type) {
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, array(), TRUE);
}

/**
 * Implements hook_field_attach_delete().
 */
function mongodb_field_storage_field_attach_delete($entity_type, $entity) {
  list($entity_id, $revision_id) = entity_extract_ids($entity_type, $entity);
  mongodb_collection('fields_current', $entity_type)
    ->remove(array(
    '_id' => (int) $entity_id,
  ), mongodb_default_write_options());
  if ($revision_id) {
    mongodb_collection('fields_revision', $entity_type)
      ->remove(array(
      '_id' => (int) $revision_id,
    ), mongodb_default_write_options());
  }
}

/**
 * Implements hook_entity_info_alter().
 */
function mongodb_field_storage_entity_info_alter(&$entity_info) {

  // phpcs:ignore
  foreach ($entity_info as &$info) {

    // $info['controller class'] = 'MongoDb' . $info['controller class'];
  }
}

// phpcs:ignore
class MongoDbEntityLoader {

  /**
   * Query results.
   *
   * @var array
   */
  protected $result;

  /**
   * MongoDbEntityLoader constructor.
   *
   * @param string $entityType
   *   The type of entity to load.
   * @param array $ids
   *   The IDs of the entities to load.
   * @param array $conditions
   *   The conditions under which to load the entities.
   */
  public function __construct($entityType, array $ids, array $conditions) {
    $this->entityType = $entityType;
    $this->ids = $ids;
    $this->conditions = $conditions;
  }

  /**
   * Perform the load.
   *
   * @return $this
   *
   * @throws \MongoConnectionException
   */
  public function execute() {
    $entities = array();
    if ($this->ids) {
      $this->conditions['_id']['$in'] = array_map('intval', $this->ids);
      foreach ($this->ids as $id) {
        $entities[$id] = FALSE;
      }
    }
    foreach (mongodb_collection('fields_current', $this->entityType)
      ->find($this->conditions) as $result) {
      $entities[$result['_id']] = (object) $result;
    }
    $this->result = array_filter($entities);
    return $this;
  }

  /**
   * Fetch the query results.
   *
   * @return array
   *   The result documents.
   */
  public function fetchAllAssoc() {
    return $this->result;
  }

}

// phpcs:ignore
class MongoDbDrupalDefaultEntityController extends DrupalDefaultEntityController {

  /**
   * {@inheritDoc}
   */
  public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
    return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
  }

}
if (class_exists('CommentController')) {

  // phpcs:ignore
  class MongodbCommentController extends CommentController {

    /**
     * {@inheritDoc}
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    }

  }
}
if (class_exists('NodeController')) {

  // phpcs:ignore
  class MongoDbNodeController extends NodeController {

    /**
     * {@inheritDoc}
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    }

  }
}
if (class_exists('TaxonomyTermController')) {

  // phpcs:ignore
  class MongoDbTaxonomyTermController extends TaxonomyTermController {

    /**
     * {@inheritDoc}
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    }

  }
}
if (class_exists('TaxonomyVocabularyController')) {

  // phpcs:ignore
  class MongoDbTaxonomyVocabularyController extends TaxonomyVocabularyController {

    /**
     * {@inheritDoc}
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    }

  }
}
if (class_exists('UserController')) {

  // phpcs:ignore
  class MongoDbUserController extends UserController {

    /**
     * {@inheritDoc}
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      return new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    }

  }
}

Functions

Namesort descending Description
mongodb_field_storage_entity_info_alter Implements hook_entity_info_alter().
mongodb_field_storage_entity_insert Implements hook_entity_insert().
mongodb_field_storage_entity_query_alter Implements hook_entity_query_alter().
mongodb_field_storage_entity_update Implements hook_entity_update().
mongodb_field_storage_field_attach_delete Implements hook_field_attach_delete().
mongodb_field_storage_field_attach_rename_bundle Implements hook_field_attach_rename_bundle().
mongodb_field_storage_field_storage_create_field Implements hook_field_storage_create_field().
mongodb_field_storage_field_storage_delete Implements hook_field_storage_delete().
mongodb_field_storage_field_storage_delete_field Implements hook_field_storage_delete_field().
mongodb_field_storage_field_storage_delete_instance Implements hook_field_storage_delete_instance().
mongodb_field_storage_field_storage_delete_revision Implements hook_field_storage_delete_revision().
mongodb_field_storage_field_storage_details Implements hook_field_storage_details().
mongodb_field_storage_field_storage_info Implements hook_field_storage_info().
mongodb_field_storage_field_storage_load Implements hook_field_storage_load().
mongodb_field_storage_field_storage_update_field Implements hook_field_storage_update_field().
mongodb_field_storage_field_storage_write Implements hook_field_storage_write().
mongodb_field_storage_query Implements hook_field_storage_query().
_mongodb_field_storage_query_value Build a MongoDB query selector based on EntityQuery syntax.
_mongodb_field_storage_value Setting the type of field value.

Classes