Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
 Packages
 Make 
 Iptables 
 Nmap 
 Apache 
 LFS 
 TUX 
 cURL 
 libpcap 
 Parted 
 Httpd 
 File managers 
 FFMPEG 
 RTMP 
 SQL 
 Test 
 Git 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Linux Kernel 2.6...5164 
 Trees...935 
 Максвелл 3...861 
 Go Web ...814 
 William Gropp...795 
 Ethreal 3...779 
 Ethreal 4...766 
 Gary V.Vaughan-> Libtool...764 
 Rodriguez 6...756 
 Steve Pate 1...749 
 Clickhouse...748 
 Ext4 FS...748 
 Ethreal 1...736 
 Secure Programming for Li...720 
 C++ Patterns 3...711 
 Ulrich Drepper...693 
 Assembler...687 
 DevFS...655 
 Стивенс 9...644 
 MySQL & PosgreSQL...621 
 
  01.01.2024 : 3621733 посещений 

iakovlev.org

Apache Spark

Apache Spark – это универсальная и высокопроизводительная кластерная вычислительная платформа.
Фреймворк создан для того, чтобы охватить более широкий диапазон рабочих нагрузок, которые прежде требовали создания отдельных распределенных систем, включая приложения пакетной обработки, циклические алгоритмы, интерактивные запросы и потоковую обработку. Поддерживая все эти виды задач с помощью единого механизма, Spark упрощает и удешевляет объединение разных видов обработки, которые часто необходимо выполнять в едином конвейере обработки данных. Кроме того, он уменьшает бремя обслуживания, поддерживая отдельные инструменты.

Apache Spark работает в парадигме резидентных вычислений — обрабатывает данные в оперативной памяти, благодаря чему позволяет получать значительный выигрыш в скорости работы для некоторых классов задач, в частности, возможность многократного доступа к загруженным в память пользовательским данным делает библиотеку привлекательной для алгоритмов машинного обучения.

Проект предоставляет программные интерфейсы для языков Java, Scala, Python, R. Изначально написан на Scala, впоследствии добавлена существенная часть кода на Java для предоставления возможности написания программ непосредственно на Java.
В этом документе мы будем использовать Python.
Состоит из ядра и нескольких расширений, таких как:
Spark SQL - позволяет выполнять SQL-запросы над данными
Spark Streaming - надстройка для обработки потоковых данных
Spark MLlib - набор библиотек машинного обучения
GraphX - предназначено для распределённой обработки графов.
Может работать как в среде кластера Hadoop под управлением YARN, так и без компонентов ядра Hadoop, поддерживает несколько распределённых систем хранения — HDFS, OpenStack Swift, NoSQL-СУБД Cassandra, Amazon S3.

В этой статье описана Standalone установка Apache Spark без привязки к Hadoop

1. Установка Apache Spark

Создаем пользователя spark

 addgroup apache
 adduser --ingroup apache spark
 usermod -a -G sudo spark
 
Качаем Apache Spark

 wget -c 'https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz'
 tar -xf spark-3.2.1-bin-hadoop3.2.tgz
 
Копируем в /usr/local/spark
Копируем шаблон ../conf/spark-env.sh.template

 cp spark-env.sh.template spark-env.sh
 
В конец файла spark-env.sh копируем 3 строкм:

 SPARK_MASTER_HOST=localhost
 export PYSPARK_PYTHON=/usr/bin/python3
 export PYSPARK_DRIVER_PYTHON=python3
 
Копируем второй шаблон

 cp workers.template workers
 
В конце этого файла по умолчанию стоит строка

 localhost
 
Это значит, что мы используем standalone версию
В конец файла .bashrc пользователя spark добавляем

 export SPARK_HOME=/usr/local/spark
 export PATH=$PATH:$SPARK_HOME/bin
 export PYSPARK_PYTHON=/usr/bin/python3
 export PYSPARK_DRIVER_PYTHON=python3
 
Дадим права

 sudo chown -R spark:apache /usr/local/spark 
 
Все, Apache Spark готов к работе
Заходим в систему как пользователь spark
Запускаем:

 start-all.sh
 

2. Работа с Рyspark

Запускаем питоновскую командную строку:

 pyspark
 
Мы попали в командный интерпретатор.

