需要在Java中编写代码创建连接池。

33
需要Java代码来创建连接池?如何确保连接池不会返回已经在使用中的对象?如果客户端在从连接池中取出连接后关闭了连接,会发生什么?
更新1:我想用简单的Java术语创建它,并想看看在多线程环境下它是如何工作的。我的意思是哪些方法将被同步,哪些不会。此类将是公共类吗?如果是,那么任何人都可以访问此类并重新初始化连接池?
更新2:我有一些代码如下。但我不知道“关闭从池中获取的连接会将其返回到池中,而不会物理关闭连接。”也不理解“因为如果一个连接已经从池中借出并且尚未归还,则它不可用,无法重新分配给池的另一个客户端。”
import java.util.*;
import java.sql.*;

class ConnectionPoolManager
{

 String databaseUrl = "jdbc:mysql://localhost:3306/myDatabase";
 String userName = "userName";
 String password = "userPass";

 Vector connectionPool = new Vector();

 public ConnectionPoolManager()
 {
  initialize();
 }

 public ConnectionPoolManager(
  //String databaseName,
  String databaseUrl,
  String userName,
  String password
  )
 {
  this.databaseUrl = databaseUrl;
  this.userName = userName;
  this.password = password;
  initialize();
 }

 private void initialize()
 {
  //Here we can initialize all the information that we need
  initializeConnectionPool();
 }

 private void initializeConnectionPool()
 {
  while(!checkIfConnectionPoolIsFull())
  {
   System.out.println("Connection Pool is NOT full. Proceeding with adding new connections");
   //Adding new connection instance until the pool is full
   connectionPool.addElement(createNewConnectionForPool());
  }
  System.out.println("Connection Pool is full.");
 }

 private synchronized boolean checkIfConnectionPoolIsFull()
 {
  final int MAX_POOL_SIZE = 5;

  //Check if the pool size
  if(connectionPool.size() < 5)
  {
   return false;
  }

  return true;
 }

 //Creating a connection
 private Connection createNewConnectionForPool()
 {
  Connection connection = null;

  try
  {
   Class.forName("com.mysql.jdbc.Driver");
   connection = DriverManager.getConnection(databaseUrl, userName, password);
   System.out.println("Connection: "+connection);
  }
  catch(SQLException sqle)
  {
   System.err.println("SQLException: "+sqle);
   return null;
  }
  catch(ClassNotFoundException cnfe)
  {
   System.err.println("ClassNotFoundException: "+cnfe);
   return null;
  }

  return connection;
 }

 public synchronized Connection getConnectionFromPool()
 {
  Connection connection = null;

  //Check if there is a connection available. There are times when all the connections in the pool may be used up
  if(connectionPool.size() > 0)
  {
   connection = (Connection) connectionPool.firstElement();
   connectionPool.removeElementAt(0);
  }
  //Giving away the connection from the connection pool
  return connection;
 }

 public synchronized void returnConnectionToPool(Connection connection)
 {
  //Adding the connection from the client back to the connection pool
  connectionPool.addElement(connection);
 }

 public static void main(String args[])
 {
  ConnectionPoolManager ConnectionPoolManager = new ConnectionPoolManager();
 }

}

10
不管下面有些回答说什么,都要编写自己的连接池。将其与其他连接池进行比较,并在此过程中学习更多关于JDBC和其他相关知识。仅仅拥有一堆已经存在的产品不应该阻止你自己去开发。把它们作为打败的标准即可。勇往直前。 - Mindwin Remember Monica
10个回答

49
需要在Java中创建连接池的代码吗?不确定问题是什么,但不要再创建另一个连接池,使用现有的解决方案,如C3P0Apache DBCPProxoolBoneCP(该领域的新玩家)。我会使用C3P0。
我们如何确保连接池不返回已经在使用中的相同对象?
因为如果一个连接从池中借出并且还没有归还,它就不在池中,不能分配给池的另一个客户端(资源从池中删除,直到它们被归还)。
如果客户端在从连接池中取出连接后关闭了连接会发生什么?
客户端从连接池中获取的连接实际上不是 java.sql.Connection,而是对一些方法行为进行了定制的 java.sql.Connection 的包装器(代理)。其中之一就是 close() 方法,并不会关闭 Connection 实例,而是将其返回到连接池中。

