mongodb_field_storage.module in MongoDB 7
Implementation of the field storage API for MongoDB.
File
mongodb_field_storage/mongodb_field_storage.module (View source)
<?php
/**
* @file
* Implementation of the field storage API for MongoDB.
*/
/**
 * Implements hook_field_storage_info().
 *
 * Advertises the single field storage backend provided by this module.
 *
 * @return array
 *   Storage definitions keyed by storage type name; the one entry carries a
 *   translated 'label' and 'description'.
 */
function mongodb_field_storage_field_storage_info() {
  $storages = array();
  $storages['mongodb_field_storage'] = array(
    'label' => t('MongoDB field storage'),
    'description' => t('Stores nodes and fields in a MongoDB database.'),
  );
  return $storages;
}
/**
* Implements hook_field_storage_details().
*
* Currently a no-op: the body is empty, so the function returns NULL and no
* storage details are reported for MongoDB-backed fields.
*
* @param array $field
*   The field definition. Unused.
*/
function mongodb_field_storage_field_storage_details($field) {
// We probably want to implement that at one point, but this is informative
// only.
}
/**
* Implements hook_field_storage_create_field().
*
* Currently a no-op: nothing is prepared in MongoDB when a field is created.
*
* @param array $field
*   The field definition. Unused.
*/
function mongodb_field_storage_field_storage_create_field($field) {
// Nothing to do? (Open question left by the original author.)
}
/**
* Implements hook_field_storage_update_field().
*
* Currently a no-op: stored documents are not touched when a field definition
* changes.
*
* @param array $field
*   The updated field definition. Unused.
* @param array $prior_field
*   The field definition before the update. Unused.
* @param bool $has_data
*   Whether the field already has stored values. Unused.
*/
function mongodb_field_storage_field_storage_update_field($field, $prior_field, $has_data) {
// Nothing to do? (Open question left by the original author.)
}
/**
* Implements hook_field_storage_delete_field().
*
* Currently a no-op: values of a deleted field are left in the stored
* documents.
*
* @param array $field
*   The field definition being deleted. Unused.
*/
function mongodb_field_storage_field_storage_delete_field($field) {
// Nothing to do? (Open question left by the original author.)
}
/**
 * Implements hook_field_storage_load().
 *
 * Fetches the stored documents for the requested ids from the
 * 'fields_current' or 'fields_revision' collection and expands every
 * requested field back into the $entity->{field}[language][delta] structure.
 *
 * @param string $entity_type
 *   The entity type; also selects the MongoDB collection.
 * @param array $entities
 *   The entity objects to populate, indexed by the id found in the stored
 *   document (see the id resolution below).
 * @param int $age
 *   FIELD_LOAD_CURRENT to read current data, anything else to read revisions.
 * @param array $fields
 *   Lists of document ids to load, keyed by field id.
 * @param array $options
 *   Load options; only forwarded to the optional mongodb_migrate helper.
 */