Apache Spark работает в архитектуре master-slave, где master называется “Driver”, а slave называются "Worker”. Когда вы запускаете приложение Spark, драйвер Spark создает контекст, который является точкой входа в ваше приложение, и все операции выполняются на рабочих узлах, а ресурсами управляет диспетчер кластеров.
Spark поддерживает следующие менеджеры кластеров:
Standalone – простой менеджер кластеров, входящий в состав Spark. Это простейший менеджер, который используется в этой статье
Apache Mesos - это менеджер кластеров, который также может запускать приложения Hadoop MapReduce и PySpark.
Hadoop YARN – менеджер ресурсов в Hadoop 2.
Kubernetes – система для автоматизации развертывания, масштабирования и управления контейнерными приложениями.

Основные модули PySpark:
PySpark RDD (pyspark.RDD)
PySpark DataFrame and SQL (pyspark.sql)
PySpark Streaming (pyspark.streaming)
PySpark MLib (pyspark.ml, pyspark.mllib)
PySpark GraphFrames (GraphFrames)
PySpark Resource (pyspark.resource) (PySpark 3.0)


3. Рyspark RDD

PySpark RDD (Resilient Distributed Dataset) - это фундаментальная структура данных PySpark, которая является отказоустойчивой, неизменяемой распределенной коллекцией объектов, что означает, что после создания RDD вы не можете ее изменить. Каждый набор данных в RDD разделен на логические разделы, которые могут быть вычислены на разных узлах кластера.

Для создания RDD нужно прежде создать обьект SparkSession, для этого есть 2 метода:
builder()
newSession()

Внутри сессии создается другой обьект - SparkContext.
Обьектов SparkSession можно создавать сколько угодно, но глобальный обьект SparkContext всегда будет один.
Пример создания сессии:

 from pyspark.sql import SparkSession
 
 spark = SparkSession.builder \
       .master("local[1]") \
       .appName("SparkByExamples.com") \
       .getOrCreate() 
 
У обьекта SparkContext есть несколько встроенных методов - parallelize():

 dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
 rdd=spark.sparkContext.parallelize(dataList)
 
textFile():

 rdd2 = spark.sparkContext.textFile("/path/test.txt")
 
У RDD все-же есть возможность для апдэйта, при этом будет создаваться другой RDD. Это называется
RDD transformations
Эти преобразования бывают следующих видов:
flatMap()
map()
reduceByKey()
filter()
sortByKey()

Также у RDD есть акции - actions - которые выполняют какие-то вычисления внутри RDD и возвращают какой-то результат.
Акции бывают следующих видов:
count()
collect()
first()
max()
reduce()

4. Pyspark DataFrame

DataFrame - это распределенный набор данных, организованный в именованные столбцы. Концептуально он эквивалентен таблице в реляционной базе данных или фрейму данных , но с более богатой оптимизацией. Фреймы данных могут быть созданы из широкого спектра источников, таких как файлы структурированных данных, таблицы в Hive, внешние базы данных или существующие RDDS.
pandas DataFrame отличается от Pyspark DataFrame тем, что выполняется на одной ноде.

Создание DataFrame:

 data = [('James','','Smith','1991-04-01','M',3000),
   ('Michael','Rose','','2000-05-19','M',4000),
   ('Robert','','Williams','1978-09-05','M',4000),
   ('Maria','Anne','Jones','1967-12-01','F',4000),
   ('Jen','Mary','Brown','1980-02-17','F',-1)
 ]
 
 columns = ["firstname","middlename","lastname","dob","gender","salary"]
 df = spark.createDataFrame(data=data, schema = columns)
 df.printSchema()
 df.show()
 
Как и RDD, DataFrame имеет свои собственные операции типа Transformations и Actions.

DataFrame могут создаваться из внешних источников, таких как HDFS, S3 Azure, HBase, MySQL и т.д. Пример создания из локального CSV файла:

 df = spark.read.csv("/tmp/resources/zipcodes.csv")
 df.printSchema()
 
DataFrame поддерживает следующие форматы файлов:
csv
text
Avro
Parquet
tsv
xml
и другие.

5. Pyspark SQL

