Coh-Fu

The Discipline of Applied Coherence

Bulk loading a Coherence cache from an Oracle Database

Recently, I’ve worked on a couple of projects that required pre-loading data from an Oracle Database table into an Oracle Coherence cache. There are many ways to accomplish this task, but what I’ve found to work well is to distribute the load processing to the Coherence grid itself. To do this, I use the Coherence Invocation Service.

To get started, let’s look at the SQL query that will be used to retrieve all rows from the underlying table (all columns have type VARCHAR2):

SELECT id, customer_id, zip5, zip4 FROM customer_zip

The class definition for the objects to be cached looks like:

public class CustomerZip
        implements ExternalizableLite, PortableObject
    {
    // ----- constructors ----------------------------------------------------

    public CustomerZip()
        {
        }

    public CustomerZip(String sId, String sCustomerId, String sZip5, String sZip4)
        {
        m_sId = sId;
        m_sCustomerId = sCustomerId;
        m_sZip5 = sZip5;
        m_sZip4 = sZip4;
        }

    // ----- accessors/mutators ----------------------------------------------

    /* removed for brevity */

    // ----- ExternalizableLite interface ------------------------------------

    /* removed for brevity */

    // ----- PortableObject interface ----------------------------------------

    /* removed for brevity */

    // ----- data members ----------------------------------------------------

    private String m_sZip5;
    private String m_sZip4;
    private String m_sCustomerId;
    private String m_sId;
    }
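
For illustration, here's what the elided PortableObject methods might look like; the POF indexes below are my own illustrative choice, and the ExternalizableLite pair would be analogous using DataInput/DataOutput:

    public void readExternal(PofReader in)
            throws IOException
        {
        m_sId         = in.readString(0);
        m_sCustomerId = in.readString(1);
        m_sZip5       = in.readString(2);
        m_sZip4       = in.readString(3);
        }

    public void writeExternal(PofWriter out)
            throws IOException
        {
        out.writeString(0, m_sId);
        out.writeString(1, m_sCustomerId);
        out.writeString(2, m_sZip5);
        out.writeString(3, m_sZip4);
        }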

The first approach I tried involved pulling back all of the ids from the table, running them through PartitionedService#getKeyOwner, and then submitting a task to each member with the set of ids for that member to load. This method leverages Coherence’s data partitioning to distribute the rows among the loading members. It worked fine in my testing with a small number of rows, but when I applied it to the full data set of over 13 million rows, I quickly ran out of memory trying to query and process all of the ids. In addition, querying and processing the ids takes time.
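
For reference, the owner-grouping step of that first approach might look like the sketch below; groupIdsByOwner is a hypothetical helper name, and setIds would be the result of querying all ids from the table (holding all 13 million of them in memory is exactly what made this approach impractical):

    protected Map<Member, Set<String>> groupIdsByOwner(NamedCache cache, Set<String> setIds)
        {
        PartitionedService service = (PartitionedService) cache.getCacheService();
        Map<Member, Set<String>> mapOwners = new HashMap<Member, Set<String>>();
        for (String sId : setIds)
            {
            // ask the partitioned service which member owns this key
            Member memberOwner = service.getKeyOwner(sId);
            Set<String> setOwned = mapOwners.get(memberOwner);
            if (setOwned == null)
                {
                mapOwners.put(memberOwner, setOwned = new HashSet<String>());
                }
            setOwned.add(sId);
            }
        return mapOwners;
        }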

The second, and final, approach I tried involved pulling back only the row count. Dividing the rows up among the loading members was now simply a matter of establishing the first and last rows to load for each member. I can then use the Oracle pseudocolumn ROWNUM to execute the following query on each member:

SELECT * FROM
  (SELECT a.*, ROWNUM r FROM
    (SELECT id, customer_id, zip5, zip4 FROM customer_zip ORDER BY id) a
  WHERE ROWNUM <= ?)
WHERE r >= ?
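
For example, a member assigned rows 1001 through 2000 binds 2000 to the first parameter (the ROWNUM upper bound) and 1001 to the second (the lower bound on r).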

This query allows each loading member to specify the last and first rows to load (bound in that order) and allows the database to filter out all of the rows outside its range. In my testing, I found that queries over ranges beyond a certain size performed dramatically slower (perhaps a DB tuning issue, but IANADBA ;-)). You could easily run into this scenario with a large number of rows and a small number of loading members. To handle it, I further broke each member’s range down into smaller ranges and had each member execute multiple queries. Processing the results of these queries and performing bulk puts into the cache requires breaking the results into batches as well. Here’s a look at the code that actually executes the query and inserts the entries into Coherence (to be executed on each loading member). This code is part of a CacheLoader implementation that is also used for read-through; having the read-through and pre-load logic co-located allows me to share database properties (connection information, SQL statements, etc.).

    public void preload(NamedCache cache, int iFirstRow, int iLastRow,
            int cFetchSize, int cMaxQueryRange)
        {
        String sSqlQuery = ...; // see above
        String sCacheName = cache.getCacheName();
        Connection con = null;
        PreparedStatement stmtPrep = null;
        ResultSet rs = null;
        try
            {
            con = getConnection();
            stmtPrep = con.prepareStatement(sSqlQuery);
            stmtPrep.setFetchSize(cFetchSize);

            // break the query up into batches based on cMaxQueryRange
            int cRows = (iLastRow - iFirstRow) + 1;
            int cBatches = cRows / cMaxQueryRange;
            int cRemaining = cRows % cMaxQueryRange;
            // add additional batch to handle any remainder
            cBatches += cRemaining == 0 ? 0 : 1;

            Map mapBuffer = new HashMap(cFetchSize);
            int iBatchFirstRow;
            int iBatchLastRow = iFirstRow - 1;
            int cRowsLoadedTotal = 0;
            log("Executing preload query in " + cBatches + " batches");
            for (int i = 0; i < cBatches; ++i)
                {
                iBatchFirstRow = iBatchLastRow + 1;
                // last row for the batch or the entire range
                iBatchLastRow = Math.min(iLastRow, iBatchFirstRow + (cMaxQueryRange - 1));
                // bind the upper bound (ROWNUM <= ?) first, then the lower bound (r >= ?)
                stmtPrep.setInt(1, iBatchLastRow);
                stmtPrep.setInt(2, iBatchFirstRow);
                rs = stmtPrep.executeQuery();

                // process cFetchSize rows at a time, bulk-putting each batch
                while (processResults(rs, mapBuffer, cFetchSize))
                    {
                    cRowsLoadedTotal += mapBuffer.size();
                    cache.putAll(mapBuffer);
                    mapBuffer.clear();
                    }
                rs.close();
                }
            log("Preload complete; loaded " + cRowsLoadedTotal + " rows");
            }
        catch (SQLException e)
            {
            log(e);
            throw new RuntimeException(e);
            }
        finally
            {
            close(con, stmtPrep, rs);
            }
        }

    protected boolean processResults(ResultSet rs, Map mapResults, int cFetchSize)
            throws SQLException
        {
        for (int i = 0; i < cFetchSize && rs.next(); ++i)
            {
            // create domain object from single row
            CustomerZip customerZip = createCustomerZip(rs);
            mapResults.put(customerZip.getId(), customerZip);
            }
        return mapResults.size() > 0;
        }
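
The createCustomerZip helper isn't shown above; a minimal version, mapping one row to the domain object using the column names from the SELECT at the top, might be:

    protected CustomerZip createCustomerZip(ResultSet rs)
            throws SQLException
        {
        // column labels match the preload/read-through SELECT statement
        return new CustomerZip(rs.getString("id"),
                               rs.getString("customer_id"),
                               rs.getString("zip5"),
                               rs.getString("zip4"));
        }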

The final piece of required code is the one that generates the ranges for each member and issues each member a task to execute. As I mentioned earlier, I use the Coherence Invocation Service to asynchronously execute a task on each loading member. For my use case, the set of loading members is simply every member running the Invocation Service, except for the member issuing the tasks:

    protected Set<Member> getLoadingMembers(InvocationService serviceInv)
        {
        // copy the set so that removing the local member cannot mutate
        // the set returned by the service
        Set setMembers = new HashSet(serviceInv.getInfo().getServiceMembers());
        setMembers.remove(serviceInv.getCluster().getLocalMember());
        if (setMembers.isEmpty())
            {
            throw new IllegalStateException("No other members are running the InvocationService. Is the cluster up?");
            }
        return setMembers;
        }

With the set of members and the total number of rows in the database table, I can now create an Invocation Service task for each member, specifying the range of rows to be loaded by each member:

    protected Map<Member, PreloadTask> generateTasks(Set<Member> setMembers, int cRows)
        {
        Map<Member, PreloadTask> mapTasks =
                new HashMap<Member, PreloadTask>(setMembers.size());

        if (cRows <= m_cFetchSize)
            {
            // for small number of rows, just send the load to one member
            Member member = setMembers.iterator().next();
            PreloadTask task = new PreloadTask(m_sCacheName, 1, cRows,
                    m_cFetchSize, m_cMaxQueryRange);
            mapTasks.put(member, task);
            }
        else
            {
            int cMembers = setMembers.size();
            int cMinRowsPerMember = cRows / cMembers;
            int cRemainingRows = cRows % cMembers;

            int iFirstRow;
            int iLastRow = 0;
            for (Member member : setMembers)
                {
                iFirstRow = iLastRow + 1;
                iLastRow = iFirstRow + cMinRowsPerMember +
                        (cRemainingRows-- > 0 ? 1 : 0) - 1;
                PreloadTask task = new PreloadTask(m_sCacheName, iFirstRow,
                        iLastRow, m_cFetchSize, m_cMaxQueryRange);
                mapTasks.put(member, task);
                }
            }

        return mapTasks;
        }
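
To make the range math concrete: 10 rows across 3 members gives cMinRowsPerMember = 3 and cRemainingRows = 1, so the members receive rows 1-4, 5-7, and 8-10.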

The final step is to asynchronously invoke each member’s task, and then wait for all of them to finish. I use a CountDownLatch and an InvocationObserver to track the completion of all tasks:

    public void preloadCache()
        {
        final String sCacheName = "CustomerZipCache";
        int cRows = getRowCount();
        InvocationService serviceInv = (InvocationService)
                CacheFactory.getService("InvocationService");
        long ldtStart = System.currentTimeMillis();
        Set<Member> setLoadingMembers = getLoadingMembers(serviceInv);
        Map<Member, PreloadTask> mapMemberTasks = generateTasks(setLoadingMembers, cRows);

        // prepare the invocation observer
        int cTasks = mapMemberTasks.size();
        final CountDownLatch latch = new CountDownLatch(cTasks);
        InvocationObserver observer = new InvocationObserver()
            {
            public void memberCompleted(Member member, Object oResult)
                {
                latch.countDown();
                log(String.format("%s: load finished on %s", sCacheName, member.toString()));
                }

            public void memberFailed(Member member, Throwable eFailure)
                {
                // TODO: resubmit tasks due to transient failures
                latch.countDown();
                log(String.format("%s: load failed on %s", sCacheName, member.toString()));
                CacheFactory.log(eFailure);
                }

            public void memberLeft(Member member)
                {
                // TODO: resubmit to a member that is up
                latch.countDown();
                log(String.format("%s: member left before load finished (%s)", sCacheName, member.toString()));
                }

            public void invocationCompleted()
                {
                log(String.format("%s: invocation has completed", sCacheName));
                }
            };

        // asynchronously execute each member's task
        for (Map.Entry<Member, PreloadTask> entry : mapMemberTasks.entrySet())
            {
            Member member = entry.getKey();
            Set setTaskMembers = Collections.singleton(member);
            PreloadTask task = entry.getValue();
            serviceInv.execute(task, setTaskMembers, observer);
            log(String.format("%s: rows %d-%d sent to %s",
                    sCacheName, task.getFirstKey(), task.getLastKey(), member.toString()));
            }

        // wait for all tasks to finish
        try
            {
            latch.await();
            }
        catch (InterruptedException e)
            {
            // restore the interrupt status rather than swallowing it
            Thread.currentThread().interrupt();
            }
        long lDurationMillis = System.currentTimeMillis() - ldtStart;
        log(String.format("%s: pre-loaded %d rows in %.3f secs (%.3f rows/sec)",
                sCacheName, cRows, lDurationMillis / 1000.0,
                cRows / (lDurationMillis / 1000.0)));
        NamedCache cache = CacheFactory.getCache(sCacheName);
        log(String.format("%s: final size is %d", sCacheName, cache.size()));
        }
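
For reference, PreloadTask itself might look something like the sketch below. It extends Coherence's com.tangosol.net.AbstractInvocable, and CustomerZipCacheLoader is a hypothetical stand-in for the CacheLoader class that hosts the preload() method shown earlier; the actual implementation may differ:

public class PreloadTask
        extends AbstractInvocable
    {
    public PreloadTask(String sCacheName, int iFirstRow, int iLastRow,
            int cFetchSize, int cMaxQueryRange)
        {
        m_sCacheName     = sCacheName;
        m_iFirstRow      = iFirstRow;
        m_iLastRow       = iLastRow;
        m_cFetchSize     = cFetchSize;
        m_cMaxQueryRange = cMaxQueryRange;
        }

    public void run()
        {
        NamedCache cache = CacheFactory.getCache(m_sCacheName);
        // delegate to the CacheLoader that hosts the preload logic
        new CustomerZipCacheLoader().preload(cache, m_iFirstRow, m_iLastRow,
                m_cFetchSize, m_cMaxQueryRange);
        }

    public int getFirstKey()
        {
        return m_iFirstRow;
        }

    public int getLastKey()
        {
        return m_iLastRow;
        }

    // Invocable extends Serializable, so these fields travel with the task
    private String m_sCacheName;
    private int    m_iFirstRow;
    private int    m_iLastRow;
    private int    m_cFetchSize;
    private int    m_cMaxQueryRange;
    }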

If you’re reading carefully, you’ll see that I am actually issuing database queries from two logical places: the grid client that generates the tasks and the grid members executing the load. I mentioned earlier that I’m sharing database parameters between read-through and pre-load by using a CacheLoader. I will have to save the details of how I achieve that sharing for another post.
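
One other helper referenced above but not shown is getRowCount(), the grid-client query; a minimal version, reusing the same getConnection() and close() helpers from preload(), might be:

    protected int getRowCount()
        {
        Connection con = null;
        PreparedStatement stmtPrep = null;
        ResultSet rs = null;
        try
            {
            con = getConnection();
            stmtPrep = con.prepareStatement("SELECT COUNT(*) FROM customer_zip");
            rs = stmtPrep.executeQuery();
            rs.next();
            return rs.getInt(1);
            }
        catch (SQLException e)
            {
            throw new RuntimeException(e);
            }
        finally
            {
            close(con, stmtPrep, rs);
            }
        }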


January 6, 2010 | Posted in Loading

7 Comments

  1. [...] we have Coherence architect Andy Nguyen debuting with a detailed description of a sophisticated distributed bulk loading technique he’s employed on several customer [...]

    Pingback by ~pperalta » Blog Archive » What’s happening in the world of Coherence? | February 19, 2010

  2. Great blog! Keep’m coming!

    Comment by warpdriv | August 18, 2010

  3. Very useful!

    Comment by Ashwin Jayaprakash | September 10, 2010

  4. Hi can you post the PreloadTask source code? Thx in advance

    Comment by kobe820 | October 26, 2011

  5. Great post and use of the invocation service!

    However, I think this pattern suffers from edge cases if the data in the table is changing while you are warming your grid. This is because rownum is not fixed to a particular row within the database, i.e. it can change as rows are inserted or deleted. This can result in some rows being loaded twice and others not at all if the table has entries being inserted or deleted during the load.

    But worry not, there is a small tweak to your pattern that will fix the issue! Assuming you have an integer-based primary key, you can get around this problem by using the PK, which is tied to a particular entry, rather than rownum, which is not.

    Even better, you can use the PK to chunk up your data by taking its modulus. Say you have 10 nodes in your grid: assign each a unique id (from 0 to 9) and have each load all rows where PK modulo the number of chunks (10 here) equals its id. So, for example, 'node 0' would load any rows where the PK ends in a zero, 'node 1' would load rows ending in a 1, etc. This works for any number of nodes, and you can break the data into smaller chunks if you want. All you need to know is the total number of chunks and which chunk you want to load: very easy to distribute.

    Comment by Andrew Coates | October 4, 2012

    • You’re right. I did not consider that edge case. Great suggestion!
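
      In SQL terms, the chunked query would look something like this (binding the total number of chunks and the node's chunk id):

      SELECT id, customer_id, zip5, zip4 FROM customer_zip WHERE MOD(id, ?) = ?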

      Comment by acnguyen | October 4, 2012

  6. Thanks, great post! But what about situations like memberFailed or memberLeft? Perhaps in memberFailed a retry could be useful, and for memberLeft the task could be reassigned to a member that is still up.

    Comment by Ali | November 13, 2012

