1.SPARK-38864 - Spark支持unpivot源码分析
2.PyTorch 源码解读之 torch.utils.data:解析数据处理全流程
3.3. torch.utils里需要掌握的函数
4.MMDetection3D之DETR3D源码解析:整体流程篇
5.U-Net代码解读python[每周一篇]Week1-U-Net
6.PyTorch - DataLoader 源码解析(一)
SPARK-38864 - Spark支持unpivot源码分析
unpivot是数据库系统中用于列转行的内置函数,如SQL SERVER, Oracle等。以数据集tb1为例,每个数字代表某个人在某个学科的成绩。若要将此表扩展为三元组,可使用union实现。源码交易的前景但随列数增加,SQL语句变长。许多SQL引擎提供内置函数unpivot简化此过程。unpivot使用时需指定保留列、进行转行的列、新列名及值列名。
SPARK从SPARK-版本开始支持DataSet的unpivot函数,逐步扩展至pyspark与SQL。在Dataset API中,ids为要保留的Column数组,Column类提供了从String构造Column的隐式转换,方便使用。利用此API,可通过unpivot函数将数据集转换为所需的三元组。values表示转行列,variableColumnName为新列名,valueColumnName为值列名。
Analyser阶段解析unpivot算子,将逻辑执行计划转化为物理执行计划。当用户开启hive catalog,SPARK SQL根据表名和metastore URL查找表元数据,转化为Hive相关逻辑执行计划。物理执行计划如BroadcastHashJoinExec,表示具体的执行策略。规则ResolveUnpivot将包含unpivot的算子转换为Expand算子,在物理执行计划阶段执行。此转换由开发者自定义规则完成,通过遍历逻辑执行计划树,根据节点类型及状态进行不同处理。
unpivot函数实现过程中,easytalk系统源码首先将原始数据集投影为包含ids、variableColumnName、valueColumnName的列,实现语义转换。随后,通过map函数处理values列,构建新的行数据,最终返回Expand算子。在物理执行计划阶段,Expand算子将数据转换为所需形式,实现unpivot功能。
综上所述,SPARK内置函数unpivot的实现通过解析列参数,组装Expand算子完成,为用户提供简便的列转行功能。通过理解此过程,可深入掌握SPARK SQL的开发原理与内在机制。
PyTorch 源码解读之 torch.utils.data:解析数据处理全流程
文@ 目录 0 前言 1 Dataset 1.1 Map-style dataset 1.2 Iterable-style dataset 1.3 其他 dataset 2 Sampler 3 DataLoader 3.1 三者关系 (Dataset, Sampler, Dataloader) 3.2 批处理 3.2.1 自动批处理(默认) 3.2.2 关闭自动批处理 3.2.3 collate_fn 3.3 多进程处理 (multi-process) 4 单进程 5 多进程 6 锁页内存 (Memory Pinning) 7 预取 (prefetch) 8 代码讲解 0 前言 本文以 PyTorch 1.7 版本为例,解析 torch.utils.data 模块在数据处理流程中的应用。 理解 Python 中的迭代器是解读 PyTorch 数据处理逻辑的关键。Dataset、Sampler 和 DataLoader 三者共同构建数据处理流程。 迭代器通过实现 __iter__() 和 __next__() 方法,支持数据的循环访问。Dataset 提供数据获取接口,Sampler 控制遍历顺序,DataLoader 负责加载和批处理数据。 1 Dataset Dataset 包括 Map-style 和 Iterable-style 两种,分别用于索引访问和迭代访问数据。 Map-style dataset 通过实现 __getitem__() 和 __len__() 方法,支持通过索引获取数据。 Iterable-style dataset 实现 __iter__() 方法,适用于随机访问且批次大小依赖于获取数据的场景。 2 Sampler Sampler 用于定义数据遍历的顺序,支持用户自定义和 PyTorch 提供的radio源码笔记内置实现。 3 DataLoader DataLoader 是数据加载的核心,支持 Map-style 和 Iterable-style Dataset,提供单多进程处理和批处理等功能。 通过参数配置,如 batch_size、drop_last、collate_fn 等,DataLoader 实现了数据的自动和手动批处理。 4 批处理 3.2.1 自动批处理(默认) DataLoader 默认使用自动批处理,通过参数控制批次生成和样本整理。 3.2.2 关闭自动批处理 关闭自动批处理,允许用户自定义批处理逻辑或处理单个样本。 3.2.3 collate_fn collate_fn 是手动批处理时的关键,用于整理单个样本为批次。 5 多进程 多进程处理通过 num_workers 参数启用,加速数据加载。 6 单进程 单进程模式下,数据加载可能影响计算流程,适用于数据量小且无需多进程的场景。 7 锁页内存 (Memory Pinning) Memory Pinning 技术确保数据在 GPU 加速过程中快速传输,提高性能。 8 代码讲解 通过具体代码分析,展示了 DataLoader 的初始化、迭代和数据获取过程,涉及迭代器、Sampler 和 Dataset 的交互。3. torch.utils里需要掌握的函数
在深度学习框架PyTorch中,torch.utils模块提供了许多实用工具,帮助我们有效地处理和加载数据。其中几个关键组件包括:
1. DataLoader:这是数据加载的核心工具,它封装了Dataset类,使得我们可以并行加载和处理数据,提高训练效率。使用DataLoader时,要特别注意add方法的iapp做源码运用。
2. Dataset:有Map-style的TensorDataset,它允许我们将数据和标签打包成Tensor,便于在索引过程中同时获取数据和对应的标签。源代码如下:
python
dataset = TensorDataset(data, labels)
3. IterableDataset:例如IterableDataset,其加载数据的方式更像迭代器,适用于需要逐批处理的数据源。同样,add方法在使用时也需要注意:
python
iterable_dataset = IterableDataset()
iterable_dataset.add(...)
4. ConcatDataset和ChainDataset:前者用于连接多个Dataset,后者则适用于连接多个IterableDataset,方便处理多源数据集。
5. Subset:用于从一个Dataset中提取指定索引序列的子集,这对于数据增强或者验证集划分非常有用。
通过熟练掌握这些torch.utils中的函数,我们可以更有效地组织和处理数据,提高模型训练的灵活性和性能。
MMDetection3D之DETR3D源码解析:整体流程篇
关于torch.distributed.launch的更多细节: blog.csdn.net/magic_ll/...
设置config file和work dir,work dir保存最终config,log等信息,work dir默认为path/to/user/work_dir/
作者将自定义的部分放在 'projects/mmdet3d_plugin/' 文件夹下,通过registry类注册模块,这里利用importlib导入模块并初始化自定义的类。
这里设置模型的输出信息保存路径、gpus等模型的运行时环境参数
这里初始化模型,初始化train_dataset和val_dataset
这部分完成了DataLoader的初始化,runner和hooks的初始化,并且按照workflow运行runner。
U-Net代码解读python[每周一篇]Week1-U-Net
本文提供Python版本的U-Net代码解读,内容覆盖数据加载、模型架构、训练及结果展示。源码及数据集可在线获取,详细步骤如下:
1. 数据集加载:`dataset.py`文件负责数据读取与预处理,确保输入数据符合模型需求。
2. U-Net模块定义:`unet_parts.py`中定义U-Net结构中的关键组件,包括卷积层、html 查看源码下采样层、上采样层和连接层,实现特征提取与语义分割。
3. U-Net模型构建:`unet_model.py`整合各部分,构建完整的U-Net模型,实现图像分割任务。
4. 模型训练:`train.py`脚本用于训练U-Net模型,设置超参数、损失函数、优化器等,以优化模型性能。
5. 结果展示:`resultshow.py`展示训练效果,包括分割结果可视化等。
源码链接:LYK/U-Net-Week1-(github.com)
训练流程及结果可参照:Pytorch深度学习实战教程(三):UNet模型训练,深度解析!-腾讯云开发者社区-腾讯云 (tencent.com)
通过以上步骤,开发者能够深入理解U-Net在生物医学图像分割领域的应用,掌握其代码实现与训练技巧。
PyTorch - DataLoader 源码解析(一)
本文为作者基于个人经验进行的初步解析,由于能力有限,可能存在遗漏或错误,敬请各位批评指正。
本文并未全面解析 DataLoader 的全部源码,仅对 DataLoader 与 Sampler 之间的联系进行了分析。以下内容均基于单线程迭代器代码展开,多线程情况将在后续文章中阐述。
以一个简单的数据集遍历代码为例,在循环中,数据是如何从 loader 中被取出的?通过断点调试,我们发现循环时,代码进入了 torch.utils.data.DataLoader 类的 __iter__() 方法,具体内容如下:
可以看到,该函数返回了一个迭代器,主要由 self._get_iterator() 和 self._iterator._reset(self) 提供。接下来,我们进入 self._get_iterator() 方法查看迭代器的产生过程。
在此方法中,根据 self.num_workers 的数量返回了不同的迭代器,主要区别在于多线程处理方式不同,但这两种迭代器都是继承自 _BaseDataLoaderIter 类。这里我们先看单线程下的例子,进入 _SingleProcessDataLoaderIter(self)。
构造函数并不复杂,在父类的构造器中执行了大量初始化属性,然后在自己的构造器中获得了一个 self._dataset_fetcher。此时继续单步前进断点,发现程序进入到了父类的 __next__() 方法中。
在分析代码之前,我们先整理一下目前得到的信息:
下面是 __next__() 方法的内容:
可以看到最后返回的是变量 data,而 data 是由 self._next_data() 生成的,进入这个方法,我们发现这个方法由子类负责实现。
在这个方法中,我们可以看到数据从 self._dataset_fecther.fetch() 中得到,需要依赖参数 index,而这个 index 由 self._next_index() 提供。进入这个方法可以发现它是由父类实现的。
而前面的 index 实际上是由这个 self._sampler_iter 迭代器提供的。查找 self._sampler_iter 的定义,我们发现其在构造函数中。
仔细观察,我们可以在倒数第 4 行发现 self._sampler_iter = iter(self._index_sampler),这个迭代器就是这里的 self._index_sampler 提供的,而 self._index_sampler 来自 loader._index_sampler。这个 loader 就是最外层的 DataLoader。因此我们回到 DataLoader 类中查看这个 _index_sampler 是如何得到的。
我们可以发现 _index_sampler 是一个由 @property 装饰得到的属性,会根据 self._auto_collation 来返回 self.batch_sampler 或者 self.sampler。再次整理已知信息,我们可以得到:
因此,只要知道 batch_sampler 和 sampler 如何返回 index,就能了解整个流程。
首先发现这两个属性来自 DataLoader 的构造函数,因此下面先分析构造函数。
由于构造函数代码量较大,因此这里只关注与 Sampler 相关的部分,代码如下:
在这里我们只关注以下部分:
代码首先检查了参数的合法性,然后进行了一轮初始化属性,接着判断了 dataset 的类型,处理完特殊情况。接下来,函数对参数冲突进行了判断,共判断了 3 种参数冲突:
检查完参数冲突后,函数开始创建 sampler 和 batch_sampler,如下图所示:
注意,仅当未指定 sampler 时才会创建 sampler;同理,仅在未指定 batch_sampler 且存在 batch_size 时才会创建 batch_sampler。
在 DataLoader 的构造函数中,如果不指定参数 batch_sampler,则默认创建 BatchSampler 对象。该对象需要一个 Sampler 对象作为参数参与构造。这也是在构造函数中,batch_sampler 与 sampler 冲突的原因之一。因为传入一个 batch_sampler 时,说明 sampler 已经作为参数完成了 batch_sampler 的构造,若再将 sampler 传入 DataLoader 是多余的。
以第一节中的简单代码为例,此时并未指定 Sampler 和 batch_sampler,也未指定 batch_size,默认为 1,因此在 DataLoader 构造时,创建了一个 SequencialSampler,并传入了 BatchSampler 进行构建。继续第一节中的断点,可以发现:
具体使用 sampler 还是 batch_sampler 来生成 index,取决于 _auto_collation,而从上面的代码发现,只要存在 self.batch_sampler 就永远使用 batch_sampler 来生成。batch_sampler 与 sampler 冲突的原因之二:若不设置冲突,那么使用者试图同时指定 batch_sampler 与 sampler 后,尤其是在使用者继承了新的 Sampler 子类后, sampler 在获取数据的时候完全没有被使用,这对开发者来说是一个困惑的现象,容易引起不易察觉的 BUG。
继续断点发现程序进入了 BatchSampler 的 __iter__() 方法,代码如下:
从代码中可以发现,程序不停地从 self.sampler 中获取 idx 加入列表,直到填满一个 batch 的量,并将这一整个 batch 的 index 返回到迭代器的 _next_data()。
此处由 self._dataset_fetcher.fetch(index) 来获取真正的数据,进入函数后看到:
这里依然根据 self.auto_collation(来自 DataLoader._auto_collation)进行分别处理,但是总体逻辑都是通过 self.dataset[] 来调用 Dataset 对象的 __getitem__() 方法。
此处的 Dataset 是来自 torchvision 的 DatasetFolder 对象,这里读取文件路径中的后,经过转换变为 Tensor 对象,与标签 target 一起返回。参数中的 index 是由迭代器的 self._dataset_fetcher.fetch() 传入。
整个获取数据的流程可以用以下流程图简略表示:
注意:
另附:
对于一条循环语句,在执行过程中发生了以下事件:
求决策树源代码。最好使用matlab实现。
function [Tree RulesMatrix]=DecisionTree(DataSet,AttributName)
%输入为训练集,为离散后的数字,如记录1:1 1 3 2 1;
%前面为属性列,最后一列为类标
if nargin<1
error('请输入数据集');
else
if isstr(DataSet)
[DataSet AttributValue]=readdata2(DataSet);
else
AttributValue=[];
end
end
if nargin<2
AttributName=[];
end
Attributs=[1:size(DataSet,2)-1];
Tree=CreatTree(DataSet,Attributs);
disp([char() 'The Decision Tree:']);
showTree(Tree,0,0,1,AttributValue,AttributName);
Rules=getRule(Tree);
RulesMatrix=zeros(size(Rules,1),size(DataSet,2));
for i=1:size(Rules,1)
rule=cell2struct(Rules(i,1),{ 'str'});
rule=str2num([rule.str([1:(find(rule.str=='C')-1)]) rule.str((find(rule.str=='C')+1):length(rule.str))]);
for j=1:(length(rule)-1)/2
RulesMatrix(i,rule((j-1)*2+1))=rule(j*2);
end
RulesMatrix(i,size(DataSet,2))=rule(length(rule));
end
end
function Tree=CreatTree(DataSet,Attributs) %决策树程序 输入为:数据集,属性名列表
%disp(Attributs);
[S ValRecords]=ComputEntropy(DataSet,0);
if(S==0) %当样例全为一类时退出,返回叶子节点类标
for i=1:length(ValRecords)
if(length(ValRecords(i).matrix)==size(DataSet,1))
break;
end
end
Tree.Attribut=i;
Tree.Child=[];
return;
end
if(length(Attributs)==0) %当条件属性个数为0时返回占多数的类标
mostlabelnum=0;
mostlabel=0;
for i=1:length(ValRecords)
if(length(ValRecords(i).matrix)>mostlabelnum)
mostlabelnum=length(ValRecords(i).matrix);
mostlabel=i;
end
end
Tree.Attribut=mostlabel;
Tree.Child=[];
return;
end
for i=1:length(Attributs)
[Sa(i) ValRecord]=ComputEntropy(DataSet,i);
Gains(i)=S-Sa(i);
AtrributMatric(i).val=ValRecord;
end
[maxval maxindex]=max(Gains);
Tree.Attribut=Attributs(maxindex);
Attributs2=[Attributs(1:maxindex-1) Attributs(maxindex+1:length(Attributs))];
for j=1:length(AtrributMatric(maxindex).val)
DataSet2=[DataSet(AtrributMatric(maxindex).val(j).matrix',1:maxindex-1) DataSet(AtrributMatric(maxindex).val(j).matrix',maxindex+1:size(DataSet,2))];
if(size(DataSet2,1)==0)
mostlabelnum=0;
mostlabel=0;
for i=1:length(ValRecords)
if(length(ValRecords(i).matrix)>mostlabelnum)
mostlabelnum=length(ValRecords(i).matrix);
mostlabel=i;
end
end
Tree.Child(j).root.Attribut=mostlabel;
Tree.Child(j).root.Child=[];
else
Tree.Child(j).root=CreatTree(DataSet2,Attributs2);
end
end
end
function [Entropy RecordVal]=ComputEntropy(DataSet,attribut) %计算信息熵
if(attribut==0)
clnum=0;
for i=1:size(DataSet,1)
if(DataSet(i,size(DataSet,2))>clnum) %防止下标越界
classnum(DataSet(i,size(DataSet,2)))=0;
clnum=DataSet(i,size(DataSet,2));
RecordVal(DataSet(i,size(DataSet,2))).matrix=[];
end
classnum(DataSet(i,size(DataSet,2)))=classnum(DataSet(i,size(DataSet,2)))+1;
RecordVal(DataSet(i,size(DataSet,2))).matrix=[RecordVal(DataSet(i,size(DataSet,2))).matrix i];
end
Entropy=0;
for j=1:length(classnum)
P=classnum(j)/size(DataSet,1);
if(P~=0)
Entropy=Entropy+(-P)*log2(P);
end
end
else
valnum=0;
for i=1:size(DataSet,1)
if(DataSet(i,attribut)>valnum) %防止参数下标越界
clnum(DataSet(i,attribut))=0;
valnum=DataSet(i,attribut);
Valueexamnum(DataSet(i,attribut))=0;
RecordVal(DataSet(i,attribut)).matrix=[]; %将编号保留下来,以方便后面按值分割数据集
end
if(DataSet(i,size(DataSet,2))>clnum(DataSet(i,attribut))) %防止下标越界
Value(DataSet(i,attribut)).classnum(DataSet(i,size(DataSet,2)))=0;
clnum(DataSet(i,attribut))=DataSet(i,size(DataSet,2));
end
Value(DataSet(i,attribut)).classnum(DataSet(i,size(DataSet,2)))= Value(DataSet(i,attribut)).classnum(DataSet(i,size(DataSet,2)))+1;
Valueexamnum(DataSet(i,attribut))= Valueexamnum(DataSet(i,attribut))+1;
RecordVal(DataSet(i,attribut)).matrix=[RecordVal(DataSet(i,attribut)).matrix i];
end
Entropy=0;
for j=1:valnum
Entropys=0;
for k=1:length(Value(j).classnum)
P=Value(j).classnum(k)/Valueexamnum(j);
if(P~=0)
Entropys=Entropys+(-P)*log2(P);
end
end
Entropy=Entropy+(Valueexamnum(j)/size(DataSet,1))*Entropys;
end
end
end
function showTree(Tree,level,value,branch,AttributValue,AttributName)
blank=[];
for i=1:level-1
if(branch(i)==1)
blank=[blank ' |'];
else
blank=[blank ' '];
end
end
blank=[blank ' '];
if(level==0)
blank=[' (The Root):'];
else
if isempty(AttributValue)
blank=[blank '|_____' int2str(value) '______'];
else
blank=[blank '|_____' value '______'];
end
end
if(length(Tree.Child)~=0) %非叶子节点
if isempty(AttributName)
disp([blank 'Attribut ' int2str(Tree.Attribut)]);
else
disp([blank 'Attribut ' AttributName{ Tree.Attribut}]);
end
if isempty(AttributValue)
for j=1:length(Tree.Child)-1
showTree(Tree.Child(j).root,level+1,j,[branch 1],AttributValue,AttributName);
end
showTree(Tree.Child(length(Tree.Child)).root,level+1,length(Tree.Child),[branch(1:length(branch)-1) 0 1],AttributValue,AttributName);
else
for j=1:length(Tree.Child)-1
showTree(Tree.Child(j).root,level+1,AttributValue{ Tree.Attribut}{ j},[branch 1],AttributValue,AttributName);
end
showTree(Tree.Child(length(Tree.Child)).root,level+1,AttributValue{ Tree.Attribut}{ length(Tree.Child)},[branch(1:length(branch)-1) 0 1],AttributValue,AttributName);
end
else
if isempty(AttributValue)
disp([blank 'leaf ' int2str(Tree.Attribut)]);
else
disp([blank 'leaf ' AttributValue{ length(AttributValue)}{ Tree.Attribut}]);
end
end
end
function Rules=getRule(Tree)
if(length(Tree.Child)~=0)
Rules={ };
for i=1:length(Tree.Child)
content=getRule(Tree.Child(i).root);
%disp(content);
%disp([num2str(Tree.Attribut) ',' num2str(i) ',']);
for j=1:size(content,1)
rule=cell2struct(content(j,1),{ 'str'});
content(j,1)={ [num2str(Tree.Attribut) ',' num2str(i) ',' rule.str]};
end
Rules=[Rules;content];
end
else
Rules={ ['C' num2str(Tree.Attribut)]};
end
end
详解数据读取--Dataset, Samper, Dataloader
在使用Pytorch进行模型训练时,数据读取过程常涉及到Dataset、Dataloader以及Sampler三个核心组件。通常情况下,我们自定义一个继承自Dataset的类来创建数据集,并作为Dataloader的初始化参数。Dataloader则根据初始化参数如batch_size和shuffle等完成数据加载。本文将深入解析这三个组件如何协同作用,完成数据读取任务。
在构建Dataloader时,两个关键参数sampler和batch_sampler及collate_fn通常被指定。sampler需要继承自torch.utils.data.Sampler类,而collate_fn通常是一个函数。未指定时它们具有默认值。数据读取流程是由Dataset、Dataloader和Sampler共同完成的。本文章将通过源码解析它们如何协同工作。
在理解Dataset、Dataloader和Sampler的联动之前,我们先对迭代器和生成器的概念进行梳理。迭代器iterator和可迭代对象iterable是Python中用于数据遍历的基础概念。一个iterable对象能够通过`iter()`函数获取其对应的iterator对象,而iterator对象在遍历时通过`next()`函数获取iterable中的下一个元素。实际上,for循环的`in`操作符在背后依赖于iterable和iterator的相互作用。
生成器generator是一种特殊的迭代器,具有`yield`关键字,可以实现函数的暂停与恢复,非常适合用于生成序列数据。其操作方式类似于函数调用,但能暂停执行并在需要时恢复,生成序列数据。
在数据读取流程中,Dataloader创建的迭代器最终指向Dataset。具体实现中,Dataloader首先初始化一个iterator对象,通常基于自定义的Sampler。当使用for循环遍历Dataloader时,实际上在遍历这个迭代器。Sampler负责确定数据读取顺序,而Dataset提供实际的数据点。Dataloader内部实现了一个`_next_data()`函数,负责从Dataset中提取并打包成批次数据,再通过`collate_fn`处理,最终生成训练批次。
在Dataloader中,`_next_index()`函数用于获取下一个批次的索引。这些索引由Sampler生成,通常基于随机或顺序策略。获取索引后,Dataloader使用`_dataset_fetcher.fetch(index)`从Dataset中读取数据点。Dataset可能根据其类型(如`IterableDataset`或继承自`Dataset`的自定义类)实现具体的读取逻辑,通常通过`__getitem__`方法获取指定索引的数据。
最后,数据点通过`collate_fn`进行打包,确保批次中的数据结构一致,适应模型训练的需求。整个过程展示了Dataset、Dataloader和Sampler如何协同工作,从数据集读取数据点,确定读取顺序,到最终生成可用于模型训练的批次数据。
综上所述,理解Dataset、Dataloader和Sampler的协同作用是构建高效数据加载系统的关键。通过精心设计这些组件,可以显著提高数据处理效率,优化模型训练过程。
2025-01-01 09:18
2025-01-01 08:54
2025-01-01 08:30
2025-01-01 08:09
2025-01-01 07:17
2025-01-01 06:51
2025-01-01 06:46
2025-01-01 06:37