Neo4j 基础知识

什么是图数据库

图数据库是一种以图结构(节点和边)来存储、查询和管理数据的数据库系统。它能够高效地处理复杂的数据关系,特别适合社交网络、推荐系统、欺诈检测等场景。

核心概念

  • 节点(Node):代表实体,如人、公司、产品等
  • 关系(Relationship):代表节点之间的连接
  • 属性(Property):存储在节点和关系上的数据
  • 标签(Label):对节点进行分类的标记

为什么选择 Neo4j

  • 原生图存储,性能卓越
  • 支持 ACID 事务
  • 强大的 Cypher 查询语言
  • 丰富的生态系统和工具支持
  • 企业级安全和扩展性

Cypher 查询语言

基础语法

// Create a Person node carrying two properties.
CREATE (p:Person {name: '张三', age: 30})

// Link two existing nodes with a WORKS_FOR relationship.
// Both patterns must match; if either node is missing, no relationship is created.
MATCH (p:Person {name: '张三'}), (c:Company {name: 'ABC公司'})
CREATE (p)-[:WORKS_FOR {since: 2020}]->(c)

// Filter nodes by a property predicate.
MATCH (p:Person)
WHERE p.age > 25
RETURN p.name, p.age

// Read properties from both endpoints and from the relationship itself.
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, c.name, r.since

高级查询

// Multi-hop traversal: person -> friend -> friend's employer.
MATCH (p:Person)-[:FRIEND_OF]->(friend)-[:WORKS_FOR]->(c:Company)
RETURN p.name, friend.name, c.name

// Variable-length path: follow 1 to 3 KNOWS hops and report the hop count.
MATCH path = (p:Person)-[:KNOWS*1..3]->(other)
RETURN p.name, other.name, length(path) as degrees

// Aggregation: head count per company, largest first.
MATCH (c:Company)<-[:WORKS_FOR]-(p:Person)
RETURN c.name, count(p) as employee_count
ORDER BY employee_count DESC

// Friend-of-friend recommendation ranked by number of mutual friends.
// Existing friends and the person themself are excluded by the WHERE clause.
MATCH (person:Person {name: '张三'})-[:FRIEND_OF]->(friend)-[:FRIEND_OF]->(potential)
WHERE NOT (person)-[:FRIEND_OF]->(potential) AND person <> potential
RETURN potential.name, count(friend) as mutual_friends
ORDER BY mutual_friends DESC

模式匹配

// Optional relationship: keep every person, including those without an employer.
// The legacy `[r:TYPE?]` marker is not valid Cypher; OPTIONAL MATCH is the
// supported form and yields null for r and c when nothing matches.
MATCH (p:Person)
OPTIONAL MATCH (p)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, r.since, c.name

// Shortest path between two people over any relationship type, in either direction.
MATCH path = shortestPath((p1:Person {name: '张三'})-[*]-(p2:Person {name: '李四'}))
RETURN path

// All directed KNOWS paths between two people.
// Cypher never reuses a relationship within one path, but a node may repeat.
// The length is unbounded here, so this can be expensive on large graphs.
MATCH path = (p1:Person)-[:KNOWS*]->(p2:Person)
WHERE p1.name = '张三' AND p2.name = '李四'
RETURN path

// Directed traversal: everyone within 1 to 3 MANAGES hops below a manager.
MATCH (p1:Person)-[:MANAGES*1..3]->(p2:Person)
RETURN p1.name as manager, p2.name as subordinate

聚合与分组

// Grouped statistics: non-aggregated return items (c.name) act as the grouping key.
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.name, count(p) as employee_count, 
       avg(p.age) as avg_age, 
       max(p.salary) as max_salary
ORDER BY employee_count DESC

// Two-level grouping: by industry, then by company.
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.industry, c.name, count(p) as count
ORDER BY c.industry, count DESC

// Collect each person's skill names into a list.
MATCH (p:Person)-[:HAS_SKILL]->(s:Skill)
RETURN p.name, collect(s.name) as skills

// Count distinct departments among each company's employees.
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN c.name, count(DISTINCT p.department) as departments

常用函数

// String functions: upper-casing, substring (start index 0, length 3), and length.
MATCH (p:Person)
RETURN p.name, 
       toUpper(p.name) as upper_name,
       substring(p.name, 0, 3) as first_three,
       size(p.name) as name_length

// Math functions: absolute value, rounding, and modulo.
MATCH (p:Person)
RETURN p.name, p.age, 
       abs(p.age - 30) as age_diff,
       round(p.age * 1.5) as rounded,
       p.age % 10 as remainder

// Date/time functions.
// NOTE(review): duration.between needs r.since to be a temporal value; other
// examples in this file store `since` as a plain integer year - confirm the model.
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
RETURN p.name, 
       date().year - p.birthYear as current_age,
       duration.between(r.since, date()).years as years_worked,
       datetime().year as current_year

// Path functions: hop count plus the nodes and relationships along the path.
MATCH path = (p:Person)-[:KNOWS*]->(other)
RETURN p.name, other.name, 
       length(path) as hops,
       nodes(path) as path_nodes,
       relationships(path) as path_rels

子查询

// CALL subquery (available since Neo4j 4.0): runs once per incoming row.
// `WITH p` imports the outer variable into the subquery scope.
MATCH (p:Person)
CALL {
    WITH p
    MATCH (p)-[:WORKS_FOR]->(c:Company)
    RETURN count(c) as company_count
}
RETURN p.name, company_count

// EXISTS subquery: keep only people who work for the given company.
MATCH (p:Person)
WHERE EXISTS {
    MATCH (p)-[:WORKS_FOR]->(:Company {name: 'ABC公司'})
}
RETURN p.name

// Pattern comprehension: build a list directly from matched relationships.
// A node property cannot hold a list of maps, so `skill.level` on elements of
// `p.skills` would fail; the HAS_SKILL pattern used elsewhere in this file is
// traversed instead.
// NOTE(review): assumes the proficiency level is stored on the HAS_SKILL relationship.
MATCH (p:Person)
RETURN p.name, [(p)-[r:HAS_SKILL]->(s:Skill) WHERE r.level > 3 | s.name] as advanced_skills

数据修改

// Create two nodes and connect them in a single statement.
CREATE (p:Person {name: '王五', age: 35})
CREATE (c:Company {name: 'XYZ公司', founded: 2015})
CREATE (p)-[:WORKS_FOR {since: 2021, position: '经理'}]->(c)