function mongodb_field_storage_field_storage_load($entity_type, $entities, $age, $fields, $options) {
  if (module_exists('mongodb_migrate')) {
    mongodb_migrate_load_helper($entity_type, $entities, $age, $fields, $options);
  }
  $load_current = $age == FIELD_LOAD_CURRENT;

  // Fetch information about the fields and collect the document ids to load.
  $keys = array();
  $saved_fields = array();
  foreach ($fields as $field_id => $ids) {
    $saved_fields[$field_id] = field_info_field_by_id($field_id);
    foreach ($ids as $id) {
      $keys[$id] = TRUE;
    }
  }

  // Execute the query and save the fields to the parent object.
  $collection = mongodb_collection($load_current ? 'fields_current' : 'fields_revision', $entity_type);
  $result = $collection->find(array(
    '_id' => array(
      '$in' => array_keys($keys),
    ),
  ));

  foreach ($result as $row) {
    // Nodes and field collection items are matched through their own id
    // column; every other type falls back to the document id.
    if ($entity_type == 'node' && isset($row['nid'])) {
      $entity_id = $row['nid'];
    }
    elseif ($entity_type == 'field_collection_item' && isset($row['item_id'])) {
      $entity_id = $row['item_id'];
    }
    else {
      $entity_id = $row['_id'];
    }

    // A document without a matching entity (e.g. a revision document whose
    // '_id' is not an entity id) has nothing to attach to. Assigning a
    // property on the missing entry would raise a warning, or an Error on
    // PHP 8, so skip it.
    if (!isset($entities[$entity_id])) {
      continue;
    }

    foreach ($saved_fields as $field) {
      $field_name = $field['field_name'];
      $field_values = array();
      if (!empty($row[$field_name])) {
        // Restore the field structure.
        if ($field['cardinality'] == 1) {
          // Single-value fields are stored as one flat set of columns.
          $language = isset($row[$field_name]['_language']) ? $row[$field_name]['_language'] : LANGUAGE_NONE;
          unset($row[$field_name]['_language']);
          $field_values[$language][0] = $row[$field_name];
        }
        else {
          // Multi-value fields are stored as a list of column sets, each
          // optionally tagged with its language.
          foreach ($row[$field_name] as $delta => $column_values) {
            $language = isset($column_values['_language']) ? $column_values['_language'] : LANGUAGE_NONE;
            unset($column_values['_language']);
            $field_values[$language][$delta] = $column_values;
          }
        }
      }
      $entities[$entity_id]->{$field_name} = $field_values;
    }
  }
}
/**
 * Implements hook_field_storage_write().
 *
 * Builds one document holding the entity's base table columns and its field
 * values, saves it to 'fields_current' and, when the entity has a revision
 * id, saves the same document under that id to 'fields_revision'.
 *
 * @param string $entity_type
 *   The entity type. Types whose name starts with "rules" are skipped.
 * @param object $entity
 *   The entity being written.
 * @param string|null $op
 *   The field storage operation. Unused.
 * @param array $fields
 *   Ids of the fields whose values must be written.
 * @param bool $entity_write
 *   TRUE when called from the entity insert/update hooks rather than from
 *   field attach.
 *
 * @throws MongodbStorageException
 *   When a limited multi-value field carries more values than its
 *   cardinality allows.
 */
