## Licensed to the Apache Software Foundation (ASF) under one or more# contributor license agreements. See the NOTICE file distributed with# this work for additional information regarding copyright ownership.# The ASF licenses this file to You under the Apache License, Version 2.0# (the "License"); you may not use this file except in compliance with# the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#"""A wrapper for ResampledData to behave like pandas Resampler."""fromabcimportABCMeta,abstractmethodfromdistutils.versionimportLooseVersionfromfunctoolsimportpartialfromtypingimport(Any,Generic,List,Optional,Union,)importnumpyasnpimportpandasaspdfrompandas.tseries.frequenciesimportto_offsetifLooseVersion(pd.__version__)>=LooseVersion("1.3.0"):frompandas.core.commonimport_builtin_table# type: ignore[attr-defined]else:frompandas.core.baseimportSelectionMixin_builtin_table=SelectionMixin._builtin_table# type: ignore[attr-defined]frompysparkimportSparkContextfrompyspark.sqlimportColumn,functionsasFfrompyspark.sql.typesimport(NumericType,StructField,TimestampNTZType,DataType,)frompysparkimportpandasasps# For running doctests and reference resolution in PyCharm.frompyspark.pandas._typingimportFrameLikefrompyspark.pandas.frameimportDataFramefrompyspark.pandas.internalimport(InternalField,InternalFrame,SPARK_DEFAULT_INDEX_NAME,)frompyspark.pandas.missing.resampleimport(MissingPandasLikeDataFrameResampler,MissingPandasLikeSeriesResampler,)frompyspark.pandas.seriesimportSeries,first_seriesfrompyspark.pandas.utilsimport(scol_for,verify_temp_column_name,)frompyspark.sql.utilsimportis_remotefrompyspark.pandas.spark.functionsimporttimestampdiffclassResampler(Generic[FrameLike],metaclass=ABCMeta):""" Class for resampling datetimelike data, a groupby-like operation. It's easiest to use obj.resample(...) to use Resampler. Parameters ---------- psdf : DataFrame Returns ------- a Resampler of the appropriate type Notes ----- After resampling, see aggregate, apply, and transform functions. """def__init__(self,psdf:DataFrame,resamplekey:Optional[Series],rule:str,closed:Optional[str]=None,label:Optional[str]=None,agg_columns:List[Series]=[],):self._psdf=psdfself._resamplekey=resamplekeyself._offset=to_offset(rule)ifself._offset.rule_codenotin["A-DEC","M","D","H","T","S"]:raiseValueError("rule code {} is not supported".format(self._offset.rule_code))ifnotgetattr(self._offset,"n")>0:raiseValueError("rule offset must be positive")ifclosedisNone:self._closed="right"ifself._offset.rule_codein["A-DEC","M"]else"left"elifclosedin["left","right"]:self._closed=closedelse:raiseValueError("invalid closed: '{}'".format(closed))iflabelisNone:self._label="right"ifself._offset.rule_codein["A-DEC","M"]else"left"eliflabelin["left","right"]:self._label=labelelse:raiseValueError("invalid label: '{}'".format(label))self._agg_columns=agg_columns@propertydef_resamplekey_scol(self)->Column:ifself._resamplekeyisNone:returnself._psdf.index.spark.columnelse:returnself._resamplekey.spark.column@propertydef_resamplekey_type(self)->DataType:ifself._resamplekeyisNone:returnself._psdf.index.spark.data_typeelse:returnself._resamplekey.spark.data_type@propertydef_agg_columns_scols(self)->List[Column]:return[s.spark.columnforsinself._agg_columns]defget_make_interval(# type: ignore[return]self,unit:str,col:Union[Column,int,float])->Column:ifis_remote():frompyspark.sql.connect.functionsimportlit,make_intervalcol=colifnotisinstance(col,(int,float))elselit(col)# type: ignore[assignment]ifunit=="MONTH":returnmake_interval(months=col)# type: ignoreifunit=="HOUR":returnmake_interval(hours=col)# type: ignoreifunit=="MINUTE":returnmake_interval(mins=col)# type: ignoreifunit=="SECOND":returnmake_interval(secs=col)# type: ignoreelse:sql_utils=SparkContext._active_spark_context._jvm.PythonSQLUtilscol=col._jcifisinstance(col,Column)elseF.lit(col)._jcreturnsql_utils.makeInterval(unit,col)def_bin_timestamp(self,origin:pd.Timestamp,ts_scol:Column)->Column:key_type=self._resamplekey_typeorigin_scol=F.lit(origin)(rule_code,n)=(self._offset.rule_code,getattr(self._offset,"n"))left_closed,right_closed=(self._closed=="left",self._closed=="right")left_labeled,right_labeled=(self._label=="left",self._label=="right")ifrule_code=="A-DEC":assert(origin.month==12andorigin.day==31andorigin.hour==0andorigin.minute==0andorigin.second==0)diff=F.year(ts_scol)-F.year(origin_scol)mod=F.lit(0)ifn==1else(diff%n)edge_cond=(mod==0)&(F.month(ts_scol)==12)&(F.dayofmonth(ts_scol)==31)edge_label=F.year(ts_scol)ifleft_closedandright_labeled:edge_label+=nelifright_closedandleft_labeled:edge_label-=nifleft_labeled:non_edge_label=F.when(mod==0,F.year(ts_scol)-n).otherwise(F.year(ts_scol)-mod)else:non_edge_label=F.when(mod==0,F.year(ts_scol)).otherwise(F.year(ts_scol)-(mod-n))ret=F.to_timestamp(F.make_date(F.when(edge_cond,edge_label).otherwise(non_edge_label),F.lit(12),F.lit(31)))elifrule_code=="M":assert(origin.is_month_endandorigin.hour==0andorigin.minute==0andorigin.second==0)diff=((F.year(ts_scol)-F.year(origin_scol))*12+F.month(ts_scol)-F.month(origin_scol))mod=F.lit(0)ifn==1else(diff%n)edge_cond=(mod==0)&(F.dayofmonth(ts_scol)==F.dayofmonth(F.last_day(ts_scol)))truncated_ts_scol=F.date_trunc("MONTH",ts_scol)edge_label=truncated_ts_scolifleft_closedandright_labeled:edge_label+=self.get_make_interval("MONTH",n)elifright_closedandleft_labeled:edge_label-=self.get_make_interval("MONTH",n)ifleft_labeled:non_edge_label=F.when(mod==0,truncated_ts_scol-self.get_make_interval("MONTH",n),).otherwise(truncated_ts_scol-self.get_make_interval("MONTH",mod))else:non_edge_label=F.when(mod==0,truncated_ts_scol).otherwise(truncated_ts_scol-self.get_make_interval("MONTH",mod-n))ret=F.to_timestamp(F.last_day(F.when(edge_cond,edge_label).otherwise(non_edge_label)))elifrule_code=="D":assertorigin.hour==0andorigin.minute==0andorigin.second==0ifn==1:# NOTE: the logic to process '1D' is different from the cases with n>1,# since hour/minute/second parts are taken into account to determine edges!edge_cond=((F.hour(ts_scol)==0)&(F.minute(ts_scol)==0)&(F.second(ts_scol)==0))ifleft_closedandleft_labeled:ret=F.date_trunc("DAY",ts_scol)elifleft_closedandright_labeled:ret=F.date_trunc("DAY",F.date_add(ts_scol,1))elifright_closedandleft_labeled:ret=F.when(edge_cond,F.date_trunc("DAY",F.date_sub(ts_scol,1))).otherwise(F.date_trunc("DAY",ts_scol))else:ret=F.when(edge_cond,F.date_trunc("DAY",ts_scol)).otherwise(F.date_trunc("DAY",F.date_add(ts_scol,1)))else:diff=F.datediff(end=ts_scol,start=origin_scol)mod=diff%nedge_cond=mod==0truncated_ts_scol=F.date_trunc("DAY",ts_scol)edge_label=truncated_ts_scolifleft_closedandright_labeled:edge_label=F.date_add(truncated_ts_scol,n)elifright_closedandleft_labeled:edge_label=F.date_sub(truncated_ts_scol,n)ifleft_labeled:non_edge_label=F.date_sub(truncated_ts_scol,mod)else:non_edge_label=F.date_sub(truncated_ts_scol,mod-n)ret=F.when(edge_cond,edge_label).otherwise(non_edge_label)elifrule_codein["H","T","S"]:unit_mapping={"H":"HOUR","T":"MINUTE","S":"SECOND"}unit_str=unit_mapping[rule_code]truncated_ts_scol=F.date_trunc(unit_str,ts_scol)ifisinstance(key_type,TimestampNTZType):truncated_ts_scol=F.to_timestamp_ntz(truncated_ts_scol)diff=timestampdiff(unit_str,origin_scol,truncated_ts_scol)mod=F.lit(0)ifn==1else(diff%F.lit(n))ifrule_code=="H":assertorigin.minute==0andorigin.second==0edge_cond=(mod==0)&(F.minute(ts_scol)==0)&(F.second(ts_scol)==0)elifrule_code=="T":assertorigin.second==0edge_cond=(mod==0)&(F.second(ts_scol)==0)else:edge_cond=mod==0edge_label=truncated_ts_scolifleft_closedandright_labeled:edge_label+=self.get_make_interval(unit_str,n)elifright_closedandleft_labeled:edge_label-=self.get_make_interval(unit_str,n)ifleft_labeled:non_edge_label=F.when(mod==0,truncated_ts_scol).otherwise(truncated_ts_scol-self.get_make_interval(unit_str,mod))else:non_edge_label=F.when(mod==0,truncated_ts_scol+self.get_make_interval(unit_str,n),).otherwise(truncated_ts_scol-self.get_make_interval(unit_str,mod-n))ret=F.when(edge_cond,edge_label).otherwise(non_edge_label)else:raiseValueError("Got the unexpected unit {}".format(rule_code))ifisinstance(key_type,TimestampNTZType):returnF.to_timestamp_ntz(ret)else:returnretdef_downsample(self,f:str)->DataFrame:""" Downsample the defined function. Parameters ---------- how : string / mapped function **kwargs : kw args passed to how function """# a simple example to illustrate the computation:# dates = [# datetime(2012, 1, 2),# datetime(2012, 5, 3),# datetime(2022, 5, 3),# ]# index = pd.DatetimeIndex(dates)# pdf = pd.DataFrame(np.array([1,2,3]), index=index, columns=['A'])# pdf.resample('3Y').max()# A# 2012-12-31 2.0# 2015-12-31 NaN# 2018-12-31 NaN# 2021-12-31 NaN# 2024-12-31 3.0## in this case:# 1, obtain one origin point to bin all timestamps, we can get one (2009-12-31)# from the minimum timestamp (2012-01-02);# 2, the default intervals for 'Y' are right-closed, so intervals are:# (2009-12-31, 2012-12-31], (2012-12-31, 2015-12-31], (2015-12-31, 2018-12-31], ...# 3, bin all timestamps, for example, 2022-05-03 belongs to interval# (2021-12-31, 2024-12-31], since the default label is 'right', label it with the right# edge 2024-12-31;# 4, some intervals maybe too large for this down sampling, so we need to pad the dataframe# to avoid missing some results, like: 2015-12-31, 2018-12-31 and 2021-12-31;# 5, union the binned dataframe and padded dataframe, and apply aggregation 'max' to get# the final results;# one action to obtain the range, in the future we may cache it in the index.ts_min,ts_max=(self._psdf._internal.spark_frame.select(F.min(self._resamplekey_scol),F.max(self._resamplekey_scol)).toPandas().iloc[0])# the logic to obtain an origin point to bin the timestamps is too complex to follow,# here just use Pandas' resample on a 1-length series to get it.ts_origin=(pd.Series([0],index=[ts_min]).resample(rule=self._offset.freqstr,closed=self._closed,label="left").sum().index[0])assertts_origin<=ts_minbin_col_name="__tmp_resample_bin_col__"bin_col_label=verify_temp_column_name(self._psdf,bin_col_name)bin_col_field=InternalField(dtype=np.dtype("datetime64[ns]"),struct_field=StructField(bin_col_name,self._resamplekey_type,True),)bin_scol=self._bin_timestamp(ts_origin,self._resamplekey_scol)agg_columns=[psserforpsserinself._agg_columnsif(isinstance(psser.spark.data_type,NumericType))]assertlen(agg_columns)>0# in the binning side, label the timestamps according to the origin and the freq(rule)bin_sdf=self._psdf._internal.spark_frame.select(F.col(SPARK_DEFAULT_INDEX_NAME),bin_scol.alias(bin_col_name),*[psser.spark.columnforpsserinagg_columns],)# in the padding side, insert necessary points# again, directly apply Pandas' resample on a 2-length series to obtain the indicespad_sdf=(ps.from_pandas(pd.Series([0,0],index=[ts_min,ts_max]).resample(rule=self._offset.freqstr,closed=self._closed,label=self._label).sum().index)._internal.spark_frame.select(F.col(SPARK_DEFAULT_INDEX_NAME).alias(bin_col_name)).where((ts_min<=F.col(bin_col_name))&(F.col(bin_col_name)<=ts_max)))# union the above two spark dataframes.sdf=bin_sdf.unionByName(pad_sdf,allowMissingColumns=True).where(~F.isnull(F.col(bin_col_name)))internal=InternalFrame(spark_frame=sdf,index_spark_columns=[scol_for(sdf,SPARK_DEFAULT_INDEX_NAME)],data_spark_columns=[F.col(bin_col_name)]+[scol_for(sdf,psser._internal.data_spark_column_names[0])forpsserinagg_columns],column_labels=[bin_col_label]+[psser._column_labelforpsserinagg_columns],data_fields=[bin_col_field]+[psser._internal.data_fields[0].copy(nullable=True)forpsserinagg_columns],column_label_names=self._psdf._internal.column_label_names,)psdf:DataFrame=DataFrame(internal)groupby=psdf.groupby(psdf._psser_for(bin_col_label),dropna=False)downsampled=getattr(groupby,f)()downsampled.index.name=Nonereturndownsampled@abstractmethoddef_handle_output(self,psdf:DataFrame)->FrameLike:pass