// Update node properties.
MATCH (p:Person {name: '张三'})
SET p.age = 31, p.salary = 50000

// Add a label. This and the REMOVE below are two separate statements;
// run back-to-back as one query they would be rejected (MATCH after SET
// needs WITH, and `p` would be declared twice).
MATCH (p:Person {name: '张三'})
SET p:Manager;

// Remove the label again.
MATCH (p:Person {name: '张三'})
REMOVE p:Manager;

// Delete a node together with all of its relationships.
// DETACH DELETE works whether the node has zero, one, or many relationships;
// matching a single WORKS_FOR relationship and deleting `r, p` would skip nodes
// without that relationship and fail for nodes with any other relationship.
MATCH (p:Person {name: '王五'})
DETACH DELETE p

// Bulk update: raise the salary of everyone older than 30 by 10%.
MATCH (p:Person)
WHERE p.age > 30
SET p.salary = p.salary * 1.1

性能优化技巧

// Create indexes on properties that are used for lookups.
CREATE INDEX person_name_index FOR (p:Person) ON (p.name)
CREATE INDEX company_industry_index FOR (c:Company) ON (c.industry)

// A uniqueness constraint guarantees no two Person nodes share an id.
CREATE CONSTRAINT person_id_unique FOR (p:Person) REQUIRE p.id IS UNIQUE

// Anchor the pattern on an indexed property (Person.name) before filtering further.
MATCH (p:Person {name: '张三'})-[r:WORKS_FOR]->(c:Company)
WHERE c.industry = '技术'
RETURN p, c

// Cap the result set with LIMIT.
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name
LIMIT 100

// PROFILE executes the query and reports the actual execution plan.
PROFILE MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name, c.name

// Avoid Cartesian products.
// Bad: two disconnected patterns pair every Person with every Company.
MATCH (p:Person), (c:Company)
RETURN p, c

// Good: the relationship in the pattern restricts the pairs.
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p, c

图算法(Graph Data Science)

// PageRank: rank nodes of the projected graph 'myGraph' by importance.
CALL gds.pageRank.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name as name, score
ORDER BY score DESC
LIMIT 10

// Community detection (Louvain): size of each detected community.
CALL gds.louvain.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, count(*) as community_size
ORDER BY community_size DESC

// Shortest path between two nodes of the projected graph.
// GDS exposes shortest-path algorithms per algorithm name; there is no bare
// `gds.shortestPath.stream`, so Dijkstra is used here (unweighted unless a
// relationshipWeightProperty is configured).
MATCH (source:Person {name: '张三'}), (target:Person {name: '李四'})
CALL gds.shortestPath.dijkstra.stream('myGraph', {
    sourceNode: source,
    targetNode: target
})
YIELD index, nodeIds
RETURN [nodeId IN nodeIds | gds.util.asNode(nodeId).name] as path

// Weakly connected components: size of each component.
CALL gds.wcc.stream('myGraph')
YIELD nodeId, componentId
RETURN componentId, count(*) as size
ORDER BY size DESC

实用查询示例

// People with no outgoing FRIEND_OF relationship.
MATCH (p:Person)
WHERE NOT (p)-[:FRIEND_OF]->()
RETURN p.name

// Top 5 people by number of skills.
MATCH (p:Person)-[r:HAS_SKILL]->(s:Skill)
WITH p, count(s) as skill_count
ORDER BY skill_count DESC
RETURN p.name, skill_count
LIMIT 5

// Skills shared with 张三, grouped per other person, most overlap first.
MATCH (p1:Person {name: '张三'})-[:HAS_SKILL]->(s:Skill)<-[:HAS_SKILL]-(p2:Person)
RETURN p2.name, collect(s.name) as common_skills
ORDER BY size(common_skills) DESC

// Skills that connect two specific people through HAS_SKILL chains of any length.
MATCH (p1:Person {name: '张三'})-[:HAS_SKILL*]->(s:Skill)<-[:HAS_SKILL*]-(p2:Person {name: '李四'})
RETURN p1.name, p2.name, collect(DISTINCT s.name) as connecting_skills

// Time-window filter: both bounds are inclusive; compares r.since against date values.
MATCH (p:Person)-[r:WORKS_FOR]->(c:Company)
WHERE r.since >= date('2020-01-01') AND r.since <= date('2025-12-31')
RETURN p.name, c.name, r.since
ORDER BY r.since DESC

2026 新增特性

  • 模式匹配优化:更智能的查询规划器
  • 子查询支持:CALL {} 语法增强
  • 函数扩展:新增 50+ 内置函数
  • 性能提升:查询速度提升 40%

数据建模最佳实践

建模原则

1. 关注关系而非表结构

图数据库建模应该从关系的角度思考,而不是传统的关系型数据库表结构。让数据关系自然呈现。

2. 使用标签进行分类

合理使用标签来对节点进行分类,便于查询和索引。一个节点可以有多个标签。

3. 关系命名要语义化

关系名称应该清楚地表达两个节点之间的语义,如 WORKS_FOR、FRIENDS_WITH、LOCATED_IN 等。

4. 避免过度规范化

图数据库允许一定的数据冗余,为了提高查询性能,可以在节点上存储必要的属性。

社交网络建模示例

// Create three users.
CREATE (u1:User {id: 'u1', name: 'Alice', age: 28})
CREATE (u2:User {id: 'u2', name: 'Bob', age: 32})
CREATE (u3:User {id: 'u3', name: 'Charlie', age: 25})

// Create a post stamped with the current date-time.
CREATE (p1:Post {id: 'p1', content: 'Hello Neo4j!', created: datetime()})

// Connect the author to the post.
MATCH (u1:User {id: 'u1'}), (p1:Post {id: 'p1'})
CREATE (u1)-[:POSTED]->(p1)

// Connect two users: u1 follows u2, and u2 is friends with u1.
MATCH (u1:User {id: 'u1'}), (u2:User {id: 'u2'})
CREATE (u1)-[:FOLLOWS]->(u2)
CREATE (u2)-[:FRIENDS_WITH]->(u1)

Neo4j 5.x/6.x 新特性

性能提升

查询性能提升 40%,存储空间节省 30%。新的查询优化器更智能,自动选择最佳执行计划。

安全性增强

支持基于角色的访问控制(RBAC),细粒度的权限管理,加密连接和存储。

云原生支持

完整的云原生架构,支持 Kubernetes 部署,自动扩缩容,多区域复制。