2
+1 for C3PO... 我一直在使用它,效果非常好。它非常轻便且极易使用。 - Polaris878
3
“不会关闭连接实例,而是将其返回到池中”- 连接类如何知道池的存在并在调用关闭方法时将自己返回给池呢? - Greg
3
请务必查看Hikari CP:https://github.com/brettwooldridge/HikariCP。从我所读的内容来看,它似乎比列出的其他选项更快。 - Joshua Kissoon

14

不要自己编写。有很多开源的库可以为你完成这个任务,而且易于使用,已经解决了你在尝试自己编写时会遇到的所有问题。

这里是一个使用Apache的Commons DBCP和Commons Pool的简单示例:

首先设置一个数据源(DataSource)。

javax.sql.DataSource source = new org.apache.commons.dbcp.BasicDataSource();
source.setDriverClassName("com.mysql.jdbc.Driver");
source.setUsername("username");
source.setPassword("password");
source.setUrl("jdbc:mysql://localhost:3306/myDatabase");

一旦您有了数据源(DataSource),从连接池获取连接就非常容易。

java.sql.Connection connection = source.getConnection();
关闭连接后,它会被返回到池中。
connection.close();

2
+1。但是,需要更正的是,方法setDriverClassName()和其他方法在javax.sql.DataSource[java 1.6]中不可用。正确的类型应该是BasicDataSource。即:BasicDataSource source = new BasicDataSource(); 更多详情请参考:http://svn.apache.org/viewvc/commons/proper/dbcp/trunk/doc/BasicDataSourceExample.java?revision=1100136&view=markup - spiderman
1
不要自己编写代码,而是使用现有的“不受支持、取消支持、历史”库,并且什么也不学习。我不同意这种观点。为什么新开发人员不应该自己编写代码呢?至少他们应该得到支持,以便能够自己实现并学习。为什么不呢?“自己编写代码,并编写良好的单元测试和文档。” - Levent Divilioglu

14

我希望这段源代码能有所帮助。 http://jagadeeshmanne.blogspot.com/2014/03/connection-pool-in-java-jdbc.html

Configuration.java

package com.jmanne.utils;
 
public class Configuration {
  
 public String DB_USER_NAME ;
  
 public String DB_PASSWORD ;
  
 public String DB_URL;
  
 public String DB_DRIVER;
  
 public Integer DB_MAX_CONNECTIONS;
  
 public Configuration(){
  init();
 }
  
 private static Configuration configuration = new Configuration();
  
 public static Configuration getInstance(){ 
  return configuration;
 }
  
 private void init(){
  DB_USER_NAME = "root"
  DB_PASSWORD = "root"
  DB_URL = "jdbc:mysql://localhost:3306/jmanne"
  DB_DRIVER = "com.mysql.jdbc.Driver"
  DB_MAX_CONNECTIONS = 5
 }     
}

JdbcConnectionPool.java

package com.jmanne.db;
 
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
 
import com.jmanne.utils.Configuration;
import com.mysql.jdbc.Connection;
 
public class JdbcConnectionPool {
  
 List<connection> availableConnections = new ArrayList<connection>();
 
 public JdbcConnectionPool()
 {
  initializeConnectionPool();
 }
 
 private void initializeConnectionPool()
 {
  while(!checkIfConnectionPoolIsFull())
  {
   availableConnections.add(createNewConnectionForPool());
  }
 }
 
 private synchronized boolean checkIfConnectionPoolIsFull()
 {
  final int MAX_POOL_SIZE = Configuration.getInstance().DB_MAX_CONNECTIONS;
 
  if(availableConnections.size() < MAX_POOL_SIZE)
  {
   return false;
  }
 
  return true;
 }
 
