Apache Spark2 CSV Okuma, Şema Oluşturma, Dataframe Üzerinde SQL Sorguları (Scala)

Merhaba, bu yazımızda Spark Dataframe oluştururken ve dataframe üzerinde veri keşfi yaparken kullandığım bazı işlemleri paylaşacağım. Basit bir yazı olacak ancak bunu önemsiyorum çünkü birçok insan veri yükleme esnasında sorun yaşıyor ve bir sürü zaman kaybediyor. Daha önce de buna benzer yazı yazmıştım ancak o zaman Spark1 kullanmıştım. Artık Spark2 var. Spark 2.0 sürümüyle birlikte dataframe API yönünde kanat kırdı. Sizlere tavsiyem Spark RDD API ile hiç uğraşmayın. Yüzünüzü Spark2 Dataframe API’ye çevirin. Aslında bu Spark için de güzel birşey, gelecek veri tiplerinden emin olarak iş yapıyor, içi rahat oluyor yani 🙂

Bu yazıda kullanacağım araçlar şunlardır:

  • Spark Sürümü: Spark 2.1.1
  • Geliştirme Ortamı: Apache Zeppelin Notebook
  • Kaynak Yönetimi: Apache YARN (Yani Spark’ı YARN modunda çalıştırıyorum)
  • Veri Depolama: Hadoop HDFS
  • Programlama Dili: Scala

Zeppelin kullandığım için ayrıca SparkContext() yaratmayacağım, çünkü Zeppelin bu işi bizim içi yapıyor. Ayrıca interpreter ayarlarımda Spark2 varsayılan interpereter olduğundan her paragrafta %Spark2 diye belirtmeme gerek yok. Kodların olduğu kutucuklarda açık yeşil ile seçilmiş satırlar sorgu sonuçlarını gösterir, kod değildir. Veri seti SanFransico itfaiye teşkikatının olaylara müdahale ile ilegili bilgilerin bulunduğu bir veridir. Kullandığım veri setini buradan indirebilir ve detaylı bilgiler edinebilirsiniz. Bu Youtube videosunda Pyspark ile yapılmış benzer işlemleri görebilirsiniz. Umarım biz de de bu tür veriler tutulmaya ve paylaşılmaya başlanır. Hadoop clusterımda node3 adında bir EdgeNode’um var. Veri setini buradaki /home/erkan/veri_setlerim/ dizinine indirdim. Buradan örnek çalışmalar için kullandığım  HDFS dizni olan /user/erkan/veri_setlerim/‘e aşağıdaki kodlarla kopyaladım.

[erkan@node3 ~]$ hdfs dfs -put /home/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv /user/erkan/veri_setlerim/

Evet artık HDFS’de veri setimiz hazır.

Spark Versiyonunu Öğrenme

Spark versiyonunu basit bir kod ile öğrenebiliriz.

spark.version
res7: String = 2.1.1.2.6.2.0-205
Spark ile CSV Uzantılı Dosyaları Okumak

HDFS’de bulunan dosyamızı okuyalım ve bir dataframe oluşturalım

val itfaiyeDF = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/user/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv")
df: org.apache.spark.sql.DataFrame = [Call Number: int, Unit ID: string ... 32 more fields]
Took 23 sec. Last updated by admin at October 29 2017, 7:56:51 AM

Spark’da csv uzantılı dosyaları okurken bir çok seçenek var biz bunlardan sadece iki tanesini kullandık; header ve inferSchema. inferSchema veriden örneklem alarak sütunların veri türlerini belirlemeye çalışır. Böylelikle şema oluşturmak için uğraşmayız. Ancak bunun bir yan etkisi de zaman alması ve sütun isimlerinde  boşluk bırakmak gibi bazı yan etkilerinin olmasıdır. Şimdi Yüklediğimiz veri setini printSchema() ile görelim.

itfaiyeDF.printSchema()
root
 |-- Call Number: integer (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Final Priority: integer (nullable = true)
 |-- ALS Unit: boolean (nullable = true)
 |-- Call Type Group: string (nullable = true)
 |-- Number of Alarms: integer (nullable = true)
 |-- Unit Type: string (nullable = true)
 |-- Unit sequence in call dispatch: integer (nullable = true)
 |-- Fire Prevention District: string (nullable = true)
 |-- Supervisor District: string (nullable = true)
 |-- Neighborhooods - Analysis Boundaries: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)