AI/ML 集成

内置图算法库(Graph Data Science),支持机器学习模型训练和推理。包含65+图算法、图神经网络、预测模型等。

Cypher 增强

支持 CALL / EXISTS 子查询和更强大的模式匹配。注意:Cypher 并不提供 SQL 风格的窗口函数,类似需求通常通过 WITH 子句配合聚合函数和列表函数来实现。

驱动更新

所有官方驱动更新到 5.x 版本,支持异步编程,性能优化。

AI/ML 集成详解

Neo4j Graph Data Science (GDS) 概述

Neo4j Graph Data Science 是一个强大的图算法和机器学习库,提供了65+种图算法,支持节点嵌入、链接预测、图神经网络等高级功能。

  • 图算法库:中心性算法、社区发现算法、路径查找算法
  • 机器学习管道:特征工程、模型训练、预测和评估
  • 图神经网络:图卷积网络、图注意力网络
  • 节点嵌入:Node2Vec、FastRP、GraphSAGE

1. 中心性算法 - 识别关键节点

// PageRank: the ten highest-scoring nodes of the projected graph.
CALL gds.pageRank.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name as name, score
ORDER BY score DESC
LIMIT 10

// Betweenness centrality: the ten nodes with the highest betweenness score.
CALL gds.betweenness.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name as name, score
ORDER BY score DESC
LIMIT 10

