How to import / export data with an ETL Command using Symfony2 (in TDD)

When you manage data on a regular basis, it can be very useful to write a
command to handle these repetitive tasks. The crude way to do it would
be to let the command do the whole job alone. But this isn’t a clean way to
do it. Why? Because you wouldn’t be able to write a test for your command.
It would be a black box that interacts with some critical components of your
project. There is a solution: using ETL services in your command, and that’s
what we are going to show you here.

Using ETL (Extract, Transform, Load) services means defining 3 services: an
extractor to get the data from a source, a transformer to adapt the data to
its new use and a loader to update or create the data in the target, each
service passing the data to the next one.
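
To make the three roles concrete before touching Symfony, here is a minimal sketch using plain functions and in-memory arrays. The function names and the row shape are made up for illustration and are not part of the migration below.

```php
<?php
// Hypothetical in-memory stand-ins for the three ETL roles.

// Extract: read raw rows from a source (here a hard-coded array).
function extractRows()
{
    return array(
        array('id' => 1, 'title' => ' first post '),
        array('id' => 2, 'title' => ' second post '),
    );
}

// Transform: adapt one raw row to the shape the target expects.
function transformRow(array $row)
{
    return array('legacy_id' => $row['id'], 'title' => trim($row['title']));
}

// Load: write one transformed row into the target (an array passed by reference).
function loadRow(array $row, array &$target)
{
    $target[$row['legacy_id']] = $row;
}

$target = array();
foreach (extractRows() as $row) {
    // Each step hands its result to the next one.
    loadRow(transformRow($row), $target);
}
```

Each step only knows about its own input and output, which is what makes the steps testable in isolation.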

Of course this method has a wide range of applications, from editing files
to migrating databases or importing/exporting data. As an example I’ll handle
the migration of articles from a blog: my articles are in a legacy MySQL
database, they contain a content and an author id that links them to a user,
and I have already migrated the users and stored their legacy ids. I chose
to use raw SQL for performance reasons, but this would also work with an ORM.

Command

Because I use ETL I already know how the command will work:

  • I start by loading the 3 services.
  • I extract the data using an extract() method from the extractor.
  • I load a row of data to process with nextRow(). The row is put in a parameter bag.
  • While there are rows to process, I try the following for each row:
      • Transform the data with a transform($data) method from the transformer.
      • Load the data with a load($data) method from the loader.
      • Output a nice success message and go on to the next row.
  • If this process fails, I catch the error and output a failure message. Then I go on to the next row.

I chose to extract the data piece by piece instead of all in one go to
use less memory. It also allows me to log some information about the migration
as it goes on, even in the case of a failure. If something goes wrong with
a piece of data (for example the author of an article cannot be found in the
new database), the transformer or the loader will throw an exception with
an explicit message.
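
The row-by-row loop with per-row failure handling can be sketched without any database. The class and function below are hypothetical stand-ins, assuming a nextRow() that returns null once the source is exhausted.

```php
<?php
// Hypothetical extractor that hands out one row at a time, like nextRow() in this article.
class ArrayRowExtractor
{
    private $rows;

    public function __construct(array $rows)
    {
        $this->rows = $rows;
    }

    public function nextRow()
    {
        // array_shift() returns null once the list is empty, which ends the loop.
        return array_shift($this->rows);
    }
}

// Hypothetical transform step that rejects rows without an author.
function transformOrFail(array $row)
{
    if (empty($row['author'])) {
        throw new \InvalidArgumentException('No author for article #'.$row['id']);
    }

    return $row;
}

$extractor = new ArrayRowExtractor(array(
    array('id' => 1, 'author' => 'alice'),
    array('id' => 2, 'author' => ''),
    array('id' => 3, 'author' => 'bob'),
));

$log = array();
while ($row = $extractor->nextRow()) {
    try {
        transformOrFail($row);
        $log[] = '[Success] Article #'.$row['id'].' migrated';
    } catch (\Exception $e) {
        // A bad row is logged and skipped; the loop carries on with the next one.
        $log[] = '[Failure] '.$e->getMessage();
    }
}

echo implode("\n", $log), "\n";
```

The second row fails, yet the third one is still processed, which is the behaviour I want from the command.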

Now that I know what process I expect, I can write the command before writing
any problem-specific code.

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

// ContainerAwareCommand is what provides getContainer()
class MigrateArticleCommand extends ContainerAwareCommand
{
        protected function configure()
        {
                $this->setName('migration:article');
        }

        protected function execute(InputInterface $input, OutputInterface $output)
        {
                $container = $this->getContainer();

                $extractor = $container->get('extractor.article');
                $transformer = $container->get('transformer.article');
                $loader = $container->get('loader.article');

                $extractor->extract();

                // nextRow() returns null when there is nothing left to process
                while ($article = $extractor->nextRow()) {
                        try {
                                $transformer->transform($article);
                                $loader->load($article);
                                $output->writeln('<info>[Success]  Article #'.$article->getLegacyId().' migrated</info>');
                        } catch (\Exception $e) {
                                // Report the failure and go on to the next row
                                $output->writeln('<error>[Failure] '.$e->getMessage().'</error>');
                        }
                }
        }
}

Extractor in TDD

Let’s write the extractor TDD style using Phake.

First I have to make sure I execute the right query, so I write a test for
it.

class ArticleExtractorTest extends \PHPUnit_Framework_TestCase
{
        private $extractor;
        private $connection;
        private $statement;

        public function setUp()
        {
                // The mocks must exist before they are injected into the extractor
                $this->connection = Phake::mock('Doctrine\DBAL\Connection');
                $this->statement = Phake::mock('Doctrine\DBAL\Driver\PDOStatement');
                Phake::when($this->connection)->executeQuery(Phake::anyParameters())->thenReturn($this->statement);
                $this->extractor = new ArticleExtractor($this->connection);
        }

        /**
         * Test if the right queries happen using some regex
         */
        public function testQueries()
        {
                $this->extractor->extract();

                $regexArray[] = '#SELECT.*FROM articles#is'; //I do a select in the right table
                $regexArray[] = '#AS author_legacy_id#is'; //I get the author legacy id
                $regexArray[] = '#AS content#is'; //I get the content
                // I could define more regex if I use a more complex query

                Phake::verify($this->connection, Phake::times(1))->executeQuery(Phake::capture($query));
                //I have tested that I did only one query and captured it

                foreach($regexArray as $regex) {
                        $this->assertRegExp($regex, $query);
                }
        }
}

With this test as a starting point, I can begin writing the actual functional
code.

class ArticleExtractor
{
        private $connection;
        private $statement;

        public function __construct(\Doctrine\DBAL\Connection $connection)
        {
                $this->connection = $connection;
        }

        public function extract()
        {
                $this->statement = $this->connection->executeQuery($this->getQuery());
        }

        private function getQuery()
        {
                return 'SELECT article_content AS content, author_id AS author_legacy_id, id AS legacy_id FROM articles';
        }
}

I now add the following tests about the expected result:

class ArticleExtractorTest extends \PHPUnit_Framework_TestCase
{
        // [...]

        /**
         * Test if the parameter bag is correctly set when some data is extracted, using a data provider
         * @dataProvider providerDummyStatementFetch
         */
        public function testResult($data)
        {
                Phake::when($this->statement)->fetch()->thenReturn($data);
                $this->extractor->extract();

                $result = $this->extractor->nextRow();
                $this->assertTrue($result instanceof Article);
                $this->assertEquals($data['content'], $result->getContent());
                $this->assertEquals($data['author_legacy_id'], $result->getAuthorLegacyId());
        }

        /**
         * Data provider
         */
        public function providerDummyStatementFetch()
        {
                return array(
                        // Each data set is an array of arguments; the row is the single $data argument
                        array(array('legacy_id' => 1, 'author_legacy_id' => 1, 'content' => 'should be ok')),
                );
        }
}

I add the nextRow() method to my Extractor:

class ArticleExtractor
{
        // [...]

        public function nextRow()
        {
                if ($data = $this->statement->fetch()) {
                        $result = new Article();
                        $result->hydrate($data);

                        return $result;
                }

                return null;
        }
}

And the associated parameter bag for the result:

class Article
{
        private $authorLegacyId;
        private $content;
        private $legacyId;

        //Write the getter and setter

        public function hydrate($data)
        {
                // The keys match the column aliases of the extractor query
                $this->setContent($data['content']);
                $this->setLegacyId($data['legacy_id']);
                $this->setAuthorLegacyId($data['author_legacy_id']);
        }
}

That’s good, but what should happen if the fetched data is incorrect? I add 3
test cases to my data provider: one with a negative legacy id, one with a
negative author legacy id and one with an empty content. I expect an error in
each case.

class ArticleExtractorTest extends \PHPUnit_Framework_TestCase
{
        // [...]

        /**
         * Test if the parameter bag is correctly set when some data is extracted, using a data provider
         * @dataProvider providerDummyStatementFetch
         */
        public function testResult($data)
        {
                Phake::when($this->statement)->fetch()->thenReturn($data);
                $this->extractor->extract();

                try {
                        $result = $this->extractor->nextRow();
                        $this->assertTrue($result instanceof Article);
                        $this->assertEquals($data['content'], $result->getContent());
                        $this->assertEquals($data['author_legacy_id'], $result->getAuthorLegacyId());
                } catch (\InvalidArgumentException $e) {
                        // Catch only the validation error, so that failed assertions are not swallowed
                        if ($data['legacy_id'] == -1) {
                                $this->assertEquals('Invalid legacy id', $e->getMessage());
                        }
                        if ($data['author_legacy_id'] == -1) {
                                $this->assertEquals('Invalid author legacy id', $e->getMessage());
                        }
                        if ($data['content'] == '') {
                                $this->assertEquals('Invalid content', $e->getMessage());
                        }
                }
        }

        /**
         * Data provider
         */
        public function providerDummyStatementFetch()
        {
                return array(
                        // Each data set is an array of arguments; the row is the single $data argument
                        array(array('legacy_id' => 1,  'author_legacy_id' => 1,  'content' => 'should be ok')),
                        array(array('legacy_id' => -1, 'author_legacy_id' => 3,  'content' => 'negative id error')),
                        array(array('legacy_id' => 1,  'author_legacy_id' => -1, 'content' => 'negative author id error')),
                        array(array('legacy_id' => 1,  'author_legacy_id' => 3,  'content' => '')),
                );
        }
}

The parameter bag should check the validity of the data, so I edit its
hydrate method:

class Article
{
        // [...]

        public function hydrate($data)
        {
                if (!isset($data['content']) OR empty($data['content'])) {
                        throw new \InvalidArgumentException('Invalid content');
                }
                // The keys match the column aliases of the extractor query
                if (!isset($data['author_legacy_id']) OR $data['author_legacy_id'] < 1) {
                        throw new \InvalidArgumentException('Invalid author legacy id');
                }
                if (!isset($data['legacy_id']) OR $data['legacy_id'] < 1) {
                        throw new \InvalidArgumentException('Invalid legacy id');
                }

                $this->setContent($data['content']);
                $this->setLegacyId($data['legacy_id']);
                $this->setAuthorLegacyId($data['author_legacy_id']);
        }
}

And that’s all. I have an Extractor, a parameter bag and tests for it.

All I have to do now is to define this class as a service. There are multiple
ways to do it, just follow the book. Don’t forget to pass a connection to the
legacy database as an argument.
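
One of those ways is to register the service in PHP through the dependency injection component. The sketch below is only an assumption about your setup: the class name Acme\Etl\ArticleExtractor and a DBAL connection named "legacy" are made up, and the code needs the Symfony DependencyInjection component to be installed. DoctrineBundle exposes each configured connection as doctrine.dbal.<name>_connection.

```php
<?php
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

// Sketch only: the class name and the "legacy" connection name are assumptions.
// Typically called from a bundle extension's load() method.
function registerArticleExtractor(ContainerBuilder $container)
{
    $container
        // Same service id as the one fetched in the command
        ->register('extractor.article', 'Acme\Etl\ArticleExtractor')
        // Inject the DBAL connection that points at the legacy database
        ->addArgument(new Reference('doctrine.dbal.legacy_connection'));
}
```

The YAML and XML formats described in the book express exactly the same thing: a service id, a class and one argument.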

Transformer and Loader

The process for the transformer and the loader is exactly the same. Here is
the code; you can write tests for it using the same method as for the
extractor.

This time, when you define the services, pass a connection to the new database
as an argument:

class ArticleTransformer
{
        private $connection;

        public function __construct(\Doctrine\DBAL\Connection $connection)
        {
                $this->connection = $connection;
        }

        public function transform($article) {
                $article->setAuthorId($this->defineAuthorId($article->getAuthorLegacyId()));
        }

        private function defineAuthorId($legacyId)
        {
                $query = 'SELECT id FROM user WHERE legacy_id = :legacyId';
                $queryData = array('legacyId' => $legacyId);
                $id = $this->connection->fetchColumn($query, $queryData, 0);

                return $id;
        }
}

class ArticleLoader
{
        private $connection;

        public function __construct(\Doctrine\DBAL\Connection $connection)
        {
                $this->connection = $connection;
        }

        public function load($article)
        {
                if (!$article->isValid()) {
                        throw new \InvalidArgumentException('Wrong format');
                }

                try {
                        $this->connection->transactional(function ($connection) use ($article) {

                                $queryData = $this->getQueryData($article);
                                $id = $connection->fetchColumn($this->getUpdateQuery(), $queryData, 0);

                                //if couldn't update then insert
                                if (0 >= $id) {
                                        $connection->fetchColumn($this->getInsertQuery(), $queryData, 0);
                                }
                        });
                } catch (\Exception $e) {
                        throw new \InvalidArgumentException('Fail migrating article #'.$article->getLegacyId()."\n".'Connection error message : '.$e->getMessage());
                }
        }

        private function getQueryData($article)
        {
                return array(
                        'legacyId' => $article->getLegacyId(), // must match the :legacyId placeholder
                        'authorId' => $article->getAuthorId(),
                        'content' => $article->getContent(),
                );
        }

        private function getUpdateQuery()
        {
                return 'UPDATE article SET author = :authorId, content = :content WHERE legacy_id = :legacyId RETURNING id';
        }

        private function getInsertQuery()
        {
                return "INSERT INTO article (id, author, content, legacy_id) VALUES (nextval('article_id_seq'), :authorId, :content, :legacyId) RETURNING id";
        }
}

I also had to update the parameter bag by adding an authorId field and
an isValid() method:

class Article
{
        private $authorLegacyId;
        private $authorId;
        private $content;
        private $legacyId;

        //Write the getter and setter

        public function hydrate($data)
        {
                if (!isset($data['content']) OR empty($data['content'])) {
                        throw new \InvalidArgumentException('Invalid content');
                }
                // The keys match the column aliases of the extractor query
                if (!isset($data['author_legacy_id']) OR $data['author_legacy_id'] < 1) {
                        throw new \InvalidArgumentException('Invalid author legacy id');
                }
                if (!isset($data['legacy_id']) OR $data['legacy_id'] < 1) {
                        throw new \InvalidArgumentException('Invalid legacy id');
                }

                $this->setContent($data['content']);
                $this->setLegacyId($data['legacy_id']);
                $this->setAuthorLegacyId($data['author_legacy_id']);
        }

        public function isValid()
        {
                return  ! empty($this->content) AND $this->authorId > 0;
        }
}

Writing tests when using transactional()

One problem you may have when trying to write tests for the loader
is that it uses transactional(), which wraps the queries in a closure.
A mocked connection never runs that closure, so the queries never reach
the mock.

Here is a way around it: capture the closure and call it yourself. Let’s
assume I have already mocked a connection with Phake in $this->connection
and have a loader ($this->loader):

/**
 * @param Article $data
 * @dataProvider providerDummyTransformerResult
 */
public function testQueries(Article $data)
{
        // First case: the update always succeeds
        Phake::when($this->connection)->fetchColumn((Phake::anyParameters()), (Phake::anyParameters()), 0)->thenReturn(1);

        $this->loader->load($data, false);
        Phake::verify($this->connection, Phake::times(1))->transactional(Phake::capture($trans));
        call_user_func($trans, $this->connection);

        Phake::verify($this->connection, Phake::times(1))->fetchColumn(
                Phake::capture($query)->when($this->matchesRegularExpression('#^UPDATE#is')),
                (Phake::capture($queryData)),
                0
        );

        // The $query and $queryData of the UPDATE query can now be tested

        // Second case: the mocked update returns -1, so the loader has to insert; the insert then returns 1
        Phake::when($this->connection)->fetchColumn((Phake::anyParameters()), (Phake::anyParameters()), 0)
                ->thenReturn(-1)->thenReturn(1);

        $this->loader->load($data, false);
        Phake::verify($this->connection, Phake::times(2))->transactional(Phake::capture($trans)); // second time transactional is called in the test
        call_user_func($trans, $this->connection);

        Phake::verify($this->connection, Phake::times(1))->fetchColumn(
                Phake::capture($query)->when($this->matchesRegularExpression('#^INSERT INTO#is')),
                (Phake::capture($queryData)),
                0
        );
        // The $query and $queryData of the INSERT query can now be tested
}
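
If the capture-then-call trick feels opaque, the same idea can be shown without Phake. This is a sketch with a hand-written fake connection; RecordingConnection and saveArticle() are hypothetical names and are not part of the loader above.

```php
<?php
// Hypothetical hand-written fake standing in for the mocked connection.
class RecordingConnection
{
    public $callbacks = array();
    public $queries = array();

    // Like a mock, this only records the callback and never runs it.
    public function transactional($callback)
    {
        $this->callbacks[] = $callback;
    }

    public function fetchColumn($query, array $params = array(), $column = 0)
    {
        $this->queries[] = $query;

        return 1; // pretend the UPDATE matched a row
    }
}

// Hypothetical code under test: all its queries live inside the callback.
function saveArticle($connection, array $data)
{
    $connection->transactional(function ($conn) use ($data) {
        $conn->fetchColumn('UPDATE article SET content = :content WHERE legacy_id = :legacyId', $data);
    });
}

$connection = new RecordingConnection();
saveArticle($connection, array('content' => 'hello', 'legacyId' => 1));

// Nothing has run yet: the fake only stored the closure.
$queriesBefore = count($connection->queries);

// Replay the captured closure by hand, then inspect the recorded queries.
call_user_func($connection->callbacks[0], $connection);
$queriesAfter = count($connection->queries);
```

Phake::capture() plays the role of the $callbacks array here, and call_user_func() is the manual replay in both versions.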

Conclusion

You now know the basics of implementing an ETL command with services. As you
have seen, the layout is very generic. This allows you to write abstract classes
to factor out the code if you plan to use multiple ETL commands, or simply to
start coding an almost functional command even if you are still not sure how
to get or load the data.
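
As an illustration of that factoring, here is one possible shape for such an abstract class. It is a sketch under my own assumptions: AbstractEtlRunner and UppercaseRunner are hypothetical names, and the base class returns messages instead of writing to the console so that it stays independent from Symfony.

```php
<?php
// Hypothetical base class: the loop is shared, the three steps are left abstract.
abstract class AbstractEtlRunner
{
    abstract protected function nextRow();
    abstract protected function transform($row);
    abstract protected function load($row);

    public function run()
    {
        $messages = array();
        // nextRow() is expected to return null when the source is exhausted.
        while (null !== ($row = $this->nextRow())) {
            try {
                $this->load($this->transform($row));
                $messages[] = '[Success]';
            } catch (\Exception $e) {
                $messages[] = '[Failure] '.$e->getMessage();
            }
        }

        return $messages;
    }
}

// Hypothetical concrete runner working on plain strings.
class UppercaseRunner extends AbstractEtlRunner
{
    private $source;
    public $target = array();

    public function __construct(array $source)
    {
        $this->source = $source;
    }

    protected function nextRow()
    {
        return array_shift($this->source);
    }

    protected function transform($row)
    {
        if ('' === $row) {
            throw new \InvalidArgumentException('Empty row');
        }

        return strtoupper($row);
    }

    protected function load($row)
    {
        $this->target[] = $row;
    }
}

$runner = new UppercaseRunner(array('a', '', 'b'));
$messages = $runner->run();
```

A new migration then only has to provide the three steps; the loop and the error handling come for free.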



  • Stof

    This is an interesting approach, but I see 1 design issue in your extractor:
    it is both the service, and the result of the extraction. this means that this service is stateful and cannot be used several times safely.
    A better way to implement it would be to make extract() return an iterator.

  • You can check the exporter lib from the sonata-project, which can be used to import/export data.

    This avoids reinventing the wheel and, as Stof said, gives you real stateless services 😉

    https://github.com/sonata-project/exporter

  • Rénald Casagraude

    Small typo:
    \InvalideArgumentException

    Should be:
    \InvalidArgumentException

  • Marek Kalnik

    Thanks Rénald, fixed.