成人午夜激情影院,小视频免费在线观看,国产精品夜夜嗨,欧美日韩精品一区二区在线播放

Java 5.0 多線程編程實踐

2010-08-28 10:48:27來源:西部e網作者:

Java5增加了新的類庫并發集java.util.concurrent,該類庫為并發程序提供了豐富的API多線程編程在Java 5中更加容易,靈活。本文通過一個網絡服務器模型,來實踐Java5的多線程編程,該模型中使用了Java5中的線程池,阻塞隊列,可重入鎖等,還實踐了Callable, Future等接口,并使用了Java 5的另外一個新特性泛型。

  簡介

  本文將實現一個網絡服務器模型,一旦有客戶端連接到該服務器,則啟動一個新線程為該連接服務,服務內容為往客戶端輸送一些字符信息。一個典型的網絡服務器模型如下:

  1. 建立監聽端口。

  2. 發現有新連接,接受連接,啟動線程,執行服務線程。 3. 服務完畢,關閉線程。

  這個模型在大部分情況下運行良好,但是需要頻繁的處理用戶請求而每次請求需要的服務又是簡短的時候,系統會將大量的時間花費在線程的創建銷毀。Java 5的線程池克服了這些缺點。通過對重用線程來執行多個任務,避免了頻繁線程的創建與銷毀開銷,使得服務器的性能方面得到很大提高。因此,本文的網絡服務器模型將如下:

  1. 建立監聽端口,創建線程池。

  2. 發現有新連接,使用線程池來執行服務任務。

  3. 服務完畢,釋放線程到線程池。

  下面詳細介紹如何使用Java 5的concurrent包提供的API來實現該服務器。

  初始化

  初始化包括創建線程池以及初始化監聽端口。創建線程池可以通過調用java.util.concurrent.Executors類里的靜態方法newChahedThreadPool或是newFixedThreadPool來創建,也可以通過新建一個java.util.concurrent.ThreadPoolExecutor實例來執行任務。這里我們采用newFixedThreadPool方法來建立線程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一個線程池,線程池里面有10個線程為任務隊列服務。

  使用ServerSocket對象來初始化監聽端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服務新連接

  當有新連接建立時,accept返回時,將服務任務提交給線程池執行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  這里使用線程池對象來執行線程,減少了每次線程創建和銷毀的開銷。任務執行完畢,線程釋放到線程池。

  服務任務

  服務線程ServiceThread維護一個count來記錄服務線程被調用的次數。每當服務任務被調用一次時,count的值自增1,因此ServiceThread提供一個increaseCount和getCount的方法,分別將count值自增1和取得該count值。由于可能多個線程存在競爭,同時訪問count,因此需要加鎖機制,在Java 5之前,我們只能使用synchronized來鎖定。Java 5中引入了性能更加粒度更細的重入鎖ReentrantLock。我們使用ReentrantLock保證代碼線程安全。下面是具體代碼:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}

  服務線程在開始給客戶端打印一個歡迎信息,

increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());

  然后使用ExecutorService的submit方法提交一個Callable的任務,返回一個Future接口的引用。這種做法對費時的任務非常有效,submit任務之后可以繼續執行下面的代碼,然后在適當的位置可以使用Future的get方法來獲取結果,如果這時候該方法已經執行完畢,則無需等待即可獲得結果,如果還在執行,則等待到運行完畢。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());

  其中TimeConsumingTask實現了Callable接口

class TimeConsumingTask implements Callable {
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }
}

  這里使用了Java 5的另外一個新特性泛型,聲明TimeConsumingTask的時候使用了String做為類型參數。必須實現Callable接口的call函數,其作用類似與Runnable中的run函數,在call函數里寫入要執行的代碼,其返回值類型等同于在類聲明中傳入的類型值。在這段程序中,我們提交了一個Callable的任務,然后程序不會堵塞,而是繼續執行dos.write("let's do soemthing other".getBytes());當程序執行到String result = future.get()時如果call函數已經執行完畢,則取得返回值,如果還在執行,則等待其執行完畢。