// Degree centrality: the ten nodes with the highest degree score.
CALL gds.degree.stream('myGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name as name, score
ORDER BY score DESC
LIMIT 10

2. 社区发现算法 - 识别社群结构

// Louvain: number of nodes per detected community, largest first.
CALL gds.louvain.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, count(*) as community_size
ORDER BY community_size DESC

// Label Propagation: member names of the five largest communities.
CALL gds.labelPropagation.stream('myGraph')
YIELD nodeId, communityId
RETURN communityId, collect(gds.util.asNode(nodeId).name) as members
ORDER BY size(members) DESC
LIMIT 5

// Weakly Connected Components: number of nodes per component.
CALL gds.wcc.stream('myGraph')
YIELD nodeId, componentId
RETURN componentId, count(*) as size
ORDER BY size DESC

3. 节点嵌入 - 将节点转换为向量表示

// FastRP: compute 256-dimensional embeddings and write them to the `embedding` property.
// NOTE(review): relationshipWeightProperty requires 'weight' to be part of the
// projected graph - confirm the projection includes it.
CALL gds.fastRP.write('myGraph', {
    embeddingDimension: 256,
    relationshipWeightProperty: 'weight',
    writeProperty: 'embedding'
})

// Node2Vec: random-walk based embeddings; preview the first components per node.
CALL gds.node2vec.stream('myGraph', {
    embeddingDimension: 128,
    walkLength: 80,
    walksPerNode: 10,
    returnFactor: 1.0,
    inOutFactor: 1.0
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name as name, embedding[0..5] as preview

// GraphSAGE: train an inductive embedding model and store it under 'myModel'.
// GraphSAGE lives in the beta tier of GDS, hence the `gds.beta.` prefix.
// NOTE(review): featureProperties must be node properties of the projected graph.
CALL gds.beta.graphSage.train('myGraph', {
    modelName: 'myModel',
    featureProperties: ['age', 'salary'],
    embeddingDimension: 64,
    aggregator: 'mean',
    epochs: 5
})

4. 链接预测 - 预测未来可能的关系

// Adamic-Adar score for every pair of people that is not yet connected.
// GDS provides the topological link-prediction measures as functions that take
// two nodes, not as `.stream` procedures. The elementId comparison keeps each
// unordered pair once.
MATCH (p1:Person), (p2:Person)
WHERE elementId(p1) < elementId(p2) AND NOT (p1)--(p2)
RETURN p1.name as person1, p2.name as person2,
       gds.alpha.linkprediction.adamicAdar(p1, p2) as score
ORDER BY score DESC
LIMIT 10

// Common Neighbors score between 张三 and every person not yet connected to them.
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2 AND NOT (p1)--(p2)
RETURN p2.name as potential_friend,
       gds.alpha.linkprediction.commonNeighbors(p1, p2) as score
ORDER BY score DESC
LIMIT 10

// Train a link-prediction model.
// NOTE(review): this procedure name comes from an early alpha API; current GDS
// releases train link prediction through a pipeline
// (gds.beta.pipeline.linkPrediction.*) - confirm against the installed version.
CALL gds.alpha.ml.linkPrediction.train('myGraph', {
    modelName: 'linkPredictionModel',
    featureProperties: ['adamicAdar', 'commonNeighbors', 'preferentialAttachment']
})

5. 相似度计算

// Node Similarity (Jaccard is the default metric): people most similar to 张三.
// The procedure is `gds.nodeSimilarity.stream`; it yields node ids for both
// sides of each pair, which are resolved to nodes before filtering.
CALL gds.nodeSimilarity.stream('myGraph')
YIELD node1, node2, similarity
WITH gds.util.asNode(node1) as source, gds.util.asNode(node2) as other, similarity
WHERE source.name = '张三'
RETURN other.name as similar_person, similarity
ORDER BY similarity DESC
LIMIT 10

// Cosine similarity between stored embedding vectors.
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.cosine(p1.embedding, p2.embedding) as similarity
RETURN p2.name, similarity
ORDER BY similarity DESC
LIMIT 10

// Euclidean distance between stored embedding vectors (smaller means closer).
MATCH (p1:Person {name: '张三'}), (p2:Person)
WHERE p1 <> p2
WITH p1, p2, gds.similarity.euclideanDistance(p1.embedding, p2.embedding) as distance
RETURN p2.name, distance
ORDER BY distance ASC
LIMIT 10

6. 实战案例:推荐系统

使用图算法构建个性化推荐系统

// Step 1: compute user similarity and persist it as SIMILAR_TO relationships
// whose `similarityScore` property holds the score (top 10 neighbours per user).
CALL gds.nodeSimilarity.write('userGraph', {
    writeRelationshipType: 'SIMILAR_TO',
    writeProperty: 'similarityScore',
    topK: 10
})

// Step 2: recommend products rated by similar users but not by the target user.
// The score is read from the SIMILAR_TO relationship, where step 1 wrote it;
// reading it from the user node would always give null.
MATCH (targetUser:User {id: 'user1'})-[sim:SIMILAR_TO]->(similarUser:User)
MATCH (similarUser)-[:RATED]->(product:Product)
WHERE NOT (targetUser)-[:RATED]->(product)
WITH targetUser, product, avg(sim.similarityScore) as avg_similarity
RETURN product.name, avg_similarity
ORDER BY avg_similarity DESC
LIMIT 10

// Step 3: rating-weighted PageRank to surface popular products.
CALL gds.pageRank.stream('productGraph', {
    relationshipWeightProperty: 'rating'
})
YIELD nodeId, score
WITH gds.util.asNode(nodeId) as product, score
ORDER BY score DESC
LIMIT 10
RETURN product.name, score as popularity_score

7. 实战案例:欺诈检测

使用图算法识别欺诈网络模式

// Step 1: flag Louvain communities with fewer than 5 members.
CALL gds.louvain.stream('transactionGraph')
YIELD nodeId, communityId
WITH communityId, count(*) as community_size
WHERE community_size < 5
RETURN communityId as suspicious_community, community_size

// Step 2: list nodes whose degree score exceeds 100.
CALL gds.degree.stream('transactionGraph')
YIELD nodeId, score
WITH gds.util.asNode(nodeId) as node, score
WHERE score > 100
RETURN node.id as high_degree_node, score

// Step 3: find three-account transfer cycles where all three amounts are equal.
MATCH (a:Account)-[t1:TRANSFER]->(b:Account)
MATCH (b)-[t2:TRANSFER]->(c:Account)
MATCH (c)-[t3:TRANSFER]->(a)
WHERE t1.amount = t2.amount AND t2.amount = t3.amount
RETURN a.id, b.id, c.id, t1.amount, t1.datetime

// Step 4: list weakly connected components containing 3 to 10 accounts.
CALL gds.wcc.stream('transactionGraph')
YIELD nodeId, componentId
WITH componentId, collect(gds.util.asNode(nodeId).id) as accounts
WHERE size(accounts) >= 3 AND size(accounts) <= 10
RETURN componentId, accounts as suspicious_group

8. 实战案例:知识图谱问答

结合图算法和自然语言处理

// Step 1: build a small knowledge graph (doctor, hospital, disease).
CREATE (p:Person {name: '张三', profession: '医生'})
CREATE (h:Hospital {name: '北京医院', location: '北京'})
CREATE (d:Disease {name: '高血压', category: '心血管'})
CREATE (p)-[:WORKS_AT]->(h)
CREATE (p)-[:TREATS]->(d)

// Step 2: diseases treated by each doctor.
MATCH (p:Person {profession: '医生'})-[:TREATS]->(d:Disease)
RETURN p.name as doctor, collect(d.name) as diseases

// Step 3: specialists for a disease together with their hospital.
// The hospital is modelled as a WORKS_AT relationship in step 1 (there is no
// `workplace` property), so it is reached by traversal; OPTIONAL MATCH keeps
// specialists that have no hospital recorded.
MATCH (d:Disease {name: '高血压'})<-[:TREATS]-(p:Person)
OPTIONAL MATCH (p)-[:WORKS_AT]->(h:Hospital)
RETURN p.name as specialist, h.name as hospital

// Step 4: diseases within two RELATED_TO hops.
// The pattern is bound to `path` so that length(path) can be evaluated.
MATCH path = (d1:Disease {name: '高血压'})-[:RELATED_TO*1..2]-(d2:Disease)
RETURN d1.name, d2.name, length(path) as distance

9. 图神经网络 (GNN) 应用

// Train a GraphSAGE model (beta tier of GDS) on three node features.
CALL gds.beta.graphSage.train('myGraph', {
    modelName: 'nodeClassifier',
    featureProperties: ['age', 'income', 'score'],
    embeddingDimension: 128,
    aggregator: 'mean',
    epochs: 10,
    batchSize: 64
})

// Apply the trained model. GraphSAGE produces an embedding vector per node,
// not a class label; the embeddings are the input features for a downstream
// node classifier.
CALL gds.beta.graphSage.stream('myGraph', {
    modelName: 'nodeClassifier'
})
YIELD nodeId, embedding
RETURN gds.util.asNode(nodeId).name as name, embedding

// Predict the 10 most probable missing links with a trained link-prediction
// pipeline model. The procedure yields node ids, so they are resolved with
// gds.util.asNode before reading properties.
// NOTE(review): 'gcnLinkPredictor' must name a model trained through the
// link-prediction pipeline - confirm it exists in the model catalog.
CALL gds.beta.pipeline.linkPrediction.predict.stream('myGraph', {
    modelName: 'gcnLinkPredictor',
    topN: 10
})
YIELD node1, node2, probability
RETURN gds.util.asNode(node1).name, gds.util.asNode(node2).name, probability
ORDER BY probability DESC

10. 性能优化最佳实践

  • 使用命名图投影:预先创建图投影,避免重复计算
  • 批量处理:使用 `stream` 模式处理大数据集
  • 并行计算:利用 GDS 的并行计算能力
  • 内存管理:合理配置内存参数,避免 OOM
  • 缓存结果:将常用算法结果写入数据库
// Create a named in-memory projection once and reuse it across algorithm calls.
CALL gds.graph.project('myGraph', ['Person', 'Company'], ['WORKS_FOR', 'KNOWS'])

// Estimate the memory a PageRank stream run would need, without running it.
CALL gds.pageRank.stream.estimate('myGraph', {
    relationshipWeightProperty: 'weight'
})
YIELD requiredMemory

// Write algorithm results back to the database as the `pagerankScore` property.
CALL gds.pageRank.write('myGraph', {
    writeProperty: 'pagerankScore'
})

// Drop the projection to release its memory.
CALL gds.graph.drop('myGraph')

编程语言连接指南

Python 连接 Neo4j

Neo4j 官方提供了 Python 驱动程序 `neo4j`(旧包名 `neo4j-driver` 已弃用),支持异步和同步操作。

安装驱动

# 使用 pip 安装
pip install neo4j

# 或使用 conda
conda install -c conda-forge neo4j-python-driver

基础连接示例

# basic_connection.py
from neo4j import GraphDatabase

class Neo4jConnection:
    """Small convenience wrapper that owns one Neo4j driver.

    The driver is created in the constructor and must be released with
    :meth:`close` once the connection is no longer needed.
    """

    def __init__(self, uri, user, password):
        """Create the underlying driver for ``uri`` using basic credentials."""
        credentials = (user, password)
        self.driver = GraphDatabase.driver(uri, auth=credentials)

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def query(self, query, parameters=None):
        """Run ``query`` in a fresh session and return all records as a list.

        ``parameters`` is passed to the driver unchanged (``None`` by default).
        """
        with self.driver.session() as session:
            # Materialise the records before the session is closed.
            return list(session.run(query, parameters))

# Usage example
if __name__ == "__main__":
    conn = Neo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="your_password"
    )

    # try/finally guarantees the driver is closed even if the query raises.
    try:
        # Run a query and print every returned record.
        result = conn.query("MATCH (n) RETURN n LIMIT 5")
        for record in result:
            print(record)
    finally:
        conn.close()

创建节点和关系

# create_data.py
from neo4j import GraphDatabase

def create_person(tx, name, age):
    """Create a ``Person`` node inside transaction ``tx`` and return it.

    ``name`` and ``age`` are sent as query parameters. The return value is
    the first field of the single record produced by the query.
    """
    cypher = "CREATE (p:Person {name: $name, age: $age}) RETURN p"
    record = tx.run(cypher, name=name, age=age).single()
    return record[0]

def create_relationship(tx, person1, person2):
    """Create a ``FRIENDS_WITH`` relationship from ``person1`` to ``person2``.

    Both people are looked up by name inside transaction ``tx``. Returns
    whatever ``single()`` yields for the query result.
    """
    cypher = (
        "MATCH (p1:Person {name: $person1}) "
        "MATCH (p2:Person {name: $person2}) "
        "CREATE (p1)-[:FRIENDS_WITH]->(p2) "
        "RETURN p1, p2"
    )
    outcome = tx.run(cypher, person1=person1, person2=person2)
    return outcome.single()

# Usage example
# The driver is used as a context manager so it is closed even when one of the
# writes below raises.
with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password")) as driver:
    with driver.session() as session:
        # Create the people.
        session.execute_write(create_person, "张三", 30)
        session.execute_write(create_person, "李四", 28)
        session.execute_write(create_person, "王五", 32)

        # Create the relationships between them.
        session.execute_write(create_relationship, "张三", "李四")
        session.execute_write(create_relationship, "李四", "王五")

查询数据

# query_data.py
from neo4j import GraphDatabase

def get_friends(tx, person_name):
    """Return the friends of ``person_name`` as a list of dicts.

    Each dict has the keys ``"name"`` and ``"age"``, taken from the nodes
    reached through outgoing ``FRIENDS_WITH`` relationships.
    """
    cypher = (
        "MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "
        "RETURN friend.name as friend_name, friend.age as friend_age"
    )
    friends = []
    for row in tx.run(cypher, name=person_name):
        friends.append({"name": row["friend_name"], "age": row["friend_age"]})
    return friends

def get_shortest_path(tx, person1, person2):
    """Return the names along the shortest path between two people.

    Args:
        tx: Transaction object exposing ``run``.
        person1: Name of the start person.
        person2: Name of the end person.

    Returns:
        A list of node names from ``person1`` to ``person2``, or ``None``
        when the query yields no row (no path, or a person does not exist).
    """
    result = tx.run(
        "MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "
        "RETURN [node IN nodes(path) | node.name] as path",
        p1=person1, p2=person2
    )
    record = result.single()
    # single() gives None for an empty result; subscripting it would raise TypeError.
    if record is None:
        return None
    return record["path"]

# Usage example
# The driver is used as a context manager so it is closed even when a read fails.
with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password")) as driver:
    with driver.session() as session:
        # Look up direct friends.
        friends = session.execute_read(get_friends, "张三")
        print(f"张三的朋友: {friends}")

        # Look up the shortest path between two people.
        path = session.execute_read(get_shortest_path, "张三", "王五")
        print(f"从张三到王五的路径: {path}")

异步操作示例

# async_example.py
from neo4j import AsyncGraphDatabase

class AsyncNeo4jConnection:
    """Asynchronous counterpart of a simple connection wrapper.

    Owns one async driver; call :meth:`close` to release it.
    """

    def __init__(self, uri, user, password):
        """Create the async driver for ``uri`` using basic credentials."""
        credentials = (user, password)
        self.driver = AsyncGraphDatabase.driver(uri, auth=credentials)

    async def close(self):
        """Close the underlying async driver."""
        await self.driver.close()

    async def query(self, query, parameters=None):
        """Run ``query`` in a fresh session and return all records as a list."""
        rows = []
        async with self.driver.session() as session:
            cursor = await session.run(query, parameters)
            # Drain the result while the session is still open.
            async for row in cursor:
                rows.append(row)
        return rows

    async def batch_create(self, persons):
        """Create one ``Person`` node per dict in ``persons``.

        Each dict must provide the keys ``"name"`` and ``"age"``. The
        statements are issued one after another in a single session.
        """
        statement = "CREATE (p:Person {name: $name, age: $age})"
        async with self.driver.session() as session:
            for entry in persons:
                await session.run(statement, name=entry["name"], age=entry["age"])

# 使用示例
import asyncio

async def main():
    """Create a few people and print every person's name.

    The connection is closed in a ``finally`` block so the driver is
    released even when one of the database calls raises.
    """
    conn = AsyncNeo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )

    try:
        # Bulk-create the sample people.
        persons = [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 28}
        ]
        await conn.batch_create(persons)

        # Read the names back.
        result = await conn.query("MATCH (p:Person) RETURN p.name as name")
        for record in result:
            print(record["name"])
    finally:
        await conn.close()

