HDFSの読み書き
Hadoop を設定できたので、Java からHDFSにファイルを置いたり、HDFSからファイルを読み出したりする方法をメモっておく。
以下で、大きく分けて2種類の方法を書いているけれど、共通な点としてまず、Configuration クラスのインスタンスを用意しておくというのがある。これはクラスパス上から core-site.xml を読む込むようになっている。
これも含めて、クラスパスには
を指定することがが最低限必要なもののようである。
ローカルファイル名を指定した読み書き
HDFSへの書き込み
ローカルファイル名とHDFS上のファイル名を指定して、HDFSに書き込む場合は FileSystem.copyFromLocalFile というメソッドを使えばよい。コードのイメージは以下のような感じとなる。
String localPathString = ...; // ローカルファイルパス名 String hdfsPathString = ...; // HDFSパス名 Configuration conf = new Configuration(); Path localPath = new Path(localPathString); Path hdfsPath = new Path(hdfsPathString); try { FileSystem fs = hdfsPath.getFileSystem(conf); fs.copyFromLocalFile(false, false, localPath, hdfsPath); } catch (IOException ex) { ex.printStackTrace(); }
HDFSからの読み出し
FileSystem.copyFromLocalFile の代わりに FileSystem.copyToLocalFile を
String localPathString = ...; // ローカルファイルパス名 String hdfsPathString = ...; // HDFSパス名 Configuration conf = new Configuration(); Path localPath = new Path(localPathString); Path hdfsPath = new Path(hdfsPathString); try { FileSystem fs = hdfsPath.getFileSystem(conf); fs.copyToLocalFile(hdfsPath, localPath); } catch (IOException e) { e.printStackTrace(); }
のように使えば、読み出しはできるが、パーミッションが 777 となったり、同時に .XXXX.crc というような(XXXX はローカルファイル名)ファイルもできてしまうようである。
単純にファイルを読み出したいだけなら、FileUtil.copy を使う以下のような方法でよいようだ。
String localPathString = ...; // ローカルファイルパス名 String hdfsPathString = ...; // HDFSパス名 Configuration conf = new Configuration(); Path hdfsPath = new Path(hdfsPathString); try { FileSystem fs = hdfsPath.getFileSystem(conf); FileUtil.copy(fs, hdfsPath, new File(localPathString), false, conf); } catch (IOException e) { e.printStackTrace(); }
ストリームを使う読み書き
Web アプリケーションとかだと、直接ストリームとHDFSの間でやりとりしたいので、この場合は IOUtil.copyBytes メソッドを使うとよいようだ。
ストリームからHDFSへの書き出し
HDFSに入力ストリームから書き出す場合は、FileSystem.create メソッドで HDFS への出力ストリームを取得し、IOUtil.copyBytes を呼び出せばよい。以下のような感じ。
InputStream is = ...; String hdfsPathString = ...; Configuration conf = new Configuration(); OutputStream os = null; try { Path hdfsPath = new Path(hdfsPathString); FileSystem fs = hdfsPath.getFileSystem(conf); os = fs.create(hdfsPath); IOUtils.copyBytes(is, os, conf, false); } catch (IOException e) { e.printStackTrace(); } finally { if (os != null) { try { os.close(); } catch (IOException e) { e.printStackTrace(); } } }
HDFSからストリームへの書き出し
HDFSから出力ストリームに書き出す場合は、FileSystem.open メソッドで HDFS への入力ストリームを取得し、IOUtil.copyBytes を呼び出せばよい。
OutputStream os = ...; String hdfsPathString = ...; Configuration conf = new Configuration(); InputStream is = null; try { Path hdfsPath = new Path(hdfsPathString); FileSystem fs = hdfsPath.getFileSystem(conf); fs.setVerifyChecksum(true); is = fs.open(hdfsPath); IOUtils.copyBytes(is, os, conf, false); } catch (IOException e) { e.printStackTrace(); } finally { if (is != null) { try { is.close(); } catch (IOException e) { e.printStackTrace(); } } }