Table Of Contents

骑驴找蚂蚁

全干工程师

Java-Logo

记一次JdbcTemplate批量插入性能(非常慢)问题

最近一个需求需要将上亿数据导入到开发环境,主要DBA导入出的数据是txt格式不是sql文件,这样的话需要解析文件然后再进行批量导入。

当时的想法就是将大文件分片成多个文件,将分片之后的文件提交到线程池里面再进行批量插入。

分片文件

		int sizeOfChunk = 1024 * 1024 * 500;
		String path = "需要分片的文件.txt";
        try (Stream<String> lines = Files.lines(path)) {
            AtomicInteger counter = new AtomicInteger(1);
            AtomicInteger fileSize = new AtomicInteger();
            AtomicReference<File> newFile = new AtomicReference<>();
            AtomicReference<OutputStream> out = new AtomicReference<>();
            String eof = System.lineSeparator();
            lines.forEach(line -> {
                if (fileSize.get() == 0) {
                    var file = new File(path.getParent().toString(), FilenameUtils.removeExtension(path.getFileName().toString()) + "." + String.format("%03d.txt", counter.getAndIncrement()));
                    newFile.set(file);
                    System.out.println("新建文件:" + file.getName());
                    try {
                        out.set(new BufferedOutputStream(Files.newOutputStream(newFile.get().toPath())));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                byte[] bytes = (line + eof).getBytes(StandardCharsets.UTF_8);
                try {
                    out.get().write(bytes);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                long fileCurrentSize = fileSize.get() + bytes.length;
                fileSize.addAndGet(bytes.length);
                if (fileCurrentSize >= sizeOfChunk) {
                    fileSize.set(0);
                    try {
                        out.get().close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println("提交任务:"+ newFile.get().getName());
                    // 将分片后文件提交的线程池进行导入
                    this.executor.execute(this.ctx.getBean(ImportDataStore.class, newFile.get().getPath(), dataType));
                }
            });
            // 将剩余的文件提交到线程池进行导入
            if (fileSize.get() > 0) {
                out.get().close();
                System.out.println("提交任务:"+ newFile.get().getName());
                this.executor.execute(this.ctx.getBean(ImportDataStore.class, newFile.get().getPath(), dataType));
            }
            // 所有任务结束后关闭线程池,因为使用的是CommandLineRunner程序
            while (this.executor.getActiveCount() < 1) {
                this.executor.shutdown();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

批量导入

 // 从分片这里提交的文件
 String path = "分片之后的文件.txt";
 try (Stream<String> lines = Files.lines(path)) {
            lines.forEach(line -> {
                String[] lineSplit = line.split("\t");
                try {
                    idcards.add(new Idcard(Long.parseLong(lineSplit[0]), lineSplit[1], lineSplit[2], formatter.parse(lineSplit[3])));
                } catch (ParseException e) {
                    throw new RuntimeException(e);
                }
                if (idcards.size() == 1000) {
                    count.addAndGet(1000);
                    // 主要性能再这里,这里每次只批量导入1000条
                    this.jdbcTemplate.batchUpdate("insert into idcards(xxx, xxxx, xxxxx, xxxxxx) values(?, ?, ?, ?)", idcards, 1000, (ps, idcard) -> {
                        ps.setLong(1, idcard.getXXX());
                        ps.setString(2, idcard.getXXXX());
                        ps.setString(3, idcard.getXXXXX());
                        Date createdAt = idcard.getXXXXXX();
                        ps.setDate(4, new java.sql.Date(createdAt.getTime()));
                    });
                    logger.info("已导入{}条实名数据", count.get());
                    idcards.clear();
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

再执行代码之后,插入数据的速度只有1000条/秒左右,开启了10个线程也很慢。再通过google搜索后再stackoverflow上面找到了一个相同的问题。 根据里面的回答,在配置datasource的时候我也加入了:

spring:
  datasource:
    url: "jdbc:mysql://${MYSQL_HOST:127.0.0.1}:${MYSQL_PORT:3306}/xxx_xxxx?useServerPrepStmts=false&rewriteBatchedStatements=true"
    driver-class-name: "com.mysql.cj.jdbc.Driver"

在添加了useServerPrepStmts=false&rewriteBatchedStatements=true之后,性能起飞了。

可以看看JdbcTemplate性能介绍的文章.

留言