asyncio.run(main())

使用 OGM (Object Graph Mapper)

# 安装 neomodel
# pip install neomodel

# ogm_example.py
from neomodel import StructuredNode, StringProperty, IntegerProperty, RelationshipTo, config

# 配置连接
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'

class Person(StructuredNode):
    """neomodel mapping for ``Person`` nodes."""

    # Required name, declared with a unique index.
    name = StringProperty(unique_index=True, required=True)
    # Optional integer age.
    age = IntegerProperty()
    # Outgoing FRIENDS_WITH relationships to other Person nodes.
    friends = RelationshipTo('Person', 'FRIENDS_WITH')

# Usage example
# Create and persist two nodes.
person1 = Person(name='张三', age=30).save()
person2 = Person(name='李四', age=28).save()

# Connect them through the `friends` relationship.
person1.friends.connect(person2)

# Fetch a single node by name.
person = Person.nodes.get(name='张三')
print(f"{person.name} 的年龄: {person.age}")

# Iterate over that person's friends.
for friend in person.friends:
    print(f"朋友: {friend.name}, 年龄: {friend.age}")

# Filter nodes: age less than 30.
# Note: the loop variable below rebinds `person` from the lookup above.
young_people = Person.nodes.filter(age__lt=30)
for person in young_people:
    print(f"年轻人: {person.name}")

