多线程(无状态类)

3

非常抱歉,代码有点长,但我想知道是否有人可以帮忙解决一个多线程问题(我对多线程还很陌生)。我正在尝试设计一个RESTful Web服务API的Facade类,可以与多个线程共享。我使用HttpURLConnection进行连接,并使用Google GSON将数据转换为JSON格式。

下面是我目前的代码。在这个例子中,它有一个公共方法来调用API(authenticateCustomer()),私有方法用于实现API调用(例如构建POST数据字符串,发送POST请求等)。

我创建了一个该类的实例,并与1000个线程共享。这些线程调用authenticateCustomer()方法。大多数线程都能正常工作,但有一些线程会出现空指针异常,这是因为我没有实现任何同步。如果我将authenticateCustomer()方法设置为“synchronized”,它就可以正常工作。问题是这会导致并发性差(例如,如果POST请求突然需要很长时间才能完成,这将阻塞所有其他线程)。

现在我的问题是,下面的类不是无状态的吗?因此线程安全?类中仅有极少量的字段,在构造函数中声明为final并进行赋值。所有方法均使用本地变量。Gson对象是无状态的(根据其网站),而且在API方法中也是作为本地变量创建的。

public final class QuizSyncAPIFacade 
{
    // API Connection Details
private final String m_apiDomain;
private final String m_apiContentType;
private final int m_bufferSize;

// Constructors
public QuizSyncAPIFacade()
{
    m_apiDomain      = "http://*****************************";
    m_apiContentType = ".json";
    m_bufferSize = 8192; // 8k
}

private String readInputStream(InputStream stream) throws IOException
{
        // Create a buffer for the input stream
    byte[] buffer = new byte[m_bufferSize];

    int readCount;

    StringBuilder builder = new StringBuilder();

    while ((readCount = stream.read(buffer)) > -1) {
        builder.append(new String(buffer, 0, readCount));
    }

    return builder.toString();
}

private String buildPostData(HashMap<String,String> postData) throws UnsupportedEncodingException
{
    String data = "";

    for (Map.Entry<String,String> entry : postData.entrySet()) 
    {
        data += (URLEncoder.encode(entry.getKey(), "UTF-8") + "=" + URLEncoder.encode(entry.getValue(), "UTF-8") + "&");        
    }

    // Trim the last character (a trailing ampersand)
    int length = data.length();

    if (length > 0) {
        data = data.substring(0, (length - 1));
    }

    return data;
}

private String buildJSONError(String message, String name, String at)
{
    String error = "{\"errors\":[{\"message\":\"" + message + "\",\"name\":\"" + name + "\",\"at\":\"" + at + "\"}]}";

    return error;
}

private String callPost(String url, HashMap<String,String> postData) throws IOException
{
    // Set up the URL for the API call 
    URL apiUrl = new URL(url);

    // Build the post data
    String data = buildPostData(postData);

    // Call the API action
    HttpURLConnection conn;

    try {
        conn = (HttpURLConnection)apiUrl.openConnection();
    } catch (IOException e) {
        throw new IOException(buildJSONError("Failed to open a connection.", "CONNECTION_FAILURE", ""));
    }

    // Set connection parameters for posting data
    conn.setRequestMethod("POST");
    conn.setUseCaches(false);
    conn.setDoInput(true);
    conn.setDoOutput(true);

    // Write post data
    try {
        DataOutputStream wr = new DataOutputStream(conn.getOutputStream());
        wr.writeBytes(data);
        wr.flush();
        wr.close();
    } catch (IOException e) {
        throw new IOException(buildJSONError("Failed to post data in output stream (Connection OK?).", "POST_DATA_FAILURE", ""));           
    }

    // Read the response from the server                
    InputStream is;

    try {
        is = conn.getInputStream();
    } catch (IOException e) {
        InputStream errStr = conn.getErrorStream();

        if (errStr != null) 
        {
            String errResponse = readInputStream(errStr);
            throw new IOException(errResponse);
        } 
        else 
        {
            throw new IOException(buildJSONError("Failed to read error stream (Connection OK?).", "ERROR_STREAM_FAILURE", ""));
        }
    }

    // Read and return response from the server
    return readInputStream(is);
}

/* -------------------------------------
 * 
 * Synchronous API calls
 * 
   ------------------------------------- */

public APIResponse<CustomerAuthentication> authenticateCustomer(HashMap<String,String> postData)
{
    // Set the URL for this API call
    String apiURL = m_apiDomain + "/customer/authenticate" + m_apiContentType;

    Gson jsonConv = new Gson();

    String apiResponse = "";

    try 
    { 
        // Call the API action
        apiResponse = callPost(apiURL, postData);

        // Convert JSON response to the required object type
        CustomerAuthentication customerAuth = jsonConv.fromJson(apiResponse, CustomerAuthentication.class);

        // Build and return the API response object
        APIResponse<CustomerAuthentication> result = new APIResponse<CustomerAuthentication>(true, customerAuth, null);

        return result;
    } 
    catch (IOException e) 
    {
        // Build and return the API response object for a failure with error list
        APIErrorList errorList = jsonConv.fromJson(e.getMessage(), APIErrorList.class);

        APIResponse<CustomerAuthentication> result = new APIResponse<CustomerAuthentication>(false, null, errorList);

        return result;
    }
}

}


