Single transaction using multiple connections (MySQL/JDBC)

The application I'm working on is a Java-based ETL process that loads data into multiple tables. The DBMS is Infobright (a MySQL-based DBMS geared toward data warehousing).

The data loading should be done atomically; however, for performance reasons, I want to load data into multiple tables at the same time (using a LOAD DATA INFILE command). This means I need to open multiple connections.

Is there any solution that lets me do the loads both atomically and in parallel? (I'm guessing the answer might depend on the engine of the tables I load into; most of them use Brighthouse, which supports transactions but not XA or savepoints.)

To clarify further, I want to avoid a situation like this:

  • I load data into 5 tables
  • I commit the loads for the first 4 tables
  • The commit for the 5th table fails

In this situation, I can't roll back the first 4 loads, because they are already committed.
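The failure mode described above can be modeled without a database. In this toy sketch, each hypothetical `Participant` stands in for one connection's transaction (no JDBC is involved), and the commits run one after another, which is what committing several independent connections amounts to.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of committing several independent transactions in sequence.
// All names here are hypothetical; no database is involved.
final class SequentialCommitDemo {

  interface Participant {
    String name();
    void commit() throws Exception;
  }

  // Commits participants one by one and returns the names committed before
  // the first failure. Nothing undoes those earlier commits.
  static List<String> commitAll(List<Participant> participants) {
    List<String> committed = new ArrayList<>();
    for (Participant participant : participants) {
      try {
        participant.commit();
        committed.add(participant.name());
      } catch (Exception ex) {
        break;  // the remaining participants are never committed
      }
    }
    return committed;
  }

  // Builds a participant whose commit either succeeds or throws.
  static Participant participant(String tableName, boolean failsOnCommit) {
    return new Participant() {
      public String name() { return tableName; }
      public void commit() throws Exception {
        if (failsOnCommit) throw new Exception("commit failed for " + tableName);
      }
    };
  }
}
```

With five participants where only the fifth fails, `commitAll` returns the first four names: exactly the half-committed state the question wants to avoid.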

Botha answered 29/11, 2011 at 8:53 Comment(6)
I don't know Infobright, but you lock tables, not databases, so as long as they're separate tables, you ought to be able to spawn some threads and have each one lock and fill a different table. – Horst
What do you mean by "single transaction using multiple connections"? If by chance you want to use multiple Connection objects to issue LOAD DATA commands that load data from a file into a single table, that isn't possible. The documentation for LOAD DATA says that "If you specify CONCURRENT [...] other threads can retrieve data from the table while LOAD DATA is executing." If reading from the table could be an issue while LOAD DATA executes, then writing to it at the same time is a big no in my book. – Scottscotti
I think my question was misunderstood. I want each connection to load into a separate table, but I want all the loads to be part of a single transaction. That means I either commit the data for all tables or roll back the data for all tables. – Botha
Very interesting. If I have time, I'll hack up an answer, but basically you have to disable auto-commit on each of your connections, create savepoints, load stuff into your tables, and commit. If one of your transactions fails, you just roll back everything. – Scottscotti
@MasterF I lied, because I didn't use savepoints ... oh well. – Scottscotti
@MasterF It seems you have a peculiar use case here, but I think your approach is too complicated. If you can give us info on the kind of data you use and the tables you want to load it into, maybe we can give you some insight, better alternatives, anything. – Scottscotti

Intro

As promised, I've hacked up a complete example. I used MySQL and created three tables like the following:

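CREATE TABLE `test1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;  -- a transactional engine is needed for rollback to work
CREATE TABLE `test2` LIKE `test1`;
CREATE TABLE `test3` LIKE `test1`;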

test2 contains a single row initially.

INSERT INTO `test2` (`data`) VALUES ('a');

(I've posted the full code to http://pastebin.com.)

The following example does several things.

  1. Sets `threads` to 3, which determines how many jobs run in parallel.
  2. Creates `threads` connections.
  3. Generates sample data for every table (by default, the value `a` for every table).
  4. Creates `threads` jobs and loads them with the data.
  5. Runs the jobs on `threads` threads and waits for them to complete (successfully or not).
  6. If no exception occurred, commits every connection; otherwise, rolls back each of them.
  7. Closes the connections (they could be reused instead).

(Note that SQLTask.call() uses Java 7's try-with-resources statement, originally called automatic resource management.)

Logic

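public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  // One connection, and therefore one transaction, per table.
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    // Reached only if every task succeeded. The commits still happen one
    // connection at a time, so as a group they are not atomic.
    commitConnections(connections);
  } catch (ExecutionException ex) {
    // At least one task threw, so undo the work on every connection.
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}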
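main calls getConnections, commitConnections, rollbackConnections and closeConnections, whose bodies aren't shown here (the full code was on pastebin). The following is a hypothetical reconstruction, assuming MySQL Connector/J is on the classpath and using placeholder URL and credentials. The essential step is disabling auto-commit, so that each connection keeps an open transaction until it is explicitly committed or rolled back.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the helpers called from main().
final class ConnectionHelpers {

  // Assumed connection settings: replace with your own server and credentials.
  static final String URL = "jdbc:mysql://localhost:3306/test";
  static final String USER = "user";
  static final String PASSWORD = "secret";

  private ConnectionHelpers() {}

  static List<Connection> getConnections(int count) throws SQLException {
    List<Connection> connections = new ArrayList<>();
    try {
      for (int i = 0; i < count; i++) {
        Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
        connections.add(connection);
        // Without this, every statement is committed immediately and
        // rollbackConnections() would have nothing to undo.
        connection.setAutoCommit(false);
      }
    } catch (SQLException ex) {
      closeConnections(connections);  // don't leak the connections already opened
      throw ex;
    }
    return connections;
  }

  // Commits one connection after another. NOT atomic: if the k-th commit
  // throws, the connections before it are already committed.
  static void commitConnections(List<Connection> connections) throws SQLException {
    for (Connection connection : connections) {
      connection.commit();
    }
  }

  // Rolls back every connection, continuing past individual failures.
  static void rollbackConnections(List<Connection> connections) {
    for (Connection connection : connections) {
      try {
        connection.rollback();
      } catch (SQLException ex) {
        // Ignored in this sketch; log it in real code.
      }
    }
  }

  static void closeConnections(List<Connection> connections) {
    for (Connection connection : connections) {
      try {
        connection.close();
      } catch (SQLException ex) {
        // Ignored in this sketch; log it in real code.
      }
    }
  }
}
```

commitConnections is where the problem raised in the comments lives: it is a plain loop over independent commits, so a failure partway through leaves the earlier tables committed.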

Data

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

Tasks

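private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

  @Override
  public Void call() throws SQLException {
    // A table name can't be a bind parameter, so it is quoted with backticks;
    // the value is bound instead of being formatted into the SQL string.
    String sql = String.format("INSERT INTO `%s` (`data`) VALUES (?)", table);
    // try-with-resources closes the statement; assuming auto-commit is off,
    // the insert stays in the connection's open transaction.
    try (PreparedStatement statement = connection.prepareStatement(sql)) {
      statement.setString(1, data);
      statement.executeUpdate();
    }
    return null;
  }
}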

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

Run

private static void runTasks(List<SQLTask> tasks) 
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  for (Future<Void> future : futures)
    future.get();
}

Result

Given the default data returned by getTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

and the fact that test2 already contains `a` (and the data column is UNIQUE), the task that inserts into test2 will fail with a duplicate-key error and throw an exception, so every connection will be rolled back.

If getTableData(...) returns `b` instead of `a` for every table, all the connections will be committed.

This can be done similarly with LOAD DATA.
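A LOAD DATA counterpart of SQLTask might look like the sketch below. LoadDataTask, buildSql and the single `data` column are hypothetical and taken from the example tables, not from the original code. It assumes LOAD DATA LOCAL, which with Connector/J typically requires `allowLoadLocalInfile=true` in the connection URL and `local_infile` enabled on the server, and it assumes the target engine keeps the load inside the connection's open transaction (true for InnoDB; check for Brighthouse).

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

// Hypothetical task: loads one file into one table on its own connection.
final class LoadDataTask implements Callable<Void> {

  private final Connection connection;
  private final String table;
  private final String path;

  LoadDataTask(Connection connection, String table, String path) {
    this.connection = connection;
    this.table = table;
    this.path = path;
  }

  // The table name and file name are escaped by hand rather than bound as
  // parameters. Assumes the default sql_mode, where backslash is an escape
  // character inside string literals.
  static String buildSql(String table, String path) {
    String quotedTable = "`" + table.replace("`", "``") + "`";
    String quotedPath = "'" + path.replace("\\", "\\\\").replace("'", "''") + "'";
    return "LOAD DATA LOCAL INFILE " + quotedPath
        + " INTO TABLE " + quotedTable + " (`data`)";
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.execute(buildSql(table, path));
    }
    return null;
  }
}
```

Each task would be submitted through runTasks exactly like SQLTask, so the commit-or-roll-back logic in main stays the same.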


After the OP's response to my answer, I realized that what they want to do isn't possible in a simple, clean way.

Basically, the problem is that once a commit succeeds, the committed data can't be rolled back, because a committed transaction is final. Since the case above needs several independent commits, rolling back everything isn't possible unless you track all the data written (in all of the transactions) and, if something goes wrong, delete everything that was already committed.
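The "track it and delete it" idea can be sketched as a compensation step. This is a best-effort technique, not a real atomic rollback: the compensating action (for example a DELETE of the rows a load added, assuming the rows can be identified and the engine allows deleting them) can itself fail. The Participant interface and its methods are hypothetical; in practice each one would wrap a connection.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Best-effort "commit all or compensate": hypothetical names throughout.
final class CompensatingCommit {

  interface Participant {
    void commit() throws Exception;
    void rollback() throws Exception;    // undo a transaction that was not committed
    void compensate() throws Exception;  // undo a load that was already committed
  }

  interface Action {
    void run() throws Exception;
  }

  // Returns true if every commit succeeded, false if cleanup was attempted.
  static boolean commitOrCompensate(List<Participant> participants) {
    Deque<Participant> committed = new ArrayDeque<>();
    for (int i = 0; i < participants.size(); i++) {
      try {
        participants.get(i).commit();
        committed.push(participants.get(i));
      } catch (Exception commitFailure) {
        // Roll back the failed participant and every one not yet committed.
        for (Participant pending : participants.subList(i, participants.size())) {
          quietly(pending::rollback);
        }
        // Compensate the committed ones, newest first.
        while (!committed.isEmpty()) {
          quietly(committed.pop()::compensate);
        }
        return false;
      }
    }
    return true;
  }

  private static void quietly(Action action) {
    try {
      action.run();
    } catch (Exception ignored) {
      // A real implementation must record this for retry or manual cleanup.
    }
  }
}
```

If the third of three commits fails, the first two are compensated in reverse order and the third is rolled back, which is as close to "all or nothing" as independent commits allow.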

There is a nice answer relating to the issue of commits and rollbacks.

Cowgill answered 7/12, 2011 at 17:8 Comment(3)
Sorry, I didn't get a chance to read this until now. This is very similar to what I am currently doing, and it does cover situations like SQL errors nicely. – Botha
But it doesn't cover the situation where the ACTUAL COMMIT FAILS (unlikely, but possible). Try this: add a System.in.read() inside the for loop in the commitConnections method (before the actual commit). Press Enter for the first 2 commits, then stop the MySQL server and press Enter for the 3rd. The 3rd commit will fail, but the first 2 will already have been executed. This is what I'm trying to avoid. – Botha
@MasterF After a commit, what was committed can't be rolled back, unless you track what was committed and delete those rows afterwards. If you commit to totally separate tables, you will never get errors like duplicate IDs, so that isn't an issue. If you're connecting to a remote database, the link between you and the database could break while the third commit() executes, which would prevent the data from that batch from reaching the database. Basically, you want to break the ACID rules in your transaction. That's not going to go smoothly. – Scottscotti

Actually, the newer version of IEE (Infobright Enterprise Edition, as opposed to ICE, the Community Edition) has an additional feature called DLP (Distributed Load Processing). There is a PDF about it on the site, linked from here:

http://www.infobright.com/Products/Features/

Faustena answered 16/2, 2012 at 20:39 Comment(0)