Golang 连接 Neo4j

Neo4j 官方提供了 Go 驱动程序 `github.com/neo4j/neo4j-go-driver/v5`,支持完整的功能。

安装驱动

# 使用 go get 安装
go get github.com/neo4j/neo4j-go-driver/v5/neo4j

基础连接示例

// basic_connection.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// main connects to a local Neo4j instance, verifies the connection, and
// prints up to five nodes.
func main() {
    auth := neo4j.BasicAuth("neo4j", "password", "")
    driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Fail early if the server cannot be reached.
    if err := driver.VerifyConnectivity(); err != nil {
        log.Fatal(err)
    }
    fmt.Println("连接成功!")

    // Open a read session for the sample query.
    readSession := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer readSession.Close()

    rows, err := readSession.Run("MATCH (n) RETURN n LIMIT 5", nil)
    if err != nil {
        log.Fatal(err)
    }

    for rows.Next() {
        fmt.Println(rows.Record())
    }

    // Next() returning false can also mean the stream failed.
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}

创建节点和关系

// create_data.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// createPerson creates a Person node with the given name and age in a
// dedicated write session. It returns the error reported by Run, if any.
func createPerson(driver neo4j.Driver, name string, age int) error {
    writeSession := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer writeSession.Close()

    params := map[string]interface{}{
        "name": name,
        "age":  age,
    }
    if _, err := writeSession.Run("CREATE (p:Person {name: $name, age: $age})", params); err != nil {
        return err
    }
    return nil
}

// createRelationship creates a FRIENDS_WITH relationship from person1 to
// person2, looking both people up by name. It returns the error reported by
// Run, if any.
func createRelationship(driver neo4j.Driver, person1, person2 string) error {
    writeSession := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer writeSession.Close()

    const cypher = "MATCH (p1:Person {name: $p1}) " +
        "MATCH (p2:Person {name: $p2}) " +
        "CREATE (p1)-[:FRIENDS_WITH]->(p2)"
    params := map[string]interface{}{
        "p1": person1,
        "p2": person2,
    }
    if _, err := writeSession.Run(cypher, params); err != nil {
        return err
    }
    return nil
}

// main creates three sample people and two friendships between them,
// aborting on the first error.
func main() {
    auth := neo4j.BasicAuth("neo4j", "password", "")
    driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Sample people to create.
    type personSeed struct {
        name string
        age  int
    }
    seeds := []personSeed{
        {"张三", 30},
        {"李四", 28},
        {"王五", 32},
    }
    for _, seed := range seeds {
        err := createPerson(driver, seed.name, seed.age)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("创建人员: %s\n", seed.name)
    }

    // Friendships to create, as (from, to) name pairs.
    pairs := [][2]string{
        {"张三", "李四"},
        {"李四", "王五"},
    }
    for _, pair := range pairs {
        from, to := pair[0], pair[1]
        err := createRelationship(driver, from, to)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("创建关系: %s -> %s\n", from, to)
    }

    fmt.Println("数据创建完成!")
}

查询数据

// query_data.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// Friend is one row of the friend lookup: the friend's name and age.
type Friend struct {
    Name string
    Age  int
}

// getFriends returns the people reachable from personName through outgoing
// FRIENDS_WITH relationships.
//
// It returns an error when the query fails, when the result stream fails, or
// when a returned name/age does not have the expected type (for example a
// missing property, which arrives as nil). The type assertions are checked so
// that such rows produce an error instead of a panic.
func getFriends(driver neo4j.Driver, personName string) ([]Friend, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "+
            "RETURN friend.name as name, friend.age as age",
        map[string]interface{}{
            "name": personName,
        })
    if err != nil {
        return nil, err
    }

    var friends []Friend
    for result.Next() {
        record := result.Record()
        rawName, _ := record.Get("name")
        rawAge, _ := record.Get("age")

        name, ok := rawName.(string)
        if !ok {
            return nil, fmt.Errorf("friend.name 类型无效: %v", rawName)
        }
        age, ok := rawAge.(int64)
        if !ok {
            return nil, fmt.Errorf("friend.age 类型无效: %v", rawAge)
        }

        friends = append(friends, Friend{
            Name: name,
            Age:  int(age),
        })
    }

    // Next() returning false can also mean the stream failed.
    if err = result.Err(); err != nil {
        return nil, err
    }

    return friends, nil
}

// getShortestPath returns the node names along the shortest path between
// person1 and person2.
//
// It returns (nil, nil) when the query yields no row, and an error when the
// query or the result stream fails or the returned value has an unexpected
// shape.
func getShortestPath(driver neo4j.Driver, person1, person2 string) ([]string, error) {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "+
            "RETURN [node IN nodes(path) | node.name] as path",
        map[string]interface{}{
            "p1": person1,
            "p2": person2,
        })
    if err != nil {
        return nil, err
    }

    if !result.Next() {
        // No row: either the stream failed (Err is non-nil) or no path exists.
        return nil, result.Err()
    }

    raw, _ := result.Record().Get("path")

    // The driver delivers Cypher lists as []interface{}; asserting directly to
    // []string always fails, so the elements are converted one by one.
    items, ok := raw.([]interface{})
    if !ok {
        return nil, fmt.Errorf("path 类型无效: %T", raw)
    }

    names := make([]string, 0, len(items))
    for _, item := range items {
        name, ok := item.(string)
        if !ok {
            return nil, fmt.Errorf("path 中的节点名称类型无效: %v", item)
        }
        names = append(names, name)
    }

    return names, nil
}

// main prints the friends of one person and the shortest path between two
// people, aborting on the first error.
func main() {
    auth := neo4j.BasicAuth("neo4j", "password", "")
    driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Friend lookup.
    friendList, err := getFriends(driver, "张三")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("张三的朋友:\n")
    for i := range friendList {
        fmt.Printf("  - %s (年龄: %d)\n", friendList[i].Name, friendList[i].Age)
    }

    // Shortest-path lookup.
    route, err := getShortestPath(driver, "张三", "王五")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("从张三到王五的路径: %v\n", route)
}

事务处理

