| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
TerriMabry
10年前发布

Java多线程实现文件快速切分

前段时间需要进行大批量数据导入,DBA给提供的是CVS文件,但是每个CVS文件都好几个GB大小,直接进行load,数据库很慢还会产生内存不足的问题,为了实现这个功能,写了个快速切分文件的程序。
</div>

 

[Java]代码    

import org.apache.log4j.LogManager;  import org.apache.log4j.Logger;    import java.io.*;  import java.util.*;  import java.util.concurrent.*;    public class FileSplitUtil {        private final static Logger log = LogManager.getLogger(FileSplitUtil.class);      private static final long originFileSize = 1024 * 1024 * 100;// 100M      private static final int blockFileSize = 1024 * 1024 * 64;// 防止中文乱码,必须取2的N次方      /**       * CVS文件分隔符       */      private static final char cvsSeparator = '^';      public static  void  main(String args[]){          long start = System.currentTimeMillis();          try {              String fileName = "D:\\csvtest\\aa.csv";              File sourceFile = new File(fileName);              if (sourceFile.length() >= originFileSize) {                  String cvsFileName = fileName.replaceAll("\\\\", "/");                  FileSplitUtil fileSplitUtil = new FileSplitUtil();                  List<String> parts=fileSplitUtil.splitBySize(cvsFileName, blockFileSize);                  for(String part:parts){                      System.out.println("partName is:"+part);                  }              }              System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms.");          }catch (Exception e){              log.info(e.getStackTrace());          }        }            /**       * 拆分文件       *       * @param fileName 待拆分的完整文件名       * @param byteSize 按多少字节大小拆分       * @return 拆分后的文件名列表       */      public List<String> splitBySize(String fileName, int byteSize)              throws IOException, InterruptedException {          List<String> parts = new ArrayList<String>();          File file = new File(fileName);          int count = (int) Math.ceil(file.length() / (double) byteSize);          int countLen = (count + "").length();          RandomAccessFile raf = new RandomAccessFile(fileName, "r");          long totalLen = raf.length();          CountDownLatch latch = new CountDownLatch(count);            for (int i = 0; i < count; i++) {              String partFileName = file.getPath() + "."                      + leftPad((i + 1) + "", countLen, '0') + ".cvs";              int readSize=byteSize;              long startPos=(long)i * byteSize;              long nextPos=(long)(i+1) * byteSize;              if(nextPos>totalLen){                  readSize= (int) (totalLen-startPos);              }              new SplitRunnable(readSize, startPos, partFileName, file, latch).run();              parts.add(partFileName);          }          latch.await();//等待所有文件写完          //由于切割时可能会导致行被切断,加工所有的的分割文件,合并行          mergeRow(parts);          return parts;      }        /**       * 分割处理Runnable       *       * @author supeidong       */      private class SplitRunnable implements Runnable {          int byteSize;          String partFileName;          File originFile;          long startPos;          CountDownLatch latch;          public SplitRunnable(int byteSize, long startPos, String partFileName,                               File originFile, CountDownLatch latch) {              this.startPos = startPos;              this.byteSize = byteSize;              this.partFileName = partFileName;              this.originFile = originFile;              this.latch = latch;          }            public void run() {              RandomAccessFile rFile;              OutputStream os;              try {                  rFile = new RandomAccessFile(originFile, "r");                  byte[] b = new byte[byteSize];                  rFile.seek(startPos);// 移动指针到每“段”开头                  int s = rFile.read(b);                  os = new FileOutputStream(partFileName);                  os.write(b, 0, s);                  os.flush();                  os.close();                  latch.countDown();              } catch (IOException e) {                  log.error(e.getMessage());                  latch.countDown();              }          }      }        /**       * 合并被切断的行       *       * @param parts       */      private void mergeRow(List<String> parts) {          List<PartFile> partFiles = new ArrayList<PartFile>();          try {              //组装被切分表对象              for (int i=0;i<parts.size();i++) {                  String partFileName=parts.get(i);                  File splitFileTemp = new File(partFileName);                  if (splitFileTemp.exists()) {                      PartFile partFile = new PartFile();                      BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk"));                      String firstRow = reader.readLine();                      String secondRow = reader.readLine();                      String endRow = readLastLine(partFileName);                      partFile.setPartFileName(partFileName);                      partFile.setFirstRow(firstRow);                      partFile.setEndRow(endRow);                      if(i>=1){                          String prePartFile=parts.get(i - 1);                          String preEndRow = readLastLine(prePartFile);                          partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow));                      }                        partFiles.add(partFile);                      reader.close();                  }              }              //进行需要合并的行的写入              for (int i = 0; i < partFiles.size() - 1; i++) {                  PartFile partFile = partFiles.get(i);                  PartFile partFileNext = partFiles.get(i + 1);                  StringBuilder sb = new StringBuilder();                  if (partFileNext.getFirstIsFull()) {                      sb.append("\r\n");                      sb.append(partFileNext.getFirstRow());                  } else {                      sb.append(partFileNext.getFirstRow());                  }                  writeLastLine(partFile.getPartFileName(),sb.toString());              }          } catch (Exception e) {              log.error(e.getMessage());          }      }        /**       * 得到某个字符出现的次数       * @param s       * @return       */      private int getCharCount(String s) {          int count = 0;          for (int i = 0; i < s.length(); i++) {              if (s.charAt(i) == cvsSeparator) {                  count++;              }          }          return count;      }        /**       * 采用BufferedInputStream方式读取文件行数       *       * @param filename       * @return       */      public int getFileRow(String filename) throws IOException {          InputStream is = new BufferedInputStream(new FileInputStream(filename));          byte[] c = new byte[1024];          int count = 0;          int readChars = 0;          while ((readChars = is.read(c)) != -1) {              for (int i = 0; i < readChars; ++i) {                  if (c[i] == '\n')                      ++count;              }          }          is.close();          return count;      }        /**       * 读取最后一行数据       * @param filename       * @return       * @throws IOException       */      private String readLastLine(String filename) throws IOException {          // 使用RandomAccessFile , 从后找最后一行数据          RandomAccessFile raf = new RandomAccessFile(filename, "r");          long len = raf.length();          String lastLine = "";          if(len!=0L) {              long pos = len - 1;              while (pos > 0) {                  pos--;                  raf.seek(pos);                  if (raf.readByte() == '\n') {                      lastLine = raf.readLine();                      lastLine=new String(lastLine.getBytes("8859_1"), "gbk");                      break;                  }              }          }          raf.close();          return lastLine;      }      /**       * 修改最后一行数据       * @param fileName       * @param lastString       * @return       * @throws IOException       */      private void writeLastLine(String fileName,String lastString){          try {              // 打开一个随机访问文件流,按读写方式              RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");              // 文件长度,字节数              long fileLength = randomFile.length();              //将写文件指针移到文件尾。              randomFile.seek(fileLength);              //此处必须加gbk,否则会出现写入乱码              randomFile.write(lastString.getBytes("gbk"));              randomFile.close();          } catch (IOException e) {              log.error(e.getMessage());          }      }      /**       * 左填充       *       * @param str       * @param length       * @param ch       * @return       */      public static String leftPad(String str, int length, char ch) {          if (str.length() >= length) {              return str;          }          char[] chs = new char[length];          Arrays.fill(chs, ch);          char[] src = str.toCharArray();          System.arraycopy(src, 0, chs, length - src.length, src.length);          return new String(chs);      }        /**       * 合并文件行内部类       */      class PartFile {          private String partFileName;          private String firstRow;          private String endRow;          private boolean firstIsFull;            public String getPartFileName() {              return partFileName;          }            public void setPartFileName(String partFileName) {              this.partFileName = partFileName;          }            public String getFirstRow() {              return firstRow;          }            public void setFirstRow(String firstRow) {              this.firstRow = firstRow;          }            public String getEndRow() {              return endRow;          }            public void setEndRow(String endRow) {              this.endRow = endRow;          }            public boolean getFirstIsFull() {              return firstIsFull;          }            public void setFirstIsFull(boolean firstIsFull) {              this.firstIsFull = firstIsFull;          }      }    }