function mongodb_field_storage_field_storage_write($entity_type, $entity, $op, $fields, $entity_write = FALSE) {
  if (strpos($entity_type, "rules") === 0) {
    return;
  }
  $wrote =& drupal_static(__FUNCTION__, array());
  list($entity_id, $revision_id, $bundle) = entity_extract_ids($entity_type, $entity);

  // There is always a field_attach_insert/update call followed by an
  // entity_insert/update. field_attach does not call storage_write needlessly
  // so we need to make sure if the entity was not saved during field_attach,
  // it will be during entity write. The static is set during attach write if
  // one happened and always unset on entity write.
  if ($entity_write) {
    if (isset($wrote[$entity_type][$entity_id])) {
      unset($wrote[$entity_type][$entity_id]);
      return;
    }
  }
  else {
    $wrote[$entity_type][$entity_id] = TRUE;
  }

  // Create a new object.
  $new_entity = new stdClass();
  $new_entity->_id = intval($entity_id);
  $new_entity->_type = $entity_type;
  $new_entity->_bundle = $bundle;
  if (isset($revision_id)) {
    $new_entity->_revision_id = intval($revision_id);
  }

  // Add the base table's fields to the new object.
  $entity_info = entity_get_info($entity_type);
  // Although it's rare not to define a base table, test entities for sure
  // don't.
  if (isset($entity_info['base table'])) {
    $table_schema = drupal_get_schema($entity_info['base table']);
    foreach ($table_schema['fields'] as $field_name => $column_definition) {
      if (isset($entity->{$field_name})) {
        $new_entity->{$field_name} = _mongodb_field_storage_value($column_definition['type'], $entity->{$field_name});
      }
    }
  }

  // Add the fieldapi fields to the new object.
  foreach ($fields as $field_id) {
    $field = field_info_field_by_id($field_id);
    $field_name = $field['field_name'];
    $field_values = array();
    if (isset($entity->{$field_name})) {
      foreach ($entity->{$field_name} as $language => $values) {
        // According to field.test, we should not save anything for NULL.
        if (empty($values[0])) {
          continue;
        }
        if ($field['cardinality'] == 1) {
          // Single-value fields are stored flat: column => value.
          foreach ($values[0] as $column_name => $column_value) {
            if (isset($field['columns'][$column_name])) {
              $field_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_value);
            }
          }
          // The language does not depend on the column, so tag it once
          // after the columns, as the multi-value branch does.
          if ($language != LANGUAGE_NONE) {
            $field_values['_language'] = $language;
          }
        }
        else {
          // Collapse deltas.
          $values = array_values($values);
          if ($field['cardinality'] > 1 && count($values) > $field['cardinality']) {
            throw new MongodbStorageException(t('Invalid delta for @field_name, not saving @entity_type @entity_id', array(
              '@field_name' => $field_name,
              '@entity_type' => $entity_type,
              '@entity_id' => $entity_id,
            )));
          }
          foreach ($values as $column_values) {
            $store_values = array();
            foreach ($column_values as $column_name => $column_value) {
              if (isset($field['columns'][$column_name])) {
                $store_values[$column_name] = _mongodb_field_storage_value($field['columns'][$column_name]['type'], $column_value);
              }
            }
            // Collapse the field structure.
            if ($language != LANGUAGE_NONE) {
              $store_values['_language'] = $language;
            }
            $field_values[] = $store_values;
          }
        }
      }
    }
    $new_entity->{$field_name} = empty($field_values) ? NULL : $field_values;
  }

  // Save the object.
  $collection = mongodb_collection('fields_current', $entity_type);
  $collection->save($new_entity, mongodb_default_write_options());

  // When replication is configured, wait until the write reached the
  // requested number of members or give up after three seconds.
  $slaves = variable_get('mongodb_slave');
  if ($slaves) {
    $return = $collection->db->runCommand(array(
      'getlasterror' => 1,
      'w' => $slaves,
      'wtimeout' => 3000,
    ));
    // 'wtimeout' is only present in the reply when the wait timed out, so it
    // must not be read unguarded.
    if (!empty($return['wtimeout'])) {
      header('HTTP/1.0 500 Database write failed');
      exit;
    }
  }

  if (isset($revision_id)) {
    // The revision collection is keyed by revision id.
    // NOTE(review): unlike the save above, this one passes no write options;
    // confirm whether that is intentional.
    $new_entity->_id = (int) $revision_id;
    mongodb_collection('fields_revision', $entity_type)->save($new_entity);
  }
  module_invoke('mongodb_migrate', 'write_helper', $entity_type, $entity_id);
}
/**
 * Casts a value according to the schema type of its column.
 *
 * @param string $type
 *   The schema column type. 'int' and 'serial' cast to integer, 'float'
 *   casts to float, anything else leaves the value untouched.
 * @param mixed $value
 *   The value to cast. For an array, each element is cast and keys are kept.
 *
 * @return mixed
 *   The cast value, or the unchanged value for any other type.
 */
function _mongodb_field_storage_value($type, $value) {
  // Loose comparisons on purpose: they match the way the type is compared
  // elsewhere in Drupal 7 schema handling for this module.
  if (in_array($type, array('int', 'serial'))) {
    $cast = 'intval';
  }
  elseif ($type == 'float') {
    $cast = 'floatval';
  }
  else {
    return $value;
  }
  return is_array($value) ? array_map($cast, $value) : $cast($value);
}
/**
 * Implements hook_field_storage_delete().
 *
 * Removes the entity's document from 'fields_current' and every document in
 * 'fields_revision' whose entity id key equals the entity id.
 *
 * @param string $entity_type
 *   The entity type; selects the collections.
 * @param object $entity
 *   The entity being deleted.
 * @param array $fields
 *   The field ids involved. Unused, whole documents are removed.
 */