// transaction.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// transferMoney moves amount from fromUser's balance to toUser's balance in a
// single write transaction and records a TRANSFERRED relationship.
//
// It returns an error, and the transaction is rolled back, when:
//   - amount is not positive (a negative amount would reverse the transfer),
//   - the sender or the receiver does not exist,
//   - the sender's balance is missing, not an integer, or too low,
//   - any statement fails.
func transferMoney(driver neo4j.Driver, fromUser, toUser string, amount int) error {
    if amount <= 0 {
        return fmt.Errorf("转账金额必须为正数: %d", amount)
    }

    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    // Returning an error from the callback rolls the whole transaction back.
    _, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
        // Read the sender's balance.
        result, err := tx.Run(
            "MATCH (u:User {name: $name}) RETURN u.balance as balance",
            map[string]interface{}{"name": fromUser})
        if err != nil {
            return nil, err
        }

        if !result.Next() {
            if err := result.Err(); err != nil {
                return nil, err
            }
            return nil, fmt.Errorf("用户不存在: %s", fromUser)
        }

        rawBalance, _ := result.Record().Get("balance")
        // Checked assertion: a missing balance arrives as nil and must not panic.
        balance, ok := rawBalance.(int64)
        if !ok {
            return nil, fmt.Errorf("用户余额无效: %s", fromUser)
        }
        if balance < int64(amount) {
            return nil, fmt.Errorf("余额不足")
        }

        // Debit the sender.
        _, err = tx.Run(
            "MATCH (u:User {name: $name}) SET u.balance = u.balance - $amount",
            map[string]interface{}{
                "name":   fromUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }

        // Credit the receiver. The RETURN makes a missing receiver detectable:
        // without it the MATCH would silently update nothing and the debited
        // money would vanish.
        result, err = tx.Run(
            "MATCH (u:User {name: $name}) SET u.balance = u.balance + $amount "+
                "RETURN u.name as name",
            map[string]interface{}{
                "name":   toUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }
        if !result.Next() {
            if err := result.Err(); err != nil {
                return nil, err
            }
            return nil, fmt.Errorf("用户不存在: %s", toUser)
        }

        // Record the transfer as a relationship between the two users.
        _, err = tx.Run(
            "MATCH (from:User {name: $from}), (to:User {name: $to}) "+
                "CREATE (from)-[:TRANSFERRED {amount: $amount, date: datetime()}]->(to)",
            map[string]interface{}{
                "from":   fromUser,
                "to":     toUser,
                "amount": amount,
            })
        if err != nil {
            return nil, err
        }

        return nil, nil
    })

    return err
}

// main performs one sample transfer and reports whether it succeeded.
func main() {
    auth := neo4j.BasicAuth("neo4j", "password", "")
    driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // A failed transfer is logged rather than treated as fatal.
    if err := transferMoney(driver, "张三", "李四", 100); err != nil {
        log.Printf("转账失败: %v\n", err)
        return
    }
    fmt.Println("转账成功!")
}

连接池配置

// connection_pool.go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// main creates a driver with explicit connection-pool settings, verifies the
// connection, and counts the nodes in the database.
//
// Only fields and methods that the driver actually exposes are used: the
// socket connect timeout is Config.SocketConnectTimeout, and the driver has
// no pool-statistics methods, so the configured pool size is reported instead.
func main() {
    // Named so the same value can be applied and reported.
    const maxPoolSize = 50

    driver, err := neo4j.NewDriver(
        "bolt://localhost:7687",
        neo4j.BasicAuth("neo4j", "password", ""),
        func(config *neo4j.Config) {
            // Connection pool: size, wait time for a free connection, and
            // maximum age of a pooled connection.
            config.MaxConnectionPoolSize = maxPoolSize
            config.ConnectionAcquisitionTimeout = 2 * time.Minute
            config.MaxConnectionLifetime = 1 * time.Hour

            // Timeouts and retries.
            config.SocketConnectTimeout = 30 * time.Second
            config.MaxTransactionRetryTime = 30 * time.Second

            // Logging.
            config.Log = neo4j.ConsoleLogger(neo4j.INFO)
        })
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    // Fail early if the server cannot be reached.
    err = driver.VerifyConnectivity()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("连接池配置:\n")
    fmt.Printf("  最大连接数: %d\n", maxPoolSize)

    // Run a query in a read session with explicit session settings.
    session := driver.NewSession(neo4j.SessionConfig{
        AccessMode:       neo4j.AccessModeRead,
        Bookmarks:        []string{},
        DatabaseName:     "neo4j",
        FetchSize:        1000,
        ImpersonatedUser: "",
    })
    defer session.Close()

    result, err := session.Run("MATCH (n) RETURN count(n) as count", nil)
    if err != nil {
        log.Fatal(err)
    }

    if result.Next() {
        count, _ := result.Record().Get("count")
        fmt.Printf("节点总数: %d\n", count)
    }

    // Next() returning false can also mean the stream failed.
    if err = result.Err(); err != nil {
        log.Fatal(err)
    }
}

使用 Repository 模式封装(类似 ORM 的数据访问层)

// orm_example.go
package main

import (
    "fmt"
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// PersonRepository wraps the database operations for Person nodes behind
// a small repository type that holds the driver.
type PersonRepository struct {
    driver neo4j.Driver
}

// Person is the plain value returned by the repository's read methods.
type Person struct {
    Name string
    Age  int
}

// NewPersonRepository returns a repository that uses the given driver.
func NewPersonRepository(driver neo4j.Driver) *PersonRepository {
    return &PersonRepository{driver: driver}
}

// Create adds a Person node with the given name and age. It returns the
// error reported by Run, if any.
func (r *PersonRepository) Create(name string, age int) error {
    writeSession := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer writeSession.Close()

    params := map[string]interface{}{
        "name": name,
        "age":  age,
    }
    if _, err := writeSession.Run("CREATE (p:Person {name: $name, age: $age})", params); err != nil {
        return err
    }
    return nil
}

// FindByName returns the first Person whose name matches.
//
// It returns an error when the query or the result stream fails, when no
// person matches, or when the returned name/age has an unexpected type
// (for example a missing age, which arrives as nil).
func (r *PersonRepository) FindByName(name string) (*Person, error) {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person {name: $name}) RETURN p.name as name, p.age as age",
        map[string]interface{}{"name": name})
    if err != nil {
        return nil, err
    }

    if !result.Next() {
        // Distinguish a failed stream from a genuinely empty result.
        if err := result.Err(); err != nil {
            return nil, err
        }
        return nil, fmt.Errorf("未找到用户")
    }

    record := result.Record()
    rawName, _ := record.Get("name")
    rawAge, _ := record.Get("age")

    // Checked assertions turn unexpected values into errors instead of panics.
    foundName, ok := rawName.(string)
    if !ok {
        return nil, fmt.Errorf("用户名称类型无效: %v", rawName)
    }
    foundAge, ok := rawAge.(int64)
    if !ok {
        return nil, fmt.Errorf("用户年龄类型无效: %v", rawAge)
    }

    return &Person{
        Name: foundName,
        Age:  int(foundAge),
    }, nil
}

