kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

CloudStoreを使う・Java編:ランダムアクセス

Javaライブラリを使ってランダムアクセスをしようと思ったら、Javaライブラリで提供されるAPIではアクセスできない。JNI 側および C/C++ ライブラリ的にはランダムアクセス可能なので、単にJava側のAPIの提供方法の問題のようである。

CloudStore の HDFS に対する魅力の一つはランダムアクセスなので、とりあえずこれをJavaから試せるように、Javaライブラリをいじってみた。
KfsAccess, KfsInputChannel, KfsOutputChannel を修正し、KfsChannel というクラスを追加した。修正したファイルについては diff を、KfsChannel については全体を下に載せておく。
修正の概略は

  • KfsChannel が ReadableByteChannel, WritableByteChannel を実装し、ランダムアクセス機能を提供している。
    • JNI へのアクセスは、とりあえず KfsInputChannel, KfsOutputChannel を経由させている。(本来は、このクラス内で native メソッドを定義して、JNI の実装を書くべきと思うが)
    • JNI 側の実装の制約である、ByteBuffer をallocateDirectで確保しなければならない点は要注意。
  • KfsAccess では、ランダムアクセス用のファイルオープンメソッドを定義し、KfsChannelを返すようにしている。
  • KfsInputChannel, KfsOutputChannel では、KfsChannel が native メソッドにアクセスできるように native メソッドのスコープをパッケージスコープに変更している。

この KfsChannel を使って、ランダムアクセスを使って読み書きする - サンプルコードによるPerl入門 と同等なことを Java で実装してたのが、以下のサンプルである。
これで、JavaからでもCloudStore上のファイルに対してランダムアクセスができることを確認できた。

public class KfsRandomAccessSample {
    private static final int len_rec = 16;
    private static final int pos_name = 0;
    private static final int len_name = 8;
    private static final int pos_age = 8;
    private static final int len_age = 8;
    
    private static final int pos_peko_age = 1 * len_rec + pos_age;

    public static void main(String[] args) throws IOException {
        String metaServer = args[0].trim();
        int port = Integer.parseInt(args[1].trim());
        String path = args[2].trim();
        
        KfsAccess kfsAccess = new KfsAccess(metaServer, port);
        KfsChannel channel = kfsAccess.kfs_open_randomaccess(path);
        if (channel != null) {
            try {
                channel.seek(pos_peko_age);
                
                ByteBuffer buff = ByteBuffer.allocateDirect(len_age);
                channel.read(buff);
                buff.flip();
                byte[] data = new byte[buff.limit()];
                buff.get(data);
                int age = Integer.parseInt(new String(data));
                age++;
                
                channel.seek(pos_peko_age);
                
                ByteArrayOutputStream ba = new ByteArrayOutputStream();
                PrintStream ps = new PrintStream(ba);
                ps.printf("%08d", age);
                buff.clear();
                buff.put(ba.toByteArray());
                buff.flip();
                channel.write(buff);
            } finally {
                channel.close();
            }
        }
    }
}

KfaAccessの修正

--- KfsAccess.java.original
+++ KfsAccess.java
@@ -295,6 +295,19 @@
         return new KfsInputChannel(cPtr, fd);
     }

+    public KfsChannel kfs_open_randomaccess(String path)
+    {
+        return kfs_open_randomaccess(path, 1);
+    }
+
+    public KfsChannel kfs_open_randomaccess(String path, int numReplicas)
+    {
+        int fd = open(cPtr, path, "rw", numReplicas);
+        if (fd < 0)
+            return null;
+        return new KfsChannel(cPtr, fd);
+    }
+
     public int kfs_remove(String path)
     {
         return remove(cPtr, path);

KfsInputChannelの修正

--- KfsInputChannel.java.original
+++ KfsInputChannel.java
@@ -43,16 +43,16 @@
     private int kfsFd = -1;
     private long cPtr;

-    private final static native
+    final static native
     int read(long cPtr, int fd, ByteBuffer buf, int begin, int end);

-    private final static native
+    final static native
     int close(long cPtr, int fd);

-    private final static native
+    final static native
     int seek(long cPtr, int fd, long offset);

-    private final static native
+    final static native
     long tell(long cPtr, int fd);

     public KfsInputChannel(long ptr, int fd)

KfsOutputChannelの修正

--- KfsOutputChannel.java.original
+++ KfsOutputChannel.java
@@ -45,10 +45,10 @@
     private final static native
     int close(long ptr, int fd);

-    private final static native
+    final static native
     int write(long ptr, int fd, ByteBuffer buf, int begin, int end);

-    private final static native
+    final static native
     int sync(long ptr, int fd);

     private final static native

KfsChannelの実装

package org.kosmix.kosmosfs.access;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

public class KfsChannel implements ReadableByteChannel, WritableByteChannel, Positionable
{
    private int kfsFd = -1;
    private long cPtr;

    public KfsChannel(long ptr, int fd)
    {
        kfsFd = fd;
        cPtr = ptr;
    }

    public boolean isOpen()
    {
        return kfsFd > 0;
    }

    public int read(ByteBuffer buf) throws IOException
    {
        if (kfsFd < 0)
            throw new IOException("File closed");

        if(!buf.isDirect())
            throw new IOException("need direct buffer");

        if(!buf.hasRemaining())
            return 0;

        int pos = buf.position();
        int sz = KfsInputChannel.read(cPtr, kfsFd, buf, pos, buf.limit());
        if(sz < 0)
            throw new IOException("read failed");
        if(sz == 0)
            return -1;

        buf.position(pos + sz);
        return sz;
    }

    public int write(ByteBuffer buf) throws IOException
    {
        if (kfsFd < 0)
            throw new IOException("File closed");

        if(!buf.isDirect())
            throw new IOException("need direct buffer");

        int pos = buf.position();
        int last = buf.limit();

        if (last - pos == 0)
            return 0;

        int sz = KfsOutputChannel.write(cPtr, kfsFd, buf, pos, last);

        if(sz < 0)
            throw new IOException("writeDirect failed");

        // System.out.println("Wrote via JNI: kfsFd: " + kfsFd + " amt: " + sz);

        buf.position(pos + sz);
        return sz;
    }

    public int sync() throws IOException
    {
        if (kfsFd < 0)
            throw new IOException("File closed");

        return KfsOutputChannel.sync(cPtr, kfsFd);
    }

    public int seek(long offset) throws IOException
    {
        if (kfsFd < 0)
            throw new IOException("File closed");

        sync();

        return KfsInputChannel.seek(cPtr, kfsFd, offset);
    }

    public long tell() throws IOException
    {
        if (kfsFd < 0)
            throw new IOException("File closed");

        return KfsInputChannel.tell(cPtr, kfsFd);
    }

    public void close() throws IOException
    {
        if (kfsFd < 0)
            return;

        KfsInputChannel.close(cPtr, kfsFd);
        kfsFd = -1;
    }

    protected void finalize() throws Throwable
    {
        if (kfsFd < 0)
            return;
        close();
    }
}