月度归档:2017年04月

[线程池]ScheduledExecutorService

#延迟执行队列原型
TODO

private static void scheduledPool() {

  ScheduledExecutorService service = Executors.newScheduledThreadPool(5);

  //延迟执行 5秒钟后执行
  service.schedule(() -> {
    System.out.println("schedule");
    return 1L;
  }, 5, TimeUnit.SECONDS);

  //周期性的执行 5首次延迟时间 3之后延迟时间
  service.scheduleAtFixedRate(() -> {
    System.out.println("scheduleAtFixedRate");
  }, 5, 3, TimeUnit.SECONDS);

}

 

[线程池]ThreadPoolExecutor

#线程池的几个概念

我先说一下它的基本实现模型,预先开启几个线程然后轮训的从队列中获取任务,如果获取到任务则在线程中执行任务,如果没有获取到任务则一直阻塞着。阻塞着的线程会变为WAITING状态。
ps:上面说的只是一个大概模型,实际的ThreadPoolExecutor,会做一些优化,线程不是事先创建,而是当任务来临时创建,任务也不是直接加入队列,而是先判断是否有空余线程来执行,没有才加入队列。

ThreadPoolExecutor会将任务交给线程池中的线程来处理,具体的配置需要使用Executors的工厂方法来创建ThreadPoolExecutor,其中指定了线程数量和队列类型。

#线程池解决什么问题
解决处理大量异步任务,减少线程的切换次数

为了应对不同的使用场景,Executors类提供了这几种不同配置的线程池
Executors.newCachedThreadPool() 没有限制线程数量,并且线程自动回收
Executors.newFixedThreadPool(int) 固定的线程数量
Executors.newSingleThreadExecutor() 一个线程

如果想要调优线程池,阅读下面的指南。

#核心线程数量和最大线程数量

1、线程数量核心
corePoolSize 核心线程数量,线程一直运行(也可以理解成最小线程数量)
poolSize 当前线程数量
maximumPoolSize 最大线程数量
corePoolSize <= poolSize <= maximumPoolSize

2、线程数量变化的规则
ThreadPoolExecutor会在corePoolSize和maximumPoolSize之间自动的调节线程数量,如果有一个新任务被提交,并且当前线程数量小于corePoolSize,那么便会创建一个新的线程来执行这个任务,也不管之前的线程是不是空闲,
如果当前线程数量大于corePoolSize并且小于maximumPoolSize,那么只会在队列已满的情况下再创建新的线程来执行任务。

3、用途说明
如果把maximumPoolSize设置成最大值 例如Integer.MAX_VALUE,你会创建一个可以处理任意数量任务的线程池 newCachedThreadPool
如果corePoolSize和maximumPoolSize相同,那么会创建一个线程数量固定的线程池。 newFixedThreadPool

4、动态修改线程数量
通常corePoolSize和maximumPoolSize是在构造方法中设置的,但是你也可以通过setCorePoolSize(int)和setmaximumPoolSize(int)来动态改变他们的数量

#线程什么时候创建
默认情况下,线程是在有新任务来临的时候才会创建并启动,但是我们可以通过prestartCoreThread()和prestartAllCoreThreads()预先启动一个或所有核心线程。这种场景适合再空队列的时候也有线程在运行

#创建线程
线程是通过ThreadFactory来创建的,默认的是通过 Executors.defaultThreadFactory()来创建,它会拥有相同的ThreadGroup和相同的NORM_PRIORITY优先级并且是非后台线程
通过指派ThreadFactory,你可以设置线程的名字,线程组,优先级,是否是后台线程,等等。
如果ThreadFactory创建线程失败,那么线程池可能无法执行任何任务,虽然线程池还在。

如果线程不具有modifyThread运行时权限,就会出现问题,例如更改配置不会立即生效,线程池不能被关闭

#存活时间
存活时间值的是当线程数量大于corePoolSize时,多余的线程如果闲置的时间超过 keepAliveTime就会终止掉。
这是为了减少资源的浪费,如果之后任务增多,那么还会再重新创建线程。
如果设置超时时间为 Long.MAX_VALUE TimeUnit.NANOSECONDS,那么线程只会在线程中关闭前被中止
默认对线程超时时间的判断策略只会发生在非核心线程上面[只会在线程数量大于corePoolSize的时候进行],但是可以通过设置allowCoreThreadTimeOut(boolean) ,在核心线程上也运行该策略

#队列
任意的BlockingQueue[接口]阻塞队列,都可以实现保存和传输任务的功能。队列的使用和线程池大小有密切关系。
1.如果小于corePoolSize,线程池会添加新线程来执行并不会把任务加入队列。
2.如果大于corePoolSize,会把任务加入队列中。
3.(?队列大小限制)如果超过队列大小,并且当前线程数量小于maximumPoolSize,就会创建新线程来执行任务,否则会拒绝任务抛出异常。
*
可以把队列分为三种
1.Direct handoffs. 传递性队列[只能有一个]。获取或创建线程,有就获取没有就创建,阻塞版本,线程安全。
例如 SynchronousQueue,它只负责在线程间传递任务,并不保存这个任务。
它的一个实际用途是,当我们往队列中添加一个任务,每当当前没有线程能立刻来处理它,所以便会创建一个新的线程来处理它。【实现代码 synchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)】
这个策略解决了有大量任务时获取线程的死锁问题。 使用传递性队列,需要把maximumPoolSizes设置成最大,来避免出现拒绝任务执行的情况,这也会出现导致在线程不断疯长,当处理速度远低于生产速度。

2.Unbounded queues 无边界大小的队列[无限大小]
例如 LinkedBlockingQueue,当有新任务来临,并且核心线程都在执行任务,这时候它会把新任务加入到队列中然后等待线程执行。
在无边界大小的队列中maximumPoolSizes就无效了,因为线程数量不会超过corePoolSize。
这种对于任务之间互相独立的场景非常合适,例如web服务器,它可以承接无限大的任务数量,并且只需要很小的线程数量来执行,这可以解决超大并发的问题,可以让并发更平缓的执行。

3.Bounded queues 有边界大小的队列[有限大小]
例如 ArrayBlockingQueue,它能在有限的maximumPoolSizes下,防止资源枯竭。
队列大小和线程数量可能会相互影响
使用大的队列和小的线程池,会减少CPU使用,系统资源分配,线程上下文切换,但是会导致低吞吐量(每秒钟处理任务数量),如果任务被频繁的阻塞例如IO操作,系统可能要花费更多的时间。
使用小的队列和大的线程池,会增加CPU使用,但会造成更多的系统开销,也有可能会降低吞吐量。