function mongodb_field_storage_field_storage_delete($entity_type, $entity, $fields) {
  $extracted_ids = entity_extract_ids($entity_type, $entity);
  $entity_id = $extracted_ids[0];

  $current = mongodb_collection('fields_current', $entity_type);
  $current->remove(array('_id' => (int) $entity_id), mongodb_default_write_options());

  // Revision documents are keyed by revision id, so they are matched through
  // the entity's own id column instead of '_id'.
  $entity_info = entity_get_info($entity_type);
  $revisions = mongodb_collection('fields_revision', $entity_type);
  $revisions->remove(array($entity_info['entity keys']['id'] => (int) $entity_id), mongodb_default_write_options());
}
/**
 * Implements hook_field_storage_delete_revision().
 *
 * Removes the document of a single revision from 'fields_revision'.
 *
 * @param string $entity_type
 *   The entity type; selects the collection.
 * @param object $entity
 *   The entity whose revision is deleted.
 * @param array $fields
 *   The field ids involved. Unused, the whole document is removed.
 */
function mongodb_field_storage_field_storage_delete_revision($entity_type, $entity, $fields) {
  $extracted_ids = entity_extract_ids($entity_type, $entity);
  $revision_id = $extracted_ids[1];

  $revisions = mongodb_collection('fields_revision', $entity_type);
  $revisions->remove(array('_id' => (int) $revision_id), mongodb_default_write_options());
}
/**
* Implements hook_field_storage_delete_instance().
*
* Currently a no-op: the body is empty, so no stored data is marked for
* deletion or removed when a field instance is deleted.
*
* @param array $instance
*   The field instance being deleted. Unused.
*/
function mongodb_field_storage_field_storage_delete_instance($instance) {
// @todo: figure out what to do.
}
/**
* Exception raised by this storage backend.
*
* Thrown by mongodb_field_storage_field_storage_write() when a field holds
* more values than its cardinality allows.
*/
// phpcs:ignore
class MongodbStorageException extends Exception {
}
/**
 * Implements hook_entity_query_alter().
 *
 * Routes an EntityFieldQuery to mongodb_field_storage_query() when its first
 * field is stored by this backend.
 *
 * @param object $query
 *   The query being altered; its executeCallback property may be replaced.
 */
function mongodb_field_storage_entity_query_alter($query) {
  if (!isset($query->fields[0])) {
    return;
  }
  $storage_type = $query->fields[0]['storage']['type'];
  if ($storage_type == 'mongodb_field_storage') {
    $query->executeCallback = 'mongodb_field_storage_query';
  }
}
/**
 * Implements hook_field_storage_query().
 *
 * Executes an EntityFieldQuery against the MongoDB collections written by
 * this module.
 *
 * @param object $query
 *   The EntityFieldQuery to execute. Its entity_type condition, the fields
 *   that carry conditions and its propertyConditions are removed from the
 *   object while the MongoDB selector is built.
 *
 * @return array|int|bool
 *   - TRUE or FALSE when the query is recognised as a "has data" probe,
 *   - the number of matches for count queries,
 *   - otherwise stub entities keyed by entity type, then by entity id
 *     (current data) or revision id (revision data). An empty array is
 *     returned when no entity type could be determined.
 */
