- Updated pub/sub iterator methods from `asyncIterableIterator` to `asyncIterator` in multiple schema files for improved compatibility with the latest GraphQL subscriptions. - Refactored subscription logic in CollaborationSession, Document, Message, User, and other schema files to enhance readability and maintainability. - Adjusted imports in GraphQL builder to utilize RedisPubSub for better performance in subscription handling.
497 lines
16 KiB
TypeScript
497 lines
16 KiB
TypeScript
import { Inject, Injectable, Logger } from '@nestjs/common'
|
|
import { Document } from '@prisma/client'
|
|
import { Pothos, PothosRef, PothosSchema, SchemaBuilderToken } from '@smatch-corp/nestjs-pothos'
|
|
import Delta from 'quill-delta'
|
|
import { MinioService } from 'src/Minio/minio.service'
|
|
import { Builder, SchemaContext } from '../Graphql/graphql.builder'
|
|
import { PrismaService } from '../Prisma/prisma.service'
|
|
import { DocumentEvent } from './document.event'
|
|
import { DocumentDelta } from './document.type'
|
|
/**
 * Pothos schema module for documents.
 *
 * Registers the `Document`, `DocumentCollaborator` and `DocumentDelta` GraphQL
 * types, the `DocumentDeltaInput` input type, and the document-related query,
 * mutation and subscription fields on the shared schema builder.
 *
 * Every HTTP resolver rejects subscription contexts with `Not allowed`, and the
 * subscription field rejects non-subscription contexts the same way.
 */
@Injectable()
export class DocumentSchema extends PothosSchema {
  /** Probability (0.5%) that a delivered delta is flagged with `requestSync`. */
  private static readonly SYNC_REQUEST_PROBABILITY = 0.005

  /** Reports best-effort failures that are deliberately not surfaced to the client. */
  private readonly logger = new Logger(DocumentSchema.name)

  constructor(
    @Inject(SchemaBuilderToken) private readonly builder: Builder,
    private readonly prisma: PrismaService,
    private readonly minio: MinioService,
  ) {
    super()
  }

  /** GraphQL object type backed by the Prisma `Document` model. */
  @PothosRef()
  document() {
    return this.builder.prismaObject('Document', {
      fields: (t) => ({
        id: t.exposeID('id'),
        name: t.exposeString('name'),
        fileUrl: t.exposeString('fileUrl'),
        createdAt: t.expose('createdAt', { type: 'DateTime' }),
        updatedAt: t.expose('updatedAt', { type: 'DateTime' }),
        owner: t.relation('owner'),
        ownerId: t.exposeID('ownerId'),
        collaborators: t.relation('collaborators'),
        isPublic: t.exposeBoolean('isPublic'),
      }),
    })
  }

  /** GraphQL object type backed by the Prisma `DocumentCollaborator` model. */
  @PothosRef()
  documentCollaborator() {
    return this.builder.prismaObject('DocumentCollaborator', {
      fields: (t) => ({
        documentId: t.exposeID('documentId', { nullable: false }),
        userId: t.exposeID('userId', { nullable: false }),
        document: t.relation('document', { nullable: false }),
        user: t.relation('user', { nullable: false }),
        readable: t.exposeBoolean('readable', { nullable: false }),
        writable: t.exposeBoolean('writable', { nullable: false }),
      }),
    })
  }

  /**
   * Plain object type carried by document events: returned by the event
   * queries/mutations and emitted by the `document` subscription.
   */
  @PothosRef()
  documentDelta() {
    return this.builder.simpleObject('DocumentDelta', {
      fields: (t) => ({
        eventType: t.string(),
        documentId: t.string({ nullable: true }),
        pageIndex: t.int({ nullable: true }),
        delta: t.field({ type: 'Delta', nullable: true }),
        senderId: t.string({ nullable: true }),
        requestSync: t.boolean({ nullable: true }),
        totalPage: t.int({ nullable: true }),
      }),
    })
  }

  /** Input type clients use to submit a delta for one page of a document. */
  @PothosRef()
  documentDeltaInput() {
    return this.builder.inputType('DocumentDeltaInput', {
      fields: (t) => ({
        documentId: t.string(),
        pageIndex: t.int(),
        delta: t.field({ type: 'Delta' }),
      }),
    })
  }