#拒绝任务
执行execute()加入新任务时,如果线程池已经shutdown或者最大线程数量到头和队列容量也满了的情况下任务会被拒绝执行。
拒绝的任务会通过RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)抛出异常,总共有4种异常类型。
1.ThreadPoolExecutor.AbortPolicy
2.ThreadPoolExecutor.CallerRunsPolicy
3.ThreadPoolExecutor.DiscardPolicy
4.ThreadPoolExecutor.DiscardOldestPolicy

#钩子回调方法
beforeExecute(Thread, Runnable)
afterExecute(Runnable, Throwable)

#队列操作
通过getQueue()可以获得在线程池中的队列,但不推荐使用,除非你是为了调试debug队列,或者你有很大的队列数量你想取消其中的一些任务。

#确保线程池能够终止
如果你想确保线程池能彻底的关闭,而不让它一直运行,即使不掉用shutdown,也让它自动关闭,可以通过设置allowCoreThreadTimeOut(boolean)可以让线程池中的线程自动关闭。

[线程池]ExecutorService 讲解

#ExecutorService提供的功能
ExecutorService提供了关闭线程池的功能,并且会把Runnable和Callable包装成FutureTask来执行,扩展出了获取返回值和捕获异常的功能。

#核心操作对象
其中主要操作是是:线程池中的队列,线程池中运行的线程,线程池的状态

#线程池状态
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED

#取消任务
取消的任务分为两种
1. 在队列中还未运行
2. 在线程中运行
如果在队列中则从队列中移除,如果是在运行中则发出中断线程信号,中断任务执行。

#终止线程
终止线程池也有分上面两种,一种是等待所有任务执行完毕,在让线程自然消亡,另一种是先取消任务,再让线程池消亡。 awaitTermination方法也很有意思,它是阻塞式用来检测线程池是否消亡。

#Api

拒绝新增任务,在队列中的任务和正在执行的任务继续执行
任务执行完毕,线程死亡

void shutdown();

 

拒绝新增任务,还在队列中的任务会被清空,对正在执行的任务的线程发出InterruptedException中止信号
任务执行完毕,线程死亡

List shutdownNow();

 

不是正在运行中返回true

boolean isShutdown();

 

判断线程池状态是否为TERMINATED

boolean isTerminated();

 

阻塞着等待线程池结束

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

 

执行任务,可获得任务执行结果,和可捕获异常

 Future submit(Callable task);

 

执行任务,可获得result结果,和可捕获异常

 Future submit(Runnable task, T result);

 

执行任务,无返回结果,可捕获异常

Future<?> submit(Runnable task);

 

批量执行任务
等到所有任务执行完毕或者抛出异常,才会返回

 List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException;

 

批量执行任务
任务超时没有完成会发送中断信号进行取消

 List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException;

 

批量执行任务,只获取最先执行完成的结果
返回一批任务中最早执行完的,其他没完成执行的则不再执行

 T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException;

 

增加超时判断

 T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

 

下面是测试代码,大家可以运行体会一下。

public class ExecutorServiceAPI {
	
	
	public static class Task implements Callable<Long> {

		@Override
		public Long call() throws Exception {
			System.out.println("Task");
			return 1L;
		}
	}
	
	public static class LongTimeTask implements Callable<Long> {
		
		@Override
		public Long call() throws Exception {
			System.out.println("LongTimeTask");
			Thread.sleep(10000);
			return 2L;
		}
	}
	
	public static class ExtraTask implements Callable<Long> {
		