function mongodb_field_storage_query($query) {
  $find = array();
  // NULL keeps isset($entity_type) FALSE until a type is actually known.
  $entity_type = NULL;

  if (isset($query->entityConditions['entity_type'])) {
    $entity_type = $query->entityConditions['entity_type']['value'];
    unset($query->entityConditions['entity_type']);
  }
  elseif (count($query->fields) == 1) {
    // It looks like we are performing a 'has_data' query as we aren't being
    // passed any entity conditions - we try to be sure of this by rebuilding
    // a has data query and comparing that to our query.
    $field = $query->fields[0];
    $field_has_data_query = new EntityFieldQuery();
    $column = $values = NULL;
    $count = TRUE;
    // We need to ascertain if the query we're being passed is a 'has data'
    // query that is concerned with listing the allowed values, eg list
    // module's _list_values_in_use function.
    if (!empty($query->fieldConditions[0]['column'])) {
      // We are using a field condition, not just a general has data so we
      // need our has_data query to mimic this by adding a column to the field
      // condition.
      $column = $query->fieldConditions[0]['column'];
      $values = $query->fieldConditions[0]['value'];
      $count = $query->count;
    }
    $field_has_data_query
      ->fieldCondition($field, $column, $values)
      ->range(0, 1);
    if ($count) {
      $field_has_data_query->count();
    }
    $field_has_data_query->executeCallback = 'mongodb_field_storage_query';

    // Compare every property except 'altered', which always differs because
    // the received query has already been through the alter hooks.
    $success = TRUE;
    foreach (get_object_vars($field_has_data_query) as $property => $expected) {
      if ($property != 'altered' && $expected != $query->{$property}) {
        $success = FALSE;
        break;
      }
    }

    if ($success) {
      if (!is_array($field['bundles'])) {
        // This field not yet attached to any bundles.
        return FALSE;
      }
      foreach ($field['bundles'] as $bundle_entity_type => $bundle_names) {
        foreach ($bundle_names as $bundle_name) {
          // Documents store their bundle under '_bundle' for every entity
          // type; 'type' only exists on entities that have such a column.
          $matches = mongodb_collection('fields_current', $bundle_entity_type)
            ->find(array(
              $field['field_name'] => array(
                '$exists' => TRUE,
              ),
              '_bundle' => $bundle_name,
            ))
            ->limit(1)
            ->count(TRUE);
          if ($matches) {
            return TRUE;
          }
        }
      }
      return FALSE;
    }
  }
  // No exception is thrown when the entity type is unknown; the function
  // falls through and ends up returning an empty result.

  $entity_map = array(
    'entity_id' => '_id',
    'revision_id' => '_revision_id',
    'bundle' => '_bundle',
  );
  $type_map = array(
    'entity_id' => 'int',
    'revision_id' => 'int',
    'bundle' => 'text',
  );

  foreach ($query->entityConditions as $field => $data) {
    $find[$entity_map[$field]] = _mongodb_field_storage_query_value($data['value'], $data['operator'], $type_map[$field]);
  }

  foreach ($query->fieldConditions as $index => $data) {
    // Field columns are addressed with dotted paths: field_name.column.
    $field_key = $data['field']['field_name'] . '.' . $data['column'];
    $type = $data['field']['columns'][$data['column']]['type'];
    $selector = _mongodb_field_storage_query_value($data['value'], $data['operator'], $type);
    // Several conditions on the same column are merged into one selector.
    $find[$field_key] = isset($find[$field_key]) ? array_merge($find[$field_key], $selector) : $selector;
    unset($query->fields[$index]);
  }
  // Fields without conditions are not constrained: an '$exists' => TRUE
  // selector for them was sketched here but is disabled.

  if ($query->propertyConditions) {
    $entity_info = entity_get_info($entity_type);
    $table_schema = isset($entity_info['base table']) ? drupal_get_schema($entity_info['base table']) : array();
    foreach ($query->propertyConditions as $data) {
      $type = isset($table_schema['fields'][$data['column']]['type']) ? $table_schema['fields'][$data['column']]['type'] : NULL;
      $find[$data['column']] = _mongodb_field_storage_query_value($data['value'], $data['operator'], $type);
    }
  }
  // Unset propertyConditions to please execute().
  unset($query->propertyConditions);

  // Without an entity type there is no collection to query.
  if (!isset($entity_type)) {
    return array();
  }

  $is_current = $query->age == FIELD_LOAD_CURRENT;
  $id_key = $is_current ? '_id' : '_revision_id';
  $collection = mongodb_collection($is_current ? 'fields_current' : 'fields_revision', $entity_type);

  if ($query->count && !$query->range) {
    return $collection->count($find);
  }

  $cursor = $collection->find($find);
  if (isset($query->hint)) {
    $cursor->hint($query->hint);
  }
  if ($query->range) {
    $cursor
      ->skip($query->range['start'])
      ->limit($query->range['length']);
  }
  if ($query->count) {
    return $cursor->count(TRUE);
  }

  $sort = array();
  foreach ($query->order as $order) {
    $sort_key = NULL;
    switch ($order['type']) {
      case 'field':
        $sort_key = $order['specifier']['field']['field_name'] . '.' . $order['specifier']['column'];
        break;

      case 'property':
        $sort_key = $order['specifier'];
        break;

      case 'entity':
        if (isset($entity_map[$order['specifier']])) {
          $sort_key = $entity_map[$order['specifier']];
        }
        break;
    }
    // Orders that cannot be mapped to a document key are ignored rather than
    // reusing a key left over from an earlier iteration.
    if (isset($sort_key)) {
      $sort[$sort_key] = $order['direction'] == 'ASC' ? 1 : -1;
    }
  }
  if ($sort) {
    $cursor->sort($sort);
  }

  $return = array();
  foreach ($cursor as $row) {
    $row += array(
      '_revision_id' => NULL,
    );
    $entity = entity_create_stub_entity($entity_type, array(
      $row['_id'],
      $row['_revision_id'],
      $row['_bundle'],
    ));
    $return[$entity_type][$row[$id_key]] = $entity;
  }
  return $return;
}
/**
 * Build a MongoDB query selector based on EntityQuery syntax.
 *
 * The value is first cast with _mongodb_field_storage_value() and then
 * wrapped in the MongoDB operator matching the EntityFieldQuery operator.
 *
 * @param mixed $value
 *   The value to which to apply the operator.
 * @param string $operator
 *   The query operator. When not set, 'IN' is used for array values and '='
 *   for everything else.
 * @param string $type
 *   The schema type of the column, used for casting the value.
 *
 * @return array|array[]|\MongoRegex|string|string[]
 *   The query selector.
 *
 * @throws \EntityFieldQueryException
 *   When the operator is not supported.
 * @throws \MongoException
 */