 //Creating a connection
 private Connection createNewConnectionForPool()
 {
  Configuration config = Configuration.getInstance();
  try {
   Class.forName(config.DB_DRIVER);
   Connection connection = (Connection) DriverManager.getConnection(
     config.DB_URL, config.DB_USER_NAME, config.DB_PASSWORD);
   return connection;
  } catch (ClassNotFoundException e) {
   e.printStackTrace();
  } catch (SQLException e) {
   e.printStackTrace();
  }
  return null;
   
 }
 
 public synchronized Connection getConnectionFromPool()
 {
  Connection connection = null;
  if(availableConnections.size() > 0)
  {
   connection = (Connection) availableConnections.get(0);
   availableConnections.remove(0);
  }
  return connection;
 }
 
 public synchronized void returnConnectionToPool(Connection connection)
 {
  availableConnections.add(connection);
 }
}

DataSource.java

package com.jmanne.db;
 
import java.sql.SQLException;
 
import com.mysql.jdbc.Connection;
 
public class DataSource {
  
 static JdbcConnectionPool pool = new JdbcConnectionPool();
  
 public static Connection getConnection() throws ClassNotFoundException, SQLException{
  Connection connection = pool.getConnectionFromPool();
  return connection;
 }
  
 public static void returnConnection(Connection connection) {
  pool.returnConnectionToPool(connection);
 }
}

什么是数据库驱动程序? - LoveMeow
您能否举个例子,说明如何使用这段代码? - LoveMeow
2
抱歉耽搁了。db driver 意味着驱动程序类名。需要将该变量名称更改为 driverClass。您可以在服务类中使用 DataSource.getConnection()。它将从池中返回连接对象。 - Jagadeesh

6

只需使用信号量。理想情况下,您应该使用 CP3ODBCP 作为您的连接池。现在,您可以根据信号量来限制您的连接。

每次执行 Get 操作时,您需要从 Semaphore 中获取,并在每个 Release 操作上释放。此外,信号量是线程安全的。


4

可以使用现有的连接池,例如Apache DBCP

连接池返回的连接通常是代理,该代理会“忽略”应用程序中对close()方法的调用。当这些连接返回到池中时,它们可以被重复利用。如果需要,池还可以自动关闭和重新打开。


2
如果您的应用程序运行在服务器上,则配置为数据源,其中服务器将负责连接池。如果是一个简单的Java客户端,则使用Apache DBCP(如果连接数据库),否则使用Apache Commons Pooling API。 请参见此处:Apache Commons

2

自己编写连接池的一个理由是可以避免配置和额外的jar包。我同意需要启用第三方接口,以便可以切换成成熟的连接池,但拥有自己的小型解决方案也有其优点。使用带有同步块的自清理向量和具有close()方法的连接包装器来标记可用连接,在servlet应用程序中非常有效。


0

我有一个解决方案,可以创建一个连接池实用程序,帮助您创建一个默认大小为10的池。

