1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| func getServiceWithChannel(...) (*DataSyncService, error) { fg := flowgraph.NewTimeTickedFlowGraph(params.Ctx) nodeList := []flowgraph.Node{} dmStreamNode := newDmInputNode(config, input) nodeList = append(nodeList, dmStreamNode) ddNode := newDDNode( params.Ctx, collectionID, channelName, info.GetVchan().GetDroppedSegmentIds(), flushed, unflushed, params.MsgHandler, ) nodeList = append(nodeList, ddNode) if len(info.GetSchema().GetFunctions()) > 0 { emNode, err := newEmbeddingNode(channelName, config.metacache) if err != nil { return nil, err } nodeList = append(nodeList, emNode) } writeNode, err := newWriteNode(params.Ctx, params.WriteBufferManager, ds.timetickSender, config) if err != nil { return nil, err } nodeList = append(nodeList, writeNode) ttNode := newTTNode(config, params.WriteBufferManager, params.CheckpointUpdater) nodeList = append(nodeList, ttNode) if err := fg.AssembleNodes(nodeList...); err != nil { return nil, err } ds.fg = fg err = params.WriteBufferManager.Register(channelName, metacache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)), writebuffer.WithIDAllocator(params.Allocator), writebuffer.WithTaskObserverCallback(wbTaskObserverCallback)) return ds, nil }
|