function _mongodb_field_storage_query_value($value, $operator, $type) {
  if (!isset($operator)) {
    $operator = is_array($value) ? 'IN' : '=';
  }
  $value = _mongodb_field_storage_value($type, $value);

  // Operators with a bespoke selector return directly; the rest only pick
  // the MongoDB operator name that wraps the value.
  switch ($operator) {
    case '=':
      return $value;

    case 'IN':
      return array('$in' => array_values($value));

    case 'NOT IN':
      return array('$nin' => array_values($value));

    case 'ALL':
      $mongo_operator = '$all';
      break;

    case '<':
      $mongo_operator = '$lt';
      break;

    case '>':
      $mongo_operator = '$gt';
      break;

    case '<=':
      $mongo_operator = '$lte';
      break;

    case '>=':
      $mongo_operator = '$gte';
      break;

    case '!=':
    case '<>':
      $mongo_operator = '$ne';
      break;

    case 'STARTS_WITH':
      $pattern = '/^' . preg_quote($value) . '/';
      return new MongoRegex($pattern);

    case 'CONTAINS':
      $pattern = '/' . preg_quote($value) . '/';
      return new MongoRegex($pattern);

    case 'BETWEEN':
      // Inclusive range between the first two values.
      return array('$gte' => $value[0], '$lte' => $value[1]);

    default:
      throw new EntityFieldQueryException("{$operator} not implemented");
  }

  return array($mongo_operator => $value);
}
/**
 * Implements hook_field_attach_rename_bundle().
 *
 * Rewrites the '_bundle' key of every stored document, current and revision,
 * that belongs to the renamed bundle.
 *
 * @param string $entity_type
 *   The entity type; selects the collections.
 * @param string $bundle_old
 *   The previous bundle name.
 * @param string $bundle_new
 *   The new bundle name.
 */
