package com.linapex.code;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

/**
 * 项目名称:TempCode 	<br><br>
 * 
 * 类名称:Main 		<br><br>
 * 
 * 创建人:LinApex@163.com 	<br><br>
 * 
 * 创建时间:2014-3-1 下午10:58:33 	<br><br>
 * 
 * 版本:1.0					<br><br>
 * 
 * 功能描述:
 */

public class Main
{
	final static int BSIZE = 1024 * 1024;
	final static int DATACACHENUM = 10000;

	static int currThreadCount = 0;
	static int maxThreadCount = 9;

	static File roomFilterLogFile = new File("roomFilter.log");
	static File sqlFile = new File("roomSql.sql");
	static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv");

	final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

	/**
	 * 
	 * 功能描述:初始化SQL写入
	 * <br><br>
	 * @param sqlFile
	 * @return
	 * @throws Exception 	BufferedWriter
	 * <br><br>
	 * 版本:1.0					<br><br>
	 * 创建人:LinApex@163.com 	<br><br>
	 * 创建时间:2014-3-1 下午11:01:02
	 */
	public static BufferedWriter initSQLWrite(File sqlFile) throws Exception
	{
		if (!sqlFile.exists())
		{
			if (!sqlFile.createNewFile())
			{
				System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath());
			}
		}

		return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
	}

	/**
	 * 
	 * 功能描述:加载CSV
	 * <br><br>
	 * @param callBack
	 * @throws Exception 	void
	 * <br><br>
	 * 版本:1.0					<br><br>
	 * 创建人:LinApex@163.com 	<br><br>
	 * 创建时间:2014-3-1 下午11:01:14
	 */
	public static void loadCSV(CallBack3<Void> callBack) throws Exception
	{
		FileChannel inChannel = null;
		try
		{
			String enterStr = "\n";

			inChannel = new FileInputStream(csvFile).getChannel();

			ByteBuffer buffer = ByteBuffer.allocate(BSIZE);

			StringBuilder newlinesBui = new StringBuilder();
			int num = 0;
			while (inChannel.read(buffer) != -1)
			{
				buffer.flip();

				//数据组合.
				String content = new String(buffer.array());
				newlinesBui.append(content).toString();

				int fromIndex = 0;
				int endIndex = -1;
				//循环找到 \n
				while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1)
				{
					//得到一行
					String line = newlinesBui.substring(fromIndex, endIndex);

					num++;
					callBack.call(num, line);

					fromIndex = endIndex + 1;
				}

				newlinesBui.delete(0, fromIndex);
				buffer.clear();
			}

		} finally
		{
			if (inChannel != null)
			{
				inChannel.close();
			}
		}

	}

	public static void main(String[] args) throws Exception
	{
		final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); //线程池

		final List<Future<String>> threadResultList = new ArrayList<Future<String>>(); //线程返回集合

		final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象.

		final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM);

		StopWatch2 stopWatch = new StopWatch2(); //计时器
		stopWatch.start();

		loadCSV(new CallBack3<Void>()
		{

			@Override
			public Void call(int num, String str)
			{
				String[] strs = str.split(",");

				if (strs.length < 8)
				{
					writeLog("此条数据不录入::0", Arrays.toString(strs));
					return null;
				}

				try
				{
					String name = strs[0].trim();
					String card = strs[4];
					String gender = strs[5];
					String birthday = strs[6];
					String address = strs[7];
					String zip = strs[8];
					String mobile = strs[20];
					String email = strs[22];
					String version = strs[31];

					//生成sql语句
					final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);

					//添加数据,如果超出了缓存数据,则 开始写入文件系统
					if (writeSqlFile.add(tempSql))
					{
						currThreadCount++;

						//如果提交的线程过多,则取回之后再提交.
						if (currThreadCount >= maxThreadCount)
						{
							//							System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
							for (Future<String> fs : threadResultList)
							{
								String tempSqlName = fs.get();

								currThreadCount--;
								//								System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + "  线程返回的值:" + tempSqlName);
							}

							threadResultList.clear(); //清空
							currThreadCount = threadResultList.size();
							//							System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
						}

						Future<String> future = threadPool.submit(new TaskWithResult(writeSqlFile, bw));

						threadResultList.add(future);
						//						System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num));
					}

				} catch (Exception e)
				{
					writeLog("录入错误的数据::0", Arrays.toString(strs));
					writeLog("错误的原因::0", e.getMessage());
				}
				return null;
			}
		});

		writeSqlFile.flush(bw); //刷新缓存数据

		threadPool.shutdown(); //关闭线程池

		stopWatch.stop(); //停止计时

		System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime()));

	}

	/**
	 * 
	 * 功能描述:写日志
	 * <br><br>
	 * @param str
	 * @param values 	void
	 * <br><br>
	 * 版本:1.0					<br><br>
	 * 创建人:LinApex@163.com 	<br><br>
	 * 创建时间:2014-3-1 下午11:04:00
	 */
	public static void writeLog(String str, Object... values)
	{
		//FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
	}

	/**
	 * 
	 * 功能描述:非常高效率的模版方法
	 * <br><br>
	 * @param strSource
	 * @param values
	 * @return 	String
	 * <br><br>
	 * 版本:1.0					<br><br>
	 * 创建人:LinApex@163.com 	<br><br>
	 * 创建时间:2014-3-1 下午11:04:09
	 */
	public static String tm(String strSource, Object... values)
	{
		if (strSource == null)
		{
			return null;
		}

		StringBuilder builder = new StringBuilder(strSource);

		final String prefix = ":";
		for (int index = 0; index < values.length; index++)
		{
			Object value = values[index];
			if (value == null)
			{
				continue;
			}

			String key = new StringBuilder(prefix).append(index).toString();

			int i = -1;
			if ((i = builder.indexOf(key, i)) > -1)
			{
				int len = key.length();
				builder.replace(i, i + len, value.toString());
			}
		}

		return builder.toString();
	}

}