PySpark SQL - один из наиболее часто используемых модулей PySpark, который используется для обработки структурированного столбчатого формата данных. После создания фрейма данных вы можете взаимодействовать с данными, используя синтаксис SQL.
Spark SQL предоставляет собственные SQL-запросы , что означает, что вы можете запускать традиционные ANSI SQL в Spark Dataframe, используя select, where, group by, join, union и т. д.
Сначала нужно создать временную таблицу во фрейме данных с помощью функции createOrReplaceTempView(). После создания к этой таблице можно получить доступ во время SparkSession с помощью sql().

 df.createOrReplaceTempView("PERSON_DATA")
 df2 = spark.sql("SELECT * from PERSON_DATA")
 df2.printSchema()
 df2.show()
 
 groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
 groupDF.show()
 

6. Pyspark Streaming

PySpark Streaming - это масштабируемая, высокопроизводительная, отказоустойчивая система потоковой обработки, которая поддерживает как пакетные, так и потоковые рабочие нагрузки. Он используется для обработки данных в реальном времени из таких источников, как файловая система, сокет TCP, Kafka, Twitter, Amazon Kinesis, и это лишь некоторые из них. Обработанные данные могут быть отправлены в базы данных, Kafka, live dashboards и т. д.
Используя readStream.format("socket"), можно читать данные из сокета, задавая в качестве параметров хост, порт:

 df = spark.readStream
       .format("socket")
       .option("host","localhost")
       .option("port","9090")
       .load()
 df.printSchema() 
 
 query = count.writeStream
       .format("console")
       .outputMode("complete")
       .start()
       .awaitTermination()      
 
Возможно также читать данные из Kafka, писать в Kafka в форматах TEXT, CSV, AVRO, JSON:

 df = spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "192.168.1.100:9092")
         .option("subscribe", "json_topic")
         .option("startingOffsets", "earliest") // From starting
         .load()
 
 df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "192.168.1.100:9092")
    .option("topic", "josn_data_topic")
    .start()
    .awaitTermination()        
 

7. Немного теории графов

На самом абстрактном уровне анализ графов применяется для прогнозирования поведения и предписания действий для динамических групп. Для этого требуется понимание отношений и структуры внутри группы. Графовые алгоритмы достигают этого понимания путем изучения общей природы сетей через их соединения. Благодаря такому подходу вы в результате сможете понять топологию связанных систем и смоделировать их процессы.

Вот несколько типов задач, в которых используются графовые алгоритмы:
1. изучение путей распространения заболевания или каскадного транспортного сбоя;
2. выявление наиболее уязвимых или вредоносных компонентов при сетевой атаке;
3. определение наименее затратного или самого быстрого способа маршрутизации информации или ресурсов;
4. предсказание недостающие связей в ваших данных;
5. выявление прямого и косвенного влияния в сложной системе;
6. обнаружение невидимых иерархий и зависимостей;
7. прогнозирование, будут ли группы объединяться или распадаться;
8. поиск перегруженных или недогруженных узлов в сетях;
9. выявление сообщества на основе поведения и создание персональных рекомендаций;
10. уменьшение количества ложных срабатываний при обнаружении мошенничества и аномалий;
11. извлечение дополнительных признаков для машинного обучения.

В классической теории графов термин граф приравнивается к простому (simple), или строгому (strict), графу, где две вершины связывает только одно отношение. Однако большинство графов реального мира имеют несколько связей между вершинами и даже связи, замкнутые на себя:


В следующей таблице приведены ключевые обобщенные признаки графов:



Существуют три популярных области анализа, которые используют графовые алгоритмы:
1. Поиск пути
Пути являются основополагающим понятием в анализе графов и графовых алгоритмов.
Поиск кратчайших путей, является одной из наиболее частых задач, решаемых с помощью графовых алгоритмов, и является предшественником различных типов анализа. Кратчайший путь – это маршрут движения с наименьшим количеством переходов или самым низким суммарным весом ребер. Если граф ориентированный, то это самый короткий маршрут между двумя узлами с учетом разрешенных направлений.
2. Поиск веса (важности) узла
Это представление о том, какие узлы важнее в сети. Существуют различные типы алгоритмов , созданных для измерения разных свойств, например, таких как способность быстро распространять информацию.
3. Поиск связанности узлов.
Связанность – это основная концепция теории графов, которая позволяет проводить сложный сетевой анализ, например поиск сообществ. Большинство реальных сетей имеют подструктуры (часто квазифрактальные) из более или менее независимых подграфов.

