25 Temmuz 2017 Salı

Apache Spark - Cassandra

Bir önceki yazımızda kısaca Apache Spark'tan bahsetmiştik. Bu yazıda da gerçek dünyada kullanımına dair küçük bir kod parçası ve karşılacağımız olası bir problemin çözümünden bahsedeceğim.

Apache Cassandra'da yer alan büyük veri yığını tuttuğumuz tablomuza bağlanıp. Son 1 gün içinde tabloya insert edilmiş yaklaşık 4M kaydın "group by" ile "count (distinct)" sonucunu tek dosyasına yazan bir java uygulamasını aşağıdaki gibi yazabiliriz.

                 String sparkMaster = "local[4]";
   
    String cassandraNode = "x.x.x.x"; // veriyi çekeceğimiz cassandra node
 
    SparkConf conf = new SparkConf();
    conf.setAppName("Test Program");
    conf.setMaster(sparkMaster);
    conf.set("spark.cassandra.connection.host", cassandraNode);
    conf.set("spark.cores.max", "4");
    conf.set("spark.shuffle.service.enabled", "false");
    conf.set("spark.dynamicAllocation.enabled", "false");
    conf.set("spark.io.compression.codec", "snappy");
    conf.set("spark.rdd.compress", "true");
    conf.set("spark.driver.memory", "8G");
    conf.set("spark.executor.memory", "8G");


Spark işlemlerimizi koşacağımız uygulama ile önemli konfigürasyon ayarlarını yaptığımız kısım yukarıda. Kendi Cassandra cluster bilgilerinize ve uygulamanın koşacağı sunucunun kapasitesine göre ayarları değiştirebilirsiniz. Cassandra'dan büyük hacimde veri çekileceğinden en mantıklı yaklaşım Cassandra verisinin tutulduğu sunucu üzerinde bu kodu çalıştırmak olacaktır.

                JavaSparkContext sc = new JavaSparkContext(conf);
      
    SparkContextJavaFunctions functions =                     CassandraJavaUtil.javaFunctions(sc);
    JavaRDD<CassandraRow> rdd = functions.cassandraTable("[cassandra keyspace adı]", "[cassandra tablo adı]") .select("id").where("createDate > ?", sDate).where("createDate < ?", eDate).where("id > ?", 100);

Bu kısımda JavaSparkContext sınıfından bir örnek oluşturarak, bununla Cassandra'daki tablomuzdan belirldiğimiz tarih aralığında insert edilmiş, değeri 100'den büyük id değerine sahip kayıtları çekip rdd nesnemizin içine alıyoruz.

                rdd.saveAsTextFile("Test.txt");

Sonuçları text dosyasına kaydetmek tek satır ile mümkün. Tüm verimiz belirlediğimi dizinde dosyaya yazılıyor olacak.

Kodu Windows'ta çalıştırdığınızda aşağıdaki gibi bir hata alınması olası.

Could not locate executable null\bin\winutils.exe in the Hadoop binaries

Bu durumda aşağıdaki adımları takip ederek bu sorunu kolayca bertaraf edebiliriz.


  1. http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe. adresinden uygulamayı indirin
  2. Uygulamanın çalıştığı yerde C:\winutils\bin dizin yapısını oluşturun
  3. Oluşturduğunuz dizine indirdiğiniz winutils.exe dosyasını kopyalayın.
  4. Java kodunuza aşağıdaki satırı ekleyin.
  5. Başka bir sorun yoksa başarılı bir şekilde işlem tamamlanmış demektir.
                  System.setProperty("hadoop.home.dir", "C:\\winutils");

       

Hiç yorum yok:

Yorum Gönder