		@Override
		public Long call() throws Exception {
			System.out.println("ExtraTask");
			Thread.sleep(10000);
			return 3L;
		}
	}
	
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		// shutdown();
		// shutdownNow();
		// isShutdownAndIsTerminated();
		// submitRunnableAndResult();
		// invoke();
		cancel();
	}
	
	/*
	 * shutdown
	 * 1、拒绝新增任务,在队列中的任务和正在执行的任务继续执行
	 * 2、任务执行完毕,线程死亡
	 * 
	 * awaitTermination
	 * 1.阻塞着等待线程池结束
	 */
	public static void shutdown() throws InterruptedException {
		ExecutorService service = Executors.newFixedThreadPool(4);
		service.submit(new LongTimeTask());//正在执行的任务
		service.submit(new LongTimeTask());//正在执行的任务
		service.submit(new LongTimeTask());//正在执行的任务
		service.submit(new LongTimeTask());//正在执行的任务
		service.submit(new ExtraTask());//在队列中的任务
		
		service.shutdown();//待分析中断原理
		try {
			service.submit(new Task());
		} catch (Exception e) {
			System.out.println("禁止提交新任务");
		}
		
		while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
			System.out.println("线程池没有关闭");
		}
		System.out.println("线程池已经关闭");
	}
	
	/*
	 * 1、拒绝新增任务,还在队列中的任务会被清空,对正在执行的任务的线程发出InterruptedException中止信号
	 * 2、任务执行完毕,线程死亡
	 */
	public static void shutdownNow() throws InterruptedException {
		ExecutorService service = Executors.newFixedThreadPool(4);
		Future<Long> future1 = service.submit(new LongTimeTask());//正在执行的任务
		Future<Long> future2 = service.submit(new LongTimeTask());//正在执行的任务
		Future<Long> future3 = service.submit(new LongTimeTask());//正在执行的任务
		Future<Long> future4 = service.submit(new LongTimeTask());//正在执行的任务
		Future<Long> future5 = service.submit(new ExtraTask());//在队列中的任务
		
		List<Runnable> inQueue = service.shutdownNow();//待分析中断原理
		for (Runnable runnable : inQueue) {
			System.out.println("还在队列中的任务:" + runnable);
		}
		try {
			service.submit(new Task());
		} catch (Exception e) {
			System.out.println("禁止提交新任务");
		}
		
		while (!service.awaitTermination(1, TimeUnit.SECONDS)) {
			System.out.println("线程池没有关闭");
		}
		
		System.out.println("线程池已经关闭");
		
		printReulst(future1, "future1");
		printReulst(future2, "future2");
		printReulst(future3, "future3");
		printReulst(future4, "future4");
		printReulst(future5, "future5");
		System.out.println("die");
	}

	/**
	 * 打印任务
	 */
	public static void printReulst(Future<Long> future, String tag) {
		try {
			future.get(1, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException | TimeoutException e) {
			System.out.println(tag + " : " + e.getMessage());
		}
	}
	
	/**
	 * shutdown 是否执行过,只要线程池状态不是RUNNING状态就返回true
	 *   RUNNING
	 *   SHUTDOWN
	 *   STOP
	 *   TIDYING
	 *   TERMINATED
	 * isTerminated 判断线程池状态是否为TERMINATED
	 */
	public static void isShutdownAndIsTerminated() {
		ExecutorService service = Executors.newFixedThreadPool(4);
		service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		service.shutdown();//设置为SHUTDOWN状态
		//service.shutdownNow();//设置为STOP状态
		System.out.println("isShutDown : " + service.isShutdown());
		
		while (!service.isTerminated()) {
			
		}
		System.out.println("线程池终止");
	}
	
	/**
	 * Future<?> submit(Runnable task)
	 * 
	 * <T> Future<T> submit(Callable<T> task)
	 * 
	 * <T> Future<T> submit(Runnable task, T result)
	 */
	public static void submitRunnableAndResult() {
		ExecutorService service = Executors.newFixedThreadPool(4);
		
		//Runnbale: 捕获异常
		Future<?> f = service.submit(new Runnable() {
			
			@Override
			public void run() {
				int a = 1 / 0;
			}
		});
		try {
			System.out.println(f.get());
		} catch (InterruptedException | ExecutionException e) {
			System.out.println("Runnable 异常 : " + e.getMessage());
		}
		
		//Runable: 捕获异常,也可返回预先设置的结果。
		Future<String> stringf = service.submit(new Runnable() {
			
			@Override
			public void run() {
				//int a = 1 / 0;
			}
		}, "result");
		try {
			System.out.println(stringf.get());
		} catch (InterruptedException | ExecutionException e) {
			System.out.println("Runnable Result 异常 : " + e.getMessage());
		}
		
		//Callable捕获异常并获得返回值
		Future<Long> longF = service.submit(new Callable<Long>() {
			@Override
			public Long call() throws Exception {
				int a = 1 / 0;
				return 1L;
			}
		});
		try {
			System.out.println(longF.get());
		} catch (InterruptedException | ExecutionException e) {
			System.out.println("Callable 异常 : " + e.getMessage());
		}
	}
	
	public static void invoke() {
		ExecutorService service = Executors.newFixedThreadPool(4);
		List<Callable<Long>> tasks = new ArrayList<>();
		for (int i = 0; i < 10; i++) {
			tasks.add(new Callable<Long>() {
				@Override
				public Long call() throws Exception {
					long sleeptime = new int[] {500, 800 , 1000, 2000, 300, 100, 200, 400, 600, 700}[new Random().nextInt(10)];
					Thread.sleep(sleeptime);
					System.out.println("A2 " + sleeptime);
					//int a = 1 / 0;
					return sleeptime;
				}
			});
		}
		//invokeAll
		try {
			//等到所有任务执行完毕或者抛出异常,才会返回
			//List<Future<Long>> futures = service.invokeAll(tasks);
			//任务超时没有完成会发送中断信号进行取消
			List<Future<Long>> futures = service.invokeAll(tasks, 1, TimeUnit.SECONDS);
			System.out.println("futures size : " + futures.size());
			
			for (Future<Long> future : futures) {
				try {
					System.out.println(future.get());
				} catch (ExecutionException | CancellationException e) {
					System.out.println("异常:" + e);
				}
			}
		} catch (InterruptedException e) {
			System.out.println(e.getMessage());
		}
		
		//invokeAny
		try {
			// 返回一批任务中最早执行完的,其他没完成执行的则不再执行
			Long oneResult = service.invokeAny(tasks);
			System.out.println("oneResult : " + oneResult);
		} catch (InterruptedException | ExecutionException e) {
			System.out.println(e.getMessage());
		}		
		System.out.println("线程池仍在运行中");
	}
	
	/**
	 * Future<T>接口 取消任务
	 * 取消的任务分为两种
	 * 	1. 在队列中还未运行
	 *  2. 在线程中运行
	 * 如果在队列中则从队列中移除,如果是在运行中则发出中断线程信号,中断任务执行。
	 */
	public static void cancel() {
		ExecutorService service = Executors.newFixedThreadPool(4);
		Future<Long> f = service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		service.submit(new LongTimeTask());
		Future<Long> queueF = service.submit(new ExtraTask());
		queueF.cancel(true);
		
		f.cancel(false);
		try {
			f.get();//抛出CancellationException异常
		} catch (InterruptedException | ExecutionException | CancellationException e) {
			System.out.println(e);
		}
		service.shutdown();
	}
}

 

参考链接 : http://victorzhzh.iteye.com/blog/1010359

[线程池]线程和任务

⌈请正确区分这两种的不同⌋

#线程和任务
先明白线程和任务的区别,线程是Thread,只能用Thread类来表示,Runnbale是一个任务,FutureTask也只是一个任务是一个有返回值并且可以捕获执行异常的任务[继承自Runnable和Future]。

线程执行是交由JVM调度来执行 Thread#run() 方法,然后在run方法中去执行任务,去执行重写的run,还是Runnable,还是FutureTask。抽象出来用代码这样表示

 class Thread {
   public void run() {
     ...
     //自己重写
     //Runnable or FutureTask
   }
 }

 

线程:Thread
任务:Runnable FutureTask(Callable)
最后一句Runnable和Callable不是线程,它们是在线程中执行的任务。

public class ThreadUse {

	// 用线程来执行任务的三种方式
	public static void main(String[] args) {
		m1();
		m2();
		m3();
	}

	/**
	 * 重写Thread run (我称之为自定义任务)
	 */
	private static void m1() {
		new Thread() {
			public void run() {
				System.out.println("thread run");
			};
		}.start();
	}

	/**
	 * Runnable 无返回值
	 */
	private static void m2() {
		new Thread(new Runnable() {

			@Override
			public void run() {
				System.out.println("runnable");
			}
		}).start();
	}