GraphFrames – это библиотека обработки графов для Spark, сменившая GraphX в 2016 году. Библиотека GraphFrames основана на GraphX, но использует объекты DataFrame в качестве базовой структуры данных. GraphFrames поддерживает языки программирования Java, Scala и Python.

Вершины и ребра графа представлены в виде DataFrame с уникальным идентификатором для каждой вершины и указанием начальной и конечной вершины для каждого отношения. Мы можем увидеть пример описания вершины DataFrame в табл. 3.1 и описания ребра DataFrame в табл. 3.2:

Граф GraphFrame, основанный на DataFrame из этого примера, будет иметь две вершины, JFK и SEA, и одно направленное ребро от JFK до SEA.
Таблица DataFrame для вершин должна иметь столбец id – значение в этом столбце используется для уникальной идентификации каждой вершины. Ребра в таблице DataFrame должны иметь столбцы src и dst – значения в этих столбцах описывают, какие вершины связаны ребром, и должны ссылаться на записи в столбце id таблицы вершин DataFrame.

8. Поиск пути

В следующей таблице приведен обзор алгоритмов нахождения пути и поисковых алгоритмов:

Примеры в этом разделы работают с графом, содержащим подмножество европейской дорожной сети.
Прежде, чем выполнять эти примеры, нужно предварительно загрузить библиотеку graphframes в виде бинарного файла .jar нужной версии, который можно взять по адресу:
https://spark-packages.org/package/graphframes/graphframes
Этот файл нужно положить в каталог
/usr/local/spark/jars

Нам понадобятся две таблицы
Первая таблица находится в файле transport-nodes.csv(вершины). В ней 4 поля - название города, широта, долгота и население:


Вторая таблица - файл transport-relationships.csv (ребра). В ней 4 поля - 2 прилегающих города, тип дороги и расстояние между городами:


Загрузим данные из csv-файлов (import_graph.py):

 from pyspark.sql.types import *
 from graphframes import *
 from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession
 
 def create_transport_graph(spark):
     node_fields = [
         StructField("id", StringType(), True),
         StructField("latitude", FloatType(), True),
         StructField("longitude", FloatType(), True),
         StructField("population", IntegerType(), True)
     ]
     nodes = spark.read.csv("transport-nodes.csv", header=True,
                            schema=StructType(node_fields))
 
     rels = spark.read.csv("transport-relationships.csv", header=True)
     reversed_rels = rels.withColumn("newSrc", rels.dst) \
         .withColumn("newDst", rels.src) \
         .drop("dst", "src") \
         .withColumnRenamed("newSrc", "src") \
         .withColumnRenamed("newDst", "dst") \
         .select("src", "dst", "relationship", "cost")
 
     relationships = rels.union(reversed_rels)
 
     return GraphFrame(nodes, relationships)
 
 conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
 sc = SparkContext(conf = conf)
 spark = SparkSession.builder \
       .master("local[1]") \
       .appName("SparkByExamples.com") \
       .getOrCreate() 
 gf = create_transport_graph(spark)
 print(gf)
 
Положим файл import_graph.py в каталог ../spark/bin/ , там же лежат два csv-файла. Выполним этот файл, не заходя в pyspark, с помощью

 spark-submit import_graph.py
 


Переведя сырые данные из csv-файлов в dataframe, теперь можно начать графовый поиск.

Поиск в ширину (breadth first search, BFS) – один из фундаментальных алгоритмов обхода графа. Он начинается с выбранной вершины и исследует всех ее соседей на расстоянии одного перехода, а затем посещает всех соседей на расстоянии двух переходов и т. д.
BFS чаще всего используется в качестве основы для других более целенаправленных алгоритмов. Например, алгоритмы кратчайшего пути, связанных компонентов и центральности применяют алгоритм BFS. Его также можно использовать для поиска кратчайшего пути между вершинами.

Алгоритм поиска в ширину на платформе Spark находит кратчайший путь между двумя вершинами по количеству связей (т. е. переходов) между ними. Вы можете явно назвать вашу целевую вершину или задать критерии, которые должны быть выполнены.