服務器端的完整實現

  服務器端的完整實現代碼如下:

package com.andrew;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server {
 private static int produceTaskSleepTime = 100;
 private static int consumeTaskSleepTime = 1200;
 private static int produceTaskMaxNumber = 100;
 private static final int CORE_POOL_SIZE = 2;
 private static final int MAX_POOL_SIZE = 100;
 private static final int KEEPALIVE_TIME = 3;
 private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
 private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
 private static final String HOST = "127.0.0.1";
 private static final int PORT = 19527;
 private BlockingQueueworkQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
 //private ThreadPoolExecutor serverThreadPool = null;
 private ExecutorService pool = null;
 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
 private ServerSocket serverListenSocket = null;
 private int times = 5;
 public void start() {
  // You can also init thread pool in this way.
  /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
  MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
  rejectedExecutionHandler);*/
  pool = Executors.newFixedThreadPool(10);
  try {
   serverListenSocket = new ServerSocket(PORT);
   serverListenSocket.setReuseAddress(true);

   System.out.println("I'm listening");
   while (times-- >0) {
    Socket socket = serverListenSocket.accept();
    String welcomeString = "hello";
    //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
    pool.execute(new ServiceThread(socket));
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  cleanup();
 }

 public void cleanup() {
  if (null != serverListenSocket) {
   try {
    serverListenSocket.close();
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }
  //serverThreadPool.shutdown();
  pool.shutdown();
 }

 public static void main(String args[]) {
  Server server = new Server();
  server.start();
 }
}

class ServiceThread implements Runnable, Serializable {
 private static final long serialVersionUID = 0;
 private Socket connectedSocket = null;
 private String helloString = null;
 private static int count = 0;
 private static ReentrantLock lock = new ReentrantLock();

 ServiceThread(Socket socket) {
  connectedSocket = socket;
 }

 public void run() {
  increaseCount();
  int curCount = getCount();
  helloString = "hello, id = " + curCount + "\r\n";

  ExecutorService executor = Executors.newSingleThreadExecutor();
  Futurefuture = executor.submit(new TimeConsumingTask());

  DataOutputStream dos = null;
  try {
   dos = new DataOutputStream(connectedSocket.getOutputStream());
   dos.write(helloString.getBytes());
   try {
    dos.write("let's do soemthing other.\r\n".getBytes());
    String result = future.get();
    dos.write(result.getBytes());
   } catch (InterruptedException e) {
    e.printStackTrace();
   } catch (ExecutionException e) {
    e.printStackTrace();
   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  } finally {
   if (null != connectedSocket) {
    try {
     connectedSocket.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   if (null != dos) {
    try {
     dos.close();
    } catch (IOException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
   executor.shutdown();
  }
 }

 private int getCount() {
  int ret = 0;
  try {
   lock.lock();
   ret = count;
  } finally {
   lock.unlock();
  }
  return ret;
 }

 private void increaseCount() {
  try {
   lock.lock();
   ++count;
  } finally {
   lock.unlock();
  }
 }
}

class TimeConsumingTask implements Callable{
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }

}

  運行程序

  運行服務端,客戶端只需使用telnet 127.0.0.1 19527 即可看到信息如下:

Java 5.0 多線程編程實踐

關鍵詞:Java

贊助商鏈接:

主站蜘蛛池模板: 贵港市| 莫力| 错那县| 岱山县| 渭南市| 五大连池市| 苗栗县| 民丰县| 稷山县| 正镶白旗| 睢宁县| 贵德县| 多伦县| 株洲县| 永年县| 天门市| 合水县| 林口县| 自治县| 定襄县| 睢宁县| 平陆县| 灵丘县| 汪清县| 当阳市| 乌拉特后旗| 新龙县| 阜南县| 上思县| 突泉县| 墨玉县| 泰兴市| 涿鹿县| 婺源县| 长海县| 阜阳市| 南汇区| 扬州市| 荣成市| 威海市| 汝城县|