Python 连接 Neo4j
Neo4j 官方提供了 Python 驱动程序 `neo4j`(旧包名 `neo4j-driver` 已弃用),同时支持同步和异步操作。
安装驱动
# 使用 pip 安装
pip install neo4j
# 或使用 conda
conda install -c conda-forge neo4j-python-driver
基础连接示例
# basic_connection.py
from neo4j import GraphDatabase
class Neo4jConnection:
    """Small convenience wrapper around a synchronous Neo4j driver.

    The instance owns one driver. It can be used as a context manager so the
    driver is closed even when a query raises; calling ``close()`` manually
    still works as before.
    """

    def __init__(self, uri, user, password):
        """Create the underlying driver for ``uri`` using basic auth."""
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        """Return the connection itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on leaving the ``with`` block; never swallow errors."""
        self.close()

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def query(self, query, parameters=None):
        """Run ``query`` in a fresh session and return all records as a list.

        Args:
            query: Cypher text to execute.
            parameters: Optional mapping of query parameters.

        Returns:
            A list holding every record yielded by the result.
        """
        with self.driver.session() as session:
            # Materialize the records before the ``with`` block closes the session.
            return list(session.run(query, parameters))
# 使用示例
if __name__ == "__main__":
    conn = Neo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="your_password"
    )
    try:
        # Run a query and print every returned record.
        result = conn.query("MATCH (n) RETURN n LIMIT 5")
        for record in result:
            print(record)
    finally:
        # Close the driver even if the query above raised.
        conn.close()
创建节点和关系
# create_data.py
from neo4j import GraphDatabase
def create_person(tx, name, age):
    """Create a ``Person`` node inside transaction ``tx`` and return it.

    Args:
        tx: Transaction object exposing ``run``.
        name: Value stored in the node's ``name`` property.
        age: Value stored in the node's ``age`` property.

    Returns:
        The first field of the single record produced by ``RETURN p``.
    """
    cypher = "CREATE (p:Person {name: $name, age: $age}) RETURN p"
    record = tx.run(cypher, name=name, age=age).single()
    return record[0]
def create_relationship(tx, person1, person2):
    """Link two existing ``Person`` nodes with a ``FRIENDS_WITH`` relationship.

    Args:
        tx: Transaction object exposing ``run``.
        person1: ``name`` of the node the relationship starts from.
        person2: ``name`` of the node the relationship points to.

    Returns:
        Whatever ``single()`` yields for the ``RETURN p1, p2`` result.
    """
    cypher = (
        "MATCH (p1:Person {name: $person1}) "
        "MATCH (p2:Person {name: $person2}) "
        "CREATE (p1)-[:FRIENDS_WITH]->(p2) "
        "RETURN p1, p2"
    )
    outcome = tx.run(cypher, person1=person1, person2=person2)
    return outcome.single()
# 使用示例
# Both the driver and the session are used as context managers so they are
# closed even if one of the writes below raises.
with GraphDatabase.driver(
    "bolt://localhost:7687", auth=("neo4j", "password")
) as driver:
    with driver.session() as session:
        # Create the people.
        session.execute_write(create_person, "张三", 30)
        session.execute_write(create_person, "李四", 28)
        session.execute_write(create_person, "王五", 32)
        # Create the relationships between them.
        session.execute_write(create_relationship, "张三", "李四")
        session.execute_write(create_relationship, "李四", "王五")
查询数据
# query_data.py
from neo4j import GraphDatabase
def get_friends(tx, person_name):
    """Return the people that ``person_name`` has a ``FRIENDS_WITH`` edge to.

    Args:
        tx: Transaction object exposing ``run``.
        person_name: ``name`` property of the starting ``Person`` node.

    Returns:
        A list of ``{"name": ..., "age": ...}`` dicts, one per result record.
    """
    cypher = (
        "MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "
        "RETURN friend.name as friend_name, friend.age as friend_age"
    )
    friends = []
    for row in tx.run(cypher, name=person_name):
        friends.append({"name": row["friend_name"], "age": row["friend_age"]})
    return friends