	/**
	 * FutureTask 有返回值
	 */
	private static void m3() {
		Callable<Long> callable = new Callable<Long>() {
			@Override
			public Long call() throws Exception {
				Thread.sleep(5000);
				return System.currentTimeMillis();
			}
		};
		FutureTask<Long> futureTask = new FutureTask<>(callable);
		new Thread(futureTask).start();
		try {
			System.out.println(futureTask.get()); //get()方法是阻塞的
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}
}

 

[线程池]Executor

#Executor
它定义了要执行一个任务 void execute(Runnable command);
它保证在未来的某一时刻执行这个任务,不能保证立即执行。任务在那个线程中执行需要看子类怎么实现,可能在调用者的线程中执行,也可能创建一个新线程中执行,也可能放到一个线程池中执行。

#说说Runnable
在这里先说道说道Runnable,它是一个任务不是一个线程,它的执行需要放到一个Thread中,它有两种执行方式

1.创建线程的时候把Runnable传进去(实际是在run方法中执行runnbale.run())

private static void runnable_1(Runnable runnable) {
  new Thread(runnable).start();
}

 

2.在线程的Runnbale方法中执行

private static void runnable_2(Runnable runnable) {
  new Thread() {
    @Override
    public void run() {
      runnable.run();			
    }
  }.start();;
}

 

#下面内容大致讲解了Executor的几种实现方式

Executor接口解耦的任务的执行细节比如线程的使用方式,scheduling线程调度等等,就是它只定义了执行任务,别的啥也没干。

#替代每次创建线程执行任务
Executor用来代替每次都要创建线程的方式,比如相比这样执行线程 new Thread(new RunnableTask()).start(),我们可以
这样来替代:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

 

#使用调用者线程中同步执行代码
然而Executor接口也不一定要求都是异步执行任务,你也可以在调用者的线程中同步的执行代码

class DirectExecutor implements Executor {
  	public void execute(Runnable r) {
     	r.run();
  	}
}

 


#新建线程执行任务

更常见的是任务不能在调用者的线程中执行,需要单独新建一个线程执行,例如:

class ThreadPerTaskExecutor implements Executor {
  	public void execute(Runnable r) {
     	new Thread(r).start();
     }
}

 

#线性执行任务
一些功能强大的Executor实现类,更注重添加一些限制条件约束任务数量和线程数量,还决定任务什么时候执行。
下面的例子展示线性的执行任务

class SerialExecutor implements Executor {
     final Queue tasks = new ArrayDeque();
     final Executor executor;
     Runnable active;
 
     SerialExecutor(Executor executor) {
       this.executor = executor;
     }
 
     public synchronized void execute(final Runnable r) {
       tasks.offer(new Runnable() {
         public void run() {
           try {
             r.run();
           } finally {
             scheduleNext();
           }
         }
       });
       if (active == null) {
         scheduleNext();
       }
     }
 
     protected synchronized void scheduleNext() {
       if ((active = tasks.poll()) != null) {
         executor.execute(active);
       }
     }
 }

 

#异常:

RejectedExecutionException 如果无法接收任务时抛出异常
  NullPointerException 如果任务为null,则抛出此异常

 

这里是一个测试mani方法

public class Executor_Doc {
	
	public static void main(String[] args) {
		
		// 常见的线程方式
		new Thread(new RunnableTask()).start();
		
		// 在主线程中执行任务(在调用者线程中执行任务)
		new DirectExecutor().execute(new RunnableTask());
		
		// 创建新线程执行
		new ThreadPerExecutor().execute(new RunnableTask());
		
		// 线性执行任务(使用递归取值执行)
		SerialExecutor serialExecutor = new SerialExecutor(new ThreadPerExecutor());
		for (int i = 0; i < 5; i++) {
			serialExecutor.execute(new RunnableTask());
		}
	}
	
	/**
	 * 同步执行任务,在调用者的线程中执行,不另外创建新线程
	 */
	static class DirectExecutor implements Executor {
		@Override
		public void execute(Runnable command) {
			command.run();
		}
	}
	
	/**
	 * 通过新线程执行任务
	 */
	static class ThreadPerExecutor implements Executor {
		@Override
		public void execute(Runnable command) {
			new Thread(command).start();
		}
	}
	
	/*
	 * 装饰器模式
	 * 线性执行任务
	 */
	static class SerialExecutor implements Executor {
		
		final Queue<Runnable> tasks = new ArrayDeque<>();
		final Executor executor;
		Runnable active;
		
		public SerialExecutor(Executor executor) {
			this.executor = executor;
		}
		
		@Override
		public void execute(Runnable command) {
			System.out.println("SerialExecutor execute");
			tasks.offer(() -> {
				try {
					command.run();
				} finally {
					scheduleNext();
				}
			});
			if (active == null) {
				System.out.println("SerialExecutor active == null");
				scheduleNext();
			}
		}
		
		/**
		 * execute和scheduleNext循环递归调用
		 */
		protected synchronized void scheduleNext() {
			System.out.println("SerialExecutor scheduleNext");
			if ((active = tasks.poll()) != null) {
				executor.execute(active); // 由实际的Executor执行
			} else {
				System.out.println("SerialExecutor scheduleNext active == null");
			}
		}
	}	

	/**
	 * 任务
	 */
	static class RunnableTask implements Runnable {
		@Override
		public void run() {
			System.out.println("RunnableTask");
		}
	}
	
	/**
	 * Runnable执行的两种方式
	 * 1.创建线程的时候把Runnable传进去(实际是在run方法中执行runnbale.run())
	 * 2.在线程的Runnbale方法中执行
	 */
	private static void runnable_1(Runnable runnable) {
		new Thread(runnable).start();
	}
	
	private static void runnable_2(Runnable runnable) {
		new Thread() {
			@Override
			public void run() {
				runnable.run();
			}
		}.start();;
	}
}

 

[redis]基本命令

#字符串
    key : string
#列表
    key : [a,a,b,b] 有序  可重复
#集合
    key : [a,d,b,c] 无序 不可重复
#有序集合
    key : [a-1,b-2,c-3,d-4] 有序 可重复
#hash
    key : [key:v, key:v]

这里我以增删改查和其他来说明命令,这样能让大家更好的记住.

package com.liuzhuang.redis;


import java.awt.RenderingHints.Key;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import redis.clients.jedis.Jedis;

/**
 * redis操作命令
 */
public interface Operator {

  //增加
  void add();

  //删除
  void dele();

  //修改
  void modify();

  //查询
  void query();
  
  //其他
  void other();

  Jedis jedis = new Jedis("localhost");

