使用Spring JDBC进行多线程处理

3
我在使用Spring JDBC和SQL数据库时遇到了连接问题。问题是,第一次尝试时,我的方法创建了n个线程,它们查询数据库,没有出现任何问题。如果我立即再次运行该方法,同样的情况-没有问题。请注意,在尝试之间我没有重新启动应用程序。
当我等待几分钟再次运行应用程序时,问题就出现了-因此我认为某个地方存在超时问题,或者线程被丢弃了。
而问题在于,当我使用单线程版本运行此方法时,它完全正常。因此,我认为实际的URL / user / pass / driver设置是正确的。我对多线程编程还很陌生,所以我认为我的实现中存在缺陷。
我正在使用Spring JDBC和Apache Tomcat JNDI连接池:
Java多线程:
public List<Item> getSetPoints(List<Item> items) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for(Item item: items) {
        executorService.submit(new ProcessItem(item));
    }

    executorService.shutdown();
} 

class ProcessItem implements Runnable {
    private Item item;

    public ProcessItem(Item item) {
        this.item = item;
    }

    public void run() {
        Item newItem = piDAO.retrieveSetPoint(item);
    }
}

DAO:

@Component("PIDAO")
public class PIDAO {

    private NamedParameterJdbcTemplate jdbc;

    @Resource(name="pijdbc")
    public void setPiDataSource(DataSource jdbc) {
        this.jdbc = new NamedParameterJdbcTemplate(jdbc);
    }

    public Item retrieveSetPoint(Item item) {
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("tag", item.getTagName());

        String sql = "SELECT TOP 1  time, value, status FROM piarchive.picomp2 WHERE tag = :tag AND status=0 AND questionable = false ORDER BY time DESC";
        try {
            return jdbc.queryForObject(sql, params, (rs, rowNum) -> {
                item.setPiDate(rs.getString("time"));
                item.setPiValue(rs.getString("value"));
                return item;
            });
        } catch (Exception e) {
           System.out.println(e);
        }
    }
}

Spring DAO容器:

<jee:jndi-lookup jndi-name="jdbc/PI" id="pijdbc"
                 expected-type="javax.sql.DataSource">
</jee:jndi-lookup>

JNDI配置:

 <Resource
    name="jdbc/PI"
    auth="Container"
    type="javax.sql.DataSource"
    maxTotal ="25"
    maxIdle="30"
    maxWaitMillis ="10000"
    driverClassName="com.osisoft.jdbc.Driver"
    url="**Valid URL**"
    username="**Valid Username**"
    password="**Valid Password**"
/>

当出现错误时的堆栈跟踪:
org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT TOP 1  time, value, status FROM piarchive.picomp2 WHERE tag = ? AND status=0 AND questionable = false ORDER BY time DESC]; SQL state [null]; error code [0]; [Orb.Channel] The channel is not registered on server.; nested exception is java.sql.SQLException: [Orb.Channel] The channel is not registered on server.
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:84)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:645)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:680)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:707)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:757)
at org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate.queryForObject(NamedParameterJdbcTemplate.java:211)
at btv.app.dao.PIDAO.retrieveSetPoint(PIDAO.java:36)
at btv.app.service.PiService$ProcessItem.run(PiService.java:91)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: [Orb.Channel] The channel is not registered on server.
at com.osisoft.jdbc.PreparedStatementImpl.executeQuery(PreparedStatementImpl.java:167)
at org.apache.tomcat.dbcp.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:82)
at org.apache.tomcat.dbcp.dbcp2.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:82)
at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:688)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:629)
... 11 more

错误: java.sql.SQLException: [Orb.Channel] 通道未在服务器上注册 是特定驱动程序的独有问题。

经过一些研究,它基本上意味着与服务器的连接已断开 - 然而,查看 SQL 服务器日志表明它并没有在服务器端断开连接。

1个回答

1
尝试在retrieveSetPoint方法内创建一个新的NamedParameterJdbcTemplate,以查看是否可以消除任何超时问题。
@Component("PIDAO")
public class PIDAO {

    private DataSource jdbc;

    @Resource(name="pijdbc")
    public void setPiDataSource(DataSource jdbc) {
        this.jdbc = jdbc;
    }

    public Item retrieveSetPoint(Item item) {
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("tag", item.getTagName());

        String sql = "SELECT TOP 1  time, value, status FROM piarchive.picomp2 WHERE tag = :tag AND status=0 AND questionable = false ORDER BY time DESC";
        try {
            return (new NamedParameterJdbcTemplate(jdbc)).queryForObject(sql, params, (rs, rowNum) -> {
                item.setPiDate(rs.getString("time"));
                item.setPiValue(rs.getString("value"));
                return item;
            });
        } catch (Exception e) {
           System.out.println(e);
        }
    }
}

或者,您可以重用NamedParameterJdbcTemplates并在它们超时时刷新它们;这可以从显式池化中受益,例如:

private final int poolSize = 10;

public Collection<Item> getSetPoints(List<Item> items) {
    ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
    Queue<Item> queue = new ConcurrentLinkedQueue<>();
    queue.addAll(items);
    Collection<Item> output = new ConcurrentLinkedQueue<>();
    for(int i = 0; i < poolSize; i++) {
        executorService.submit(new ProcessItem(queue, output);
    }
    return output;
} 

class ProcessItem implements Runnable {
    private final Queue<Item> queue;
    private final Collection<Item> output;
    private NamedParameterJdbcTemplate jdbc;

    public ProcessItem(Queue<Item> queue, Collection<Item> output) {
        this.queue = queue;
        this.output = output;
        this.jdbc = piDAO.getNamedJdbcTemplate();
    }

    public void run() {
        Item item = null;
        while((item = queue.poll()) != null) {
            try {
                output.add(piDAO.retrieveSetPoint(item, jdbc));
            } catch(SQLException e) {
                this.jdbc = piDAO.getNamedJdbcTemplate();
                output.add(piDAO.retrieveSetPoint(item, jdbc));
            }
        }
    }
}

@Component("PIDAO")
public class PIDAO {

    private DataSource jdbc;

    @Resource(name="pijdbc")
    public void setPiDataSource(DataSource jdbc) {
        this.jdbc = jdbc;
    }

    public NamedParameterJdbcTemplate getNamedJdbcTemplate() {
        return new NamedParameterJdbcTemplate(jdbc);
    }

    public Item retrieveSetPoint(Item item, NamedParameterJdbcTemplate template) throws SQLException {
        MapSqlParameterSource params = new MapSqlParameterSource();
        params.addValue("tag", item.getTagName());

        String sql = "SELECT TOP 1  time, value, status FROM piarchive.picomp2 WHERE tag = :tag AND status=0 AND questionable = false ORDER BY time DESC";
        return template.queryForObject(sql, params, (rs, rowNum) -> {
            item.setPiDate(rs.getString("time"));
            item.setPiValue(rs.getString("value"));
            return item;
        });
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接