def get_shortest_path(tx, person1, person2):
    """Return the node names along the shortest path between two people.

    Args:
        tx: Transaction object exposing ``run``.
        person1: ``name`` of the ``Person`` node at one end of the path.
        person2: ``name`` of the ``Person`` node at the other end.

    Returns:
        The ``path`` field of the result record (a list of ``name`` values),
        or ``None`` when the query produced no record, i.e. no path was found.
    """
    result = tx.run(
        "MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "
        "RETURN [node IN nodes(path) | node.name] as path",
        p1=person1, p2=person2
    )
    record = result.single()
    # single() yields None for an empty result; subscripting it would raise TypeError.
    if record is None:
        return None
    return record["path"]
# 使用示例
# Both the driver and the session are used as context managers so they are
# closed even if one of the reads below raises.
with GraphDatabase.driver(
    "bolt://localhost:7687", auth=("neo4j", "password")
) as driver:
    with driver.session() as session:
        # Look up friends.
        friends = session.execute_read(get_friends, "张三")
        print(f"张三的朋友: {friends}")
        # Look up the shortest path.
        path = session.execute_read(get_shortest_path, "张三", "王五")
        print(f"从张三到王五的路径: {path}")
异步操作示例
# async_example.py
from neo4j import AsyncGraphDatabase
class AsyncNeo4jConnection:
    """Small convenience wrapper around an asynchronous Neo4j driver.

    The instance owns one async driver. It supports ``async with`` so the
    driver is closed even when a coroutine raises; awaiting ``close()``
    manually still works as before.
    """

    def __init__(self, uri, user, password):
        """Create the underlying async driver for ``uri`` using basic auth."""
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def __aenter__(self):
        """Return the connection itself for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Close the driver on leaving the ``async with`` block."""
        await self.close()

    async def close(self):
        """Close the underlying driver."""
        await self.driver.close()

    async def query(self, query, parameters=None):
        """Run ``query`` in a fresh session and return all records as a list.

        Args:
            query: Cypher text to execute.
            parameters: Optional mapping of query parameters.
        """
        async with self.driver.session() as session:
            result = await session.run(query, parameters)
            # Drain the async result before the session is closed.
            return [record async for record in result]

    async def batch_create(self, persons):
        """Create one ``Person`` node per entry of ``persons``.

        Args:
            persons: Iterable of mappings with ``"name"`` and ``"age"`` keys.

        Raises:
            KeyError: If an entry lacks ``"name"`` or ``"age"``.
        """
        # Copy only the two expected keys; this also fails fast on malformed
        # entries before anything is sent to the server.
        rows = [{"name": person["name"], "age": person["age"]} for person in persons]
        if not rows:
            return
        async with self.driver.session() as session:
            # One UNWIND query instead of one round trip per person.
            await session.run(
                "UNWIND $rows AS row "
                "CREATE (p:Person {name: row.name, age: row.age})",
                rows=rows
            )
# 使用示例
import asyncio
async def main():
    """Create a few people in bulk, then print every person's name."""
    conn = AsyncNeo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="password"
    )
    try:
        # Bulk create.
        persons = [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 28}
        ]
        await conn.batch_create(persons)
        # Query the data back.
        result = await conn.query("MATCH (p:Person) RETURN p.name as name")
        for record in result:
            print(record["name"])
    finally:
        # Close the driver even if one of the calls above raised.
        await conn.close()


asyncio.run(main())
使用 OGM (Object Graph Mapper)
# 安装 neomodel
# pip install neomodel
# ogm_example.py
from neomodel import StructuredNode, StringProperty, IntegerProperty, RelationshipTo, config
# Configure the connection. The URL embeds the credentials and the address
# in the form bolt://<user>:<password>@<host>:<port>.
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'
class Person(StructuredNode):
    """neomodel node with a name, an age and outgoing FRIENDS_WITH links."""

    # Mandatory and declared with a unique index.
    name = StringProperty(required=True, unique_index=True)
    # Optional integer age.
    age = IntegerProperty()
    # Outgoing FRIENDS_WITH relationship to other Person nodes.
    friends = RelationshipTo("Person", "FRIENDS_WITH")
# Usage example
# Create nodes
person1 = Person(name='张三', age=30).save()
person2 = Person(name='李四', age=28).save()
# Create a relationship
person1.friends.connect(person2)
# Query data
person = Person.nodes.get(name='张三')
print(f"{person.name} 的年龄: {person.age}")
# Query friends
for friend in person.friends:
    print(f"朋友: {friend.name}, 年龄: {friend.age}")
# Bulk query
young_people = Person.nodes.filter(age__lt=30)
# A distinct loop variable keeps `person` (looked up above) from being rebound.
for young_person in young_people:
    print(f"年轻人: {young_person.name}")