Например, вы можете использовать функцию bfs, чтобы найти первый город среднего размера (по европейским стандартам) с населением от 100 000 до 300 000 человек. Давайте сначала проверим, в каких местах население соответствует этим критериям. Для этого добавим в наш питоновский файл следующий код:

 (gf.vertices
 .filter("population > 100000 and population < 300000")
 .sort('population')
 .show())
 
В ответ мы получим вывод:

 +----------+--------+---------+----------+
 |        id|latitude|longitude|population|
 +----------+--------+---------+----------+
 |Colchester|51.88921|  0.90421|    104390|
 |   Ipswich|52.05917|  1.15545|    133384|
 +----------+--------+---------+----------+
 
Найдено всего два города по выбранному критерию - Ипсвич (Ipswich) и Колчестер (Colchester).
Следующий код находит кратчайший путь от Гааги до города среднего размера:

 from_expr = "id='Den Haag'"
 to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
 result = gf.bfs(from_expr, to_expr)
 columns = [column for column in result.columns if not column.startswith("e")]
 result.select(columns).show()
 
Получим вывод - ближайший к Гааге город 100-сячник - это Ипсвич:

 +--------------------+--------------------+--------------------+--------------------+
 |                from|                  v1|                  v2|                  to|
 +--------------------+--------------------+--------------------+--------------------+
 |{Den Haag, 52.078...|{Hoek van Holland...|{Felixstowe, 51.9...|{Ipswich, 52.0591...|
 +--------------------+--------------------+--------------------+--------------------+
 


Вычислим минимальный кратчайший путь между Амстердамом и Колчестером. В предыдущем примере в качестве критерия минимальности мы испоьзовали минимальное количество узлов. Здесь, в отличие от предыдущего примера, в качестве основного критерия мы выбираем минимальное суммарное расстояние между двумя городами. Расстояние - это столбик cost:

 add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))
 # // end::udfs[]
 
 
 # // tag::custom-shortest-path[]
 def shortest_path(g, origin, destination, column_name="cost"):
     if g.vertices.filter(g.vertices.id == destination).count() == 0:
         return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                      .withColumn("path", F.array()))
 
     vertices = (g.vertices.withColumn("visited", F.lit(False))
                           .withColumn("distance", F.when(g.vertices["id"] == origin, 0)
                                                    .otherwise(float("inf")))
                           .withColumn("path", F.array()))
     cached_vertices = AM.getCachedDataFrame(vertices)
     g2 = GraphFrame(cached_vertices, g.edges)
 
     while g2.vertices.filter('visited == False').first():
         current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id
 
         msg_distance = AM.edge[column_name] + AM.src['distance']
         msg_path = add_path_udf(AM.src["path"], AM.src["id"])
         msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
         new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                              sendToDst=msg_for_dst)
 
         new_visited_col = F.when(
             g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
         new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                   (new_distances.aggMess["col1"] < g2.vertices.distance),
                                   new_distances.aggMess["col1"]) \
                             .otherwise(g2.vertices.distance)
         new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                               (new_distances.aggMess["col1"] < g2.vertices.distance),
                               new_distances.aggMess["col2"].cast("array")) \
                         .otherwise(g2.vertices.path)
 
         new_vertices = (g2.vertices.join(new_distances, on="id", how="left_outer")
                                    .drop(new_distances["id"])
                                    .withColumn("visited", new_visited_col)
                                    .withColumn("newDistance", new_distance_col)
                                    .withColumn("newPath", new_path_col)
                                    .drop("aggMess", "distance", "path")
                                    .withColumnRenamed('newDistance', 'distance')
                                    .withColumnRenamed('newPath', 'path'))
         cached_new_vertices = AM.getCachedDataFrame(new_vertices)
         g2 = GraphFrame(cached_new_vertices, g2.edges)
         if g2.vertices.filter(g2.vertices.id == destination).first().visited:
             return (g2.vertices.filter(g2.vertices.id == destination)
                                .withColumn("newPath", add_path_udf("path", "id"))
                                .drop("visited", "path")
                                .withColumnRenamed("newPath", "path"))
     return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                  .withColumn("path", F.array()))
 
 result = shortest_path(g, "Amsterdam", "Colchester", "cost")
 result.select("id", "distance", "path").show(truncate=False)
 
