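Neo4j Fundamentals

What Is a Graph Database

A graph database is a database system that stores, queries, and manages data as a graph of nodes and edges. It handles highly connected data efficiently and is well suited to social networks, recommendation systems, and fraud detection.

Core Concepts

  • Node: represents an entity, such as a person, company, or product
  • Relationship: represents a connection between two nodes; in Neo4j every relationship has a type and a direction
  • Property: a key-value pair stored on a node or a relationship
  • Label: a tag that classifies nodes

Why Choose Neo4j

  • Native graph storage, so traversals follow stored relationships instead of computing joins
  • ACID transactions
  • The declarative Cypher query language
  • A rich ecosystem and tooling
  • Enterprise-grade security and scalability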

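Common Graph Algorithms

1. Centrality Algorithms

Centrality algorithms identify the most important nodes in a graph. They are commonly used for social network analysis and for finding key people.

PageRank

An algorithm based on link structure that computes node weights iteratively. It was proposed by Google founders Larry Page and Sergey Brin to rank web pages.

  • Use cases: web page ranking, measuring influence in social networks, identifying key nodes
  • Principle: a node's importance depends on the number and the quality of the nodes that point to it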
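
The principle above is usually written as the recurrence below. This is the textbook formulation rather than anything specific to this guide: the damping factor d (commonly 0.85), the node count N, the set In(u) of nodes linking to u, and the out-degree notation are symbols introduced here. Implementations may normalize the first term differently.

```latex
PR(u) = \frac{1 - d}{N} + d \sum_{v \in \mathrm{In}(u)} \frac{PR(v)}{\deg^{\mathrm{out}}(v)}
```

Each node splits its current score evenly among the nodes it points to, which is why a link from a high-scoring node with few outgoing links counts for more.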

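Degree Centrality

Degree centrality is the simplest centrality measure: it counts a node's direct connections.

  • Use cases: finding active users in a social network, finding highly connected devices
  • Characteristics: cheap to compute, but it ignores how important the neighbors themselves are

Betweenness Centrality

Betweenness centrality measures how often a node lies on shortest paths between other nodes, which reflects its role as a "bridge".

  • Use cases: finding critical junctions in transport networks, finding bottlenecks in information flow
  • Characteristics: surfaces bridge nodes that connect different communities

Closeness Centrality

Closeness centrality is the reciprocal of a node's average distance to all other nodes.

  • Use cases: siting emergency facilities, identifying the source of a spreading infection
  • Characteristics: a higher value means a shorter average distance to the other nodes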
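
Degree centrality is simple enough to compute without GDS, which makes the definition concrete. The sketch below counts the relationships attached to each node in plain Cypher. The Person label, the name property, and the FRIEND_OF relationship type are illustrative assumptions.

```cypher
// Degree centrality in plain Cypher: count the relationships on each node.
// Person, name, and FRIEND_OF are illustrative names.
MATCH (p:Person)
RETURN p.name AS name,
       size([(p)-[:FRIEND_OF]-() | 1]) AS degree
ORDER BY degree DESC
LIMIT 10
```

The pattern is undirected, so incoming and outgoing relationships both count. Add an arrow to the pattern to get in-degree or out-degree instead.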

// All GDS examples assume 'myGraph' is a named in-memory graph created beforehand with gds.graph.project

// PageRank example: find the most influential nodes
CALL gds.pageRank.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

// Degree Centrality example: find the nodes with the most connections
CALL gds.degree.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC

// Betweenness Centrality example: find the key bridge nodes
CALL gds.betweenness.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

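2. Community Detection Algorithms

Community detection algorithms identify groups of densely connected nodes and are one of the core tasks of graph analysis.

Louvain

A heuristic algorithm based on modularity optimization, and one of the most widely used community detection algorithms.

  • Use cases: social network analysis, protein network analysis in bioinformatics
  • Characteristics: fast, scales to large graphs, supports weighted graphs

Label Propagation

Label propagation is a simple and fast community detection method.

  • Use cases: quick community partitioning of large networks
  • Characteristics: low computational cost, but results can vary between runs

Connected Components

Connected components algorithms find sets of nodes that are reachable from one another.

  • Use cases: network partition analysis, finding isolated subnetworks
  • Characteristics: comes in two variants, strongly connected components (direction matters) and weakly connected components (direction ignored)

Triangle Counting

Triangle counting computes the number of triangles in a graph and how they are distributed across nodes.

  • Use cases: network density analysis, measuring how tightly knit a community is
  • Characteristics: regions dense in triangles usually correspond to tight communities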
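
To make the triangle count concrete, here is a minimal plain-Cypher sketch that counts the triangles each node takes part in. It assumes a Person label and a FRIEND_OF relationship type (both illustrative), no self-loops, and at most one FRIEND_OF relationship between any two people.

```cypher
// Local triangle count in plain Cypher.
// Assumes no self-loops and no parallel FRIEND_OF relationships.
MATCH (a:Person)-[:FRIEND_OF]-(b)-[:FRIEND_OF]-(c)-[:FRIEND_OF]-(a)
// Each triangle through a is matched twice (b and c swapped), hence the division.
RETURN a.name AS name, count(*) / 2 AS triangles
ORDER BY triangles DESC
LIMIT 10
```

On large graphs this pattern match is far slower than the GDS procedure, so treat it as an illustration of the definition rather than a replacement.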

// Louvain community detection
CALL gds.louvain.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId,
       count(*) AS community_size,
       collect(gds.util.asNode(nodeId).name) AS members
ORDER BY community_size DESC

// Weakly connected components
CALL gds.wcc.stream('myGraph')
YIELD nodeId, componentId
RETURN componentId, count(*) AS size
ORDER BY size DESC

// Triangle count, top 10 nodes (the graph must be projected as undirected)
CALL gds.triangleCount.stream('myGraph')
YIELD nodeId, triangleCount
RETURN gds.util.asNode(nodeId).name AS name, triangleCount
ORDER BY triangleCount DESC
LIMIT 10

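3. Path Finding Algorithms

Path finding algorithms compute the shortest or otherwise optimal path between nodes.

Shortest Path

Shortest path algorithms compute the shortest path between two nodes and support weighted graphs.

  • Dijkstra: shortest paths when all weights are non-negative
  • A*: heuristic-guided search, usually faster than Dijkstra when a good heuristic is available
  • Use cases: navigation systems, network routing, resource allocation

All Shortest Paths

Computes the shortest paths between all pairs of nodes.

  • Use cases: network reliability analysis, fault-tolerant system design
  • Characteristics: computationally expensive, best suited to small and medium graphs

Minimum Spanning Tree

A minimum spanning tree connects all nodes with the smallest possible total weight.

  • Use cases: network design, cluster analysis, circuit design
  • Common algorithms: Prim, Kruskal

Random Walk

Random walk algorithms simulate a random traversal of the graph and are used for sampling and embeddings.

  • Use cases: sampling for node embeddings, recommendation systems, community detection
  • Characteristics: the basis for embedding methods such as Node2Vec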

// Shortest path (Dijkstra)
MATCH (source:Person {name: '张三'}), (target:Person {name: '李四'})
CALL gds.shortestPath.dijkstra.stream('myGraph', {
    sourceNode: source,
    targetNode: target,
    relationshipWeightProperty: 'distance'
})
YIELD nodeIds, totalCost
RETURN [nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS path, totalCost

// A* (heuristic search)
// GDS uses a geographic heuristic, so every node needs latitude and longitude properties
MATCH (source:Location {name: 'A'}), (target:Location {name: 'Z'})
CALL gds.shortestPath.astar.stream('myGraph', {
    sourceNode: source,
    targetNode: target,
    relationshipWeightProperty: 'distance',
    latitudeProperty: 'latitude',
    longitudeProperty: 'longitude'
})
YIELD nodeIds, totalCost
RETURN [nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS path, totalCost

// All-pairs shortest paths (unreachable pairs have an infinite distance and are filtered out)
CALL gds.allShortestPaths.stream('myGraph', {
    relationshipWeightProperty: 'distance'
})
YIELD sourceNodeId, targetNodeId, distance
WHERE sourceNodeId < targetNodeId AND gds.util.isFinite(distance)
RETURN gds.util.asNode(sourceNodeId).name AS source,
       gds.util.asNode(targetNodeId).name AS target,
       distance
ORDER BY distance DESC
LIMIT 10

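4. Link Prediction Algorithms

Link prediction algorithms estimate which relationships are likely to form in the future. They are widely used in recommendation systems and fraud detection.

Common Neighbors

Common neighbors: the more neighbors two nodes share, the more likely they are to become connected.

  • Formula: |N(u) ∩ N(v)|
  • Use cases: friend recommendation, product recommendation

Adamic-Adar Index

Adamic-Adar index: weights each shared neighbor by how rare it is, so neighbors with few connections contribute more.

  • Formula: Σ_{z ∈ N(u) ∩ N(v)} 1 / log(deg(z))
  • Use cases: link prediction in social networks

Resource Allocation

Resource allocation index: models a resource flowing from node u to node v through their shared neighbors.

  • Formula: Σ_{z ∈ N(u) ∩ N(v)} 1 / deg(z)
  • Use cases: link prediction in complex networks

Preferential Attachment

Preferential attachment: the higher a node's degree, the more likely it is to gain new connections.

  • Formula: deg(u) × deg(v)
  • Use cases: predicting where new nodes attach, network evolution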
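
The four scores above can be computed directly in Cypher for one source node, which makes the formulas concrete. This is a sketch under assumptions: the Person label, the name property, and the FRIEND_OF relationship type are illustrative, and there is at most one FRIEND_OF relationship between any two people, so relationship counts equal neighbor counts.

```cypher
// Topological link prediction scores for candidates v of one source node u.
MATCH (u:Person {name: '张三'})-[:FRIEND_OF]-(z)-[:FRIEND_OF]-(v:Person)
WHERE u <> v
WITH DISTINCT u, v, z
// deg(z): FRIEND_OF relationships on the shared neighbor (at least 2 here, so log(degZ) > 0)
WITH u, v, size([(z)-[:FRIEND_OF]-() | 1]) AS degZ
WITH u, v,
     count(*) AS commonNeighbors,
     sum(1.0 / log(degZ)) AS adamicAdar,
     sum(1.0 / degZ) AS resourceAllocation
RETURN v.name AS candidate,
       commonNeighbors,
       adamicAdar,
       resourceAllocation,
       size([(u)-[:FRIEND_OF]-() | 1]) * size([(v)-[:FRIEND_OF]-() | 1]) AS preferentialAttachment
ORDER BY adamicAdar DESC
LIMIT 10
```

Cypher's log() is the natural logarithm. The query only scores candidates that share at least one neighbor with u, which is the interesting set for the first three measures.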

// Note: GDS exposes these topological scores as functions on a node pair
// (gds.alpha.linkprediction.*), not as stream procedures. Names may differ between GDS versions.

// Common Neighbors link prediction
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)--(p2)
RETURN p2.name AS candidate,
       gds.alpha.linkprediction.commonNeighbors(p1, p2) AS score
ORDER BY score DESC
LIMIT 10

// Adamic-Adar link prediction
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)--(p2)
RETURN p2.name AS candidate,
       gds.alpha.linkprediction.adamicAdar(p1, p2) AS score
ORDER BY score DESC
LIMIT 10

// Resource allocation index
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)--(p2)
RETURN p2.name AS candidate,
       gds.alpha.linkprediction.resourceAllocation(p1, p2) AS score
ORDER BY score DESC
LIMIT 10

// Preferential attachment
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)--(p2)
RETURN p2.name AS candidate,
       gds.alpha.linkprediction.preferentialAttachment(p1, p2) AS score
ORDER BY score DESC
LIMIT 10

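5. Node Embedding Algorithms

Node embedding algorithms map the nodes of a graph into a low-dimensional vector space so that machine learning models can consume them.

Node2Vec

An embedding method whose biased random walks interpolate between depth-first and breadth-first exploration.

  • Parameters: p (return), q (in-out)
  • Use cases: node classification, link prediction, clustering

FastRP (Fast Random Projection)

A fast node embedding algorithm based on random projection.

  • Characteristics: fast, scales well
  • Use cases: embedding large graphs, feature engineering

GraphSAGE

An inductive node embedding method based on graph neural networks.

  • Characteristics: generalizes to nodes not seen during training
  • Use cases: dynamic graphs, inductive learning

HashGNN

A hashing-based graph neural network embedding method.

  • Characteristics: efficient, highly interpretable
  • Use cases: fast embeddings, large graphs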

// Node2Vec embeddings
CALL gds.node2vec.stream('myGraph', {
    embeddingDimension: 128,
    walkLength: 80,
    walksPerNode: 10,
    returnFactor: 1.0,
    inOutFactor: 1.0
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name AS name, embedding[0..5] AS preview

// FastRP embeddings, written back to the database
CALL gds.fastRP.write('myGraph', {
    embeddingDimension: 256,
    relationshipWeightProperty: 'weight',
    writeProperty: 'embedding'
})

// GraphSAGE training (beta tier in GDS 2.x; the graph must be projected with the feature properties)
CALL gds.beta.graphSage.train('myGraph', {
    modelName: 'myModel',
    featureProperties: ['age', 'income'],
    embeddingDimension: 64,
    aggregator: 'mean',
    epochs: 10
})

// Generate embeddings with the trained model (GraphSAGE in GDS outputs embeddings, not labels)
CALL gds.beta.graphSage.stream('myGraph', {
    modelName: 'myModel'
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name AS name, embedding[0..5] AS preview

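6. Similarity Algorithms

Similarity algorithms measure how alike two nodes or two graphs are.

Jaccard Similarity

Jaccard similarity: the size of the intersection of two sets divided by the size of their union.

  • Formula: |A ∩ B| / |A ∪ B|
  • Use cases: user behavior similarity, text similarity

Cosine Similarity

Cosine similarity: the cosine of the angle between two vectors.

  • Formula: A·B / (|A| × |B|)
  • Use cases: similarity between embedding vectors

Euclidean Distance

Euclidean distance: the straight-line distance between two points.

  • Formula: √Σ(Ai − Bi)²
  • Use cases: cluster analysis, anomaly detection

Pearson Correlation

Pearson correlation coefficient: measures how strongly two vectors are linearly correlated.

  • Range: −1 to 1
  • Use cases: feature similarity, behavior analysis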
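
The Jaccard, cosine, and Euclidean formulas can be checked by hand on small literal inputs. The two statements below are a self-contained sketch in plain Cypher that needs no data in the database. The sample lists are made up for illustration, and the Jaccard part assumes the lists contain no duplicates.

```cypher
// Jaccard on two sets: |A ∩ B| / |A ∪ B|
WITH [1, 2, 3] AS a, [2, 3, 4] AS b
WITH a, b, [x IN a WHERE x IN b] AS common
RETURN toFloat(size(common)) / (size(a) + size(b) - size(common)) AS jaccard;
// jaccard = 0.5 (2 shared items out of 4 distinct items)

// Cosine similarity and Euclidean distance on two vectors of equal length
WITH [1.0, 2.0, 3.0] AS a, [2.0, 4.0, 6.0] AS b
WITH a, b, range(0, size(a) - 1) AS idx
WITH a, b,
     reduce(s = 0.0, i IN idx | s + a[i] * b[i]) AS dot,
     sqrt(reduce(s = 0.0, x IN a | s + x * x)) AS normA,
     sqrt(reduce(s = 0.0, x IN b | s + x * x)) AS normB,
     sqrt(reduce(s = 0.0, i IN idx | s + (a[i] - b[i]) ^ 2)) AS euclidean
RETURN dot / (normA * normB) AS cosine, euclidean;
```

Because the second vector is exactly twice the first, the cosine comes out at approximately 1 even though the Euclidean distance (the square root of 14, about 3.74) is not zero. Cosine compares direction only, while Euclidean distance is sensitive to magnitude.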

// Jaccard similarity (Jaccard is the default metric of gds.nodeSimilarity)
CALL gds.nodeSimilarity.stream('myGraph')
YIELD node1, node2, similarity
WITH gds.util.asNode(node1) AS a, gds.util.asNode(node2) AS b, similarity
WHERE a.name = '张三'
RETURN b.name AS similar_person, similarity
ORDER BY similarity DESC
LIMIT 10

// Cosine similarity (on embedding vectors)
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.cosine(p1.embedding, p2.embedding) AS similarity
RETURN p2.name, similarity
ORDER BY similarity DESC
LIMIT 10

// Euclidean distance
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.euclideanDistance(p1.embedding, p2.embedding) AS distance
RETURN p2.name, distance
ORDER BY distance ASC
LIMIT 10

// Cosine similarity (on node properties)
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.cosine([p1.age, p1.income], [p2.age, p2.income]) AS similarity
RETURN p2.name, similarity
ORDER BY similarity DESC
LIMIT 10

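7. Node Classification Algorithms

Node classification algorithms predict the label or category of a node.

GraphSAGE Classification

Inductive node classification built on GraphSAGE.

  • Input: node features, graph structure
  • Output: node class labels

Node2Vec + Traditional ML

Generate embeddings with Node2Vec first, then classify them with a traditional machine learning model.

  • Pros: flexible, easy to interpret
  • Cons: requires labeled data

Semi-Supervised Learning

Learns from a small amount of labeled data together with a large amount of unlabeled data.

  • Use cases: fraud detection, anomaly identification
  • Pros: lowers labeling cost

Multi-Label Classification

A node can belong to several categories at the same time.

  • Use cases: user interest classification, document tagging
  • Characteristics: closer to real-world scenarios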
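
// Node classification with a GDS pipeline
// GraphSAGE in GDS only produces embeddings. Classification goes through the node
// classification pipeline (beta tier in GDS 2.x; check the names for your version).

// Project a separate in-memory graph so training does not touch the stored graph
CALL gds.graph.project(
    'projectionGraph',
    ['Person'],
    ['FRIEND_OF', 'WORKS_WITH'],
    {
        nodeProperties: ['age', 'income', 'activityScore', 'category'],
        relationshipProperties: 'weight'
    }
)

// Define the pipeline: input features and a candidate model
CALL gds.beta.pipeline.nodeClassification.create('nodeClassifierPipe')
CALL gds.beta.pipeline.nodeClassification.selectFeatures('nodeClassifierPipe', ['age', 'income', 'activityScore'])
CALL gds.beta.pipeline.nodeClassification.addLogisticRegression('nodeClassifierPipe')

// Train the model; 'category' must be an integer class property
CALL gds.beta.pipeline.nodeClassification.train('projectionGraph', {
    pipeline: 'nodeClassifierPipe',
    targetNodeLabels: ['Person'],
    modelName: 'nodeClassifier',
    targetProperty: 'category',
    metrics: ['F1_WEIGHTED'],
    randomSeed: 42
})

// Predict with the trained model
CALL gds.beta.pipeline.nodeClassification.predict.stream('projectionGraph', {
    modelName: 'nodeClassifier',
    includePredictedProbabilities: true
})
YIELD nodeId, predictedClass, predictedProbabilities
RETURN gds.util.asNode(nodeId).name AS name,
       predictedClass AS prediction,
       predictedProbabilities

// Drop the projection to free memory
CALL gds.graph.drop('projectionGraph')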

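8. Graph Neural Networks

Graph neural networks extend deep learning to graph-structured data and can learn complex graph patterns.

GCN (Graph Convolutional Network)

A graph convolutional network aggregates neighbor information through convolution operations.

  • Depth: usually 2 to 3 layers
  • Use cases: node classification, graph classification

GAT (Graph Attention Network)

A graph attention network aggregates neighbor information with an attention mechanism.

  • Characteristics: learns a weight for each neighbor
  • Use cases: recommendation systems, molecular property prediction

GraphSAGE

An inductive graph neural network that generalizes to unseen nodes.

  • Characteristics: scalable, supports neighbor sampling
  • Use cases: dynamic graphs, large graphs

Link Prediction GNN

Graph neural network models built specifically for link prediction.

  • Use cases: friend recommendation in social networks, knowledge graph completion
  • Characteristics: end-to-end learning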
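
// Note: the procedure names in this block are illustrative pseudocode, not a real GDS API.
// GDS does not ship GCN or GAT training procedures; such models are usually trained in an
// external framework on data exported from Neo4j. gds.alpha.ml.linkPrediction.* existed only
// in early GDS 1.x releases and was replaced by link prediction pipelines.

// GCN for node classification (pseudocode)
CALL gds.alpha.gcn.train('myGraph', {
    modelName: 'gcnClassifier',
    featureProperties: ['feature1', 'feature2', 'feature3'],
    labelProperty: 'label',
    hiddenDim: 64,
    epochs: 100,
    learningRate: 0.01
})

// GAT for node classification (pseudocode)
CALL gds.alpha.gat.train('myGraph', {
    modelName: 'gatClassifier',
    featureProperties: ['feature1', 'feature2'],
    labelProperty: 'label',
    hiddenDim: 64,
    heads: 8,
    epochs: 100
})

// GNN link prediction (pseudocode)
CALL gds.alpha.ml.linkPrediction.train('myGraph', {
    modelName: 'gnnLinkPredictor',
    featureProperties: ['embedding'],
    hiddenDim: 32,
    epochs: 50
})

// Predict new links with the trained model (pseudocode; node1 and node2 are treated as node IDs)
CALL gds.alpha.ml.linkPrediction.predict('myGraph', {
    modelName: 'gnnLinkPredictor',
    topK: 10
})
YIELD node1, node2, probability
RETURN gds.util.asNode(node1).name, gds.util.asNode(node2).name, probability
ORDER BY probability DESC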

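9. Entity Resolution Algorithms

Entity resolution identifies nodes from different data sources that represent the same real-world entity.

Similarity Join

Finds candidate matching pairs based on attribute similarity.

  • Use cases: deduplication, record linkage
  • Methods: Jaro-Winkler, edit distance

Graph-Based Matching

Matches entities by using the relationships between nodes.

  • Use cases: entity alignment across databases
  • Pros: takes context into account

Transitive Closure

Infers equivalence between entities through transitivity.

  • Use cases: entity merging, community detection
  • Caution: a single wrong match can propagate through the chain

Conflict Detection

Detects conflicting attribute values after a merge.

  • Use cases: data cleaning, quality checks
  • Methods: voting, picking the minimum or maximum value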
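
Transitive closure over pairwise matches can be sketched in plain Cypher: follow the equivalence relationships in either direction and name each cluster after its smallest id. The SAME_AS relationship type, the Person label, and the id property are illustrative assumptions, and the hop limit of 5 is an arbitrary safety bound.

```cypher
// Assign every Person to a cluster of transitively equivalent records.
// *0..5 includes the node itself, so unmatched records form their own cluster.
MATCH (p:Person)-[:SAME_AS*0..5]-(q:Person)
RETURN p.id AS id, min(q.id) AS clusterId
ORDER BY clusterId, id
```

Chains longer than the hop limit split into several clusters. On larger graphs, running weakly connected components over the SAME_AS relationships produces the same grouping without a hop limit. Inspecting unusually large clusters is a cheap way to catch the wrong matches that the caution above warns about.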

// Similarity join example: same last name and similar first names
// Uses edit-distance similarity from APOC (apoc.text.levenshteinSimilarity, range 0 to 1)
MATCH (p1:Person), (p2:Person)
WHERE p1 <> p2
  AND p1.lastName = p2.lastName
  AND size(p1.firstName) > 3
  AND size(p2.firstName) > 3
WITH p1, p2,
     apoc.text.levenshteinSimilarity(p1.firstName, p2.firstName) AS nameSimilarity
WHERE nameSimilarity > 0.7
RETURN p1.name AS entity1, p2.name AS entity2, nameSimilarity
ORDER BY nameSimilarity DESC

// Graph-based entity resolution
MATCH (p1:Person {id: '12345'})
MATCH (p2:Person {id: '67890'})
// Check for shared friends (undirected, so f is a friend of both)
WITH p1, p2,
     [(p1)-[:FRIEND_OF]-(f)-[:FRIEND_OF]-(p2) | f] AS mutualFriends
WHERE size(mutualFriends) > 2
RETURN p1, p2, mutualFriends

// Entity merging (create an equivalence relationship)
MATCH (p1:Person {source: 'systemA'}), (p2:Person {source: 'systemB'})
WHERE p1.name = p2.name AND p1.birthDate = p2.birthDate
MERGE (p1)-[:SAME_AS]->(p2)

// Query the merged entities
MATCH (p:Person)-[:SAME_AS]->(other:Person)
RETURN p.name AS name, [p.id] + collect(DISTINCT other.id) AS merged_ids

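10. Temporal Graph Algorithms

Temporal graph algorithms work on graphs that carry time information.

Temporal Shortest Path

Shortest path computation under time constraints.

  • Use cases: flight booking, time-sensitive routing
  • Constraint: each hop must be reached in chronological order

Temporal Community Detection

Tracks how communities change over time.

  • Use cases: social dynamics analysis, trend discovery
  • Methods: snapshot comparison, evolution tracking

Temporal Link Prediction

Predicts relationships that are likely to appear in the future.

  • Use cases: user behavior prediction, trend analysis
  • Characteristics: applies time decay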
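
Time decay can take many forms. As one hedged example, the statement below evaluates a simple hyperbolic decay, weight = 1 / (1 + age in days), for a few sample ages. The formula and the sample values are assumptions chosen for illustration, not something the algorithms above prescribe.

```cypher
// Hyperbolic decay: weight = 1 / (1 + age in days)
UNWIND [0, 1, 7, 30] AS ageInDays
RETURN ageInDays, 1.0 / (1.0 + ageInDays) AS recencyWeight
```

A relationship created today gets weight 1.0, a day-old one 0.5, and a week-old one 0.125. An exponential decay such as exp(-k * ageInDays) is a common alternative when older relationships should fade faster.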

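Influence Propagation

Tracks how information or influence spreads through a network.

  • Use cases: epidemic modeling, marketing impact analysis
  • Methods: SIR model, information cascades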

// Time-aware shortest path
// gds.shortestPath.dijkstra has no time-window options, so 'temporalGraph' is assumed
// to be projected with only the relationships valid in the window of interest.
MATCH (source:Event {type: 'start'}),
      (target:Event {type: 'end'})
CALL gds.shortestPath.dijkstra.stream('temporalGraph', {
    sourceNode: source,
    targetNode: target,
    relationshipWeightProperty: 'duration'
})
YIELD nodeIds, totalCost
RETURN [nodeId IN nodeIds | gds.util.asNode(nodeId).eventTime] AS timeline, totalCost

// Temporal community detection: track how communities evolve
// Assumes a community id was already written to each node (for example by gds.louvain.write)
MATCH (n:Person)
WITH n.community AS community, n.eventTime AS time
RETURN community,
       collect(DISTINCT time) AS evolution,
       count(*) AS size
ORDER BY size DESC

// Time-based link prediction: more recent relationships get a higher weight
// Assumes r.since is a DATETIME value
MATCH (p1:Person {name: '张三'})-[r:FRIENDS_WITH]->(p2:Person)
WITH p2, datetime().epochSeconds - r.since.epochSeconds AS ageInSeconds
WITH p2, 1.0 / (1.0 + ageInSeconds / 86400.0) AS recencyWeight // decays with age in days
RETURN p2.name AS friend, recencyWeight
ORDER BY recencyWeight DESC
LIMIT 10

// PageRank on a temporal graph
// gds.pageRank has no built-in time decay. One option is to precompute a decayed relationship
// weight (here a hypothetical 'recencyWeight' property) and pass it in as the weight.
CALL gds.pageRank.stream('temporalGraph', {
    relationshipWeightProperty: 'recencyWeight'
})
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC

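Algorithm Selection Guide

Use case | Recommended algorithms | GDS procedures | Preconditions | Computational cost
--- | --- | --- | --- | ---
Identifying key people or nodes | PageRank, Betweenness, Degree | gds.pageRank, gds.betweenness, gds.degree | Directed or undirected, weighted or unweighted | Medium to high
User segmentation, community detection | Louvain, Label Propagation, WCC | gds.louvain, gds.labelPropagation, gds.wcc | Large graphs, weights supported | Low to medium
Friend or product recommendation | Link Prediction, Node2Vec | gds.alpha.linkprediction.* (functions), gds.node2vec | Requires training or iteration | Medium
Route planning, navigation | Dijkstra, A*, Shortest Path | gds.shortestPath.dijkstra, gds.shortestPath.astar | Non-negative weights; A* also needs a heuristic | Medium
Similar users or items | Jaccard, Cosine, Node Similarity | gds.nodeSimilarity.*, gds.similarity.* | Based on properties or embedding vectors | Medium
Feature learning, embeddings | FastRP, GraphSAGE, Node2Vec | gds.fastRP, gds.beta.graphSage.*, gds.node2vec | GraphSAGE requires node features | Low to medium
Node classification, prediction | GNN, GraphSAGE Classification | gds.beta.graphSage.train, gds.beta.pipeline.nodeClassification.* | Requires labeled data | High
Fraud detection | Louvain + anomaly detection | gds.louvain plus custom anomaly scoring (no dedicated GDS procedure) | Requires a baseline of normal behavior | Medium
Network density analysis | Triangle Counting, Clustering Coefficient | gds.triangleCount, gds.localClusteringCoefficient | Undirected graph | Medium
Measuring influence | PageRank, Eigenvector Centrality | gds.pageRank, gds.eigenvector | Works best on strongly connected graphs | Medium
Finding the source of information spread | Closeness Centrality, PageRank | gds.closeness, gds.pageRank | Connected graph | Medium
Finding network bottlenecks | Betweenness Centrality | gds.betweenness | Focus on path traffic | High

Tip: choose the algorithm according to data size and requirements. FastRP suits fast embeddings on large graphs, Louvain suits community detection, and PageRank suits influence analysis.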

The Cypher Query Language

Basic Syntax

// Create a node
CREATE (p:Person {name: '张三', age: 30})

// Create a relationship
MATCH (p:Person {name: '张三'}), (c:Company {name: 'ABC公司'})
CREATE (p)-[:WORKS_FOR {since: 2020}]->(c)

// Query nodes
MATCH (p:Person)
WHERE p.age > 25
RETURN p.name, p.age

// Query relationships
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, c.name, r.since

Advanced Queries

// Multi-hop query
MATCH (p:Person)-[:FRIEND_OF]->(friend)-[:WORKS_FOR]->(c:Company)
RETURN p.name, friend.name, c.name

// Variable-length path
MATCH path = (p:Person)-[:KNOWS*1..3]->(other)
RETURN p.name, other.name, length(path) AS degrees

// Aggregation
MATCH (c:Company)<-[:WORKS_FOR]-(p:Person)
RETURN c.name, count(p) AS employee_count
ORDER BY employee_count DESC

// Recommendation query (friends of friends, ranked by mutual friends)
MATCH (person:Person {name: '张三'})-[:FRIEND_OF]->(friend)-[:FRIEND_OF]->(potential)
WHERE NOT (person)-[:FRIEND_OF]->(potential) AND person <> potential
RETURN potential.name, count(friend) AS mutual_friends
ORDER BY mutual_friends DESC

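Pattern Matching

// Optional relationship: people without an employer are returned with nulls
MATCH (p:Person)
OPTIONAL MATCH (p)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, r.since, c.name

// Variable-length path (shortest path)
MATCH path = shortestPath((p1:Person {name: '张三'})-[*]-(p2:Person {name: '李四'}))
RETURN path

// All paths between two people (no relationship is traversed twice; consider an upper bound on large graphs)
MATCH path = (p1:Person)-[:KNOWS*]->(p2:Person)
WHERE p1.name = '张三' AND p2.name = '李四'
RETURN path

// Directed path query
MATCH (p1:Person)-[:MANAGES*1..3]->(p2:Person)
RETURN p1.name AS manager, p2.name AS subordinate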

Aggregation and Grouping

// Grouped statistics
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.name, count(p) AS employee_count,
       avg(p.age) AS avg_age,
       max(p.salary) AS max_salary
ORDER BY employee_count DESC

// Multi-level grouping
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.industry, c.name, count(p) AS count
ORDER BY c.industry, count DESC

// Collecting into a list
MATCH (p:Person)-[:HAS_SKILL]->(s:Skill)
RETURN p.name, collect(s.name) AS skills

// Distinct aggregation
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.name, count(DISTINCT p.department) AS departments

Common Functions

// String functions
MATCH (p:Person)
RETURN p.name,
       toUpper(p.name) AS upper_name,
       substring(p.name, 0, 3) AS first_three,
       size(p.name) AS name_length

// Math functions
MATCH (p:Person)
RETURN p.name, p.age,
       abs(p.age - 30) AS age_diff,
       round(p.age * 1.5) AS rounded,
       p.age % 10 AS remainder

// Date and time functions
// r.since is the integer year used in the examples above; birthYear is also an integer
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name,
       date().year - p.birthYear AS current_age,
       date().year - r.since AS years_worked,
       datetime().year AS current_year

// Path functions
MATCH path = (p:Person)-[:KNOWS*]->(other)
RETURN p.name, other.name,
       length(path) AS hops,
       nodes(path) AS path_nodes,
       relationships(path) AS path_rels

Subqueries

// CALL subquery (introduced in Neo4j 4.x)
MATCH (p:Person)
CALL {
    WITH p
    MATCH (p)-[:WORKS_FOR]->(c:Company)
    RETURN count(c) AS company_count
}
RETURN p.name, company_count

// EXISTS subquery
MATCH (p:Person)
WHERE EXISTS {
    MATCH (p)-[:WORKS_FOR]->(:Company {name: 'ABC公司'})
}
RETURN p.name

// Pattern comprehension (assumes the skill level is stored on the HAS_SKILL relationship)
MATCH (p:Person)
RETURN p.name, [(p)-[r:HAS_SKILL]->(s:Skill) WHERE r.level > 3 | s.name] AS advanced_skills

Modifying Data

// Create nodes and a relationship
CREATE (p:Person {name: '王五', age: 35})
CREATE (c:Company {name: 'XYZ公司', founded: 2015})
CREATE (p)-[:WORKS_FOR {since: 2021, position: '经理'}]->(c)

// Update node properties
MATCH (p:Person {name: '张三'})
SET p.age = 31, p.salary = 50000

// Add a label
MATCH (p:Person {name: '张三'})
SET p:Manager

// Remove a label (run as a separate statement)
MATCH (p:Person {name: '张三'})
REMOVE p:Manager

// Delete a node together with all of its relationships
MATCH (p:Person {name: '王五'})
DETACH DELETE p

// Bulk update
MATCH (p:Person)
WHERE p.age > 30
SET p.salary = p.salary * 1.1

Performance Tuning Tips

// Use indexes to speed up lookups
CREATE INDEX person_name_index FOR (p:Person) ON (p.name)
CREATE INDEX company_industry_index FOR (c:Company) ON (c.industry)

// Use constraints to guarantee uniqueness
CREATE CONSTRAINT person_id_unique FOR (p:Person) REQUIRE p.id IS UNIQUE

// Optimization: anchor the query on an indexed property first
MATCH (p:Person {name: '张三'})-[r:WORKS_FOR]->(c:Company)
WHERE c.industry = '技术'
RETURN p, c

// Use LIMIT to cap the result set
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name
LIMIT 100

// Use PROFILE to analyze query performance
PROFILE MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name

// Avoid Cartesian products
// Bad: every person is paired with every company
MATCH (p:Person), (c:Company)
RETURN p, c

// Good: the pattern connects the two nodes
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p, c

Graph Algorithms (Graph Data Science)

// PageRank: identify important nodes
CALL gds.pageRank.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

// Community detection: Louvain
CALL gds.louvain.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, count(*) AS community_size
ORDER BY community_size DESC

// Shortest path (Dijkstra; without a weight property every relationship costs 1)
MATCH (source:Person {name: '张三'}), (target:Person {name: '李四'})
CALL gds.shortestPath.dijkstra.stream('myGraph', {
    sourceNode: source,
    targetNode: target
})
YIELD index, nodeIds
RETURN [nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS path

// Connected components
CALL gds.wcc.stream('myGraph')
YIELD nodeId, componentId
RETURN componentId, count(*) AS size
ORDER BY size DESC

Practical Query Examples

// Find people with no friends (in either direction)
MATCH (p:Person)
WHERE NOT (p)-[:FRIEND_OF]-()
RETURN p.name

// Find the people with the most skills
MATCH (p:Person)-[r:HAS_SKILL]->(s:Skill)
WITH p, count(s) AS skill_count
ORDER BY skill_count DESC
RETURN p.name, skill_count
LIMIT 5

// Find shared skills
MATCH (p1:Person {name: '张三'})-[:HAS_SKILL]->(s:Skill)<-[:HAS_SKILL]-(p2:Person)
RETURN p2.name, collect(s.name) AS common_skills
ORDER BY size(common_skills) DESC

// Find the skills that connect two people
MATCH (p1:Person {name: '张三'})-[:HAS_SKILL*]->(s:Skill)<-[:HAS_SKILL*]-(p2:Person {name: '李四'})
RETURN p1.name, p2.name, collect(DISTINCT s.name) AS connecting_skills

// Time window query (assumes r.since is a DATE value)
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
WHERE r.since >= date('2020-01-01') AND r.since <= date('2025-12-31')
RETURN p.name, c.name, r.since
ORDER BY r.since DESC

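Schema Constraints and Validation

// Create uniqueness constraints
CREATE CONSTRAINT person_id_unique IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE
CREATE CONSTRAINT FOR (c:Company) REQUIRE c.registrationNumber IS UNIQUE

// Create node property existence constraints (Enterprise Edition)
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.name IS NOT NULL
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.email IS NOT NULL

// Create a node key constraint: the property combination must exist and be unique (Enterprise Edition)
CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.firstName, p.lastName) IS NODE KEY

// Create a relationship property existence constraint (Enterprise Edition)
CREATE CONSTRAINT FOR ()-[r:WORKS_FOR]->() REQUIRE r.since IS NOT NULL

// List all constraints
SHOW CONSTRAINTS

// Drop a constraint by name
DROP CONSTRAINT person_id_unique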

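Stored Procedures and User-Defined Functions

// Call built-in procedures
CALL db.labels() YIELD label
RETURN label

CALL db.relationshipTypes() YIELD relationshipType
RETURN relationshipType

// List indexes (db.indexes() was removed in Neo4j 5; use SHOW INDEXES)
SHOW INDEXES YIELD name, state, type
RETURN name, state, type

// Get an overview of the graph schema
CALL apoc.meta.graph() YIELD nodes, relationships
RETURN nodes, relationships

// Use APOC for batched operations
CALL apoc.periodic.iterate(
  'MATCH (p:Person) RETURN p',
  'SET p.lastUpdated = datetime()',
  {batchSize: 1000}
)

// User-defined functions are written as Java plugins
// Example call; custom.package.calculateScore is a hypothetical function name
MATCH (p:Person)
RETURN custom.package.calculateScore(p.age, p.experience) AS score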

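Graph Projections and Algorithm Optimization

// Create a named graph projection (in-memory graph)
CALL gds.graph.project(
  'socialGraph',
  ['Person', 'Company'],
  {
    FRIEND_OF: {orientation: 'UNDIRECTED'},
    WORKS_FOR: {orientation: 'NATURAL'}
  },
  {
    nodeProperties: ['age', 'salary', 'activityScore'],
    relationshipProperties: {weight: {property: 'strength'}}
  }
)

// Run an algorithm on the projection
CALL gds.pageRank.stream('socialGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name, score
ORDER BY score DESC

// Create a filtered projection from the existing in-memory graph
// Native projections cannot filter on property values, so the subgraph is derived afterwards.
// gds.graph.filter is the name in recent GDS releases (older 2.x: gds.beta.graph.project.subgraph).
CALL gds.graph.filter(
  'activeUsersGraph',
  'socialGraph',
  'n:Person AND n.activityScore > 50',
  '*'
)

// Drop projections to free memory
CALL gds.graph.drop('socialGraph')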

Advanced Pattern Matching Techniques

// Path query with filtering
MATCH path = (start:Person)-[:KNOWS*1..5]->(end:Person)
WHERE start.name = '张三' AND end.name = '李四'
  AND ALL(r IN relationships(path) WHERE r.strength > 0.5)
RETURN path, length(path) AS hops

// Find all shortest paths (built-in allShortestPaths, at most 5 hops)
MATCH (start:Person {name: '张三'}), (end:Person {name: '李四'})
MATCH path = allShortestPaths((start)-[:KNOWS*..5]-(end))
RETURN path, length(path)

// Combining patterns (OR logic)
MATCH (p:Person)
WHERE (p)-[:WORKS_FOR]->(:Company {industry: '技术'})
   OR (p)-[:HAS_SKILL]->(:Skill {name: 'Neo4j'})
RETURN p.name

// Excluding patterns (NOT logic)
MATCH (p:Person)
WHERE NOT (p)-[:WORKS_FOR]->(:Company {industry: '金融'})
  AND NOT (p.age < 25)
RETURN p.name

Data Import and ETL

// Import data from a CSV file
// CALL ... IN TRANSACTIONS needs an auto-commit transaction (prefix with :auto in Neo4j Browser)
LOAD CSV WITH HEADERS FROM 'file:///people.csv' AS row
CALL {
  WITH row
  MERGE (p:Person {id: row.id})
  SET p.name = row.name,
      p.age = toInteger(row.age),
      p.email = row.email
} IN TRANSACTIONS OF 1000 ROWS

// Import relationship data
LOAD CSV WITH HEADERS FROM 'file:///relationships.csv' AS row
MATCH (p1:Person {id: row.person1_id})
MATCH (p2:Person {id: row.person2_id})
MERGE (p1)-[r:FRIEND_OF]->(p2)
SET r.since = date(row.since_date),
    r.strength = toFloat(row.strength)

// Import JSON data with APOC
CALL apoc.load.json('file:///data.json') YIELD value
MERGE (p:Person {id: value.id})
SET p.name = value.name,
    p.attributes = value.properties

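Graph Traversal and Recursive Queries

// Recursive query over an org chart (level 0 is the CEO)
MATCH path = (manager:Person {name: 'CEO'})-[:MANAGES*0..3]->(subordinate)
RETURN manager.name AS manager,
       subordinate.name AS subordinate,
       length(path) AS level
ORDER BY level, subordinate.name

// Find circular references
MATCH (a)-[:MANAGES*]->(a)
RETURN DISTINCT a.name AS has_circular_reference

// Breadth-first style traversal: nearest nodes first
// Cypher does not expose the traversal order, so results are ordered by hop distance instead
MATCH path = (start:Person {name: '张三'})-[:KNOWS*1..3]->(other)
WITH other, min(length(path)) AS distance
ORDER BY distance
RETURN other.name, distance
LIMIT 10

// Depth-first style traversal: enumerate full paths that never return to the start node
// Cypher does not guarantee depth-first order; add an upper bound on large graphs
MATCH path = (start:Person {name: '张三'})-[:KNOWS*]->(other)
WHERE all(node IN nodes(path)[1..] WHERE node <> start)
RETURN path, length(path)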

Working with Time Series Data

// Time series aggregation (count per month)
MATCH (e:Event)
WITH e.date.year AS year, e.date.month AS month, count(e) AS event_count
RETURN year, month, event_count
ORDER BY year DESC, month DESC

// Sliding window: 5-point moving average
// List slices exclude the end index, so amounts[i..i+5] holds exactly 5 values
MATCH (t:Transaction)
WITH t ORDER BY t.timestamp
WITH collect(t.amount) AS amounts
WITH [i IN range(0, size(amounts) - 5) |
      reduce(total = 0.0, val IN amounts[i..i+5] | total + val) / 5.0] AS moving_avg
RETURN moving_avg

// Time difference
// duration.inDays returns the whole difference in days; duration.between splits it into months and days
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, c.name,
       duration.inDays(r.startDate, r.endDate).days AS days_worked,
       date().year - r.startDate.year AS years_worked

Graph Database Best Practices

// 1. Use MERGE instead of MATCH + CREATE (avoids duplicates)
// Not recommended: creates a new Company node on every run
MATCH (p:Person {name: '张三'})
CREATE (p)-[:WORKS_FOR]->(:Company {name: 'ABC'})

// Recommended:
MERGE (p:Person {name: '张三'})
MERGE (c:Company {name: 'ABC'})
MERGE (p)-[:WORKS_FOR]->(c)

// 2. Use parameterized queries to prevent injection
// Not recommended: value concatenated into the query string
MATCH (p:Person) WHERE p.name = '张三'

// Recommended (pass the parameter through the driver):
MATCH (p:Person) WHERE p.name = $name

// 3. Use UNWIND for bulk operations
// Not recommended: one statement per item
MATCH (p:Person {name: '张三'}) CREATE (p)-[:KNOWS]->(:Person {name: '李四'})
MATCH (p:Person {name: '张三'}) CREATE (p)-[:KNOWS]->(:Person {name: '王五'})

// Recommended:
UNWIND ['李四', '王五', '赵六'] AS friend_name
MATCH (p:Person {name: '张三'})
MERGE (other:Person {name: friend_name})
MERGE (p)-[:KNOWS]->(other)

// 4. Use WITH to process intermediate results
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
WITH c, count(p) AS emp_count
WHERE emp_count > 100
RETURN c.name, emp_count

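Data Modeling Best Practices

Modeling Principles

1. Focus on relationships, not tables

Model a graph database from the point of view of relationships rather than the table structure of a traditional relational database. Let the connections in the data show up naturally.

2. Use labels for classification

Use labels to classify nodes so that they are easy to query and index. A node can have more than one label.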
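
A short sketch of a node that carries two labels. The Manager label and the sample person are made up for illustration.

```cypher
// One node with two labels: every manager is also a person.
CREATE (p:Person:Manager {name: 'Dana', age: 41});

// Label-specific queries see the same node.
MATCH (m:Manager) RETURN m.name;
MATCH (p:Person) WHERE p.age > 40 RETURN p.name, labels(p);
```

Indexes and constraints are defined per label, so the more specific label can carry its own index without affecting every Person node.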

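3. Give relationships meaningful names

A relationship type should state clearly what connects the two nodes, for example WORKS_FOR, FRIENDS_WITH, or LOCATED_IN.

4. Avoid over-normalization

A graph database tolerates some redundancy. To speed up queries, store the properties you need directly on the node.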

Social Network Modeling Example

// Create users
CREATE (u1:User {id: 'u1', name: 'Alice', age: 28})
CREATE (u2:User {id: 'u2', name: 'Bob', age: 32})
CREATE (u3:User {id: 'u3', name: 'Charlie', age: 25})

// Create a post
CREATE (p1:Post {id: 'p1', content: 'Hello Neo4j!', created: datetime()});

// Create relationships (each block below is a separate statement)
MATCH (u1:User {id: 'u1'}), (p1:Post {id: 'p1'})
CREATE (u1)-[:POSTED]->(p1);

MATCH (u1:User {id: 'u1'}), (u2:User {id: 'u2'})
CREATE (u1)-[:FOLLOWS]->(u2)
CREATE (u2)-[:FRIENDS_WITH]->(u1);

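AI/ML Integration in Detail

Neo4j Graph Data Science (GDS) Overview

Neo4j Graph Data Science is a library of graph algorithms and machine learning tools. It provides more than 65 graph algorithms and supports advanced features such as node embeddings, link prediction, and graph neural network models.

  • Graph algorithm library: centrality, community detection, and path finding algorithms
  • Machine learning pipelines: feature engineering, model training, prediction, and evaluation
  • Graph neural network models: inductive models such as GraphSAGE
  • Node embeddings: Node2Vec, FastRP, GraphSAGE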

1. Centrality Algorithms: Identifying Key Nodes

// PageRank: find the most influential nodes in the network
CALL gds.pageRank.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

// Betweenness Centrality: find the bridge nodes in the network
CALL gds.betweenness.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

// Degree Centrality: find the most connected nodes
CALL gds.degree.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score
ORDER BY score DESC
LIMIT 10

2. Community Detection Algorithms: Identifying Group Structure

// Louvain: discover communities in the network
CALL gds.louvain.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, count(*) AS community_size
ORDER BY community_size DESC

// Label Propagation: fast community detection
CALL gds.labelPropagation.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, collect(gds.util.asNode(nodeId).name) AS members
ORDER BY size(members) DESC
LIMIT 5

// Weakly Connected Components: identify connected components
CALL gds.wcc.stream('myGraph')
YIELD nodeId, componentId
RETURN componentId, count(*) AS size
ORDER BY size DESC

3. Node Embeddings: Turning Nodes into Vectors

// FastRP: fast random projection embeddings
CALL gds.fastRP.write('myGraph', {
    embeddingDimension: 256,
    relationshipWeightProperty: 'weight',
    writeProperty: 'embedding'
})

// Node2Vec: random-walk-based node embeddings
CALL gds.node2vec.stream('myGraph', {
    embeddingDimension: 128,
    walkLength: 80,
    walksPerNode: 10,
    returnFactor: 1.0,
    inOutFactor: 1.0
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name AS name, embedding[0..5] AS preview

// GraphSAGE: graph neural network embeddings (beta tier in GDS 2.x)
CALL gds.beta.graphSage.train('myGraph', {
    modelName: 'myModel',
    featureProperties: ['age', 'salary'],
    embeddingDimension: 64,
    aggregator: 'mean',
    epochs: 5
})

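4. Link prediction - predicting relationships that may form

// Score candidate links with the Adamic-Adar index.
// In GDS 2.x the topological link prediction measures are functions that take two nodes
// and read the stored graph rather than a projection; the gds.alpha prefix may differ by version.
// KNOWS is assumed to be the relationship type connecting Person nodes.
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)-[:KNOWS]-(p2)
RETURN p1.name AS person1, p2.name AS person2,
       gds.alpha.linkprediction.adamicAdar(p1, p2, {relationshipQuery: 'KNOWS'}) AS score
ORDER BY score DESC
LIMIT 10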

// Score candidate links by their number of common neighbors
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)-[:KNOWS]-(p2)
WITH p2, gds.alpha.linkprediction.commonNeighbors(p1, p2, {relationshipQuery: 'KNOWS'}) AS score
RETURN p2.name AS potential_friend, score
ORDER BY score DESC
LIMIT 10

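// Train a link prediction model.
// Note: gds.alpha.ml.linkPrediction.train is the GDS 1.x procedure; GDS 2.x replaces it with
// link prediction pipelines (gds.beta.pipeline.linkPrediction.*), where random forest is one
// of the model candidates. featureProperties refers to node properties in the projection,
// and the train/test split settings the procedure needs are omitted here.
CALL gds.alpha.ml.linkPrediction.train('myGraph', {
    modelName: 'linkPredictionModel',
    featureProperties: ['adamicAdar', 'commonNeighbors', 'preferentialAttachment']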
})
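
The two topological scores used above are simple enough to compute by hand. The following sketch, which assumes an undirected graph stored as a dictionary of neighbor sets with made-up names, shows common neighbors and the Adamic-Adar index for one candidate pair.

```python
import math


def common_neighbors(adj, u, v):
    """Number of nodes adjacent to both u and v."""
    return len(adj[u] & adj[v])


def adamic_adar(adj, u, v):
    """Sum of 1 / log(degree) over shared neighbors; rarer neighbors weigh more."""
    return sum(
        1.0 / math.log(len(adj[z]))
        for z in adj[u] & adj[v]
        if len(adj[z]) > 1  # log(1) is 0, so skip degree-1 neighbors
    )


# Hypothetical friendship graph; alice and dave are not yet connected.
adj = {
    "alice": {"bob", "carol"},
    "bob": {"alice", "carol", "dave"},
    "carol": {"alice", "bob", "dave"},
    "dave": {"bob", "carol"},
}
cn_score = common_neighbors(adj, "alice", "dave")
aa_score = adamic_adar(adj, "alice", "dave")
print(cn_score, round(aa_score, 4))
```

Both shared neighbors (bob and carol) have degree 3, so each contributes 1 / ln 3 to the Adamic-Adar score.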

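5. Similarity computation

// Jaccard similarity - compare nodes by their shared neighbors.
// Node Similarity uses Jaccard by default and yields node ids for each similar pair.
CALL gds.nodeSimilarity.stream('myGraph')
YIELD node1, node2, similarity
WITH gds.util.asNode(node1) AS p1, gds.util.asNode(node2) AS p2, similarity
WHERE p1.name = '张三'
RETURN p2.name AS similar_person, similarity
ORDER BY similarity DESC
LIMIT 10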

// Cosine similarity - computed on stored embedding vectors
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.cosine(p1.embedding, p2.embedding) as similarity
RETURN p2.name, similarity
ORDER BY similarity DESC
LIMIT 10

// Euclidean distance - computed on stored embedding vectors
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.euclideanDistance(p1.embedding, p2.embedding) as distance
RETURN p2.name, distance
ORDER BY distance ASC
LIMIT 10
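
For reference, the three measures used in this section reduce to a few lines each. The sketch below is plain Python with illustrative inputs, not the GDS functions themselves.

```python
import math


def jaccard(a, b):
    """Size of the intersection divided by size of the union of two sets."""
    a, b = set(a), set(b)
    union = a | b
    return len(a & b) / len(union) if union else 0.0


def cosine(x, y):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(p * q for p, q in zip(x, y))
    norm = math.sqrt(sum(p * p for p in x)) * math.sqrt(sum(q * q for q in y))
    return dot / norm if norm else 0.0


def euclidean(x, y):
    """Straight-line distance between two equal-length vectors."""
    return math.sqrt(sum((p - q) ** 2 for p, q in zip(x, y)))


j = jaccard({"a", "b", "c"}, {"b", "c", "d"})
c = cosine([1.0, 0.0], [1.0, 1.0])
d = euclidean([0.0, 0.0], [3.0, 4.0])
print(j, round(c, 4), d)
```

Jaccard works on neighbor sets, while cosine and Euclidean distance work on vectors such as the embeddings written in section 3. Higher is more similar for the first two; lower is closer for the distance.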

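6. Case study: recommendation system

Building personalized recommendations with graph algorithms.

// Step 1: compute user similarity and store it on SIMILAR_TO relationships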
CALL gds.nodeSimilarity.write('userGraph', {
    writeRelationshipType: 'SIMILAR_TO',
    writeProperty: 'similarityScore',
    topK: 10
})

// Step 2: recommend products rated by similar users
// (similarityScore lives on the SIMILAR_TO relationship written in step 1)
MATCH (targetUser:User {id: 'user1'})-[s:SIMILAR_TO]->(similarUser:User)
MATCH (similarUser)-[:RATED]->(product:Product)
WHERE NOT (targetUser)-[:RATED]->(product)
WITH product, avg(s.similarityScore) AS avg_similarity
RETURN product.name, avg_similarity
ORDER BY avg_similarity DESC
LIMIT 10

// Step 3: use PageRank to boost popular products
CALL gds.pageRank.stream('productGraph', {
    relationshipWeightProperty: 'rating'
})
YIELD nodeId, score
WITH gds.util.asNode(nodeId) as product, score
ORDER BY score DESC
LIMIT 10
RETURN product.name, score as popularity_score
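
The logic of step 2 can be sketched outside the database as follows. The data structures (a similarity dictionary and a ratings dictionary) and the function name `recommend` are assumptions made for illustration; in practice these values would come from the SIMILAR_TO and RATED relationships.

```python
from collections import defaultdict


def recommend(already_rated, similar_users, ratings, limit=10):
    """Rank unseen products by the average similarity of the users who rated them."""
    scores = defaultdict(list)
    for user, similarity in similar_users.items():
        for product in ratings.get(user, ()):
            if product not in already_rated:
                scores[product].append(similarity)
    averaged = [(product, sum(vals) / len(vals)) for product, vals in scores.items()]
    # Highest average first; ties broken alphabetically for a stable order.
    averaged.sort(key=lambda item: (-item[1], item[0]))
    return averaged[:limit]


already_rated = {"laptop"}
similar_users = {"user2": 0.75, "user3": 0.25}
ratings = {"user2": {"laptop", "mouse"}, "user3": {"mouse", "desk"}}
recommendations = recommend(already_rated, similar_users, ratings)
print(recommendations)
```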

7. Case study: fraud detection

Using graph algorithms to spot fraud network patterns.

// Step 1: flag unusually small communities
CALL gds.louvain.stream('transactionGraph')
YIELD nodeId, communityId
WITH communityId, count(*) as community_size
WHERE community_size < 5
RETURN communityId as suspicious_community, community_size

// Step 2: compute degree centrality and flag very high-degree nodes
CALL gds.degree.stream('transactionGraph')
YIELD nodeId, score
WITH gds.util.asNode(nodeId) as node, score
WHERE score > 100
RETURN node.id as high_degree_node, score

// Step 3: detect circular transfers of identical amounts
MATCH (a:Account)-[t1:TRANSFER]->(b:Account)
MATCH (b)-[t2:TRANSFER]->(c:Account)
MATCH (c)-[t3:TRANSFER]->(a)
WHERE t1.amount = t2.amount AND t2.amount = t3.amount
RETURN a.id, b.id, c.id, t1.amount, t1.datetime

// Step 4: use weakly connected components to find small isolated groups
CALL gds.wcc.stream('transactionGraph')
YIELD nodeId, componentId
WITH componentId, collect(gds.util.asNode(nodeId).id) as accounts
WHERE size(accounts) >= 3 AND size(accounts) <= 10
RETURN componentId, accounts as suspicious_group
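
The circular-transfer pattern from step 3 can also be checked in application code, for instance on an exported batch of transfers. The sketch below is one possible approach under that assumption: it looks for three-account cycles with a single shared amount and reports each cycle once, whichever account it starts from. The account ids and the function name are hypothetical.

```python
def find_equal_amount_rings(transfers):
    """Return three-account cycles a -> b -> c -> a where all amounts are equal.

    transfers is a list of (source, target, amount) tuples.
    """
    outgoing = {}
    for src, dst, amount in transfers:
        outgoing.setdefault(src, set()).add((dst, amount))

    rings = set()
    for a, a_edges in outgoing.items():
        for b, amount in a_edges:
            if b == a:
                continue
            for c, amount_bc in outgoing.get(b, ()):
                if amount_bc != amount or c in (a, b):
                    continue
                if (a, amount) in outgoing.get(c, ()):
                    # Rotate so the smallest id comes first: one entry per cycle.
                    cycle = (a, b, c)
                    start = cycle.index(min(cycle))
                    rings.add((cycle[start:] + cycle[:start], amount))
    return sorted(rings)


transfers = [
    ("acc1", "acc2", 500),
    ("acc2", "acc3", 500),
    ("acc3", "acc1", 500),
    ("acc1", "acc4", 500),
    ("acc4", "acc2", 120),
]
rings = find_equal_amount_rings(transfers)
print(rings)
```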

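8. Case study: knowledge graph question answering

Combining graph queries with natural language processing.

// Step 1: build the knowledge graph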
CREATE (p:Person {name: '张三', profession: '医生'})
CREATE (h:Hospital {name: '北京医院', location: '北京'})
CREATE (d:Disease {name: '高血压', category: '心血管'})
CREATE (p)-[:WORKS_AT]->(h)
CREATE (p)-[:TREATS]->(d)

// Step 2: list the diseases each doctor treats
MATCH (p:Person {profession: '医生'})-[:TREATS]->(d:Disease)
RETURN p.name as doctor, collect(d.name) as diseases

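// Step 3: find specialists for a disease, together with the hospital they work at
MATCH (d:Disease {name: '高血压'})<-[:TREATS]-(p:Person)-[:WORKS_AT]->(h:Hospital)
RETURN p.name AS specialist, h.name AS hospital

// Step 4: find related diseases within two hops
// (assumes RELATED_TO relationships between Disease nodes, which step 1 does not create)
MATCH path = (d1:Disease {name: '高血压'})-[:RELATED_TO*1..2]-(d2:Disease)
RETURN d1.name, d2.name, length(path) AS distance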

9. Graph neural network (GNN) applications

// Train a GraphSAGE model whose embeddings can feed a node classifier
CALL gds.beta.graphSage.train('myGraph', {
    modelName: 'nodeClassifier',
    featureProperties: ['age', 'income', 'score'],
    embeddingDimension: 128,
    aggregator: 'mean',
    epochs: 10,
    batchSize: 64
})

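// Apply the trained model. GraphSAGE yields embeddings, not class labels,
// so a classifier has to be trained on top of them separately.
CALL gds.beta.graphSage.stream('myGraph', {
    modelName: 'nodeClassifier'
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name AS name, embedding

// Predict links with a model trained by a link prediction pipeline (GDS 2.x).
// The model name is a placeholder for a model trained beforehand.
CALL gds.beta.pipeline.linkPrediction.predict.stream('myGraph', {
    modelName: 'gcnLinkPredictor',
    topN: 10
})
YIELD node1, node2, probability
RETURN gds.util.asNode(node1).name AS source, gds.util.asNode(node2).name AS target, probability
ORDER BY probability DESC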

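10. Performance best practices

  • Use named graph projections: project once and reuse the in-memory graph instead of recomputing it
  • Batch processing: use `stream` mode to consume large result sets incrementally
  • Parallelism: tune the `concurrency` setting to use GDS's parallel execution
  • Memory management: estimate memory before a run and size the heap accordingly to avoid OOM
  • Persist results: write frequently used algorithm results back to the database

// Create a named graph projection
CALL gds.graph.project('myGraph', ['Person', 'Company'], ['WORKS_FOR', 'KNOWS'])

// Estimate memory before running the algorithm
// (relationshipWeightProperty requires a 'weight' property in the projection)
CALL gds.pageRank.stream.estimate('myGraph', {
    relationshipWeightProperty: 'weight'
})
YIELD requiredMemory
RETURN requiredMemory

// Write results back to the database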
CALL gds.pageRank.write('myGraph', {
    writeProperty: 'pagerankScore'
})

// Drop the projection to free memory
CALL gds.graph.drop('myGraph')
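
One way to make sure a projection is always dropped, even when an algorithm call fails, is to wrap the project and drop steps in a context manager. The sketch below assumes a `run(query, parameters)` callable, for example a thin wrapper around a driver session; `projected_graph` and `fake_run` are hypothetical names, and the fake callable only records queries so the example runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def projected_graph(run, name, node_labels, relationship_types):
    """Project a named graph, hand control to the caller, and always drop it."""
    run(
        "CALL gds.graph.project($name, $labels, $types)",
        {"name": name, "labels": node_labels, "types": relationship_types},
    )
    try:
        yield name
    finally:
        # Runs on success and on error, so the in-memory graph is not leaked.
        run("CALL gds.graph.drop($name)", {"name": name})


# Stand-in for a real session: records the queries it is asked to run.
calls = []


def fake_run(query, parameters):
    calls.append(query)


with projected_graph(fake_run, "myGraph", ["Person", "Company"], ["WORKS_FOR", "KNOWS"]) as graph:
    fake_run(
        "CALL gds.pageRank.write($graph, {writeProperty: 'pagerankScore'})",
        {"graph": graph},
    )

print(len(calls))
```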

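Connecting from programming languages

Connecting to Neo4j from Python

Neo4j provides an official Python driver, published on PyPI as `neo4j` (the older `neo4j-driver` package name is deprecated). It supports both synchronous and asynchronous use.

Installing the driver

# Install with pip
pip install neo4j

# Or with conda
conda install -c conda-forge neo4j-python-driver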

Basic connection example

# basic_connection.py
from neo4j import GraphDatabase

class Neo4jConnection:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def query(self, query, parameters=None):
        with self.driver.session() as session:
            result = session.run(query, parameters)
            return [record for record in result]

# Usage example
if __name__ == "__main__":
    conn = Neo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="your_password"
    )

    # Run a query
    result = conn.query("MATCH (n) RETURN n LIMIT 5")
    for record in result:
        print(record)

    conn.close()

Creating nodes and relationships

# create_data.py
from neo4j import GraphDatabase

def create_person(tx, name, age):
    result = tx.run(
        "CREATE (p:Person {name: $name, age: $age}) "
        "RETURN p",
        name=name, age=age
    )
    return result.single()[0]

def create_relationship(tx, person1, person2):
    result = tx.run(
        "MATCH (p1:Person {name: $person1}) "
        "MATCH (p2:Person {name: $person2}) "
        "CREATE (p1)-[:FRIENDS_WITH]->(p2) "
        "RETURN p1, p2",
        person1=person1, person2=person2
    )
    return result.single()

# Usage example
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    # Create people
    session.execute_write(create_person, "张三", 30)
    session.execute_write(create_person, "李四", 28)
    session.execute_write(create_person, "王五", 32)

    # Create relationships
    session.execute_write(create_relationship, "张三", "李四")
    session.execute_write(create_relationship, "李四", "王五")

driver.close()

Querying data

# query_data.py
from neo4j import GraphDatabase

def get_friends(tx, person_name):
    result = tx.run(
        "MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "
        "RETURN friend.name as friend_name, friend.age as friend_age",
        name=person_name
    )
    return [{"name": record["friend_name"], "age": record["friend_age"]}
            for record in result]

def get_shortest_path(tx, person1, person2):
    result = tx.run(
        "MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "
        "RETURN [node IN nodes(path) | node.name] as path",
        p1=person1, p2=person2
    )
    # single() returns None when no path exists, so guard before indexing
    record = result.single()
    return record["path"] if record else None

# Usage example
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    # Look up friends
    friends = session.execute_read(get_friends, "张三")
    print(f"Friends of 张三: {friends}")

    # Find the shortest path
    path = session.execute_read(get_shortest_path, "张三", "王五")
    print(f"Path from 张三 to 王五: {path}")

driver.close()

Asynchronous example

# async_example.py
from neo4j import AsyncGraphDatabase

class AsyncNeo4jConnection:
    def __init__(self, uri, user, password):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self):
        await self.driver.close()

    async def query(self, query, parameters=None):
        async with self.driver.session() as session:
            result = await session.run(query, parameters)
            return [record async for record in result]

    async def batch_create(self, persons):
        async with self.driver.session() as session:
            for person in persons:
                await session.run(
                    "CREATE (p:Person {name: $name, age: $age})",
                    name=person["name"], age=person["age"]
                )

# Usage example
import asyncio

async def main():
    conn = AsyncNeo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )

    # Create several people
    persons = [
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30},
        {"name": "Charlie", "age": 28}
    ]
    await conn.batch_create(persons)

    # Query data
    result = await conn.query("MATCH (p:Person) RETURN p.name as name")
    for record in result:
        print(record["name"])

    await conn.close()

asyncio.run(main())

Using an OGM (Object Graph Mapper)

# Install neomodel
# pip install neomodel

# ogm_example.py
from neomodel import StructuredNode, StringProperty, IntegerProperty, RelationshipTo, config

# Configure the connection
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'

class Person(StructuredNode):
    name = StringProperty(unique_index=True, required=True)
    age = IntegerProperty()
    friends = RelationshipTo('Person', 'FRIENDS_WITH')

# Usage example
# Create nodes
person1 = Person(name='张三', age=30).save()
person2 = Person(name='李四', age=28).save()

# Create a relationship
person1.friends.connect(person2)

# Query data
person = Person.nodes.get(name='张三')
print(f"{person.name} is {person.age} years old")

# List friends
for friend in person.friends:
    print(f"Friend: {friend.name}, age: {friend.age}")

# Filtered query
young_people = Person.nodes.filter(age__lt=30)
for person in young_people:
    print(f"Under 30: {person.name}")

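Connecting to Neo4j from Go

Neo4j provides an official Go driver, `github.com/neo4j/neo4j-go-driver/v5`. The examples below use the driver's original non-context API (`neo4j.NewDriver`, `driver.NewSession`), which v5 still ships but marks as deprecated in favor of context-aware variants such as `neo4j.NewDriverWithContext`.

Installing the driver

# Install with go get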
go get github.com/neo4j/neo4j-go-driver/v5/neo4j

Basic connection example

// basic_connection.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""))
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Verify connectivity
    err = driver.VerifyConnectivity()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Connected!")

    // Run a query
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run("MATCH (n) RETURN n LIMIT 5", nil)
    if err != nil {
        log.Fatal(err)
    }

    for result.Next() {
        record := result.Record()
        fmt.Println(record)
    }

    if err = result.Err(); err != nil {
        log.Fatal(err)
    }
}

Creating nodes and relationships

// create_data.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func createPerson(driver neo4j.Driver, name string, age int) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    _, err := session.Run(
        "CREATE (p:Person {name: $name, age: $age})",
        map[string]interface{}{
            "name": name,
            "age":  age,
        })
    return err
}

func createRelationship(driver neo4j.Driver, person1, person2 string) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    _, err := session.Run(
        "MATCH (p1:Person {name: $p1}) "+
            "MATCH (p2:Person {name: $p2}) "+
            "CREATE (p1)-[:FRIENDS_WITH]->(p2)",
        map[string]interface{}{
            "p1": person1,
            "p2": person2,
        })
    return err
}

func main() {
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""))
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Create people
    persons := []struct {
        name string
        age  int
    }{
        {"张三", 30},
        {"李四", 28},
        {"王五", 32},
    }

    for _, p := range persons {
        if err := createPerson(driver, p.name, p.age); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("创建人员: %s\n", p.name)
    }

    // Create relationships
    relationships := [][2]string{
        {"张三", "李四"},
        {"李四", "王五"},
    }

    for _, rel := range relationships {
        if err := createRelationship(driver, rel[0], rel[1]); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("创建关系: %s -> %s\n", rel[0], rel[1])
    }

    fmt.Println("数据创建完成!")
}

Querying data

// query_data.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

type Friend struct {
    Name string
    Age  int
}

func getFriends(driver neo4j.Driver, personName string) ([]Friend, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "+
            "RETURN friend.name as name, friend.age as age",
        map[string]interface{}{
            "name": personName,
        })
    if err != nil {
        return nil, err
    }

    var friends []Friend
    for result.Next() {
        record := result.Record()
        name, _ := record.Get("name")
        age, _ := record.Get("age")
        friends = append(friends, Friend{
            Name: name.(string),
            Age:  int(age.(int64)),
        })
    }

    if err = result.Err(); err != nil {
        return nil, err
    }

    return friends, nil
}

func getShortestPath(driver neo4j.Driver, person1, person2 string) ([]string, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "+
            "RETURN [node IN nodes(path) | node.name] as path",
        map[string]interface{}{
            "p1": person1,
            "p2": person2,
        })
    if err != nil {
        return nil, err
    }

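    if result.Next() {
        record := result.Record()
        raw, _ := record.Get("path")
        // The driver returns Cypher lists as []interface{}, so convert element by element
        items, ok := raw.([]interface{})
        if !ok {
            return nil, fmt.Errorf("unexpected path type %T", raw)
        }
        names := make([]string, 0, len(items))
        for _, item := range items {
            name, _ := item.(string)
            names = append(names, name)
        }
        return names, nil
    }

    // No path found, or the result stream failed
    return nil, result.Err()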
}

func main() {
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""))
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Look up friends
    friends, err := getFriends(driver, "张三")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("张三的朋友:\n")
    for _, friend := range friends {
        fmt.Printf("  - %s (年龄: %d)\n", friend.Name, friend.Age)
    }

    // Find the shortest path
    path, err := getShortestPath(driver, "张三", "王五")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("从张三到王五的路径: %v\n", path)
}

Transaction handling

// transaction.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func transferMoney(driver neo4j.Driver, fromUser, toUser string, amount int) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    // Run all steps in one managed transaction, so they commit or roll back together
    _, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
        // Check the sender's balance
        result, err := tx.Run(
            "MATCH (u:User {name: $name}) RETURN u.balance as balance",
            map[string]interface{}{"name": fromUser})
        if err != nil {
            return nil, err
        }

        if !result.Next() {
            return nil, fmt.Errorf("用户不存在: %s", fromUser)
        }

        balance, _ := result.Record().Get("balance")
        if balance.(int64) < int64(amount) {
            return nil, fmt.Errorf("余额不足")
        }

        // Debit the sender
        _, err = tx.Run(
            "MATCH (u:User {name: $name}) SET u.balance = u.balance - $amount",
            map[string]interface{}{
                "name":   fromUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }

        // Credit the receiver
        _, err = tx.Run(
            "MATCH (u:User {name: $name}) SET u.balance = u.balance + $amount",
            map[string]interface{}{
                "name":   toUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }

        // Record the transfer
        _, err = tx.Run(
            "MATCH (from:User {name: $from}), (to:User {name: $to}) "+
                "CREATE (from)-[:TRANSFERRED {amount: $amount, date: datetime()}]->(to)",
            map[string]interface{}{
                "from":   fromUser,
                "to":     toUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }

        return nil, nil
    })

    return err
}

func main() {
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""))
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Run the transfer
    err = transferMoney(driver, "张三", "李四", 100)
    if err != nil {
        log.Printf("转账失败: %v\n", err)
    } else {
        fmt.Println("转账成功!")
    }
}

Connection pool configuration

// connection_pool.go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    // Configure the driver
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""),
        func(config *neo4j.Config) {
            // Connection pool sizing and lifetime
            config.MaxConnectionPoolSize = 50
            config.ConnectionAcquisitionTimeout = 2 * time.Minute
            config.MaxConnectionLifetime = 1 * time.Hour

            // Timeouts and retries
            config.MaxTransactionRetryTime = 30 * time.Second
            config.SocketConnectTimeout = 30 * time.Second

            // Logging
            config.Log = neo4j.ConsoleLogger(neo4j.INFO)
        })
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Verify connectivity
    err = driver.VerifyConnectivity()
    if err != nil {
        log.Fatal(err)
    }

    // The Go driver does not expose pool counters such as total or idle connections
    fmt.Println("Driver configured and connection verified")

    // Run a query
    session := driver.NewSession(neo4j.SessionConfig{
        AccessMode:      neo4j.AccessModeRead,
        Bookmarks:       []string{},
        DatabaseName:    "neo4j",
        FetchSize:       1000,
        ImpersonatedUser: "",
    })
    defer session.Close()

    result, err := session.Run("MATCH (n) RETURN count(n) as count", nil)
    if err != nil {
        log.Fatal(err)
    }

    if result.Next() {
        count, _ := result.Record().Get("count")
        fmt.Printf("节点总数: %d\n", count)
    }
}

Repository-style wrapper (ORM-like)

// orm_example.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// PersonRepository wraps the database operations for Person nodes
type PersonRepository struct {
    driver neo4j.Driver
}

type Person struct {
    Name string
    Age  int
}

func NewPersonRepository(driver neo4j.Driver) *PersonRepository {
    return &PersonRepository{driver: driver}
}

func (r *PersonRepository) Create(name string, age int) error {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    _, err := session.Run(
        "CREATE (p:Person {name: $name, age: $age})",
        map[string]interface{}{
            "name": name,
            "age":  age,
        })
    return err
}

func (r *PersonRepository) FindByName(name string) (*Person, error) {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person {name: $name}) RETURN p.name as name, p.age as age",
        map[string]interface{}{"name": name})
    if err != nil {
        return nil, err
    }

    if result.Next() {
        record := result.Record()
        name, _ := record.Get("name")
        age, _ := record.Get("age")
        return &Person{
            Name: name.(string),
            Age:  int(age.(int64)),
        }, nil
    }

    return nil, fmt.Errorf("未找到用户")
}

func (r *PersonRepository) FindAll(limit int) ([]Person, error) {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person) RETURN p.name as name, p.age as age LIMIT $limit",
        map[string]interface{}{"limit": limit})
    if err != nil {
        return nil, err
    }

    var persons []Person
    for result.Next() {
        record := result.Record()
        name, _ := record.Get("name")
        age, _ := record.Get("age")
        persons = append(persons, Person{
            Name: name.(string),
            Age:  int(age.(int64)),
        })
    }

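    // Surface errors raised while streaming the result
    if err = result.Err(); err != nil {
        return nil, err
    }

    return persons, nil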
}

func (r *PersonRepository) Delete(name string) error {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    _, err := session.Run(
        "MATCH (p:Person {name: $name}) DETACH DELETE p",
        map[string]interface{}{"name": name})
    return err
}

func main() {
    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""))
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    repo := NewPersonRepository(driver)

    // Create
    err = repo.Create("张三", 30)
    if err != nil {
        log.Fatal(err)
    }

    // Find one
    person, err := repo.FindByName("张三")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("找到用户: %s, 年龄: %d\n", person.Name, person.Age)

    // Find all
    persons, err := repo.FindAll(10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("所有用户:\n")
    for _, p := range persons {
        fmt.Printf("  - %s (年龄: %d)\n", p.Name, p.Age)
    }

    // Delete
    err = repo.Delete("张三")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("删除成功!")
}

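Best practices

  • Connection management: create one driver and reuse its connection pool rather than opening and closing connections repeatedly
  • Transactions: wrap multi-step operations in a transaction to keep data consistent
  • Error handling: check and handle errors and exceptional cases
  • Parameterized queries: pass values as query parameters to prevent injection attacks
  • Resource cleanup: make sure sessions, results, and other resources are released
  • Performance: use batch operations and indexes to speed up queries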
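
Two of the practices above, parameterized queries and batch operations, combine naturally: send rows in chunks as a single `$rows` parameter and let `UNWIND` expand them server-side. The sketch below assumes a `run(query, parameters)` callable such as a wrapper around a driver session; `write_people`, `chunked`, and the recording `fake_run` are hypothetical helpers so the example runs without a database.

```python
def chunked(rows, size):
    """Yield consecutive slices of rows with at most size items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


# Values travel as the $rows parameter; nothing is concatenated into the query text.
UPSERT_PEOPLE = (
    "UNWIND $rows AS row "
    "MERGE (p:Person {name: row.name}) "
    "SET p.age = row.age"
)


def write_people(run, people, batch_size=1000):
    """Send people to the database in batches; returns the number of batches."""
    batches = 0
    for batch in chunked(people, batch_size):
        run(UPSERT_PEOPLE, {"rows": batch})
        batches += 1
    return batches


# Stand-in for a real session: records how many rows each call carried.
sent = []


def fake_run(query, parameters):
    sent.append(len(parameters["rows"]))


people = [{"name": f"person{i}", "age": 20 + i} for i in range(5)]
batch_count = write_people(fake_run, people, batch_size=2)
print(batch_count, sent)
```

Compared with one `CREATE` per row, this cuts the number of round trips to one per batch.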

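Worked examples

Graph-based recommendation system

Use user behavior and item relationships to build a graph and produce targeted recommendations.

// Find users who rated the same products, then recommend what else they rated
MATCH (u:User {id: 'user1'})-[:RATED]->(p:Product)<-[:RATED]-(other:User)
WITH u, other, count(p) AS common_products
WHERE common_products > 2
MATCH (other)-[r:RATED]->(rec:Product)
WHERE NOT (u)-[:RATED]->(rec)
RETURN rec.name, avg(r.rating) AS avg_rating, count(r) AS rating_count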
ORDER BY avg_rating DESC
LIMIT 10

Fraud detection system

Find anomalous transaction patterns and linked networks to identify potential fraud.

// Detect circular transfers
MATCH (a:Account)-[t1:TRANSFER]->(b:Account)
MATCH (b)-[t2:TRANSFER]->(c:Account)
MATCH (c)-[t3:TRANSFER]->(a)
WHERE t1.amount = t2.amount AND t2.amount = t3.amount
RETURN a.id, b.id, c.id, t1.amount, t1.datetime

// Detect many transfers within a short time window
MATCH (a:Account)-[t:TRANSFER]->(b:Account)
WHERE t.datetime > datetime() - duration('PT1H')
WITH a, count(t) as transfer_count
WHERE transfer_count > 50
RETURN a.id, transfer_count
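
The same burst check can be expressed in application code when transfers arrive as a stream or an export. This is a minimal sketch under that assumption; the function name `busy_accounts`, the sample timestamps, and the low threshold used in the demo are all illustrative.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def busy_accounts(transfers, now, window=timedelta(hours=1), threshold=50):
    """Return accounts with more than threshold outgoing transfers inside the window.

    transfers is a list of (account_id, timestamp) pairs.
    """
    counts = defaultdict(int)
    for account, timestamp in transfers:
        if timestamp > now - window:
            counts[account] += 1
    return {account: count for account, count in counts.items() if count > threshold}


now = datetime(2024, 1, 1, 12, 0)
transfers = [("acc1", now - timedelta(minutes=m)) for m in range(3)]
transfers.append(("acc2", now - timedelta(hours=2)))  # outside the one-hour window
flagged = busy_accounts(transfers, now, threshold=2)
print(flagged)
```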

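Social network analysis

Analyze influence propagation, community structure, and key nodes in a social network.

// The queries below assume a projection such as:
// CALL gds.graph.project('socialGraph', 'User', 'FOLLOWS')

// Find the most influential users (PageRank)
CALL gds.pageRank.stream('socialGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS user, score
ORDER BY score DESC
LIMIT 10

// Detect communities (Louvain)
CALL gds.louvain.stream('socialGraph')
YIELD nodeId, communityId
RETURN communityId, count(*) AS size
ORDER BY size DESC

// Find bridge nodes that follow users in more than one community
// (assumes community ids were stored first, e.g. gds.louvain.write with writeProperty: 'community')
MATCH (u:User)-[:FOLLOWS]->(friend:User)
WITH u, count(DISTINCT friend.community) AS community_count
WHERE community_count > 1
RETURN u.name, community_count
ORDER BY community_count DESC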