25 Temmuz 2017 Salı

Apache Spark - Cassandra

Bir önceki yazımızda kısaca Apache Spark'tan bahsetmiştik. Bu yazıda da gerçek dünyada kullanımına dair küçük bir kod parçası ve karşılacağımız olası bir problemin çözümünden bahsedeceğim.

Apache Cassandra'da yer alan büyük veri yığını tuttuğumuz tablomuza bağlanıp. Son 1 gün içinde tabloya insert edilmiş yaklaşık 4M kaydın "group by" ile "count (distinct)" sonucunu tek dosyasına yazan bir java uygulamasını aşağıdaki gibi yazabiliriz.

                 String sparkMaster = "local[4]";
   
    String cassandraNode = "x.x.x.x"; // veriyi çekeceğimiz cassandra node
 
    SparkConf conf = new SparkConf();
    conf.setAppName("Test Program");
    conf.setMaster(sparkMaster);
    conf.set("spark.cassandra.connection.host", cassandraNode);
    conf.set("spark.cores.max", "4");
    conf.set("spark.shuffle.service.enabled", "false");
    conf.set("spark.dynamicAllocation.enabled", "false");
    conf.set("spark.io.compression.codec", "snappy");
    conf.set("spark.rdd.compress", "true");
    conf.set("spark.driver.memory", "8G");
    conf.set("spark.executor.memory", "8G");


Spark işlemlerimizi koşacağımız uygulama ile önemli konfigürasyon ayarlarını yaptığımız kısım yukarıda. Kendi Cassandra cluster bilgilerinize ve uygulamanın koşacağı sunucunun kapasitesine göre ayarları değiştirebilirsiniz. Cassandra'dan büyük hacimde veri çekileceğinden en mantıklı yaklaşım Cassandra verisinin tutulduğu sunucu üzerinde bu kodu çalıştırmak olacaktır.

                JavaSparkContext sc = new JavaSparkContext(conf);
      
    SparkContextJavaFunctions functions =                     CassandraJavaUtil.javaFunctions(sc);
    JavaRDD<CassandraRow> rdd = functions.cassandraTable("[cassandra keyspace adı]", "[cassandra tablo adı]") .select("id").where("createDate > ?", sDate).where("createDate < ?", eDate).where("id > ?", 100);

Bu kısımda JavaSparkContext sınıfından bir örnek oluşturarak, bununla Cassandra'daki tablomuzdan belirldiğimiz tarih aralığında insert edilmiş, değeri 100'den büyük id değerine sahip kayıtları çekip rdd nesnemizin içine alıyoruz.

                rdd.saveAsTextFile("Test.txt");

Sonuçları text dosyasına kaydetmek tek satır ile mümkün. Tüm verimiz belirlediğimi dizinde dosyaya yazılıyor olacak.

Kodu Windows'ta çalıştırdığınızda aşağıdaki gibi bir hata alınması olası.

Could not locate executable null\bin\winutils.exe in the Hadoop binaries

Bu durumda aşağıdaki adımları takip ederek bu sorunu kolayca bertaraf edebiliriz.


  1. http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe. adresinden uygulamayı indirin
  2. Uygulamanın çalıştığı yerde C:\winutils\bin dizin yapısını oluşturun
  3. Oluşturduğunuz dizine indirdiğiniz winutils.exe dosyasını kopyalayın.
  4. Java kodunuza aşağıdaki satırı ekleyin.
  5. Başka bir sorun yoksa başarılı bir şekilde işlem tamamlanmış demektir.
                  System.setProperty("hadoop.home.dir", "C:\\winutils");

       

Apache Spark İle Büyük Veri Analizi


Apache Spark en basit tanımıyla hızlı ve performanslı olarak "büyük veriyi" analiz etmek için kullanılan bir teknoloji. Daha çok Hadoop MapReduce ile karşılaştırılsa da aslında bazı noktalarda örtüşüp bazı yerlerde farklılaşırlar. Genel olarak aşağıdaki özelliklere sahip :


  • MapReduce'den memory işlemlerinde 100x, disk işlemlerinde 10x daha hızlı
  • Java, Scala, Python dil desteği
  • Kolay kullanım
  • Farklı veri kaynakları ile kolay entegrasyon
  • Akış programlama (streaming), SQL ve komplex analitik desteği.

Yukarıda sayılan özelliklerden ötürü son zamanlarda popüler olmaya başlayan bir büyük veri analiz teknolojisi olarak öne çıkan Apache Spark'ı Apache Cassandra'ya attığımız büyük veri yığınları için de kullanmak mümkün. Datastax'ın ücretsiz olarak sunduğu connector kütüphanesi ile büyük veri ile dilediğiniz gibi sorgular koşup, çıktıları yeni bir tabloya yazabilir, her hangi bir dosyaya kaydedebilirsiniz. 

Örnek olarak günlük 5-6M kayıt attığınız, bir kaç yüz milyon satır veri içeren, Cassandra'da yer alan bir tablo düşünelim. NoSQL yapısı gereği her sorguyu SQL'deki gibi çalıştıramadığımızdan Apache Spark burada devreye giriyor. Cassandra'da yer alan tablomuza bağlanıp istediğimiz filtreye göre veri çekip bu veri üzerinde "count", "group by", "join" gibi NoSQL dünyasında istediğimiz gibi koşamadığımız fonksiyonları rahatça koşabiliriz. Spark temel olarak verimizi ilgili kaynaktan parça parça çekerek, dağıtık sistem üzerinde (spark cluster) sonuçları üreterek bize dilediğimiz çıktıyı üretir. Hadoop MapReduce'teki disk üzerinde gerçekleşen map ve reduce işlemleri, Spark tarafından memory'de gerçekleştiğinden çok daha hızlı sonuçlar alınır.