function mongodb_field_storage_field_attach_rename_bundle($entity_type, $bundle_old, $bundle_new) {
  $criteria = array('_bundle' => $bundle_old);
  // '$set' changes only the bundle key. A bare replacement document would
  // discard every other value of the matched documents, and cannot be
  // combined with a multiple-document update.
  $modifier = array('$set' => array('_bundle' => $bundle_new));
  // update() takes a single options array, so the write options are merged
  // into it instead of being passed as an ignored fourth argument.
  $options = array('multiple' => TRUE) + mongodb_default_write_options();

  foreach (array('fields_current', 'fields_revision') as $collection_name) {
    mongodb_collection($collection_name, $entity_type)->update($criteria, $modifier, $options);
  }
}
/**
 * Implements hook_entity_insert().
 *
 * Runs the storage write in "entity write" mode with no field ids, so the
 * entity is stored even if field attach did not already write it.
 *
 * @param object $entity
 *   The inserted entity.
 * @param string $entity_type
 *   The entity type.
 */
function mongodb_field_storage_entity_insert($entity, $entity_type) {
  $no_fields = array();
  $entity_write = TRUE;
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, $no_fields, $entity_write);
}
/**
 * Implements hook_entity_update().
 *
 * Runs the storage write in "entity write" mode with no field ids, so the
 * entity is stored even if field attach did not already write it.
 *
 * @param object $entity
 *   The updated entity.
 * @param string $entity_type
 *   The entity type.
 */
function mongodb_field_storage_entity_update($entity, $entity_type) {
  $no_fields = array();
  $entity_write = TRUE;
  mongodb_field_storage_field_storage_write($entity_type, $entity, NULL, $no_fields, $entity_write);
}
/**
 * Implements hook_field_attach_delete().
 *
 * Removes the entity's document from 'fields_current' and, when the entity
 * has a revision id, the document of that revision from 'fields_revision'.
 *
 * @param string $entity_type
 *   The entity type; selects the collections.
 * @param object $entity
 *   The entity being deleted.
 */
function mongodb_field_storage_field_attach_delete($entity_type, $entity) {
  $extracted_ids = entity_extract_ids($entity_type, $entity);
  $entity_id = $extracted_ids[0];
  $revision_id = $extracted_ids[1];

  $current = mongodb_collection('fields_current', $entity_type);
  $current->remove(array('_id' => (int) $entity_id), mongodb_default_write_options());

  if (!$revision_id) {
    return;
  }
  $revisions = mongodb_collection('fields_revision', $entity_type);
  $revisions->remove(array('_id' => (int) $revision_id), mongodb_default_write_options());
}
/**
* Implements hook_entity_info_alter().
*
* Currently changes nothing: the only statement in the loop is commented
* out, so the entity info is left as received.
*
* @param array $entity_info
*   The entity info array, passed by reference.
*/
function mongodb_field_storage_entity_info_alter(&$entity_info) {
// phpcs:ignore
foreach ($entity_info as &$info) {
// Disabled: would prefix every controller class name with 'MongoDb'.
// $info['controller class'] = 'MongoDb' . $info['controller class'];
}
}
/**
 * Loads entity documents from the 'fields_current' collection.
 *
 * Exposes execute() and fetchAllAssoc() so the controllers in this file can
 * return it from buildQuery().
 */
// phpcs:ignore
class MongoDbEntityLoader {

  /**
   * The type of entity to load.
   *
   * Declared explicitly (it used to be created dynamically in the
   * constructor) and kept public so existing readers keep working.
   *
   * @var string
   */
  public $entityType;

  /**
   * The IDs of the entities to load.
   *
   * @var array
   */
  public $ids;