// FindAll returns at most limit Person values.
//
// It returns an error when the query or the result stream fails, or when a
// returned name/age has an unexpected type (for example a missing property,
// which arrives as nil).
func (r *PersonRepository) FindAll(limit int) ([]Person, error) {
    session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
    defer session.Close()

    result, err := session.Run(
        "MATCH (p:Person) RETURN p.name as name, p.age as age LIMIT $limit",
        map[string]interface{}{"limit": limit})
    if err != nil {
        return nil, err
    }

    var persons []Person
    for result.Next() {
        record := result.Record()
        rawName, _ := record.Get("name")
        rawAge, _ := record.Get("age")

        // Checked assertions turn unexpected values into errors instead of panics.
        name, ok := rawName.(string)
        if !ok {
            return nil, fmt.Errorf("用户名称类型无效: %v", rawName)
        }
        age, ok := rawAge.(int64)
        if !ok {
            return nil, fmt.Errorf("用户年龄类型无效: %v", rawAge)
        }

        persons = append(persons, Person{
            Name: name,
            Age:  int(age),
        })
    }

    // Without this check a stream failure would look like a short result.
    if err = result.Err(); err != nil {
        return nil, err
    }

    return persons, nil
}

// Delete removes every Person with the given name together with its
// relationships (DETACH DELETE). It returns the error reported by Run, if any.
func (r *PersonRepository) Delete(name string) error {
    writeSession := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer writeSession.Close()

    params := map[string]interface{}{"name": name}
    if _, err := writeSession.Run("MATCH (p:Person {name: $name}) DETACH DELETE p", params); err != nil {
        return err
    }
    return nil
}

// main walks through the repository: create, find one, list, and delete,
// aborting on the first error.
func main() {
    auth := neo4j.BasicAuth("neo4j", "password", "")
    driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
    if err != nil {
        log.Fatal(err)
    }
    defer driver.Close()

    repo := NewPersonRepository(driver)

    // Create.
    if err := repo.Create("张三", 30); err != nil {
        log.Fatal(err)
    }

    // Find one.
    found, err := repo.FindByName("张三")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("找到用户: %s, 年龄: %d\n", found.Name, found.Age)

    // List.
    everyone, err := repo.FindAll(10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("所有用户:\n")
    for i := range everyone {
        fmt.Printf("  - %s (年龄: %d)\n", everyone[i].Name, everyone[i].Age)
    }

    // Delete.
    if err := repo.Delete("张三"); err != nil {
        log.Fatal(err)
    }
    fmt.Println("删除成功!")
}

最佳实践

  • 连接管理:使用连接池,避免频繁创建和销毁连接
  • 事务处理:确保数据一致性,使用事务处理复杂操作
  • 错误处理:妥善处理错误和异常情况
  • 参数化查询:使用参数化查询防止注入攻击
  • 资源释放:确保 session、result 等资源正确释放
  • 性能优化:使用批量操作、索引优化查询性能

实战示例

基于图的推荐系统

利用用户行为和物品关系构建知识图谱,实现精准推荐。

// Find users who rated more than two of the same products as the target user,
// then recommend products those users rated that the target has not.
// The RATED relationship in the second MATCH is bound to `r` so that its
// `rating` property can be aggregated; without the binding `r` is undefined
// and the query is rejected.
MATCH (u:User {id: 'user1'})-[:RATED]->(p:Product)<-[:RATED]-(other:User)
WITH u, other, count(p) as common_products
WHERE common_products > 2
MATCH (other)-[r:RATED]->(rec:Product)
WHERE NOT (u)-[:RATED]->(rec)
RETURN rec.name, avg(r.rating) as avg_rating, count(r) as rating_count
ORDER BY avg_rating DESC
LIMIT 10

欺诈检测系统

发现异常的交易模式和关联网络,识别潜在欺诈行为。

// Detect three-account transfer cycles in which all three amounts are equal.
MATCH (a:Account)-[t1:TRANSFER]->(b:Account)
MATCH (b)-[t2:TRANSFER]->(c:Account)
MATCH (c)-[t3:TRANSFER]->(a)
WHERE t1.amount = t2.amount AND t2.amount = t3.amount
RETURN a.id, b.id, c.id, t1.amount, t1.datetime

// Detect accounts with more than 50 outgoing transfers in the last hour
// ('PT1H' is an ISO-8601 duration of one hour).
MATCH (a:Account)-[t:TRANSFER]->(b:Account)
WHERE t.datetime > datetime() - duration('PT1H')
WITH a, count(t) as transfer_count
WHERE transfer_count > 50
RETURN a.id, transfer_count

社交网络分析

分析社交网络中的影响力传播、社群发现和关键节点识别。

// Project the follower graph once so the algorithms below can run on it.
// The `algo.*` procedures belong to the retired Graph Algorithms library; the
// Graph Data Science (`gds.*`) procedures used throughout this file replace them.
CALL gds.graph.project('socialGraph', 'User', 'FOLLOWS')

// Most influential users by PageRank.
CALL gds.pageRank.stream('socialGraph')
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name as user, score
ORDER BY score DESC
LIMIT 10

// Community sizes found by the Louvain algorithm.
CALL gds.louvain.stream('socialGraph')
YIELD nodeId, communityId
RETURN communityId as community, count(*) as size
ORDER BY size DESC

// Bridge users: people who follow members of more than one community.
// NOTE(review): relies on a `community` property on User nodes, e.g. written
// beforehand with gds.louvain.write and writeProperty: 'community' - confirm.
MATCH (u:User)-[:FOLLOWS]->(friend:User)
WITH u, count(DISTINCT friend.community) as community_count
WHERE community_count > 1
RETURN u.name, community_count
ORDER BY community_count DESC