  /** Registers all document query, mutation and subscription fields on the builder. */
  @Pothos()
  init(): void {
    this.builder.queryFields((t) => ({
      // Documents the caller owns or collaborates on.
      // NOTE(review): `skip`, `take` and `filter` are accepted by the generated args but not applied here — confirm intent.
      myDocuments: t.prismaField({
        type: [this.document()],
        args: this.builder.generator.findManyArgs('Document'),
        resolve: async (query, _parent, args, ctx: SchemaContext) => {
          const userId = this.requireHttpUserId(ctx, 'User not found')
          return await this.prisma.document.findMany({
            ...query,
            orderBy: args.orderBy ?? undefined,
            where: {
              OR: [{ ownerId: userId }, { collaborators: { some: { userId } } }],
            },
          })
        },
      }),

      // NOTE(review): no authentication or permission check is performed here — confirm this is intended.
      document: t.prismaField({
        type: this.document(),
        args: this.builder.generator.findUniqueArgs('Document'),
        resolve: async (query, _root, args) => {
          return await this.prisma.document.findUnique({
            ...query,
            where: args.where,
          })
        },
      }),

      // NOTE(review): no authentication or permission check is performed here — confirm this is intended.
      documents: t.prismaField({
        type: [this.document()],
        args: this.builder.generator.findManyArgs('Document'),
        resolve: async (query, _root, args) => {
          return await this.prisma.document.findMany({
            ...query,
            skip: args.skip ?? undefined,
            take: args.take ?? undefined,
            orderBy: args.orderBy ?? undefined,
            where: args.filter ?? undefined,
          })
        },
      }),

      // Creates an empty "Untitled" document owned by the caller.
      // It is exposed as a query field even though it writes; kept there so existing clients keep working.
      newDocument: t.field({
        type: this.document(),
        args: {},
        // `t.field` resolvers receive (parent, args, ctx, info). The first argument is the
        // root value, not a Prisma query, so it must not be spread into the create call.
        resolve: async (_root, _args, ctx, _info) => {
          const userId = this.requireHttpUserId(ctx, 'User not found')
          return await this.prisma.document.create({
            data: {
              name: 'Untitled',
              fileUrl: '',
              ownerId: userId,
            },
          })
        },
      }),

      // Returns the stored delta of one page plus the document's page count.
      eventDocumentClientRequestSync: t.field({
        type: this.documentDelta(),
        args: {
          documentId: t.arg({ type: 'String', required: true }),
          pageIndex: t.arg({ type: 'Int', required: true }),
        },
        resolve: async (_, args, ctx: SchemaContext) => {
          const userId = this.requireHttpUserId(ctx, 'User not found')
          if (!args.documentId) {
            throw new Error('Document id not found')
          }
          if (args.pageIndex === undefined || args.pageIndex === null) {
            throw new Error('Page index not found')
          }

          // Loading the page is best-effort: on failure the client still gets a
          // response with `delta: null`, but the failure is logged instead of vanishing.
          let delta = null
          try {
            delta = await this.minio.getDocumentPage(args.documentId, args.pageIndex)
          } catch (error: unknown) {
            const reason = error instanceof Error ? error.message : String(error)
            this.logger.warn(`Could not load page ${args.pageIndex} of document ${args.documentId}: ${reason}`)
          }
          const totalPage = await this.minio.countDocumentPages(args.documentId)

          return {
            documentId: args.documentId,
            pageIndex: args.pageIndex,
            delta,
            totalPage,
            senderId: userId,
            eventType: DocumentEvent.CLIENT_REQUEST_SYNC,
          }
        },
      }),
    }))

    this.builder.mutationFields((t) => ({
      // Creates a document owned by the caller; the name defaults to "Untitled".
      createDocument: t.prismaField({
        type: this.document(),
        args: {
          input: t.arg({
            type: this.builder.generator.getCreateInput('Document', [
              'id',
              'ownerId',
              'createdAt',
              'updatedAt',
              'collaborators',
              'owner',
              'fileUrl',
              'previewImageUrl',
              'name',
            ]),
            required: false,
          }),
        },
        resolve: async (query, _parent, args, ctx: SchemaContext) => {
          const userId = this.requireHttpUserId(ctx, 'Unauthorized')
          return await this.prisma.document.create({
            ...query,
            data: {
              ...args.input,
              name: args.input?.name ?? 'Untitled',
              fileUrl: '',
              owner: { connect: { id: userId } },
            },
          })
        },
      }),

      // Broadcasts a client delta on the document's CHANGED topic and echoes the input back.
      // NOTE(review): no per-document permission check is done before publishing — confirm intent.
      eventDocumentChanged: t.field({
        type: this.documentDelta(),
        args: {
          data: t.arg({
            type: this.documentDeltaInput(),
            required: true,
          }),
        },
        resolve: async (_, args, ctx: SchemaContext) => {
          if (ctx.isSubscription) {
            throw new Error('Not allowed')
          }
          const {
            http: { pubSub },
          } = ctx
          const senderId = ctx.http?.me?.id
          if (!senderId) {
            throw new Error('User not found')
          }
          // Without an id the topic would become "<event>.null"/"<event>.undefined".
          if (!args.data.documentId) {
            throw new Error('Document id not found')
          }
          // Await the publish so a transport failure rejects this mutation rather
          // than surfacing as an unhandled promise rejection.
          await pubSub.publish(`${DocumentEvent.CHANGED}.${args.data.documentId}`, {
            ...args.data,
            senderId,
          })
          return args.data
        },
      }),

      // Persists a page delta to storage and returns it tagged as a server sync event.
      eventDocumentServerRequestSync: t.field({
        type: this.documentDelta(),
        args: {
          data: t.arg({ type: this.documentDeltaInput(), required: true }),
        },
        resolve: async (_, args, ctx: SchemaContext) => {
          if (ctx.isSubscription) {
            throw new Error('Not allowed')
          }
          const senderId = ctx.http?.me?.id
          const { documentId, pageIndex, delta } = args.data
          if (!documentId) {
            throw new Error('Document id not found')
          }
          if (!senderId) {
            throw new Error('User not found')
          }
          if (pageIndex === undefined || pageIndex === null) {
            throw new Error('Page index not found')
          }
          if (!delta) {
            throw new Error('Delta not found')
          }

          // save delta to minio
          await this.minio.upsertDocumentPage(documentId, pageIndex, delta)
          const totalPage = await this.minio.countDocumentPages(documentId)

          return {
            ...args.data,
            totalPage,
            senderId: 'server',
            eventType: DocumentEvent.SERVER_REQUEST_SYNC,
          }
        },
      }),

      // Updates a document when it is public, or the caller owns it or is a writable collaborator.
      // NOTE(review): a public document is editable by any authenticated user — confirm intent.
      updateDocument: t.prismaField({
        type: this.document(),
        args: {
          documentId: t.arg({ type: 'String', required: true }),
          data: t.arg({
            type: this.builder.generator.getUpdateInput('Document', [
              'id',
              'ownerId',
              'createdAt',
              'updatedAt',
              'collaborationSessionId',
              'collaborationSession',
              'owner',
              'fileUrl',
              'previewImageUrl',
            ]),
            required: true,
          }),
        },
        resolve: async (query, _parent, args, ctx: SchemaContext) => {
          const userId = this.requireHttpUserId(ctx, 'Unauthorized')

          const document = await this.prisma.document.findUnique({
            where: { id: args.documentId },
            include: { collaborators: true },
          })
          if (!document) {
            throw new Error('Document not found')
          }

          const isOwner = document.ownerId === userId
          const isWritableCollaborator = document.collaborators.some((c) => c.userId === userId && c.writable)
          if (!document.isPublic && !isWritableCollaborator && !isOwner) {
            throw new Error('User is not owner or collaborator of document')
          }

          return await this.prisma.document.update({
            ...query,
            where: { id: args.documentId },
            data: args.data,
          })
        },
      }),

      // Owner-only: grants a user access to the document.
      addCollaborator: t.prismaField({
        type: this.documentCollaborator(),
        args: {
          documentId: t.arg({ type: 'String', required: true }),
          userId: t.arg({ type: 'String', required: true }),
          readable: t.arg({ type: 'Boolean', required: true }),
          writable: t.arg({ type: 'Boolean', required: true }),
        },
        resolve: async (_, __, args, ctx: SchemaContext) => {
          await this.assertDocumentOwner(ctx, args.documentId)
          return await this.prisma.documentCollaborator.create({
            data: {
              documentId: args.documentId,
              userId: args.userId,
              readable: args.readable,
              writable: args.writable,
            },
          })
        },
      }),

      // Owner-only: revokes a user's access to the document.
      removeCollaborator: t.prismaField({
        type: this.documentCollaborator(),
        args: {
          documentId: t.arg({ type: 'String', required: true }),
          userId: t.arg({ type: 'String', required: true }),
        },
        resolve: async (_, __, args, ctx: SchemaContext) => {
          await this.assertDocumentOwner(ctx, args.documentId)
          return await this.prisma.documentCollaborator.delete({
            where: { documentId_userId: { documentId: args.documentId, userId: args.userId } },
          })
        },
      }),

      // Owner-only: changes a collaborator's read/write flags.
      editCollaboratorPermission: t.prismaField({
        type: this.documentCollaborator(),
        args: {
          documentId: t.arg({ type: 'String', required: true }),
          userId: t.arg({ type: 'String', required: true }),
          readable: t.arg({ type: 'Boolean', required: true }),
          writable: t.arg({ type: 'Boolean', required: true }),
        },
        resolve: async (_, __, args, ctx: SchemaContext) => {
          await this.assertDocumentOwner(ctx, args.documentId)
          return await this.prisma.documentCollaborator.update({
            where: { documentId_userId: { documentId: args.documentId, userId: args.userId } },
            data: { readable: args.readable, writable: args.writable },
          })
        },
      }),
    }))

    this.builder.subscriptionFields((t) => ({
      // Streams every event published for one document.
      document: t.field({
        type: this.documentDelta(),
        args: {
          documentId: t.arg({
            type: 'String',
            required: true,
          }),
        },
        subscribe: async (_, args, ctx: SchemaContext) => {
          if (!ctx.isSubscription) {
            throw new Error('Not allowed')
          }
          const documentId = args.documentId
          const userId = ctx.websocket?.me?.id

          // check user permission
          const document = await this.prisma.document.findUnique({
            where: { id: documentId },
            include: { collaborators: true },
          })
          if (!document) {
            throw new Error('Document not found')
          }
          if (!document.isPublic) {
            const isOwner = document.ownerId === userId
            // NOTE(review): collaborators need BOTH flags to subscribe, so a read-only
            // collaborator is rejected — confirm this is intended.
            const isCollaborator = document.collaborators.some((c) => c.userId === userId && c.writable && c.readable)
            if (!isOwner && !isCollaborator) {
              throw new Error('User is not owner or collaborator of document')
            }
          }

          // One topic per event kind, all scoped to this document id.
          const topics = [
            DocumentEvent.CHANGED,
            DocumentEvent.DELETED,
            DocumentEvent.SAVED,
            DocumentEvent.PAGE_CREATED,
            DocumentEvent.PAGE_DELETED,
            DocumentEvent.ACTIVE_DOCUMENT_ID_CHANGED,
          ].map((event) => `${event}.${documentId}`)

          return ctx.websocket.pubSub.asyncIterator(topics) as unknown as AsyncIterable<DocumentDelta>
        },
        resolve: async (payload: DocumentDelta, _args, ctx: SchemaContext) => {
          if (!ctx.isSubscription) {
            throw new Error('Not allowed')
          }
          // Randomized sync: occasionally flag a delta with `requestSync` instead of
          // doing so on every event, to avoid a performance issue.
          // A copy is returned rather than mutating `payload`, because the same
          // object may be handed to other subscribers of this event.
          if (!payload.requestSync && Math.random() <= DocumentSchema.SYNC_REQUEST_PROBABILITY) {
            return { ...payload, requestSync: true }
          }
          return payload
        },
      }),
    }))
  }