  /**
   * The MongoDB selector limiting the documents to load.
   *
   * @var array
   */
  public $conditions;

  /**
   * Query results.
   *
   * Starts as an empty array so fetchAllAssoc() returns an array even when
   * execute() has not run yet.
   *
   * @var array
   */
  protected $result = array();

  /**
   * MongoDbEntityLoader constructor.
   *
   * @param string $entityType
   *   The type of entity to load.
   * @param array $ids
   *   The IDs of the entities to load.
   * @param array $conditions
   *   The conditions under which to load the entities.
   */
  public function __construct($entityType, array $ids, array $conditions) {
    $this->entityType = $entityType;
    $this->ids = $ids;
    $this->conditions = $conditions;
  }

  /**
   * Perform the load.
   *
   * @return $this
   *
   * @throws \MongoConnectionException
   */
  public function execute() {
    $entities = array();
    if ($this->ids) {
      $this->conditions['_id']['$in'] = array_map('intval', $this->ids);
      // Seed the result in the requested order; ids without a document stay
      // FALSE and are filtered out below.
      foreach ($this->ids as $id) {
        $entities[$id] = FALSE;
      }
    }
    $cursor = mongodb_collection('fields_current', $this->entityType)->find($this->conditions);
    foreach ($cursor as $result) {
      $entities[$result['_id']] = (object) $result;
    }
    $this->result = array_filter($entities);
    return $this;
  }

  /**
   * Fetch the query results.
   *
   * Any arguments passed by the caller are ignored; results are always keyed
   * by document '_id'.
   *
   * @return array
   *   The result documents as objects, keyed by '_id'.
   */
  public function fetchAllAssoc() {
    return $this->result;
  }

}
/**
 * Default entity controller variant that loads entities from MongoDB.
 */
// phpcs:ignore
class MongoDbDrupalDefaultEntityController extends DrupalDefaultEntityController {

  /**
   * {@inheritDoc}
   *
   * Returns a MongoDbEntityLoader; $revision_id is not used.
   */
  public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
    $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
    return $loader;
  }

}
// Only declared when the comment controller it extends is available.
if (class_exists('CommentController')) {
  // phpcs:ignore
  class MongodbCommentController extends CommentController {

    /**
     * {@inheritDoc}
     *
     * Returns a MongoDbEntityLoader; $revision_id is not used.
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
      return $loader;
    }

  }
}
// Only declared when the node controller it extends is available.
if (class_exists('NodeController')) {
  // phpcs:ignore
  class MongoDbNodeController extends NodeController {

    /**
     * {@inheritDoc}
     *
     * Returns a MongoDbEntityLoader; $revision_id is not used.
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
      return $loader;
    }

  }
}
// Only declared when the taxonomy term controller it extends is available.
if (class_exists('TaxonomyTermController')) {
  // phpcs:ignore
  class MongoDbTaxonomyTermController extends TaxonomyTermController {

    /**
     * {@inheritDoc}
     *
     * Returns a MongoDbEntityLoader; $revision_id is not used.
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
      return $loader;
    }

  }
}
// Only declared when the vocabulary controller it extends is available.
if (class_exists('TaxonomyVocabularyController')) {
  // phpcs:ignore
  class MongoDbTaxonomyVocabularyController extends TaxonomyVocabularyController {

    /**
     * {@inheritDoc}
     *
     * Returns a MongoDbEntityLoader; $revision_id is not used.
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
      return $loader;
    }

  }
}
// Only declared when the user controller it extends is available.
if (class_exists('UserController')) {
  // phpcs:ignore
  class MongoDbUserController extends UserController {

    /**
     * {@inheritDoc}
     *
     * Returns a MongoDbEntityLoader; $revision_id is not used.
     */
    public function buildQuery($ids, $conditions = array(), $revision_id = FALSE) {
      $loader = new MongoDbEntityLoader($this->entityType, $ids, $conditions);
      return $loader;
    }

  }
}