  public static void main(String[] args) {
    Operator operate = null;
    operate = new StringCommand();
    operate = new ListCommand();
    operate = new SetCommand();
    operate = new ZsetCommand();
    operate = new HashCommand();
    operate.add();
    operate.modify();
    operate.query();
    operate.dele();
  }

  /**
   *  ------  ------ 
   * |   k  ||   v  |
   *  ------   ------
   */
  static class StringCommand implements Operator {

	private String key = "title";

    @Override
    public void add() {
      String value = jedis.set(key, "乐视这个声明有些难以自圆其说啊,比如“在易到单独贷款困难情况下,");
      System.out.println(value);
    }

    @Override
    public void dele() {
      Long result = jedis.del(key);
      System.out.println(result);
    }

    @Override
    public void modify() {
      String value = jedis.set(key, "乐视控股以名下乐视大厦为抵押物,以易到为主体");
      System.out.println(value);
    }

    @Override
    public void query() {
      String value = jedis.get(key);
      System.out.println(value);
    }
    
    @Override
    public void other() {
      
    }
  }

  /**
   *  ------  ------  ------  ------  ------
   * |   k  ||   1  ||   2  ||  2   ||   4  |
   *  ------   ------  ------  ------  ------
   */
  static class ListCommand implements Operator {
	
	private String key = "list";

    @Override
    public void add() {
      //lpush
      for (int i = 5; i > 0; i--) {
        Long result = jedis.lpush(key, "article:" + i);
        System.out.println("lpush:" + result);
      }
      //rpush
      for (int i = 6; i <= 10; i++) {
        Long result = jedis.rpush(key, "article:" + i);
        System.out.println("rpush:" + result);
      }
    }

    @Override
    public void dele() {
      //lpop
      String value = jedis.lpop(key);
      System.out.println("lpop:" + value);
      
      //rpop
      value = jedis.rpop(key);
      System.out.println("rpop:" + value);
      
      //lrem 删除列表中的某一项 cout是指删除几个这样的值 >0从左往右 <0从右往左 =0全部删除
      Long result = jedis.lrem(key, 1, "article:6");
      System.out.println("lrem:" + result + " 结果集:" + jedis.lrange("list", 0, -1));
      
      //ltrim
      value = jedis.ltrim(key, 0, 1);
      System.out.println("ltrim:" + value + " 结果集:" + jedis.lrange("list", 0, -1));
      
      //del
      result = jedis.del(key);
      System.out.println("del:" + result);
    }

    @Override
    public void modify() {
      //lset
      String reuslt = jedis.lset(key, 0, "article:1111");
      System.out.println("lset:" + reuslt);
    }

    @Override
    public void query() {
      //lrange
      List result = jedis.lrange(key, 0, -1);
      System.out.println("lrange:" + result);
      
      //lindex
      String value = jedis.lindex(key, 0);
      System.out.println("lindex:" + value);
      
      //llen
      Long length = jedis.llen(key);
      System.out.println("llen:" + length);
    }

    @Override
    public void other() {
      
    }
  }

  /**
   *  ------  ------  ------  ------  ------
   * |   k  ||   c  ||   a  ||  d   ||   b  |
   *  ------   ------  ------  ------  ------
   */
  static class SetCommand implements Operator {

    @Override
    public void add() {
      //sadd
      Long result = jedis.sadd("set", "a", "b", "c");
      System.out.println("sadd:" + result);
    }

    @Override
    public void dele() {
      //srem
      Long result = jedis.srem("set", "c");
      System.out.println("srem:" + result + " 结果集:" + jedis.smembers("set"));
      
      //spop
      String value = jedis.spop("set");
      System.out.println("spop:" + value + " 结果集:" + jedis.smembers("set"));
      
      //smove
      result = jedis.smove("set", "set_2", "b");
      System.out.println("smove:" + result);
      System.out.println(jedis.smembers("set"));
      System.out.println(jedis.smembers("set_2"));
      
      //del
      result = jedis.del("set", "set_2");
      System.out.println("del:" + result);
    }

    @Override
    public void modify() {
      
    }

    @Override
    public void query() {
      //smembers
      Set set = jedis.smembers("set");
      System.out.println("smembers:" + set);
      
      //sismember
      Boolean isExist = jedis.sismember("set", "a");
      System.out.println("sismember:" + isExist);
      
      //scard 集合大小
      Long count = jedis.scard("set");
      System.out.println("scard:" + count);
      
      //srandmember 随机返回个数
      List list = jedis.srandmember("set", 2);
      System.out.println("srandmember:" + list);
    }

    @Override
    public void other() {
      
    }
  }

  /**
   *  ------  --------  -------  -------  ---------
   * |   k  ||   c 7  ||   a 8 ||  d 9  ||   b 10  |
   *  ------   -------  -------  -------  ---------
   */
  static class ZsetCommand implements Operator {

    @Override
    public void add() {
      //zadd
      Long result = jedis.zadd("zset", 7, "c");
      System.out.println("zadd:" + result);
      result = jedis.zadd("zset", 8, "a");
      System.out.println("zadd:" + result);
      result = jedis.zadd("zset", 9, "d");
      System.out.println("zadd:" + result);
      result = jedis.zadd("zset", 10, "b");
      System.out.println("zadd:" + result);
    }

    @Override
    public void dele() {
      //zrem
      Long result = jedis.zrem("zset", "b");
      System.out.println("zrem:" + result);
      
      //del
      result = jedis.del("zset");
      System.out.println("del:" + result);
    }

    @Override
    public void modify() {
      //zadd
      Double result = jedis.zincrby("zset", 99.0, "b");
      System.out.println("zadd:" + result);
    }

    @Override
    public void query() {
      //zcard
      Long count = jedis.zcard("zset");
      System.out.println("zcard:" + count);
      
      //zcount
      count = jedis.zcount("zset", 88, 99);
      System.out.println("zcount:" + count);
      
      //zrank
      Long rank = jedis.zrank("zset", "c");
      System.out.println("zrank:" + rank);
      
      //zscore
      Double score = jedis.zscore("zset", "c");
      System.out.println("zscore:" + score);
      
      //zrange
      Set set = jedis.zrange("zset", 0, -1);
      System.out.println("zrange:" + set);
      
      //zrangeByScore
      set = jedis.zrangeByScore("zset", 7, 9);
      System.out.println("zrangeByScore:" + set);
    }

    @Override
    public void other() {
      
    }
  }

  static class HashCommand implements Operator {

    private String key = "article:1";
    