  /**
   * Returns the id of the authenticated HTTP caller.
   *
   * @param ctx request context
   * @param missingUserMessage error message used when no user id is present
   * @throws Error `Not allowed` when called from a subscription context
   * @throws Error `missingUserMessage` when the context carries no user id
   */
  private requireHttpUserId(ctx: SchemaContext, missingUserMessage: string): string {
    if (ctx.isSubscription) {
      throw new Error('Not allowed')
    }
    const userId = ctx.http?.me?.id
    if (!userId) {
      throw new Error(missingUserMessage)
    }
    return userId
  }

  /**
   * Ensures the HTTP caller owns the given document.
   *
   * @param ctx request context
   * @param documentId id of the document to check
   * @throws Error `Not allowed` when called from a subscription context
   * @throws Error `Document not found` when no document has that id
   * @throws Error `User is not owner of document` when the caller is not the owner
   */
  private async assertDocumentOwner(ctx: SchemaContext, documentId: string): Promise<void> {
    if (ctx.isSubscription) {
      throw new Error('Not allowed')
    }
    const document = await this.prisma.document.findUnique({
      where: { id: documentId },
    })
    if (!document) {
      throw new Error('Document not found')
    }
    if (document.ownerId !== ctx.http?.me?.id) {
      throw new Error('User is not owner of document')
    }
  }
}
|