1
这个类绝对是线程安全的。 - Roman K
2
你直接使用传入的HashMap(没有复制)- 如果它在你使用它时被另一个线程修改,可能会导致你描述的问题。我建议先将其复制到本地变量中,然后检查本地变量的内容是否有效,然后再使用本地变量。 - assylias
4
你如何确定NPE是由多线程引起的?触发NPE的代码行是哪一行?以及堆栈跟踪信息是什么? - Sean Mickey
1
我们确实需要更多的信息。authenticateCustomer 不能返回空指针。你是不是指在 authenticateCustomer 中发生异常,导致返回的 APIResponse 中的 CustomerAuthentication 为空? - toto2
谢谢大家。我认为问题是线程相关的,因为当我同步authenticateCustomer()方法时,它每次都能正常工作(尽管并发性较差)。结果发现,正如下面的答案所述,我过载了我的身份验证服务,并且没有正确地意识到这一点。这导致我的工作线程中出现了NPE。你们的回复帮助我确认我的类没有根本性的问题。 - RWilson
2个回答

2
如果您遇到错误,可能是由于您过载了身份验证服务(如果您逐个执行,则不会发生这种情况)。也许返回了一个错误,如500、503或504,您可能忽略了它,得不到您所期望的任何东西,您返回 nullhttp://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html 假设您没有1000个CPU,我会使用更少的线程。有可能使用这么多线程会比更有效率变得更加缓慢。
我还将检查您的服务是否每次都正确返回,并调查为什么会出现 null 值。
如果您的服务一次只能处理20个请求,作为最后的选择,可以尝试使用 Semaphore。这可用于限制并发请求的数量。

3
在某些情况下,使用比CPU核心更多的线程可能是有意义的,特别是当它们很可能主要需要等待I/O时。不过,考虑基于非阻塞I/O的方法也是值得的。 - x4u
1
只要通过增加并发请求来提高服务或IO的吞吐量,那就是正确的。虽然一些并发通常会提高吞吐量,但在某些情况下最优值可能相当小。显然,尝试执行超过服务处理能力的请求可能会导致失败。 - Peter Lawrey
1
那就是了,Peter,谢谢!我在过载我的认证服务。创建的线程越多,错误百分比就越大。NPE是由于我没有正确验证工作线程中来自服务的响应而不是线程安全问题引起的。一旦我修复了验证,我发现当我超过一定数量的工作线程时,会出现一些500错误。 - RWilson

0

任何无状态类本质上都是线程安全的,前提是它所访问的对象要么是线程私有的,要么本身就是线程安全的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接