Okuma süresini azaltmak ve sütun isimlerini değiştirerek okumak istiyorsak kendimiz bir şema hazırlayarak okuma esnasında schema() kullanmalıyız.

Spark Dataframe için Şema Hazırlamak

Şema hazırlamak için gerekli kütüphaneleri indirelim ve yukarıda yazdırdığımız şemanın aynısını kendimiz hazırlayalım. Ancak sütun isimlerini birleştirelim.

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType}
val itfaiyeDFSchema = StructType(Array(
 StructField("CallNumber", IntegerType, true),
 StructField("UnitID", StringType, true),
 StructField("IncidentNumber", IntegerType, true),
 StructField("CallType", StringType, true),
 StructField("CallDate", StringType, true),
 StructField("WatchDate", StringType, true),
 StructField("ReceivedDtTm", StringType, true),
 StructField("EntryDtTm", StringType, true),
 StructField("DispatchDtTm", StringType, true),
 StructField("ResponseDtTm", StringType, true),
 StructField("OnSceneDtTm", StringType, true),
 StructField("TransportDtTm", StringType, true),
 StructField("HospitalDtTm", StringType, true),
 StructField("CallFinalDisposition", StringType, true),
 StructField("AvailableDtTm", StringType, true),
 StructField("Address", StringType, true),
 StructField("City", StringType, true),
 StructField("ZipcodeOfIncident", IntegerType, true),
 StructField("Battalion", StringType, true),
 StructField("StationArea", StringType, true),
 StructField("Box", StringType, true),
 StructField("OriginalPriority", StringType, true),
 StructField("Priority", StringType, true),
 StructField("FinalPriority", IntegerType, true),
 StructField("ALSUnit", StringType, true),
 StructField("CallTypeGroup", StringType, true),
 StructField("NumberOfAlarms", StringType, true),
 StructField("UnitType", StringType, true),
 StructField("UnitSequenceInCallDispatch", IntegerType, true),
 StructField("FirePreventionDistrict", StringType, true),
 StructField("SupervisorDistrict", StringType, true),
 StructField("NeighborhooodsAnalysisBoundaries", StringType, true),
 StructField("Location", StringType, true),
 StructField("RowID", StringType, true)
 ))
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType}itfaiyeDFSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,IntegerType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(ReceivedDtTm,StringType,true), StructField(EntryDtTm,StringType,true), StructField(DispatchDtTm,StringType,true), StructField(ResponseDtTm,StringType,true), StructField(OnSceneDtTm,StringType,true), StructField(TransportDtTm,StringType,true), StructField(HospitalDtTm,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(Available DtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipco...

şimdi verimizi tekrar okuyalım ancak inferSchema() yerine kendi şemamızı kullanalım. Bu sefer ikinci option() yerine schema() fonksiyonunu kullanacağız.

val itfaiyeDF = spark.read.format("csv").option("header","true").schema(itfaiyeDFSchema).load("/user/erkan/veri_setlerim/Fire_Department_Calls_for_Service.csv")
itfaiyeDF.printSchema()
itfaiyeDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 32 more fields]
root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberOfAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- NeighborhooodsAnalysisBoundaries: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- RowID: string (nullable = true)
 
Took 0 sec. Last updated by admin at October 29 2017, 11:28:31 AM.
Spark Dataframe’i SQL Tablosu Olarak Kaydetme ve Sorgu Çalıştırma

SparkSQL doğrudan SQL sorgularını çalıştırmamıza olanak veriyor. Üsetlik bunu kullanmak çok kolay. Dataframe’i geçici bir SQL tablosu olarak kaydediyoruz arkasındanda bu tablo sanki gerçek bir SQL tablosuymuş gibi SQL sogularımızı çalıştıryoruz. Önce tablomuzu kaydedelim. Bunun için createOrReplaceTempView("KendimizBelirliyoruz") kullanıyoruz. Ben tabloya itfaiyeTable dedim siz başka bir isim verebilirsiniz.

itfaiyeDF.createOrReplaceTempView("itfaiyeTable")

Şimdi de basit bir SQL sorgusu çalıştıralım. Ben Zeppelin kullandığım için paragrafa %sql ile başlamak zorundayım.

%sql
SELECT * FROM itfaiyeTable LIMIT 10

Sorgu sonucu:

Sütun sayısı çok olduğu için diğer veriyi görmek için sağa kaydırmak gerekir.
Hoşçakalın…

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Barkod Etiketi üretimi yapan firmaların işi ciddi bir iştir. Bu anlamda sizin de hangi firmayla çalışma yatığınız çok büyük önem taşır. Kullanım alanı sınırsızdır. Her alanda ve her sektörde bu etiketlere ihtiyaç duyulur. Etiket çeşitleri ve Barkod etiketleri, seri üretimle hazırlanmaktadır. Etiketler ahşap, plastik, metal ya da cam gibi ambalajlı ürünlerin üzerilerine ugulanır.
Mide botoksu midenin belirli yerlerine botoks maddesi enjekte etme suretiyle midedeki kasların çalışmasını sınırlandırmayı ve sayede midenin gıdaları sindirim sürecini yavaşlatarak buna bağlı olan açlık-tokluk hissi süresinin de uzatılmasını amaçlayan ameliyatsız kolay kilo verme tedavisidir. Botoks uygulanırken, midenin detaylı şekilde içerden görüntülenmesini sağlayan endoskopi uygulaması ile gerçekleştirilir. Bu sayede hastaya sadece gastroskpik uygulaması kadar bir rahatsızlık olur. Özellikle diyet programlarına ve düzenli egzersizlere uymakta zorlanan ve buna bağlı olarak da obeziteye yakalanan, bu yüzdende obezitenin sebep olduğu çeşitli sağlık sorunları olan kişiler için mide botoksu bir devrim niteliğindedir ve son yıllarda ülkemizde yaygın olarak kullanılmaktadır.
En güzel cami halısı dış avlusu olup bunun çevresi pencereli duvarlarla çevrilidir. Bu avulya 3 ü cephede olmak üzere, 8 kapıdan girilir. Şadırvan avlusu, 26 adet granit mermer ve porfir sütuna oturtulmuş, 30 kubbeyle çevrili geniş alandır. Mermer döşemeli bu geniş sahanın ortasında 6 mermer sütunlu şadırvan, sahanın azametini gösterir. Şadırvanın kemerleri, kabartma olarak Rumi geçmelerle ve köşebentleri, kabartma, lale ve karanfil motifleriyle bezelidir.
En güzel cami halısı dış avlusu olup bunun çevresi pencereli duvarlarla çevrilidir. Bu avulya 3 ü cephede olmak üzere, 8 kapıdan girilir. Şadırvan avlusu, 26 adet granit mermer ve porfir sütuna oturtulmuş, 30 kubbeyle çevrili geniş alandır. Mermer döşemeli bu geniş sahanın ortasında 6 mermer sütunlu şadırvan, sahanın azametini gösterir. Şadırvanın kemerleri, kabartma olarak Rumi geçmelerle ve köşebentleri, kabartma, lale ve karanfil motifleriyle bezelidir.
Termal Etiket Eco Termal etiket, yüzeyinde hami bir katman bulunmayan miktar çeşididir. Kumbara üzerine termal lamine edilmesi sonucunda oluşmaktadır. Kullanılan barkod yazıcının baş bölgesindeki ısı beraberlik birlikte termal sıvılaşma özelliği gösterir dahi bu şekilde Eco termal etiketin üzerine baskı alınır. Bu termal etiketlere yumruk termal olarak (ısıyla) yapılır ve yerde yüzden ribon kullanılmaz. Ribon kullanımı olmadığı için tahakküm maliyeti sıfıra yakındır.
Dijital Baskı ve baskı etiketi teknolojileri geliştikçe firmaların büyük ebatlı etiket ihtiyaçlarına da dijital çözümler sunulmaya başlamıştır. Böylece, birbirinden canlı renklerin ve kusursuz çizgilerin hakimiyetindeki büyük ebatlı dijital baskı etkileri; kurumsal firmaların reklam kampanyalarındaki en iddialı unsurlarına dönüşmüştür.