
Comments

dstol’s picture

Status: Active » Needs work
FileSize
2.23 KB
havran’s picture

Thanks for working on this.

havran’s picture

I use this class as a MongoDB source:

/**
 * @file
 * Define a MigrateSource for importing from MongoDB connections
 */

/**
 * Implementation of MigrateSource, to handle imports from MongoDB connections.
 */
class MigrateSourceMongoDB extends MigrateSource {
  /**
   * The cursor over the incoming MongoDB documents.
   *
   * @var MongoCursor
   */
  protected $cursor;

  /**
   * List of available source fields.
   *
   * @var array
   */
  protected $fields = array();

  /**
   * Simple initialization.
   */
  public function __construct(MongoCursor $cursor, array $fields, array $options = array()) {
    parent::__construct($options);

    $this->cursor = $cursor;
    $this->fields = $fields;
  }

  /**
   * Returns a list of fields available to be mapped from the source query.
   *
   * @return array
   *  Keys: machine names of the fields (to be passed to addFieldMapping)
   *  Values: Human-friendly descriptions of the fields.
   */
  public function fields() {
    // The fields are passed to the constructor for this plugin.
    return $this->fields;
  }

  /**
   * Return a count of all available source records.
   */
  public function computeCount() {
    return $this->cursor->count();
  }

  /**
   * Implementation of MigrateSource::getNextRow().
   *
   * @return object
   */
  public function getNextRow() {
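    // getNext() returns NULL once the cursor is exhausted; casting NULL to an
    // object would yield an empty (truthy) stdClass instead of signalling the end.
    $row = $this->cursor->getNext();

    return $row ? (object) $row : NULL;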
  }

  /**
   * Implementation of MigrateSource::performRewind().
   *
   * @return void
   */
  public function performRewind() {
    $this->cursor->rewind();
  }

  /**
   * Return a string representing the source query.
   *
   * @return string
   */
  public function __toString() {
    return (string) drupal_json_encode($this->cursor->info());
  }
}

And here is the migration class:

/**
 * @file
 * Define a Migrate class for importing zip codes from MongoDB.
 */

class ZipsMigration extends Migration {
  public function __construct() {
    parent::__construct(MigrateGroup::getInstance('MongoDB Test'));

    $this->description = t('Test migration from mongodb to Drupal');

    $m = new MongoClient('mongodb://localhost'); // connect to mongodb server
    $db = $m->test_migration; // select database
    $collection = $db->zips; // select collection

    // mongodb example query
    $query = array(
      'city' => 'NEW YORK',
    );

    // cursor
    $cursor = $collection->find($query);

    // we need list of fields
    $fields = array(
      '_id' => '_id',
      'loc' => 'loc',
      'city' => 'city',
      'pop' => 'pop',
      'state' => 'state',
    );

    $this->source = new MigrateSourceMongoDB($cursor, $fields);
    $this->destination = new MigrateDestinationNode('zip'); // to content (node) type 'zip'

    // We instantiate the MigrateMap
    $this->map = new MigrateSQLMap(
      $this->machineName,
      array(
        '_id' => array(
          'type' => 'int',
          'unsigned' => TRUE,
          'not null' => TRUE,
        ),
      ),
      MigrateDestinationNode::getKeySchema()
    );

    $this->addFieldMapping('title', 'city'); // from mongo 'city' field to drupal node 'title'
  }
}
havran’s picture

Issue tags: +Performance, +migration, +mongodb

I have a working MongoDB source class. Here is the code:

/**
 * @file
 * Define a MigrateSource for importing from MongoDB connections
 */

/**
 * Implementation of MigrateSource, to handle imports from MongoDB connections.
 */
class MigrateSourceMongoDB extends MigrateSource {
  /**
   * The mongodb collection object.
   *
   * @var MongoCollection
   */
  protected $collection;

  /**
   * The mongodb cursor object.
   *
   * @var MongoCursor
   */
  protected $cursor;

  /**
   * The mongodb query array.
   *
   * @var array
   */
  protected $query;

  /**
   * List of available source fields.
   *
   * @var array
   */
  protected $fields = array();

  /**
   * Simple initialization.
   */
  public function __construct(MongoCollection $collection, array $query, array $sort = array('_id' => 1), array $fields = array(), array $options = array()) {
    parent::__construct($options);

    $this->collection = $collection;
    $this->query = $query;
    $this->sort = $sort;
    $this->fields = $fields;
  }

  /**
   * Returns a list of fields available to be mapped from the source query.
   *
   * @return array
   *  Keys: machine names of the fields (to be passed to addFieldMapping)
   *  Values: Human-friendly descriptions of the fields.
   */
  public function fields() {
    // The fields are passed to the constructor for this plugin.
    return $this->fields;
  }

  /**
   * Return a count of all available source records.
   */
  public function computeCount() {
    return $this->cursor->count(TRUE);
  }

  /**
   * Implementation of MigrateSource::getNextRow().
   *
   * @return object
   */
  public function getNextRow() {
    $row = $this->cursor->getNext();

    if ($row) {
      return (object) $row;
    }

    return NULL;
  }

  /**
   * Implementation of MigrateSource::performRewind().
   *
   * @return void
   */
  public function performRewind() {
    // Get the key values, for potential use in joining to the map table, or
    // enforcing idlist.
    $keys = array();
    foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
      if (isset($field_schema['alias'])) {
        $field_name = $field_schema['alias'] . '.' . $field_name;
      }
      $keys[] = $field_name;
    }

    $maptable = $this->activeMap->getQualifiedMapTable();

    if ($this->idList) {
      $this->query[$keys[0]]['$in'] = $this->idList;
    }
    else {
      // ToDo: find some method for better performance because for now
      // we get always all (migrated and unmigrated) rows
      // something like:
      // get only rows where is id (or date or something) > than id from last
      // migrated row
    }

    migrate_instrument_start('MigrateSourceMongoDB execute');
    $this->cursor = $this->collection->find($this->query);
    $this->cursor->sort($this->sort);
    migrate_instrument_stop('MigrateSourceMongoDB execute');
  }

  /**
   * Return a string representing the source query.
   *
   * @return string
   */
  public function __toString() {
    if (is_null($this->cursor)) {
      $this->cursor = $this->collection->find($this->query);
    }

    return (string) drupal_json_encode($this->cursor->info());
  }
}

And here is the migration class:

/**
 * @file
 * Define a Migrate class for importing zip codes from MongoDB.
 */

class ZipsMigration extends Migration {
  public function __construct() {
    parent::__construct(MigrateGroup::getInstance('TestMigration'));

    $this->description = t('Test migration from mongodb to Drupal');

    // We instantiate the MigrateMap
    $this->map = new MigrateSQLMap(
      $this->machineName,
      array(
        '_id' => array(
          'type' => 'int',
          'unsigned' => TRUE,
          'not null' => TRUE,
        ),
      ),
      MigrateDestinationNode::getKeySchema()
    );

    $m = new MongoClient('mongodb://localhost'); // connect to mongodb server
    $collection = $m->test_migration->zips; // select database and collection

    // we need list of fields
    $fields = array(
      '_id' => '_id',
      'loc' => 'loc',
      'city' => 'city',
      'pop' => 'pop',
      'state' => 'state',
    );

    // mongodb query
    $query = array(
      'city' => 'ADAMSVILLE',
    );
    
    $sort = array(
      '_id' => 1,
    );

    $this->source = new MigrateSourceMongoDB($collection, $query, $sort, $fields);
    $this->destination = new MigrateDestinationNode('zip');

    $this->addFieldMapping('title', 'city');
    $this->addFieldMapping('field_loc', 'loc');
    $this->addFieldMapping('field_pop', 'pop');
    $this->addFieldMapping('field_state', 'state');
  }
}

But this still needs more work on performance (see the comments in the performRewind() function).

havran’s picture

Today's progress:

- performance optimization: the MongoDB query now returns only unmigrated documents
- some code cleanup

MigrateSource class:

/**
 * @file
 * Define a MigrateSource for importing from MongoDB connections
 */

/**
 * Implementation of MigrateSource, to handle imports from MongoDB connections.
 */
class MigrateSourceMongoDB extends MigrateSource {
  /**
   * The mongodb collection object.
   *
   * @var MongoCollection
   */
  protected $collection;

  /**
   * The mongodb cursor object.
   *
   * @var MongoCursor
   */
  protected $cursor;

  /**
   * The mongodb query.
   *
   * @var array
   */
  protected $query;

  /**
   * List of available source fields.
   *
   * @var array
   */
  protected $fields = array();

  /**
   * Simple initialization.
   */
  public function __construct(MongoCollection $collection, array $query, array $fields = array(), array $sort = array('_id' => 1), array $options = array()) {
    parent::__construct($options);

    $this->collection = $collection;
    $this->query = $query;
    $this->sort = $sort;
    $this->fields = $fields;

    // get all indexes from collection
    $indexes = $this->collection->getIndexInfo();

    // check if index for drupalMigration data exist
    $create_index = TRUE;
    foreach ($indexes as $index) {
      if ($index['name'] == '_drupalMigration_') {
        $create_index = FALSE;
        break;
      }
    }

    // if the index doesn't exist we create one
    if ($create_index) {
      $this->collection->ensureIndex(
        array('drupalMigration.destinationID' => 1),
        array('name' => '_drupalMigration_')
      );
    }

    // ToDo: delete index if is not neccessary (0 migrated items?)
  }

  /**
   * Returns a list of fields available to be mapped from the source query.
   *
   * @return array
   *  Keys: machine names of the fields (to be passed to addFieldMapping)
   *  Values: Human-friendly descriptions of the fields.
   */
  public function fields() {
    // The fields are passed to the constructor for this plugin.
    return $this->fields;
  }

  /**
   * Return a count of all available source records.
   */
  public function computeCount() {
    return $this->cursor->count(TRUE);
  }

  /**
   * Implementation of MigrateSource::getNextRow().
   *
   * @return object
   */
  public function getNextRow() {
    $row = $this->cursor->getNext();

    if ($row) {
      return (object) $row;
    }

    return NULL;
  }

  /**
   * Implementation of MigrateSource::performRewind().
   *
   * @return void
   */
  public function performRewind() {
    // If we have existing idlist we use them.
    if ($this->idList) {
      foreach ($this->idList as $key => $id) {
        $this->idList[$key] = $this->getMongoId($id);
      }

      $this->query['_id']['$in'] = $this->idList;
    }
    else {
      // ToDo: find some method for better performance because for now
      // we get always all (migrated and unmigrated) rows
      // Now we use method witch require complete and completeRollbacks method
      // in migration class.
      $this->query['drupalMigration'] = NULL;
    }

    migrate_instrument_start('MigrateSourceMongoDB execute');
    $this->cursor = $this->collection->find($this->query);
    $this->cursor->sort($this->sort);
    migrate_instrument_stop('MigrateSourceMongoDB execute');
  }

  /**
   * Return a string representing the source query.
   *
   * @return string
   */
  public function __toString() {
    if (is_null($this->cursor)) {
      $this->cursor = $this->collection->find($this->query);
      $this->cursor->sort($this->sort);
    }

    return (string) drupal_json_encode($this->cursor->info());
  }

  /**
   * Migration document complete - set migration info into migrated document.
   *
   * @param type $document_id
   * @param type $destination_id
   * @param type $timestamp
   */
  public function setMongoDocumentMigrationInfo($document_id, $destination_id, $timestamp) {
    $result = $this->collection->update(
      array('_id' => $this->getMongoId($document_id)),
      array(
        '$set' => array(
          'drupalMigration.destinationID' => $destination_id,
          'drupalMigration.timestamp' => $timestamp,
        ),
      )
    );
  }

  /**
   * Rollback complete - remove migration info from document.
   *
   * @param type $destination_id
   */
  public function removeMongoDocumentMigrationInfo($destination_id) {
    if (!is_array($destination_id)) {
      $destination_id = array($destination_id);
    }

    $result = $this->collection->update(
      array('drupalMigration.destinationID' => array('$in' => $destination_id)),
      array(
        '$unset' => array(
          'drupalMigration' => 1,
        ),
      ),
      array(
        'multiple' => 1,
      )
    );
  }

  /**
   * Check if given id is mongo ObjectId and return id as mongo ObjectId.
   *
   * @param type $id
   * @return type
   */
  private function getMongoId($id) {
    // Try to create a Mongo ObjectId from the given value.
    $mongoid = new MongoId($id);
    // If the ObjectId does not round-trip to the same string, $id is not an
    // ObjectId, so we simply use the raw value instead.
    if ((string) $mongoid != $id) {
      $mongoid = $id;
    }

    return $mongoid;
  }
}

Example migration class:

/**
 * @file
 * Define a Migrate class for importing zip codes from MongoDB.
 */

class ZipsMigration extends Migration {
  public function __construct() {
    parent::__construct(MigrateGroup::getInstance('TestMigration'));

    $this->description = t('Test migration from mongodb to Drupal');

    // We instantiate the MigrateMap
    $this->map = new MigrateSQLMap(
      $this->machineName,
      array(
        '_id' => array(
          'type' => 'varchar',
          'length' => 24,
          'not null' => TRUE,
          'description' => 'MongoDB ID (_id) field.'
        ),
      ),
      MigrateDestinationNode::getKeySchema()
    );

//    // TIP: By default, each time a migration is run, any previously unimported source items
//    // are imported (along with any previously-imported items marked for update). If the
//    // source data contains a timestamp that is set to the creation time of each new item,
//    // as well as set to the update time for any existing items that are updated, then
//    // you can have those updated items automatically reimported by setting the field as
//    // your highwater field.
//    $this->highwaterField = array(
//      'name' => 'editdate',     // Column to be used as highwater mark
//      //'type' => 'int',        // By default, highwater marks are assumed to be lexicographically
//                                // sortable (e.g., '2011-05-19 17:53:12'). To properly
//                                // deal with integer highwater marks (such as UNIX
//                                // timestamps), indicate so here.
//    );

    $m = new MongoClient(MONGODB_SERVER); // connect to mongodb server
    $db = $m->test_migration; // connect to mongodb database
    $collection = $db->zips; // select collection from mongodb database

    // we need an explicit list of fields because documents in a collection
    // can have different fields
    $fields = array(
      '_id' => '_id',
      'city' => 'city',
    );

    // mongodb query: an empty array means all documents in the collection
    $query = array();

    // mongodb query sort: 1 means ascending, -1 means descending
    $sort = array(
      '_id' => 1,
    );

    $this->source = new MigrateSourceMongoDB($collection, $query, $fields, $sort);
    $this->destination = new MigrateDestinationNode('zip'); // to node type 'zip'

    $this->addFieldMapping('title', 'city');
  }

  public function complete($entity, $row) {
    // store additional migration info in the mongodb collection document
    // (used for performance)
    $this->source->setMongoDocumentMigrationInfo($row->_id, $entity->nid, $entity->created);
  }

  public function completeRollback($entity_id) {
    // remove the additional migration info from the mongodb collection document
    $this->source->removeMongoDocumentMigrationInfo($entity_id);
  }
}
mikeryan’s picture

Please attach files rather than pasting all your code into the comment body.

Thank you.

TravisCarden’s picture

Title: MongoDB source plugin » Add a MongoDB source plugin
Status: Needs work » Needs review
FileSize
558 bytes
5.5 KB

Here's a patch for @havran's code in #5. It works pretty well for me, but don't take that as a blanket affirmation of the details as I didn't give it a thorough review. The one comment I do have is that we can't assume that we have write access to the database to add an index, so I removed that code from the patch (see interdiff.txt below). I'm sure you could add it back conditioned on a test for permission, but I'm not sure you want to go modifying data sources without asking.

mikeryan’s picture

Just a note - I don't have MongoDB data to test this with, or time to construct a test case. If someone not involved in developing the code can give it a good test and mark it rtbc I'll commit it.

Quick scan of the code looks fine to me, only quibble is to use @todo rather than ToDo.

dstol’s picture

mikeryan, I'd be happy to share with you a mongo datasource and a sample migration that I've worked with.

mikeryan’s picture

Status: Needs review » Needs work

@dstol: I found the zips example data, and then noticed that's what @havran's sample code is using, so I'm good.

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+  public function __construct(MongoCollection $collection, array $query, array $fields = array(), array $sort = array('_id' => 1), array $options = array()) {

Line break needed (80 chars)

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+    // check if index for drupalMigration data exist
+    $create_index = TRUE;

Add an option (defaulting to FALSE) for creating an index
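
As a rough illustration of the opt-in behaviour suggested here, the sketch below merges caller-supplied options with a default of FALSE. The helper name and the 'create_index' option key are hypothetical; they are not taken from the patch.

```php
<?php
/**
 * Hypothetical helper: merge caller options with defaults.
 *
 * 'create_index' is an assumed option name used only for illustration.
 */
function example_mongodb_source_options(array $options = array()) {
  // Array union keeps the caller's values and fills in missing defaults.
  return $options + array('create_index' => FALSE);
}

$options = example_mongodb_source_options();
if ($options['create_index']) {
  // Only here would the source call $collection->ensureIndex(...), so a
  // read-only data source is never modified unless the caller asks for it.
}
```

With this shape, a migration that wants the index has to pass array('create_index' => TRUE) explicitly.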

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+    // ToDo: delete index if is not neccessary (0 migrated items?)

@todo instead of ToDo, one c in necessary.

The place for index deletion would be postImport(), I think.

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+   * Implementation of MigrateSource::getNextRow().

{@inheritdoc}

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+   * Implementation of MigrateSource::performRewind().

{@inheritdoc}

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+    // If we have existing idlist we use them.

// If we have an existing idlist we use it.

+++ b/plugins/sources/mongodb.inc
@@ -0,0 +1,199 @@
+      // ToDo: find some method for better performance because for now
+      // we get always all (migrated and unmigrated) rows
+      // Now we use method witch require complete and completeRollbacks method
+      // in migration class.
+      $this->query['drupalMigration'] = NULL;

If I follow correctly, the idea is to (in conjunction with the setMongoDocumentMigrationInfo and removeMongoDocumentMigrationInfo methods) write information to the source database to try to optimize the query? I'm... not a fan. For the first pass at this, to get into Migrate 2.6, forget about optimization and just make it work. Then, look at how MigrateSourceSQL optimizes highwater fields and see if a similar approach could be used here when there's a highwater mark.
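
One possible shape for a highwater-based restriction, shown only as a sketch: add a MongoDB `$gt` condition on the highwater field to the existing query array, without writing anything to the source. The function name is hypothetical, 'editdate' is borrowed from the commented-out highwater example in the sample migration, and this is not how MigrateSourceSQL is implemented.

```php
<?php
/**
 * Hypothetical helper: restrict a MongoDB query to documents newer than
 * the stored highwater mark.
 */
function example_mongodb_highwater_query(array $query, $field, $highwater = NULL) {
  // Without a stored highwater mark, leave the query untouched.
  if ($highwater !== NULL && $highwater !== '') {
    // Single quotes keep PHP from interpolating '$gt' as a variable.
    $query[$field] = array('$gt' => $highwater);
  }
  return $query;
}

$query = example_mongodb_highwater_query(array('state' => 'NY'), 'editdate', 1368000000);
```

The resulting array could be passed to find() as-is; sorting by the same field would keep the highwater mark advancing in order.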

+    return (string) drupal_json_encode($this->cursor->info());

Please return something human-readable - collection name, representation of query/sort components, ...
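
A minimal sketch of what such a description might look like, assuming the source keeps the collection name, query and sort around. The function name and the output format are illustrative only, and plain json_encode() stands in for drupal_json_encode() so the snippet runs outside Drupal.

```php
<?php
/**
 * Hypothetical helper: describe a MongoDB source in one readable line.
 */
function example_mongodb_source_describe($collection_name, array $query, array $sort) {
  return sprintf(
    'collection: %s; query: %s; sort: %s',
    $collection_name,
    json_encode($query),
    json_encode($sort)
  );
}

$description = example_mongodb_source_describe(
  'test_migration.zips',
  array('city' => 'ADAMSVILLE'),
  array('_id' => 1)
);
```

Inside the plugin, __toString() could return a string built this way instead of the raw cursor info.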

havran’s picture

Status: Needs work » Needs review
FileSize
5.08 KB

I have attached the latest version of the MongoDB source plugin, which I use in my project. I removed the index-related code and the setMongoDocumentMigrationInfo/removeMongoDocumentMigrationInfo functions because I am not a fan of that solution either.

Part of the performRewind() method is refactored because any field from the MongoDB collection can be used as the row ID (I use an "old_id" field).
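
To illustrate the idea of keying the idlist on an arbitrary field, here is a sketch that builds the `$in` condition from a key name and type. The function name and the 'int'/'varchar' type handling are assumptions for illustration, not the code from the attached patch. The cast matters because MongoDB does not match the string "42" against the number 42.

```php
<?php
/**
 * Hypothetical helper: restrict a query to an idlist on any key field.
 */
function example_mongodb_idlist_query(array $query, $key_name, array $id_list, $key_type = 'varchar') {
  if (empty($id_list)) {
    return $query;
  }
  $ids = array();
  foreach ($id_list as $id) {
    // Cast each ID to the declared key type before querying.
    $ids[] = ($key_type === 'int') ? (int) $id : (string) $id;
  }
  $query[$key_name] = array('$in' => $ids);
  return $query;
}

$query = example_mongodb_idlist_query(array(), 'old_id', array('7', '42'), 'int');
```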

Performance optimization still needs work. I will try the highwater approach later in my migration (~650k articles with images, videos, comments and more).

The last submitted patch, migrate-mongodb-source-1890610-11.patch, failed testing.

mikeryan’s picture

Status: Needs review » Needs work
Issue tags: -Performance, -migration, -mongodb

The patch doesn't apply; see "More git commands" at https://drupal.org/node/707484 for how to add a new file to the patch.

Several lines have trailing whitespace which should be removed.

Why are the getMongoId and getSourceKeyNameAndType methods private? Is there a reason to prevent derived classes from overriding them?
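
The practical difference can be sketched with two toy classes (all names here are hypothetical and unrelated to the plugin): a protected helper can be replaced by a subclass, whereas a private one would keep being called from the base class regardless of what the subclass defines.

```php
<?php
class ExampleBaseSource {
  // Protected, so subclasses may override how IDs are normalized.
  protected function normalizeId($id) {
    return (string) $id;
  }

  public function idFor($id) {
    return $this->normalizeId($id);
  }
}

class ExampleIntSource extends ExampleBaseSource {
  // Takes effect in idFor() only because the parent method is not private.
  protected function normalizeId($id) {
    return (int) $id;
  }
}

$source = new ExampleIntSource();
$id = $source->idFor('42');
```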

dstol’s picture

Issue summary: View changes
Status: Needs work » Needs review
FileSize
5.51 KB

Moving this forward

mikeryan’s picture

Component: Code » Documentation
Status: Needs review » Needs work
Issue tags: +Migrate 2.6

Committed, thanks!

Could someone document this plugin under https://drupal.org/node/1006986?

  • Commit 95c04fc on 7.x-2.x by mikeryan:
    Issue #1890610 by TravisCarden, havran, dstol: Add a MongoDB source...
Dane Powell’s picture

Status: Needs work » Fixed

I documented this at https://drupal.org/node/2286035, so I think this issue is finished.

Status: Fixed » Closed (fixed)

Automatically closed - issue fixed for 2 weeks with no activity.