BigData/Cassandra

sstableloader 를 이용한 DB table data migration

Lawmin 2012. 4. 6. 13:16

rdbms와 nosql db는 그 구조에 차이가 있어 rdb의 table 자체를 넣는 것은 큰 의미가 없을 수 있다.

하지미나 꼭 필요하다면, sstable 형태로 만들어주는 프로그램을 작성해서 sstableloader 등으로 import 해야한다.

아래는, 공식사이트의 소스를 참고하여 범용으로 사용할 있도록 수정한 java 프로그램이다.

모든 컬럼 데이터를 string 으로 넣도록 하였으나, 필요하면 각 타입에 맞게 변환해서 넣으면 된다.

csv (각 컬럼은 콤마로 구분되어 있으며, 첫줄은 column header) 를 sstable 형태로 변환해주며,

sstableloader 로 결과 디렉토리를 지정해주면, load 를 시작하게 된다.

컴파일 전에 cassandra 의 모든 lib 을 클래스패스에 넣어줘야 하고, 실행시엔 운영 cassandra 와 별도로, import 용 node 를 띄워서 load 해야 한다. (loopback 및 다른 포트 이용)


import static org.apache.cassandra.utils.ByteBufferUtil.bytes;


import java.io.BufferedReader;

import java.io.File;

import java.io.FileReader;

import java.io.IOException;

import java.nio.charset.Charset;


import org.apache.cassandra.db.marshal.UTF8Type;

import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;


public class DataImport {

static String keyspace;

static String columnFamily;

    static String dataFilename;


    public static void main(String[] args) throws IOException {

        if (args.length != 3) {

            System.out.println("Expecting <keyspace> <column_family_name> <csv_DATA_with_column_header_file> as argument");

            System.exit(1);

        }

        keyspace = args[0];

        columnFamily = args[1];

        dataFilename = args[2];

        BufferedReader dataReader = new BufferedReader(new FileReader(dataFilename));

        String line;

        String[] columnHeader = null; 

        if((line = dataReader.readLine()) != null) {

        columnHeader = line.split(",", -1);

        }

        if(columnHeader == null || columnHeader.length == 0) {

        System.out.println("no column header");

        return;

        }


        File directory = new File(keyspace);

        if (!directory.exists())

            directory.mkdir();


        SSTableSimpleUnsortedWriter usersWriter = new SSTableSimpleUnsortedWriter(

                directory,

                keyspace,

                columnFamily,

                UTF8Type.instance,

                null,

                64);


        int lineNumber = 1;

        long timestamp = System.currentTimeMillis() * 1000;

        Charset cs = Charset.forName("UTF-8");

        while ((line = dataReader.readLine()) != null) {

            ++lineNumber;

        String[] columns = line.split(",", -1);

        if (columns.length != columnHeader.length) {

                System.out.println(String.format("Invalid input '%s' at line %d of %s", line, lineNumber, dataFilename));

                continue;

            }

        try {

                usersWriter.newRow(bytes(columns[0].trim(), cs));

                for(int i = 1; i < columns.length; ++i) {

                usersWriter.addColumn(bytes(columnHeader[i], cs), bytes(columns[i], cs), timestamp);

                }

            }

            catch (NumberFormatException e)

            {

                System.out.println(String.format("Invalid number in input '%s' at line %d of %s", line, lineNumber, dataFilename));

                continue;

            }

            if(lineNumber % 10000 == 0)

            System.out.println(lineNumber + " lines processed.");

        }

        usersWriter.close();

        System.out.println(lineNumber + " lines processed.");

        System.exit(0);

    }

}