Golang 连接 Neo4j
Neo4j 官方提供了 Go 驱动程序 `github.com/neo4j/neo4j-go-driver/v5`,支持完整的功能。
安装驱动
# 使用 go get 安装
go get github.com/neo4j/neo4j-go-driver/v5/neo4j
基础连接示例
// basic_connection.go
package main
import (
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// main connects to a local Neo4j instance, verifies connectivity and prints
// up to five nodes returned by a read query.
func main() {
	auth := neo4j.BasicAuth("neo4j", "password", "")
	driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	// Verify the connection.
	if connErr := driver.VerifyConnectivity(); connErr != nil {
		log.Fatal(connErr)
	}
	fmt.Println("连接成功!")

	// Run the query in a read session.
	readSession := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer readSession.Close()

	rows, err := readSession.Run("MATCH (n) RETURN n LIMIT 5", nil)
	if err != nil {
		log.Fatal(err)
	}
	for rows.Next() {
		fmt.Println(rows.Record())
	}
	// Next() returning false can also mean the stream failed.
	if streamErr := rows.Err(); streamErr != nil {
		log.Fatal(streamErr)
	}
}
创建节点和关系
// create_data.go
package main
import (
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// createPerson creates a Person node with the given name and age in its own
// write session. It returns any error reported while running the query or
// while consuming its result.
func createPerson(driver neo4j.Driver, name string, age int) error {
	session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	result, err := session.Run(
		"CREATE (p:Person {name: $name, age: $age})",
		map[string]interface{}{
			"name": name,
			"age":  age,
		})
	if err != nil {
		return err
	}
	// Consume the result so failures reported after Run returns are not lost
	// behind the deferred Close, whose error is ignored.
	_, err = result.Consume()
	return err
}
// createRelationship creates a FRIENDS_WITH relationship from the Person named
// person1 to the Person named person2 in its own write session. It returns any
// error reported while running the query or while consuming its result.
func createRelationship(driver neo4j.Driver, person1, person2 string) error {
	session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	result, err := session.Run(
		"MATCH (p1:Person {name: $p1}) "+
			"MATCH (p2:Person {name: $p2}) "+
			"CREATE (p1)-[:FRIENDS_WITH]->(p2)",
		map[string]interface{}{
			"p1": person1,
			"p2": person2,
		})
	if err != nil {
		return err
	}
	// Consume the result so failures reported after Run returns are not lost
	// behind the deferred Close, whose error is ignored.
	_, err = result.Consume()
	return err
}
// main creates three Person nodes and two FRIENDS_WITH relationships, logging
// each step and aborting on the first error.
func main() {
	auth := neo4j.BasicAuth("neo4j", "password", "")
	driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	// People to create.
	type personSeed struct {
		name string
		age  int
	}
	seeds := []personSeed{
		{"张三", 30},
		{"李四", 28},
		{"王五", 32},
	}
	for i := range seeds {
		seed := seeds[i]
		createErr := createPerson(driver, seed.name, seed.age)
		if createErr != nil {
			log.Fatal(createErr)
		}
		fmt.Printf("创建人员: %s\n", seed.name)
	}

	// Relationships to create, as (from, to) pairs.
	pairs := [][2]string{
		{"张三", "李四"},
		{"李四", "王五"},
	}
	for _, pair := range pairs {
		from, to := pair[0], pair[1]
		linkErr := createRelationship(driver, from, to)
		if linkErr != nil {
			log.Fatal(linkErr)
		}
		fmt.Printf("创建关系: %s -> %s\n", from, to)
	}

	fmt.Println("数据创建完成!")
}
查询数据
// query_data.go
package main
import (
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// Friend holds one row of the friends query: the name and age returned for a
// node reached through a FRIENDS_WITH relationship.
type Friend struct {
Name string
Age int
}
// getFriends returns the nodes that the Person named personName points to via
// FRIENDS_WITH, read in a dedicated read session.
//
// It returns an error if the query fails, if streaming the result fails, or if
// a row's name/age is not a string/integer (for example a missing property),
// instead of panicking on the type assertion.
func getFriends(driver neo4j.Driver, personName string) ([]Friend, error) {
	session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close()

	result, err := session.Run(
		"MATCH (p:Person {name: $name})-[:FRIENDS_WITH]->(friend) "+
			"RETURN friend.name as name, friend.age as age",
		map[string]interface{}{
			"name": personName,
		})
	if err != nil {
		return nil, err
	}

	var friends []Friend
	for result.Next() {
		record := result.Record()
		rawName, _ := record.Get("name")
		rawAge, _ := record.Get("age")

		// Checked assertions: a nil or unexpected value becomes an error, not a panic.
		name, ok := rawName.(string)
		if !ok {
			return nil, fmt.Errorf("name 字段类型无效: %T", rawName)
		}
		age, ok := rawAge.(int64)
		if !ok {
			return nil, fmt.Errorf("age 字段类型无效: %T", rawAge)
		}
		friends = append(friends, Friend{Name: name, Age: int(age)})
	}
	if err = result.Err(); err != nil {
		return nil, err
	}
	return friends, nil
}
// getShortestPath returns the names of the nodes on the shortest path between
// the Person named person1 and the Person named person2.
//
// It returns (nil, nil) when the query yields no row, and an error if the
// query fails, streaming fails, or the returned value is not a list of strings.
func getShortestPath(driver neo4j.Driver, person1, person2 string) ([]string, error) {
	session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close()

	result, err := session.Run(
		"MATCH path = shortestPath((p1:Person {name: $p1})-[*]-(p2:Person {name: $p2})) "+
			"RETURN [node IN nodes(path) | node.name] as path",
		map[string]interface{}{
			"p1": person1,
			"p2": person2,
		})
	if err != nil {
		return nil, err
	}

	if !result.Next() {
		// No row: either there is no path (Err is nil) or the stream failed.
		return nil, result.Err()
	}

	rawPath, _ := result.Record().Get("path")
	// The driver delivers Cypher lists as []interface{}, never []string, so
	// each element has to be converted individually.
	items, ok := rawPath.([]interface{})
	if !ok {
		return nil, fmt.Errorf("path 字段类型无效: %T", rawPath)
	}
	names := make([]string, 0, len(items))
	for _, item := range items {
		name, ok := item.(string)
		if !ok {
			return nil, fmt.Errorf("路径节点名称类型无效: %T", item)
		}
		names = append(names, name)
	}
	return names, nil
}
// main prints the friends of 张三 and the shortest path from 张三 to 王五,
// aborting on the first error.
func main() {
	auth := neo4j.BasicAuth("neo4j", "password", "")
	driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	// Friends lookup.
	friendList, err := getFriends(driver, "张三")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("张三的朋友:\n")
	for i := range friendList {
		fmt.Printf("  - %s (年龄: %d)\n", friendList[i].Name, friendList[i].Age)
	}

	// Shortest-path lookup.
	route, err := getShortestPath(driver, "张三", "王五")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("从张三到王五的路径: %v\n", route)
}
事务处理
// transaction.go
package main
import (
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// transferMoney moves amount from fromUser's balance to toUser's balance and
// records a TRANSFERRED relationship, all inside one write transaction.
//
// It returns an error when amount is not positive, when either user does not
// exist, when the sender's balance is not an integer or is lower than amount,
// or when any query fails. Every failure is returned from the transaction
// function, so the debit is never kept without the matching credit.
func transferMoney(driver neo4j.Driver, fromUser, toUser string, amount int) error {
	// A zero or negative amount would pass the balance check and move money
	// in the wrong direction.
	if amount <= 0 {
		return fmt.Errorf("转账金额必须为正数: %d", amount)
	}

	session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	_, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
		// Check the sender's balance.
		result, err := tx.Run(
			"MATCH (u:User {name: $name}) RETURN u.balance as balance",
			map[string]interface{}{"name": fromUser})
		if err != nil {
			return nil, err
		}
		if !result.Next() {
			if err := result.Err(); err != nil {
				return nil, err
			}
			return nil, fmt.Errorf("用户不存在: %s", fromUser)
		}
		rawBalance, _ := result.Record().Get("balance")
		// Checked assertion: a missing or non-integer balance is an error, not a panic.
		balance, ok := rawBalance.(int64)
		if !ok {
			return nil, fmt.Errorf("用户余额无效: %s", fromUser)
		}
		if balance < int64(amount) {
			return nil, fmt.Errorf("余额不足")
		}

		// Make sure the recipient exists before touching any balance; the
		// update statements below silently match nothing for an unknown user.
		result, err = tx.Run(
			"MATCH (u:User {name: $name}) RETURN u.name as name",
			map[string]interface{}{"name": toUser})
		if err != nil {
			return nil, err
		}
		if !result.Next() {
			if err := result.Err(); err != nil {
				return nil, err
			}
			return nil, fmt.Errorf("用户不存在: %s", toUser)
		}

		// Debit the sender.
		_, err = tx.Run(
			"MATCH (u:User {name: $name}) SET u.balance = u.balance - $amount",
			map[string]interface{}{
				"name":   fromUser,
				"amount": amount,
			})
		if err != nil {
			return nil, err
		}

		// Credit the recipient.
		_, err = tx.Run(
			"MATCH (u:User {name: $name}) SET u.balance = u.balance + $amount",
			map[string]interface{}{
				"name":   toUser,
				"amount": amount,
			})
		if err != nil {
			return nil, err
		}

		// Record the transfer.
		_, err = tx.Run(
			"MATCH (from:User {name: $from}), (to:User {name: $to}) "+
				"CREATE (from)-[:TRANSFERRED {amount: $amount, date: datetime()}]->(to)",
			map[string]interface{}{
				"from":   fromUser,
				"to":     toUser,
				"amount": amount,
			})
		if err != nil {
			return nil, err
		}
		return nil, nil
	})
	return err
}
// main attempts a transfer of 100 from 张三 to 李四 and reports the outcome.
func main() {
	auth := neo4j.BasicAuth("neo4j", "password", "")
	driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	// Run the transfer; a failure is logged but does not abort the process.
	if transferErr := transferMoney(driver, "张三", "李四", 100); transferErr != nil {
		log.Printf("转账失败: %v\n", transferErr)
		return
	}
	fmt.Println("转账成功!")
}
连接池配置
// connection_pool.go
package main
import (
"fmt"
"log"
"time"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// main creates a driver with explicit pool, timeout, retry and logging
// settings, verifies connectivity and prints the total node count.
func main() {
	// The neo4j.Driver interface has no methods for live pool counters, so the
	// configured limit is kept in a constant and reported instead.
	const maxPoolSize = 50

	driver, err := neo4j.NewDriver(
		"bolt://localhost:7687",
		neo4j.BasicAuth("neo4j", "password", ""),
		func(config *neo4j.Config) {
			// Connection pool.
			config.MaxConnectionPoolSize = maxPoolSize
			config.ConnectionAcquisitionTimeout = 2 * time.Minute
			config.MaxConnectionLifetime = 1 * time.Hour
			// Socket connect timeout (Config has no ConnectionTimeout field).
			config.SocketConnectTimeout = 30 * time.Second
			// Transaction retry.
			config.MaxTransactionRetryTime = 30 * time.Second
			// Logging.
			config.Log = neo4j.ConsoleLogger(neo4j.INFO)
		})
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	// Verify the connection.
	err = driver.VerifyConnectivity()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("连接池配置:\n")
	fmt.Printf("  最大连接数: %d\n", maxPoolSize)

	// Run the query.
	session := driver.NewSession(neo4j.SessionConfig{
		AccessMode:       neo4j.AccessModeRead,
		Bookmarks:        []string{},
		DatabaseName:     "neo4j",
		FetchSize:        1000,
		ImpersonatedUser: "",
	})
	defer session.Close()

	result, err := session.Run("MATCH (n) RETURN count(n) as count", nil)
	if err != nil {
		log.Fatal(err)
	}
	if result.Next() {
		count, _ := result.Record().Get("count")
		fmt.Printf("节点总数: %d\n", count)
	}
	// Next() returning false can also mean the stream failed.
	if err = result.Err(); err != nil {
		log.Fatal(err)
	}
}
使用 Repository 模式封装(类似 ORM 的用法)
// orm_example.go
package main
import (
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// PersonRepository wraps the database operations for Person nodes.
// It holds the driver that each method uses to open its own session.
type PersonRepository struct {
driver neo4j.Driver
}
// Person is the in-memory form of a Person node, carrying the name and age
// values that the repository queries return.
type Person struct {
Name string
Age int
}
// NewPersonRepository returns a repository that runs its queries through driver.
func NewPersonRepository(driver neo4j.Driver) *PersonRepository {
	repo := new(PersonRepository)
	repo.driver = driver
	return repo
}
// Create stores a new Person node with the given name and age using a write
// session. It returns any error reported while running the query or while
// consuming its result.
func (r *PersonRepository) Create(name string, age int) error {
	session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	result, err := session.Run(
		"CREATE (p:Person {name: $name, age: $age})",
		map[string]interface{}{
			"name": name,
			"age":  age,
		})
	if err != nil {
		return err
	}
	// Consume the result so failures reported after Run returns are not lost
	// behind the deferred Close, whose error is ignored.
	_, err = result.Consume()
	return err
}
// FindByName returns the first Person node whose name property equals name.
//
// It returns an error if the query or the result stream fails, if no node
// matches ("未找到用户"), or if the stored name/age is not a string/integer.
func (r *PersonRepository) FindByName(name string) (*Person, error) {
	session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close()

	result, err := session.Run(
		"MATCH (p:Person {name: $name}) RETURN p.name as name, p.age as age",
		map[string]interface{}{"name": name})
	if err != nil {
		return nil, err
	}

	if !result.Next() {
		// Tell a failed stream apart from a genuinely empty result.
		if err := result.Err(); err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("未找到用户")
	}

	record := result.Record()
	rawName, _ := record.Get("name")
	rawAge, _ := record.Get("age")
	// Checked assertions: a nil or unexpected value becomes an error, not a panic.
	foundName, ok := rawName.(string)
	if !ok {
		return nil, fmt.Errorf("name 字段类型无效: %T", rawName)
	}
	foundAge, ok := rawAge.(int64)
	if !ok {
		return nil, fmt.Errorf("age 字段类型无效: %T", rawAge)
	}
	return &Person{Name: foundName, Age: int(foundAge)}, nil
}
// FindAll returns up to limit Person nodes as Person values.
//
// It returns an error if the query or the result stream fails, or if a row's
// name/age is not a string/integer.
func (r *PersonRepository) FindAll(limit int) ([]Person, error) {
	session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close()

	result, err := session.Run(
		"MATCH (p:Person) RETURN p.name as name, p.age as age LIMIT $limit",
		map[string]interface{}{"limit": limit})
	if err != nil {
		return nil, err
	}

	var persons []Person
	for result.Next() {
		record := result.Record()
		rawName, _ := record.Get("name")
		rawAge, _ := record.Get("age")
		// Checked assertions: a nil or unexpected value becomes an error, not a panic.
		name, ok := rawName.(string)
		if !ok {
			return nil, fmt.Errorf("name 字段类型无效: %T", rawName)
		}
		age, ok := rawAge.(int64)
		if !ok {
			return nil, fmt.Errorf("age 字段类型无效: %T", rawAge)
		}
		persons = append(persons, Person{Name: name, Age: int(age)})
	}
	// Next() returning false can also mean the stream failed part-way through.
	if err = result.Err(); err != nil {
		return nil, err
	}
	return persons, nil
}
// Delete removes every Person node whose name property equals name, together
// with its relationships (DETACH DELETE). It returns any error reported while
// running the query or while consuming its result.
func (r *PersonRepository) Delete(name string) error {
	session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close()

	result, err := session.Run(
		"MATCH (p:Person {name: $name}) DETACH DELETE p",
		map[string]interface{}{"name": name})
	if err != nil {
		return err
	}
	// Consume the result so failures reported after Run returns are not lost
	// behind the deferred Close, whose error is ignored.
	_, err = result.Consume()
	return err
}
// main exercises the repository end to end: create a person, look it up,
// list people, then delete it, aborting on the first error.
func main() {
	auth := neo4j.BasicAuth("neo4j", "password", "")
	driver, err := neo4j.NewDriver("bolt://localhost:7687", auth)
	if err != nil {
		log.Fatal(err)
	}
	defer driver.Close()

	repo := NewPersonRepository(driver)

	// Create.
	if createErr := repo.Create("张三", 30); createErr != nil {
		log.Fatal(createErr)
	}

	// Look up a single person.
	found, err := repo.FindByName("张三")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("找到用户: %s, 年龄: %d\n", found.Name, found.Age)

	// List people.
	everyone, err := repo.FindAll(10)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("所有用户:\n")
	for i := range everyone {
		fmt.Printf("  - %s (年龄: %d)\n", everyone[i].Name, everyone[i].Age)
	}

	// Delete.
	if deleteErr := repo.Delete("张三"); deleteErr != nil {
		log.Fatal(deleteErr)
	}
	fmt.Println("删除成功!")
}