@Component public class ConnectionPool { private static final Logger logger = LoggerFactory.getLogger(ConnectionPool.class); private static final int MAX_POOL_SIZE_LIMIT = 10; private BlockingQueue activeConnectinoQueue = new LinkedBlockingQueue<>(); private BlockingQueue usedConnectinoList = new LinkedBlockingQueue<>(); private int initialPoolSize = 5;

@Autowired
@Qualifier("dataSource")
private DataSource dataSource;

public void initConnectionPool() {
    logger.info("ConnectionPool initialization started.");
    if(activeConnectinoQueue.isEmpty() && usedConnectinoList.isEmpty()) {
        for (int i=0; i<initialPoolSize; i++) {
            createConnections();
        }
    }
    logger.info("ConnectionPool initialization completed. ConnectionPool size : {}", activeConnectinoQueue.size());
}

private void createConnections() {
    try {
        Connection connection = dataSource.getConnection();
        activeConnectinoQueue.add(connection);
    }catch (SQLException e) {
        logger.error("Error in getting connection from pool : ", e);
    }
}

public Connection getConnection() {
    if(activeConnectinoQueue.isEmpty()) {
        initConnectionPool();
    }
    Connection connection =  activeConnectinoQueue.remove();

    try {
        if(connection.isClosed()) {
            connection = dataSource.getConnection();
        }
    }catch (SQLException e) {
        logger.error("Error while getting connection from pool : ", e);
    }

    usedConnectinoList.add(connection);
    return connection;
}


public void releaseConnection(Connection connection) {
    if(connection != null) {
        usedConnectinoList.remove(connection);
        activeConnectinoQueue.add(connection);
    }
}

public void setInitialPoolSize(int initialPoolSize) {
    if(!(initialPoolSize < 0 || initialPoolSize > MAX_POOL_SIZE_LIMIT)) {
        this.initialPoolSize = initialPoolSize;
    }
}

public int getInitialPoolSize() {
    return initialPoolSize;
}

public int getConnectionPoolSize() {
    return activeConnectinoQueue.size() + usedConnectinoList.size();
}

public void setDataSource(AbstractDataSource dataSource) {
    this.dataSource = dataSource;
}

public void closeConnectionPool() {

    logger.info("Closing connectionPool started.");
    close(usedConnectinoList);
    close(activeConnectinoQueue);
    logger.info("ConnectionPool Closed.");
}

private void close(BlockingQueue<Connection> connectinosQueue) {
    for (int i=0; i<connectinosQueue.size(); i++) {
        Connection connection = connectinosQueue.remove();
        if(connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("Error in initializing connection pool : ", e);
            }
        }
    }
}

}

现在为了让它更安全,我们需要附加一个工厂对象...

public enum ConnectionFactory {
CONNECTION;

private ConnectionPool connectionPool;
public void setConnectionPool(ConnectionPool connectionPool) {
    this.connectionPool = connectionPool;
}

public Connection getConnection() {
    return connectionPool.getConnection();
}

public void closeConnection() {
    connectionPool.closeConnectionPool();
}

public void releaseConnection(Connection connection) {
    connectionPool.releaseConnection(connection);
}

public int getConnectionPoolSize() {
    return connectionPool.getConnectionPoolSize();
}

@Component
public static class ConnectionBuilder {
    @Autowired
    private ConnectionPool connectionPool;

    public void setConnectionPool(ConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }
    @PostConstruct
    public void postConstruct() {
        for (ConnectionFactory cfactory : EnumSet.allOf(ConnectionFactory.class)) {
            cfactory.setConnectionPool(connectionPool);
        }
    }
}

}


0

Java连接池?
创建JDBC连接池有三种方法,非常简单...

  1. Apache Commons DBCP

    public class DBCPDataSource {
    
    private static BasicDataSource ds = new BasicDataSource();
    
    static {
        ds.setUrl("jdbc:h2:mem:test");
        ds.setUsername("user");
        ds.setPassword("password");
        ds.setMinIdle(5);
        ds.setMaxIdle(10);
        ds.setMaxOpenPreparedStatements(100);
    }
    
    public static Connection getConnection() throws SQLException {
        return ds.getConnection();
    }
    
    private DBCPDataSource(){ }
    }
    

    现在您可以获取连接

    Connection con = DBCPDataSource.getConnection();
    
  2. HikariCP

    public class HikariCPDataSource {
    
    private static HikariConfig config = new HikariConfig();
    private static HikariDataSource ds;
    
    static {
        config.setJdbcUrl("jdbc:h2:mem:test");
        config.setUsername("user");
        config.setPassword("password");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        ds = new HikariDataSource(config);
    }
    
    public static Connection getConnection() throws SQLException {
        return ds.getConnection();
    }
    
    private HikariCPDataSource(){}
    }
    

现在您可以获取连接