    @Override
    public void add() {
      //hset
      Long result = jedis.hset(key, "name", "jack");
      System.out.println("hset:" + result);
      result = jedis.hset(key, "age", "18");
      System.out.println("hset:" + result);
      
      //hmset
      Map hash = new HashMap<>();
      hash.put("city", "杭州");
      String v = jedis.hmset(key, hash);
      System.out.println("hmset:" + v);
      
    }

    @Override
    public void dele() {
      //hdel
      Long result = jedis.hdel(key, "name");
      System.out.println("hdel:" + result);
      
      //del
      jedis.del(key);
    }

    @Override
    public void modify() {

    }

    @Override
    public void query() {
      //hget
      String name = jedis.hget(key, "name");
      System.out.println("hget:name:" + name);
      
      //hgetAll
      Map result = jedis.hgetAll(key);
      for (String k : result.keySet()) {
        System.out.println("hgetAll:" + k + ":" + result.get(k));
      }
      
      //hlen
      Long count = jedis.hlen(key);
      System.out.println("hlen:" + count);
      
      //hexists
      Boolean b = jedis.hexists(name, "name");
      System.out.println("hexists:" + b);
      
      //hkeys
      Set keys = jedis.hkeys(key);
      System.out.println("hkeys:" + keys);
      
      //hvals
      List vals = jedis.hvals(key);
      System.out.println("hval:" + vals);
    }

    @Override
    public void other() {
      
    }
  }
}

 

为什么NIO要以块为单位

在这里我们还是用基本IO来证明块的重要性.

#基本IO以字节为基础,新IO是以块为基础

下面的代码展示缓冲的实际效果,在IO流中影响超大.

