kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

HDFSの読み書き

Hadoop を設定できたので、Java からHDFSにファイルを置いたり、HDFSからファイルを読み出したりする方法をメモっておく。

以下で、大きく分けて2種類の方法を書いているけれど、共通な点としてまず、Configuration クラスのインスタンスを用意しておくというのがある。これはクラスパス上から core-site.xml を読む込むようになっている。
これも含めて、クラスパスには

  • core-site.xml を含むディレクトリへのパス
  • hadoop-0.20.0-core.jar
  • commons-logging-1.0.4.jar

を指定することがが最低限必要なもののようである。

ローカルファイル名を指定した読み書き

HDFSへの書き込み

ローカルファイル名とHDFS上のファイル名を指定して、HDFSに書き込む場合は FileSystem.copyFromLocalFile というメソッドを使えばよい。コードのイメージは以下のような感じとなる。

String localPathString = ...; // ローカルファイルパス名
String hdfsPathString = ...;  // HDFSパス名

Configuration conf = new Configuration();
Path localPath = new Path(localPathString);
Path hdfsPath = new Path(hdfsPathString);
try {
    FileSystem fs = hdfsPath.getFileSystem(conf);
    fs.copyFromLocalFile(false, false, localPath, hdfsPath);
} catch (IOException ex) {
    ex.printStackTrace();
}
HDFSからの読み出し

FileSystem.copyFromLocalFile の代わりに FileSystem.copyToLocalFile を

String localPathString = ...; // ローカルファイルパス名
String hdfsPathString = ...;  // HDFSパス名

Configuration conf = new Configuration();
Path localPath = new Path(localPathString);
Path hdfsPath = new Path(hdfsPathString);
try {
    FileSystem fs = hdfsPath.getFileSystem(conf);
    fs.copyToLocalFile(hdfsPath, localPath);
} catch (IOException e) {
    e.printStackTrace();
}

のように使えば、読み出しはできるが、パーミッションが 777 となったり、同時に .XXXX.crc というような(XXXX はローカルファイル名)ファイルもできてしまうようである。
単純にファイルを読み出したいだけなら、FileUtil.copy を使う以下のような方法でよいようだ。

String localPathString = ...; // ローカルファイルパス名
String hdfsPathString = ...;  // HDFSパス名

Configuration conf = new Configuration();
Path hdfsPath = new Path(hdfsPathString);
try {
    FileSystem fs = hdfsPath.getFileSystem(conf);
    FileUtil.copy(fs, hdfsPath, new File(localPathString), false, conf);
} catch (IOException e) {
    e.printStackTrace();
}

ストリームを使う読み書き

Web アプリケーションとかだと、直接ストリームとHDFSの間でやりとりしたいので、この場合は IOUtil.copyBytes メソッドを使うとよいようだ。

ストリームからHDFSへの書き出し

HDFSに入力ストリームから書き出す場合は、FileSystem.create メソッドHDFS への出力ストリームを取得し、IOUtil.copyBytes を呼び出せばよい。以下のような感じ。

InputStream is = ...;
String hdfsPathString = ...;

Configuration conf = new Configuration();

OutputStream os = null;
try {
    Path hdfsPath = new Path(hdfsPathString);
    FileSystem fs = hdfsPath.getFileSystem(conf);
    os = fs.create(hdfsPath);
    IOUtils.copyBytes(is, os, conf, false);
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (os != null) {
        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
HDFSからストリームへの書き出し

HDFSから出力ストリームに書き出す場合は、FileSystem.open メソッドHDFS への入力ストリームを取得し、IOUtil.copyBytes を呼び出せばよい。

OutputStream os = ...;
String hdfsPathString = ...;

Configuration conf = new Configuration();

InputStream is = null;
try {
    Path hdfsPath = new Path(hdfsPathString);
    FileSystem fs = hdfsPath.getFileSystem(conf);
    fs.setVerifyChecksum(true);

    is = fs.open(hdfsPath);
    IOUtils.copyBytes(is, os, conf, false);
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (is != null) {
        try {
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}