/**
 * 
 * 项目名称:TempCode 	<br><br>
 * 
 * 类名称:TaskWithResult 		<br><br>
 * 
 * 创建人:LinApex@163.com 	<br><br>
 * 
 * 创建时间:2014-3-1 下午11:04:25 	<br><br>
 * 
 * 版本:1.0					<br><br>
 * 
 * 功能描述:任务与结果
 */
class TaskWithResult implements Callable<String>
{
	WriteSqlHandle2 handle2;

	BufferedWriter bufferedWriter;

	public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter)
	{
		this.handle2 = handle2;
		this.bufferedWriter = bufferedWriter;
	}

	@Override
	public String call() throws Exception
	{
		String fileName = Thread.currentThread().getName();

		handle2.save(bufferedWriter);

		return fileName;
	}

}

/**
 * 
 * 项目名称:TempCode 	<br><br>
 * 
 * 类名称:WriteSqlHandle2 		<br><br>
 * 
 * 创建人:LinApex@163.com 	<br><br>
 * 
 * 创建时间:2014-3-1 下午11:05:37 	<br><br>
 * 
 * 版本:1.0					<br><br>
 * 
 * 功能描述:写SQL处理器
 */
class WriteSqlHandle2
{
	ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	WriteLock writeLock = readWriteLock.writeLock();

	List<String> cacheList;

	int currItemCount = 0;

	int dataCacheNum;

	public WriteSqlHandle2()
	{
		cacheList = new ArrayList<String>();
	}

	public WriteSqlHandle2(int dataCacheNum)
	{
		this.dataCacheNum = dataCacheNum;
		cacheList = new ArrayList<String>(dataCacheNum);
	}

	public boolean isCacheExpires()
	{
		return currItemCount >= dataCacheNum;
	}

	public boolean add(String sqlStr)
	{
		try
		{
			writeLock.lock();
			cacheList.add(sqlStr);
			currItemCount++;
			return isCacheExpires();
		} finally
		{
			writeLock.unlock();
		}
	}

	public void save(BufferedWriter bw) throws Exception
	{
		try
		{
			writeLock.lock();

			//如果数据没有超出缓存.则返回.
			if (!isCacheExpires())
			{
				return;
			}

			StopWatch2 stopWatch = new StopWatch2();
			stopWatch.start();

			//			System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

			for (String str : cacheList)
			{
				bw.write(str + "\r\n");
				currItemCount--;
			}

			stopWatch.stop();

			System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size()));

			cacheList.clear(); //清空数据.
		} finally
		{
			writeLock.unlock();
		}
	}

	public void flush(BufferedWriter bw) throws Exception
	{
		System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

		for (String str : cacheList)
		{
			bw.write(str + "\r\n");
		}

		System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));

		cacheList.clear(); //清空数据

		closeWrite(bw);
	}

	private void closeWrite(BufferedWriter bw) throws Exception
	{
		bw.flush();
		bw.close();
	}

}

/**
 * 
 * 项目名称:TempCode 	<br><br>
 * 
 * 类名称:StopWatch2 		<br><br>
 * 
 * 创建人:LinApex@163.com 	<br><br>
 * 
 * 创建时间:2014-3-1 下午11:06:01 	<br><br>
 * 
 * 版本:1.0					<br><br>
 * 
 * 功能描述:类似Apache的计时器类
 */
class StopWatch2
{
	long begin;
	long end;

	public void start()
	{
		begin = System.currentTimeMillis();
	}

	public void stop()
	{
		end = System.currentTimeMillis();
	}

	public long getTime()
	{
		return end - begin;
	}
}

/**
 * 
 * 项目名称:TempCode 	<br><br>
 * 
 * 类名称:CallBack3 		<br><br>
 * 
 * 创建人:LinApex@163.com 	<br><br>
 * 
 * 创建时间:2014-3-1 下午11:06:19 	<br><br>
 * 
 * 版本:1.0					<br><br>
 * 
 * 功能描述:回调接口,类似Hibernate中的 HibernateCallback<T> 这个类一样.
 */
interface CallBack3<T>
{
	T call(int num, String str);
}
最近下载更多
lyn520  LV3 2023年2月21日
mylzdy  LV12 2022年5月21日
2469095052  LV8 2021年12月30日
newhaijun  LV15 2021年1月4日
Coincidance  LV8 2020年12月3日
gao123qq  LV21 2020年8月28日
wkc  LV21 2020年7月26日
耳朵的巧克力  LV1 2020年5月19日
chalet  LV1 2020年4月14日
yinyun1985  LV14 2020年4月8日
最近浏览更多
1105570390  LV8 2023年8月17日
xsz0001 2023年8月17日
暂无贡献等级
yyh1252  LV8 2023年8月1日
lyn520  LV3 2023年2月21日
快快乐乐536 2022年10月10日
暂无贡献等级
MoonSight  LV1 2022年7月1日
mylzdy  LV12 2022年5月21日
liys1234  LV9 2022年4月27日
bai620123  LV16 2022年4月16日
7767087  LV2 2022年2月28日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友