// Run executes the sync task: it serializes the segment's buffered data into
// binlog files via a storage-version-specific pack writer, records write
// metrics, persists binlog meta (when a meta writer is configured), and then
// updates the segment's state in the metacache.
//
// The error return is named so the deferred handler below can observe it:
// every non-nil error is routed through t.HandleError exactly once before
// Run returns.
func (t *SyncTask) Run(ctx context.Context) (err error) {
	t.tr = timerecord.NewTimeRecorder("syncTask")
	log := t.getLogger()

	// Funnel any failure through the task's error handler on the way out.
	defer func() {
		if err != nil {
			t.HandleError(err)
		}
	}()

	segmentInfo, has := t.metacache.GetSegmentByID(t.segmentID)
	if !has {
		if t.pack.isDrop {
			// The segment is being dropped anyway; nothing left to sync.
			log.Info("segment dropped, discard sync task")
			return nil
		}
		// Not dropping, yet the segment is gone — treat as already synced.
		log.Warn("segment not found in metacache, may be already synced")
		return nil
	}

	columnGroups := t.getColumnGroups(segmentInfo)

	// Choose the pack writer matching the segment's storage format.
	// NOTE(review): the constructor arguments are elided ("...") in this
	// excerpt — confirm against the full source.
	switch segmentInfo.GetStorageVersion() {
	case storage.StorageV2:
		writer := NewBulkPackWriterV2(...)
		// V2 additionally yields a manifest path alongside the binlogs.
		t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.manifestPath, t.flushedSize, err = writer.Write(ctx, t.pack)
	default:
		writer := NewBulkPackWriter(...)
		t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = writer.Write(ctx, t.pack)
	}
	if err != nil {
		return err
	}

	metrics.DataNodeWriteDataCount.Add(float64(t.batchRows))
	metrics.DataNodeFlushedSize.Add(float64(t.flushedSize))
	// RecordSpan measures time since the last checkpoint, i.e. the storage
	// write phase only.
	metrics.DataNodeSave2StorageLatency.Observe(float64(t.tr.RecordSpan().Milliseconds()))

	// Persist the binlog paths to meta storage, if a meta writer is set.
	if t.metaWriter != nil {
		err = t.writeMeta(ctx)
		if err != nil {
			return err
		}
	}

	// Binlogs are written and meta (if configured) is persisted — the
	// in-memory buffers can be freed now.
	t.pack.ReleaseData()

	// Accumulate all metacache updates and apply them as one merged action.
	actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)}
	if columnGroups != nil {
		actions = append(actions, metacache.UpdateCurrentSplit(columnGroups))
	}
	if t.pack.isFlush {
		actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed))
	}
	t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segmentID))

	// Dropped segments are removed from the cache only after the final sync
	// above has been recorded.
	if t.pack.isDrop {
		t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segmentID))
	}

	t.execTime = t.tr.ElapseSpan()
	log.Info("task done", zap.Int64("flushedSize", t.flushedSize), zap.Duration("timeTaken", t.execTime))
	return nil
}
|