Connection con = HikariCPDataSource.getConnection();
  • C3PO

     public class C3poDataSource {
    
    private static ComboPooledDataSource cpds = new ComboPooledDataSource();
    
    static {
        try {
            cpds.setDriverClass("org.h2.Driver");
            cpds.setJdbcUrl("jdbc:h2:mem:test");
            cpds.setUser("user");
            cpds.setPassword("password");
        } catch (PropertyVetoException e) {
            // 处理异常
        }
    }
    
    public static Connection getConnection() throws SQLException {
        return cpds.getConnection();
    }
    
    private C3poDataSource(){}
    }
    
  • 现在您可以获取连接

    Connection con = C3poDataSource.getConnection();
    

    0

    我有一些Java模型代码,其中包含具有多线程连接池。

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    abstract class ObjectPool<T> {
        private ConcurrentLinkedQueue<T> pool;
        ScheduledExecutorService executorService;
    
        ObjectPool(int minObjects) {
            pool = new ConcurrentLinkedQueue<T>();
            for (int i = 0; i < minObjects; i++) {
                pool.add(createObject());
            }
        }
    
        ObjectPool(final int minObjects, final int maxSize, final long interval){
            pool = new ConcurrentLinkedQueue<T>();
            for (int i = 0; i < minObjects; i++) {
                pool.add(createObject());
            }
    
            executorService = Executors.newSingleThreadScheduledExecutor();
            executorService.scheduleWithFixedDelay(new Runnable(){
    
                public void run() {
                    int size = pool.size();
                    while(size > maxSize){
                        pool.remove();
                    }
                    Iterator i = pool.iterator();
                    while(i.hasNext()){
                        T t = (T) i.next();
                        if(checkExpiry(t)){
                            System.out.println("Expiry existed...");
                            i.remove();
                        }
                    }
    
                    while(pool.size() < minObjects){
                        System.out.println("Adding more objects to pool");
                        pool.add(createObject());
                    }
                }
    
            }, interval, interval, TimeUnit.MILLISECONDS);
    
        }
    
        public T borrowObject() {
            if (pool.peek() == null)
                return createObject();
            return pool.remove();
        }
    
        public void addObject(T obj) {
            if (obj == null)
                return;
            pool.add(obj);
        }
    
        public abstract T createObject();
    
        public abstract boolean checkExpiry(T t);
    }
    
    class MultithreadQuery extends Thread{
        private ObjectPool<Connection> pool;
        private int threadNo;
        String query;
        MultithreadQuery(ObjectPool<Connection> pool,int threadNo, String query){
            this.pool = pool;
            this.threadNo = threadNo;
            this.query = query;
    
        }
        @Override
        public void run(){
            Connection con = pool.borrowObject();
            Statement stmt;
            try {
                stmt = con.createStatement();
                System.out.println("Query started for thread->"+ threadNo);
                ResultSet rs=stmt.executeQuery(query);
                while(rs.next())  
                System.out.println(rs.getInt(1)+"  "+rs.getString(2)+"  "+rs.getString(3));
                System.out.println("closing connection....");
                con.close();
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }  
            pool.addObject(con);        
            System.out.println("Query ended for thread->"+ threadNo);
        }
    }
    
    public class ObjectPoolPatternDemo {
        ObjectPool<Connection> pool;
    
        public void setUp(){
            pool = new ObjectPool<Connection>(4, 10, 1) {
    
                @Override
                public Connection createObject() {
                    Connection con;
                    try {
                        con = DriverManager.getConnection("URL","Username","Password");
                        return con;
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                    return null;
                }
    
                @Override
                public boolean checkExpiry(Connection conn) {
                    boolean expiryFlag = false;
                    try {
                        if(conn.isClosed())
                            expiryFlag = true;
    
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                    return expiryFlag;
                }
            };
        }
    
        public static void main(String[] args) throws SQLException {
            ObjectPoolPatternDemo oppd = new ObjectPoolPatternDemo();
            oppd.setUp();
    
            ExecutorService es = Executors.newFixedThreadPool(4);
            String query = "select * from TABLE";
            es.execute(new MultithreadQuery(oppd.pool,1,query));
            es.execute(new MultithreadQuery(oppd.pool,2,query));
            es.execute(new MultithreadQuery(oppd.pool,3,query));
            es.execute(new MultithreadQuery(oppd.pool,4,query));
            es.execute(new MultithreadQuery(oppd.pool,5,query));
            es.execute(new MultithreadQuery(oppd.pool,6,query));
    
            es.shutdown();
            try {
                es.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("finally completed...");
        }
    }
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接