  public static void main(String[] args) {
    new Thread(() -> {
      long last = System.currentTimeMillis();
      try {
        byte[] buf = new byte[1024];
        FileInputStream fileInputStream = new FileInputStream("");
        while (fileInputStream.read(buf) != -1) {
          //read(buf)对应的native方法 private native int readBytes(byte b[], int off, int len) throws IOException;
          //System.out.println(new String(buf));
        }
        fileInputStream.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
      System.out.println("read over " + (System.currentTimeMillis() - last) );
    }).start();
  }

 

测试结果

  //文件大小 10.03G
  //1b                每秒也就1~2M
  //1kb 1024 15.361s  668M/s
  //2kb 2048 12.845s  799M/s
  //3kb 3072 12.408s  827M/s
  //4kb 4096 11.829s  868M/s
  //1M       11.462s 

 

因为IO访问开销很大,如果以块为单位进行读写能加快读写速度[降低访问频率,增加传输量率]

字符流

字符流比字节流更高级一点,api和设计思路上基本一致,增强的是是把字节解码为字符,根据编码集解析字节为字符,有一点注意,java语言内置char类型在内存中是由2个字节表示,但是后来UNICODE标准修改,utf-16也可以表示4个字节,于是java做出修改,对于增补字符[4个字节]需要用2个char来解析. 增补字符不能用char表示. 字符流嵌套在字节流上使用.

来源主要有三种地 1.程序内字节字符 2.文件 3.网络

-CharArrayWriter
-OutputStreamWriter
-FileWriter[实际是OutputStreamWriter]

-CharArrayReader
-InputStreamReader
-FileReader[实际是InputStreamReader]

package com.liuzhuang.io;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.CharArrayReader;
import java.io.CharArrayWriter;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;


/**
 * char buf[] : 基于字符的IO流
 * 字符: 2个字节,utf-16be
 * 
 * Writer 写入时加锁Object
 *  -CharArrayWriter
 *  -OutputStreamWriter
 *  -FileWriter
 * 
 * Reader
 *  -CharArrayReader
 *  -InputStreamReader
 *  -FileReader
 */
public class CharIO {

  /**
   * writer 注意事项
   * 1.writer有1kb的缓存char数组(char[] writeBuffer), 作用是在write(int c)和write(String str)时避免重新new字节数组.
   * 2.从字符串中获得char字节通过 str.getChars(off, (off + len), cbuf, 0) 
   */
  public void write() throws IOException {
    
    // FilterWriter 装饰器模式
    
    // 字符数组
    CharArrayWriter charArrayWriter = new CharArrayWriter();
    charArrayWriter.write("你好"); // str.getChars(off, off + len, buf, count);
    charArrayWriter.write("hello");
    System.out.println(charArrayWriter.toString());;
    
    // 写缓存,先写入缓存,超过大小后,再实际写入流
    BufferedWriter bufferedWriter = new BufferedWriter(charArrayWriter);
    bufferedWriter.write("你好");
    System.out.println(charArrayWriter.toString());
    bufferedWriter.flush(); // flush可强制刷新到实际流中
    System.out.println(charArrayWriter.toString());
    
    // 连接字节流转换为字符流
    try {
      OutputStreamWriter outputStreamWriter = new OutputStreamWriter(new ByteArrayOutputStream(), "utf-8");
      outputStreamWriter.write("你好");
    } catch (IOException e) {
      e.printStackTrace();
    }
    
    // 写文件
    try {
      FileWriter fileWriter = new FileWriter("test2");
      fileWriter.write("你好");
      fileWriter.write(109990);
      fileWriter.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    
    // 打印字节流
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    PrintWriter streamPrintWriter = new PrintWriter(byteArrayOutputStream);
    streamPrintWriter.write("hello");
    streamPrintWriter.flush();
    System.out.println(byteArrayOutputStream.toString());
    
    // 打印字符流
    PrintWriter printWriter = new PrintWriter(charArrayWriter);
    printWriter.write("你好");
    System.out.println(charArrayWriter.toString());
    
    // 字符串
    StringWriter stringWriter = new StringWriter();
    stringWriter.write("你好");
    
    // PipedWriter
  }
  
  public void read() throws IOException {
    
    // char数组
    System.out.println("#char数组");
    char[] chars = {'你', '好', '\r', '世', '界'};
    CharArrayReader charArrayReader = new CharArrayReader(chars);
    System.out.println((char)charArrayReader.read());
    System.out.println((char)charArrayReader.read());
    
    // FilterReader 装饰器模式类基类
    
    // 连接字节流转为字符流 InputStreamReader
    System.out.println("#InputStreamReader");
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("你好".getBytes());
    InputStreamReader inputStreamReader = new InputStreamReader(byteArrayInputStream);
    //System.out.println((char)inputStreamReader.read());
    //System.out.println((char)inputStreamReader.read());
    
    // 缓冲字符数组,默认8k的缓冲区,可一次读一行
    System.out.println("#缓冲字符数组");
    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
    //System.out.println((char)bufferedReader.read());
    System.out.println(bufferedReader.readLine());
    
    // 连接文件, 但是对于字节是4的字符,需要用2个char读取
    System.out.println("#读取文件");
    inputStreamReader = new InputStreamReader(new FileInputStream("test"));
    char[] data = new char[2];
    inputStreamReader.read(data);
    inputStreamReader.close();
    System.out.println(new String(data));
    
    // FileReader继承自InputStreamReader 等同上面
    System.out.println("#读取文件2");
    FileReader fileReader = new FileReader("test");
    System.out.println((char)fileReader.read());
    fileReader.close();
    
    special();
  }
  
  /**
   * 特例字符
   * @throws IOException 
   */
  private void special() throws IOException {
    System.out.println("#special");
    String specialChar = ""; // 字符占4个字节 增补字符
    InputStreamReader inputStreamReader = new InputStreamReader(new ByteArrayInputStream(specialChar.getBytes()));
    char[] data = new char[2];
    inputStreamReader.read(data);
    inputStreamReader.close();
    System.out.println(new String(data));
  }
}

 

字节流和字符流

#编码
java用char表示字符,一个字符由两个字节存储,存储格式由编码决定,不同的编码格式保存的字节序列不同。

#疑问
我们平常编码格式都是utf-8,解析数据时也按照utf-8解析,所以在 “编”.getBytes(“utf-8”).length 中发现”编”这个字符串有3个字节,而不像java规范中的char占2个字节,这是因为java内部编码是按照utf-16be进行存储。
而我们调用的String.getBytes()是讲utf-16be编码转换为另一种编码格式,取出来字节序列和实际存储的序列就不一样。

#验证

/**
 * 验证java内部编码
 */
private static void innerCharset() throws IOException {
    CharArrayWriter charArrayWriter = new CharArrayWriter();
    charArrayWriter.write('你');
    for (char c : charArrayWriter.toCharArray()) {
        System.out.print(Integer.toHexString(c) + " ");
    }

    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("你".getBytes("utf-16be"));
    int byteread;
    while ((byteread = byteArrayInputStream.read()) != -1) {
        System.out.print(Integer.toHexString(byteread) + " ");
    }
}

 

/**
 *  相同字符在不同编码下所占字节数
 */
private static void charEncode() {
    String str = "你";
    try {
        System.out.println("你 utf-16be :" + str.getBytes("utf-16be").length + " 个字节"); // 2个字节(java内部默认编码)
        System.out.println("你 utf-16le :" + str.getBytes("utf-16le").length + " 个字节"); // 2个字节
        System.out.println("你 utf-16   :" + str.getBytes("utf-16").length + " 个字节"); // 4个字节
        System.out.println("你 utf-8    :" + str.getBytes("utf-8").length + " 个字节"); // 3个字节(常用编码)
    } catch (UnsupportedEncodingException e1) {
        e1.printStackTrace();
    }
}

 

#总结
字节序列和编码息息相关,有编码格式才可以有字节序列,存入读取皆如此。

#扩展
可惜UTF-16在Java设计之初还是真的定长编码,后来Unicode涵盖的字符变多了之后UTF-16变成了坑爹的变长编码(一个完整的“字符”是一个code point;一个code point可以对应1到2个code unit;一个code unit是16位),Java也只好跟进。为了实现UTF-16的变长编码语义,Java规定char仍然只能是一个16位的code point,也就是说Java的char类型不一定能表示一个UTF-16的“字符”——只有只需1个code unit的code point才可以完整的存在char里。但String作为char的序列,可以包含由两个code unit组成的“surrogate pair”来表示需要2个code unit表示的UTF-16 code point。为此Java的标准库新加了一套用于访问code point的API,而这套API就表现出了UTF-16的变长特性。
其实是说:本来UTF-16是定长的2个字节,一个code point对应一个code unit, 但是后来出现了4字节的,因为4字节的字符不能用char表示了,所以要用两个char来表示,还增加了codePointAPI来获取codepoint和codeunit
作者:RednaxelaFX
链接:https://www.zhihu.com/question/27562173/answer/37188642

下面代码是上面的上面扩展的说明 所以我们找一个4字节的字符来验证这个情况

  // 占4个字节的字符无法用char表示,但是可以用字符串来表示2个code unit
  private static void ext() {
    String str = "\uD834\uDD2A"; //U+1D12A \uD834\uDD2A
    int not_really__the_length = str.length(); // value is 2, which is not the length in characters
    int actual_length = str.codePointCount(0, str.length()); // value is 1, which is the length in characters
    
    System.out.println(str);
    System.out.println(not_really__the_length);
    System.out.println(actual_length);
    http://rosettacode.org/wiki/String_length#Java
  }

 

InputStream OutPutStream

这篇文章讲解基于字节的输入输出流.
实际上流分为两种流,一种是实际流,包含真实数据的流,另一种是装饰流,装饰流在实际流的基础上添砖加瓦,更能锦上添花.
从数据源来讲可以分为两种 一种是字节数组 另一种是File文件

#字节输出流 写入
OutputStream 输出流的抽象类,定义了输出流格式,超类,其子类要实现writ方法并写入字节

实际流
ByteArrayOutputStream 写入字节数组
FileOutputStream 写入文件
PipedOutputStream 写入管道

装饰流 锦上添花
FilterOutputStream 方便增加代理
BufferedOutputStream 缓冲数组
DataOutputStream 基本数据类型
ObjectOutputStream 对象数据类型序列化保存

#字节输入流 读取

InputStream 输入流超类,定义了读行为.

实际流
ByteArrayInputStream 读取数组字节
FileInputStream 文件

装饰流
BufferedInputStream 读缓冲
DataInputStream java基本类型
ObjectInputStream java对象
SequenceInputStream 顺序读多个流
PushbackInputStream 允许回退修改数据流

package com.liuzhuang.io;
package com.liuzhuang.io;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PushbackInputStream;
import java.io.SequenceInputStream;
import java.lang.reflect.Field;

/**
 * 字节流 基于InputStream读取流和OutputStream写入流
 * 
 * http://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html
 * http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html
 * 装饰器模式
 */
public class ByteIO {

  /**
   * 字节输出流 写入
   * 
   * OutputStream 输出流的抽象类,定义了输出流格式,超类,其子类要实现writ方法并写入字节
* * 实际流 * ByteArrayOutputStream 写入字节数组 * FileOutputStream 写入文件 * PipedOutputStream 写入管道 * * 装饰流 锦上添花 * FilterOutputStream 方便增加代理 * BufferedOutputStream 缓冲数组 * DataOutputStream 基本数据类型 * ObjectOutputStream 对象数据类型序列化保存 */ @SuppressWarnings("resource") public void write(String value) throws IOException, InterruptedException { // FilterOutputStream是装饰器模式的父类,封装了代理类. 例如BufferedOutputStream // 默认字节数组长度为32,长度不够时扩增至之前一倍. ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); outputStream.write(value.getBytes()); outputStream.toByteArray(); outputStream.close(); // 增加了缓冲字节数组,默认大小是8kb(8192).写入数据时判断缓冲数组是否已满,满的话会把缓冲字节数组全部写入代理的数据流中,分支结束, // 最后加入缓冲字节数组,可手动调用flush把缓冲字节数组数据全部写入代理的输出流 BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream); bufferedOutputStream.write(1); bufferedOutputStream.close(); // 扩展到基本java类型,在内部把java类型转换成字节再写入. DataOutputStream dataOutputStream = new DataOutputStream(bufferedOutputStream); dataOutputStream.writeLong(1L); dataOutputStream.writeBoolean(true); dataOutputStream.writeUTF("hello"); dataOutputStream.close(); // 扩展到所有的java类型但是需要实现Serializable接口,在内部把java类型转换成字节再写入. [序列化保存对象] ObjectOutputStream objectOutputStream = new ObjectOutputStream(dataOutputStream); objectOutputStream.writeObject(new Model()); objectOutputStream.writeLong(1L); objectOutputStream.close(); // FileOutputStream 文件写入 File file = new File("test"); if (!file.exists()) { file.createNewFile(); } FileOutputStream fileOutputStream = new FileOutputStream(file); DataOutputStream fileDataOutputStream = new DataOutputStream(fileOutputStream);//加入一层DataOutputStream,方便写入字符串 fileDataOutputStream.writeUTF("你好"); fileDataOutputStream.close(); // 管道输出流 PipedOutputStream pipedOutputStream = new PipedOutputStream(); PipedInputStream pipedInputStream = new PipedInputStream(pipedOutputStream); new Thread(() -> { try { while (true) { pipedOutputStream.write(18); } } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { while (true) { int result = pipedInputStream.read(); System.out.println(result); } //pipedInputStream.close(); } catch (IOException e) { e.printStackTrace(); } }).start(); } /** * 字节输入流 InputStream * * InputStream 输入流超类,定义了读行为. * * 实际流 * ByteArrayInputStream 读取数组字节 * FileInputStream 文件 * * 装饰流 * BufferedInputStream 读缓冲 * DataInputStream java基本类型 * ObjectInputStream java对象 * SequenceInputStream 顺序读多个流 * PushbackInputStream 允许回退修改数据流 */ public void read() throws IOException { // FilterInputStream 封装了装饰器模式 // ByteArrayInputStream读取字节数组 byte[] bs = new byte[8194]; ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bs); // 增加缓冲数组 缓冲大小默认8kb 1.增加提前缓存功能 先一次性读取8kb大小的数据,然后每次的读取是从缓存中读,能降低直接读取io的次数 2.支持mark和reset操作 BufferedInputStream bufferedInputStream = new BufferedInputStream(byteArrayInputStream); int byteread = 0; printInputBufferInfo(bufferedInputStream); while ((byteread = bufferedInputStream.read()) != -1) { //printInputBufferInfo(bufferedInputStream); //System.out.print(byteread + " "); } // 读基本类型 线程不安全 似乎只能和DataOutputStream配合使用,详细的之后再看 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); dataOutputStream.writeUTF("你好"); DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); System.out.println(dataInputStream.readUTF()); // FileInputStream 读取文件 只用字节读取会出现一个汉字分为两部分字节的问题 所以有了基于字符的流 FileInputStream fileInputStream = new FileInputStream("test"); BufferedInputStream fileBufferedInputStream = new BufferedInputStream(fileInputStream); byte[] data = new byte[10]; while (fileBufferedInputStream.read(data) != -1) { System.out.println(new String(data)); } fileBufferedInputStream.close(); // ObjectInputStream 要结合ObjectOutputStream使用 ByteArrayOutputStream objectArrayOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectArrayOutputStream); objectOutputStream.writeObject(new Model()); ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(objectArrayOutputStream.toByteArray())); try { System.out.println(objectInputStream.readObject()); } catch (ClassNotFoundException e) { e.printStackTrace(); }; // PipedInputStream参见本文write方法 // SequenceInputStream 顺序读两个字节流 ByteArrayInputStream s1 = new ByteArrayInputStream("你".getBytes()); ByteArrayInputStream s2 = new ByteArrayInputStream("好".getBytes()); SequenceInputStream sequenceInputStream = new SequenceInputStream(s1, s2); int index = 0; byte[] combine = new byte[100]; int byteRead; while ((byteRead = sequenceInputStream.read()) != -1) { System.out.print(byteRead + ""); combine[index] = (byte) byteRead; index++; } System.out.println(new String(combine)); // PushbackInputStream PushbackInputStream pushbackInputStream = new PushbackInputStream(new ByteArrayInputStream("hello".getBytes())); while ((byteRead = pushbackInputStream.read()) != -1) { char letter = (char) byteRead; if (letter == 'e') { pushbackInputStream.unread('X'); //回退并修改上一个自己的值 System.out.println("change: " + letter + " to X"); } else { System.out.println("value: " + letter); } } } /** * 打印缓存输入流内部信息 */ private void printInputBufferInfo(BufferedInputStream bufferedInputStream) { Class cls = bufferedInputStream.getClass(); try { Field posfield = cls.getDeclaredField("pos"); posfield.setAccessible(true); System.out.print("pos:" + posfield.get(bufferedInputStream)); Field field = cls.getDeclaredField("count"); field.setAccessible(true); System.out.print(" count:" + field.get(bufferedInputStream)); Field buffield = cls.getDeclaredField("buf"); buffield.setAccessible(true); byte[] buf = (byte[]) buffield.get(bufferedInputStream); System.out.println(" buf:" + buf.length); } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); } } }