Coh-Fu

The Discipline of Applied Coherence

Visualizing the Coherence Datagram Test

Figure 1: Datagram Test output on a network experiencing packet loss

The graph above was generated from the output of the Coherence Datagram Test utility. The Coherence Datagram Test is a tool that sends and receives UDP packets between two or more machines to evaluate the health and performance of the network between them. The above test was run for 100 seconds on two server-class machines, each with a 1 Gb Ethernet connection to the same switch. It’s pretty clear from the graph that there is significant packet loss between the two machines. Here’s what the graph looks like on a healthy network:

Figure 2: Datagram Test output on a healthy network

The difference between the two graphs is striking. The Coherence Production Checklist suggests running the test before deploying a Coherence application into a real environment, and the test can help identify network problems that could adversely affect a Coherence application, such as packet loss. However, I have found that most users have a hard time interpreting the raw output of the test, and I think anyone who has run the Datagram Test would agree that the graphs above are much easier to understand. Here, I will describe how I generated these graphs, an approach that comes in handy when analyzing a large number of test results.

The first step is to actually run the Datagram Test to generate report data:

server1$ java -server -cp coherence.jar com.tangosol.net.DatagramTest -local 192.168.1.100 -log 192.168.1.100.log -txDurationMs 100000 -polite 192.168.1.101
server2$ java -server -cp coherence.jar com.tangosol.net.DatagramTest -local 192.168.1.101 -log 192.168.1.101.log -txDurationMs 100000 192.168.1.100

The above pair of commands runs a bi-directional test for 100 seconds, generating a tab-delimited report in the file specified by -log. As of Coherence 3.6, the report spits out aggregated lifetime metrics (i.e. since the test began) every 100,000 received packets (by default). For analyzing packet loss, it makes more sense to look at the metrics accumulated between reporting intervals rather than since the beginning of the test, since lifetime metrics can mask spikes that occur later in the test. Luckily, the per-interval metrics we need can be derived from the lifetime metrics. The following awk script will calculate the additional columns of interest (as well as fix a bug in the test where the data columns don’t align with the header columns due to two missing delimiters):
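
For example (illustrative numbers only): if the lifetime report shows 100 missing packets out of 9,000,000 sent at the 90-second mark and 1,100 missing out of 10,000,000 sent at the 100-second mark, the lifetime drop rate still looks like a harmless 0.011%, but the drop rate for that final interval is (1,100 - 100) / (10,000,000 - 9,000,000) = 0.1%, a spike the lifetime numbers almost completely hide.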

#!/usr/bin/awk -f
BEGIN {
    FS = "[\t\r\n]";
}

# Header line
/^publisher/ {
    if (FILENAME == "") {
        FILENAME = "stdin";
    }
    else {
        print("Processing " FILENAME);
    }
    gsub(/[\r\n]/, "", $0);
    header = sprintf("%s\tinterval duration secs\tinterval missing packets\tinterval drop rate\tinterval success rate\tinterval throughput mb/sec", $0);
    for (outfile in aOutfile) {
        close(aOutfile[outfile]);
    }
    delete aPrevSent;
    delete aPrevReceived;
    delete aPrevMissing;
    delete aPrevDurationMillis;
    delete aDurationOffset;
    delete aOutfile;
    next;
}

# Initialize prev values
aPrevSent[$1] == ""  {
    aPrevSent[$1] = 0;
    aPrevReceived[$1] = 0;
    aPrevMissing[$1] = 0;
    aPrevDurationMillis[$1] = 0;
    aDurationOffset[$1] = 0;
    aOutfile[$1] = FILENAME "." substr($1, 2, length($1))  ".csv";
    if (aOutfile[$1] ~ /^stdin/) {
        print(header);
    }
    else {
        print(header) > aOutfile[$1];
    }
}

# Account for packet sequence restart
$2 < aPrevDurationMillis[$1] {
    aPrevSent[$1] = 0;
    aPrevReceived[$1] = 0;
    aPrevMissing[$1] = 0;
    aDurationOffset[$1] += aPrevDurationMillis[$1];
}

# Skip duplicate lines
$2 == aPrevDurationMillis[$1] {
    next;
}

{
    # Fields $11 and $13 each contain two columns run together (the report is
    # missing two delimiters); keep the first character as its own column and
    # tab-separate the remainder.
    split($11, aOoo, /^[0-9]/);
    sOoo = sprintf("%s\t%s", substr($11, 1, 1), aOoo[2]);

    split($13, aGapMillis, /^[0-9]/);
    sGapMillis = sprintf("%s\t%s", substr($13, 1, 1), aGapMillis[2]);

    # Derive the per-interval metrics from the lifetime counters
    cIntervalDurationMillis = $2 - aPrevDurationMillis[$1];
    cIntervalSent = $6 - aPrevSent[$1];
    cIntervalReceived = $7 - aPrevReceived[$1];
    cIntervalMissing = $8 - aPrevMissing[$1];
    dflIntervalDropRate = cIntervalMissing / cIntervalSent;
    dflIntervalSuccessRate = 1 - dflIntervalDropRate;
    dflIntervalThroughput = (($3 * cIntervalReceived) / (cIntervalDurationMillis / 1000)) / (1024 * 1024);

    aPrevDurationMillis[$1] = $2;
    aPrevSent[$1] = $6;
    aPrevReceived[$1] = $7;
    aPrevMissing[$1] = $8;

    if (aOutfile[$1] ~ /^stdin/) {
        printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.3f\t%d\t%f\t%f\t%d\n",
                $1, $2 + aDurationOffset[$1], $3, $4, $5, $6, $7, $8, $9, $10, sOoo, $12, sGapMillis,
                cIntervalDurationMillis / 1000, cIntervalMissing, dflIntervalDropRate, dflIntervalSuccessRate, dflIntervalThroughput);
    }
    else {
        printf("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.3f\t%d\t%f\t%f\t%d\n",
                $1, $2 + aDurationOffset[$1], $3, $4, $5, $6, $7, $8, $9, $10, sOoo, $12, sGapMillis,
                cIntervalDurationMillis / 1000, cIntervalMissing, dflIntervalDropRate, dflIntervalSuccessRate, dflIntervalThroughput) > aOutfile[$1];
    }
}

This script will take the output of the -log option and produce a new file. Assuming you save the contents of the above script to augment-datagram-test.awk and set the execute bit, you can use the script as follows:

server1$ ./augment-datagram-test.awk 192.168.1.101.log

The above command will generate a new file called 192.168.1.101.log.192.168.1.100:10000.csv, which contains the additional columns “interval duration secs”, “interval missing packets”, “interval drop rate”, “interval success rate” and “interval throughput mb/sec”. The script produces one CSV file for each publisher present in the tab-delimited report. It will also accept multiple tab-delimited files as input, processing each one independently, and can accept input piped through stdin (with output going to stdout).
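
For example, since the script can read from stdin (assuming your awk reports an empty FILENAME for standard input, which is what the stdin handling above expects), you can preview the derived columns without writing any files:

server1$ ./augment-datagram-test.awk < 192.168.1.101.log | tail -5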

To actually generate the graphs, I use R. I first encountered R earlier this year while working with a customer, but hadn’t had a chance to play around with it myself. Before I decided to use R, I was taking the output from my awk script, importing it into a spreadsheet application and then generating graphs by hand. This proved to be quite tedious and involved too many mouse clicks for my taste, so I turned to R to let me script the process and eliminate the need for a spreadsheet application altogether. R is also much more flexible when it comes to producing graphs, as you have complete control over the plot area. After a few days of playing around with R, I was able to come up with the following script to generate the graphs seen at the beginning of this post:

args <- commandArgs(TRUE)
for (file in args)
{
    outfile <- paste(file, ".png", sep = "")
    cat("Plotting ", file, " as ", outfile, "\n", sep = "")
    
    # Read and process input file
    dgt     <- read.table(file, header = TRUE, sep = "\t")
    x       <- dgt$duration.ms / 1000
    y       <- dgt$interval.drop.rate * 100
    x.range <- c(0, max(x))
    y.range <- c(0, max(y, 20))
    nonzero <- which(y > 0)
    loss.intervals   <- (length(nonzero) / length(y)) * 100
    throughput.range <- c(0, max(dgt$interval.throughput.mb.sec, 120))
    title <- sub("\\.log\\.", " <- ", file)
    title <- sub("\\.csv", "", title)

    # Create plot as PNG
    png(filename = outfile, height = 400, width = 600, bg = "white")

    # Set margins to make room for right-side axis labels
    par(mar = c(7,5,4,5) + 0.1)

    # Plot packet loss line
    plot(x, y, type = "l", main = title, xlab = "Time (secs)", ylab = "Loss (%)",
            col = "blue", xlim = x.range, ylim = y.range, lwd = 2)

    # Circle points where packet loss > 0
    points(x[nonzero], y[nonzero], cex=1.5)

    # Plot throughput line
    lines(x, dgt$interval.throughput.mb.sec * (y.range[2] / throughput.range[2]),
            col = "green", lwd = 2)

    # Create right-side axis labels and tick marks
    axis(4, at = y.range[2] * c(0:4) / 4,
            labels = (throughput.range[2] / 4) * c(0:4))
    mtext("Throughput (MB/s)", side = 4, line = 3)

    # Draw the background grid lines
    grid()

    # Report the number of intervals that experienced loss (as a %)
    mtext(sprintf("Intervals w/ Loss: %.2f%%", loss.intervals), side = 1,
            line = 3, adj = 1)

    # Create the legend at the bottom
    legend("bottom", inset = -0.4, c("loss", "throughput"),
            col = c("blue", "green"), lty = 1, lwd = 2, bty = "n", horiz = TRUE,
            xpd = TRUE)

    # Close the PNG
    dev.off()
}

Assuming you save the contents of the above script as plot-datagram.r, you can invoke the script as follows:

server1$ R -q --slave -f plot-datagram.r --args 192.168.1.101.log.192.168.1.100:10000.csv

The output from the above command will be a new file called 192.168.1.101.log.192.168.1.100:10000.csv.png which represents a graph of both packet loss and throughput over the duration of the test. The circles indicate intervals where packet loss occurred. This script can also accept multiple files as input, generating a graph for each in a separate file.

With both scripts in hand, generating graphs to visualize packet loss from the output of the Datagram Test can be done in a few seconds:

server1$ ./augment-datagram-test.awk *.log
server1$ R -q --slave -f plot-datagram.r --args *.csv

December 13, 2010 | Posted in General

Bulk loading a Coherence cache from an Oracle Database

Recently, I’ve worked on a couple of projects that required pre-loading data from an Oracle Database table into an Oracle Coherence cache. There are many ways to accomplish this task, but what I’ve found to work well is to distribute the load processing to the Coherence grid itself. To do this, I use the Coherence Invocation Service.

To get started, let’s look at the SQL query that will be used to retrieve all rows from the underlying table (all columns have type VARCHAR2):

SELECT id, customer_id, zip5, zip4 FROM customer_zip

The class definition for the objects to be cached looks like:

public class CustomerZip
        implements ExternalizableLite, PortableObject
    {
    // ----- constructors ----------------------------------------------------

    public CustomerZip()
        {
        }

    public CustomerZip(String sId, String sCustomerId, String sZip5, String sZip4)
        {
        m_sId = sId;
        m_sCustomerId = sCustomerId;
        m_sZip5 = sZip5;
        m_sZip4 = sZip4;
        }

    // ----- accessors/mutators ----------------------------------------------

    /* removed for brevity */

    // ----- ExternalizableLite interface ------------------------------------

    /* removed for brevity */

    // ----- PortableObject interface ----------------------------------------

    /* removed for brevity */

    // ----- data members ----------------------------------------------------

    private String m_sZip5;
    private String m_sZip4;
    private String m_sCustomerId;
    private String m_sId;
    }

The first approach I tried involved pulling back all of the ids from the table, running them through PartitionedService#getKeyOwner, and then submitting a task to each member with the set of ids for that member to load. This method leverages Coherence’s data partitioning to distribute the rows among the loading members. It worked fine in my testing with a small number of rows, but when I applied it to the full data set of over 13 million rows, I quickly ran out of memory trying to query and process all of the ids. In addition, querying and processing that many ids takes time.
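
For illustration, here is a rough sketch of that first approach. PartitionedService#getKeyOwner is the real Coherence API, but queryAllIds() and the per-member task at the end are hypothetical stand-ins for code I’m not showing:

    // Hypothetical sketch of the abandoned first approach: group every cache key
    // by its owning member using the partitioned service, then submit one load
    // task per member containing only that member's keys. queryAllIds() stands
    // in for a query that pulls back every id from customer_zip.
    PartitionedService service = (PartitionedService) cache.getCacheService();
    Map<Member, Set<String>> mapKeysByOwner = new HashMap<Member, Set<String>>();
    for (String sId : queryAllIds())
        {
        Member memberOwner = service.getKeyOwner(sId);
        Set<String> setKeys = mapKeysByOwner.get(memberOwner);
        if (setKeys == null)
            {
            setKeys = new HashSet<String>();
            mapKeysByOwner.put(memberOwner, setKeys);
            }
        setKeys.add(sId);
        }
    // ... then, for each entry in mapKeysByOwner, send that member a task
    // holding its key set and have it load those rows from the database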

The second, and final, approach I tried involved pulling back only the row count. Dividing the rows up among the loading members then becomes a simple matter of establishing the first and last row each member should load. Each loading member can then use the Oracle pseudocolumn ROWNUM to execute the following query:

SELECT * FROM
  (SELECT a.*, ROWNUM r FROM
    (SELECT id, customer_id, zip5, zip4 FROM customer_zip ORDER BY id) a
  WHERE ROWNUM <= ?)
WHERE r >= ?

This query allows each loading member to specify the last and first rows to load (in that order, matching the two bind parameters) and lets the database filter out all of the rows outside its range. In my testing, I found that range sizes beyond a certain threshold started performing dramatically slower (perhaps a DB tuning issue, but IANADBA ;-)). You could easily run into this scenario with a large number of rows and a small number of loading members. To handle this situation, I further broke down each member’s range into smaller ranges and had each member execute multiple queries. Processing the results of these queries and performing bulk puts into the cache requires breaking up the results into batches as well. Here’s a look at the code that actually executes the queries and inserts the entries into Coherence (to be executed on each loading member). This code is actually part of a CacheLoader implementation that is used for read-through as well. Having the read-through and pre-load logic co-located allows me to share database properties (connection information, SQL statements, etc.).

    public void preload(NamedCache cache, int iFirstRow, int iLastRow,
            int cFetchSize, int cMaxQueryRange)
        {
        String sSqlQuery = ...; // see above
        String sCacheName = cache.getCacheName();
        Connection con = null;
        PreparedStatement stmtPrep = null;
        ResultSet rs = null;
        try
            {
            con = getConnection();
            stmtPrep = con.prepareStatement(sSqlQuery);
            stmtPrep.setFetchSize(cFetchSize);

            // break the query up into batches based on cMaxQueryRange
            int cRows = (iLastRow - iFirstRow) + 1;
            int cBatches = cRows / cMaxQueryRange;
            int cRemaining = cRows % cMaxQueryRange;
            // add additional batch to handle any remainder
            cBatches += cRemaining == 0 ? 0 : 1;

            Map mapBuffer = new HashMap(cFetchSize);
            int iBatchFirstRow;
            int iBatchLastRow = iFirstRow - 1;
            int cRowsLoadedTotal = 0;
            log("Executing preload query in " + cBatches + " batches");
            for (int i = 0; i < cBatches; ++i)
                {
                iBatchFirstRow = iBatchLastRow + 1;
                // last row for the batch or the entire range
                iBatchLastRow = Math.min(iLastRow, iBatchFirstRow + (cMaxQueryRange - 1));
                stmtPrep.setInt(1, iBatchLastRow);
                stmtPrep.setInt(2, iBatchFirstRow);
                rs = stmtPrep.executeQuery();

                // process cFetchSize rows at a time
                while (processResults(rs, mapBuffer, cFetchSize))
                    {
                    cache.putAll(mapBuffer);
                    mapBuffer.clear();
                    }
                rs.close();
                }
            }
        catch (SQLException e)
            {
            log(e);
            throw new RuntimeException(e);
            }
        finally
            {
            close(con, stmtPrep, rs);
            }
        }

    protected boolean processResults(ResultSet rs, Map mapResults, int cFetchSize)
            throws SQLException
        {
        for (int i = 0; i < cFetchSize && rs.next(); ++i)
            {
            // create domain object from single row
            CustomerZip customerZip = createCustomerZip(rs);
            mapResults.put(customerZip.getId(), customerZip);
            }
        return mapResults.size() > 0;
        }

The final piece of required code is the one that generates the ranges for each member and issues each member a task to execute. As I mentioned earlier, I use the Coherence Invocation Service to asynchronously execute a task on each loading member. For my use case, the set of loading members is simply every member running the Invocation Service, except for the member issuing the tasks:

    protected Set<Member> getLoadingMembers(InvocationService serviceInv)
        {
        Set setMembers = serviceInv.getInfo().getServiceMembers();
        setMembers.remove(serviceInv.getCluster().getLocalMember());
        int cMembers = setMembers.size();
        if (cMembers == 0)
            {
            throw new IllegalStateException("No other members are running InvocationService. Is the cluster up?");
            }
        return setMembers;
        }

With the set of members and the total number of rows in the database table, I can now create an Invocation Service task for each member, specifying the range of rows to be loaded by each member:

    protected Map<Member, PreloadTask> generateTasks(Set<Member> setMembers, int cRows)
        {
        Map<Member, PreloadTask> mapTasks =
                new HashMap<Member, PreloadTask>(setMembers.size());

        if (cRows <= m_cFetchSize)
            {
            // for small number of rows, just send the load to one member
            Member member = setMembers.iterator().next();
            PreloadTask task = new PreloadTask(m_sCacheName, 1, cRows,
                    m_cFetchSize, m_cMaxQueryRange);
            mapTasks.put(member, task);
            }
        else
            {
            int cMembers = setMembers.size();
            int cMinRowsPerMember = cRows / cMembers;
            int cRemainingRows = cRows % cMembers;

            int iFirstRow;
            int iLastRow = 0;
            for (Member member : setMembers)
                {
                iFirstRow = iLastRow + 1;
                iLastRow = iFirstRow + cMinRowsPerMember +
                        (cRemainingRows-- > 0 ? 1 : 0) - 1;
                PreloadTask task = new PreloadTask(m_sCacheName, iFirstRow,
                        iLastRow, m_cFetchSize, m_cMaxQueryRange);
                mapTasks.put(member, task);
                }
            }

        return mapTasks;
        }

The final step is to asynchronously invoke each member’s task, and then wait for all of them to finish. I use a CountDownLatch and an InvocationObserver to track the completion of all tasks:

    public void preloadCache()
        {
        final String sCacheName = "CustomerZipCache";
        int cRows = getRowCount();
        InvocationService serviceInv = (InvocationService)
                CacheFactory.getService("InvocationService");
        long ldtStart = System.currentTimeMillis();
        Set<Member> setLoadingMembers = getLoadingMembers(serviceInv);
        Map<Member, PreloadTask> mapMemberTasks = generateTasks(setLoadingMembers, cRows);

        // prepare the invocation observer
        int cTasks = mapMemberTasks.size();
        final CountDownLatch latch = new CountDownLatch(cTasks);
        InvocationObserver observer = new InvocationObserver()
            {
            public void memberCompleted(Member member, Object oResult)
                {
                latch.countDown();
                log(String.format("%s: load finished on %s", sCacheName, member.toString()));
                }

            public void memberFailed(Member member, Throwable eFailure)
                {
                // TODO: resubmit tasks due to transient failures
                latch.countDown();
                log(String.format("%s: load failed on %s", sCacheName, member.toString()));
                CacheFactory.log(eFailure);
                }

            public void memberLeft(Member member)
                {
                // TODO: resubmit to a member that is up
                latch.countDown();
                log(String.format("%s: member left before load finished (%s)", sCacheName, member.toString()));
                }

            public void invocationCompleted()
                {
                log(String.format("%s: invocation has completed", sCacheName));
                }
            };

        // asynchronously execute each member's task
        for (Map.Entry<Member, PreloadTask> entry : mapMemberTasks.entrySet())
            {
            Member member = entry.getKey();
            Set setTaskMembers = Collections.singleton(member);
            PreloadTask task = entry.getValue();
            serviceInv.execute(task, setTaskMembers, observer);
            log(String.format("%s: rows %d-%d sent to %s",
                    sCacheName, task.getFirstKey(), task.getLastKey(), member.toString()));
            }

        // wait for all tasks to finish
        try
            {
            latch.await();
            }
        catch (InterruptedException e)
            {
            // restore the interrupt status rather than swallow it silently
            Thread.currentThread().interrupt();
            }
        long lDurationMillis = System.currentTimeMillis() - ldtStart;
        log(String.format("%s: pre-loaded %d rows in %.3f secs (%.3f rows/sec)",
                sCacheName, cRows, lDurationMillis / 1000.0,
                cRows / (lDurationMillis / 1000.0)));
        NamedCache cache = CacheFactory.getCache(sCacheName);
        log(String.format("%s: final size is %d", sCacheName, cache.size()));
        }

If you’re reading carefully, you’ll see that I am actually issuing database queries from two logical places: the grid client that generates the tasks and the grid members executing the load. I mentioned earlier that I’m sharing database parameters between read-through and pre-load by using a CacheLoader. I will have to save the details of how I achieve that sharing for another post.
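
Although I’m saving those details for that post, a minimal sketch of what the read-through side might look like is below, assuming it lives in the same CacheLoader and reuses the getConnection(), createCustomerZip() and close() helpers referenced above (the single-row query text is my assumption, not the actual implementation):

    public Object load(Object oKey)
        {
        // hypothetical read-through path sharing the pre-load's DB helpers
        String sSql = "SELECT id, customer_id, zip5, zip4 FROM customer_zip WHERE id = ?";
        Connection con = null;
        PreparedStatement stmtPrep = null;
        ResultSet rs = null;
        try
            {
            con = getConnection();
            stmtPrep = con.prepareStatement(sSql);
            stmtPrep.setString(1, (String) oKey);
            rs = stmtPrep.executeQuery();
            // return null if the key is not in the database (CacheLoader contract)
            return rs.next() ? createCustomerZip(rs) : null;
            }
        catch (SQLException e)
            {
            throw new RuntimeException(e);
            }
        finally
            {
            close(con, stmtPrep, rs);
            }
        }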

January 6, 2010 | Posted in Loading

Hello world!

I’ve been meaning to start this blog for a while now, and the stars have finally aligned to let that happen.  My intentions for this blog are to document findings, observations and generally interesting things about Oracle Coherence (and related technologies) discovered through practice.

December 15, 2009 | Posted in General

   