Общая длина кратчайшего пути между Амстердамом и Колчестером составляет 347 км, маршрут проходит через Гаагу, Хук-ван-Холланд,Феликстоу и Ипсвич. Напротив, кратчайший путь с точки зрения количества переходов между городами, который мы разработали с помощью алгоритма поиска в ширину , пролегает через Иммингем, Донкастер и Лондон.

 +----------+--------+------------------------------------------------------------------------+
 |id        |distance|path                                                                    |
 +----------+--------+------------------------------------------------------------------------+
 |Colchester|347.0   |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester]|
 +----------+--------+------------------------------------------------------------------------+
 
 
Код примера и csv-файлы можно скачать тут

9. Вычисление веса узлов

Алгоритмы вычисления центральности (centrality algorithm, далее просто алгоритмы центральности) используются для понимания роли отдельных узлов в сети и оценки уровня их влияния на сеть. Они полезны, потому что выявляют наиболее важные узлы и помогают нам понять групповую динамику, такую как достоверность, доступность, скорость обмена информацией, и мосты между группами. Хотя многие из этих алгоритмов были изобретены для анализа социальных сетей, с тех пор они нашли применение в других областях.

Алгоритмы центральности применимы к любым графам, но социальные сети являются наиболее ярким примером динамически меняющегося влияния и потоков информации. Примеры в этой главе рассматривают небольшой граф в стиле Твиттера.

Есть одна большая группа пользователей со связями между ними и меньшая группа, не имеющая связей с большой группой.

Понимание охвата (reach) вершины является справедливой мерой важности. Скольких других вершин эта вершина может коснуться прямо сейчас? Степень (degree) вершины – это общее число прямых входящих и исходящих связей, которыми она обладает. Вы можете интерпретировать степень как меру непосредственной доступности узла. Например, человек с высокой степенью в социальной сети будет иметь много непосредственных контактов и с большей вероятностью окажется и в вашей сети тоже.
Средняя степень (average degree) сети – это просто общее количество связей, деленное на общее количество вершин; это значение может быть сильно искажено наличием вершин с высокой степенью. Распределение степеней (degree distribution) – это вероятность того, что случайно выбранная вершина будет иметь определенное число отношений.
Следующий код грузит грузит данные из csv-файлов. Программа подсчитывает количество всех входящих и исходящих связей вершины и используется для поиска популярных вершин. Код позволяет применить алгоритм степенной центральности к данным из нашего примера социальных связей:

 from graphframes import *
 from pyspark import SparkContext
 from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession
 
 conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
 sc = SparkContext(conf = conf)
 spark = SparkSession.builder \
       .master("local[1]") \
       .appName("SparkByExamples.com") \
       .getOrCreate()
 
 # // tag::load-graph-frame[]
 v = spark.read.csv("social-nodes.csv", header=True)
 e = spark.read.csv("social-relationships.csv", header=True)
 g = GraphFrame(v, e)
 # // end::load-graph-frame[]
 
 # // tag::degree[]
 total_degree = g.degrees
 in_degree = g.inDegrees
 out_degree = g.outDegrees
 
 total_degree.join(in_degree, "id", how="left") \
             .join(out_degree, "id", how="left") \
             .fillna(0) \
             .sort("inDegree", ascending=False) \
             .show()
 
Он выводит следующую таблицу:

 +-------+------+--------+---------+
 |     id|degree|inDegree|outDegree|
 +-------+------+--------+---------+
 |   Doug|     6|       5|        1|
 |  Alice|     7|       3|        4|
 |Bridget|     5|       2|        3|
 |Michael|     5|       2|        3|
 |    Amy|     1|       1|        0|
 |Charles|     2|       1|        1|
 |  David|     2|       1|        1|
 |   Mark|     3|       1|        2|
 |  James|     1|       0|        1|
 +-------+------+--------+---------+
 
Дуг (Doug) – самый популярный пользователь в нашем графе Твиттера с пятью подписчиками (входящие связи). Все остальные пользователи в этой части графа подписаны на него, а он – только на одного человека.
Код примера и csv-файлы можно скачать тут


Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье